Module pipettin-piper.piper.plugins.p2g_command

Functions

def load_plugin(controller: "'Controller'", **kwargs)
Expand source code
def load_plugin(controller: "Controller", **kwargs):
    """Main router for commands emitted by the Pipettin Writer web UI
    Plugins are expected to have a function named 'load_plugin' which will instantiate
    the plugin's class and returning it to the main Commander class.
    """
    logging.debug(f"load_plugin: loading {plugin_name} plugin.")
    try:
        class_instance = CommandRouter(
            controller,
            builder=controller.builder,
            config=controller.config)
    except Exception as e:
        msg = f"Failed to load with error: {e}"
        logging.error(msg)
        raise PluginError(msg) from e

    return class_instance

Main router for commands emitted by the Pipettin Writer web UI Plugins are expected to have a function named 'load_plugin' which will instantiate the plugin's class and returning it to the main Commander class.

Classes

class CommandRouter (controller: Controller, builder: GcodeBuilder = None, config=None, verbose=True)
Expand source code
class CommandRouter(Plugin):
    """
    Receives a command from the GUI/backend, generates actions, their GCODE, and executes it.

    This class is made to parse commands from the GUI (i.e. from the socket.io connection) into executable GCODE.
    The GCODE will likely not be standard. This depends on the particular machine and the implementation of "gcode primitives".

    Examples of event data from the GUI:
        - Run protocol.
        - Go-to content, using a tool.
        - Park tool.
        - Kill the controller.
    """
    controller: Controller
    def __init__(self,
                 controller: Controller,
                 builder: GcodeBuilder = None,
                 config = None,
                 verbose = True):

        # Save configuration.
        if config is None:
            config = {}
        self.config: dict = config
        # Save controller.
        self.controller: Controller = controller

        # Names/IDs of the available methods that the
        # incoming commands from the socket can use.
        self.commands = {
            # NOTE: New commands!
            self.run_protocol_event: self.make_protocol_commands,
            self.go_to_event: self.make_goto_command,
            self.park_tool_event: self.make_park_command
        }
        self.verbose = verbose

        # Save the GcodeBuilder object.
        if builder is not None:
            self.builder = builder
        else:
            self.builder = controller.builder

        # Instantiate gcode generator.
        self.gcode: GcodePrimitives = self.builder.gcode

        # Register additional controller coroutines.
        self.controller.coroutine_methods.extend([self.command_message_loop()])

        # Default value for the "waiting" flag.
        self.waiting_for_human = False

        # Register default socketio event handlers.
        self.register_sio_callbacks()

        # Register action handlers.
        self.register_action_handlers()

        # Register event handler for action status updates.
        self.controller.register_event_callback(
            event_name=self.controller.machine.event_protocol_updates,
            callback_name="CommandRouter.send_execution_updates",
            callback_function=self.send_execution_updates
        )

        # Set status.
        self._status = True

    def register_action_handlers(self):
        # TODO: Improve this!!
        # This is meant to allow using the "parseAction" method in "builder",
        # to properly handle go-to commands, in the context of past history (i.e.
        # current tool, obstacle avoidance, clearance, etc) and replace the MongoTools
        # method (which is duplicating code in builder).
        # The handler is needed because it is used by parseAction.
        self.builder.add_action_handler(name=self.goto_action_id,
                                        function=self.goto_action_handler)
        self.builder.add_action_handler(name=self.human_action_id,
                                        function=self.human_action_handler)

        # Action runners.
        self.controller.add_action_runner(name=self.goto_action_id,
                                          function=self.goto_action_runner)
        self.controller.add_action_runner(name=self.park_action_id,
                                          function=self.run_park_command)
        # Human intervention runner for the controller.
        # TODO: move this to a dedicated plugin.
        self.controller.add_action_runner(name=self.human_action_id,
                                          function=self.run_human_action)

    # A queue object for commands from the GUI/socket.
    p2g_command_queue = Queue()
    current_cmd = None

    # SOCKET.IO CALLBACKS AND EVENTS SECTION ####
    # Event names.
    run_protocol_event = 'run_protocol' # 'run_protocol' (new) or 'p2g_command' (old)
    go_to_event = 'go_to'
    kill_event='kill_commander'
    park_tool_event = 'park_tool'
    # Event handlers.
    def register_sio_callbacks(self):
        """Function to register socketio event callbacks, typically sent by the Pipettin GUI."""
        if not self.controller.comms.sio:
            logging.warning("SIO object unavailable. Skipping callback registration.")
        else:
            # Register websocket event listener function for "stop protocol".
            self.controller.comms.sio.on(self.stop_event, self.stop_protocol_handler)
            # Register websocket event listener function for "pause protocol".
            self.controller.comms.sio.on(self.pause_event, self.pause_protocol_handler)
            # Register websocket event listener function for "pause protocol".
            self.controller.comms.sio.on(self.continue_event, self.continue_protocol_handler)

            # Register websocket event listener function for "start human intervention".
            self.controller.comms.sio.on('human_intervention_continue', self.continue_message_handler)

            # Register websocket event listener function for "cancelled human intervention".
            self.controller.comms.sio.on('human_intervention_cancelled', self.abort_message_handler)

            @self.controller.comms.sio.on(self.run_protocol_event)
            async def command_message_handler(data):
                """
                Receives a command from the GUI/backend and sends them to the GCODE generator.

                There are several different commands which arrive through this event (see the class' docstring).
                """

                logging.info(f"Incoming '{self.run_protocol_event}' event.")
                logging.debug(f"'{self.run_protocol_event}''s data:\n{pformat(data)}")

                if self.p2g_command_queue.empty() and not self.current_cmd:
                    # Add the command to the queue.
                    rpc_data = self.make_rpc(self.run_protocol_event, data)
                    self.p2g_command_queue.put(rpc_data, block=False)
                else:
                    # TODO: rewrite this part, letting it wait until the older items are processed.
                    #       One option is to use a queue with length 1, and let the code be blocked here.
                    #       I however don't know if that would block the whole program, Should I use await instead?
                    msg = f"command {self.run_protocol_event}: the queue is not empty (n={self.p2g_command_queue.qsize()}), 'p2g_command' socketio command rejected: {data}"
                    self.controller.send_alert(msg, alert_type="error")
                    logging.warning(msg)

            @self.controller.comms.sio.on(self.go_to_event)
            def command_go_to(data):
                """Move the robot to a platform item or content."""
                logging.info(f"Go-to command received '{self.go_to_event}' through socketio with data:\n{pformat(data)}")
                # Queue the command.
                self.queue_command(self.go_to_event, data)

            @self.controller.comms.sio.on(self.park_tool_event)
            def command_park_tool(data):
                """Park the requested tool."""
                logging.info(f"Park-tool command received '{self.park_tool_event}' through socketio with data:\n{pformat(data)}")
                # Queue the command.
                self.queue_command(self.park_tool_event, data)

            @self.controller.comms.sio.on(self.kill_event)
            async def command_kill_handler(data):
                """
                Send an emergency stop command to Klipper/Moonraker immediately, and stop the commander's coroutines.
                """
                logging.info(f"Received '{self.kill_event}' command: " + pformat(data))
                result = True

                # Send "emergency_stop" message to Klipper/Moonraker.
                try:
                    if not self.controller.machine.dry:
                        await self.controller.machine.emergency_stop()
                    else:
                        logging.error("Skipping emergency_stop, commander in dry mode.")
                except Exception:
                    logging.error("Error sending emergency stop to the firmware.")
                    result = False

                # Stop/kill coroutines.
                try:
                    await self.controller.stop()
                except Exception:
                    logging.error("Error stopping the controller.")
                    result = False

                return result

    def queue_command(self, event: str, data: dict):
        """Adds a command to the queue if it is not empty.
        The queue is a thread-safe object.
        """
        if self.p2g_command_queue.empty() and not self.current_cmd:
            logging.debug(f"Queuing '{event}' command.")
            # Add the command to the queue.
            rpc_data = self.make_rpc(event, data)
            self.p2g_command_queue.put(rpc_data, block=False)
        else:
            # TODO: rewrite this part, letting it wait until the older items are processed.
            #       One option is to use a queue with length 1, and let the code be blocked here.
            #       I however don't know if that would block the whole program, Should I use await instead?
            msg = f"Command {self.go_to_event}: the queue is not empty (n={self.p2g_command_queue.qsize()}), "
            msg += f"'{event}' socketio command rejected: {data}"
            self.controller.send_alert(msg, alert_type="error")
            logging.warning(msg)

    human_action_id = "HUMAN"
    def human_action_handler(self, action, i):
        """GCODE generator for the HUMAN action."""
        logging.debug(f"Handling '{self.human_action_id}' action with ID {i}.")
        # Move over the target content and downward into the tube.
        commands = self.gcode.gcodeDwell(seconds=1.0)
        self.builder.extend_commands(commands=commands, action=action)
        return None, None

    waiting_for_human = False
    """This attribute is set to 'True' when a response is pending, and to other values when one arrives."""
    async def run_human_action(self, action: dict, i: int, *args, **kwargs):
        """Execute a HUMAN action from a mid-level protocol.

        Args:
            action (dict): The action definition.
            i (int): Action index.

        Example action:
            {"cmd":"HUMAN",
             "args":{"text":"Click me!"},
             "stepID":"669d9e9a2bae3945e9aca164"}
        """

        message = action["args"]["text"]
        timeout = action["args"].get("timeout", 600)

        logging.info(f"Executing human action {i} with message: '{message}'. Timeout set to '{timeout}' seconds.")

        if self.waiting_for_human is True:
            raise ProtocolError("Another human action is already awaitng a response.")

        # Set waiting flag.
        self.waiting_for_human = True
        # Send waiting event to the UI.
        await self.controller.comms.sio_emit_intervention(message)

        # Wait for a response.
        wait_time = min(1, timeout)
        while self.waiting_for_human is True and timeout >= 0:
            logging.debug("Waiting for response from websocket...")
            await asyncio.sleep(wait_time)
            timeout -= wait_time

        # Process state.
        if self.waiting_for_human is True:
            # Reset the flag to a false-like value.
            self.waiting_for_human = None
            # Handle Timed out.
            msg = "Timed-out while waiting for the human intervention (no response from the socket)."
            await self.controller.comms.sio_emit_alert(msg)
            logging.error(msg)
            raise ProtocolError(message=msg, action=action)

        elif self.waiting_for_human == self.continue_code:
            # Handle Continue.
            logging.info(f"Received {self.continue_code} response from the controller.")
            return True

        elif self.waiting_for_human == self.abort_code:
            # Handle Abort.
            msg = f"Received {self.abort_code} response from the controller."
            logging.warning(msg)
            raise ProtocolError(message=msg, action=action)

        else:
            # Reset the flag to a false-like value.
            self.waiting_for_human = None
            # Handle Invalid status code.
            msg = f"Received an invalid response from the socket: {self.waiting_for_human}"
            self.controller.comms.sio_emit_alert(msg)
            logging.error(msg)
            raise ProtocolError(message=msg, action=action)

    stop_event = "stop_protocol"
    def stop_protocol_handler(self, *args, **kwargs):
        """Handle press of stop button in the protocol panel."""
        # TODO: Use incoming data. It has protocol _id and name.
        if self.protocol_is_running(self.stop_event):
            logging.info("Setting the stop event.")
            self.controller.stop_protocol.set()
            if self.controller.pause_protocol.is_set():
                logging.debug("Also clearing the pause event.")
                self.controller.pause_protocol.clear()

    pause_event = "pause_protocol"
    async def pause_protocol_handler(self, *args, **kwargs):
        """Handle press of pause button in the protocol panel."""
        # TODO: Use incoming data. It has protocol _id and name.
        if self.protocol_is_running(self.pause_event):
            if self.controller.pause_protocol.is_set():
                logging.warning("The protocol is already paused.")
                await self.controller.comms.sio_emit_alert("Warning: the protocol is already paused.")
            else:
                logging.info("Setting the pause event.")
                self.controller.pause_protocol.set()
                await self.controller.comms.sio_emit_alert("Protocol paused.")

    continue_event = "continue_protocol"
    async def continue_protocol_handler(self, *args, **kwargs):
        """Handle press of continue button in the protocol panel."""
        # TODO: Use incoming data. It has protocol _id and name.
        if self.protocol_is_running(self.continue_event):
            if not self.controller.pause_protocol.is_set():
                logging.warning("The protocol is not paused.")
                await self.controller.comms.sio_emit_alert("The protocol is not paused.")
            else:
                logging.info("Clearing the pause event to continue.")
                self.controller.pause_protocol.clear()

    def protocol_is_running(self, event):
        """Check if a command is already being processed by 'command_message_loop'."""
        if not self.current_cmd:
            msg = f"There is no protocol running ('{event}' skipeed)."
            logging.error(msg)
            self.controller.comms.sio_emit_alert(msg)
            return False
        return True

    continue_code = "continue"
    def continue_message_handler(self, message):
        """This method is a callback to a incoming 'continue' event."""
        logging.info("Received continue event with message '" + str(message) + "'.")
        self.waiting_for_human = self.continue_code

    abort_code = "abort"
    def abort_message_handler(self, message):
        """This method is a callback to an incoming 'abort' event."""
        logging.info("Received cancel event with message '" + str(message) + "'.")
        self.waiting_for_human = self.abort_code

    @staticmethod
    def make_rpc(method_name: str, method_data: dict, cmd_id=None):
        """Convenience function to prepare a dictionary with the method+data structure."""
        return {"method": method_name, "data": method_data, "id": cmd_id}

    async def command_message_loop(self):
        """Sequentially process events queued by socketio events.
        NOTE: replaced old "CommandRouter" with this "plugin".
        """
        logging.info("Coroutine started.")

        # Wait here until ready. TODO: refactor to avoid depending on klipper being ready.
        # while not self.controller.ws_ready:
        #     await asyncio.sleep(self.controller.wait_default)

        while not self.controller.run:
            asyncio.sleep(self.controller.wait_default)
        logging.info("Coroutine ready.")

        try:
            while self.controller.run:
                # Wait for a bit.
                await asyncio.sleep(0.2)

                # Get an item without blocking (raises Empty, handled below).
                try:
                    self.current_cmd = self.p2g_command_queue.get_nowait()
                except Empty:
                    continue

                try:
                    # Get the current command, which has "rpc data" (actually the socketio event payload).
                    event_data = self.current_cmd

                    logging.info("Received new p2g command.")
                    logging.debug(event_data)

                    # Parse the GUI command to Actions and/or GCODE.
                    logging.debug("Parsing protocol actions.")
                    actions = self.parse_command(event_data)

                    # Convert the actions or gcode to RPC form and send them to Moonraker.
                    # NOTE: Actions take priority over GCODE.

                    # Send Actions.
                    logging.info(f"Executing actions protocol with {len(actions)} actions.")
                    logging.debug(f"Executing the following final actions protocol:\n{pformat(actions)}")
                    await self.controller.run_actions_protocol(actions)

                except ProtocolError as e:
                    # Send an alert throught the SIO connection to the GUI.
                    msg = f"ProtocolError with message '{e.message}'.\n" + traceback.format_exc()
                    self.controller.send_alert(msg, alert_type="error")
                    logging.error(msg)

                except Exception as e:
                    msg = f"Command of method '{self.current_cmd.get('method')}'"
                    msg += f" failed with uncaught exception and message: {e}\n"
                    msg += traceback.format_exc()
                    print(msg)
                    logging.error(msg)
                    self.controller.send_alert(msg, alert_type="error")
                    # TODO: dsiabled the "break" because the controller cannot be restarted easily ATM.
                    # break
                else:
                    # Send our congrats!
                    logging.debug(f"Command executed successfully with data: {event_data}")
                    protocol_name = event_data['data'].get("name", None)
                    if protocol_name:
                        ui_msg = f"Actions protocol '{protocol_name}' was executed successfully."
                        self.controller.send_alert(ui_msg, alert_type="message")

                # Clear the current command property.
                self.current_cmd = None

            logging.debug("Controller not running.")

        except asyncio.exceptions.CancelledError:
            logging.warning("Coroutine cancelled.")

        logging.warning("Coroutine ended.")

    def apply_settings(self, data):
        """Apply incoming settings from the UI to reconfigure the controller.

        For now this only applies the incoming database settings.
        """
        logging.info("Applying new settings.")
        try:
            # Get DB settings.
            db_settings = data["settings"]["database"]
            # Make a new URL for MongoDB.
            database_url = db_settings["uri"]
            # Get the new DB name.
            database_name = db_settings["name"]
            logging.debug(f"Got database_name '{database_name}' and database_url '{database_url}' from the settings.")
        except KeyError:
            # If any keys are missing in the settings, skip the update.
            logging.warning("Database details not found in the incoming settings. Update skipped.")
        else:
            # Apply the update if all keys were found.
            try:
                if self.controller.database_tools.database_name != database_name:
                    # Create a connection to the new DB, overiding the previous one.
                    logging.info(f"Overriding database_tools in the controller. Previous DB name: {self.controller.database_tools.database_name}")
                    self.controller.database_tools.setup_db(database_url, database_name)

                    msg = "The controller's database_tools object has been updated,"
                    msg += f" using database_url='{database_url}' and database_name='{database_name}'."
                    # TODO: Re-initialize tools in GcodeBuilder somehow...
                    msg += " Any tools that have changed will not be re-initialized."
                    logging.warning(msg)
                else:
                    logging.debug("The old and new database names match. Database tools were left as they were.")
            except Exception as e:
                msg = f"Failed to setup database_tools from data='{data}'. Error message: {e}"
                logging.error(msg)
                raise PluginError(msg) from e

    def parse_command(self, rpc_data: dict):
        """Send the data to the appropriate method, looking for the method's ID in the data's keys.
        The methods called by this method (i.e. those in self.commands) should always return gcode.
        """

        # Get the method associated to the command name and generate the GCODE or Actions.
        method_name = rpc_data["method"]
        method = self.commands.get(method_name, None)

        if method is None:
            msg = f"Failed to find a method to parse command with data:\n{pformat(rpc_data)}"
            logging.error(msg)
            raise PluginError(msg)

        # Run the command.
        data = rpc_data["data"]
        actions = method(data)

        # Return gcode and actions.
        return actions

    # PARK TOOL HANDLER METHODS ####
    park_action_id = "PARK"
    async def run_park_command(self, action, i, *args, **kwargs):
        """Action handler for parking a tool."""
        # Get the next and current tools.
        tool_to_park = action["args"].get("tool", None)
        current_tool = self.controller.builder.current_tool
        logging.info(f"Executing parking action with current_tool='{current_tool}' and tool_to_park='{tool_to_park}'.")

        # Check that the tool is in the list.
        if tool_to_park and (tool_to_park not in self.builder.tools):
            msg = f"Parking error: '{tool_to_park}' is not in the tool list. Available tools: {list(self.builder.tools)}"
            self.controller.send_alert(msg, alert_type="error")
            raise ProtocolError(message=msg)

        # If no tool is mounted according to the controller, or there is a mismatch, require a confirmation.
        if tool_to_park != current_tool:
            msg = "Confirmation required to park the tool: there is a mismatch between the controllers account and your request."
            msg += f"\n\nPlease check that the tool '{tool_to_park}' is currently mounted, and that it can be parked."
            if current_tool is not None:
                msg += f" For redundancy, check that the tool '{current_tool}' is not mounted."

            adhoc_human_action = {"cmd": self.human_action_id, "args": {"text": msg}}
            try:
                await self.run_human_action(action=adhoc_human_action, i=i)
            except ProtocolError as e:
                logging.error(f"Parking request aborted with message: {e.message}")
            # Override the active tool attribute.
            self.builder.current_tool = tool_to_park
        else:
            logging.debug("Parking the current tool.")

        # Skip all of this if the tool to be set is "None" (i.e. the empty tool-head).
        forced_t0_homing = False
        if tool_to_park is not None:
            # Get the name of the workspace.
            workspace_name = action["workspace"]
            logging.info(f"Overriding workspace in preparation for parking. Using workspace '{workspace_name}'.")
            # Update by workspace name.
            self.builder.update_objects_by_workspace(workspace_name=workspace_name)

            # Activate the tool-changer axis.
            cmd_id, (wait_pass, check_pass) = await self.controller.machine.activate_toolchanger_axis()
            if not wait_pass or not check_pass:
                raise ProtocolError("Failed to select the tool-changer axis. Use the manual control to park the tool.")

            # Check if the machine needs homing.
            # TODO: Generalize this, and move it to controller.machine.
            logging.debug("Requesting update on the homed axes.")
            homing_state = await self.controller.machine.wait_for_homing_update()
            homed_axes = str(homing_state).lower()
            if not homing_state or "x" not in homed_axes or "y" not in homed_axes or "z" not in homed_axes:
                raise ProtocolError("Homing required before parking the tool.")
            logging.debug(f"Current homed axes: {homed_axes}")

            # Force origin command.
            if 'e' not in homed_axes:
                forced_t0_homing = True
                msg = f"The tool-changer axis is not homed. It will be 'zeroed' to allow parking the '{tool_to_park}' tool, and then homed properly."
                self.controller.send_alert(msg, alert_type="alarm")
                logging.warning(msg)
                # TODO: De-hardcode this command. Use the "GcodePrimitives" instead.
                cmd, _ = self.controller.machine.make_gcode_cmd("SET_KINEMATIC_POSITION E=0")
                _, (wait_pass, check_pass) = await self.controller.machine.send_cmd(
                    rpc_cmd=cmd, wait=True, check=True, timeout=1.1)
                if not wait_pass or not check_pass:
                    raise ProtocolError("Failed to zero the tool-changer axis. Use the manual control to park the tool.")
        else:
            logging.debug("The tool requested to park is 'None'. The current tool will be set empty without moves.")

        # Add the commands and the timeout (by reference).
        logging.debug(f"Building parking moves for tool '{tool_to_park}'.")
        # NOTE: The new tool is set to "None" because this indicates a parking move.
        self.builder.toolChangeMacro(new_tool=None, extend=True, action=action)

        # Home the tool changer.
        if tool_to_park is not None and forced_t0_homing:
            # Only if a tool was parked and the tool-changer was not homed initially.
            t0_home = self.gcode.gcodeHomeP(cmd_prefix="HOME_", which_axis="E0")
            # TODO: "gcodeHomeP" is largely deprecated. Replace it.
            self.controller.builder.extend_commands(commands=t0_home, action=action)

        # Execute the GCODE in the action.
        logging.info("Executing parking moves.")
        logging.debug(f"Final parking action:\n{pformat(action)}")
        await self.controller.machine.run_actions_protocol(actions=[action], i=42, wait=True, check=True)

    # PARK EVENT HANDLER METHODS ####
    def make_park_command(self, data: dict) -> list:
        """Event handler to park the requested tool."""
        logging.info("Building tool-parking action.")

        # Build the park action.
        action = {
            "cmd": self.park_action_id,
            "args": {"tool": data["tool"]},
            # Get the name of the workspace.
            "workspace": data["workspace"]
        }

        # Done!
        return [action]

    # GO-TO ACTION HANDLER METHODS ####
    goto_action_id = "GOTO"
    def goto_action_handler(self, action, i, clear_z_first=True):
        """Uses the builder's methods to generate GCODE for a "GOTO" action.
        This method is meant to be called from 'builder.parseAction' when it
        encounters an action with cmd="GOTO".

        The GCODE is stored in the builder class, and then
        returned to the "make_goto_command" method below.
        """

        logging.info(f"Handling {self.goto_action_id} action.")
        logging.debug(f"Action data:\n{pformat(action)}")

        # Move to clearance level on Z.
        if clear_z_first:
            # NOTE: this requires platform items in a workspace.
            self.builder.macroMoveSafeHeight()

        # Move over the target content and downward into the tube.
        self.builder.macroGoToTubeXY(action, i)

        # Get the Z-coordinate of the tube.
        _, _, _, z = self.builder.getTubeAndCoords(action)
        # Apply tool offsets.
        _, _, final_z = self.builder.addToolOffsets(z=z)
        # Save the coordinate for the action runner.
        logging.debug(f"Final Z coordinate for {self.goto_action_id} action: {final_z}")
        action["args"]["final_z"] = final_z

        return None, None

    async def goto_action_runner(self, action: dict, i: int, *args, **kwargs):
        """Action runner for GO-TO commands.
        Passes the action to the default handler in the controller,
        and sends an alert to the UI informing the final Z-coordinate of the
        would-be tip placement move, for calibration purposes.
        """
        # Run the action normally.
        await self.controller.machine.run_actions_protocol(actions=[action], i=33, wait=True, check=True)

        # Send info event to the UI.
        final_z = action["args"].get("final_z", None)
        if final_z is not None:
            text = f"The final height of a full move would be: {final_z} mm"
            await self.controller.comms.sio_emit_alert(text=text)

    # GO-TO EVENT HANDLER METHODS ####
    def make_goto_command(self, data) -> list:
        """Generate a protocol action and GCODE for the incoming "go to" socket event.
        The GCODE is generated "goto_action_handler" defined above.
        """
        # NOTE: Since the action has cmd="GOTO", the "goto_action_handler" method
        #       defined in this class will be called to generate the GCODE for this
        #       action. Only the action is generated here.

        logging.info(f"Processing '{self.go_to_event}' command.")
        logging.debug(f"Using command data:\n{pformat(data)}")

        # Process values for tool_name, affecting toolchanges.
        try:
            # Get the value if the key is present.
            tool_name = data["tool"]
            # If the value is "None" the current tool should be parked if any.
            if tool_name is None:
                tool_name = False
        except KeyError:
            # If no key was provided, do noting.
            logging.warning(f"Tool name not found in {self.goto_action_id} command data. Setting it to 'None'.")
            tool_name = None

        # Get the name of the workspace.
        ws_name = data.get("workspace", None)

        # Apply the DB settings, which might update the DB and other stuff.
        self.apply_settings(data)

        # Update workspace.
        current_ws_name = self.builder.workspace.get("name", None)
        if ws_name:
            # Send messages.
            msg = f"Updating builder objects to match the new workspace '{ws_name}'."
            logging.debug(msg)
            if current_ws_name and ws_name != current_ws_name:
                # Alert the users.
                self.controller.send_alert(msg + f" The previous workspace was '{current_ws_name}'.")
            # Update builder objects and clearance (not clearing the protocol).
            self.builder.update_objects_by_workspace(workspace_name=ws_name)

        # NOTE: This will use macroGoToTubeXY to calculate the goto coordinate
        # in a new action handler (use goto_action_handler above).
        goto_action = {
            # NOTE: Using "goto_action_id" means that "goto_action_handler" above will receive this action.
            "cmd": self.goto_action_id,
            "args": {
                # Tool name, may be None.
                "tool": tool_name,
                # Content-based move.
                "item": data.get("item", None),  # getContentCoords
                "content": data.get("content", None),
                "selector": data.get("selector", None),  # Used by getContentCoords.
                # Coordinate-based move.
                # NOTE: An alternative to content-based position.
                "position": data.get("position", None),
                # Final Z-coordinate, if relevant to the move.
                "final_z": None
            },
            "workspace": ws_name
        }

        # Actions list.
        actions = []

        # Generate GCODE for the action.
        # NOTE: It is generated "goto_action_handler" defined above.
        action_parsed = self.builder.parseAction(goto_action)
        # TODO: Prevent actions such as "GOTO" from being added as
        #       steps to a protocol, and increasing the index.

        # Add the action to the actions list.
        actions.append(action_parsed)

        logging.debug(f"Actions generated for 'goto' command:\n{pformat(actions)}")

        # Return commands
        return actions

    # PROTOCOL EVENT HANDLER METHODS ####
    sio_event_execution_update = 'execution'
    """Event that the UI uses to display updates in the protocol panel."""
    async def send_execution_updates(self, action: dict, status: str):
        """Callback method that sends progress updates from a running actions protocol.

        Args:
            action (dict): Data for an action.
            status (str): Status code from the controller for the current action.

        Status codes for action can be: "error", "paused", "done", and "running".
        """
        # # Let the UI know this action is about to be executed.
        # await self.controller.comms.sio_emit_action_update(action, status)

        # TODO: figure out how to tolerate all future actions, such as PCR procols, etc.

        # Make a copy.
        action = deepcopy(action)

        # Clean up.
        action.pop("GCODE", None)
        action.pop("rpc_commands", None)
        action.pop("tracker", None)

        # Fix non-serializable ID objects.
        if action.get("_id", None):
            logging.debug(f"Stringifying ID object from action: {action['_id']}")
            action["_id"] = str(action["_id"])

        # The "index" data is only available in actions produced by "make_protocol_commands".
        index = action.get("index", None)

        # Report progress to the UI.
        protocol_name = action.get("protocol", None)
        if protocol_name is not None:
            # Build basic data.
            update_data = {
                'action': action,
                'status': status,
                'action_index': action["index"],
                'protocol': protocol_name,
                'stepID': action.get('stepID', None),
                'workspace': action.get('workspace', None),
                'hLprotocol': action.get('hLprotocol', None)
                # 'hLprotocolID': action.get('hLprotocolID', None)
            }

            # Add progress information.
            fraction = (action["index"] + 1) / (action["action_count"])
            update_data.update({
                'length': action["action_count"],
                'message': f"Action {index + 1} of {action['action_count']}",
                'fraction': fraction
            })

            # Send progress update.
            logging.info(f"Sending execution update for protocol '{update_data['protocol']}'.")
            logging.debug(f"Sending update with contents:\n{pformat(update_data)}")
            await self.controller.comms.emit(self.sio_event_execution_update, update_data)

            try:
                # Try to update the action in MongoDB.
                self.controller.database_tools.updateActionBy(
                    action=action, protocol_name=protocol_name,
                    new_data=status, field="status", find_by="index")
            except Exception as e:
                msg = f"update_action_status: failed to update action in the DB with error message: {e}\n"
                msg += traceback.format_exc()
                logging.error(msg)
                print(msg)

    def make_protocol_commands(self, data):
        """Event handler to generate GCODE for each action in a mid-level actions protocol.

        The GCODE is also saved to a file, at either the OS's temporary directory, or
        to the path specified in the controller's 'config' dictionary ("gcode_path" key).
        """
        logging.info(f"Incoming protocol: '{data['name']}'")

        actions = []

        # Apply the settings, which might update the DB and other stuff.
        self.apply_settings(data)

        # Load protocol objects from mongodb:
        protocol_name = data['name']
        protocol, workspace, platforms_in_workspace = self.controller.database_tools.getProtocolObjects(protocol_name=protocol_name)

        # logging.info(json.dumps(protocol, indent=2, default=str))  # https://stackoverflow.com/a/56061155/11524079

        # Initiate the task class, which will hold all relevant information for the current task.
        # builder = self.builder(protocol, workspace, platforms_in_workspace)  # NOTE: now the object is inherited.

        # Update objects in the task class, which will hold all relevant information for the current task.
        logging.debug(f"Initializing builder objects for protocol '{protocol_name}'.")
        self.builder.initialize_objects(protocol, workspace, platforms_in_workspace)

        # TODO: Pipette configs should be copied.
        # builder.pipettes = commander.pipette.pipettes

        # Generate GCODE for the task, it is saved in the task class object.
        logging.debug(f"Parsing actions protocol '{protocol_name}'.")
        logging.debug("Protocol definition:\n" + pformat(protocol))
        protocol_with_actions = self.builder.parseProtocol()

        # Get the resulting actions and gcode.
        actions = protocol_with_actions['actions']

        # Add metadata to actions.
        for i, action in enumerate(actions):
            # New contextual properties, useful for tracking.
            action["action_count"] = len(actions)
            action["protocol"] = protocol_name
            action["workspace"] = workspace.get("name", None)
            # Mongo object IDs (alphanumeric IDs, not names).
            # action["hLprotocolID"] = protocol.get("hLprotocolID", None)
            # action["workspaceID"] = protocol.get("workspaceID", None)
            action["hLprotocol"] = protocol.get("hLprotocol", None)
            # NOTE: 'stepID' is already a property in actions loaded from the DB.
            # NOTE: 'index' is now a property in actions loaded from the DB.

        # Log it
        # TODO: Permanently delete this. Raw GCODE no longer used by parse_command.
        #logging.debug("CommandRouter: GCODE commands generated: " + pformat(gcode))

        # Get the command list, and pass it on to the commander.
        # if not args.dry:
        #     await commander.run_gcode_protocol(commands=builder.commands)

        # Replace any spaces with underscores.
        protocol_name.replace(" ", "_")

        # Save GCODE to a file in Klipper's printer_data
        # TODO: migrate this functionality back to the Controller.
        logging.debug("Saving protocol to the file system.")
        protocol_file_path = self.controller.machine.save_gcode_protocol(
            gcode_commands=self.controller.builder.commands,
            prefix=protocol_name,
            directory=self.controller.config.get("gcode_path", None))
        logging.info(f"Saved protocol to file: '{protocol_file_path}'")

        # Since controller will execute any returned GCODE commands,
        # if we want to run the GCODE manually we must return an empty list instead.
        # return []
        # Return commands
        logging.debug(f"Done parsing {len(actions)} from protocol '{data['name']}'.")

        return actions

