Module pipettin-piper.piper.plugins.sio_status

Functions

def load_plugin(controller: "'Controller'", **kwargs)
Expand source code
def load_plugin(controller: "Controller", **kwargs):
    """Status update plugin using SocketIO, for the Pipettin Writer web UI.
    Plugins are expected to have a function named 'load_plugin' which will instantiate
    the plugin's class and returning it to the main Commander class.
    If they fail to load, they must raise a PluginError exception.
    """
    logging.debug(f"load_plugin: loading {plugin_name} plugin.")
    try:
        class_instance = StatusUpdater(controller)
    except Exception as e:
        msg = f"Failed to load with error: {e}"
        logging.error(msg)
        raise PluginError(msg) from e

    return class_instance

Status update plugin using SocketIO, for the Pipettin Writer web UI. Plugins are expected to have a function named 'load_plugin' which will instantiate the plugin's class and returning it to the main Commander class. If they fail to load, they must raise a PluginError exception.

Classes

class StatusUpdater (controller: Controller, config=None, verbose=True)
Expand source code
class StatusUpdater(Plugin):
    """Sends status events to the GUI/backend.

    Gather status information from the controller and sends Socket.io events to the GUI.

    Example outgoing event:

        {'data': {'controller_id': 'piper-414193',
                'message': {'background_task': False,
                            'killed': False,
                            'printer_ready': False,
                            'ready': False,
                            'recovering': False,
                            'run': True,
                            'sio_connected': True,
                            'sio_ready': True},
                'status': 'WARN'},
        'id': 'status-1698206305.9561307',
        'method': 'status'}

    Also responds to an incoming status request event, and responds with the above data.

    Development issue: https://gitlab.com/pipettin-bot/pipettin-gui/-/issues/39
    """
    def __init__(self,
                 controller: Controller,
                 config = None,
                 verbose = True):

        # Save configuration.
        if config is None:
            config = {}
        self.config = config
        self.verbose = verbose
        # Save controller.
        self.controller: Controller = controller

        # Register additional controller coroutines.
        self.controller.coroutine_methods.extend([
            self.status_message_loop(),
            self.tool_data_loop()
        ])

        # Register default socketio event handlers.
        self.register_sio_callbacks()

        # Register action handlers.
        # self.register_action_handlers()

        # Set status.
        self._status = True


    # A queue object for commands from the GUI/socket.
    event_queue = Queue()

    # SOCKET.IO CALLBACKS AND EVENTS SECTION ####
    status_request_event = 'status_update_request'
    def register_sio_callbacks(self):
        """Function to register socketio event callbacks, typically sent by the Pipettin GUI."""
        if self.controller.comms.sio:
            @self.controller.comms.sio.on(self.status_request_event)
            async def status_update_handler(data):
                """Receives a event from the GUI/backend."""
                response_data = await self.make_status(data)
                return response_data

    tool_data_socket_event = "tool_data"
    tool_data_wait_time = 0.5
    async def tool_data_loop(self):
        """Request tool data to the firmware, and relay it to the UI through socketio."""
        logging.info("Coroutine started.")

        try:
            while not self.controller.run:
                asyncio.sleep(self.controller.wait_default)
            logging.info("Coroutine ready.")

            # Wait for the websocket to be defined.
            # NOTE: If it is closed, the error will be caught further below.
            while not self.controller.machine.websocket and self.controller.run:
                logging.debug("Websocket not defined, plugin disabled. Re-checking in 2 seconds.")
                await asyncio.sleep(2)

            while self.controller.run:
                # Build a unique ID.
                command_id = "motion_report_" + self.controller.machine.hash_cmd(self.tool_data_socket_event)
                # Build RPC command.
                rpc_cmd = rpc_primitives.query_motion_report(id=command_id)

                # Track time
                initial_time = time.time()

                # Send the command
                cmd_id, (wait_pass, check_pass) = await self.controller.machine.send_cmd(
                    rpc_cmd=rpc_cmd, wait=True, check=False,
                    timeout=self.tool_data_wait_time)

                # Process the response.
                if not wait_pass:
                    # If not found yet, it must have timed-out.
                    logging.warning("Timed out waiting for response.")
                elif self.controller.machine.dry:
                    logging.warning("Machine in dry mode. Sleeping for a bit and skipping tool reports.")
                    await asyncio.sleep(2.0)
                else:
                    try:
                        # Get the response.
                        response = self.controller.machine.tracker[command_id]["response"]

                        # Get the position information from the report.
                        # They look like this: [270.57, 64.25999999999999, 81.0, 0.0, 0.0, 0.0, [0.0]]
                        coords = response["result"]["status"]["motion_report"]["live_position"]
                        logging.debug(f"Received live-position coordinates: {coords}")

                        # Get XYZ coordinates, discarding ABC and E coordinates, and "subtracting" tool offsets.
                        tool_coords = self.controller.builder.addToolOffsets(x=-coords[0], y=-coords[1], z=-coords[2])

                        # Save the XYZ coordinates to a dict, rounding the values, and reverting the sign change.
                        position_data = {a: round(-tool_coords[i], 2) for i, a in enumerate('xyz')}
                        logging.debug(f"Relaying tool position data to socketio: {position_data}")

                        # Send the coordinates to the UI.
                        event_data = {"data": position_data}
                        await self.controller.comms.emit(
                            self.tool_data_socket_event,
                            event_data,
                            callback=self.controller.comms.dummy_ack)

                        # Update the plugin's status.
                        self.status = True

                    except KeyError as e:
                        msg = f"Failed to get tool data: key {e} not found in tracker with request id '{command_id}'."
                        msg += f" Actual tracker entry: {self.controller.machine.tracker[command_id]}"
                        logging.warning(msg)
                        # Update the plugin's status.
                        self.status = False

                # Wait for the actual remaining time (after the previous await).
                final_time = time.time()
                if final_time - initial_time <= self.tool_data_wait_time:
                    await asyncio.sleep(self.tool_data_wait_time - (final_time - initial_time))

            logging.info("Coroutine loop ended: controller not running.")

        except asyncio.exceptions.CancelledError:
            logging.warning("Coroutine cancelled.")
        except Exception as e:
            msg = f"Coroutine failed due to an unhandled error: {e}\n" + traceback.format_exc()
            logging.critical(msg)
            print(msg)

        # Update the plugin's status.
        self.status = False
        logging.warning("Coroutine ended.")

    # Possible status codes and their colors.
    # NOTE: These must match the ones defined at `pipettin-gui/client/src/components/Buttons/StatusController.jsx`.
    status_ok = 'OK'            # green
    status_warn = 'WARN'        # orange
    status_error = 'ERROR'      # red
    status_unk = 'UNK'          # yellow
    status_off = 'OFF'          # grey
    status_standby = 'STANDBY'  # blue
    # Status codes, ordered by severity.
    status_off_code = 1
    status_unk_code = 2
    status_ok_code = 3
    status_standby_code = 4
    status_warn_code = 5
    status_error_code = 6
    # Status code map.
    status_codes = {
        status_ok_code: status_ok,
        status_warn_code: status_warn,
        status_error_code: status_error,
        status_unk_code: status_unk,
        status_off_code: status_off,
        status_standby_code: status_standby
    }

    async def make_status(self, data=None):
        """Process and digest status data.
        The controller's status data might look like this:
        {
            'controller': {'background_task': False,
                           'killed': False,
                           'plugins': {'pocketpcr_serial': False},
                           'ready': False,
                           'run': True},
            'klipper': {'plugins': {},
                        'printer_ready': False,
                        'recovering': False},
            'sio': {'connected': True,
                    'plugins': {'example': True,
                                'p2g_command': True,
                                'pcr_template': True,
                                'sio_status': True},
                    'ready': True}
        }
        """
        logging.debug(f"Received status request event with data:\n{pformat(data)}")

        # Get the program-wide status information, including plugins.
        status: dict = await self.controller.status()

        logging.debug("Gathered status data:\n" + pformat(status))

        # Look for plugins that failed to load.
        failing_plugins = [
            plugin_name for comm in status.values()
            for plugin_name, plugin_loaded
            in comm.get("plugins", {}).items()
            if not plugin_loaded
        ]

        recovering_modules = [k for k, s in status.items() if s.get("recovering", "OFF") != "OFF"]
        killed_modules = [k for k, s in status.items() if s.get("killed", False)]
        undead_modules = [k for k, s in status.items() if (s.get("run", True) ^ s.get("ready", True)) and not s.get("dry", False)]  # Bit-wise operator "^" is XOR.
        all_ok = all([s.get("run", True) and s.get("ready", True) for s in status.values()])
        nok_modules = [k for k, s in status.items() if s.get("status", "OK") != "OK" and not s.get("dry", "OFF") != "OFF"]
        dry_modules = [k for k, s in status.items() if s.get("dry", "OFF") != "OFF"]
        # unstarted_modules = [k for k, s in status.items() if not s.get("run", True) and not s.get("ready", True)]

        # Default to unknown.
        status_id = self.status_unk_code

        # Compute current overall status.
        if recovering_modules:
            # 1) Reopening a connection, like the websocket connection to moonraker.
            logging.warning(f"A controller component is failing and trying to recover: {recovering_modules}")
            status_id = max(self.status_error_code, status_id)

        if killed_modules:
            # 2) Killed means horrible death.
            logging.warning(f"A controller component has died a horrible death: {killed_modules}")
            status_id = max(self.status_error_code, status_id)

        if failing_plugins:
            # 3) A false value in a plugin means it failed to load.
            logging.warning(f"Plugins reporting failures: {failing_plugins}")
            status_id = max(self.status_error_code, status_id)

        if dry_modules:
            logging.warning(f"Some modules are running in dry mode: {dry_modules}")
            status_id = max(self.status_warn_code, status_id)

        if undead_modules:
            # 4.1) If it is not killed, but also not running or not ready,
            # it might be that it hasn't completely started yet.
            logging.warning(f"All modules are running, but are not ready: {undead_modules}")
            status_id = max(self.status_warn_code, status_id)

        if nok_modules:
            # The modules report something wrong, even though we do not find it here,
            logging.warning(f"Something is wrong in the following modules: {nok_modules}")
            status_id = max(self.status_warn_code, status_id)

        if all_ok:
            # 4.2) Everything is fine (:fire:).
            logging.debug("All modules running and ready.")
            status_id = max(self.status_ok_code, status_id)

        if status_id == self.status_unk_code:
            # 6) Unknown state.
            logging.warning("Global status interpretation failed. No condition matched.")

        # Build and return status message.
        status_info = {
            "id": "status-" + str(time.time()),
            "method": "status",
            "data": {
                "controller_id": f"piper-{str(os.getpid())}-{str(socket.gethostname())}",
                # NOTE: puede ser "OK", "WARN", o "ERROR".
                "status": self.status_codes[status_id],
                "info": status
            }
        }
        logging.debug(f"Sending status with data:\n{pformat(status_info)}")
        return status_info

    status_update_interval = 1.0
    update_status_event = "controller_status"
    """Seconds between status update events."""
    async def status_message_loop(self):
        """Sequentially process events queued by socketio events."""
        logging.info("Coroutine started.")

        ready_before = True

        try:
            while not self.controller.run:
                asyncio.sleep(self.controller.wait_default)
            logging.info("Coroutine ready.")

            while self.controller.run:
                if self.controller.comms.sio_ready:
                    if not ready_before:
                        logging.warning("Sio connection reestablished, sending status event.")
                        ready_before = True
                    try:
                        # Prepare the status data.
                        status = await self.make_status()
                        # Send the data through socketio.
                        await self.controller.comms.emit(
                            self.update_status_event,
                            status,
                            callback=self.controller.comms.dummy_ack
                        )
                        logging.debug(f"Sent status event:\n{pformat(status)}")
                        # Update the plugin's status.
                        self.status = True

                    except Exception as e:
                        msg = f"Caught exception with message: {e}\n" + traceback.format_exc()
                        logging.error(msg)
                        # Update the plugin's status.
                        self.status = False

                elif self.controller.comms.sio_address:
                    logging.warning("Sio connection not ready, skipping status event and waiting.")
                    ready_before = False
                    # Update the plugin's status.
                    self.status = False

                await asyncio.sleep(self.status_update_interval)

            logging.info("Coroutine loop ended: controller not running.")

        except asyncio.exceptions.CancelledError:
            logging.warning("Coroutine cancelled.")

        # Update the plugin's status.
        self.status = False
        logging.warning("Coroutine ended.")

