Module pipettin-piper.piper.plugins.base_actions

Functions

def load_plugin(controller: "'Controller'", **kwargs)
Expand source code
def load_plugin(controller: "Controller", **kwargs):
    """Entry point used to load this plugin.

    Every plugin is expected to expose a function named 'load_plugin' that
    instantiates the plugin's class and returns the instance to the caller.
    Here that class is 'BaseActions'; any extra keyword arguments are
    forwarded to its constructor.

    Raises:
        PluginError: if the plugin class could not be instantiated. The
            original exception is chained as the cause.
    """
    logging.debug(f"load_plugin: loading {plugin_name} plugin.")
    try:
        plugin = BaseActions(controller=controller, **kwargs)
    except Exception as e:
        # Any failure is logged and converted to the error type plugin loaders must raise.
        msg = f"Failed to load with error: {e}"
        logging.error(msg)
        raise PluginError(msg) from e
    else:
        return plugin

Plugins are expected to have a function named 'load_plugin' which will instantiate the plugin's class and return it to the main Commander class. If they fail to load, they must raise a PluginError exception.

Classes

class BaseActions (controller: Controller)
Expand source code
class BaseActions(Plugin):
    """Handler methods for basic actions.

    This plugin registers handlers for the 'HOME', 'WAIT' and 'GCODE' actions:
    a parser for each of them in the GCODE builder, and an additional runner
    for 'HOME' in the controller.
    """
    def __init__(self, controller: Controller):
        """Save references to the controller's objects and register the handlers.

        Args:
            controller: Object providing the GCODE builder ('controller.builder'),
                in which the action parsers are registered, and the
                'add_action_runner' method used to register the HOME runner.
        """

        # Log messages.
        logging.info("Loading plugin %s", plugin_name)

        # Save the Controller and GCODE builder objects.
        self.controller = controller
        self.gcode_builder: GcodeBuilder = controller.builder

        # Get the gcode generator.
        self.gcode: GcodePrimitives = controller.builder.gcode

        # Add your handlers for new protocol actions.
        # The builder handler is used by 'builder.parseAction' to generate GCODE.
        self.controller.builder.add_action_handler(name=self.home_action_name,
                                                   function=self.home_action_handler)
        self.controller.builder.add_action_handler(name=self.wait_action_name,
                                                   function=self.wait_action_parser)
        self.controller.builder.add_action_handler(name=self.gcode_action_name,
                                                   function=self.gcode_action_parser)
        # The controller handler is used by 'controller.run_actions_protocol',
        # to execute the action.
        self.controller.add_action_runner(name=self.home_action_name,
                                          function=self.home_action_runner)

        # Add the WAIT action as a skippable one.
        self.controller.builder.tc_check_skip_actions.append(self.wait_action_name)

        # Set status.
        self._status = True

    # Name under which the raw GCODE parser is registered.
    gcode_action_name = 'GCODE'
    def gcode_action_parser(self, action, i):
        """Run raw gcode.

        Reads the commands from action["args"]["GCODE"], passes them to the
        builder's 'extend_commands', and returns them unchanged.
        """
        # Get the gcode.
        raw_gcode = action["args"]["GCODE"]
        # Add commands to the action.
        self.controller.builder.extend_commands(raw_gcode)
        logging.debug(f"Processed {self.gcode_action_name} action with index {i} and commands: '{raw_gcode}'")
        return raw_gcode

    # Name under which the homing parser and runner are registered.
    home_action_name = "HOME"
    def home_action_handler(self, action, i):
        """Home the machine and its tool.
        GCODE generation is deferred until execution by 'home_action_runner'.
        This allows skipping an axis if already homed.

        Only a GCODE comment is generated here. The builder's current tool is
        stored in action["args"]["loaded_tool"] for later use by the runner.

        Returns:
            The list with the single comment command passed to the builder.
        """
        # Make GCODE.
        msg = f"Parsed HOME action with index {i}. Homing GCODE generation deffered."
        logging.debug(msg)
        commands = [self.gcode.comment(msg)]
        # Save current tool in the action. The "args" key is created when missing,
        # because 'home_action_runner' treats it as optional (defaulting to all axes).
        action.setdefault("args", {})["loaded_tool"] = self.controller.builder.current_tool
        # Extend protocol.
        self.controller.builder.extend_commands(commands, action)
        return commands

    async def get_homed_axes(self, timeout=10.0) -> str:
        """Get the currently homed axes as a string.

        Raises:
            ProtocolError: if the machine's 'wait_for_homing_update' returns None.
        """
        # Get the currently homed axes.
        homed_axes = await self.controller.machine.wait_for_homing_update(timeout=timeout)
        if homed_axes is None:
            raise ProtocolError("Could not get the currently homed axes due to a communication time-out.")
        return homed_axes

    async def home_action_runner(self, action: dict, i: int, wait:bool, check:bool, timeout:float):
        """Execute a HOME action: generate the homing GCODE at run time and run it.

        Axes that the machine reports as already homed are skipped. The generated
        commands are appended to action["GCODE"] and the action is then executed
        through the machine's 'run_actions_protocol'.

        Args:
            action: The action to execute. Optional keys read from action["args"]:
                "axes" (defaults to "XYZE"), "tools" (tool names, defaults to all
                of the builder's tools) and "loaded_tool" (saved at parsing time
                by 'home_action_handler').
            i: Index of the action.
            wait: Not used here; the homing moves are always executed with wait=True.
            check: Not used here; the homing moves are always executed with check=True.
            timeout: Not used here; a fixed timeout of 60.0 is set for the homing moves.

        Raises:
            ProtocolError: if an axis could not be selected, the homed axes could
                not be read, or the tool-changer axis must be homed while a tool
                seems to be mounted.
        """
        logging.info(f"Executing homing action with index {i}.")

        # Commands list.
        commands = [self.gcode.comment("Home action runner commands START")]

        # Get axes if any, defaulting to all.
        axes = action.get("args", {}).get("axes", "XYZE")
        # Get tool names, defaulting to all.
        tools = action.get("args", {}).get("tools", list(self.controller.builder.tools))

        # Get the name of the loaded tool at the time of parsing.
        # This indicates whether a tool is loaded in the previous action, which can
        # only be checked at the time of parsing the action (i.e. "current_tool" is not helpful).
        loaded_tool = action.get("args", {}).get("loaded_tool", None)

        # Activate the tool-changer axis.
        if "E" in axes.upper():
            logging.debug("Activating tool-changer axis for homing.")
            _, (wait_pass, check_pass) = await self.controller.machine.activate_toolchanger_axis()
            if not wait_pass or not check_pass:
                raise ProtocolError("Failed to select the tool-changer axis. Homing failed.")

        # Get the currently homed axes.
        homed_axes = await self.get_homed_axes(timeout=10.0)

        # Filter out the currently homed axes.
        axes = "".join([a for a in axes if a.upper() not in homed_axes.upper()])
        logging.debug(f"Will now home axes '{axes}' and tools: {tools}")

        # Home XYZ axes.
        xyz_axes = "".join([a for a in axes if a.upper() in "XYZ"])
        if xyz_axes:
            logging.debug(f"Adding homing commands for motion system axes: '{xyz_axes}'")
            # Pass only the filtered XYZ axes (the ones logged above); the
            # tool-changer axis "E" is handled separately below.
            commands += self.controller.builder.gcode.gcodeHomeXYZ(axes=xyz_axes)
        else:
            logging.debug("Motion system axes already homed.")

        # Home E axis.
        if "E" in axes.upper():
            # Check for tool present.
            if loaded_tool:
                msg = f"Can't home the tool-changer axis because tool '{loaded_tool}' seems to be mounted."
                logging.error(msg)
                raise ProtocolError(msg)
            # Add the homing command for the tool-changer.
            logging.debug("Adding homing command for the tool-changer axis.")
            commands += self.gcode.gcodeHomeP(cmd_prefix="HOME_", which_axis="E0")
            # TODO: "gcodeHomeP" is largely deprecated. Replace it.

        # Home tools.
        for tool in self.controller.builder.tools.values():
            # Skip unselected tools.
            if tools and tool.name not in tools:
                logging.debug(f"Tool '{tool.name}' not selected for homing. Skipped.")
                continue
            # Activate the tool.
            logging.debug(f"Activating tool '{tool.name}' for homing check.")
            _, checks = await self.controller.machine.send_gcode_script(
                gcode_cmds=tool.gcode_activate(),
                wait=True, check=True, timeout=1.1) # Previously skipped checks.
            if not all(checks):
                raise ProtocolError(f"Failed to select the tool axis for '{tool.name}'. Homing action failed.")
            # Get the currently homed axes.
            logging.debug(f"Checking if '{tool.name}' is already homed.")
            homed_axes = await self.get_homed_axes(timeout=10.0)
            # Perform homing only if needed.
            # NOTE(review): presumably "E" refers to the just-activated tool's axis here - confirm.
            if "E" not in homed_axes.upper():
                # Home the tool.
                logging.debug(f"Adding homing commands for tool '{tool.name}'.")
                commands += tool.home()
            else:
                logging.debug(f"Skipping '{tool.name}'. It is already homed.")

        commands += [self.gcode.comment("Home action runner commands END")]
        logging.debug(f"Extending parsed homing commands by:\n{pformat(commands)}")

        # Add commands to the action.
        action.setdefault("GCODE", [])
        action["GCODE"].extend(commands)
        # Add gcode parsing options: increase the timeout for the homing move.
        action = self.controller.machine.update_exec_opts(
            action=action, wait=True, check=True, timeout=60.0, add_timeout=True)

        # Execute the GCODE in the action.
        logging.info(f"Executing homing moves for action {i}.")
        logging.debug(f"Final homing action commands:\n{pformat(action)}")
        await self.controller.machine.run_actions_protocol(actions=[action], i=i)

        # Done!
        logging.debug(f"Homing moves for action {i} executed successfuly.")

    # Name under which the dwell parser is registered.
    wait_action_name = "WAIT"
    def wait_action_parser(self, action, i):
        """Parse 'dwell' action, pausing GCODE commands for a certain time.

        The time is read from action["args"]["seconds"]. The generated commands
        are passed to the builder's 'extend_commands' and returned.
        """
        gcode_commands = self.gcode.gcodeDwell(seconds=action["args"]["seconds"])

        self.controller.builder.extend_commands(gcode_commands)

        logging.debug(f'Parsed {self.wait_action_name} action {i} with time {action["args"]["seconds"]}')

        return gcode_commands

