Module pipettin-piper.piper.plugins.base_actions
Functions
def load_plugin(controller: "'Controller'", **kwargs)-
Expand source code
def load_plugin(controller: "Controller", **kwargs): """ Plugins are expected to have a function named 'load_plugin' which will instantiate the plugin's class and returning it to the main Commander class. If they fail to load, they must raise a PluginError exception. """ logging.debug(f"load_plugin: loading {plugin_name} plugin.") try: class_instance = BaseActions(controller=controller, **kwargs) except Exception as e: msg = f"Failed to load with error: {e}" logging.error(msg) raise PluginError(msg) from e return class_instancePlugins are expected to have a function named 'load_plugin' which will instantiate the plugin's class and returning it to the main Commander class. If they fail to load, they must raise a PluginError exception.
Classes
class BaseActions (controller: Controller)-
Expand source code
class BaseActions(Plugin): """Handler methods for basic actions. This plugin handles 'HOME' and 'WAIT' actions. """ def __init__(self, controller: Controller): # Log messages. logging.info("Loading plugin %s", plugin_name) # Save the Controller and GCODE builder objects. self.controller = controller self.gcode_builder: GcodeBuilder = controller.builder # Get the gcode generator. self.gcode: GcodePrimitives = controller.builder.gcode # Add your handlers for new protocol actions. # The builder handler is used by 'builder.parseAction' to generate GCODE. self.controller.builder.add_action_handler(name=self.home_action_name, function=self.home_action_handler) self.controller.builder.add_action_handler(name=self.wait_action_name, function=self.wait_action_parser) self.controller.builder.add_action_handler(name=self.gcode_action_name, function=self.gcode_action_parser) # The controller handler is used by 'controller.run_actions_protocol', # to execute the action. self.controller.add_action_runner(name=self.home_action_name, function=self.home_action_runner) # Add the WAIT action as a skippable one. self.controller.builder.tc_check_skip_actions.append(self.wait_action_name) # Set status. self._status = True gcode_action_name = 'GCODE' def gcode_action_parser(self, action, i): """Run raw gcode.""" # Get the gcode. raw_gcode = action["args"]["GCODE"] # Add commands to the action. self.controller.builder.extend_commands(raw_gcode) logging.debug(f"Processed {self.gcode_action_name} action with index {i} and commands: '{raw_gcode}'") return raw_gcode home_action_name = "HOME" def home_action_handler(self, action, i): """Home the machine and its tool. GCODE generation is deferred until execution by 'home_action_runner'. This allows skipping an axis if already homed. """ # Make GCODE. msg = f"Parsed HOME action with index {i}. Homing GCODE generation deffered." logging.debug(msg) commands = [self.gcode.comment(msg)] # Save current tool in the action. action["args"]["loaded_tool"] = self.controller.builder.current_tool # Extend protocol. self.controller.builder.extend_commands(commands, action) return commands async def get_homed_axes(self, timeout=10.0) -> str: """Get the currently homed axes as a string.""" # Get the currently homed axes. homed_axes = await self.controller.machine.wait_for_homing_update(timeout=timeout) if homed_axes is None: raise ProtocolError("Could not get the currently homed axes due to a communication time-out.") return homed_axes async def home_action_runner(self, action: dict, i: int, wait:bool, check:bool, timeout:float): """This method must execute provided action.""" logging.info(f"Executing homing action with index {i}.") # Commands list. commands = [self.gcode.comment("Home action runner commands START")] # Get axes if any, defaulting to all. axes = action.get("args", {}).get("axes", "XYZE") # Get tool names, defaulting to all. tools = action.get("args", {}).get("tools", list(self.controller.builder.tools)) # Get the name of the loaded tool at the time of parsing. # This indicates wether a tool is loaded in the previous action, which can # only be checked at the time of parsing the action (i.e. "current_tool" is not helpful). loaded_tool = action.get("args", {}).get("loaded_tool", None) # Activate the tool-changer axis. if "E" in axes.upper(): logging.debug("Activating tool-changer axis for homing.") _, (wait_pass, check_pass) = await self.controller.machine.activate_toolchanger_axis() if not wait_pass or not check_pass: raise ProtocolError("Failed to select the tool-changer axis. Homing failed.") # Get the currently homed axes. homed_axes = await self.get_homed_axes(timeout=10.0) # Filter out the currently homed axes. axes = "".join([a for a in axes if a.upper() not in homed_axes.upper()]) logging.debug(f"Will now home axes '{axes}' and tools: {tools}") # Home XYZ axes. xyz_axes = "".join([a for a in axes if a.upper() in "XYZ"]) if xyz_axes: logging.debug(f"Adding homing commands for motion system axes: '{xyz_axes}'") commands += self.controller.builder.gcode.gcodeHomeXYZ(axes=axes) else: logging.debug("Motion system axes already homed.") # Home E axis. if axes.upper().find("E") >= 0: # Check for tool present. if loaded_tool: msg = f"Can't home the tool-changer axis because tool '{loaded_tool}' seems to be mounted." logging.error(msg) raise ProtocolError(msg) # Add the homing command for the tool-changer. logging.debug("Adding homing command for the tool-changer axis.") commands += self.gcode.gcodeHomeP(cmd_prefix="HOME_", which_axis="E0") # TODO: "gcodeHomeP" is largely deprecated. Replace it. # Home tools. for tool in self.controller.builder.tools.values(): # Skip unselected tools. if tools and tool.name not in tools: logging.debug(f"Tool '{tool.name}' not selected for homing. Skipped.") continue # Activate the tool. logging.debug(f"Activating tool '{tool.name}' for homing check.") _, checks = await self.controller.machine.send_gcode_script( gcode_cmds=tool.gcode_activate(), wait=True, check=True, timeout=1.1) # Previously skipped checks. if not all(checks): raise ProtocolError(f"Failed to select the tool axis for '{tool.name}'. Homing action failed.") # Get the currently homed axes. logging.debug(f"Checking if '{tool.name}' is already homed.") homed_axes = await self.get_homed_axes(timeout=10.0) # Perform homing only if needed. if "E" not in homed_axes.upper(): # Home the tool. logging.debug(f"Adding homing commands for tool '{tool.name}'.") commands += tool.home() else: logging.debug(f"Skipping '{tool.name}'. It is already homed.") commands += [self.gcode.comment("Home action runner commands END")] logging.debug(f"Extending parsed homing commands by:\n{pformat(commands)}") # Add commands to the action. action.setdefault("GCODE", []) action["GCODE"].extend(commands) # Add gcode parsing options: increase the timeout for the homing move. action = self.controller.machine.update_exec_opts( action=action, wait=True, check=True, timeout=60.0, add_timeout=True) # Execute the GCODE in the action. logging.info(f"Executing homing moves for action {i}.") logging.debug(f"Final homing action commands:\n{pformat(action)}") await self.controller.machine.run_actions_protocol(actions=[action], i=i) # Done! logging.debug(f"Homing moves for action {i} executed successfuly.") wait_action_name = "WAIT" def wait_action_parser(self, action, i): """Parse 'dwell' action, pausing GCODE commands for a certain time.""" gcode_commands = self.gcode.gcodeDwell(seconds=action["args"]["seconds"]) self.controller.builder.extend_commands(gcode_commands) logging.debug(f'Parsed {self.wait_action_name} action {i} with time {action["args"]["seconds"]}') return gcode_commandsHandler methods for basic actions. This plugin handles 'HOME' and 'WAIT' actions.
Ancestors
Class variables
var gcode_action_namevar home_action_namevar wait_action_name
Methods
def gcode_action_parser(self, action, i)-
Expand source code
def gcode_action_parser(self, action, i): """Run raw gcode.""" # Get the gcode. raw_gcode = action["args"]["GCODE"] # Add commands to the action. self.controller.builder.extend_commands(raw_gcode) logging.debug(f"Processed {self.gcode_action_name} action with index {i} and commands: '{raw_gcode}'") return raw_gcodeRun raw gcode.
async def get_homed_axes(self, timeout=10.0) ‑> str-
Expand source code
async def get_homed_axes(self, timeout=10.0) -> str: """Get the currently homed axes as a string.""" # Get the currently homed axes. homed_axes = await self.controller.machine.wait_for_homing_update(timeout=timeout) if homed_axes is None: raise ProtocolError("Could not get the currently homed axes due to a communication time-out.") return homed_axesGet the currently homed axes as a string.
def home_action_handler(self, action, i)-
Expand source code
def home_action_handler(self, action, i): """Home the machine and its tool. GCODE generation is deferred until execution by 'home_action_runner'. This allows skipping an axis if already homed. """ # Make GCODE. msg = f"Parsed HOME action with index {i}. Homing GCODE generation deffered." logging.debug(msg) commands = [self.gcode.comment(msg)] # Save current tool in the action. action["args"]["loaded_tool"] = self.controller.builder.current_tool # Extend protocol. self.controller.builder.extend_commands(commands, action) return commandsHome the machine and its tool. GCODE generation is deferred until execution by 'home_action_runner'. This allows skipping an axis if already homed.
async def home_action_runner(self, action: dict, i: int, wait: bool, check: bool, timeout: float)-
Expand source code
async def home_action_runner(self, action: dict, i: int, wait:bool, check:bool, timeout:float): """This method must execute provided action.""" logging.info(f"Executing homing action with index {i}.") # Commands list. commands = [self.gcode.comment("Home action runner commands START")] # Get axes if any, defaulting to all. axes = action.get("args", {}).get("axes", "XYZE") # Get tool names, defaulting to all. tools = action.get("args", {}).get("tools", list(self.controller.builder.tools)) # Get the name of the loaded tool at the time of parsing. # This indicates wether a tool is loaded in the previous action, which can # only be checked at the time of parsing the action (i.e. "current_tool" is not helpful). loaded_tool = action.get("args", {}).get("loaded_tool", None) # Activate the tool-changer axis. if "E" in axes.upper(): logging.debug("Activating tool-changer axis for homing.") _, (wait_pass, check_pass) = await self.controller.machine.activate_toolchanger_axis() if not wait_pass or not check_pass: raise ProtocolError("Failed to select the tool-changer axis. Homing failed.") # Get the currently homed axes. homed_axes = await self.get_homed_axes(timeout=10.0) # Filter out the currently homed axes. axes = "".join([a for a in axes if a.upper() not in homed_axes.upper()]) logging.debug(f"Will now home axes '{axes}' and tools: {tools}") # Home XYZ axes. xyz_axes = "".join([a for a in axes if a.upper() in "XYZ"]) if xyz_axes: logging.debug(f"Adding homing commands for motion system axes: '{xyz_axes}'") commands += self.controller.builder.gcode.gcodeHomeXYZ(axes=axes) else: logging.debug("Motion system axes already homed.") # Home E axis. if axes.upper().find("E") >= 0: # Check for tool present. if loaded_tool: msg = f"Can't home the tool-changer axis because tool '{loaded_tool}' seems to be mounted." logging.error(msg) raise ProtocolError(msg) # Add the homing command for the tool-changer. logging.debug("Adding homing command for the tool-changer axis.") commands += self.gcode.gcodeHomeP(cmd_prefix="HOME_", which_axis="E0") # TODO: "gcodeHomeP" is largely deprecated. Replace it. # Home tools. for tool in self.controller.builder.tools.values(): # Skip unselected tools. if tools and tool.name not in tools: logging.debug(f"Tool '{tool.name}' not selected for homing. Skipped.") continue # Activate the tool. logging.debug(f"Activating tool '{tool.name}' for homing check.") _, checks = await self.controller.machine.send_gcode_script( gcode_cmds=tool.gcode_activate(), wait=True, check=True, timeout=1.1) # Previously skipped checks. if not all(checks): raise ProtocolError(f"Failed to select the tool axis for '{tool.name}'. Homing action failed.") # Get the currently homed axes. logging.debug(f"Checking if '{tool.name}' is already homed.") homed_axes = await self.get_homed_axes(timeout=10.0) # Perform homing only if needed. if "E" not in homed_axes.upper(): # Home the tool. logging.debug(f"Adding homing commands for tool '{tool.name}'.") commands += tool.home() else: logging.debug(f"Skipping '{tool.name}'. It is already homed.") commands += [self.gcode.comment("Home action runner commands END")] logging.debug(f"Extending parsed homing commands by:\n{pformat(commands)}") # Add commands to the action. action.setdefault("GCODE", []) action["GCODE"].extend(commands) # Add gcode parsing options: increase the timeout for the homing move. action = self.controller.machine.update_exec_opts( action=action, wait=True, check=True, timeout=60.0, add_timeout=True) # Execute the GCODE in the action. logging.info(f"Executing homing moves for action {i}.") logging.debug(f"Final homing action commands:\n{pformat(action)}") await self.controller.machine.run_actions_protocol(actions=[action], i=i) # Done! logging.debug(f"Homing moves for action {i} executed successfuly.")This method must execute provided action.
def wait_action_parser(self, action, i)-
Expand source code
def wait_action_parser(self, action, i): """Parse 'dwell' action, pausing GCODE commands for a certain time.""" gcode_commands = self.gcode.gcodeDwell(seconds=action["args"]["seconds"]) self.controller.builder.extend_commands(gcode_commands) logging.debug(f'Parsed {self.wait_action_name} action {i} with time {action["args"]["seconds"]}') return gcode_commandsParse 'dwell' action, pausing GCODE commands for a certain time.
Inherited members