Skip to content

MoonrakerClient

pymoonraker.client.MoonrakerClient

Async-first client for the Moonraker API.

Provides: * WebSocket RPC with auto-reconnect. * HTTP transport for file operations and one-shot queries. * Typed event system for Moonraker notifications. * Auth management (API key, JWT, oneshot).

Usage::

async with MoonrakerClient("192.168.1.100") as client:
    info = await client.server_info()
    print(info.klippy_state)

    client.on(EventType.STATUS_UPDATE, my_handler)
    await client.subscribe_objects({"toolhead": None})
Source code in src/pymoonraker/client.py
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
class MoonrakerClient:
    """Async-first client for the Moonraker API.

    Provides:
    * WebSocket RPC with auto-reconnect.
    * HTTP transport for file operations and one-shot queries.
    * Typed event system for Moonraker notifications.
    * Auth management (API key, JWT, oneshot).

    Usage::

        async with MoonrakerClient("192.168.1.100") as client:
            info = await client.server_info()
            print(info.klippy_state)

            client.on(EventType.STATUS_UPDATE, my_handler)
            await client.subscribe_objects({"toolhead": None})
    """

    def __init__(
        self,
        host: str,
        port: int = 7125,
        *,
        api_key: str | None = None,
        ssl: Any = None,
        auto_reconnect: bool = True,
        reconnect_interval: float = 5.0,
        max_reconnect_interval: float = 60.0,
        rpc_timeout: float = 30.0,
    ) -> None:
        """Initialize a Moonraker client instance.

        Args:
            host: Moonraker host name or IP address.
            port: Moonraker API port (defaults to ``7125``).
            api_key: Optional Moonraker API key for authenticated requests.
            ssl: SSL context/config passed through to HTTP and WebSocket transports.
            auto_reconnect: Whether to reconnect after unexpected disconnects.
            reconnect_interval: Initial reconnect delay in seconds.
            max_reconnect_interval: Maximum reconnect delay in seconds.
            rpc_timeout: Default timeout for JSON-RPC calls in seconds.

        """
        scheme_ws = "wss" if ssl else "ws"
        scheme_http = "https" if ssl else "http"

        self._host = host
        self._port = port
        self._auto_reconnect = auto_reconnect
        self._reconnect_interval = reconnect_interval
        self._max_reconnect_interval = max_reconnect_interval

        self._ws_transport = WebSocketTransport(
            f"{scheme_ws}://{host}:{port}/websocket",
            ssl=ssl,
        )
        self._http_transport = HttpTransport(
            f"{scheme_http}://{host}:{port}",
            api_key=api_key,
            ssl_context=ssl,
        )

        self._events = EventDispatcher()
        self._rpc = JsonRpcHandler(self._ws_transport, default_timeout=rpc_timeout)
        self._auth = AuthManager(self._http_transport)

        self._reconnect_task: asyncio.Task[None] | None = None
        self._connected = asyncio.Event()
        self._closing = False
        self._subscribed_objects: dict[str, list[str] | None] | None = None

    # -- Context manager --------------------------------------------------

    async def __aenter__(self) -> MoonrakerClient:
        """Connect and return ``self`` for async context-manager usage."""
        await self.connect()
        return self

    async def __aexit__(self, *exc: object) -> None:
        """Disconnect when exiting an async context manager."""
        await self.disconnect()

    # -- Connection lifecycle ---------------------------------------------

    async def connect(self) -> None:
        """Establish WebSocket and HTTP connections, identify, and start listeners."""
        self._closing = False
        await self._http_transport.connect()
        await self._ws_transport.connect()

        self._events.drain_notification_queue()
        self._rpc.start(
            self._events.notification_queue,
            on_disconnect=self._on_transport_disconnect,
        )
        self._events.start()

        await self._identify()
        await self._restore_subscriptions()

        self._connected.set()
        logger.info(
            "Connected to Moonraker at %s:%s",
            self._host,
            self._port,
            extra=self._connection_log_extra(),
        )

    async def disconnect(self) -> None:
        """Gracefully shut down all connections and background tasks."""
        self._closing = True
        self._connected.clear()

        if self._reconnect_task is not None:
            self._reconnect_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await self._reconnect_task
            self._reconnect_task = None

        await self._events.stop()
        await self._rpc.stop()
        await self._ws_transport.disconnect()
        await self._http_transport.disconnect()
        logger.info("Disconnected from Moonraker", extra=self._connection_log_extra())

    async def wait_connected(self, timeout: float | None = None) -> None:
        """Block until the client is connected (useful after auto-reconnect)."""
        await asyncio.wait_for(self._connected.wait(), timeout=timeout)

    @property
    def is_connected(self) -> bool:
        """Return ``True`` if the WebSocket is currently connected."""
        return self._ws_transport.connected

    # -- API namespaces ---------------------------------------------------

    @property
    def printer(self) -> PrinterNamespace:
        """Printer namespace: info, gcode, objects, print control."""
        return PrinterNamespace(self)

    @property
    def server(self) -> ServerNamespace:
        """Server namespace: info, config, temperature/gcode stores."""
        return ServerNamespace(self)

    @property
    def files(self) -> FilesNamespace:
        """Files namespace: list, metadata, directories, move, copy."""
        return FilesNamespace(self)

    @property
    def job_queue(self) -> JobQueueNamespace:
        """Job queue namespace: status, add, delete, pause, resume, start."""
        return JobQueueNamespace(self)

    @property
    def history(self) -> HistoryNamespace:
        """History namespace: list, totals, get_job, delete_job."""
        return HistoryNamespace(self)

    @property
    def machine(self) -> MachineNamespace:
        """Machine namespace: system_info, proc_stats, reboot, shutdown."""
        return MachineNamespace(self)

    @property
    def update_manager(self) -> UpdateManagerNamespace:
        """Update manager namespace: get_status, refresh."""
        return UpdateManagerNamespace(self)

    @property
    def power(self) -> PowerNamespace:
        """Power namespace: get_devices, get_device_status, toggle_device."""
        return PowerNamespace(self)

    @property
    def access(self) -> AccessNamespace:
        """Access namespace: login, logout, users, JWT."""
        return AccessNamespace(self)

    @property
    def database(self) -> DatabaseNamespace:
        """Database namespace: get_item, list_namespaces, get_namespace_item."""
        return DatabaseNamespace(self)

    @property
    def announcements(self) -> AnnouncementsNamespace:
        """Announcements namespace: list, dismiss, dismiss_wake."""
        return AnnouncementsNamespace(self)

    @property
    def webcams(self) -> WebcamsNamespace:
        """Webcams namespace: list, get_item."""
        return WebcamsNamespace(self)

    # -- Event registration -----------------------------------------------

    def on(self, event: str | EventType, callback: Callback) -> Callable[[], None]:
        """Register a callback for a Moonraker notification event.

        Args:
            event: Moonraker notification name or ``EventType`` constant.
            callback: Sync or async callback invoked when the event is received.

        Returns:
            Callable with no arguments that unsubscribes the callback.

        """
        return self._events.on(str(event), callback)

    def once(self, event: str | EventType, callback: Callback) -> None:
        """Register a one-shot callback for an event.

        The callback is removed after the first matching notification.

        Args:
            event: Moonraker notification name or ``EventType`` constant.
            callback: Sync or async callback invoked once.

        """
        self._events.once(str(event), callback)

    # -- RPC convenience --------------------------------------------------

    async def call(
        self,
        method: str,
        params: dict[str, Any] | None = None,
        *,
        timeout: float | None = None,
    ) -> Any:
        """Send a JSON-RPC request over WebSocket.

        Args:
            method: Moonraker JSON-RPC method (for example ``"server.info"``).
            params: Optional JSON-RPC params object.
            timeout: Optional request timeout override in seconds.

        Returns:
            Decoded ``result`` payload from the JSON-RPC response.

        """
        result = await self._rpc.call(method, params, timeout=timeout)
        if method == "printer.objects.subscribe":
            self._remember_subscription(params)
        return result

    async def http_request(
        self,
        method: str,
        path: str,
        **kwargs: Any,
    ) -> Any:
        """Send an HTTP request through Moonraker's REST endpoints.

        Args:
            method: HTTP method such as ``"GET"`` or ``"POST"``.
            path: Endpoint path beginning with ``/``.
            **kwargs: Additional arguments forwarded to the HTTP transport.

        Returns:
            Decoded HTTP response payload.

        """
        return await self._http_transport.request(method, path, **kwargs)

    # -- Server / Printer info --------------------------------------------

    async def server_info(self) -> ServerInfo:
        """Query ``server.info`` and return a typed model."""
        raw = await self.call("server.info")
        return ServerInfo.model_validate(raw)

    async def printer_info(self) -> PrinterInfo:
        """Query ``printer.info`` and return a typed model."""
        raw = await self.call("printer.info")
        return PrinterInfo.model_validate(raw)

    async def klippy_state(self) -> KlippyState:
        """Return the current Klippy state."""
        info = await self.server_info()
        state = info.klippy_state
        if state is None:
            return KlippyState.STARTUP
        return state

    # -- Printer objects --------------------------------------------------

    async def query_objects(
        self,
        objects: dict[str, list[str] | None],
    ) -> dict[str, Any]:
        """Query one or more printer objects.

        Example::

            result = await client.query_objects({
                "toolhead": ["position", "homed_axes"],
                "heater_bed": None,  # all fields
            })

        Args:
            objects: Mapping of object name to field list or ``None`` for all fields.

        Returns:
            ``status`` mapping keyed by printer object name.

        """
        return cast(
            "dict[str, Any]", await self.call("printer.objects.query", {"objects": objects})
        )

    async def subscribe_objects(
        self,
        objects: dict[str, list[str] | None],
    ) -> dict[str, Any]:
        """Subscribe to printer object updates.

        Updated values arrive as ``notify_status_update`` events.

        Args:
            objects: Mapping of object name to field list or ``None`` for all fields.

        Returns:
            Initial snapshot payload returned by Moonraker.

        """
        return cast(
            "dict[str, Any]", await self.call("printer.objects.subscribe", {"objects": objects})
        )

    # -- G-code -----------------------------------------------------------

    async def gcode(self, script: str) -> str:
        """Execute a G-code script.

        Args:
            script: G-code commands to execute.

        Returns:
            Moonraker response string.

        """
        return cast("str", await self.call("printer.gcode.script", {"script": script}))

    async def emergency_stop(self) -> None:
        """Trigger an immediate emergency stop."""
        await self.call("printer.emergency_stop")

    # -- Print control ----------------------------------------------------

    async def print_start(self, filename: str) -> None:
        """Start printing the specified file."""
        await self.call("printer.print.start", {"filename": filename})

    async def print_pause(self) -> None:
        """Pause the current print."""
        await self.call("printer.print.pause")

    async def print_resume(self) -> None:
        """Resume the paused print."""
        await self.call("printer.print.resume")

    async def print_cancel(self) -> None:
        """Cancel the current print."""
        await self.call("printer.print.cancel")

    # -- File operations (HTTP) -------------------------------------------

    async def upload_file(
        self,
        file_path: str,
        content: bytes,
        *,
        root: str = "gcodes",
        target_path: str | None = None,
    ) -> Any:
        """Upload a file to Moonraker over HTTP.

        Args:
            file_path: Source filename (used as upload name unless ``target_path`` is set).
            content: File bytes.
            root: Moonraker file root, usually ``"gcodes"``.
            target_path: Optional path within the selected root.

        Returns:
            Moonraker upload response payload.

        """
        return await self._http_transport.upload_file(
            file_path, content, root=root, target_path=target_path
        )

    async def download_file(self, root: str, file_path: str) -> bytes:
        """Download a file from Moonraker over HTTP.

        Args:
            root: Moonraker file root.
            file_path: Path to the file inside ``root``.

        Returns:
            Raw file bytes.

        """
        return await self._http_transport.download_file(root, file_path)

    # -- Restart / power --------------------------------------------------

    async def restart_klipper(self) -> None:
        """Soft-restart Klipper."""
        await self.call("printer.restart")

    async def firmware_restart(self) -> None:
        """Restart Klipper with a firmware restart (MCU reset)."""
        await self.call("printer.firmware_restart")

    async def restart_moonraker(self) -> None:
        """Restart the Moonraker server process."""
        await self.call("server.restart")

    # -- Auto-reconnect ---------------------------------------------------

    def _schedule_reconnect(self) -> None:
        """Spawn a background reconnection task if enabled."""
        if not self._auto_reconnect or self._closing:
            return
        if self._reconnect_task is not None and not self._reconnect_task.done():
            return
        self._reconnect_task = asyncio.create_task(
            self._reconnect_loop(), name="moonraker-reconnect"
        )

    async def _reconnect_loop(self) -> None:
        delay = self._reconnect_interval
        while not self._closing:
            logger.info(
                "Reconnecting in %.1fs…",
                delay,
                extra=self._connection_log_extra(),
            )
            await asyncio.sleep(delay)
            try:
                await self._ws_transport.connect()
                self._events.drain_notification_queue()
                self._rpc.start(
                    self._events.notification_queue,
                    on_disconnect=self._on_transport_disconnect,
                )
                self._events.start()
                await self._identify()
                await self._restore_subscriptions()
                self._connected.set()
                logger.info("Reconnected to Moonraker", extra=self._connection_log_extra())
                return
            except MoonrakerConnectionError:
                delay = min(delay * 2, self._max_reconnect_interval)
                logger.warning(
                    "Reconnect failed; retrying in %.1fs",
                    delay,
                    extra=self._connection_log_extra(),
                )
            except MoonrakerRPCError as exc:
                delay = min(delay * 2, self._max_reconnect_interval)
                logger.warning(
                    "Reconnect handshake failed (%s); retrying in %.1fs",
                    exc,
                    delay,
                    extra=self._connection_log_extra(),
                )

    # -- Internal helpers -------------------------------------------------

    async def _identify(self) -> ConnectionIdentifyResult:
        """Send ``server.connection.identify`` per Moonraker protocol."""
        raw = await self.call(
            "server.connection.identify",
            {
                "client_name": _CLIENT_NAME,
                "version": "0.1.0",
                "type": _CLIENT_TYPE,
                "url": "https://github.com/thewillft/pymoonraker",
            },
        )
        return ConnectionIdentifyResult.model_validate(raw)

    def _on_transport_disconnect(self) -> None:
        """Handle unexpected transport closure by resetting state and reconnecting."""
        if self._closing:
            return
        self._connected.clear()
        logger.warning("Transport disconnected unexpectedly", extra=self._connection_log_extra())
        self._events.drain_notification_queue()
        self._schedule_reconnect()

    def _remember_subscription(self, params: dict[str, Any] | None) -> None:
        """Persist latest printer object subscription for replay after reconnect."""
        if params is None:
            return
        objects = params.get("objects")
        if not isinstance(objects, dict):
            return

        stored: dict[str, list[str] | None] = {}
        for key, fields in objects.items():
            if isinstance(fields, list):
                stored[key] = list(fields)
            else:
                stored[key] = None
        self._subscribed_objects = stored

    async def _restore_subscriptions(self) -> None:
        """Re-apply saved printer object subscriptions after reconnect.

        Moonraker object subscriptions are tied to the active persistent
        connection. After a disconnect/reconnect cycle, the client must
        subscribe again to resume ``notify_status_update`` traffic.
        """
        if self._subscribed_objects is None:
            return
        await self.call("printer.objects.subscribe", {"objects": self._subscribed_objects})

    def _connection_log_extra(self) -> dict[str, Any]:
        return {"host": self._host, "port": self._port}

