Skip to content

SyncMoonrakerClient

pymoonraker.sync_client.SyncMoonrakerClient

Blocking wrapper for MoonrakerClient.

Runs an event loop in a background daemon thread so that async operations are transparent to the caller.

Limitation: The sync client is currently limited and cannot be used with API namespaces (e.g. client.printer, client.files). Use the convenience methods on this client (e.g. server_info(), gcode(), print_start()), or use MoonrakerClient for full namespace access.

Usage::

with SyncMoonrakerClient("192.168.1.100") as client:
    info = client.server_info()
    print(info.klippy_state)
Source code in src/pymoonraker/sync_client.py
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
class SyncMoonrakerClient:
    """Blocking wrapper for ``MoonrakerClient``.

    Runs an event loop in a background daemon thread so that async
    operations are transparent to the caller.

    **Limitation:** The sync client is currently limited and cannot be used
    with API namespaces (e.g. ``client.printer``, ``client.files``). Use the
    convenience methods on this client (e.g. ``server_info()``, ``gcode()``,
    ``print_start()``) or use :class:`MoonrakerClient` for full namespace
    access.

    Blocking methods must not be called from the background loop thread
    itself; doing so raises ``RuntimeError`` instead of deadlocking.

    Usage::

        with SyncMoonrakerClient("192.168.1.100") as client:
            info = client.server_info()
            print(info.klippy_state)
    """

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """Store async client construction arguments for deferred startup.

        Nothing is started here; the loop, thread and async client are
        created by :meth:`connect`.

        Args:
            *args: Positional args forwarded to ``MoonrakerClient``.
            **kwargs: Keyword args forwarded to ``MoonrakerClient``.

        """
        self._args = args
        self._kwargs = kwargs
        self._loop: asyncio.AbstractEventLoop | None = None
        self._thread: threading.Thread | None = None
        self._client: MoonrakerClient | None = None

    # -- Context manager --------------------------------------------------

    def __enter__(self) -> SyncMoonrakerClient:
        """Connect and return ``self`` for context-manager usage."""
        self.connect()
        return self

    def __exit__(self, *exc: object) -> None:
        """Disconnect when leaving the context manager.

        Returns ``None``, so an exception raised in the ``with`` body is
        never suppressed.
        """
        self.disconnect()

    # -- Lifecycle --------------------------------------------------------

    def connect(self) -> None:
        """Start the background loop and connect the underlying async client.

        If a previous session is still active it is torn down first, so a
        repeated call does not leak the old loop thread. If creating or
        connecting the async client fails, the freshly started loop thread
        is stopped again before the error propagates.

        Raises:
            Exception: Whatever ``MoonrakerClient(...)`` or its ``connect()``
                coroutine raises; re-raised unchanged after cleanup.

        """
        if self._loop is not None:
            # A second connect() would otherwise overwrite (and leak) the
            # running loop and its thread.
            self.disconnect()
        self._loop = asyncio.new_event_loop()
        self._thread = threading.Thread(
            target=self._loop.run_forever, daemon=True, name="pymoonraker-sync"
        )
        self._thread.start()
        try:
            self._client = MoonrakerClient(*self._args, **self._kwargs)
            self._run(self._client.connect())
        except BaseException:
            # __exit__ never runs when __enter__ fails, so clean up here.
            self._client = None
            self._shutdown_loop()
            raise

    def disconnect(self) -> None:
        """Disconnect and stop the background event loop thread.

        Safe to call when not connected and safe to call repeatedly. The
        loop thread is stopped even if the async client's ``disconnect()``
        raises; that error is then re-raised.
        """
        client, self._client = self._client, None
        try:
            if client is not None:
                self._run(client.disconnect())
        finally:
            self._shutdown_loop()

    # -- Delegated methods ------------------------------------------------

    def server_info(self) -> ServerInfo:
        """Query ``server.info``."""
        return self._run(self._ensure_client().server_info())

    def printer_info(self) -> PrinterInfo:
        """Query ``printer.info``."""
        return self._run(self._ensure_client().printer_info())

    def klippy_state(self) -> KlippyState:
        """Return the current Klippy state."""
        return self._run(self._ensure_client().klippy_state())

    def call(
        self,
        method: str,
        params: dict[str, Any] | None = None,
        *,
        timeout: float | None = None,
    ) -> Any:
        """Send a Moonraker JSON-RPC request and block for the response.

        Args:
            method: Moonraker JSON-RPC method.
            params: Optional request params.
            timeout: Optional timeout override in seconds.

        Returns:
            Decoded ``result`` payload.

        """
        return self._run(self._ensure_client().call(method, params, timeout=timeout))

    def gcode(self, script: str) -> str:
        """Execute a G-code script."""
        return self._run(self._ensure_client().gcode(script))

    def emergency_stop(self) -> None:
        """Trigger an emergency stop."""
        self._run(self._ensure_client().emergency_stop())

    def print_start(self, filename: str) -> None:
        """Start printing a file."""
        self._run(self._ensure_client().print_start(filename))

    def print_pause(self) -> None:
        """Pause the current print."""
        self._run(self._ensure_client().print_pause())

    def print_resume(self) -> None:
        """Resume the paused print."""
        self._run(self._ensure_client().print_resume())

    def print_cancel(self) -> None:
        """Cancel the current print."""
        self._run(self._ensure_client().print_cancel())

    def upload_file(
        self,
        file_path: str,
        content: bytes,
        *,
        root: str = "gcodes",
        target_path: str | None = None,
    ) -> Any:
        """Upload a file through the underlying HTTP transport."""
        return self._run(
            self._ensure_client().upload_file(
                file_path, content, root=root, target_path=target_path
            )
        )

    def download_file(self, root: str, file_path: str) -> bytes:
        """Download a file through the underlying HTTP transport."""
        return self._run(self._ensure_client().download_file(root, file_path))

    def query_objects(self, objects: dict[str, list[str] | None]) -> dict[str, Any]:
        """Query one or more printer objects and return current status."""
        return self._run(self._ensure_client().query_objects(objects))

    def subscribe_objects(self, objects: dict[str, list[str] | None]) -> dict[str, Any]:
        """Subscribe to printer object updates for status notifications."""
        return self._run(self._ensure_client().subscribe_objects(objects))

    def on(self, event: str | EventType, callback: Callback) -> Callable[[], None]:
        """Register a persistent event callback.

        The registration is forwarded directly to the async client from the
        calling thread; it is not scheduled on the background loop.

        NOTE(review): callbacks are presumably invoked on the background
        loop thread -- confirm against ``MoonrakerClient``. If so, they must
        not call this class's blocking methods (``_run`` rejects that).

        Returns:
            Callable with no arguments that unregisters the callback.

        """
        return self._ensure_client().on(event, callback)

    def once(self, event: str | EventType, callback: Callback) -> None:
        """Register a one-shot event callback.

        Forwarded directly to the async client from the calling thread.
        """
        self._ensure_client().once(event, callback)

    # -- Internal ---------------------------------------------------------

    def _ensure_client(self) -> MoonrakerClient:
        """Return connected async client or raise if not connected.

        Raises:
            RuntimeError: If no async client has been created yet.

        """
        if self._client is None:
            raise RuntimeError("Client is not connected; call connect() first")
        return self._client

    def _run(self, coro: Coroutine[Any, Any, T]) -> T:
        """Run a coroutine on the background loop and block for completion.

        Args:
            coro: Coroutine object to schedule on the background loop.

        Returns:
            The coroutine's result.

        Raises:
            RuntimeError: If the loop is not running, or if called from the
                background loop thread (which would block the loop forever).

        """
        loop = self._loop
        if loop is None:
            # Close the coroutine so it does not emit a "never awaited"
            # RuntimeWarning when garbage collected.
            coro.close()
            raise RuntimeError("Event loop not running; call connect() first")
        if threading.current_thread() is self._thread:
            # Waiting here would block the very thread that has to run the
            # coroutine, i.e. a guaranteed deadlock.
            coro.close()
            raise RuntimeError(
                "Blocking client methods cannot be called from the event loop thread"
            )
        future = asyncio.run_coroutine_threadsafe(coro, loop)
        return future.result()

    def _shutdown_loop(self) -> None:
        """Stop the loop thread and close the loop; no-op if none exists."""
        loop, thread = self._loop, self._thread
        self._loop = None
        self._thread = None
        if loop is None:
            return
        if thread is not None and thread.is_alive():
            loop.call_soon_threadsafe(loop.stop)
            thread.join(timeout=5)
            if thread.is_alive():
                # The loop did not stop in time. Closing a running loop
                # raises RuntimeError, so leave it to the daemon thread.
                return
        if not loop.is_closed():
            loop.close()

