Module pipettin-piper.piper.plugins.sio_status
Functions
def load_plugin(controller: "'Controller'", **kwargs)-
Expand source code
def load_plugin(controller: "Controller", **kwargs):
    """Status update plugin using SocketIO, for the Pipettin Writer web UI.

    Plugins are expected to expose a function named 'load_plugin' which
    instantiates the plugin's class and returns it to the main Commander class.
    If the plugin fails to load, a PluginError exception must be raised.

    Args:
        controller: The controller object handed to the `StatusUpdater` constructor.
        **kwargs: Accepted for interface compatibility; not used here.

    Returns:
        The newly created `StatusUpdater` instance.

    Raises:
        PluginError: If constructing `StatusUpdater` raises any exception. The
            original exception is chained as the cause.
    """
    # NOTE(review): `plugin_name` is not defined in this function; presumably a
    # module-level constant - confirm.
    logging.debug(f"load_plugin: loading {plugin_name} plugin.")

    try:
        return StatusUpdater(controller)
    except Exception as load_error:
        # Log first, then surface the failure using the plugin-specific error type.
        failure = f"Failed to load with error: {load_error}"
        logging.error(failure)
        raise PluginError(failure) from load_error
Classes
class StatusUpdater (controller: Controller, config=None, verbose=True)-
Expand source code
class StatusUpdater(Plugin):
    """Sends status events to the GUI/backend.

    Gathers status information from the controller and sends Socket.io events to the GUI.

    Shape of an outgoing status event, as built by `make_status`:

        {'id': 'status-<unix time>',
         'method': 'status',
         'data': {'controller_id': 'piper-<pid>-<hostname>',
                  'status': 'WARN',
                  'info': {<status dict returned by the controller>}}}

    Also responds to an incoming status request event with the same data.

    Development issue: https://gitlab.com/pipettin-bot/pipettin-gui/-/issues/39
    """

    def __init__(self, controller: Controller, config=None, verbose=True):
        """Store the configuration and hook the plugin into the controller.

        Args:
            controller: Controller object; its `coroutine_methods` list and its
                `comms.sio` client are used to register this plugin.
            config: Optional configuration object. Defaults to an empty dict.
            verbose: Stored as `self.verbose`; not otherwise used in this class.
        """
        # Save configuration.
        self.config = {} if config is None else config
        self.verbose = verbose

        # Save controller.
        self.controller: Controller = controller

        # Register additional controller coroutines.
        # NOTE: the methods are called here, so coroutine objects (not functions)
        # are appended to the controller's list.
        self.controller.coroutine_methods.extend([
            self.status_message_loop(),
            self.tool_data_loop()
        ])

        # Register default socketio event handlers.
        self.register_sio_callbacks()

        # Register action handlers.
        # self.register_action_handlers()

        # Set status.
        self._status = True

    # A queue object for commands from the GUI/socket.
    # NOTE: this is a class attribute, not a per-instance one.
    event_queue = Queue()

    # SOCKET.IO CALLBACKS AND EVENTS SECTION ####
    status_request_event = 'status_update_request'

    def register_sio_callbacks(self):
        """Function to register socketio event callbacks, typically sent by the Pipettin GUI.

        Does nothing when the controller's `comms.sio` attribute is falsy.
        """
        if self.controller.comms.sio:
            @self.controller.comms.sio.on(self.status_request_event)
            async def status_update_handler(data):
                """Receives an event from the GUI/backend and replies with the current status."""
                response_data = await self.make_status(data)
                return response_data

    tool_data_socket_event = "tool_data"
    tool_data_wait_time = 0.5

    async def tool_data_loop(self):
        """Request tool data to the firmware, and relay it to the UI through socketio.

        Waits for the controller to run and for the machine's websocket to be
        set, then repeatedly queries a motion report, converts the live position
        to tool coordinates, and emits it as a `tool_data_socket_event` event.
        Each iteration is padded so that it lasts at least `tool_data_wait_time`
        seconds. The plugin status is set to False when the coroutine ends.
        """
        logging.info("Coroutine started.")
        try:
            # The sleep must be awaited: otherwise no time passes and the loop
            # spins without ever yielding control to the event loop.
            while not self.controller.run:
                await asyncio.sleep(self.controller.wait_default)
            logging.info("Coroutine ready.")

            # Wait for the websocket to be defined.
            # NOTE: If it is closed, the error will be caught further below.
            while not self.controller.machine.websocket and self.controller.run:
                logging.debug("Websocket not defined, plugin disabled. Re-checking in 2 seconds.")
                await asyncio.sleep(2)

            while self.controller.run:
                # Build a unique ID.
                command_id = "motion_report_" + self.controller.machine.hash_cmd(self.tool_data_socket_event)
                # Build RPC command.
                rpc_cmd = rpc_primitives.query_motion_report(id=command_id)
                # Track time.
                initial_time = time.time()
                # Send the command. Only the "wait" result is used below.
                _cmd_id, (wait_pass, _check_pass) = await self.controller.machine.send_cmd(
                    rpc_cmd=rpc_cmd, wait=True, check=False,
                    timeout=self.tool_data_wait_time)

                # Process the response.
                if not wait_pass:
                    # If not found yet, it must have timed-out.
                    logging.warning("Timed out waiting for response.")
                elif self.controller.machine.dry:
                    logging.warning("Machine in dry mode. Sleeping for a bit and skipping tool reports.")
                    await asyncio.sleep(2.0)
                else:
                    try:
                        # Get the response.
                        response = self.controller.machine.tracker[command_id]["response"]
                        # Get the position information from the report.
                        # They look like this: [270.57, 64.25999999999999, 81.0, 0.0, 0.0, 0.0, [0.0]]
                        coords = response["result"]["status"]["motion_report"]["live_position"]
                        logging.debug(f"Received live-position coordinates: {coords}")
                        # Get XYZ coordinates, discarding ABC and E coordinates, and "subtracting" tool offsets.
                        tool_coords = self.controller.builder.addToolOffsets(
                            x=-coords[0], y=-coords[1], z=-coords[2])
                        # Save the XYZ coordinates to a dict, rounding the values, and reverting the sign change.
                        position_data = {a: round(-tool_coords[i], 2) for i, a in enumerate('xyz')}
                        logging.debug(f"Relaying tool position data to socketio: {position_data}")
                        # Send the coordinates to the UI.
                        event_data = {"data": position_data}
                        await self.controller.comms.emit(
                            self.tool_data_socket_event, event_data,
                            callback=self.controller.comms.dummy_ack)
                        # Update the plugin's status.
                        self.status = True
                    except KeyError as e:
                        # The tracker entry itself may be the missing key; look it up
                        # defensively so that building the log message cannot raise a
                        # second KeyError and end the coroutine.
                        try:
                            tracker_entry = self.controller.machine.tracker[command_id]
                        except KeyError:
                            tracker_entry = None
                        msg = f"Failed to get tool data: key {e} not found in tracker with request id '{command_id}'."
                        msg += f" Actual tracker entry: {tracker_entry}"
                        logging.warning(msg)
                        # Update the plugin's status.
                        self.status = False

                # Wait for the actual remaining time (after the previous await).
                final_time = time.time()
                if final_time - initial_time <= self.tool_data_wait_time:
                    await asyncio.sleep(self.tool_data_wait_time - (final_time - initial_time))

            logging.info("Coroutine loop ended: controller not running.")
        except asyncio.exceptions.CancelledError:
            logging.warning("Coroutine cancelled.")
        except Exception as e:
            msg = f"Coroutine failed due to an unhandled error: {e}\n" + traceback.format_exc()
            logging.critical(msg)
            print(msg)

        # Update the plugin's status.
        self.status = False
        logging.warning("Coroutine ended.")

    # Possible status codes and their colors.
    # NOTE: These must match the ones defined at `pipettin-gui/client/src/components/Buttons/StatusController.jsx`.
    status_ok = 'OK'            # green
    status_warn = 'WARN'        # orange
    status_error = 'ERROR'      # red
    status_unk = 'UNK'          # yellow
    status_off = 'OFF'          # grey
    status_standby = 'STANDBY'  # blue

    # Status codes, ordered by severity.
    status_off_code = 1
    status_unk_code = 2
    status_ok_code = 3
    status_standby_code = 4
    status_warn_code = 5
    status_error_code = 6

    # Status code map.
    status_codes = {
        status_ok_code: status_ok,
        status_warn_code: status_warn,
        status_error_code: status_error,
        status_unk_code: status_unk,
        status_off_code: status_off,
        status_standby_code: status_standby
    }

    async def make_status(self, data=None):
        """Process and digest status data.

        The controller's status data might look like this:

            {
                'controller': {'background_task': False, 'killed': False,
                               'plugins': {'pocketpcr_serial': False},
                               'ready': False, 'run': True},
                'klipper': {'plugins': {}, 'printer_ready': False, 'recovering': False},
                'sio': {'connected': True,
                        'plugins': {'example': True, 'p2g_command': True,
                                    'pcr_template': True, 'sio_status': True},
                        'ready': True}
            }

        Args:
            data: Payload of the incoming request event, if any. It is only logged.

        Returns:
            A dict with the keys 'id', 'method' and 'data'; 'data' holds the
            controller id, the overall status label and the raw status dict.
        """
        def is_on(flag):
            # "recovering" and "dry" are compared against the string "OFF" by the
            # checks below, while the sample data above carries a plain False.
            # Treat None, False and "OFF" alike as "not active", so that a boolean
            # False is not reported as an active flag.
            return flag not in (None, False, "OFF")

        logging.debug(f"Received status request event with data:\n{pformat(data)}")

        # Get the program-wide status information, including plugins.
        status: dict = await self.controller.status()
        logging.debug("Gathered status data:\n" + pformat(status))

        # Look for plugins that failed to load.
        failing_plugins = [
            plugin_name
            for comm in status.values()
            for plugin_name, plugin_loaded in comm.get("plugins", {}).items()
            if not plugin_loaded
        ]
        recovering_modules = [k for k, s in status.items() if is_on(s.get("recovering"))]
        killed_modules = [k for k, s in status.items() if s.get("killed", False)]
        dry_modules = [k for k, s in status.items() if is_on(s.get("dry"))]
        # Exactly one of "run"/"ready" is set (XOR), and the module is not in dry mode.
        undead_modules = [
            k for k, s in status.items()
            if bool(s.get("run", True)) != bool(s.get("ready", True))
            and not is_on(s.get("dry"))
        ]
        all_ok = all(s.get("run", True) and s.get("ready", True) for s in status.values())
        nok_modules = [
            k for k, s in status.items()
            if s.get("status", "OK") != "OK" and not is_on(s.get("dry"))
        ]

        # Default to unknown. Every matching rule can only raise the severity.
        status_id = self.status_unk_code

        # Compute current overall status.
        if recovering_modules:
            # Reopening a connection, like the websocket connection to moonraker.
            logging.warning(f"A controller component is failing and trying to recover: {recovering_modules}")
            status_id = max(self.status_error_code, status_id)
        if killed_modules:
            # Killed means horrible death.
            logging.warning(f"A controller component has died a horrible death: {killed_modules}")
            status_id = max(self.status_error_code, status_id)
        if failing_plugins:
            # A false value in a plugin means it failed to load.
            logging.warning(f"Plugins reporting failures: {failing_plugins}")
            status_id = max(self.status_error_code, status_id)
        if dry_modules:
            logging.warning(f"Some modules are running in dry mode: {dry_modules}")
            status_id = max(self.status_warn_code, status_id)
        if undead_modules:
            # If it is not killed, but also not running or not ready,
            # it might be that it hasn't completely started yet.
            logging.warning(f"All modules are running, but are not ready: {undead_modules}")
            status_id = max(self.status_warn_code, status_id)
        if nok_modules:
            # The modules report something wrong, even though it was not detected above.
            logging.warning(f"Something is wrong in the following modules: {nok_modules}")
            status_id = max(self.status_warn_code, status_id)
        if all_ok:
            # Everything is fine.
            logging.debug("All modules running and ready.")
            status_id = max(self.status_ok_code, status_id)
        if status_id == self.status_unk_code:
            # Unknown state.
            logging.warning("Global status interpretation failed. No condition matched.")

        # Build and return status message.
        status_info = {
            "id": "status-" + str(time.time()),
            "method": "status",
            "data": {
                "controller_id": f"piper-{str(os.getpid())}-{str(socket.gethostname())}",
                # NOTE: can be "OK", "WARN" or "ERROR" (or "UNK" when no rule matched).
                "status": self.status_codes[status_id],
                "info": status
            }
        }

        logging.debug(f"Sending status with data:\n{pformat(status_info)}")
        return status_info

    status_update_interval = 1.0
    """Seconds between status update events."""

    update_status_event = "controller_status"
    """Name of the Socket.io event used to push status updates."""

    async def status_message_loop(self):
        """Periodically send the digested controller status through socketio.

        Once the controller is running, a status event named
        `update_status_event` is emitted every `status_update_interval` seconds
        while the socketio connection is ready. The plugin status is set to
        False when the coroutine ends.
        """
        logging.info("Coroutine started.")
        ready_before = True
        try:
            # The sleep must be awaited: otherwise no time passes and the loop
            # spins without ever yielding control to the event loop.
            while not self.controller.run:
                await asyncio.sleep(self.controller.wait_default)
            logging.info("Coroutine ready.")

            while self.controller.run:
                if self.controller.comms.sio_ready:
                    if not ready_before:
                        logging.warning("Sio connection reestablished, sending status event.")
                        ready_before = True
                    try:
                        # Prepare the status data.
                        status = await self.make_status()
                        # Send the data through socketio.
                        await self.controller.comms.emit(
                            self.update_status_event,
                            status,
                            callback=self.controller.comms.dummy_ack
                        )
                        logging.debug(f"Sent status event:\n{pformat(status)}")
                        # Update the plugin's status.
                        self.status = True
                    except Exception as e:
                        # Best effort: log the failure and retry on the next interval.
                        msg = f"Caught exception with message: {e}\n" + traceback.format_exc()
                        logging.error(msg)
                        # Update the plugin's status.
                        self.status = False
                elif self.controller.comms.sio_address:
                    logging.warning("Sio connection not ready, skipping status event and waiting.")
                    ready_before = False
                    # Update the plugin's status.
                    self.status = False

                await asyncio.sleep(self.status_update_interval)

            logging.info("Coroutine loop ended: controller not running.")
        except asyncio.exceptions.CancelledError:
            logging.warning("Coroutine cancelled.")

        # Update the plugin's status.
        self.status = False
        logging.warning("Coroutine ended.")