access property

access: AccessNamespace

Access namespace: login, logout, users, JWT.

announcements property

announcements: AnnouncementsNamespace

Announcements namespace: list, dismiss, dismiss_wake.

database property

database: DatabaseNamespace

Database namespace: get_item, list_namespaces, get_namespace_item.

files property

files: FilesNamespace

Files namespace: list, metadata, directories, move, copy.

history property

history: HistoryNamespace

History namespace: list, totals, get_job, delete_job.

is_connected property

is_connected: bool

Return True if the WebSocket is currently connected.

job_queue property

job_queue: JobQueueNamespace

Job queue namespace: status, add, delete, pause, resume, start.

machine property

machine: MachineNamespace

Machine namespace: system_info, proc_stats, reboot, shutdown.

power property

power: PowerNamespace

Power namespace: get_devices, get_device_status, toggle_device.

printer property

printer: PrinterNamespace

Printer namespace: info, gcode, objects, print control.

server property

server: ServerNamespace

Server namespace: info, config, temperature/gcode stores.

update_manager property

update_manager: UpdateManagerNamespace

Update manager namespace: get_status, refresh.

webcams property

webcams: WebcamsNamespace

Webcams namespace: list, get_item.

call async

call(method: str, params: dict[str, Any] | None = None, *, timeout: float | None = None) -> Any