Sends status events to the GUI/backend.

Gather status information from the controller and sends Socket.io events to the GUI.

Example outgoing event:

{'data': {'controller_id': 'piper-414193',
        'message': {'background_task': False,
                    'killed': False,
                    'printer_ready': False,
                    'ready': False,
                    'recovering': False,
                    'run': True,
                    'sio_connected': True,
                    'sio_ready': True},
        'status': 'WARN'},
'id': 'status-1698206305.9561307',
'method': 'status'}

Also responds to an incoming status request event, and responds with the above data.

Development issue: https://gitlab.com/pipettin-bot/pipettin-gui/-/issues/39

Ancestors

Class variables

var event_queue
var status_codes
var status_error
var status_error_code
var status_off
var status_off_code
var status_ok
var status_ok_code
var status_request_event
var status_standby
var status_standby_code
var status_unk
var status_unk_code
var status_update_interval
var status_warn
var status_warn_code
var tool_data_socket_event
var tool_data_wait_time
var update_status_event

Seconds between status update events.

Methods

async def make_status(self, data=None)
Expand source code
async def make_status(self, data=None):
    """Process and digest status data.
    The controller's status data might look like this:
    {
        'controller': {'background_task': False,
                       'killed': False,
                       'plugins': {'pocketpcr_serial': False},
                       'ready': False,
                       'run': True},
        'klipper': {'plugins': {},
                    'printer_ready': False,
                    'recovering': False},
        'sio': {'connected': True,
                'plugins': {'example': True,
                            'p2g_command': True,
                            'pcr_template': True,
                            'sio_status': True},
                'ready': True}
    }
    """
    logging.debug(f"Received status request event with data:\n{pformat(data)}")

    # Get the program-wide status information, including plugins.
    status: dict = await self.controller.status()

    logging.debug("Gathered status data:\n" + pformat(status))

    # Look for plugins that failed to load.
    failing_plugins = [
        plugin_name for comm in status.values()
        for plugin_name, plugin_loaded
        in comm.get("plugins", {}).items()
        if not plugin_loaded
    ]

    recovering_modules = [k for k, s in status.items() if s.get("recovering", "OFF") != "OFF"]
    killed_modules = [k for k, s in status.items() if s.get("killed", False)]
    undead_modules = [k for k, s in status.items() if (s.get("run", True) ^ s.get("ready", True)) and not s.get("dry", False)]  # Bit-wise operator "^" is XOR.
    all_ok = all([s.get("run", True) and s.get("ready", True) for s in status.values()])
    nok_modules = [k for k, s in status.items() if s.get("status", "OK") != "OK" and not s.get("dry", "OFF") != "OFF"]
    dry_modules = [k for k, s in status.items() if s.get("dry", "OFF") != "OFF"]
    # unstarted_modules = [k for k, s in status.items() if not s.get("run", True) and not s.get("ready", True)]

    # Default to unknown.
    status_id = self.status_unk_code

    # Compute current overall status.
    if recovering_modules:
        # 1) Reopening a connection, like the websocket connection to moonraker.
        logging.warning(f"A controller component is failing and trying to recover: {recovering_modules}")
        status_id = max(self.status_error_code, status_id)

    if killed_modules:
        # 2) Killed means horrible death.
        logging.warning(f"A controller component has died a horrible death: {killed_modules}")
        status_id = max(self.status_error_code, status_id)

    if failing_plugins:
        # 3) A false value in a plugin means it failed to load.
        logging.warning(f"Plugins reporting failures: {failing_plugins}")
        status_id = max(self.status_error_code, status_id)

    if dry_modules:
        logging.warning(f"Some modules are running in dry mode: {dry_modules}")
        status_id = max(self.status_warn_code, status_id)

    if undead_modules:
        # 4.1) If it is not killed, but also not running or not ready,
        # it might be that it hasn't completely started yet.
        logging.warning(f"All modules are running, but are not ready: {undead_modules}")
        status_id = max(self.status_warn_code, status_id)

    if nok_modules:
        # The modules report something wrong, even though we do not find it here,
        logging.warning(f"Something is wrong in the following modules: {nok_modules}")
        status_id = max(self.status_warn_code, status_id)

    if all_ok:
        # 4.2) Everything is fine (:fire:).
        logging.debug("All modules running and ready.")
        status_id = max(self.status_ok_code, status_id)

    if status_id == self.status_unk_code:
        # 6) Unknown state.
        logging.warning("Global status interpretation failed. No condition matched.")

    # Build and return status message.
    status_info = {
        "id": "status-" + str(time.time()),
        "method": "status",
        "data": {
            "controller_id": f"piper-{str(os.getpid())}-{str(socket.gethostname())}",
            # NOTE: puede ser "OK", "WARN", o "ERROR".
            "status": self.status_codes[status_id],
            "info": status
        }
    }
    logging.debug(f"Sending status with data:\n{pformat(status_info)}")
    return status_info