Receives a command from the GUI/backend, generates actions, their GCODE, and executes it.

This class is made to parse commands from the GUI (i.e. from the socket.io connection) into executable GCODE. The GCODE will likely not be standard. This depends on the particular machine and the implementation of "gcode primitives".

Examples of event data from the GUI: - Run protocol. - Go-to content, using a tool. - Park tool. - Kill the controller.

Ancestors

Class variables

var abort_code
var continue_code
var continue_event
var controller : Controller
var current_cmd
var go_to_event
var goto_action_id
var human_action_id
var kill_event
var p2g_command_queue
var park_action_id
var park_tool_event
var pause_event
var run_protocol_event
var sio_event_execution_update

Event that the UI uses to display updates in the protocol panel.

var stop_event
var waiting_for_human

This attribute is set to 'True' when a response is pending, and to other values when one arrives.

Static methods

def make_rpc(method_name: str, method_data: dict, cmd_id=None)
Expand source code
@staticmethod
def make_rpc(method_name: str, method_data: dict, cmd_id=None):
    """Convenience function to prepare a dictionary with the method+data structure."""
    return {"method": method_name, "data": method_data, "id": cmd_id}

Convenience function to prepare a dictionary with the method+data structure.

Methods

def abort_message_handler(self, message)
Expand source code
def abort_message_handler(self, message):
    """This method is a callback to an incoming 'abort' event."""
    logging.info("Received cancel event with message '" + str(message) + "'.")
    self.waiting_for_human = self.abort_code