Handler methods for basic actions. This plugin handles 'HOME', 'WAIT' and 'GCODE' actions.

Ancestors

Class variables

var gcode_action_name
var home_action_name
var wait_action_name

Methods

def gcode_action_parser(self, action, i)
Expand source code
def gcode_action_parser(self, action, i):
    """Parse a raw GCODE action by forwarding its commands unchanged.

    The commands are taken from action["args"]["GCODE"], handed to the
    builder's 'extend_commands', and returned as-is.
    """
    raw_gcode = action["args"]["GCODE"]
    builder = self.controller.builder
    # The user-provided commands go to the builder without any processing.
    builder.extend_commands(raw_gcode)
    logging.debug(f"Processed {self.gcode_action_name} action with index {i} and commands: '{raw_gcode}'")
    return raw_gcode

Run raw gcode.

async def get_homed_axes(self, timeout=10.0) ‑> str
Expand source code
async def get_homed_axes(self, timeout=10.0) -> str:
    """Ask the machine which axes are currently homed.

    Awaits the machine's 'wait_for_homing_update' with the given timeout and
    returns its result.

    Raises:
        ProtocolError: if the machine returned None instead of the homed axes.
    """
    machine = self.controller.machine
    result = await machine.wait_for_homing_update(timeout=timeout)
    if result is not None:
        return result
    # A None result is reported as a communication time-out.
    raise ProtocolError("Could not get the currently homed axes due to a communication time-out.")