Send a JSON-RPC request over WebSocket.

Parameters:

Name Type Description Default
method str

Moonraker JSON-RPC method (for example "server.info").

required
params dict[str, Any] | None

Optional JSON-RPC params object.

None
timeout float | None

Optional request timeout override in seconds.

None

Returns:

Type Description
Any

Decoded result payload from the JSON-RPC response.

Source code in src/pymoonraker/client.py
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
async def call(
    self,
    method: str,
    params: dict[str, Any] | None = None,
    *,
    timeout: float | None = None,
) -> Any:
    """Send a JSON-RPC request over WebSocket.

    Args:
        method: Moonraker JSON-RPC method (for example ``"server.info"``).
        params: Optional JSON-RPC params object.
        timeout: Optional request timeout override in seconds.

    Returns:
        Decoded ``result`` payload from the JSON-RPC response.

    """
    result = await self._rpc.call(method, params, timeout=timeout)
    if method == "printer.objects.subscribe":
        self._remember_subscription(params)
    return result

connect async

connect() -> None

Establish WebSocket and HTTP connections, identify, and start listeners.

Source code in src/pymoonraker/client.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
async def connect(self) -> None:
    """Establish WebSocket and HTTP connections, identify, and start listeners."""
    self._closing = False
    await self._http_transport.connect()
    await self._ws_transport.connect()

    self._events.drain_notification_queue()
    self._rpc.start(
        self._events.notification_queue,
        on_disconnect=self._on_transport_disconnect,
    )
    self._events.start()

    await self._identify()
    await self._restore_subscriptions()

    self._connected.set()
    logger.info(
        "Connected to Moonraker at %s:%s",
        self._host,
        self._port,
        extra=self._connection_log_extra(),
    )