Process and digest status data. The controller's status data might look like this: { 'controller': {'background_task': False, 'killed': False, 'plugins': {'pocketpcr_serial': False}, 'ready': False, 'run': True}, 'klipper': {'plugins': {}, 'printer_ready': False, 'recovering': False}, 'sio': {'connected': True, 'plugins': {'example': True, 'p2g_command': True, 'pcr_template': True, 'sio_status': True}, 'ready': True} }

def register_sio_callbacks(self)
Expand source code
def register_sio_callbacks(self):
    """Function to register socketio event callbacks, typically sent by the Pipettin GUI."""
    if self.controller.comms.sio:
        @self.controller.comms.sio.on(self.status_request_event)
        async def status_update_handler(data):
            """Receives a event from the GUI/backend."""
            response_data = await self.make_status(data)
            return response_data

Function to register socketio event callbacks, typically sent by the Pipettin GUI.

async def status_message_loop(self)
Expand source code
async def status_message_loop(self):
    """Sequentially process events queued by socketio events."""
    logging.info("Coroutine started.")

    ready_before = True

    try:
        while not self.controller.run:
            asyncio.sleep(self.controller.wait_default)
        logging.info("Coroutine ready.")

        while self.controller.run:
            if self.controller.comms.sio_ready:
                if not ready_before:
                    logging.warning("Sio connection reestablished, sending status event.")
                    ready_before = True
                try:
                    # Prepare the status data.
                    status = await self.make_status()
                    # Send the data through socketio.
                    await self.controller.comms.emit(
                        self.update_status_event,
                        status,
                        callback=self.controller.comms.dummy_ack
                    )
                    logging.debug(f"Sent status event:\n{pformat(status)}")
                    # Update the plugin's status.
                    self.status = True

                except Exception as e:
                    msg = f"Caught exception with message: {e}\n" + traceback.format_exc()
                    logging.error(msg)
                    # Update the plugin's status.
                    self.status = False

            elif self.controller.comms.sio_address:
                logging.warning("Sio connection not ready, skipping status event and waiting.")
                ready_before = False
                # Update the plugin's status.
                self.status = False

            await asyncio.sleep(self.status_update_interval)

        logging.info("Coroutine loop ended: controller not running.")

    except asyncio.exceptions.CancelledError:
        logging.warning("Coroutine cancelled.")

    # Update the plugin's status.
    self.status = False
    logging.warning("Coroutine ended.")