This method is a callback to an incoming 'abort' event.

def apply_settings(self, data)
Expand source code
def apply_settings(self, data):
    """Apply incoming settings from the UI to reconfigure the controller.

    For now this only applies the incoming database settings.
    """
    logging.info("Applying new settings.")
    try:
        # Get DB settings.
        db_settings = data["settings"]["database"]
        # Make a new URL for MongoDB.
        database_url = db_settings["uri"]
        # Get the new DB name.
        database_name = db_settings["name"]
        logging.debug(f"Got database_name '{database_name}' and database_url '{database_url}' from the settings.")
    except KeyError:
        # If any keys are missing in the settings, skip the update.
        logging.warning("Database details not found in the incoming settings. Update skipped.")
    else:
        # Apply the update if all keys were found.
        try:
            if self.controller.database_tools.database_name != database_name:
                # Create a connection to the new DB, overiding the previous one.
                logging.info(f"Overriding database_tools in the controller. Previous DB name: {self.controller.database_tools.database_name}")
                self.controller.database_tools.setup_db(database_url, database_name)

                msg = "The controller's database_tools object has been updated,"
                msg += f" using database_url='{database_url}' and database_name='{database_name}'."
                # TODO: Re-initialize tools in GcodeBuilder somehow...
                msg += " Any tools that have changed will not be re-initialized."
                logging.warning(msg)
            else:
                logging.debug("The old and new database names match. Database tools were left as they were.")
        except Exception as e:
            msg = f"Failed to setup database_tools from data='{data}'. Error message: {e}"
            logging.error(msg)
            raise PluginError(msg) from e