call

call(method: str, params: dict[str, Any] | None = None, *, timeout: float | None = None) -> Any

Send a Moonraker JSON-RPC request and block for the response.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| method | str | Moonraker JSON-RPC method. | required |
| params | dict[str, Any] \| None | Optional request params. | None |
| timeout | float \| None | Optional timeout override in seconds. | None |

Returns:

| Type | Description |
| --- | --- |
| Any | Decoded result payload. |

Source code in src/pymoonraker/sync_client.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def call(
    self,
    method: str,
    params: dict[str, Any] | None = None,
    *,
    timeout: float | None = None,
) -> Any:
    """Issue a Moonraker JSON-RPC request and wait for its reply.

    Args:
        method: Moonraker JSON-RPC method.
        params: Optional request params.
        timeout: Optional timeout override in seconds.

    Returns:
        Decoded ``result`` payload.

    """
    client = self._ensure_client()
    request = client.call(method, params, timeout=timeout)
    return self._run(request)

connect

connect() -> None

Start the background loop and connect the underlying async client.

Source code in src/pymoonraker/sync_client.py
72
73
74
75
76
77
78
79
80
def connect(self) -> None:
    """Start the background loop and connect the underlying async client.

    An already active session is disconnected first so its loop thread is
    not leaked. If creating or connecting the async client fails, the
    freshly started loop thread is stopped before the error is re-raised.
    """
    if self._loop is not None:
        # Avoid overwriting (and leaking) a loop that is still running.
        self.disconnect()
    loop = asyncio.new_event_loop()
    thread = threading.Thread(
        target=loop.run_forever, daemon=True, name="pymoonraker-sync"
    )
    self._loop = loop
    self._thread = thread
    thread.start()
    try:
        self._client = MoonrakerClient(*self._args, **self._kwargs)
        self._run(self._client.connect())
    except BaseException:
        # A failing __enter__ never reaches __exit__, so clean up here.
        self._client = None
        self._loop = None
        self._thread = None
        loop.call_soon_threadsafe(loop.stop)
        thread.join(timeout=5)
        if not thread.is_alive():
            # Only a stopped loop may be closed.
            loop.close()
        raise