disconnect async

disconnect() -> None

Gracefully shut down all connections and background tasks.

Source code in src/pymoonraker/client.py
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
async def disconnect(self) -> None:
    """Gracefully shut down all connections and background tasks."""
    self._closing = True
    self._connected.clear()

    if self._reconnect_task is not None:
        self._reconnect_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._reconnect_task
        self._reconnect_task = None

    await self._events.stop()
    await self._rpc.stop()
    await self._ws_transport.disconnect()
    await self._http_transport.disconnect()
    logger.info("Disconnected from Moonraker", extra=self._connection_log_extra())

download_file async

download_file(root: str, file_path: str) -> bytes

Download a file from Moonraker over HTTP.

Parameters:

Name Type Description Default
root str

Moonraker file root.

required
file_path str

Path to the file inside root.

required

Returns:

Type Description
bytes

Raw file bytes.

Source code in src/pymoonraker/client.py
440
441
442
443
444
445
446
447
448
449
450
451
async def download_file(self, root: str, file_path: str) -> bytes:
    """Download a file from Moonraker over HTTP.

    Args:
        root: Moonraker file root.
        file_path: Path to the file inside ``root``.

    Returns:
        Raw file bytes.

    """
    return await self._http_transport.download_file(root, file_path)

emergency_stop async