Apply incoming settings from the UI to reconfigure the controller.

For now this only applies the incoming database settings.

async def command_message_loop(self)
Expand source code
async def command_message_loop(self):
    """Sequentially process events queued by socketio events.
    NOTE: replaced old "CommandRouter" with this "plugin".
    """
    logging.info("Coroutine started.")

    # Wait here until ready. TODO: refactor to avoid depending on klipper being ready.
    # while not self.controller.ws_ready:
    #     await asyncio.sleep(self.controller.wait_default)

    while not self.controller.run:
        asyncio.sleep(self.controller.wait_default)
    logging.info("Coroutine ready.")

    try:
        while self.controller.run:
            # Wait for a bit.
            await asyncio.sleep(0.2)

            # Get an item without blocking (raises Empty, handled below).
            try:
                self.current_cmd = self.p2g_command_queue.get_nowait()
            except Empty:
                continue

            try:
                # Get the current command, which has "rpc data" (actually the socketio event payload).
                event_data = self.current_cmd

                logging.info("Received new p2g command.")
                logging.debug(event_data)

                # Parse the GUI command to Actions and/or GCODE.
                logging.debug("Parsing protocol actions.")
                actions = self.parse_command(event_data)

                # Convert the actions or gcode to RPC form and send them to Moonraker.
                # NOTE: Actions take priority over GCODE.

                # Send Actions.
                logging.info(f"Executing actions protocol with {len(actions)} actions.")
                logging.debug(f"Executing the following final actions protocol:\n{pformat(actions)}")
                await self.controller.run_actions_protocol(actions)

            except ProtocolError as e:
                # Send an alert throught the SIO connection to the GUI.
                msg = f"ProtocolError with message '{e.message}'.\n" + traceback.format_exc()
                self.controller.send_alert(msg, alert_type="error")
                logging.error(msg)

            except Exception as e:
                msg = f"Command of method '{self.current_cmd.get('method')}'"
                msg += f" failed with uncaught exception and message: {e}\n"
                msg += traceback.format_exc()
                print(msg)
                logging.error(msg)
                self.controller.send_alert(msg, alert_type="error")
                # TODO: dsiabled the "break" because the controller cannot be restarted easily ATM.
                # break
            else:
                # Send our congrats!
                logging.debug(f"Command executed successfully with data: {event_data}")
                protocol_name = event_data['data'].get("name", None)
                if protocol_name:
                    ui_msg = f"Actions protocol '{protocol_name}' was executed successfully."
                    self.controller.send_alert(ui_msg, alert_type="message")

            # Clear the current command property.
            self.current_cmd = None

        logging.debug("Controller not running.")

    except asyncio.exceptions.CancelledError:
        logging.warning("Coroutine cancelled.")

    logging.warning("Coroutine ended.")