Get the currently homed axes as a string.

def home_action_handler(self, action, i)
Expand source code
def home_action_handler(self, action, i):
    """Home the machine and its tool.
    GCODE generation is deferred until execution by 'home_action_runner'.
    This allows skipping an axis if already homed.

    Only a GCODE comment is generated here. The builder's current tool is
    stored in action["args"]["loaded_tool"] for later use by the runner.

    Returns:
        The list with the single comment command passed to the builder.
    """
    # Make GCODE.
    msg = f"Parsed HOME action with index {i}. Homing GCODE generation deffered."
    logging.debug(msg)
    commands = [self.gcode.comment(msg)]
    # Save current tool in the action. The "args" key is created when missing,
    # because 'home_action_runner' treats it as optional (defaulting to all axes).
    action.setdefault("args", {})["loaded_tool"] = self.controller.builder.current_tool
    # Extend protocol.
    self.controller.builder.extend_commands(commands, action)
    return commands

Home the machine and its tool. GCODE generation is deferred until execution by 'home_action_runner'. This allows skipping an axis if already homed.

async def home_action_runner(self, action: dict, i: int, wait: bool, check: bool, timeout: float)
Expand source code
async def home_action_runner(self, action: dict, i: int, wait:bool, check:bool, timeout:float):
    """Execute a HOME action: generate the homing GCODE at run time and run it.

    Axes that the machine reports as already homed are skipped. The generated
    commands are appended to action["GCODE"] and the action is then executed
    through the machine's 'run_actions_protocol'.

    Args:
        action: The action to execute. Optional keys read from action["args"]:
            "axes" (defaults to "XYZE"), "tools" (tool names, defaults to all
            of the builder's tools) and "loaded_tool" (expected to be saved at
            parsing time).
        i: Index of the action.
        wait: Not used here; the homing moves are always executed with wait=True.
        check: Not used here; the homing moves are always executed with check=True.
        timeout: Not used here; a fixed timeout of 60.0 is set for the homing moves.

    Raises:
        ProtocolError: if an axis could not be selected, the homed axes could
            not be read, or the tool-changer axis must be homed while a tool
            seems to be mounted.
    """
    logging.info(f"Executing homing action with index {i}.")

    # Commands list.
    commands = [self.gcode.comment("Home action runner commands START")]

    # Get axes if any, defaulting to all.
    axes = action.get("args", {}).get("axes", "XYZE")
    # Get tool names, defaulting to all.
    tools = action.get("args", {}).get("tools", list(self.controller.builder.tools))

    # Get the name of the loaded tool at the time of parsing.
    # This indicates whether a tool is loaded in the previous action, which can
    # only be checked at the time of parsing the action (i.e. "current_tool" is not helpful).
    loaded_tool = action.get("args", {}).get("loaded_tool", None)

    # Activate the tool-changer axis.
    if "E" in axes.upper():
        logging.debug("Activating tool-changer axis for homing.")
        _, (wait_pass, check_pass) = await self.controller.machine.activate_toolchanger_axis()
        if not wait_pass or not check_pass:
            raise ProtocolError("Failed to select the tool-changer axis. Homing failed.")

    # Get the currently homed axes.
    homed_axes = await self.get_homed_axes(timeout=10.0)

    # Filter out the currently homed axes.
    axes = "".join([a for a in axes if a.upper() not in homed_axes.upper()])
    logging.debug(f"Will now home axes '{axes}' and tools: {tools}")

    # Home XYZ axes.
    xyz_axes = "".join([a for a in axes if a.upper() in "XYZ"])
    if xyz_axes:
        logging.debug(f"Adding homing commands for motion system axes: '{xyz_axes}'")
        # Pass only the filtered XYZ axes (the ones logged above); the
        # tool-changer axis "E" is handled separately below.
        commands += self.controller.builder.gcode.gcodeHomeXYZ(axes=xyz_axes)
    else:
        logging.debug("Motion system axes already homed.")

    # Home E axis.
    if "E" in axes.upper():
        # Check for tool present.
        if loaded_tool:
            msg = f"Can't home the tool-changer axis because tool '{loaded_tool}' seems to be mounted."
            logging.error(msg)
            raise ProtocolError(msg)
        # Add the homing command for the tool-changer.
        logging.debug("Adding homing command for the tool-changer axis.")
        commands += self.gcode.gcodeHomeP(cmd_prefix="HOME_", which_axis="E0")
        # TODO: "gcodeHomeP" is largely deprecated. Replace it.

    # Home tools.
    for tool in self.controller.builder.tools.values():
        # Skip unselected tools.
        if tools and tool.name not in tools:
            logging.debug(f"Tool '{tool.name}' not selected for homing. Skipped.")
            continue
        # Activate the tool.
        logging.debug(f"Activating tool '{tool.name}' for homing check.")
        _, checks = await self.controller.machine.send_gcode_script(
            gcode_cmds=tool.gcode_activate(),
            wait=True, check=True, timeout=1.1) # Previously skipped checks.
        if not all(checks):
            raise ProtocolError(f"Failed to select the tool axis for '{tool.name}'. Homing action failed.")
        # Get the currently homed axes.
        logging.debug(f"Checking if '{tool.name}' is already homed.")
        homed_axes = await self.get_homed_axes(timeout=10.0)
        # Perform homing only if needed.
        # NOTE(review): presumably "E" refers to the just-activated tool's axis here - confirm.
        if "E" not in homed_axes.upper():
            # Home the tool.
            logging.debug(f"Adding homing commands for tool '{tool.name}'.")
            commands += tool.home()
        else:
            logging.debug(f"Skipping '{tool.name}'. It is already homed.")

    commands += [self.gcode.comment("Home action runner commands END")]
    logging.debug(f"Extending parsed homing commands by:\n{pformat(commands)}")

    # Add commands to the action.
    action.setdefault("GCODE", [])
    action["GCODE"].extend(commands)
    # Add gcode parsing options: increase the timeout for the homing move.
    action = self.controller.machine.update_exec_opts(
        action=action, wait=True, check=True, timeout=60.0, add_timeout=True)

    # Execute the GCODE in the action.
    logging.info(f"Executing homing moves for action {i}.")
    logging.debug(f"Final homing action commands:\n{pformat(action)}")
    await self.controller.machine.run_actions_protocol(actions=[action], i=i)

    # Done!
    logging.debug(f"Homing moves for action {i} executed successfuly.")

This method must execute the provided action.

def wait_action_parser(self, action, i)
Expand source code
def wait_action_parser(self, action, i):
    """Parse a 'dwell' (WAIT) action into GCODE that pauses for a given time.

    The duration is read from action["args"]["seconds"]. The generated
    commands are handed to the builder's 'extend_commands' and returned.
    """
    dwell_commands = self.gcode.gcodeDwell(seconds=action["args"]["seconds"])
    builder = self.controller.builder
    # Append the dwell commands to the builder.
    builder.extend_commands(dwell_commands)
    logging.debug(f'Parsed {self.wait_action_name} action {i} with time {action["args"]["seconds"]}')
    return dwell_commands

Parse 'dwell' action, pausing GCODE commands for a certain time.

Inherited members