emergency_stop() -> None

Trigger an immediate emergency stop.

Source code in src/pymoonraker/client.py
392
393
394
async def emergency_stop(self) -> None:
    """Trigger an immediate emergency stop."""
    await self.call("printer.emergency_stop")

firmware_restart async

firmware_restart() -> None

Restart Klipper with a firmware restart (MCU reset).

Source code in src/pymoonraker/client.py
459
460
461
async def firmware_restart(self) -> None:
    """Restart Klipper with a firmware restart (MCU reset)."""
    await self.call("printer.firmware_restart")

gcode async

gcode(script: str) -> str

Execute a G-code script.

Parameters:

Name Type Description Default
script str

G-code commands to execute.

required

Returns:

Type Description
str

Moonraker response string.

Source code in src/pymoonraker/client.py
380
381
382
383
384
385
386
387
388
389
390
async def gcode(self, script: str) -> str:
    """Execute a G-code script.

    Args:
        script: G-code commands to execute.

    Returns:
        Moonraker response string.

    """
    return cast("str", await self.call("printer.gcode.script", {"script": script}))

http_request async

http_request(method: str, path: str, **kwargs: Any) -> Any

Send an HTTP request through Moonraker's REST endpoints.

Parameters:

Name Type Description Default
method str

HTTP method such as "GET" or "POST".