Sequentially process events queued by socketio events. NOTE: replaced old "CommandRouter" with this "plugin".

def continue_message_handler(self, message)
Expand source code
def continue_message_handler(self, message):
    """This method is a callback to a incoming 'continue' event."""
    logging.info("Received continue event with message '" + str(message) + "'.")
    self.waiting_for_human = self.continue_code

This method is a callback to a incoming 'continue' event.

async def continue_protocol_handler(self, *args, **kwargs)
Expand source code
async def continue_protocol_handler(self, *args, **kwargs):
    """Handle press of continue button in the protocol panel."""
    # TODO: Use incoming data. It has protocol _id and name.
    if self.protocol_is_running(self.continue_event):
        if not self.controller.pause_protocol.is_set():
            logging.warning("The protocol is not paused.")
            await self.controller.comms.sio_emit_alert("The protocol is not paused.")
        else:
            logging.info("Clearing the pause event to continue.")
            self.controller.pause_protocol.clear()

Handle press of continue button in the protocol panel.

def goto_action_handler(self, action, i, clear_z_first=True)
Expand source code
def goto_action_handler(self, action, i, clear_z_first=True):
    """Uses the builder's methods to generate GCODE for a "GOTO" action.
    This method is meant to be called from 'builder.parseAction' when it
    encounters an action with cmd="GOTO".

    The GCODE is stored in the builder class, and then
    returned to the "make_goto_command" method below.
    """

    logging.info(f"Handling {self.goto_action_id} action.")
    logging.debug(f"Action data:\n{pformat(action)}")

    # Move to clearance level on Z.
    if clear_z_first:
        # NOTE: this requires platform items in a workspace.
        self.builder.macroMoveSafeHeight()

    # Move over the target content and downward into the tube.
    self.builder.macroGoToTubeXY(action, i)

    # Get the Z-coordinate of the tube.
    _, _, _, z = self.builder.getTubeAndCoords(action)
    # Apply tool offsets.
    _, _, final_z = self.builder.addToolOffsets(z=z)
    # Save the coordinate for the action runner.
    logging.debug(f"Final Z coordinate for {self.goto_action_id} action: {final_z}")
    action["args"]["final_z"] = final_z

    return None, None

Uses the builder's methods to generate GCODE for a "GOTO" action. This method is meant to be called from 'builder.parseAction' when it encounters an action with cmd="GOTO".

The GCODE is stored in the builder class, and then returned to the "make_goto_command" method below.