Gather status information from the controller and sends Socket.io events to the GUI.
Example outgoing event:
{'data': {'controller_id': 'piper-414193', 'message': {'background_task': False, 'killed': False, 'printer_ready': False, 'ready': False, 'recovering': False, 'run': True, 'sio_connected': True, 'sio_ready': True}, 'status': 'WARN'}, 'id': 'status-1698206305.9561307', 'method': 'status'}
Also answers an incoming status request event with the above data.
Development issue: https://gitlab.com/pipettin-bot/pipettin-gui/-/issues/39
Ancestors
Class variables
var event_queue
var status_codes
var status_error
var status_error_code
var status_off
var status_off_code
var status_ok
var status_ok_code
var status_request_event
var status_standby
var status_standby_code
var status_unk
var status_unk_code
var status_update_interval
var status_warn
var status_warn_code
var tool_data_socket_event
var tool_data_wait_time
var update_status_event
Seconds between status update events.
Methods
async def make_status(self, data=None)-
Expand source code
async def make_status(self, data=None):
    """Process and digest status data.

    The controller's status data might look like this:

        {
            'controller': {'background_task': False, 'killed': False,
                           'plugins': {'pocketpcr_serial': False},
                           'ready': False, 'run': True},
            'klipper': {'plugins': {}, 'printer_ready': False, 'recovering': False},
            'sio': {'connected': True,
                    'plugins': {'example': True, 'p2g_command': True,
                                'pcr_template': True, 'sio_status': True},
                    'ready': True}
        }

    Args:
        data: Payload of the incoming request event, if any. It is only logged.

    Returns:
        A dict with the keys 'id', 'method' and 'data'; 'data' holds the
        controller id, the overall status label and the raw status dict.
    """
    def is_on(flag):
        # "recovering" and "dry" are compared against the string "OFF" by the
        # checks below, while the sample data above carries a plain False.
        # Treat None, False and "OFF" alike as "not active", so that a boolean
        # False is not reported as an active flag.
        return flag not in (None, False, "OFF")

    logging.debug(f"Received status request event with data:\n{pformat(data)}")

    # Get the program-wide status information, including plugins.
    status: dict = await self.controller.status()
    logging.debug("Gathered status data:\n" + pformat(status))

    # Look for plugins that failed to load.
    failing_plugins = [
        plugin_name
        for comm in status.values()
        for plugin_name, plugin_loaded in comm.get("plugins", {}).items()
        if not plugin_loaded
    ]
    recovering_modules = [k for k, s in status.items() if is_on(s.get("recovering"))]
    killed_modules = [k for k, s in status.items() if s.get("killed", False)]
    dry_modules = [k for k, s in status.items() if is_on(s.get("dry"))]
    # Exactly one of "run"/"ready" is set (XOR), and the module is not in dry mode.
    undead_modules = [
        k for k, s in status.items()
        if bool(s.get("run", True)) != bool(s.get("ready", True))
        and not is_on(s.get("dry"))
    ]
    all_ok = all(s.get("run", True) and s.get("ready", True) for s in status.values())
    nok_modules = [
        k for k, s in status.items()
        if s.get("status", "OK") != "OK" and not is_on(s.get("dry"))
    ]

    # Default to unknown. Every matching rule can only raise the severity.
    status_id = self.status_unk_code

    # Compute current overall status.
    if recovering_modules:
        # Reopening a connection, like the websocket connection to moonraker.
        logging.warning(f"A controller component is failing and trying to recover: {recovering_modules}")
        status_id = max(self.status_error_code, status_id)
    if killed_modules:
        # Killed means horrible death.
        logging.warning(f"A controller component has died a horrible death: {killed_modules}")
        status_id = max(self.status_error_code, status_id)
    if failing_plugins:
        # A false value in a plugin means it failed to load.
        logging.warning(f"Plugins reporting failures: {failing_plugins}")
        status_id = max(self.status_error_code, status_id)
    if dry_modules:
        logging.warning(f"Some modules are running in dry mode: {dry_modules}")
        status_id = max(self.status_warn_code, status_id)
    if undead_modules:
        # If it is not killed, but also not running or not ready,
        # it might be that it hasn't completely started yet.
        logging.warning(f"All modules are running, but are not ready: {undead_modules}")
        status_id = max(self.status_warn_code, status_id)
    if nok_modules:
        # The modules report something wrong, even though it was not detected above.
        logging.warning(f"Something is wrong in the following modules: {nok_modules}")
        status_id = max(self.status_warn_code, status_id)
    if all_ok:
        # Everything is fine.
        logging.debug("All modules running and ready.")
        status_id = max(self.status_ok_code, status_id)
    if status_id == self.status_unk_code:
        # Unknown state.
        logging.warning("Global status interpretation failed. No condition matched.")

    # Build and return status message.
    status_info = {
        "id": "status-" + str(time.time()),
        "method": "status",
        "data": {
            "controller_id": f"piper-{str(os.getpid())}-{str(socket.gethostname())}",
            # NOTE: can be "OK", "WARN" or "ERROR" (or "UNK" when no rule matched).
            "status": self.status_codes[status_id],
            "info": status
        }
    }

    logging.debug(f"Sending status with data:\n{pformat(status_info)}")
    return status_info