disconnect

disconnect() -> None

Disconnect and stop the background event loop thread.

Source code in src/pymoonraker/sync_client.py
82
83
84
85
86
87
88
89
90
91
92
93
94
def disconnect(self) -> None:
    """Disconnect and stop the background event loop thread.

    Safe to call when not connected. The loop thread is stopped even when
    the async client's ``disconnect()`` raises; that error is re-raised
    after cleanup.
    """
    client, self._client = self._client, None
    try:
        if client is not None:
            self._run(client.disconnect())
    finally:
        loop, thread = self._loop, self._thread
        self._loop = None
        self._thread = None
        if loop is not None:
            if thread is not None and thread.is_alive():
                loop.call_soon_threadsafe(loop.stop)
                thread.join(timeout=5)
            # Closing a loop that is still running raises RuntimeError, so
            # only close once the thread has really finished.
            stopped = thread is None or not thread.is_alive()
            if stopped and not loop.is_closed():
                loop.close()

download_file

download_file(root: str, file_path: str) -> bytes

Download a file through the underlying HTTP transport.

Source code in src/pymoonraker/sync_client.py
169
170
171
def download_file(self, root: str, file_path: str) -> bytes:
    """Fetch ``file_path`` from ``root`` via the async client and block for it."""
    client = self._ensure_client()
    transfer = client.download_file(root, file_path)
    return self._run(transfer)

emergency_stop

emergency_stop() -> None

Trigger an emergency stop.

Source code in src/pymoonraker/sync_client.py
134
135
136
def emergency_stop(self) -> None:
    """Forward an emergency stop to the async client and wait for it."""
    client = self._ensure_client()
    self._run(client.emergency_stop())

gcode

gcode(script: str) -> str

Execute a G-code script.

Source code in src/pymoonraker/sync_client.py
130
131
132
def gcode(self, script: str) -> str:
    """Run ``script`` through the async client's ``gcode()`` and block for it."""
    client = self._ensure_client()
    pending = client.gcode(script)
    return self._run(pending)

klippy_state

klippy_state() -> KlippyState

Return the current Klippy state.

Source code in src/pymoonraker/sync_client.py
106
107
108
def klippy_state(self) -> KlippyState:
    """Fetch the current Klippy state from the async client, blocking."""
    client = self._ensure_client()
    pending = client.klippy_state()
    return self._run(pending)

on