required
path str

Endpoint path beginning with /.

required
**kwargs Any

Additional arguments forwarded to the HTTP transport.

{}

Returns:

Type Description
Any

Decoded HTTP response payload.

Source code in src/pymoonraker/client.py
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
async def http_request(
    self,
    method: str,
    path: str,
    **kwargs: Any,
) -> Any:
    """Send an HTTP request through Moonraker's REST endpoints.

    Args:
        method: HTTP method such as ``"GET"`` or ``"POST"``.
        path: Endpoint path beginning with ``/``.
        **kwargs: Additional arguments forwarded to the HTTP transport.

    Returns:
        Decoded HTTP response payload.

    """
    return await self._http_transport.request(method, path, **kwargs)

klippy_state async

klippy_state() -> KlippyState

Return the current Klippy state.

Source code in src/pymoonraker/client.py
325
326
327
328
329
330
331
async def klippy_state(self) -> KlippyState:
    """Return the current Klippy state."""
    info = await self.server_info()
    state = info.klippy_state
    if state is None:
        return KlippyState.STARTUP
    return state

on

on(event: str | EventType, callback: Callback) -> Callable[[], None]

Register a callback for a Moonraker notification event.

Parameters:

Name Type Description Default
event str | EventType

Moonraker notification name or EventType constant.

required
callback Callback

Sync or async callback invoked when the event is received.

required

Returns:

Type Description
Callable[[], None]

Callable with no arguments that unsubscribes the callback.

Source code in src/pymoonraker/client.py
244
245
246
247
248
249
250
251
252
253
254
255
def on(self, event: str | EventType, callback: Callback) -> Callable[[], None]:
    """Register a callback for a Moonraker notification event.

    Args:
        event: Moonraker notification name or ``EventType`` constant.
        callback: Sync or async callback invoked when the event is received.

    Returns:
        Callable with no arguments that unsubscribes the callback.

    """
    return self._events.on(str(event), callback)

once

once(event: str | EventType, callback: Callback) -> None

Register a one-shot callback for an event.

The callback is removed after the first matching notification.

Parameters:

Name Type Description Default
event str | EventType

Moonraker notification name or EventType constant.

required
callback Callback

Sync or async callback invoked once.

required
Source code in src/pymoonraker/client.py
257
258
259
260
261
262
263
264
265
266
267
def once(self, event: str | EventType, callback: Callback) -> None:
    """Register a one-shot callback for an event.

    The callback is removed after the first matching notification.

    Args:
        event: Moonraker notification name or ``EventType`` constant.
        callback: Sync or async callback invoked once.

    """
    self._events.once(str(event), callback)

print_cancel async

print_cancel() -> None

Cancel the current print.

Source code in src/pymoonraker/client.py
410
411
412
async def print_cancel(self) -> None:
    """Cancel the current print."""
    await self.call("printer.print.cancel")

print_pause async

print_pause() -> None

Pause the current print.

Source code in src/pymoonraker/client.py
402
403
404
async def print_pause(self) -> None:
    """Pause the current print."""
    await self.call("printer.print.pause")

print_resume async

print_resume() -> None

Resume the paused print.

Source code in src/pymoonraker/client.py
406
407
408
async def print_resume(self) -> None:
    """Resume the paused print."""
    await self.call("printer.print.resume")

print_start async

print_start(filename: str) -> None

Start printing the specified file.

Source code in src/pymoonraker/client.py
398
399
400
async def print_start(self, filename: str) -> None:
    """Start printing the specified file."""
    await self.call("printer.print.start", {"filename": filename})

printer_info async

printer_info() -> PrinterInfo

Query printer.info and return a typed model.

Source code in src/pymoonraker/client.py
320
321
322
323
async def printer_info(self) -> PrinterInfo:
    """Query ``printer.info`` and return a typed model."""
    raw = await self.call("printer.info")
    return PrinterInfo.model_validate(raw)

query_objects async