The controller's status data might look like this: { 'controller': {'background_task': False, 'killed': False, 'plugins': {'pocketpcr_serial': False}, 'ready': False, 'run': True}, 'klipper': {'plugins': {}, 'printer_ready': False, 'recovering': False}, 'sio': {'connected': True, 'plugins': {'example': True, 'p2g_command': True, 'pcr_template': True, 'sio_status': True}, 'ready': True} }
def register_sio_callbacks(self)-
Expand source code
def register_sio_callbacks(self):
    """Function to register socketio event callbacks, typically sent by the Pipettin GUI.

    Attaches a handler for the `status_request_event` event to the controller's
    socketio client. The handler replies with the result of `make_status`.
    Nothing is registered when the client (`controller.comms.sio`) is falsy.
    """
    sio_client = self.controller.comms.sio
    if not sio_client:
        return

    async def status_update_handler(data):
        """Receives an event from the GUI/backend and answers with the current status."""
        return await self.make_status(data)

    # Same effect as decorating the handler with `@sio_client.on(<event name>)`.
    sio_client.on(self.status_request_event)(status_update_handler)
async def status_message_loop(self)-
Expand source code
async def status_message_loop(self):
    """Periodically send the digested controller status through socketio.

    Once the controller is running, a status event named
    `update_status_event` is emitted every `status_update_interval` seconds
    while the socketio connection is ready. Failures while building or sending
    the status are logged and retried on the next interval. The plugin status
    is set to False when the coroutine ends.
    """
    logging.info("Coroutine started.")
    ready_before = True
    try:
        # The sleep must be awaited: otherwise no time passes and the loop
        # spins without ever yielding control to the event loop.
        while not self.controller.run:
            await asyncio.sleep(self.controller.wait_default)
        logging.info("Coroutine ready.")

        while self.controller.run:
            if self.controller.comms.sio_ready:
                if not ready_before:
                    logging.warning("Sio connection reestablished, sending status event.")
                    ready_before = True
                try:
                    # Prepare the status data.
                    status = await self.make_status()
                    # Send the data through socketio.
                    await self.controller.comms.emit(
                        self.update_status_event,
                        status,
                        callback=self.controller.comms.dummy_ack
                    )
                    logging.debug(f"Sent status event:\n{pformat(status)}")
                    # Update the plugin's status.
                    self.status = True
                except Exception as e:
                    # Best effort: log the failure and retry on the next interval.
                    msg = f"Caught exception with message: {e}\n" + traceback.format_exc()
                    logging.error(msg)
                    # Update the plugin's status.
                    self.status = False
            elif self.controller.comms.sio_address:
                logging.warning("Sio connection not ready, skipping status event and waiting.")
                ready_before = False
                # Update the plugin's status.
                self.status = False

            await asyncio.sleep(self.status_update_interval)

        logging.info("Coroutine loop ended: controller not running.")
    except asyncio.exceptions.CancelledError:
        logging.warning("Coroutine cancelled.")

    # Update the plugin's status.
    self.status = False
    logging.warning("Coroutine ended.")