async def goto_action_runner(self, action: dict, i: int, *args, **kwargs)
Expand source code
async def goto_action_runner(self, action: dict, i: int, *args, **kwargs):
    """Action runner for GO-TO commands.
    Passes the action to the default handler in the controller,
    and sends an alert to the UI informing the final Z-coordinate of the
    would-be tip placement move, for calibration purposes.
    """
    # Run the action normally.
    await self.controller.machine.run_actions_protocol(actions=[action], i=33, wait=True, check=True)

    # Send info event to the UI.
    final_z = action["args"].get("final_z", None)
    if final_z is not None:
        text = f"The final height of a full move would be: {final_z} mm"
        await self.controller.comms.sio_emit_alert(text=text)

Action runner for GO-TO commands. Passes the action to the default handler in the controller, and sends an alert to the UI informing the final Z-coordinate of the would-be tip placement move, for calibration purposes.

def human_action_handler(self, action, i)
Expand source code
def human_action_handler(self, action, i):
    """GCODE generator for the HUMAN action."""
    logging.debug(f"Handling '{self.human_action_id}' action with ID {i}.")
    # Move over the target content and downward into the tube.
    commands = self.gcode.gcodeDwell(seconds=1.0)
    self.builder.extend_commands(commands=commands, action=action)
    return None, None

GCODE generator for the HUMAN action.

def make_goto_command(self, data) ‑> list
Expand source code
def make_goto_command(self, data) -> list:
    """Generate a protocol action and GCODE for the incoming "go to" socket event.
    The GCODE is generated "goto_action_handler" defined above.
    """
    # NOTE: Since the action has cmd="GOTO", the "goto_action_handler" method
    #       defined in this class will be called to generate the GCODE for this
    #       action. Only the action is generated here.

    logging.info(f"Processing '{self.go_to_event}' command.")
    logging.debug(f"Using command data:\n{pformat(data)}")

    # Process values for tool_name, affecting toolchanges.
    try:
        # Get the value if the key is present.
        tool_name = data["tool"]
        # If the value is "None" the current tool should be parked if any.
        if tool_name is None:
            tool_name = False
    except KeyError:
        # If no key was provided, do noting.
        logging.warning(f"Tool name not found in {self.goto_action_id} command data. Setting it to 'None'.")
        tool_name = None

    # Get the name of the workspace.
    ws_name = data.get("workspace", None)

    # Apply the DB settings, which might update the DB and other stuff.
    self.apply_settings(data)

    # Update workspace.
    current_ws_name = self.builder.workspace.get("name", None)
    if ws_name:
        # Send messages.
        msg = f"Updating builder objects to match the new workspace '{ws_name}'."
        logging.debug(msg)
        if current_ws_name and ws_name != current_ws_name:
            # Alert the users.
            self.controller.send_alert(msg + f" The previous workspace was '{current_ws_name}'.")
        # Update builder objects and clearance (not clearing the protocol).
        self.builder.update_objects_by_workspace(workspace_name=ws_name)

    # NOTE: This will use macroGoToTubeXY to calculate the goto coordinate
    # in a new action handler (use goto_action_handler above).
    goto_action = {
        # NOTE: Using "goto_action_id" means that "goto_action_handler" above will receive this action.
        "cmd": self.goto_action_id,
        "args": {
            # Tool name, may be None.
            "tool": tool_name,
            # Content-based move.
            "item": data.get("item", None),  # getContentCoords
            "content": data.get("content", None),
            "selector": data.get("selector", None),  # Used by getContentCoords.
            # Coordinate-based move.
            # NOTE: An alternative to content-based position.
            "position": data.get("position", None),
            # Final Z-coordinate, if relevant to the move.
            "final_z": None
        },
        "workspace": ws_name
    }

    # Actions list.
    actions = []

    # Generate GCODE for the action.
    # NOTE: It is generated "goto_action_handler" defined above.
    action_parsed = self.builder.parseAction(goto_action)
    # TODO: Prevent actions such as "GOTO" from being added as
    #       steps to a protocol, and increasing the index.

    # Add the action to the actions list.
    actions.append(action_parsed)

    logging.debug(f"Actions generated for 'goto' command:\n{pformat(actions)}")

    # Return commands
    return actions

Generate a protocol action and GCODE for the incoming "go to" socket event. The GCODE is generated "goto_action_handler" defined above.

def make_park_command(self, data: dict) ‑> list
Expand source code
def make_park_command(self, data: dict) -> list:
    """Event handler to park the requested tool."""
    logging.info("Building tool-parking action.")

    # Build the park action.
    action = {
        "cmd": self.park_action_id,
        "args": {"tool": data["tool"]},
        # Get the name of the workspace.
        "workspace": data["workspace"]
    }

    # Done!
    return [action]

Event handler to park the requested tool.

def make_protocol_commands(self, data)
Expand source code
def make_protocol_commands(self, data):
    """Event handler to generate GCODE for each action in a mid-level actions protocol.

    The GCODE is also saved to a file, at either the OS's temporary directory, or
    to the path specified in the controller's 'config' dictionary ("gcode_path" key).
    """
    logging.info(f"Incoming protocol: '{data['name']}'")

    actions = []

    # Apply the settings, which might update the DB and other stuff.
    self.apply_settings(data)

    # Load protocol objects from mongodb:
    protocol_name = data['name']
    protocol, workspace, platforms_in_workspace = self.controller.database_tools.getProtocolObjects(protocol_name=protocol_name)

    # logging.info(json.dumps(protocol, indent=2, default=str))  # https://stackoverflow.com/a/56061155/11524079

    # Initiate the task class, which will hold all relevant information for the current task.
    # builder = self.builder(protocol, workspace, platforms_in_workspace)  # NOTE: now the object is inherited.

    # Update objects in the task class, which will hold all relevant information for the current task.
    logging.debug(f"Initializing builder objects for protocol '{protocol_name}'.")
    self.builder.initialize_objects(protocol, workspace, platforms_in_workspace)

    # TODO: Pipette configs should be copied.
    # builder.pipettes = commander.pipette.pipettes

    # Generate GCODE for the task, it is saved in the task class object.
    logging.debug(f"Parsing actions protocol '{protocol_name}'.")
    logging.debug("Protocol definition:\n" + pformat(protocol))
    protocol_with_actions = self.builder.parseProtocol()

    # Get the resulting actions and gcode.
    actions = protocol_with_actions['actions']

    # Add metadata to actions.
    for i, action in enumerate(actions):
        # New contextual properties, useful for tracking.
        action["action_count"] = len(actions)
        action["protocol"] = protocol_name
        action["workspace"] = workspace.get("name", None)
        # Mongo object IDs (alphanumeric IDs, not names).
        # action["hLprotocolID"] = protocol.get("hLprotocolID", None)
        # action["workspaceID"] = protocol.get("workspaceID", None)
        action["hLprotocol"] = protocol.get("hLprotocol", None)
        # NOTE: 'stepID' is already a property in actions loaded from the DB.
        # NOTE: 'index' is now a property in actions loaded from the DB.

    # Log it
    # TODO: Permanently delete this. Raw GCODE no longer used by parse_command.
    #logging.debug("CommandRouter: GCODE commands generated: " + pformat(gcode))

    # Get the command list, and pass it on to the commander.
    # if not args.dry:
    #     await commander.run_gcode_protocol(commands=builder.commands)

    # Replace any spaces with underscores.
    protocol_name.replace(" ", "_")

    # Save GCODE to a file in Klipper's printer_data
    # TODO: migrate this functionality back to the Controller.
    logging.debug("Saving protocol to the file system.")
    protocol_file_path = self.controller.machine.save_gcode_protocol(
        gcode_commands=self.controller.builder.commands,
        prefix=protocol_name,
        directory=self.controller.config.get("gcode_path", None))
    logging.info(f"Saved protocol to file: '{protocol_file_path}'")

    # Since controller will execute any returned GCODE commands,
    # if we want to run the GCODE manually we must return an empty list instead.
    # return []
    # Return commands
    logging.debug(f"Done parsing {len(actions)} from protocol '{data['name']}'.")

    return actions

Event handler to generate GCODE for each action in a mid-level actions protocol.

The GCODE is also saved to a file, at either the OS's temporary directory, or to the path specified in the controller's 'config' dictionary ("gcode_path" key).

def parse_command(self, rpc_data: dict)
Expand source code
def parse_command(self, rpc_data: dict):
    """Send the data to the appropriate method, looking for the method's ID in the data's keys.
    The methods called by this method (i.e. those in self.commands) should always return gcode.
    """

    # Get the method associated to the command name and generate the GCODE or Actions.
    method_name = rpc_data["method"]
    method = self.commands.get(method_name, None)

    if method is None:
        msg = f"Failed to find a method to parse command with data:\n{pformat(rpc_data)}"
        logging.error(msg)
        raise PluginError(msg)

    # Run the command.
    data = rpc_data["data"]
    actions = method(data)

    # Return gcode and actions.
    return actions

