-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathrpc.py
More file actions
150 lines (125 loc) · 4.68 KB
/
rpc.py
File metadata and controls
150 lines (125 loc) · 4.68 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
import queue
import threading
import traceback
from typing import Any, Callable, Dict, List, Literal, Optional
import bpy
import pynvim
class NvimRpc:
"""RPC Interface for Blender <-> Neovim communication (singleton)"""
# --- Class --- #
_instance: Optional["NvimRpc"] = None
_request_handlers: Dict[str, Callable[["NvimRpc", List[Any]], None]] = {}
_notification_handlers: Dict[str, Callable[["NvimRpc", List[Any]], None]] = {}
@classmethod
def initialize(
cls, sock: str, on_setup: Optional[Callable[["NvimRpc"], None]] = None
):
if cls._instance is not None:
raise ValueError("NvimRpc instance is already initialized")
cls._instance = cls(sock, on_setup)
return cls._instance
@classmethod
def get_instance(cls):
if cls._instance is None:
raise ValueError("NvimRpc instance is not initialized")
return cls._instance
@classmethod
def get_instance_safe(cls):
return cls._instance
@classmethod
def _register_handler(
cls,
kind: Literal["request", "notification"],
name: str,
handler: Callable[[List[Any]], None],
main_thread: bool = True,
):
def wrapper(self, args):
if main_thread:
self.schedule(lambda: handler(*args))
else:
handler(*args)
registry = (
cls._request_handlers if kind == "request" else cls._notification_handlers
)
registry[name] = wrapper
@classmethod
def request_handler(cls, name: str, main_thread: bool = True):
def decorator(handler: Callable[[List[Any]], None]):
cls._register_handler("request", name, handler, main_thread)
return handler
return decorator
@classmethod
def notification_handler(cls, name: str, main_thread: bool = True):
def decorator(handler: Callable[[List[Any]], None]):
cls._register_handler("notification", name, handler, main_thread)
return handler
return decorator
# --- Instance --- #
_sock: str
nvim: pynvim.Nvim
_main_thread: threading.Thread
_session_thread: Optional[threading.Thread]
_execution_queue: queue.Queue
_on_setup_cb: Optional[Callable[["NvimRpc"], None]]
def __init__(
self, sock: str, on_setup: Optional[Callable[["NvimRpc"], None]] = None
):
if self._instance is not None:
raise ValueError("NvimRpc instance is already initialized")
self._sock = sock
self.nvim = pynvim.attach("socket", path=sock)
self._main_thread = threading.current_thread()
self._session_thread = None
self._execution_queue = queue.Queue()
self._on_setup_cb = on_setup
def schedule(self, func: Callable[[], None]):
self._execution_queue.put(func)
def _on_request(self, name: str, args: List[Any]):
print("RPC request:", name, args)
if name not in self._request_handlers:
print("No handler for request:", name)
return
self._request_handlers[name](self, args)
def _on_notification(self, name: str, args: list):
print("RPC notification:", name, args)
if name not in self._notification_handlers:
print("No handler for notification:", name)
return
self._notification_handlers[name](self, args)
def _on_setup(self):
print("RPC setup")
if self._on_setup_cb is not None:
self._on_setup_cb(self)
def _start_session(self):
def run():
if self._session_thread is None:
return
self.nvim._session.run(
request_cb=self._on_request,
notification_cb=self._on_notification,
setup_cb=self._on_setup,
)
self._session_thread = threading.Thread(target=run)
self._session_thread.daemon = True
self._session_thread.start()
def _start_executor(self):
def executor():
while not self._execution_queue.empty():
func = self._execution_queue.get()
try:
func()
except: # noqa: E722
traceback.print_exc()
return 0.1
bpy.app.timers.register(executor, persistent=True)
def start(self):
self._start_executor()
self._start_session()
def send(self, data):
if threading.current_thread() != self._session_thread:
self.nvim._session.threadsafe_call(lambda: self.send(data))
return
print("RPC send:", data)
self.nvim.exec_lua('require("blender.rpc").handle(...)', data)
# TODO: Handle errors