query_objects(objects: dict[str, list[str] | None]) -> dict[str, Any]

Query one or more printer objects.

Example::

result = await client.query_objects({
    "toolhead": ["position", "homed_axes"],
    "heater_bed": None,  # all fields
})

Parameters:

Name Type Description Default
objects dict[str, list[str] | None]

Mapping of object name to field list or None for all fields.

required

Returns:

Type Description
dict[str, Any]

status mapping keyed by printer object name.

Source code in src/pymoonraker/client.py
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
async def query_objects(
    self,
    objects: dict[str, list[str] | None],
) -> dict[str, Any]:
    """Query one or more printer objects.

    Example::

        result = await client.query_objects({
            "toolhead": ["position", "homed_axes"],
            "heater_bed": None,  # all fields
        })

    Args:
        objects: Mapping of object name to field list or ``None`` for all fields.

    Returns:
        ``status`` mapping keyed by printer object name.

    """
    return cast(
        "dict[str, Any]", await self.call("printer.objects.query", {"objects": objects})
    )

restart_klipper async

restart_klipper() -> None

Soft-restart Klipper.

Source code in src/pymoonraker/client.py
455
456
457
async def restart_klipper(self) -> None:
    """Soft-restart Klipper."""
    await self.call("printer.restart")

restart_moonraker async

restart_moonraker() -> None

Restart the Moonraker server process.

Source code in src/pymoonraker/client.py
463
464
465
async def restart_moonraker(self) -> None:
    """Restart the Moonraker server process."""
    await self.call("server.restart")

server_info async

server_info() -> ServerInfo

Query server.info and return a typed model.

Source code in src/pymoonraker/client.py
315
316
317
318
async def server_info(self) -> ServerInfo:
    """Query ``server.info`` and return a typed model."""
    raw = await self.call("server.info")
    return ServerInfo.model_validate(raw)

subscribe_objects async

subscribe_objects(objects: dict[str, list[str] | None]) -> dict[str, Any]

Subscribe to printer object updates.

Updated values arrive as notify_status_update events.

Parameters:

Name Type Description Default
objects dict[str, list[str] | None]

Mapping of object name to field list or None for all fields.

required

Returns:

Type Description
dict[str, Any]

Initial snapshot payload returned by Moonraker.

Source code in src/pymoonraker/client.py
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
async def subscribe_objects(
    self,
    objects: dict[str, list[str] | None],
) -> dict[str, Any]:
    """Subscribe to printer object updates.

    Updated values arrive as ``notify_status_update`` events.

    Args:
        objects: Mapping of object name to field list or ``None`` for all fields.

    Returns:
        Initial snapshot payload returned by Moonraker.

    """
    return cast(
        "dict[str, Any]", await self.call("printer.objects.subscribe", {"objects": objects})
    )

upload_file async

upload_file(file_path: str, content: bytes, *, root: str = 'gcodes', target_path: str | None = None) -> Any

Upload a file to Moonraker over HTTP.

Parameters:

Name Type Description Default
file_path str

Source filename (used as upload name unless target_path is set).

required
content bytes

File bytes.

required
root str

Moonraker file root, usually "gcodes".

'gcodes'
target_path str | None

Optional path within the selected root.

None

Returns:

Type Description
Any

Moonraker upload response payload.

Source code in src/pymoonraker/client.py
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
async def upload_file(
    self,
    file_path: str,
    content: bytes,
    *,
    root: str = "gcodes",
    target_path: str | None = None,
) -> Any:
    """Upload a file to Moonraker over HTTP.

    Args:
        file_path: Source filename (used as upload name unless ``target_path`` is set).
        content: File bytes.
        root: Moonraker file root, usually ``"gcodes"``.
        target_path: Optional path within the selected root.

    Returns:
        Moonraker upload response payload.

    """
    return await self._http_transport.upload_file(
        file_path, content, root=root, target_path=target_path
    )

wait_connected async

wait_connected(timeout: float | None = None) -> None

Block until the client is connected (useful after auto-reconnect).

Source code in src/pymoonraker/client.py
171
172
173
async def wait_connected(self, timeout: float | None = None) -> None:
    """Block until the client is connected (useful after auto-reconnect)."""
    await asyncio.wait_for(self._connected.wait(), timeout=timeout)