Send the data to the appropriate method, looking for the method's ID in the data's keys. The methods called by this method (i.e. those in self.commands) should always return gcode.

async def pause_protocol_handler(self, *args, **kwargs)
Expand source code
async def pause_protocol_handler(self, *args, **kwargs):
    """Handle press of pause button in the protocol panel."""
    # TODO: Use incoming data. It has protocol _id and name.
    if self.protocol_is_running(self.pause_event):
        if self.controller.pause_protocol.is_set():
            logging.warning("The protocol is already paused.")
            await self.controller.comms.sio_emit_alert("Warning: the protocol is already paused.")
        else:
            logging.info("Setting the pause event.")
            self.controller.pause_protocol.set()
            await self.controller.comms.sio_emit_alert("Protocol paused.")

Handle press of pause button in the protocol panel.

def protocol_is_running(self, event)
Expand source code
def protocol_is_running(self, event):
    """Check if a command is already being processed by 'command_message_loop'."""
    if not self.current_cmd:
        msg = f"There is no protocol running ('{event}' skipeed)."
        logging.error(msg)
        self.controller.comms.sio_emit_alert(msg)
        return False
    return True

Check if a command is already being processed by 'command_message_loop'.

def queue_command(self, event: str, data: dict)
Expand source code
def queue_command(self, event: str, data: dict):
    """Adds a command to the queue if it is not empty.
    The queue is a thread-safe object.
    """
    if self.p2g_command_queue.empty() and not self.current_cmd:
        logging.debug(f"Queuing '{event}' command.")
        # Add the command to the queue.
        rpc_data = self.make_rpc(event, data)
        self.p2g_command_queue.put(rpc_data, block=False)
    else:
        # TODO: rewrite this part, letting it wait until the older items are processed.
        #       One option is to use a queue with length 1, and let the code be blocked here.
        #       I however don't know if that would block the whole program, Should I use await instead?
        msg = f"Command {self.go_to_event}: the queue is not empty (n={self.p2g_command_queue.qsize()}), "
        msg += f"'{event}' socketio command rejected: {data}"
        self.controller.send_alert(msg, alert_type="error")
        logging.warning(msg)

Adds a command to the queue if it is not empty. The queue is a thread-safe object.

def register_action_handlers(self)
Expand source code
def register_action_handlers(self):
    # TODO: Improve this!!
    # This is meant to allow using the "parseAction" method in "builder",
    # to properly handle go-to commands, in the context of past history (i.e.
    # current tool, obstacle avoidance, clearance, etc) and replace the MongoTools
    # method (which is duplicating code in builder).
    # The handler is needed because it is used by parseAction.
    self.builder.add_action_handler(name=self.goto_action_id,
                                    function=self.goto_action_handler)
    self.builder.add_action_handler(name=self.human_action_id,
                                    function=self.human_action_handler)

    # Action runners.
    self.controller.add_action_runner(name=self.goto_action_id,
                                      function=self.goto_action_runner)
    self.controller.add_action_runner(name=self.park_action_id,
                                      function=self.run_park_command)
    # Human intervention runner for the controller.
    # TODO: move this to a dedicated plugin.
    self.controller.add_action_runner(name=self.human_action_id,
                                      function=self.run_human_action)
def register_sio_callbacks(self)
Expand source code
def register_sio_callbacks(self):
    """Function to register socketio event callbacks, typically sent by the Pipettin GUI."""
    if not self.controller.comms.sio:
        logging.warning("SIO object unavailable. Skipping callback registration.")
    else:
        # Register websocket event listener function for "stop protocol".
        self.controller.comms.sio.on(self.stop_event, self.stop_protocol_handler)
        # Register websocket event listener function for "pause protocol".
        self.controller.comms.sio.on(self.pause_event, self.pause_protocol_handler)
        # Register websocket event listener function for "pause protocol".
        self.controller.comms.sio.on(self.continue_event, self.continue_protocol_handler)

        # Register websocket event listener function for "start human intervention".
        self.controller.comms.sio.on('human_intervention_continue', self.continue_message_handler)

        # Register websocket event listener function for "cancelled human intervention".
        self.controller.comms.sio.on('human_intervention_cancelled', self.abort_message_handler)

        @self.controller.comms.sio.on(self.run_protocol_event)
        async def command_message_handler(data):
            """
            Receives a command from the GUI/backend and sends them to the GCODE generator.

            There are several different commands which arrive through this event (see the class' docstring).
            """

            logging.info(f"Incoming '{self.run_protocol_event}' event.")
            logging.debug(f"'{self.run_protocol_event}''s data:\n{pformat(data)}")

            if self.p2g_command_queue.empty() and not self.current_cmd:
                # Add the command to the queue.
                rpc_data = self.make_rpc(self.run_protocol_event, data)
                self.p2g_command_queue.put(rpc_data, block=False)
            else:
                # TODO: rewrite this part, letting it wait until the older items are processed.
                #       One option is to use a queue with length 1, and let the code be blocked here.
                #       I however don't know if that would block the whole program, Should I use await instead?
                msg = f"command {self.run_protocol_event}: the queue is not empty (n={self.p2g_command_queue.qsize()}), 'p2g_command' socketio command rejected: {data}"
                self.controller.send_alert(msg, alert_type="error")
                logging.warning(msg)

        @self.controller.comms.sio.on(self.go_to_event)
        def command_go_to(data):
            """Move the robot to a platform item or content."""
            logging.info(f"Go-to command received '{self.go_to_event}' through socketio with data:\n{pformat(data)}")
            # Queue the command.
            self.queue_command(self.go_to_event, data)

        @self.controller.comms.sio.on(self.park_tool_event)
        def command_park_tool(data):
            """Park the requested tool."""
            logging.info(f"Park-tool command received '{self.park_tool_event}' through socketio with data:\n{pformat(data)}")
            # Queue the command.
            self.queue_command(self.park_tool_event, data)

        @self.controller.comms.sio.on(self.kill_event)
        async def command_kill_handler(data):
            """
            Send an emergency stop command to Klipper/Moonraker immediately, and stop the commander's coroutines.
            """
            logging.info(f"Received '{self.kill_event}' command: " + pformat(data))
            result = True

            # Send "emergency_stop" message to Klipper/Moonraker.
            try:
                if not self.controller.machine.dry:
                    await self.controller.machine.emergency_stop()
                else:
                    logging.error("Skipping emergency_stop, commander in dry mode.")
            except Exception:
                logging.error("Error sending emergency stop to the firmware.")
                result = False

            # Stop/kill coroutines.
            try:
                await self.controller.stop()
            except Exception:
                logging.error("Error stopping the controller.")
                result = False

            return result

Function to register socketio event callbacks, typically sent by the Pipettin GUI.

async def run_human_action(self, action: dict, i: int, *args, **kwargs)
Expand source code
async def run_human_action(self, action: dict, i: int, *args, **kwargs):
    """Execute a HUMAN action from a mid-level protocol.

    Args:
        action (dict): The action definition.
        i (int): Action index.

    Example action:
        {"cmd":"HUMAN",
         "args":{"text":"Click me!"},
         "stepID":"669d9e9a2bae3945e9aca164"}
    """

    message = action["args"]["text"]
    timeout = action["args"].get("timeout", 600)

    logging.info(f"Executing human action {i} with message: '{message}'. Timeout set to '{timeout}' seconds.")

    if self.waiting_for_human is True:
        raise ProtocolError("Another human action is already awaitng a response.")

    # Set waiting flag.
    self.waiting_for_human = True
    # Send waiting event to the UI.
    await self.controller.comms.sio_emit_intervention(message)

    # Wait for a response.
    wait_time = min(1, timeout)
    while self.waiting_for_human is True and timeout >= 0:
        logging.debug("Waiting for response from websocket...")
        await asyncio.sleep(wait_time)
        timeout -= wait_time

    # Process state.
    if self.waiting_for_human is True:
        # Reset the flag to a false-like value.
        self.waiting_for_human = None
        # Handle Timed out.
        msg = "Timed-out while waiting for the human intervention (no response from the socket)."
        await self.controller.comms.sio_emit_alert(msg)
        logging.error(msg)
        raise ProtocolError(message=msg, action=action)

    elif self.waiting_for_human == self.continue_code:
        # Handle Continue.
        logging.info(f"Received {self.continue_code} response from the controller.")
        return True

    elif self.waiting_for_human == self.abort_code:
        # Handle Abort.
        msg = f"Received {self.abort_code} response from the controller."
        logging.warning(msg)
        raise ProtocolError(message=msg, action=action)

    else:
        # Reset the flag to a false-like value.
        self.waiting_for_human = None
        # Handle Invalid status code.
        msg = f"Received an invalid response from the socket: {self.waiting_for_human}"
        self.controller.comms.sio_emit_alert(msg)
        logging.error(msg)
        raise ProtocolError(message=msg, action=action)