on(event: str | EventType, callback: Callback) -> Callable[[], None]

Register a persistent event callback.

Returns:

| Type | Description |
| --- | --- |
| Callable[[], None] | Callable with no arguments that unregisters the callback. |

Source code in src/pymoonraker/sync_client.py
181
182
183
184
185
186
187
188
def on(self, event: str | EventType, callback: Callback) -> Callable[[], None]:
    """Attach ``callback`` to ``event`` until it is explicitly removed.

    The registration is handed straight to the async client; it is not
    scheduled on the background loop.

    Returns:
        Callable with no arguments that unregisters the callback.

    """
    client = self._ensure_client()
    unregister = client.on(event, callback)
    return unregister

once

once(event: str | EventType, callback: Callback) -> None

Register a one-shot event callback.

Source code in src/pymoonraker/sync_client.py
190
191
192
def once(self, event: str | EventType, callback: Callback) -> None:
    """Attach ``callback`` to ``event`` as a one-shot handler on the async client."""
    client = self._ensure_client()
    client.once(event, callback)

print_cancel

print_cancel() -> None

Cancel the current print.

Source code in src/pymoonraker/sync_client.py
150
151
152
def print_cancel(self) -> None:
    """Ask the async client to cancel the current print and wait for it."""
    client = self._ensure_client()
    self._run(client.print_cancel())

print_pause

print_pause() -> None

Pause the current print.

Source code in src/pymoonraker/sync_client.py
142
143
144
def print_pause(self) -> None:
    """Ask the async client to pause the current print and wait for it."""
    client = self._ensure_client()
    self._run(client.print_pause())

print_resume

print_resume() -> None

Resume the paused print.

Source code in src/pymoonraker/sync_client.py
146
147
148
def print_resume(self) -> None:
    """Ask the async client to resume the paused print and wait for it."""
    client = self._ensure_client()
    self._run(client.print_resume())

print_start

print_start(filename: str) -> None

Start printing a file.

Source code in src/pymoonraker/sync_client.py
138
139
140
def print_start(self, filename: str) -> None:
    """Ask the async client to start printing ``filename`` and wait for it."""
    client = self._ensure_client()
    self._run(client.print_start(filename))

printer_info

printer_info() -> PrinterInfo

Query printer.info.

Source code in src/pymoonraker/sync_client.py
102
103
104
def printer_info(self) -> PrinterInfo:
    """Fetch ``printer.info`` through the async client, blocking for the result."""
    client = self._ensure_client()
    pending = client.printer_info()
    return self._run(pending)

query_objects

query_objects(objects: dict[str, list[str] | None]) -> dict[str, Any]

Query one or more printer objects and return current status.

Source code in src/pymoonraker/sync_client.py
173
174
175
def query_objects(self, objects: dict[str, list[str] | None]) -> dict[str, Any]:
    """Ask the async client for the status of ``objects`` and block for it."""
    client = self._ensure_client()
    pending = client.query_objects(objects)
    return self._run(pending)

server_info

server_info() -> ServerInfo

Query server.info.

Source code in src/pymoonraker/sync_client.py
 98
 99
100
def server_info(self) -> ServerInfo:
    """Fetch ``server.info`` through the async client, blocking for the result."""
    client = self._ensure_client()
    pending = client.server_info()
    return self._run(pending)

subscribe_objects

subscribe_objects(objects: dict[str, list[str] | None]) -> dict[str, Any]

Subscribe to printer object updates for status notifications.

Source code in src/pymoonraker/sync_client.py
177
178
179
def subscribe_objects(self, objects: dict[str, list[str] | None]) -> dict[str, Any]:
    """Subscribe to updates for ``objects`` via the async client, blocking for the reply."""
    client = self._ensure_client()
    pending = client.subscribe_objects(objects)
    return self._run(pending)

upload_file

upload_file(file_path: str, content: bytes, *, root: str = 'gcodes', target_path: str | None = None) -> Any

Upload a file through the underlying HTTP transport.

Source code in src/pymoonraker/sync_client.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
def upload_file(
    self,
    file_path: str,
    content: bytes,
    *,
    root: str = "gcodes",
    target_path: str | None = None,
) -> Any:
    """Send ``content`` to the async client's ``upload_file()`` and block for it.

    ``root`` and ``target_path`` are passed through as keyword arguments.
    """
    client = self._ensure_client()
    transfer = client.upload_file(
        file_path,
        content,
        root=root,
        target_path=target_path,
    )
    return self._run(transfer)