Sequentially process events queued by socketio events.

async def tool_data_loop(self)
Expand source code
async def tool_data_loop(self):
    """Request tool data to the firmware, and relay it to the UI through socketio."""
    logging.info("Coroutine started.")

    try:
        while not self.controller.run:
            asyncio.sleep(self.controller.wait_default)
        logging.info("Coroutine ready.")

        # Wait for the websocket to be defined.
        # NOTE: If it is closed, the error will be caught further below.
        while not self.controller.machine.websocket and self.controller.run:
            logging.debug("Websocket not defined, plugin disabled. Re-checking in 2 seconds.")
            await asyncio.sleep(2)

        while self.controller.run:
            # Build a unique ID.
            command_id = "motion_report_" + self.controller.machine.hash_cmd(self.tool_data_socket_event)
            # Build RPC command.
            rpc_cmd = rpc_primitives.query_motion_report(id=command_id)

            # Track time
            initial_time = time.time()

            # Send the command
            cmd_id, (wait_pass, check_pass) = await self.controller.machine.send_cmd(
                rpc_cmd=rpc_cmd, wait=True, check=False,
                timeout=self.tool_data_wait_time)

            # Process the response.
            if not wait_pass:
                # If not found yet, it must have timed-out.
                logging.warning("Timed out waiting for response.")
            elif self.controller.machine.dry:
                logging.warning("Machine in dry mode. Sleeping for a bit and skipping tool reports.")
                await asyncio.sleep(2.0)
            else:
                try:
                    # Get the response.
                    response = self.controller.machine.tracker[command_id]["response"]

                    # Get the position information from the report.
                    # They look like this: [270.57, 64.25999999999999, 81.0, 0.0, 0.0, 0.0, [0.0]]
                    coords = response["result"]["status"]["motion_report"]["live_position"]
                    logging.debug(f"Received live-position coordinates: {coords}")

                    # Get XYZ coordinates, discarding ABC and E coordinates, and "subtracting" tool offsets.
                    tool_coords = self.controller.builder.addToolOffsets(x=-coords[0], y=-coords[1], z=-coords[2])

                    # Save the XYZ coordinates to a dict, rounding the values, and reverting the sign change.
                    position_data = {a: round(-tool_coords[i], 2) for i, a in enumerate('xyz')}
                    logging.debug(f"Relaying tool position data to socketio: {position_data}")

                    # Send the coordinates to the UI.
                    event_data = {"data": position_data}
                    await self.controller.comms.emit(
                        self.tool_data_socket_event,
                        event_data,
                        callback=self.controller.comms.dummy_ack)

                    # Update the plugin's status.
                    self.status = True

                except KeyError as e:
                    msg = f"Failed to get tool data: key {e} not found in tracker with request id '{command_id}'."
                    msg += f" Actual tracker entry: {self.controller.machine.tracker[command_id]}"
                    logging.warning(msg)
                    # Update the plugin's status.
                    self.status = False

            # Wait for the actual remaining time (after the previous await).
            final_time = time.time()
            if final_time - initial_time <= self.tool_data_wait_time:
                await asyncio.sleep(self.tool_data_wait_time - (final_time - initial_time))

        logging.info("Coroutine loop ended: controller not running.")

    except asyncio.exceptions.CancelledError:
        logging.warning("Coroutine cancelled.")
    except Exception as e:
        msg = f"Coroutine failed due to an unhandled error: {e}\n" + traceback.format_exc()
        logging.critical(msg)
        print(msg)

    # Update the plugin's status.
    self.status = False
    logging.warning("Coroutine ended.")

Request tool data to the firmware, and relay it to the UI through socketio.

Inherited members