Execute a HUMAN action from a mid-level protocol.

Args

action : dict
The action definition.
i : int
Action index.

Example action: {"cmd":"HUMAN", "args":{"text":"Click me!"}, "stepID":"669d9e9a2bae3945e9aca164"}

async def run_park_command(self, action, i, *args, **kwargs)
Expand source code
async def run_park_command(self, action, i, *args, **kwargs):
    """Action handler for parking a tool."""
    # Get the next and current tools.
    tool_to_park = action["args"].get("tool", None)
    current_tool = self.controller.builder.current_tool
    logging.info(f"Executing parking action with current_tool='{current_tool}' and tool_to_park='{tool_to_park}'.")

    # Check that the tool is in the list.
    if tool_to_park and (tool_to_park not in self.builder.tools):
        msg = f"Parking error: '{tool_to_park}' is not in the tool list. Available tools: {list(self.builder.tools)}"
        self.controller.send_alert(msg, alert_type="error")
        raise ProtocolError(message=msg)

    # If no tool is mounted according to the controller, or there is a mismatch, require a confirmation.
    if tool_to_park != current_tool:
        msg = "Confirmation required to park the tool: there is a mismatch between the controllers account and your request."
        msg += f"\n\nPlease check that the tool '{tool_to_park}' is currently mounted, and that it can be parked."
        if current_tool is not None:
            msg += f" For redundancy, check that the tool '{current_tool}' is not mounted."

        adhoc_human_action = {"cmd": self.human_action_id, "args": {"text": msg}}
        try:
            await self.run_human_action(action=adhoc_human_action, i=i)
        except ProtocolError as e:
            logging.error(f"Parking request aborted with message: {e.message}")
        # Override the active tool attribute.
        self.builder.current_tool = tool_to_park
    else:
        logging.debug("Parking the current tool.")

    # Skip all of this if the tool to be set is "None" (i.e. the empty tool-head).
    forced_t0_homing = False
    if tool_to_park is not None:
        # Get the name of the workspace.
        workspace_name = action["workspace"]
        logging.info(f"Overriding workspace in preparation for parking. Using workspace '{workspace_name}'.")
        # Update by workspace name.
        self.builder.update_objects_by_workspace(workspace_name=workspace_name)

        # Activate the tool-changer axis.
        cmd_id, (wait_pass, check_pass) = await self.controller.machine.activate_toolchanger_axis()
        if not wait_pass or not check_pass:
            raise ProtocolError("Failed to select the tool-changer axis. Use the manual control to park the tool.")

        # Check if the machine needs homing.
        # TODO: Generalize this, and move it to controller.machine.
        logging.debug("Requesting update on the homed axes.")
        homing_state = await self.controller.machine.wait_for_homing_update()
        homed_axes = str(homing_state).lower()
        if not homing_state or "x" not in homed_axes or "y" not in homed_axes or "z" not in homed_axes:
            raise ProtocolError("Homing required before parking the tool.")
        logging.debug(f"Current homed axes: {homed_axes}")

        # Force origin command.
        if 'e' not in homed_axes:
            forced_t0_homing = True
            msg = f"The tool-changer axis is not homed. It will be 'zeroed' to allow parking the '{tool_to_park}' tool, and then homed properly."
            self.controller.send_alert(msg, alert_type="alarm")
            logging.warning(msg)
            # TODO: De-hardcode this command. Use the "GcodePrimitives" instead.
            cmd, _ = self.controller.machine.make_gcode_cmd("SET_KINEMATIC_POSITION E=0")
            _, (wait_pass, check_pass) = await self.controller.machine.send_cmd(
                rpc_cmd=cmd, wait=True, check=True, timeout=1.1)
            if not wait_pass or not check_pass:
                raise ProtocolError("Failed to zero the tool-changer axis. Use the manual control to park the tool.")
    else:
        logging.debug("The tool requested to park is 'None'. The current tool will be set empty without moves.")

    # Add the commands and the timeout (by reference).
    logging.debug(f"Building parking moves for tool '{tool_to_park}'.")
    # NOTE: The new tool is set to "None" because this indicates a parking move.
    self.builder.toolChangeMacro(new_tool=None, extend=True, action=action)

    # Home the tool changer.
    if tool_to_park is not None and forced_t0_homing:
        # Only if a tool was parked and the tool-changer was not homed initially.
        t0_home = self.gcode.gcodeHomeP(cmd_prefix="HOME_", which_axis="E0")
        # TODO: "gcodeHomeP" is largely deprecated. Replace it.
        self.controller.builder.extend_commands(commands=t0_home, action=action)

    # Execute the GCODE in the action.
    logging.info("Executing parking moves.")
    logging.debug(f"Final parking action:\n{pformat(action)}")
    await self.controller.machine.run_actions_protocol(actions=[action], i=42, wait=True, check=True)

Action handler for parking a tool.

async def send_execution_updates(self, action: dict, status: str)
Expand source code
async def send_execution_updates(self, action: dict, status: str):
    """Callback method that sends progress updates from a running actions protocol.

    Args:
        action (dict): Data for an action.
        status (str): Status code from the controller for the current action.

    Status codes for action can be: "error", "paused", "done", and "running".
    """
    # # Let the UI know this action is about to be executed.
    # await self.controller.comms.sio_emit_action_update(action, status)

    # TODO: figure out how to tolerate all future actions, such as PCR procols, etc.

    # Make a copy.
    action = deepcopy(action)

    # Clean up.
    action.pop("GCODE", None)
    action.pop("rpc_commands", None)
    action.pop("tracker", None)

    # Fix non-serializable ID objects.
    if action.get("_id", None):
        logging.debug(f"Stringifying ID object from action: {action['_id']}")
        action["_id"] = str(action["_id"])

    # The "index" data is only available in actions produced by "make_protocol_commands".
    index = action.get("index", None)

    # Report progress to the UI.
    protocol_name = action.get("protocol", None)
    if protocol_name is not None:
        # Build basic data.
        update_data = {
            'action': action,
            'status': status,
            'action_index': action["index"],
            'protocol': protocol_name,
            'stepID': action.get('stepID', None),
            'workspace': action.get('workspace', None),
            'hLprotocol': action.get('hLprotocol', None)
            # 'hLprotocolID': action.get('hLprotocolID', None)
        }

        # Add progress information.
        fraction = (action["index"] + 1) / (action["action_count"])
        update_data.update({
            'length': action["action_count"],
            'message': f"Action {index + 1} of {action['action_count']}",
            'fraction': fraction
        })

        # Send progress update.
        logging.info(f"Sending execution update for protocol '{update_data['protocol']}'.")
        logging.debug(f"Sending update with contents:\n{pformat(update_data)}")
        await self.controller.comms.emit(self.sio_event_execution_update, update_data)

        try:
            # Try to update the action in MongoDB.
            self.controller.database_tools.updateActionBy(
                action=action, protocol_name=protocol_name,
                new_data=status, field="status", find_by="index")
        except Exception as e:
            msg = f"update_action_status: failed to update action in the DB with error message: {e}\n"
            msg += traceback.format_exc()
            logging.error(msg)
            print(msg)

Callback method that sends progress updates from a running actions protocol.

Args

action : dict
Data for an action.
status : str
Status code from the controller for the current action.

Status codes for action can be: "error", "paused", "done", and "running".

def stop_protocol_handler(self, *args, **kwargs)
Expand source code
def stop_protocol_handler(self, *args, **kwargs):
    """Handle press of stop button in the protocol panel."""
    # TODO: Use incoming data. It has protocol _id and name.
    if self.protocol_is_running(self.stop_event):
        logging.info("Setting the stop event.")
        self.controller.stop_protocol.set()
        if self.controller.pause_protocol.is_set():
            logging.debug("Also clearing the pause event.")
            self.controller.pause_protocol.clear()

Handle press of stop button in the protocol panel.

Inherited members