async def tool_data_loop(self)-
Expand source code
async def tool_data_loop(self):
    """Request tool data to the firmware, and relay it to the UI through socketio.

    Waits for the controller to run and for the machine's websocket to be set,
    then repeatedly queries a motion report, converts the live position to tool
    coordinates, and emits it as a `tool_data_socket_event` event. Each
    iteration is padded so that it lasts at least `tool_data_wait_time`
    seconds. The plugin status is set to False when the coroutine ends.
    """
    logging.info("Coroutine started.")
    try:
        # The sleep must be awaited: otherwise no time passes and the loop
        # spins without ever yielding control to the event loop.
        while not self.controller.run:
            await asyncio.sleep(self.controller.wait_default)
        logging.info("Coroutine ready.")

        # Wait for the websocket to be defined.
        # NOTE: If it is closed, the error will be caught further below.
        while not self.controller.machine.websocket and self.controller.run:
            logging.debug("Websocket not defined, plugin disabled. Re-checking in 2 seconds.")
            await asyncio.sleep(2)

        while self.controller.run:
            # Build a unique ID.
            command_id = "motion_report_" + self.controller.machine.hash_cmd(self.tool_data_socket_event)
            # Build RPC command.
            rpc_cmd = rpc_primitives.query_motion_report(id=command_id)
            # Track time.
            initial_time = time.time()
            # Send the command. Only the "wait" result is used below.
            _cmd_id, (wait_pass, _check_pass) = await self.controller.machine.send_cmd(
                rpc_cmd=rpc_cmd, wait=True, check=False,
                timeout=self.tool_data_wait_time)

            # Process the response.
            if not wait_pass:
                # If not found yet, it must have timed-out.
                logging.warning("Timed out waiting for response.")
            elif self.controller.machine.dry:
                logging.warning("Machine in dry mode. Sleeping for a bit and skipping tool reports.")
                await asyncio.sleep(2.0)
            else:
                try:
                    # Get the response.
                    response = self.controller.machine.tracker[command_id]["response"]
                    # Get the position information from the report.
                    # They look like this: [270.57, 64.25999999999999, 81.0, 0.0, 0.0, 0.0, [0.0]]
                    coords = response["result"]["status"]["motion_report"]["live_position"]
                    logging.debug(f"Received live-position coordinates: {coords}")
                    # Get XYZ coordinates, discarding ABC and E coordinates, and "subtracting" tool offsets.
                    tool_coords = self.controller.builder.addToolOffsets(
                        x=-coords[0], y=-coords[1], z=-coords[2])
                    # Save the XYZ coordinates to a dict, rounding the values, and reverting the sign change.
                    position_data = {a: round(-tool_coords[i], 2) for i, a in enumerate('xyz')}
                    logging.debug(f"Relaying tool position data to socketio: {position_data}")
                    # Send the coordinates to the UI.
                    event_data = {"data": position_data}
                    await self.controller.comms.emit(
                        self.tool_data_socket_event, event_data,
                        callback=self.controller.comms.dummy_ack)
                    # Update the plugin's status.
                    self.status = True
                except KeyError as e:
                    # The tracker entry itself may be the missing key; look it up
                    # defensively so that building the log message cannot raise a
                    # second KeyError and end the coroutine.
                    try:
                        tracker_entry = self.controller.machine.tracker[command_id]
                    except KeyError:
                        tracker_entry = None
                    msg = f"Failed to get tool data: key {e} not found in tracker with request id '{command_id}'."
                    msg += f" Actual tracker entry: {tracker_entry}"
                    logging.warning(msg)
                    # Update the plugin's status.
                    self.status = False

            # Wait for the actual remaining time (after the previous await).
            final_time = time.time()
            if final_time - initial_time <= self.tool_data_wait_time:
                await asyncio.sleep(self.tool_data_wait_time - (final_time - initial_time))

        logging.info("Coroutine loop ended: controller not running.")
    except asyncio.exceptions.CancelledError:
        logging.warning("Coroutine cancelled.")
    except Exception as e:
        msg = f"Coroutine failed due to an unhandled error: {e}\n" + traceback.format_exc()
        logging.critical(msg)
        print(msg)

    # Update the plugin's status.
    self.status = False
    logging.warning("Coroutine ended.")
Inherited members