Module pipettin-piper.piper.commanders.klipper_commander
Classes
class KlipperCommander (controller: Controller,
commands: list = None,
tracker: dict = None,
config: dict = None,
dry=False)
Expand source code
class KlipperCommander(MessageTracker):
    """A class to drive the Pipettin robot.

    Its main function is to run in the background, maintain a socket.io
    connection with the GUI, and a connection to Moonraker (which in turn
    drives the CNC machine). It also has several convenience methods to send
    actions from "pipetting protocols".

    The SocketIO client communicates with the Pipetting GUI; a Node JS
    web-application for designing workspaces and protocols.

    This class can (re)connect to Moonraker, send it commands, check
    responses, and other stuff. The "python-websockets" library is used for
    this purpose.

    This class is started by the "start" method, which is blocking. The method
    is meant to be used by a background process. Alternatively, try using
    "await Controller.launch()" in an asyncio context.

    Returns:
        Controller: Instance of the class.
    """

    def __init__(self, controller: "Controller", commands: list = None,
                 tracker: dict = None, config: dict = None, dry=False):
        logging.info("Initiating Machine class.")
        # Save the controller class.
        self.controller = controller
        # Save the config options.
        self.config = {} if config is None else config
        # Set paranoid mode.
        self.paranoid = self.config.get("paranoid", True)
        # Save the commands list.
        self.commands = [] if commands is None else commands
        # NOTE(review): the 'tracker' argument is accepted but not used here;
        # presumably MessageTracker.__init__ sets up the tracker — confirm.
        # Start a commands list for a "session".
        self.session_commands = []
        # Init current action dict.
        self.current_action = {}
        # Mark RPC protocol stream as available.
        self.free = True
        # Set default values.
        self.initial_cid = self.cid = self.config.get("initial_cid", 666)
        self.cid_overwrite = self.config.get("cid_overwrite", False)
        self.spam_info = self.config.get("spam_info", True)
        # Setting this to false by default prevents the coroutines from
        # stopping on a shutdown when no protocol is running.
        self.ensure_printer_ready = self.config.get("ensure_printer_ready", False)
        # NOTE: Must use "0.0.0.0" IP to avoid OSError with multiple exceptions being raised.
        # https://github.com/python-websockets/websockets/issues/593
        # https://bugs.python.org/issue29980
        self.ws_address = self.config.get("ws_address", "ws://0.0.0.0:7125/websocket")
        self.verbose = self.config.get("verbose", True)
        self.dry = dry or self.config.get("dry", False)
        self.auto_restart = self.config.get("auto_restart", True)
        # Machine homed axes.
        self.homed_axes: str = None
        # Machine XYZ limits.
        # TODO: Unify with the ones in GcodePrimitives.
        self.machine_limits: dict = {}
        # Machine XYZ position (last commanded position).
        self.gcode_position: list = []
        # Dry mode for testing.
        if self.dry:
            logging.warning("KlipperCommander: dry mode enabled.")
        # Initialize the tracker handler class.
        super().__init__(self.config, self.controller)
        # List of methods that should be started as coroutines/tasks.
        self.controller.coroutine_methods.extend([
            self.tracker_sync(),
            self.websocket_reconnect(),
            self.websocket_reader(),
            self.firmware_watchdog(),
            self.session_watchdog(),
            self.websocket_info(),
            self.query_toolhead_status()
        ])

    # Set default values.
    new_info_response = False
    printer_state = ""
    printer_ready = False
    recovering = False
    # read_interval: float = 0.005  # TODO: Remove attribute, it is deprecated.
    free = True
    """Used to flag if a protocol is being sent through RPC, and help prevent
    attempts to send another one simultaneously."""

    # CHECKS method section ####
    def check_greenlight(self, action=None):
        """Check if the commander is free.

        Raises an error if a current action is already being processed.
        """
        if self.current_action:
            raise ProtocolError(
                "Failed to run action; another one is being processed by the controller.",
                action=action
            )

    def check_controller(self, idx=None, action=None):
        """Check if the controller is running, or raise an error."""
        if not self.controller.run:
            msg = "Error, the commander is not running."
            # BUG FIX: use 'is not None' so that a protocol index of 0 (falsy)
            # is still included in the error message.
            msg += f" Aborting actions protocol at index {idx}." if idx is not None else ""
            logging.error(msg)
            raise CommanderError(msg, action=action)

    def check_cmd_session(self):
        """Use check_cmd_errors on the current session commands.

        A truthy return value contains a list of command IDs that have errors
        in their responses. A falsy value means no errors were found.
        """
        if self.session_commands is not None:
            return self.check_cmd_errors(self.session_commands)

    def check_cmd_errors(self, query_cmd_ids: list) -> list:
        """Look for errors in the tracker for the provided cmd ids.

        The returned list will be empty/falsy if none of the queried commands
        had errors (yet), or contain the command ids that had errors.
        """
        # First get the relevant tracker entries.
        entries = [(cmd_id, self.tracker.get(cmd_id, {})) for cmd_id in query_cmd_ids]
        # Look for errors.
        error_cmd_ids = [cmd_id for cmd_id, cmd_data in entries if "error" in cmd_data]
        return error_cmd_ids

    def errors_in_tracker(self):
        """Check for an error in all the responses of the tracker dict.

        TODO: Remove this method, it is unused.
        """
        logging.debug("Parsing all tracker entries for errors in responses.")
        # NOTE(review): iterating a dict-like tracker yields keys (strings), so
        # 'item["response"]' would fail at runtime; this likely needs
        # '.values()'. Left untouched since the method is slated for removal.
        for item in self.tracker:
            if "response" in item:
                if "error" in item["response"]:
                    # An error message was found in the response.
                    logging.error(f"Error found in response to tracker item:\n{item}")
                    return item
        logging.debug("No errors found in tracker responses.")

    def check_tracker_scripts(self, tracker=None):
        """Check for "not ok" responses in "script" (GCODE) entries of the tracker dict.

        Entries with no response (yet) are ignored.

        TODO: Remove this method, it is unused.
        """
        if not tracker:
            tracker = self.tracker
        logging.debug("Parsing all tracker entries of GCODE scripts for errors.")
        # NOTE(review): same dict-iteration concern as 'errors_in_tracker';
        # '.values()' is probably intended — confirm before reviving this method.
        for item in tracker:
            # Skip the item if it does not match a GCODE command
            try:
                if item["command"]["method"] != 'printer.gcode.script':
                    continue
            except KeyError:
                continue
            # Look for a response and a result
            try:
                # Positive results can be "ok" or "ok withsomethingelsehere..."
                if not str(item["response"]["result"]).startswith("ok"):
                    # If the above key is found and is not "ok",
                    # then something went wrong.
                    return False
            except KeyError:
                # If the keys were not found,
                # the response or the result are missing,
                # and they should be waited for.
                return False
        # The function gets here if everything went well.
        return True

    async def check_command_result_ok(self, cmd_id, timeout=0.0, loop_delay=0.2):
        """Wait for an "ok" in the response to a command by its ID.

        Also returns early if an error was found in the response.
        Returns True on "ok", False on error, and None on timeout.
        """
        logging.debug(f"Checking 'ok' response for cmd_id={cmd_id}")
        elapsed = 0.0
        if timeout is None:
            logging.warning("Infinite time-out for checking on cmd_id=" + str(cmd_id))
        while self.controller.run:
            # Check for an "ok" response
            try:
                response = self.tracker.get(str(cmd_id), {}).get("response", {})
                result = response.get("result", None)
                if str(result).startswith("ok"):
                    logging.debug(f"Got 'ok' response for cmd_id={cmd_id}")
                    return True
                if response.get("error", None):
                    logging.warning(f"Got 'error' response for cmd_id={cmd_id}")
                    return False
            except (KeyError, AttributeError):
                pass
            # TODO: Consider enabling a check on "ws_ready". Otherwise this might get stuck.
            # if not self.ws_ready and timeout is None:
            #     logging.error("Aborting check on command without timeout. The websocket is not ready.")
            #     return False
            # Check for a timeout
            if timeout is not None:
                if elapsed >= timeout:
                    logging.warning("Timed-out checking for response for cmd_id=" + str(cmd_id))
                    return None
            # Wait for a bit
            await asyncio.sleep(loop_delay)
            elapsed += loop_delay

    async def wait_for_response(self, cmd_id, timeout=0.0, loop_delay=0.2):
        """Wait for a 'response' to be added to a tracker entry."""
        return await self.wait_for(cmd_id=cmd_id, what="response",
                                   timeout=timeout, loop_delay=loop_delay)

    # async def wait_for_result(self, cmd_id, timeout=0.0, loop_delay=0.2):
    #     await self.wait_for(cmd_id=cmd_id, what="result", timeout=timeout, loop_delay=loop_delay)

    async def wait_for(self, cmd_id: str, what: str = "response", timeout=0.0, loop_delay=0.2):
        """Wait for a key to be added to a tracker entry."""
        logging.debug(f"Checking for '{what}' in cmd_id={cmd_id} with timeout={timeout}")
        elapsed = 0.0
        while self.controller.run:
            # Check for a "response"
            response = self.get_response_by_id(cmd_id, what=what)
            # Return True if a response was found.
            if response:
                logging.debug(f"Response found for cmd_id='{cmd_id}'.")
                return True
            # This code executes if no response was found.
            elif timeout is not None:
                # Check for a timeout and return if it has elapsed.
                if elapsed >= timeout:
                    logging.warning(f"Timed-out checking for response for cmd_id={cmd_id} and contents: {response}")
                    return False
            else:
                # If no timeout was specified, then return immediately.
                logging.debug("No response yet for cmd_id='" + str(cmd_id) + "'. Waiting infinitely.")
            # Wait for a bit before checking again.
            await asyncio.sleep(loop_delay)
            elapsed += loop_delay

    new_info_response: bool
    """Flag indicating if a new status report has arrived (managed by
    check_printer_ready and wait_for_info_update)."""
    printer_state: str
    """String describing the state of the printer object (updated by
    check_printer_ready, consumed by firmware_watchdog)."""
    printer_ready: bool
    """String indicating a 'ready' state of the printer object (see also:
    printer_state)."""

    def check_printer_ready(self, response, info_id_prefix="websocket_info"):
        """Checks if a response contains a result->state equal to "ready".

        Raises an exception if self.ensure_printer_ready is True.
        Context: called by websocket_reader when a response is received.
        """
        logging.debug("Checking for " + info_id_prefix + " response.")
        try:
            if str(response["id"]).startswith(info_id_prefix):
                # Update state.
                self.printer_state = response["result"]["state"]
                # Check if state is "ready".
                self.printer_ready = self.printer_state == 'ready'
                # Release "wait_for_info_update".
                self.new_info_response = True
                # Handle "not ready" state.
                if not self.printer_ready:
                    # Stop the coroutines if ready must be ensured.
                    msg = f"Printer in state='{self.printer_state}' is not ready."
                    if self.ensure_printer_ready:
                        msg += " Setting run=False (due to ensure_printer_ready=True)."
                        logging.error(msg)
                        self.controller.run = False
                        # NOTE(review): this raise is caught by this method's own
                        # 'except NotReadyError' below, so it never propagates to
                        # the caller — confirm whether that is intentional.
                        raise NotReadyError(msg)
                else:
                    logging.debug("Printer ready!")
            else:
                logging.debug(f"Command with id={str(response['id'])} does not look like a status report with info_id_prefix={info_id_prefix}")
        except KeyError as e:
            logging.debug(f"Could not check message due to KeyError: {e}")
        except NotReadyError as e:
            logging.debug(f"NotReadyError with message: {e}")
        return self.printer_state

    # WAIT method section ####
    async def wait_for_info_update(self, wait_time=None, timeout=4.0):
        """Wait for a new update from "check_printer_ready".

        Updates are requested automatically. This function is useful to
        ensure that the information is current.
        """
        # This will be reset to "True" by "check_printer_ready" when a new status update arrives.
        self.new_info_response = False
        if wait_time is None:
            wait_time = self.controller.wait_default
        if not self.dry:
            logging.debug(f"Waiting with timeout={timeout} and interval={wait_time}")
            # Wait
            elapsed = 0.0
            while not self.new_info_response:
                await asyncio.sleep(wait_time)
                elapsed += wait_time
                if elapsed >= timeout:
                    logging.debug("Timed out.")
                    return False
        else:
            # TODO: Reconsider how dry mode works, which skips all commands,
            # preventing info updates too.
            logging.debug(f"Dry mode enabled, waiting for {wait_time} before returning.")
            await asyncio.sleep(wait_time)
        logging.debug("Success!")
        return True

    async def wait_for_ready(self, reset=False, wait_time=1.1, timeout=8.0):
        """Wait until the printer becomes ready, optionally sending a "FIRMWARE_RESTART" command.

        This method is similar to "wait_for_idle_printer", but this one relies
        on regular status updates (not on object queries), and can restart
        klipper.
        """
        logging.debug("Waiting...")
        if self.dry:
            logging.debug(f"Dry mode enabled, sleeping for {wait_time} seconds and returning.")
            await asyncio.sleep(0.2)
            logging.debug("Dry mode enabled, returning success.")
            return True
        # Track elapsed time
        initial_time = time.time()
        # Force wait for a new update.
        if not await self.wait_for_info_update(timeout=timeout):
            logging.debug("No new info received 1/2.")
        else:
            logging.debug("New info received 1/2.")
        # Else, new status info was received.
        # Reset printer if requested and not ready.
        if not self.printer_ready and reset:
            logging.debug("Sending firmware_restart.")
            await self.firmware_restart()
        # Subtract the elapsed time from the timeout before running the next command.
        elapsed_time = time.time() - initial_time
        if elapsed_time >= timeout:
            second_timeout = 0.0
        else:
            second_timeout = timeout - elapsed_time
        # Wait for an update again.
        if not await self.wait_for_info_update(timeout=second_timeout):
            # Break here if no new info was received.
            logging.debug("No new info received 2/2.")
            return False
        else:
            logging.debug("New info received 2/2.")
        # Wait here for printer ready.
        while not self.printer_ready:
            await asyncio.sleep(wait_time)
            second_timeout -= wait_time
            if self.printer_ready:
                # Break if the printer became ready
                break
            elif second_timeout <= 0:
                # If not ready, and the timeout is over, return False.
                logging.debug("Timed out.")
                return False
        # Ready! This part of the code is only reached when the printer is ready.
        assert self.printer_ready  # Check again just in case.
        logging.debug("Success!")
        return True

    async def wait_for_setup(self, timeout=5.0, wait_time=None, raise_error=False):
        """Wait for the Moonraker connection to be established."""
        # TODO: Reconsider un-commenting this. It depends on the meaning of "dry".
        # Does "dry" mean not sending but connecting? or neither?
        # if self.dry:
        #     logging.warning("wait_for_setup: dry mode enabled, returning True.")
        #     return True
        if not self.ws_address:
            logging.warning("wait_for_setup: WS address is empty, returning True after 500 ms.")
            await asyncio.sleep(0.5)
            return True
        # Track elapsed time
        remaining = timeout
        if not wait_time:
            wait_time = self.controller.wait_default
        # Wait here for websocket ready.
        while not self.ws_ready:
            if remaining <= 0:
                # If not ready, and the timeout is over, return False.
                logging.debug("wait_for_setup: Timed out.")
                return False
            await asyncio.sleep(wait_time)
            remaining -= wait_time
        # Raise an error on timeout if requested.
        if raise_error and not self.ws_ready:
            raise CommanderError("Timed-out waiting for the connection to moonraker to be established.")
        return self.ws_ready

    # MAIN COROUTINES method section ####
    async def session_watchdog(self, check_interval=1.0):
        """Command session check coroutine.

        Will send a "FIRMWARE_RESTART" if a command in a session gets an
        "error" response.
        """
        logging.info("Coroutine started.")
        try:
            while not self.controller.run:
                # BUG FIX: 'asyncio.sleep' must be awaited; the original call
                # created an un-awaited coroutine and busy-looped.
                await asyncio.sleep(self.controller.wait_default)
            logging.info("Coroutine ready.")
            while not self.ws_ready and self.controller.run:
                # Wait here until the websocket is ready.
                await asyncio.sleep(self.controller.wait_default)
            while self.controller.run:
                # Raise errors if any previous command in the current session has failed.
                if self.check_cmd_session():
                    logging.error("Errors found in RPC command session. Starting emergency stop to prevent damage.")
                    await self.emergency_stop()
                    logging.warning("Clearing the command session now to avoid multiple restarts.")
                    self.end_cmd_session()
                # Wait for a bit.
                await asyncio.sleep(check_interval)
        except asyncio.exceptions.CancelledError:
            logging.warning("Coroutine cancelled.")
        except Exception as e:
            msg = f"Unhandled exception: {e}"
            logging.error(msg)
            print(msg)
            raise CommanderError(msg) from e
        logging.warning("Coroutine ended.")

    # Printer "watchdog" coroutine ####
    recovering: bool
    """Flag indicating if the printer was found in an error state and is being
    restarted."""

    async def firmware_watchdog(self, min_wait_for_restart=3.0):
        """Firmware restart coroutine.

        Will send a "FIRMWARE_RESTART" command if the printer is found in a
        "shutdown" state.
        """
        logging.info("Coroutine started.")
        try:
            while not self.controller.run:
                # BUG FIX: missing 'await' (see session_watchdog).
                await asyncio.sleep(self.controller.wait_default)
            logging.info("Coroutine ready.")
            while not self.ws_ready and self.controller.run:
                # Wait here until the websocket is ready.
                await asyncio.sleep(self.controller.wait_default)
            self.recovering = False
            while self.controller.run:
                # Wait for a new info update.
                if await self.wait_for_info_update(timeout=4.0):
                    # If shutdown, send a firmware restart.
                    if self.printer_state in ['shutdown', 'error']:
                        if self.auto_restart:
                            logging.warning(f"{self.printer_state} state found, restarting firmware and waiting for {min_wait_for_restart} seconds.")
                            await self.firmware_restart()
                        else:
                            logging.critical(f"{self.printer_state} state found, you need to manually restart the firmware.")
                        # Set the recovering flag in either case.
                        self.recovering = True
                    elif self.recovering and self.printer_state != 'startup':
                        logging.info(f"Recovery successful, printer state: {self.printer_state}")
                        self.recovering = False
                else:
                    logging.warning(f"Info update timeout, retrying in {min_wait_for_restart} seconds.")
                # Wait for a bit
                await asyncio.sleep(min_wait_for_restart)
            logging.debug("Controller not running.")
        except asyncio.exceptions.CancelledError:
            logging.warning("Coroutine cancelled.")
        except Exception as e:
            msg = f"Unhandled exception: {e}"
            logging.error(msg)
            print(msg)
            raise CommanderError(msg) from e
        logging.warning("Coroutine ended.")

    # Websocket reader coroutine ####
    new_info_flag: bool = False
    """Used by websocket_reader to sleep between reads, and should be quite
    faster than 'wait_default' (set by the main controller class)."""

    async def websocket_reader(self):
        """The "reader" co-routine.

        This asyncio co-routine reads from the websocket continuously. The
        "read" is awaited, which means that other co-routines can still run
        while the reader is checking for messages.

        Note: a "co-routine" is similar to the old "serial_reader" thread,
        made with in Threading.
        """
        logging.info("Coroutine started.")
        try:
            while not self.controller.run:
                # Wait here until the controller is ready.
                # BUG FIX: missing 'await' (see session_watchdog).
                await asyncio.sleep(self.controller.wait_default)
            logging.info("Coroutine ready.")
            # Wait here until the websocket is ready.
            while not self.ws_ready and self.controller.run:
                await asyncio.sleep(self.controller.wait_default)
            # Loop while running.
            while self.controller.run:
                if not self.ws_ready:
                    # Wait until the websocket is ready.
                    await asyncio.sleep(self.controller.wait_default)
                else:
                    # Receive data from the websocket.
                    try:
                        logging.debug("Waiting for a websocket message.")
                        data = await self.websocket.recv()
                        self.new_info_flag = True  # To anyone listening.
                    except ConnectionClosed:
                        logging.warning("Failed to get data, the websocket is closed.")
                        await asyncio.sleep(self.controller.wait_default)
                        continue
                    # Convert the response to a dictionary.
                    response = json.loads(data)
                    # Skip "notify_proc_stat_update" messages.
                    if response.get('method', None) == 'notify_proc_stat_update':
                        logging.debug("Received notify_proc_stat_update, continuing.")
                        continue
                    logging.debug(f"Received response:\n{pformat(response)}")
                    # Add key to tracker dict
                    try:
                        # Track the response.
                        self.track_response(response)
                    except KeyError:
                        logging.debug(f"Incoming message does not contain an 'id' field: {response}")
                        continue
                    # Parse the response for important messages here.
                    # The handlers may raise an exception, or stop the coroutines, etc.
                    self.check_printer_ready(response)
                    # Sleep for a bit before looping.
                    # TODO: This sleep was the cause of incoming commands accumulating,
                    # presumably the new "sqlite code" used by the tracker is blocking,
                    # and slow enough to cause messages to be processed too slowly here.
                    # TODO: This was unnecessary as the recv command already "waits".
                    # await asyncio.sleep(self.read_interval)
            logging.debug("Controller not running.")
        except asyncio.exceptions.CancelledError:
            logging.warning("Task cancelled.")
        except Exception as e:
            msg = f"Unhandled exception: {e}"
            logging.error(msg)
            print(msg)
            raise CommanderError(msg) from e
        # Close the "shelve" tracker object.
        # TODO: Think of a better place to put this.
        self.tracker_close()
        logging.warning("Coroutine ended.")

    def track_response(self, response: dict):
        """Register an incoming websocket message in the tracker."""
        # Get the response ID
        response_id = str(response["id"])
        # Check if the tracker has this ID already or not.
        if response_id not in self.tracker:
            # BUG FIX: the message was a plain string, so 'response_id' was
            # never interpolated; made it an f-string.
            logging.debug(f"Saving a response with id '{response_id}', which did not match an existing entry in the tracker.")
            self.tracker[response_id] = {}
        # Look for an existing response.
        if "response" in self.tracker[response_id]:
            logging.debug(f"Response found in tracker item with id={response_id} (it will be updated/overwritten).")
            # Backup the previous response before overwriting it.
            response_prev = self.tracker[response_id]["response"]
            self.tracker.update_entry(response_id, {"response_prev": response_prev})
        # Update tracker dict, saving the response in a "response" key.
        self.tracker.update_entry(response_id, {"status": "responded", "response": response})
        logging.debug(f"Updated response in tracker item with ID {response_id}")

    # Websocket status request coroutine ####
    async def websocket_info(self, cmd_id_prefix="websocket_info", loop_time=None):
        """Info request coroutine.

        Periodically sends "printer.info" requests to Moonraker.
        """
        logging.info("Coroutine started.")
        if loop_time is None:
            # NOTE: 0.05 was too fast... setting it to x 3 the default.
            loop_time = self.controller.wait_default * 3
        try:
            while not self.controller.run:
                # BUG FIX: missing 'await' (see session_watchdog).
                await asyncio.sleep(self.controller.wait_default)
            logging.info("Coroutine ready.")
            while not self.ws_ready and self.controller.run:
                # Wait here until the websocket is ready.
                await asyncio.sleep(self.controller.wait_default)
            # Loop while running.
            while self.controller.run:
                # Check spam flag or wait and skip iteration
                if not self.spam_info:
                    logging.warning("Waiting until spam_info is set.")
                    await asyncio.sleep(2.0)
                    continue
                # Wait here until the websocket is ready.
                while not self.ws_ready and self.controller.run:
                    logging.warning("Waiting until ws_ready and controller.run are set.")
                    await asyncio.sleep(2.0)
                # Increment ID counter
                cid = int(self.cid)
                self.cid = self.cid + 1
                # Build command:
                cmd_id = f"{cmd_id_prefix}_{cid}_{time.time()}"
                # Generate "{"jsonrpc": "2.0", "method": "printer.info", "id": cmd_id}"
                # info_command = {"jsonrpc": "2.0", "method": "printer.info", "id": 7}
                info_command = rpc_primitives.printer_info(cmd_id)
                # Track time
                initial_time = time.time()
                # Send command.
                await self.send_cmd(
                    rpc_cmd=info_command, cmd_id=cmd_id,
                    wait=False, check=False)
                # Wait for the actual remaining time (after the previous await).
                final_time = time.time()
                if final_time - initial_time <= loop_time:
                    await asyncio.sleep(loop_time - (final_time - initial_time))
            logging.info("Coroutine loop ended: controller not running.")
        except asyncio.exceptions.CancelledError:
            logging.warning("Task cancelled.")
        except Exception as e:
            msg = f"Unhandled exception: {e}"
            logging.error(msg)
            print(msg)
            raise CommanderError(msg) from e
        logging.warning("Coroutine ended.")

    # Websocket status request coroutine ####
    async def query_toolhead_status(self, cmd_id_prefix="homed_axes", loop_time=0.5):
        """Toolhead info request coroutine, including homed axes.

        Periodically sends "printer.objects.query" requests to Moonraker.
        NOTE: 0.2 seconds is too fast.
        """
        logging.info("Coroutine started.")
        try:
            while not self.controller.run:
                # BUG FIX: missing 'await' (see session_watchdog).
                await asyncio.sleep(self.controller.wait_default)
            logging.info("Coroutine ready.")
            while not self.ws_ready and self.controller.run:
                # Wait here until the websocket is ready.
                await asyncio.sleep(self.controller.wait_default)
            # Loop while running.
            while self.controller.run:
                # Check spam flag or wait and skip iteration
                if not self.spam_info:
                    logging.warning("Waiting until spam_info is set.")
                    await asyncio.sleep(2.0)
                    continue
                # Wait here until the websocket is ready.
                while not self.ws_ready and self.controller.run:
                    logging.warning("Waiting until ws_ready and controller.run are set.")
                    await asyncio.sleep(2.0)
                # Increment ID counter
                cid = int(self.cid)
                self.cid = self.cid + 1
                # Build command:
                cmd_id = f"{cmd_id_prefix}_{cid}_{time.time()}"
                # Generate "{"jsonrpc": "2.0", "method": "printer.info", "id": cmd_id}"
                # info_command = {"jsonrpc": "2.0", "method": "printer.info", "id": 7}
                info_command = rpc_primitives.query_toolhead_status(cmd_id)
                # Track time
                initial_time = time.time()
                # Send command.
                _, (wait_check, _) = await self.send_cmd(
                    rpc_cmd=info_command, cmd_id=cmd_id,
                    wait=True, check=False, timeout=loop_time)
                # Look for a response.
                response = self.get_response_by_id(cmd_id)
                if wait_check and response:
                    try:
                        toolhead_status = response["result"]["status"]["toolhead"]
                        self.parse_toolhead_status(toolhead_status=toolhead_status)
                    except KeyError:
                        logging.warning(f"Missing tool-head status information in response: {response}")
                else:
                    if not self.dry:
                        logging.warning(f"Failed to obtain the toolhead's status (wait_check={wait_check}, response={response})")
                # Wait for the actual remaining time (after the previous await).
                elapsed_time = time.time() - initial_time
                if elapsed_time <= loop_time:
                    await asyncio.sleep(loop_time - elapsed_time)
            logging.debug("Controller not running.")
        except asyncio.exceptions.CancelledError:
            logging.error("task cancelled.")
        except Exception as e:
            msg = f"Unhandled exception: {e}"
            logging.error(msg)
            print(msg)
            raise CommanderError(msg) from e
        logging.warning("Coroutine ended.")

    def parse_toolhead_status(self, toolhead_status: dict):
        """Parse a status update from the toolhead and save the results.

        See 'query_toolhead_status' in gcode_primitives for details on its
        content.
        """
        # Parse homing status.
        try:
            homed_axes = toolhead_status["homed_axes"]
            logging.debug(f"Received axis homing status from firmware: {homed_axes}")
            self.homed_axes = homed_axes
        except KeyError:
            logging.error(f"Failed to extract homing status from toolhead status: {toolhead_status}")
        # Parse XYZ machine limits.
        try:
            axis_minimum = toolhead_status["axis_minimum"]
            axis_maximum = toolhead_status["axis_maximum"]
            # BUG FIX: the min/max labels in this log message were swapped.
            logging.debug(f"Received axis limits from firmware (min: {axis_minimum}) (max: {axis_maximum}).")
            # Pair up the limits, only the XYZ limits are important for now.
            limits = []
            for i in range(3):
                # Extract the first 3 (only XYZ components).
                limits.extend([axis_minimum[i], axis_maximum[i]])
            # Make the "machine_limits" dictionary.
            self.machine_limits = self.controller.builder.gcode.make_limits(*limits)
            # TODO: Validate and update machine limits in the GcodePrimitives class.
        except Exception as e:
            logging.error(f"Failed to extract axis limits ({e}) from toolhead status: {toolhead_status}")
        # Parse "GCODE position"
        try:
            gcode_position = toolhead_status["position"]
            logging.debug(f"Received GCODE position coordinates from firmware: {gcode_position}")
            # Extract the first 3 (only XYZ components).
            self.gcode_position = gcode_position[:3]
        except Exception as e:
            logging.error(f"Failed to extract GCODE position ({e}) from toolhead status: {toolhead_status}")

    async def wait_for_homing_update(self, timeout=2.0):
        """Wait until the homed axes are updated."""
        # Early return in dry mode.
        if self.dry:
            logging.debug("Dry mode on. Returning all axes.")
            return "xyzabce"
        # Reset the homed axes.
        self.homed_axes = None

        # Temporary wait function for wait_for below.
        async def _wait(wait_time):
            while self.homed_axes is None:
                await asyncio.sleep(wait_time)

        # Wait here with a timeout.
        try:
            await asyncio.wait_for(
                _wait(self.controller.wait_default),
                timeout=timeout
            )
        except (asyncio.exceptions.TimeoutError, asyncio.CancelledError):
            logging.warning("Timed-out while waiting for a homed-axes query.")
            return None
        return self.homed_axes

    async def activate_toolchanger_axis(self, timeout=1.1):
        """Activate the tool-changer axis."""
        # NOTE(review): 'gcodeAtivateToolChanger' spelling matches the builder
        # API as-is; do not "fix" it here without renaming it at the source.
        tc_select_gcode = self.controller.builder.gcode.gcodeAtivateToolChanger()[0]
        logging.debug(f"Selecting the tool-changer axis using: {tc_select_gcode}")
        cmd, _ = self.controller.machine.make_gcode_cmd(tc_select_gcode)
        cmd_id, (wait_pass, check_pass) = await self.send_cmd(
            rpc_cmd=cmd,
            wait=True, check=True, timeout=timeout)
        return cmd_id, tuple([wait_pass, check_pass])

    # Websocket reconnection coroutine ####
    ws_ready: bool = False
    """Flag indicating if the websocket connection to Moonraker is
    established."""
    websocket: ClientConnection = None
    """Property holding the websocket connection object (or None)."""

    async def websocket_reconnect(self, ws=None, wait_time=None, ws_open_timeout=3.0):
        """This method handles reconnection on disconnect.

        TODO: try to catch the error when no websocket is available:
        OSError: Multiple exceptions: [Errno 111] Connect call failed
        ('::1', 7125, 0, 0), [Errno 111] Connect call failed ('127.0.0.1', 7125)
        """
        logging.info("Coroutine started.")
        # Set a default wait time for reconnections.
        if not wait_time:
            wait_time = self.controller.wait_default * 5
        # Get the class's websocket if it was not specified.
        if ws is None:
            ws = self.websocket
        try:
            while not self.controller.run:
                # BUG FIX: missing 'await' (see session_watchdog).
                await asyncio.sleep(self.controller.wait_default)
            logging.info("Coroutine ready.")
            while self.controller.run and self.ws_address:
                # New asyncio API in websockets.
                # https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html#websockets.asyncio.client.connect
                try:
                    async with connect(uri=self.ws_address, open_timeout=ws_open_timeout) as ws:
                        while self.controller.run and self.ws_address:
                            # Attempt to send a message.
                            logging.debug("Websocket defined, awaiting pong.")
                            pong = await ws.ping()
                            latency = await pong
                            logging.debug(f"Connection latency is {latency} seconds")
                            # Save the working websocket object.
                            if not self.websocket:
                                self.websocket = ws
                            # Signal that the websocket is ready.
                            self.ws_ready = True
                            # Wait for a bit before the next ping.
                            await asyncio.sleep(self.controller.wait_default)
                except ConnectionClosed:
                    # Signal that the websocket is failing.
                    self.ws_ready = False
                    self.websocket = None
                    logging.error("The websocket connection is closed.")
                    continue
                except ConnectionRefusedError as e:
                    # Signal that the websocket is failing.
                    self.ws_ready = False
                    self.websocket = None
                    msg = f"Connection refused: '{e}'\n" + traceback.format_exc()
                    logging.error(msg)
                    print(msg)
                # Wait before the next ping or reconnection.
                logging.warning(f"Waiting for {ws_open_timeout} seconds before reconnecting")
                await asyncio.sleep(ws_open_timeout)
            # Loop end.
            logging.debug("Controller not running, or websocket address unset.")
        except asyncio.CancelledError:
            # Update status.
            self.ws_ready = False
            # Try clean exit.
            logging.error("Coroutine cancelled. Attempting clean exit.")
            await self.close()
        except Exception as e:
            # Update status.
            self.ws_ready = False
            msg = f"Unhandled exception: {e}"
            logging.error(msg)
            print(msg)
            raise CommanderError(msg) from e
        logging.warning("Coroutine ended.")

    # NOTE: moved moon_sio to Controller.

    # STOP/RESTART methods section ####
    async def emergency_stop(self):
        """Send an emergency stop signal to the firmware.

        See: https://moonraker.readthedocs.io/en/latest/web_api/#emergency-stop
        """
        e_stop = rpc_primitives.emergency_stop(id=42)
        data = json.dumps(e_stop)
        if self.websocket is None:
            logging.error("Failed to send signal, the websocket is not setup.")
        else:
            sent = False
            while not sent:
                try:
                    await self.websocket.send(data)
                    sent = True
                except ConnectionClosed:
                    logging.critical("Failed to send signal, the websocket is closed. Retrying.")
                    # TODO: consider removing the wait.
                    await asyncio.sleep(self.controller.wait_default)

    async def firmware_restart(self):
        """See: https://moonraker.readthedocs.io/en/latest/web_api/#firmware-restart"""
        fw_restart = rpc_primitives.firmware_restart(id=8463)
        data = json.dumps(fw_restart)
        if self.websocket is None:
            logging.error("firmware_restart: failed to send signal, the websocket is not setup.")
        else:
            try:
                await self.websocket.send(data)
            except ConnectionClosed:
                logging.critical("firmware_restart: failed to send signal, the websocket is closed.")

    async def close_websocket(self, wait=True):
        """Utility function to close the class' websocket connection, and wait
        for it to close."""
        if self.ws_address and self.websocket:
            logging.info(f"Closing websocket connection with wait={wait}")
            try:
                logging.debug("Sending close signal to websocket.")
                # Close the websocket (interrupts recv).
                await self.websocket.close()
                if wait:
                    logging.debug("Waiting for websocket close.")
                    # And wait for it.
                    await self.websocket.wait_closed()
            except Exception as e:
                msg = f"Exception while closing the websocket: {e}\n" + traceback.format_exc()
                logging.error(msg)
                print(msg)
                return False
        else:
            logging.warning("Skipped (neither 'ws_address' or 'websocket' are defined).")
        return True

    async def close(self, timeout=3.0):
        """Close the connection and wait for it to close gracefully."""
        logging.info("Closing Machine connection.")
        try:
            # Close websocket.
            await asyncio.wait_for(self.close_websocket(), timeout=timeout)
        except asyncio.exceptions.TimeoutError:
            logging.info("KlipperCommander: failed to close in time.")
        logging.debug("KlipperCommander: websocket connection closed.")

    async def status(self):
        """Gather information and compute the overall status of this class.

        This is consumed, for example, by the sio_status plugin (see
        make_status).
        """
        # Evaluate the homed status.
        # TODO: I've changed this to "xyz" only, excluding the extruder. Reconsider.
        # It seems more relevant to the UI that those are homed,
        # while the "active extruder" may be a pipette, TC, or nothing relevant.
        if self.homed_axes is not None:
            homed = "OK" if all(x in self.homed_axes.lower() for x in "xyz") else "WARN"
        else:
            homed = "UNK"
        # Evaluate the overall status.
        if self.ws_ready and self.printer_ready and not self.recovering and homed == "OK":
            # NOTE: This "OK" will be matched by make_status in sio_status.py.
            status_code = "OK"
        elif not self.ws_address:
            status_code = "OFF"
        else:
            status_code = "WARN"
            logging.warning(f"Machine not ready: ws_ready={self.ws_ready} printer_ready={self.printer_ready} recovering={self.recovering} homed={homed} (homed_axes={self.homed_axes})")
        # Build status data.
        status = {
            "klipper": {
                "status": status_code,  # TODO: add printer status.
                "ready": self.ws_ready,
                "printer_ready": "OK" if self.printer_ready else "WARN",
                "homed": homed,
                "recovering": "WARN" if self.recovering else "OFF",
                "dry": "WARN" if self.dry else "OFF"
            }
        }
        return status

    # GCODE methods section ####
    def make_gcode_cmd(self, gcode_cmd: str, cmd_id: str = None):
        """Create RPC compatible command from GCODE.

        Returns the RPC command (a dictionary) and the command ID."""
        # Create a cmd_id from md5sum (requires hashlib)
        if cmd_id is None:
            cmd_id = self.hash_cmd(gcode_cmd)
        # Build and send command
        command = rpc_primitives.gcode_script(gcode_cmd, cmd_id)
        return command, cmd_id

    @staticmethod
    def hash_cmd(gcode_cmd: str):
        """Generate an MD5 hash from the provided GCODE command.

        The current time is appended to the command, to make their hashes
        unique. This function is meant to be used as a generator of unique IDs
        for RPC commands sent to Klipper.
        """
        # Should be unique by using the current time
        uid = gcode_cmd + str(time.time())
        uid_md5 = md5(uid.encode())
        cmd_id = uid_md5.hexdigest()
        return cmd_id

    async def send_gcode_cmd(self, gcode_cmd, cmd_id=None, wait=False, check=False, timeout=0.0):
        """Make RPC command from the GCODE, send it, and optionally check/wait for it.

        It wraps "send_cmd" for GCODE. This mechanism is independent of the
        'writer' coroutine, and will not wait for idle.
        """
        logging.debug(f"Sending gcode_cmd={gcode_cmd}")
        # Build RPC command
        rpc_command, cmd_id = self.make_gcode_cmd(gcode_cmd=gcode_cmd, cmd_id=cmd_id)
        # Send the command
        cmd_id, checks = await self.send_cmd(rpc_cmd=rpc_command, cmd_id=cmd_id,
                                             wait=wait, check=check, timeout=timeout)
        return cmd_id, checks

    async def send_gcode_script(self, gcode_cmds: list, cmd_id=None, wait=False, check=False, timeout=0.0):
        """Run a list of GCODES as a single script in Klipper.

        Hopefully will prevent some stuttering.

        Args:
            gcode_cmds (list): A list of strings, each a valid GCODE command for Klipper.

        Returns:
            str: tracking ID for the script command.
        """
        logging.debug(f"Sending gcode_cmds={gcode_cmds}")
        # Join multiple GCODE commands with a newline character. Klipper will eventually parse
        # this with run_script (at "gcode.py") and split it back into the individual commands.
        gcode_script = "\n".join(gcode_cmds)
        # Reuse "send_gcode_cmd"
        cmd_id, checks = await self.send_gcode_cmd(gcode_cmd=gcode_script, cmd_id=cmd_id,
                                                   wait=wait, check=check, timeout=timeout)
        return cmd_id, checks

    def new_cmd_session(self):
        """Clear all commands in the session list."""
        logging.debug("Starting new commands session.")
        if self.session_commands:
            logging.debug(f"Discarded {len(self.session_commands)} entries.")
        # Save old commands.
        old_cmds = self.session_commands
        # Reset the list.
        self.session_commands = []
        # Return old commands.
        # NOTE(review): the tail of this method was truncated in the reviewed
        # text; the return below is restored from the comment above — confirm
        # against the original source.
        return old_cmds
return old_cmds def cmd_to_session(self, cmd_id): """Append a command ID to the session list""" if self.session_commands is not None: logging.debug(f"Appending {cmd_id} command to session with {len(self.session_commands)} entries.") self.session_commands.append(cmd_id) def end_cmd_session(self): """Delete the list, and disable cmd_to_session.""" self.session_commands = None async def send_cmd(self, rpc_cmd, cmd_id=None, wait=False, check=False, timeout=0.0): """Send RPC command, and optionally check/wait for it. This mechanism is independent of the 'writer' coroutine, and will not wait for idle. """ # Check if the commander is dead before trying to send the command. if self.dry: logging.debug(f"Dry mode enabled, the command with id {cmd_id} will have no effect.") elif not (self.controller.run and self.ws_ready): msg = f"Incoming command can't be sent: commander running = {self.controller.run}; websocket ready = {self.ws_ready}." logging.error(msg) raise CommanderError(msg) # Generate unique command ID if one was not provided. if cmd_id is None: # Get the existing ID in the command or hash it. logging.debug("Replacing None cmd_id with hash of contents.") cmd_id = rpc_cmd.get("id", self.hash_cmd(str(rpc_cmd))) logging.debug(f"Processing command with ID: {cmd_id}") # Add the command to the tracker. if cmd_id in self.tracker: logging.warning(f"The info command ID {cmd_id} is already present in the tracker. It will be replaced.") self.tracker[cmd_id] = {} # Set the command, potentially overwriting the previous # one (if cmd_id was specified and not unique).) self.tracker.update_entry(cmd_id, {"status": "sending", "command": rpc_cmd}) # Uncomment to write all commands to a temporary file. # with open("/tmp/rpc_cmd.log", 'a', encoding='utf-8') as file: # json.dump(rpc_cmd, file) # file.write('\n') # Send the command. if self.dry: # Handle dry mode. 
logging.debug(f"Dry mode enabled, delaying a bit, and skipping command and its checks: {rpc_cmd}") await asyncio.sleep(self.controller.wait_default) elif not self.websocket: # Handle unconfigured websocket. logging.error(f"The websocket is not setup, failed to send: {rpc_cmd}") else: # Send the command. logging.debug(f"Sending RPC command using wait={wait}, check={check}, timeout={timeout}, and command: {rpc_cmd}") # Append the command to the session. self.cmd_to_session(cmd_id) try: data = json.dumps(rpc_cmd) await self.websocket.send(data) except ConnectionClosed: # Handle connection closed. logging.error("Failed to send command, the websocket is closed.") self.tracker.update_entry(cmd_id, {"status": "not_sent"}) # Early return on connection closed error. return cmd_id, (False, False) else: # Handle success. logging.debug("RPC command sent.") self.tracker.update_entry(cmd_id, {"status": "sent"}) # TODO: Consider raising exceptions here. # Check results are "True" by default when not requested. wait_pass, check_pass = not wait, not check # New status message. new_status = None # Wait for a response from the RPC. if wait and not self.dry: wait_check = await self.wait_for_response(cmd_id=cmd_id, timeout=timeout, loop_delay=0.05) if wait_check: new_status = "responded" wait_pass = True else: new_status = "timeout" wait_pass = False logging.error(f"Failed to wait for a response for cmd_id={cmd_id} with timeout={timeout}") # Run an "ok" check on the response, if received in time. if wait_pass and check and not self.dry: ok_check = await self.check_command_result_ok(cmd_id=cmd_id, timeout=timeout, loop_delay=0.05) if not ok_check: logging.error(f"Failed to check response for cmd_id={cmd_id} with timeout={timeout}") check_pass = False new_status = "error" else: new_status = "ok" check_pass = True if new_status is not None: self.tracker.update_entry(cmd_id, {"status": new_status}) # Dry mode override. if self.dry: logging.debug("Dry mode enabled. 
Overwritting wait and check flags with 'True' values.") wait_pass, check_pass = True, True # Checks vector checks = tuple([wait_pass, check_pass]) return cmd_id, checks async def gcode_send_set_origin(self): """ Set origin and unlock the machine. Relies on SET_KINEMATIC_POSITION. This mechanism is independent of the 'writer' coroutine, and will not wait for idle. """ set_origin = self.controller.builder.gcode.gcode_set_origin() await self.send_gcode_cmd(gcode_cmd=set_origin[0]) async def gcode_send_dwell(self, delay_milliseconds=0.1): """ Dwell command. See: https://www.klipper3d.org/G-Codes.html#g-code-commands This mechanism is independent of the 'writer' coroutine, and will not wait for idle. """ dwell_cmd = self.controller.builder.gcode.gcodeDwell(seconds=delay_milliseconds/1000) await self.send_gcode_cmd(gcode_cmd=dwell_cmd[0]) async def gcode_send_wait(self): """ Wait for current moves to finish: Add ability to fully stall the input until all moves are complete. See: https://github.com/Klipper3d/klipper/commit/2e03d84755f466adaad64ae0054eb461869d0529 This mechanism is independent of the 'writer' coroutine, and will not wait for idle. """ wait_cmd = self.controller.builder.gcode.gcodeWait() await self.send_gcode_cmd(gcode_cmd=wait_cmd[0]) async def wait_for_free(self, timeout=10.0, loop_delay=0.2): """Wait until the run_rpc_protocol loop idles (i.e. no more commands due).""" if self.dry: logging.debug("Dry mode enabled, waiting briefly and returning.") await asyncio.sleep(self.controller.wait_default) logging.debug("Dry mode enabled, returning success.") return True # Track start time start_time = time.time() elapsed = 0.0 logging.debug(f"Waiting for idle class status: start time {start_time} and timeout {timeout}") while True: # Check if there is no RPC protocol being streamed. if self.free: logging.debug(f"Success! Free to run RPC protocols (start time={start_time} timeout={timeout})") return True # Check for a timeout. 
if timeout: if elapsed >= timeout: logging.debug(f"Timed-out waiting for idle class status: start time={start_time} timeout={timeout}") return False # Wait for a bit before checking again. await asyncio.sleep(loop_delay) elapsed += loop_delay printer_idle_states = ["Ready", "Idle"] async def wait_for_idle_printer(self, timeout=10.0, loop_delay=0.5): """The 'idle_timeout' object reports the idle state of the printer. AFAIK, "Idle" is set by klipper (klippy/extras/idle_timeout.py) when the "idle timeout" is triggered successfully. Else, the printer may be idling and report a "Ready" state, or in a busy state, and report a "Printing" state. This method is similar to "wait_for_ready", except it will not restart Klipper (it only does queries to "idle"). It is only meant to wait for a "still" printer which is running normally. The "response" to the query can look like this: {'id': 'aa3ba74fd3e3061ca58075924edec8f1', 'jsonrpc': '2.0', 'result': {'eventtime': 62094.128874252, 'status': {'idle_timeout': {'printing_time': 5.842246050997346, 'state': 'Printing'}}}} """ # Early return for dry mode. if self.dry: logging.debug(f"Simulating waiting for idle printer (dry mode on). Returning true in {loop_delay} seconds.") await asyncio.sleep(loop_delay) return True # Track start time start_time = time.time() # Make query command idle_query_cmd = rpc_primitives.query_idle_timeout(id=None) elapsed = 0.0 logging.debug(f"Waiting for idle printer with start_time={start_time}") while True: # Reset the commands unique ID. idle_query_cmd["id"] = self.hash_cmd(str(idle_query_cmd)) # Track iter time last_time = time.time() try: # Send the command andwait for a response. logging.debug(f"Sending query command idle_query_cmd={idle_query_cmd}") cmd_id, _ = await self.send_cmd(rpc_cmd=idle_query_cmd, wait=True, check=False, timeout=timeout) # Get the printer's status from the response. 
cmd_result = self.get_result_by_id(cmd_id) if cmd_result: status = cmd_result['status']['idle_timeout']["state"] logging.debug(f"Received response with status: {status}") # Check the response's result for printer idle. if status in self.printer_idle_states: logging.debug("Printer now idle.") return True else: logging.debug(f"No response yet for cmd_id {cmd_id}") except KeyError as e: logging.debug(f"Key error when looking for idle state in response: {e}") # Check for a timeout (total elapsed time, all loops). if timeout: elapsed = time.time() - start_time if elapsed >= timeout: logging.debug("Wait for idle printer timed out.") return False # Wait for the actual remaining time (after the previous loop) elapsed_time = time.time() - last_time logging.debug(f"Elapsed time: {elapsed_time}") if elapsed_time < loop_delay: logging.debug(f"Waiting for {loop_delay - elapsed_time} seconds.") await asyncio.sleep(loop_delay - elapsed_time) event_protocol_updates:str = "action_updates" async def update_action_status(self, action: dict, status: str): """ This function triggers an event that notifies other parts of the program about an action whose status has changed. For example, "send_execution_updates" in the "p2g_command" plugin. """ results = await self.controller.trigger_event_callback( event_name=self.event_protocol_updates, action=action, status=status) return results action_execute_paused:str = "paused" action_execute_run:str = "running" action_execute_ok:str = "done" action_execute_error:str = "error" async def run_actions_protocol(self, actions: list, i: int=0, wait:bool=None, check:bool=None, default_action_timeout:float=10.0): """Parses actions from a pipetting protocol, which already contain the corresponding GCODE, and executes them. Args: actions (List): A list of dictionaries, each one an action from a pipetting protocol. i (int): Actual starting index for the actions in the received list. Useful to reference the upstream protocol. 
wait (bool, optional): Wait for a response to all commands, aborting if it times out. Defaults to None. check (bool, optional): Check each response for an "ok", aborting if it is not found. Defaults to None. default_action_timeout (float, optional): Default timeout for an action to complete its execution. Defaults to 10.0. Returns: List: The same actions from the input, with extra information from runtime. """ logging.debug(f"Received actions protocol with {len(actions)} actions.") # Default to the machine's paranoia status. if wait is None: wait = self.controller.paranoid if check is None: check = self.controller.paranoid # Raise errors if the controller is not ready. self.check_controller() # Check if the machine is free to run new actions. self.check_greenlight() # With the green light, start a new command session. self.new_cmd_session() # Stream the commands one action at a time. try: # Track errors. error = None # Track expected execution time. exec_time = default_action_timeout # Run the actions. for idx, action in enumerate(actions): # TODO: This is a good place to add the "pausing" and "stopping" behaviour. logging.debug(f"Running action {idx} with command {action.get('cmd', None)}.") parsed_action = await self.run_action(action, idx+i, wait, check, default_action_timeout) # Increment time. exec_time += parsed_action.get("exec_opts", {}).get("timeout", default_action_timeout) logging.debug(f"Done with action {idx} with command {action.get('cmd', None)}.") except Exception as e: logging.error(f"Errors while running action {idx} with command {action.get('cmd', None)}" + str(e)) error = e # Ensure idle printer before clearing the command session, unless there are errors already. if error is None: # TODO: The following loop may get stuck if the controller is not running or something else fails. if not await self.wait_for_idle_printer(timeout=exec_time): msg = f"Emergency stop! 
Printer not idle after total execution time elapsed: {exec_time}" logging.critical(msg) await self.emergency_stop() error = ProtocolError(msg) else: logging.debug("Printer idle, clearing cmd session.") # End the command session. self.end_cmd_session() # Raise any error produced during the session. if error is not None: raise error logging.debug("Actions protocol executed successfully.") return deepcopy(actions) async def run_action(self, action, idx, wait:bool=None, check:bool=None, default_action_timeout:float=10.0): """Run an action's GCODE Converts GCODE from an action into RPC commands, and sends them as an RPC protocol. """ logging.debug(f"Incoming {action['cmd']} action with index {idx}.") # Raise errors if the controller is not ready. self.check_controller(idx, action) # Get GCODE commands. gcode_commands = action["GCODE"] # Remove all comment-only commands. gcode_commands_clean = [cmd for cmd in gcode_commands if not cmd.strip().startswith(";")] # Get execution options. exec_opts = action.get("exec_opts", {}) # Set default execution options. exec_opts.setdefault("wait", wait) exec_opts.setdefault("check", check) exec_opts.setdefault("timeout", default_action_timeout) # Generate RPC commands from GCODE commands. # NOTE: Only the dictionary is kept, and it's ID is unused (hence the "[0]" below). rpc_commands = [self.make_gcode_cmd(cmd)[0] for cmd in gcode_commands_clean] # Save RPC commands to the action dict. action["rpc_commands"] = deepcopy(rpc_commands) # Check if the machine is free to run new actions. self.check_greenlight(action=action) # Set the current action to the controller's attribute. self.current_action = action # Run the commands with a timeout. exception = None try: # Let everyone know whats up. await self.update_action_status(action, self.action_execute_run) logging.debug(f"Executing RPC protocol for {action['cmd']} action with index {idx}.") # Send RPC commands for the current action. # Cancel the coroutine task if the timeout is reached. 
# See: https://docs.python.org/3/library/asyncio-task.html#asyncio.wait_for await asyncio.wait_for( # NOTE: The action generators and runners must have incremented the default # timeout in "exec_opts" to avoid being interrupted by errors here. self.run_rpc_protocol(rpc_commands=rpc_commands, **exec_opts), timeout=exec_opts["timeout"]) # Handle exceptions. except asyncio.TimeoutError as e: # Send an alert throught the SIO connection to the GUI. msg = f"Time-out error at action with index='{idx}' command='{action['cmd']}', args='{action['args']}'," msg += f" after {exec_opts['timeout']} seconds. Error message: {e}" logging.error(msg) exception = ProtocolError(action=action, cause=e, message=msg) except ProtocolError as e: # Send an alert throught the SIO connection to the GUI. msg = f"Protocol error at action with index='{idx}' command='{action['cmd']}', args='{action['args']}'," msg += f" was raised by rpc_command with id={e.cmd_id} and tracker entry: {self.tracker.get(e.cmd_id)}." msg += f" Error message: {e.message}" logging.error(msg) exception = e # ProtocolError except Exception as e: # Send an alert throught the SIO connection to the GUI. msg = f"Unhandled error at action with index={idx} command={action['cmd']} and args={action['args']}." msg += f" Error message: {str(e)}" logging.error(msg) exception = ProtocolError(action=action, cause=e, message=msg) # Release the current action self.current_action = {} # Save tracker entries (which holds responses and results) before raising errors. action.setdefault("tracker", {}) for cmd_id in [c["id"] for c in rpc_commands]: action["tracker"][cmd_id] = self.get_command_by_id(cmd_id=cmd_id) # TODO: Consider adding a way to alert without aborting, # allowing the user to "fix" the problem without # having to start ALL OVER AGAIN. # Save the return status. if exception is None: action["result"] = "ok" # Let everyone know whats up. 
await self.update_action_status(action, self.action_execute_ok) logging.debug(f"Action {idx} with {action['cmd']} command executed successfully.") elif isinstance(exception, ProtocolError): # Break the loop if an error occured. action["result"] = "error" # Send an an alert to the controller/socket. self.send_alert(exception.message, alert_type="error") # Let everyone know whats up. await self.update_action_status(action, self.action_execute_error) logging.error(f"Protocol error during {action['cmd']} action execution.") # Raise the protocol error. raise exception else: # Break the loop if an error occured. action["result"] = "error" # Let everyone know whats up. await self.update_action_status(action, self.action_execute_error) # Raise a protocol error from the exception above. msg = f"Error during {action['cmd']} action execution (unhandled exception)." logging.error(msg) raise ProtocolError(msg, action=action) from exception # Return a copy the action for reference. return deepcopy(action) async def run_gcode_protocol(self, gcode_commands: list, wait=True, check=True, timeout=30.0): """Make RPC commands from a list of GCODE commands, and send it through the background writer loop. TODO: Delete this method. It is no longer used, and does not handle a command session. Optionally wait for (and/or check) the return status of each command. True by default, as a protocol should stop on failures. Args: gcode_commands (list): A list of GCODE commands (as strings). wait (bool, optional): Optionally wait for a response, send e-stop signal if it times out. Defaults to False. check (bool, optional): Optionally check each response for an "ok", send e-stop signal if it times out. Defaults to False. timeout (float, optional): Timeout for 'wait' and 'check'. Defaults to 10.0. Returns: list: The list of rpc_commands. 
""" logging.info(f"Running GCODE protocol using wait={wait}, check={check}, timeout={timeout}") logging.debug("Protocol commands: " + pformat(gcode_commands)) if not self.controller.run: logging.error("Error, the commander is not running. Skipping actions protocol.") return # Generate RPC commands from GCODE commands # NOTE: Only the dictionary is kept, and it's ID is discarded (hence the "[0]"). rpc_commands = [self.make_gcode_cmd(cmd)[0] for cmd in gcode_commands] # Send protocol try: await self.run_rpc_protocol(rpc_commands=rpc_commands, wait=wait, check=check, timeout=timeout) except ProtocolError as e: # Send an alert throught the SIO connection to the GUI. msg = f"Protocol error in gcode protocol with message '{e.message}'.\n" + traceback.format_exc() self.send_alert(msg, alert_type="error") logging.error(msg) return rpc_commands def update_exec_opts(self, action, wait=None, check=None, timeout=None, add_timeout=False): """Override execution options for 'run_rpc_protocol' These values will override the wait, check, and timeout parameters to 'run_rpc_protocol'. """ action.setdefault("exec_opts", {}) if wait is not None: action["exec_opts"]["wait"] = wait if check is not None: action["exec_opts"]["check"] = check if timeout is not None: if add_timeout: action["exec_opts"].setdefault("timeout", 0.0) action["exec_opts"]["timeout"] += timeout else: action["exec_opts"]["timeout"] = timeout return action async def run_rpc_protocol(self, rpc_commands: list, wait=True, check=True, timeout=30.0): """Send a list of RPC commands to Moonraker, corresponding to a particular mid-level action. Optionally wait for (and/or check) the return status of each command. True by default, as a protocol should stop on failures. Args: rpc_commands (list): List of RPC/JSON commands for Moonraker. wait (bool, optional): Optionally wait for a response, send e-stop signal if it times out. Defaults to True. 
check (bool, optional): Optionally check each response for an "ok", send e-stop signal if it times out. Defaults to True. timeout (float, optional): Timeout for 'wait' and 'check'. Defaults to 30.0. Returns: list: The list of rpc_commands. """ msg = f"Running RPC protocol using wait={wait}, check={check}" msg += f", timeout={timeout} and {len(rpc_commands)} commands." logging.info(msg) logging.debug("Protocol commands:\n" + pformat(rpc_commands)) # Check if the commander is dead before trying. if not self.controller.run or (not self.ws_ready and not self.dry): msg = f"Incoming protocol can't be sent: commander running = {self.controller.run}, websocket ready = {self.ws_ready}." msg += " Commands not sent:\n" + pformat(rpc_commands) logging.error(msg) raise CommanderError(msg) # Wait for machine (printer) ready. logging.debug("Waiting for machine ready...") if not await self.wait_for_ready(wait_time=2.0): raise CommanderError(f"Machine not ready after {2.0}s timeout. RPC protocol run aborted.") # Wait for writer coroutine idle. logging.debug("Waiting for idle writer coroutine...") if not await self.wait_for_free(): # Abort the protcol here if the free flag is not set. raise ProtocolError("Machine not free after timeout. RPC protocol run aborted.") # Flag busy. self.free = False # Stream commands. logging.debug(f"Streaming protocol with {len(rpc_commands)} commands.") try: error = None for command in rpc_commands: # Check for readyness. if not self.printer_ready and not self.dry: raise CommanderError("Machine not ready. RPC protocol run aborted.") # Get the existing ID in the command or hash it now. command.setdefault("id", self.hash_cmd(str(command))) cmd_id = command["id"] # Send RPC commands (for the current action) with a timeout. 
logging.debug(f"Sending RPC command with ID: {command['id']}") logging.debug(f"Sending RPC command with data: {command}") cmd_id, (wait_pass, check_pass) = await self.send_cmd(rpc_cmd=command, wait=wait, check=check, timeout=timeout) # Check the result. if self.dry: # Skip checks in dry mode. logging.debug(f"Dry mode enabled, will not check output for command: {command}") else: if wait and not wait_pass: # Raise an exception if the command timed out and we wanted to wait for it. msg = f"Timed-out waiting for command: {command}" logging.error(msg) raise ProtocolError(message=msg, cmd_id=cmd_id, wait=wait_pass, check=check_pass) if check and not check_pass: # Raise an exception if the command did not return an 'ok' response and we asked for it. msg = f"Error checking response to command: {command}." # Look for information in the response. response = self.get_response_by_id(cmd_id) if response: msg += " Error message: " + response.get("error", {}).get("message", "< no message found in response >") else: msg += " No response to command." logging.error(msg) raise ProtocolError(message=msg, cmd_id=cmd_id, wait=wait_pass, check=check_pass) # Report final status. logging.debug(f"Done with RPC command {cmd_id}. Checks: 'wait_pass={wait_pass}', 'check_pass={check_pass}' Content: {command}") logging.debug(f"Done with this RPC protocol, {len(rpc_commands)} commands were sent.") except Exception as e: # Raise the error later. error = e # Set idle, allowing others to run. self.free = True # Raise any errors now. if error is not None: raise error # Return the RPC commands. return rpc_commands def save_gcode_protocol(self, gcode_commands, file=None, directory=None, prefix="protocol"): """Save a list of commands to a text (.gcode) file. By default the gcode will be saved to the system's temporary directory, with a unique file name, and '.gcode' file extension. If file is specified, it will be used, and the other arguments are ignored. 
The 'directory' and 'prefix' are used when 'file' is not set (i.e. 'None'). In this state, the GCODE can be saved to a unique file in a custom directory (e.g. to Klipper's virtual SD card). Args: gcode_commands (List): A list of strings. file (str, optional): Full path to the target text file. Will default to "protocol.gcode" in the OS's temporary directory. directory (str, optional): Path to the target directory for the gcode file. Will default to the OS's temporary directory. prefix (str, optional): Prefix for the gcode file. Returns: str: Full path to the gcode file. """ try: if file is None: _, gcode_path = tempfile.mkstemp(prefix=prefix + "-", suffix=".gcode", dir=directory) file = os.path.join(tempfile.gettempdir(), gcode_path) logging.info(f"Unspecified 'file'. Defaulting to: '{file}'.") with open(file, "w", encoding="utf-8") as f: for gcode_command in gcode_commands: f.write(gcode_command) f.write('\n') logging.info(f"GCODE saved to file: '{file}'.") # if self.verbose: # self.send_alert(text=f"GCODE protocol saved to file: '{file}'.", alert_type="message") except Exception as e: msg = f"An exception occured when saving to file={file} and directory={directory}: '{e}'." logging.error(msg) print(msg) traceback.print_exc() return file def send_alert(self, text: str, alert_type: str="message"): """Send an alert to the main controller. The contents and type of the alert will be handled or relayed by the main controller, and possibly passed to the comms (SIO) module.""" self.controller.send_alert(text=text, alert_type=alert_type) async def get_supported_cmds(self): """GCODE help for Klipper. Some of the Klipper commands are GCODE, others are not. Hence "ungcodes". 
See: https://moonraker.readthedocs.io/en/latest/web_api/#get-gcode-help """ help_cmd = rpc_primitives.rpc_help(id=4645) cmd_id, _ = await self.send_cmd(rpc_cmd=help_cmd, wait=True, check=True, timeout=3.0) help_result = self.get_result_by_id(cmd_id=cmd_id) return help_result async def test(self): logging.debug("KlipperCommander test") await asyncio.sleep(0.2)A class to drive the Pipettin robot.
Its main function is to run in the background, maintain a socket.io connection with the GUI, and a connection to Moonraker (which in turn drives the CNC machine). It also has several convenience methods to send actions from "pipetting protocols".
The SocketIO client communicates with the Pipetting GUI; a Node JS web-application for designing workspaces and protocols.
This class can (re)connect to Moonraker, send it commands, check responses, and other stuff. The "python-websockets" library is used for this purpose.
This class is started by the "start" method, which is blocking. The method is meant to be used by a background process. Alternatively, try using "await Controller.launch()" in an asyncio context.
Returns
Controller: Instance of the class.
Ancestors
Class variables
var action_execute_error : strvar action_execute_ok : strvar action_execute_paused : strvar action_execute_run : strvar event_protocol_updates : strvar free-
Used to flag if a protocol is being sent through RPC, and help prevent attempts to send another one simultaneously.
var new_info_flag : bool-
Used by websocket_reader to sleep between reads, and should be much faster than 'wait_default' (set by the main controller class).
var new_info_response : bool-
Flag indicating if a new status report has arrived (managed by check_printer_ready and wait_for_info_update).
var printer_idle_statesvar printer_ready : bool-
Flag indicating a 'ready' state of the printer object (see also: printer_state).
var printer_state : str-
String describing the state of the printer object (updated by check_printer_ready, consumed by firmware_watchdog).
var recovering : bool-
Flag indicating if the printer was found in an error state and is being restarted.
var websocket : websockets.asyncio.client.ClientConnection-
Property holding the websocket connection object (or None).
var ws_ready : bool-
Flag indicating if the websocket connection to Moonraker is established.
Static methods
def hash_cmd(gcode_cmd: str)-
Expand source code
@staticmethod def hash_cmd(gcode_cmd: str): """Generate an MD5 hash from the provided GCODE command. The current time is appended to the command, to make their hashes unique. This function is meant to be used as a generator of unique IDs for RPC commands sent to Klipper. """ # Should be unique by using the current time uid = gcode_cmd + str(time.time()) uid_md5 = md5(uid.encode()) cmd_id = uid_md5.hexdigest() return cmd_idGenerate an MD5 hash from the provided GCODE command. The current time is appended to the command, to make their hashes unique. This function is meant to be used as a generator of unique IDs for RPC commands sent to Klipper.
Methods
async def activate_toolchanger_axis(self, timeout=1.1)-
Expand source code
async def activate_toolchanger_axis(self, timeout=1.1): """Activate the tool-changer axis.""" tc_select_gcode = self.controller.builder.gcode.gcodeAtivateToolChanger()[0] logging.debug(f"Selecting the tool-changer axis using: {tc_select_gcode}") cmd, _ = self.controller.machine.make_gcode_cmd(tc_select_gcode) cmd_id, (wait_pass, check_pass) = await self.send_cmd( rpc_cmd=cmd, wait=True, check=True, timeout=timeout) return cmd_id, tuple([wait_pass, check_pass])Activate the tool-changer axis.
def check_cmd_errors(self, query_cmd_ids: list) ‑> list-
Expand source code
def check_cmd_errors(self, query_cmd_ids: list) -> list: """Look for errors in the tracker for the provided cmd ids. The returned list will be empty/falsy if none of the queries commands had errors (yet), or contain the command ids that had errors. """ # First get the relevant tracker entries. entries = [(cmd_id, self.tracker.get(cmd_id, {})) for cmd_id in query_cmd_ids] # Look for errors. error_cmd_ids = [cmd_id for cmd_id, cmd_data in entries if "error" in cmd_data] return error_cmd_idsLook for errors in the tracker for the provided cmd ids. The returned list will be empty/falsy if none of the queries commands had errors (yet), or contain the command ids that had errors.
def check_cmd_session(self)-
Expand source code
def check_cmd_session(self):
    """Run check_cmd_errors over the commands of the current session.

    Returns:
        A list of command IDs with errors in their responses (truthy when
        errors were found, falsy when none were), or None when sessions
        are disabled (session_commands is None).
    """
    if self.session_commands is None:
        return None
    return self.check_cmd_errors(self.session_commands)
async def check_command_result_ok(self, cmd_id, timeout=0.0, loop_delay=0.2)-
Expand source code
async def check_command_result_ok(self, cmd_id, timeout=0.0, loop_delay=0.2):
    """Poll the tracker for an "ok" result to the command with *cmd_id*.

    Args:
        cmd_id: ID of the tracked command.
        timeout: Seconds to keep polling; None polls forever.
        loop_delay: Seconds between polls.

    Returns:
        True on an "ok" result, False when the response carries an error,
        None on timeout (or if the controller stops running).
    """
    logging.debug(f"Checking 'ok' response for cmd_id={cmd_id}")
    if timeout is None:
        logging.warning("Infinite time-out for checking on cmd_id=" + str(cmd_id))
    waited = 0.0
    while self.controller.run:
        try:
            response = self.tracker.get(str(cmd_id), {}).get("response", {})
            if str(response.get("result", None)).startswith("ok"):
                logging.debug(f"Got 'ok' response for cmd_id={cmd_id}")
                return True
            if response.get("error", None):
                logging.warning(f"Got 'error' response for cmd_id={cmd_id}")
                return False
        except (KeyError, AttributeError):
            # A malformed tracker entry is treated as "no response yet".
            pass
        # TODO: Consider enabling a check on "ws_ready". Otherwise this might get stuck.
        # if not self.ws_ready and timeout is None:
        #     logging.error("Aborting check on command without timeout. The websocket is not ready.")
        #     return False
        # Check for a timeout before sleeping again.
        if timeout is not None and waited >= timeout:
            logging.warning("Timed-out checking for response for cmd_id=" + str(cmd_id))
            return None
        await asyncio.sleep(loop_delay)
        waited += loop_delay
def check_controller(self, idx=None, action=None)-
Expand source code
def check_controller(self, idx=None, action=None):
    """Check that the controller is running, or raise an error.

    Args:
        idx: Optional index of the action being processed, included in the
            error message for context.
        action: Optional action dict, attached to the raised error.

    Raises:
        CommanderError: If the controller's run flag is not set.
    """
    if not self.controller.run:
        msg = "Error, the commander is not running."
        # FIX: test "is not None" so a valid index of 0 is also reported
        # (the previous truthiness test dropped index 0 from the message).
        msg += f" Aborting actions protocol at index {idx}." if idx is not None else ""
        logging.error(msg)
        raise CommanderError(msg, action=action)
def check_greenlight(self, action=None)-
Expand source code
def check_greenlight(self, action=None):
    """Verify that no other action is currently being processed.

    Args:
        action: Optional action dict, attached to the raised error.

    Raises:
        ProtocolError: When a current action is already in progress.
    """
    if not self.current_action:
        return
    raise ProtocolError(
        "Failed to run action; another one is being processed by the controller.",
        action=action,
    )
def check_printer_ready(self, response, info_id_prefix='websocket_info')-
Expand source code
def check_printer_ready(self, response, info_id_prefix="websocket_info"):
    """ Checks if a response contains a result->state equal to "ready".
    Raises an exception if self.ensure_printer_ready is True.
    Context: called by websocket_reader when a response is received.

    Updates self.printer_state, self.printer_ready and self.new_info_response
    as side effects. The NotReadyError raised below is caught within this
    method, so it never propagates to the caller; the caller observes the
    state change (and possibly controller.run=False) instead.

    Args:
        response: A decoded JSON-RPC response dict (expects "id" and
            "result"->"state" keys).
        info_id_prefix: Only responses whose "id" starts with this prefix
            are treated as status reports.

    Returns:
        The last known printer state string (self.printer_state).
    """
    logging.debug("Checking for " + info_id_prefix + " response.")
    try:
        if str(response["id"]).startswith(info_id_prefix):
            # Update state.
            self.printer_state = response["result"]["state"]
            # Check if state is "ready".
            self.printer_ready = self.printer_state == 'ready'
            # Release "wait_for_info_update".
            self.new_info_response = True
            # Handle "not ready" state.
            if not self.printer_ready:
                # Stop the coroutines if ready must be ensured.
                msg = f"Printer in state='{self.printer_state}' is not ready."
                if self.ensure_printer_ready:
                    msg += " Setting run=False (due to ensure_printer_ready=True)."
                    logging.error(msg)
                    self.controller.run = False
                    # NOTE(review): caught by the handler below — confirm
                    # whether it was meant to propagate to the caller.
                    raise NotReadyError(msg)
            else:
                logging.debug("Printer ready!")
        else:
            logging.debug(f"Command with id={str(response['id'])} does not look like a status report with info_id_prefix={info_id_prefix}")
    except KeyError as e:
        # Responses without "id" or "result"->"state" are silently ignored.
        logging.debug(f"Could not check message due to KeyError: {e}")
    except NotReadyError as e:
        logging.debug(f"NotReadyError with message: {e}")
    return self.printer_state
def check_tracker_scripts(self, tracker=None)-
Expand source code
def check_tracker_scripts(self, tracker=None):
    """ Check for "not ok" responses in "script" (GCODE) entries of the tracker dict.
    Entries with no response (yet) are ignored.
    TODO: Remove this method, it is unused.
    """
    if not tracker:
        tracker = self.tracker
    logging.debug("Parsing all tracker entries of GCODE scripts for errors.")
    # NOTE(review): iterating a dict yields its keys (command-ID strings);
    # item["command"] below would then raise TypeError, not KeyError.
    # This loop looks written for a list of entry dicts — confirm before reuse.
    for item in tracker:
        # Skip the item if it does not match a GCODE command
        try:
            if item["command"]["method"] != 'printer.gcode.script':
                continue
        except KeyError:
            continue
        # Look for a response and a result
        try:
            # Positive results can be "ok" or "ok withsomethingelsehere..."
            if not str(item["response"]["result"]).startswith("ok"):
                # If the above key is found and is not "ok",
                # then something went wrong.
                return False
        except KeyError:
            # If the keys were not found,
            # the response or the result are missing,
            # and they should be waited for.
            return False
    # The function gets here if everything went well.
    return True
async def close(self, timeout=3.0)-
Expand source code
async def close(self, timeout=3.0):
    """Close the connection, allowing *timeout* seconds for a graceful shutdown."""
    logging.info("Closing Machine connection.")
    try:
        # Bound the graceful close so shutdown can't hang indefinitely.
        await asyncio.wait_for(self.close_websocket(), timeout=timeout)
    except asyncio.exceptions.TimeoutError:
        logging.info("KlipperCommander: failed to close in time.")
    logging.debug("KlipperCommander: websocket connection closed.")
async def close_websocket(self, wait=True)-
Expand source code
async def close_websocket(self, wait=True):
    """Close this commander's websocket connection.

    Args:
        wait: Also wait for the connection to finish closing.

    Returns:
        bool: True on success or when there is nothing to close,
        False if an exception occurred while closing.
    """
    if not (self.ws_address and self.websocket):
        logging.warning("Skipped (neither 'ws_address' or 'websocket' are defined).")
        return True
    logging.info(f"Closing websocket connection with wait={wait}")
    try:
        logging.debug("Sending close signal to websocket.")
        # Closing interrupts any pending recv() on the socket.
        await self.websocket.close()
        if wait:
            logging.debug("Waiting for websocket close.")
            # And wait for it.
            await self.websocket.wait_closed()
    except Exception as e:
        msg = f"Exception while closing the websocket: {e}\n" + traceback.format_exc()
        logging.error(msg)
        print(msg)
        return False
    return True
def cmd_to_session(self, cmd_id)-
Expand source code
def cmd_to_session(self, cmd_id):
    """Record *cmd_id* in the current command session, if one is active."""
    if self.session_commands is None:
        # Sessions are disabled (see end_cmd_session) — silently skip.
        return
    logging.debug(f"Appending {cmd_id} command to session with {len(self.session_commands)} entries.")
    self.session_commands.append(cmd_id)
async def emergency_stop(self)-
Expand source code
async def emergency_stop(self):
    """Send an emergency stop signal to the firmware, retrying until delivered.

    See: https://moonraker.readthedocs.io/en/latest/web_api/#emergency-stop
    """
    payload = json.dumps(rpc_primitives.emergency_stop(id=42))
    if self.websocket is None:
        logging.error("Failed to send signal, the websocket is not setup.")
        return
    # Keep retrying until the send succeeds.
    while True:
        try:
            await self.websocket.send(payload)
            return
        except ConnectionClosed:
            logging.critical("Failed to send signal, the websocket is closed. Retrying.")
            # TODO: consider removing the wait.
            await asyncio.sleep(self.controller.wait_default)
def end_cmd_session(self)-
Expand source code
def end_cmd_session(self):
    """Discard the session commands list and disable session tracking.

    After this call, cmd_to_session becomes a no-op until
    new_cmd_session is called again.
    """
    self.session_commands = None
def errors_in_tracker(self)-
Expand source code
def errors_in_tracker(self):
    """Check for an error in the all responses of the tracker dict.
    TODO: Remove this method, it is unused.

    Returns:
        The first tracker item found with an error in its response,
        or None when no errors are found.
    """
    logging.debug("Parsing all tracker entries for errors in responses.")
    # NOTE(review): iterating self.tracker (a dict) yields its keys, so the
    # "in" checks below test substrings of the command-ID strings — this
    # loop looks written for a list of entry dicts. Confirm before reuse.
    for item in self.tracker:
        if "response" in item:
            if "error" in item["response"]:
                # An error message was found in the response.
                logging.error(f"Error found in response to tracker item:\n{item}")
                return item
    logging.debug("No errors found in tracker responses.")
async def firmware_restart(self)-
Expand source code
async def firmware_restart(self):
    """Ask Moonraker to restart the firmware (best-effort, no retry).

    See: https://moonraker.readthedocs.io/en/latest/web_api/#firmware-restart
    """
    payload = json.dumps(rpc_primitives.firmware_restart(id=8463))
    if self.websocket is None:
        logging.error("firmware_restart: failed to send signal, the websocket is not setup.")
        return
    try:
        await self.websocket.send(payload)
    except ConnectionClosed:
        logging.critical("firmware_restart: failed to send signal, the websocket is closed.")
Expand source code
async def firmware_watchdog(self, min_wait_for_restart=3.0):
    """Firmware restart coroutine.

    Will send a "FIRMWARE_RESTART" command if the printer is found in a
    "shutdown" (or "error") state, then watch for it to recover.

    Args:
        min_wait_for_restart: Seconds to wait between watchdog iterations.

    Raises:
        CommanderError: On any unhandled exception inside the loop.
    """
    logging.info("Coroutine started.")
    try:
        while not self.controller.run:
            # FIX: this sleep was previously not awaited, producing an
            # un-awaited coroutine and a busy loop that never yielded
            # control back to the event loop (and could not be cancelled).
            await asyncio.sleep(self.controller.wait_default)
        logging.info("Coroutine ready.")
        while not self.ws_ready and self.controller.run:
            # Wait here until the websocket is ready.
            await asyncio.sleep(self.controller.wait_default)
        self.recovering = False
        while self.controller.run:
            # Wait for a new info update.
            if await self.wait_for_info_update(timeout=4.0):
                # If shutdown, send a firmware restart.
                if self.printer_state in ['shutdown', 'error']:
                    if self.auto_restart:
                        logging.warning(f"{self.printer_state} state found, restarting firmware and waiting for {min_wait_for_restart} seconds.")
                        await self.firmware_restart()
                    else:
                        logging.critical(f"{self.printer_state} state found, you need to manually restart the firmware.")
                    # Set the recovering flag in either case.
                    self.recovering = True
                elif self.recovering and self.printer_state != 'startup':
                    logging.info(f"Recovery successful, printer state: {self.printer_state}")
                    self.recovering = False
            else:
                logging.warning(f"Info update timeout, retrying in {min_wait_for_restart} seconds.")
            # Wait for a bit
            await asyncio.sleep(min_wait_for_restart)
        logging.debug("Controller not running.")
    except asyncio.exceptions.CancelledError:
        logging.warning("Coroutine cancelled.")
    except Exception as e:
        msg = f"Unhandled exception: {e}"
        logging.error(msg)
        print(msg)
        raise CommanderError(msg) from e
    logging.warning("Coroutine ended.")
async def gcode_send_dwell(self, delay_milliseconds=0.1)-
Expand source code
async def gcode_send_dwell(self, delay_milliseconds=0.1):
    """Send a dwell (pause) GCODE command.

    See: https://www.klipper3d.org/G-Codes.html#g-code-commands
    This mechanism is independent of the 'writer' coroutine,
    and will not wait for idle.

    Args:
        delay_milliseconds: Dwell duration in milliseconds.
    """
    # The GCODE builder expects seconds.
    dwell = self.controller.builder.gcode.gcodeDwell(seconds=delay_milliseconds / 1000)
    await self.send_gcode_cmd(gcode_cmd=dwell[0])
async def gcode_send_set_origin(self)-
Expand source code
async def gcode_send_set_origin(self):
    """Set the current position as origin and unlock the machine.

    Relies on SET_KINEMATIC_POSITION.
    This mechanism is independent of the 'writer' coroutine,
    and will not wait for idle.
    """
    origin_cmd = self.controller.builder.gcode.gcode_set_origin()
    await self.send_gcode_cmd(gcode_cmd=origin_cmd[0])
async def gcode_send_wait(self)-
Expand source code
async def gcode_send_wait(self):
    """Stall GCODE input until all pending moves are complete.

    See: https://github.com/Klipper3d/klipper/commit/2e03d84755f466adaad64ae0054eb461869d0529
    This mechanism is independent of the 'writer' coroutine,
    and will not wait for idle.
    """
    wait_cmd = self.controller.builder.gcode.gcodeWait()
    await self.send_gcode_cmd(gcode_cmd=wait_cmd[0])
async def get_supported_cmds(self)-
Expand source code
async def get_supported_cmds(self):
    """Request GCODE help from Klipper.

    Some of the Klipper commands are GCODE, others are not. Hence "ungcodes".
    See: https://moonraker.readthedocs.io/en/latest/web_api/#get-gcode-help

    Returns:
        The "result" payload of the help response.
    """
    help_cmd = rpc_primitives.rpc_help(id=4645)
    cmd_id, _ = await self.send_cmd(rpc_cmd=help_cmd, wait=True, check=True, timeout=3.0)
    return self.get_result_by_id(cmd_id=cmd_id)
Some of the Klipper commands are GCODE, others are not. Hence "ungcodes".
See: https://moonraker.readthedocs.io/en/latest/web_api/#get-gcode-help
def make_gcode_cmd(self, gcode_cmd: str, cmd_id: str = None)-
Expand source code
def make_gcode_cmd(self, gcode_cmd: str, cmd_id: str = None):
    """Wrap a GCODE string in an RPC-compatible command dict.

    Args:
        gcode_cmd: The GCODE command text.
        cmd_id: Optional explicit command ID; a hash-based one is
            generated when omitted.

    Returns:
        tuple: The RPC command (a dictionary) and the command ID.
    """
    if cmd_id is None:
        # Derive a unique ID from the command text (MD5 of command + time).
        cmd_id = self.hash_cmd(gcode_cmd)
    return rpc_primitives.gcode_script(gcode_cmd, cmd_id), cmd_id
Returns the RPC command (a dictionary) and the command ID.
def new_cmd_session(self)-
Expand source code
def new_cmd_session(self):
    """Reset the session command list, returning any previously recorded IDs."""
    logging.debug("Starting new commands session.")
    if self.session_commands:
        logging.debug(f"Discarded {len(self.session_commands)} entries.")
    # Keep the previous list to hand back to the caller.
    previous = self.session_commands
    self.session_commands = []
    return previous
def parse_toolhead_status(self, toolhead_status: dict)-
Expand source code
def parse_toolhead_status(self, toolhead_status: dict):
    """Parse a toolhead status update and store the extracted values.

    Updates self.homed_axes, self.machine_limits and self.gcode_position.
    See 'query_toolhead_status' in gcode_primitives for details on its content.
    Each section logs (rather than raises) on missing or malformed data.
    """
    # Homing status.
    try:
        self.homed_axes = toolhead_status["homed_axes"]
        logging.debug(f"Received axis homing status from firmware: {self.homed_axes}")
    except KeyError:
        logging.error(f"Failed to extract homing status from toolhead status: {toolhead_status}")
    # XYZ machine limits.
    try:
        axis_minimum = toolhead_status["axis_minimum"]
        axis_maximum = toolhead_status["axis_maximum"]
        logging.debug(f"Received axis limits from firmware (max: {axis_minimum}) (min: {axis_maximum}).")
        # Interleave (min, max) pairs for the first 3 (XYZ) components only.
        limits = []
        for axis in range(3):
            limits.extend((axis_minimum[axis], axis_maximum[axis]))
        # Make the "machine_limits" dictionary.
        self.machine_limits = self.controller.builder.gcode.make_limits(*limits)
        # TODO: Validate and update machine limits in the GcodePrimitives class.
    except Exception as e:
        logging.error(f"Failed to extract axis limits ({e}) from toolhead status: {toolhead_status}")
    # "GCODE position" (XYZ components only).
    try:
        gcode_position = toolhead_status["position"]
        logging.debug(f"Received GCODE position coordinates from firmware: {gcode_position}")
        self.gcode_position = gcode_position[:3]
    except Exception as e:
        logging.error(f"Failed to extract GCODE position ({e}) from toolhead status: {toolhead_status}")
async def query_toolhead_status(self, cmd_id_prefix='homed_axes', loop_time=0.5)-
Expand source code
async def query_toolhead_status(self, cmd_id_prefix="homed_axes", loop_time=0.5):
    """Toolhead info request coroutine, including homed axes.

    Periodically sends "printer.objects.query" requests to Moonraker.
    NOTE: 0.2 seconds is too fast.

    Args:
        cmd_id_prefix: Prefix for the generated command IDs.
        loop_time: Target period (seconds) of the query loop; also used
            as the per-command timeout.

    Raises:
        CommanderError: On any unhandled exception inside the loop.
    """
    logging.info("Coroutine started.")
    try:
        while not self.controller.run:
            # FIX: this sleep was previously not awaited, producing an
            # un-awaited coroutine and a busy loop that never yielded
            # control back to the event loop.
            await asyncio.sleep(self.controller.wait_default)
        logging.info("Coroutine ready.")
        while not self.ws_ready and self.controller.run:
            # Wait here until the websocket is ready.
            await asyncio.sleep(self.controller.wait_default)
        # Loop while running.
        while self.controller.run:
            # Check spam flag or wait and skip iteration
            if not self.spam_info:
                logging.warning("Waiting until spam_info is set.")
                await asyncio.sleep(2.0)
                continue
            # Wait here until the websocket is ready.
            while not self.ws_ready and self.controller.run:
                logging.warning("Waiting until ws_ready and controller.run are set.")
                await asyncio.sleep(2.0)
            # Increment ID counter
            cid = int(self.cid)
            self.cid = self.cid + 1
            # Build command ID and RPC query.
            cmd_id = f"{cmd_id_prefix}_{cid}_{time.time()}"
            info_command = rpc_primitives.query_toolhead_status(cmd_id)
            # Track time
            initial_time = time.time()
            # Send command.
            _, (wait_check, _) = await self.send_cmd(
                rpc_cmd=info_command, cmd_id=cmd_id,
                wait=True, check=False, timeout=loop_time)
            # Look for a response.
            response = self.get_response_by_id(cmd_id)
            if wait_check and response:
                try:
                    toolhead_status = response["result"]["status"]["toolhead"]
                    self.parse_toolhead_status(toolhead_status=toolhead_status)
                except KeyError:
                    logging.warning(f"Missing tool-head status information in response: {response}")
            else:
                if not self.dry:
                    logging.warning(f"Failed to obtain the toolhead's status (wait_check={wait_check}, response={response})")
            # Wait for the actual remaining time (after the previous await).
            elapsed_time = time.time() - initial_time
            if elapsed_time <= loop_time:
                await asyncio.sleep(loop_time - elapsed_time)
        logging.debug("Controller not running.")
    except asyncio.exceptions.CancelledError:
        logging.error("task cancelled.")
    except Exception as e:
        msg = f"Unhandled exception: {e}"
        logging.error(msg)
        print(msg)
        raise CommanderError(msg) from e
    logging.warning("Coroutine ended.")
async def run_action(self,
action,
idx,
wait: bool = None,
check: bool = None,
default_action_timeout: float = 10.0)-
Expand source code
async def run_action(self, action, idx, wait: bool = None, check: bool = None, default_action_timeout: float = 10.0):
    """Run an action's GCODE

    Converts GCODE from an action into RPC commands, and sends them as an RPC protocol.

    Args:
        action: Action dict; must have "cmd" and "GCODE" keys, may carry
            "exec_opts" overrides. Mutated in place ("rpc_commands",
            "tracker" and "result" keys are added).
        idx: Index of the action, used for logging and error messages.
        wait: Default for the action's "wait" execution option.
        check: Default for the action's "check" execution option.
        default_action_timeout: Default for the action's "timeout" option.

    Returns:
        A deep copy of the (annotated) action dict on success.

    Raises:
        CommanderError: If the controller is not running.
        ProtocolError: On timeout, protocol failure, or any unhandled
            error during execution (after status updates are sent).
    """
    logging.debug(f"Incoming {action['cmd']} action with index {idx}.")
    # Raise errors if the controller is not ready.
    self.check_controller(idx, action)
    # Get GCODE commands.
    gcode_commands = action["GCODE"]
    # Remove all comment-only commands.
    gcode_commands_clean = [cmd for cmd in gcode_commands if not cmd.strip().startswith(";")]
    # Get execution options.
    exec_opts = action.get("exec_opts", {})
    # Set default execution options (existing entries win over arguments).
    exec_opts.setdefault("wait", wait)
    exec_opts.setdefault("check", check)
    exec_opts.setdefault("timeout", default_action_timeout)
    # Generate RPC commands from GCODE commands.
    # NOTE: Only the dictionary is kept, and it's ID is unused (hence the "[0]" below).
    rpc_commands = [self.make_gcode_cmd(cmd)[0] for cmd in gcode_commands_clean]
    # Save RPC commands to the action dict.
    action["rpc_commands"] = deepcopy(rpc_commands)
    # Check if the machine is free to run new actions.
    self.check_greenlight(action=action)
    # Set the current action to the controller's attribute.
    self.current_action = action
    # Run the commands with a timeout.
    exception = None
    try:
        # Let everyone know whats up.
        await self.update_action_status(action, self.action_execute_run)
        logging.debug(f"Executing RPC protocol for {action['cmd']} action with index {idx}.")
        # Send RPC commands for the current action.
        # Cancel the coroutine task if the timeout is reached.
        # See: https://docs.python.org/3/library/asyncio-task.html#asyncio.wait_for
        await asyncio.wait_for(
            # NOTE: The action generators and runners must have incremented the default
            # timeout in "exec_opts" to avoid being interrupted by errors here.
            self.run_rpc_protocol(rpc_commands=rpc_commands, **exec_opts),
            timeout=exec_opts["timeout"])
    # Handle exceptions.
    except asyncio.TimeoutError as e:
        # Send an alert through the SIO connection to the GUI.
        msg = f"Time-out error at action with index='{idx}' command='{action['cmd']}', args='{action['args']}',"
        msg += f" after {exec_opts['timeout']} seconds. Error message: {e}"
        logging.error(msg)
        exception = ProtocolError(action=action, cause=e, message=msg)
    except ProtocolError as e:
        # Send an alert through the SIO connection to the GUI.
        msg = f"Protocol error at action with index='{idx}' command='{action['cmd']}', args='{action['args']}',"
        msg += f" was raised by rpc_command with id={e.cmd_id} and tracker entry: {self.tracker.get(e.cmd_id)}."
        msg += f" Error message: {e.message}"
        logging.error(msg)
        exception = e  # ProtocolError
    except Exception as e:
        # Send an alert through the SIO connection to the GUI.
        msg = f"Unhandled error at action with index={idx} command={action['cmd']} and args={action['args']}."
        msg += f" Error message: {str(e)}"
        logging.error(msg)
        exception = ProtocolError(action=action, cause=e, message=msg)
    # Release the current action
    self.current_action = {}
    # Save tracker entries (which holds responses and results) before raising errors.
    action.setdefault("tracker", {})
    for cmd_id in [c["id"] for c in rpc_commands]:
        action["tracker"][cmd_id] = self.get_command_by_id(cmd_id=cmd_id)
    # TODO: Consider adding a way to alert without aborting,
    # allowing the user to "fix" the problem without
    # having to start ALL OVER AGAIN.
    # Save the return status.
    if exception is None:
        action["result"] = "ok"
        # Let everyone know whats up.
        await self.update_action_status(action, self.action_execute_ok)
        logging.debug(f"Action {idx} with {action['cmd']} command executed successfully.")
    elif isinstance(exception, ProtocolError):
        # Break the loop if an error occured.
        action["result"] = "error"
        # Send an an alert to the controller/socket.
        self.send_alert(exception.message, alert_type="error")
        # Let everyone know whats up.
        await self.update_action_status(action, self.action_execute_error)
        logging.error(f"Protocol error during {action['cmd']} action execution.")
        # Raise the protocol error.
        raise exception
    else:
        # Break the loop if an error occured.
        action["result"] = "error"
        # Let everyone know whats up.
        await self.update_action_status(action, self.action_execute_error)
        # Raise a protocol error from the exception above.
        msg = f"Error during {action['cmd']} action execution (unhandled exception)."
        logging.error(msg)
        raise ProtocolError(msg, action=action) from exception
    # Return a copy of the action for reference.
    return deepcopy(action)
async def run_actions_protocol(self,
actions: list,
i: int = 0,
wait: bool = None,
check: bool = None,
default_action_timeout: float = 10.0)-
Expand source code
async def run_actions_protocol(self, actions: list, i: int = 0, wait: bool = None, check: bool = None, default_action_timeout: float = 10.0):
    """Parses actions from a pipetting protocol, which already contain the
    corresponding GCODE, and executes them.

    Args:
        actions (List): A list of dictionaries, each one an action from a pipetting protocol.
        i (int): Actual starting index for the actions in the received list. Useful to reference the upstream protocol.
        wait (bool, optional): Wait for a response to all commands, aborting if it times out. Defaults to None.
        check (bool, optional): Check each response for an "ok", aborting if it is not found. Defaults to None.
        default_action_timeout (float, optional): Default timeout for an action to complete its execution. Defaults to 10.0.

    Returns:
        List: The same actions from the input, with extra information from runtime.

    Raises:
        CommanderError: If the controller is not running.
        ProtocolError: On action failure or when the printer is not idle
            after the expected total execution time (after an e-stop).
    """
    logging.debug(f"Received actions protocol with {len(actions)} actions.")
    # Default to the machine's paranoia status.
    if wait is None:
        wait = self.controller.paranoid
    if check is None:
        check = self.controller.paranoid
    # Raise errors if the controller is not ready.
    self.check_controller()
    # Check if the machine is free to run new actions.
    self.check_greenlight()
    # With the green light, start a new command session.
    self.new_cmd_session()
    # Stream the commands one action at a time.
    try:
        # Track errors.
        error = None
        # Track expected execution time.
        exec_time = default_action_timeout
        # Run the actions.
        for idx, action in enumerate(actions):
            # TODO: This is a good place to add the "pausing" and "stopping" behaviour.
            logging.debug(f"Running action {idx} with command {action.get('cmd', None)}.")
            parsed_action = await self.run_action(action, idx+i, wait, check, default_action_timeout)
            # Increment time.
            exec_time += parsed_action.get("exec_opts", {}).get("timeout", default_action_timeout)
            logging.debug(f"Done with action {idx} with command {action.get('cmd', None)}.")
    except Exception as e:
        logging.error(f"Errors while running action {idx} with command {action.get('cmd', None)}" + str(e))
        error = e
    # Ensure idle printer before clearing the command session, unless there are errors already.
    if error is None:
        # TODO: The following loop may get stuck if the controller is not running or something else fails.
        if not await self.wait_for_idle_printer(timeout=exec_time):
            msg = f"Emergency stop! Printer not idle after total execution time elapsed: {exec_time}"
            logging.critical(msg)
            await self.emergency_stop()
            error = ProtocolError(msg)
        else:
            logging.debug("Printer idle, clearing cmd session.")
    # End the command session.
    self.end_cmd_session()
    # Raise any error produced during the session.
    if error is not None:
        raise error
    logging.debug("Actions protocol executed successfully.")
    return deepcopy(actions)
Args
actions:List- A list of dictionaries, each one an action from a pipetting protocol.
i:int- Actual starting index for the actions in the received list. Useful to reference the upstream protocol.
wait:bool, optional- Wait for a response to all commands, aborting if it times out. Defaults to None.
check:bool, optional- Check each response for an "ok", aborting if it is not found. Defaults to None.
default_action_timeout:float, optional- Default timeout for an action to complete its execution. Defaults to 10.0.
Returns
List- The same actions from the input, with extra information from runtime.
async def run_gcode_protocol(self, gcode_commands: list, wait=True, check=True, timeout=30.0)-
Expand source code
async def run_gcode_protocol(self, gcode_commands: list, wait=True, check=True, timeout=30.0):
    """Convert GCODE strings to RPC commands and run them as an RPC protocol.

    TODO: Delete this method. It is no longer used, and does not handle a command session.

    Optionally wait for (and/or check) the return status of each command.
    True by default, as a protocol should stop on failures.

    Args:
        gcode_commands (list): A list of GCODE commands (as strings).
        wait (bool, optional): Wait for a response to each command. Defaults to True.
        check (bool, optional): Check each response for an "ok". Defaults to True.
        timeout (float, optional): Timeout for 'wait' and 'check'. Defaults to 30.0.

    Returns:
        list: The list of rpc_commands, or None when the commander is not running.
    """
    logging.info(f"Running GCODE protocol using wait={wait}, check={check}, timeout={timeout}")
    logging.debug("Protocol commands: " + pformat(gcode_commands))
    if not self.controller.run:
        logging.error("Error, the commander is not running. Skipping actions protocol.")
        return
    # Keep only the command dicts; the generated IDs are discarded ("[0]").
    rpc_commands = [self.make_gcode_cmd(cmd)[0] for cmd in gcode_commands]
    # Send the protocol, alerting the GUI on protocol errors.
    try:
        await self.run_rpc_protocol(rpc_commands=rpc_commands, wait=wait, check=check, timeout=timeout)
    except ProtocolError as e:
        msg = f"Protocol error in gcode protocol with message '{e.message}'.\n" + traceback.format_exc()
        self.send_alert(msg, alert_type="error")
        logging.error(msg)
    return rpc_commands
TODO: Delete this method. It is no longer used, and does not handle a command session.
Optionally wait for (and/or check) the return status of each command. True by default, as a protocol should stop on failures.
Args
gcode_commands:list- A list of GCODE commands (as strings).
wait:bool, optional- Optionally wait for a response, send e-stop signal if it times out. Defaults to True.
check:bool, optional- Optionally check each response for an "ok", send e-stop signal if it times out. Defaults to True.
timeout:float, optional- Timeout for 'wait' and 'check'. Defaults to 10.0.
Returns
list- The list of rpc_commands.
async def run_rpc_protocol(self, rpc_commands: list, wait=True, check=True, timeout=30.0)-
Expand source code
async def run_rpc_protocol(self, rpc_commands: list, wait=True, check=True, timeout=30.0):
    """Send a list of RPC commands to Moonraker, corresponding to a particular mid-level action.

    Optionally wait for (and/or check) the return status of each command.
    True by default, as a protocol should stop on failures.

    Args:
        rpc_commands (list): List of RPC/JSON commands for Moonraker.
        wait (bool, optional): Optionally wait for a response, send e-stop signal if it times out. Defaults to True.
        check (bool, optional): Optionally check each response for an "ok", send e-stop signal if it times out. Defaults to True.
        timeout (float, optional): Timeout for 'wait' and 'check'. Defaults to 30.0.

    Raises:
        CommanderError: If the commander/websocket is down, or the printer does not become ready.
        ProtocolError: If the machine is not free, or a command fails its wait/check.

    Returns:
        list: The list of rpc_commands (same object, with "id" keys filled in).
    """
    msg = f"Running RPC protocol using wait={wait}, check={check}"
    msg += f", timeout={timeout} and {len(rpc_commands)} commands."
    logging.info(msg)
    logging.debug("Protocol commands:\n" + pformat(rpc_commands))
    # Check if the commander is dead before trying.
    if not self.controller.run or (not self.ws_ready and not self.dry):
        msg = f"Incoming protocol can't be sent: commander running = {self.controller.run}, websocket ready = {self.ws_ready}."
        msg += " Commands not sent:\n" + pformat(rpc_commands)
        logging.error(msg)
        raise CommanderError(msg)
    # Wait for machine (printer) ready.
    logging.debug("Waiting for machine ready...")
    if not await self.wait_for_ready(wait_time=2.0):
        raise CommanderError(f"Machine not ready after {2.0}s timeout. RPC protocol run aborted.")
    # Wait for writer coroutine idle.
    logging.debug("Waiting for idle writer coroutine...")
    if not await self.wait_for_free():
        # Abort the protocol here if the free flag is not set.
        raise ProtocolError("Machine not free after timeout. RPC protocol run aborted.")
    # Flag busy.
    self.free = False
    # Stream commands.
    logging.debug(f"Streaming protocol with {len(rpc_commands)} commands.")
    # NOTE: try/finally replaces the previous capture-and-reraise pattern. It also
    # guarantees that "self.free" is restored on task cancellation (CancelledError
    # is a BaseException since Python 3.8 and was not caught by "except Exception",
    # which previously left the stream flagged busy forever).
    try:
        for command in rpc_commands:
            # Check for readiness.
            if not self.printer_ready and not self.dry:
                raise CommanderError("Machine not ready. RPC protocol run aborted.")
            # Get the existing ID in the command or hash it now.
            command.setdefault("id", self.hash_cmd(str(command)))
            cmd_id = command["id"]
            # Send RPC commands (for the current action) with a timeout.
            logging.debug(f"Sending RPC command with ID: {command['id']}")
            logging.debug(f"Sending RPC command with data: {command}")
            cmd_id, (wait_pass, check_pass) = await self.send_cmd(rpc_cmd=command, wait=wait, check=check, timeout=timeout)
            # Check the result.
            if self.dry:
                # Skip checks in dry mode.
                logging.debug(f"Dry mode enabled, will not check output for command: {command}")
            else:
                if wait and not wait_pass:
                    # Raise an exception if the command timed out and we wanted to wait for it.
                    msg = f"Timed-out waiting for command: {command}"
                    logging.error(msg)
                    raise ProtocolError(message=msg, cmd_id=cmd_id, wait=wait_pass, check=check_pass)
                if check and not check_pass:
                    # Raise an exception if the command did not return an 'ok' response and we asked for it.
                    msg = f"Error checking response to command: {command}."
                    # Look for information in the response.
                    response = self.get_response_by_id(cmd_id)
                    if response:
                        msg += " Error message: " + response.get("error", {}).get("message", "< no message found in response >")
                    else:
                        msg += " No response to command."
                    logging.error(msg)
                    raise ProtocolError(message=msg, cmd_id=cmd_id, wait=wait_pass, check=check_pass)
            # Report final status.
            logging.debug(f"Done with RPC command {cmd_id}. Checks: 'wait_pass={wait_pass}', 'check_pass={check_pass}' Content: {command}")
        logging.debug(f"Done with this RPC protocol, {len(rpc_commands)} commands were sent.")
    finally:
        # Always set idle, allowing others to run, even when a command fails.
        self.free = True
    # Return the RPC commands.
    return rpc_commands
Optionally wait for (and/or check) the return status of each command. True by default, as a protocol should stop on failures.
Args
rpc_commands:list- List of RPC/JSON commands for Moonraker.
wait:bool, optional- Optionally wait for a response, send e-stop signal if it times out. Defaults to True.
check:bool, optional- Optionally check each response for an "ok", send e-stop signal if it times out. Defaults to True.
timeout:float, optional- Timeout for 'wait' and 'check'. Defaults to 30.0.
Returns
list- The list of rpc_commands.
def save_gcode_protocol(self, gcode_commands, file=None, directory=None, prefix='protocol')-
Expand source code
def save_gcode_protocol(self, gcode_commands, file=None, directory=None, prefix="protocol"):
    """Save a list of commands to a text (.gcode) file.

    By default the gcode will be saved to the system's temporary directory,
    with a unique file name, and '.gcode' file extension.

    If file is specified, it will be used, and the other arguments are ignored.

    The 'directory' and 'prefix' are used when 'file' is not set (i.e. 'None'). In this state,
    the GCODE can be saved to a unique file in a custom directory (e.g. to Klipper's virtual SD card).

    Args:
        gcode_commands (List): A list of strings.
        file (str, optional): Full path to the target text file. Will default to a unique
            '<prefix>-*.gcode' file in the OS's temporary directory.
        directory (str, optional): Path to the target directory for the gcode file. Will default
            to the OS's temporary directory.
        prefix (str, optional): Prefix for the gcode file.

    Returns:
        str: Full path to the gcode file.
    """
    try:
        if file is None:
            # BUG FIX: mkstemp returns an *open* OS-level file descriptor, which was
            # previously discarded and leaked; close it before re-opening the path.
            fd, gcode_path = tempfile.mkstemp(prefix=prefix + "-", suffix=".gcode", dir=directory)
            os.close(fd)
            # mkstemp already returns an absolute path, so no join with gettempdir() is needed
            # (the previous os.path.join was a no-op for absolute paths).
            file = gcode_path
            logging.info(f"Unspecified 'file'. Defaulting to: '{file}'.")
        with open(file, "w", encoding="utf-8") as f:
            for gcode_command in gcode_commands:
                f.write(gcode_command)
                f.write('\n')
        logging.info(f"GCODE saved to file: '{file}'.")
        # if self.verbose:
        #     self.send_alert(text=f"GCODE protocol saved to file: '{file}'.", alert_type="message")
    except Exception as e:
        # Best-effort: log the failure but still return the (possibly unwritten) path.
        msg = f"An exception occured when saving to file={file} and directory={directory}: '{e}'."
        logging.error(msg)
        print(msg)
        traceback.print_exc()
    return file
By default the gcode will be saved to the system's temporary directory, with a unique file name, and '.gcode' file extension.
If file is specified, it will be used, and the other arguments are ignored.
The 'directory' and 'prefix' are used when 'file' is not set (i.e. 'None'). In this state, the GCODE can be saved to a unique file in a custom directory (e.g. to Klipper's virtual SD card).
Args
gcode_commands:List- A list of strings.
file:str, optional- Full path to the target text file. Will default to "protocol.gcode" in the OS's temporary directory.
directory:str, optional- Path to the target directory for the gcode file. Will default to the OS's temporary directory.
prefix:str, optional- Prefix for the gcode file.
Returns
str- Full path to the gcode file.
def send_alert(self, text: str, alert_type: str = 'message')-
Expand source code
def send_alert(self, text: str, alert_type: str = "message"):
    """Relay an alert to the main controller.

    The controller handles the alert's contents and type, possibly forwarding
    them to the comms (SIO) module.
    """
    controller = self.controller
    controller.send_alert(text=text, alert_type=alert_type)
async def send_cmd(self, rpc_cmd, cmd_id=None, wait=False, check=False, timeout=0.0)-
Expand source code
async def send_cmd(self, rpc_cmd, cmd_id=None, wait=False, check=False, timeout=0.0):
    """Send RPC command, and optionally check/wait for it.

    This mechanism is independent of the 'writer' coroutine, and will not wait for idle.

    Args:
        rpc_cmd (dict): JSON-RPC command for Moonraker.
        cmd_id (optional): Tracking ID; defaults to the command's "id" key or a hash of its contents.
        wait (bool, optional): Wait for a response before returning. Defaults to False.
        check (bool, optional): Check the response for an "ok" result. Defaults to False.
        timeout (float, optional): Timeout for 'wait' and 'check'. Defaults to 0.0.

    Returns:
        tuple: (cmd_id, (wait_pass, check_pass)). Each check defaults to True when
               not requested, and both are forced True in dry mode.
    """
    # Check if the commander is dead before trying to send the command.
    if self.dry:
        logging.debug(f"Dry mode enabled, the command with id {cmd_id} will have no effect.")
    elif not (self.controller.run and self.ws_ready):
        msg = f"Incoming command can't be sent: commander running = {self.controller.run}; websocket ready = {self.ws_ready}."
        logging.error(msg)
        raise CommanderError(msg)
    # Generate unique command ID if one was not provided.
    if cmd_id is None:
        # Get the existing ID in the command or hash it.
        logging.debug("Replacing None cmd_id with hash of contents.")
        cmd_id = rpc_cmd.get("id", self.hash_cmd(str(rpc_cmd)))
    logging.debug(f"Processing command with ID: {cmd_id}")
    # Add the command to the tracker.
    if cmd_id in self.tracker:
        logging.warning(f"The info command ID {cmd_id} is already present in the tracker. It will be replaced.")
    self.tracker[cmd_id] = {}
    # Set the command, potentially overwriting the previous
    # one (if cmd_id was specified and not unique).
    self.tracker.update_entry(cmd_id, {"status": "sending", "command": rpc_cmd})
    # Uncomment to write all commands to a temporary file.
    # with open("/tmp/rpc_cmd.log", 'a', encoding='utf-8') as file:
    #     json.dump(rpc_cmd, file)
    #     file.write('\n')
    # Send the command.
    if self.dry:
        # Handle dry mode: simulate latency, but do not touch the websocket.
        logging.debug(f"Dry mode enabled, delaying a bit, and skipping command and its checks: {rpc_cmd}")
        await asyncio.sleep(self.controller.wait_default)
    elif not self.websocket:
        # Handle unconfigured websocket (logged but not raised; checks below will fail instead).
        logging.error(f"The websocket is not setup, failed to send: {rpc_cmd}")
    else:
        # Send the command.
        logging.debug(f"Sending RPC command using wait={wait}, check={check}, timeout={timeout}, and command: {rpc_cmd}")
        # Append the command to the session.
        self.cmd_to_session(cmd_id)
        try:
            data = json.dumps(rpc_cmd)
            await self.websocket.send(data)
        except ConnectionClosed:
            # Handle connection closed.
            logging.error("Failed to send command, the websocket is closed.")
            self.tracker.update_entry(cmd_id, {"status": "not_sent"})
            # Early return on connection closed error.
            return cmd_id, (False, False)
        else:
            # Handle success.
            logging.debug("RPC command sent.")
            self.tracker.update_entry(cmd_id, {"status": "sent"})
    # TODO: Consider raising exceptions here.
    # Check results are "True" by default when not requested.
    wait_pass, check_pass = not wait, not check
    # New status message.
    new_status = None
    # Wait for a response from the RPC.
    if wait and not self.dry:
        wait_check = await self.wait_for_response(cmd_id=cmd_id, timeout=timeout, loop_delay=0.05)
        if wait_check:
            new_status = "responded"
            wait_pass = True
        else:
            new_status = "timeout"
            wait_pass = False
            logging.error(f"Failed to wait for a response for cmd_id={cmd_id} with timeout={timeout}")
    # Run an "ok" check on the response, if received in time.
    if wait_pass and check and not self.dry:
        ok_check = await self.check_command_result_ok(cmd_id=cmd_id, timeout=timeout, loop_delay=0.05)
        if not ok_check:
            logging.error(f"Failed to check response for cmd_id={cmd_id} with timeout={timeout}")
            check_pass = False
            new_status = "error"
        else:
            new_status = "ok"
            check_pass = True
    if new_status is not None:
        self.tracker.update_entry(cmd_id, {"status": new_status})
    # Dry mode override.
    if self.dry:
        logging.debug("Dry mode enabled. Overwritting wait and check flags with 'True' values.")
        wait_pass, check_pass = True, True
    # Checks vector.
    checks = tuple([wait_pass, check_pass])
    return cmd_id, checks
async def send_gcode_cmd(self, gcode_cmd, cmd_id=None, wait=False, check=False, timeout=0.0)-
Expand source code
async def send_gcode_cmd(self, gcode_cmd, cmd_id=None, wait=False, check=False, timeout=0.0):
    """GCODE wrapper around "send_cmd": build the RPC command, then send it.

    This mechanism is independent of the 'writer' coroutine, and will not wait for idle.
    """
    logging.debug(f"Sending gcode_cmd={gcode_cmd}")
    # Wrap the GCODE string in a JSON-RPC command.
    rpc_command, cmd_id = self.make_gcode_cmd(gcode_cmd=gcode_cmd, cmd_id=cmd_id)
    # Delegate the actual send (and optional wait/check) to "send_cmd".
    return await self.send_cmd(
        rpc_cmd=rpc_command,
        cmd_id=cmd_id,
        wait=wait,
        check=check,
        timeout=timeout,
    )
async def send_gcode_script(self, gcode_cmds: list, cmd_id=None, wait=False, check=False, timeout=0.0)-
Expand source code
async def send_gcode_script(self, gcode_cmds: list, cmd_id=None, wait=False, check=False, timeout=0.0):
    """Run a list of GCODES as a single script in Klipper. Hopefully will prevent some stuttering.

    Args:
        gcode_cmds (list): A list of strings, each a valid GCODE command for Klipper.

    Returns:
        str: tracking ID for the script command.
    """
    logging.debug(f"Sending gcode_cmds={gcode_cmds}")
    # Klipper's run_script (in "gcode.py") splits a newline-joined script back
    # into individual commands, so the whole list can travel as one RPC call.
    script = "\n".join(gcode_cmds)
    return await self.send_gcode_cmd(
        gcode_cmd=script,
        cmd_id=cmd_id,
        wait=wait,
        check=check,
        timeout=timeout,
    )
Args
gcode_cmds:list- A list of strings, each a valid GCODE command for Klipper.
Returns
str- tracking ID for the script command.
async def session_watchdog(self, check_interval=1.0)-
Expand source code
async def session_watchdog(self, check_interval=1.0):
    """Command session check coroutine.

    Will send a "FIRMWARE_RESTART" if a command in a session gets an "error" response.

    Args:
        check_interval (float, optional): Seconds between session checks. Defaults to 1.0.
    """
    logging.info("Coroutine started.")
    try:
        # BUG FIX: the original called asyncio.sleep() without awaiting it, which
        # turned this startup wait into a busy-spin that never yielded to the
        # event loop (and emitted "coroutine was never awaited" warnings).
        while not self.controller.run:
            await asyncio.sleep(self.controller.wait_default)
        logging.info("Coroutine ready.")
        while not self.ws_ready and self.controller.run:
            # Wait here until the websocket is ready.
            await asyncio.sleep(self.controller.wait_default)
        while self.controller.run:
            # Raise errors if any previous command in the current session has failed.
            if self.check_cmd_session():
                logging.error("Errors found in RPC command session. Starting emergency stop to prevent damage.")
                await self.emergency_stop()
                logging.warning("Clearing the command session now to avoid multiple restarts.")
                self.end_cmd_session()
            # Wait for a bit.
            await asyncio.sleep(check_interval)
    except asyncio.exceptions.CancelledError:
        logging.warning("Coroutine cancelled.")
    except Exception as e:
        msg = f"Unhandled exception: {e}"
        logging.error(msg)
        print(msg)
        raise CommanderError(msg) from e
    logging.warning("Coroutine ended.")
Will send a "FIRMWARE_RESTART" if a command in a session gets an "error" response.
async def status(self)-
Expand source code
async def status(self):
    """Gather information and compute the overall status of this class.

    This is consumed, for example, by the sio_status plugin (see make_status).

    Returns:
        dict: A single "klipper" entry holding status codes for the UI.
    """
    # Evaluate the homed status.
    # TODO: I've changed this to "xyz" only, excluding the extruder. Reconsider.
    # It seems more relevant to the UI that those are homed, while the "active
    # extruder" may be a pipette, TC, or nothing relevant.
    if self.homed_axes is None:
        homed = "UNK"
    elif set("xyz").issubset(set(self.homed_axes.lower())):
        homed = "OK"
    else:
        homed = "WARN"
    # Evaluate the overall status.
    healthy = self.ws_ready and self.printer_ready and not self.recovering
    if healthy and homed == "OK":
        # NOTE: This "OK" will be matched by make_status in sio_status.py.
        status_code = "OK"
    elif not self.ws_address:
        status_code = "OFF"
    else:
        status_code = "WARN"
        logging.warning(f"Machine not ready: ws_ready={self.ws_ready} printer_ready={self.printer_ready} recovering={self.recovering} homed={homed} (homed_axes={self.homed_axes})")
    # Build status data.
    return {
        "klipper": {
            "status": status_code,
            # TODO: add printer status.
            "ready": self.ws_ready,
            "printer_ready": "OK" if self.printer_ready else "WARN",
            "homed": homed,
            "recovering": "WARN" if self.recovering else "OFF",
            "dry": "WARN" if self.dry else "OFF",
        }
    }
async def test(self)-
Expand source code
async def test(self):
    """Trivial async sanity check: log a debug message and yield briefly."""
    logging.debug("KlipperCommander test")
    await asyncio.sleep(0.2)
Expand source code
def track_response(self, response: dict):
    """Register an incoming websocket message in the tracker.

    Args:
        response (dict): Parsed JSON-RPC response; must contain an "id" key.
    """
    # Get the response ID.
    response_id = str(response["id"])
    # Check if the tracker has this ID already or not.
    if response_id not in self.tracker:
        # BUG FIX: this was a plain string, so the literal text 'response_id' was
        # logged instead of the actual ID; it is now an f-string.
        logging.debug(f"Saving a response with id '{response_id}', which did not match an existing entry in the tracker.")
        self.tracker[response_id] = {}
    # Look for an existing response.
    if "response" in self.tracker[response_id]:
        logging.debug(f"Response found in tracker item with id={response_id} (it will be updated/overwritten).")
        # Backup the previous response before overwriting it.
        response_prev = self.tracker[response_id]["response"]
        self.tracker.update_entry(response_id, {"response_prev": response_prev})
    # Update tracker dict, saving the response in a "response" key.
    self.tracker.update_entry(response_id, {"status": "responded", "response": response})
    logging.debug(f"Updated response in tracker item with ID {response_id}")
async def update_action_status(self, action: dict, status: str)-
Expand source code
async def update_action_status(self, action: dict, status: str):
    """Notify other parts of the program that an action's status changed.

    Triggers the protocol-updates event on the controller; for example,
    "send_execution_updates" in the "p2g_command" plugin listens for it.
    """
    return await self.controller.trigger_event_callback(
        event_name=self.event_protocol_updates,
        action=action,
        status=status,
    )
For example, "send_execution_updates" in the "p2g_command" plugin.
def update_exec_opts(self, action, wait=None, check=None, timeout=None, add_timeout=False)-
Expand source code
def update_exec_opts(self, action, wait=None, check=None, timeout=None, add_timeout=False):
    """Override execution options for 'run_rpc_protocol'.

    These values will override the wait, check, and timeout parameters
    to 'run_rpc_protocol'. Passing None for an option leaves it untouched;
    with add_timeout=True the timeout is accumulated instead of replaced.
    """
    opts = action.setdefault("exec_opts", {})
    if wait is not None:
        opts["wait"] = wait
    if check is not None:
        opts["check"] = check
    if timeout is not None:
        if add_timeout:
            # Accumulate on top of any previous timeout (0.0 if unset).
            opts["timeout"] = opts.get("timeout", 0.0) + timeout
        else:
            opts["timeout"] = timeout
    return action
async def wait_for(self, cmd_id: str, what: str = 'response', timeout=0.0, loop_delay=0.2)-
Expand source code
async def wait_for(self, cmd_id:str, what:str="response", timeout=0.0, loop_delay=0.2):
    """Wait for a key to be added to a tracker entry.

    Args:
        cmd_id (str): Tracking ID of the command.
        what (str, optional): Tracker key to wait for. Defaults to "response".
        timeout (float, optional): Maximum accumulated wait. NOTE: the default of 0.0
            fails on the first check if no response is already present; pass None to
            wait indefinitely.
        loop_delay (float, optional): Polling interval in seconds.

    Returns:
        bool: True if the key appeared, False on timeout. Returns None implicitly
              if the controller stops running while waiting.
    """
    logging.debug(f"Checking for '{what}' in cmd_id={cmd_id} with timeout={timeout}")
    elapsed = 0.0
    while self.controller.run:
        # Check for a "response".
        response = self.get_response_by_id(cmd_id, what=what)
        # Return True if a response was found.
        if response:
            logging.debug(f"Response found for cmd_id='{cmd_id}'.")
            return True
        # This code executes if no response was found.
        elif timeout is not None:
            # Check for a timeout and return if it has elapsed.
            if elapsed >= timeout:
                logging.warning(f"Timed-out checking for response for cmd_id={cmd_id} and contents: {response}")
                return False
        else:
            # If no timeout was specified, log and keep polling indefinitely.
            logging.debug("No response yet for cmd_id='" + str(cmd_id)+ "'. Waiting infinitely.")
        # Wait for a bit before checking again.
        await asyncio.sleep(loop_delay)
        elapsed += loop_delay
async def wait_for_free(self, timeout=10.0, loop_delay=0.2)-
Expand source code
async def wait_for_free(self, timeout=10.0, loop_delay=0.2):
    """Wait until the run_rpc_protocol loop idles (i.e. no more commands due)."""
    # Dry mode: pretend the machine freed up after a short pause.
    if self.dry:
        logging.debug("Dry mode enabled, waiting briefly and returning.")
        await asyncio.sleep(self.controller.wait_default)
        logging.debug("Dry mode enabled, returning success.")
        return True
    # Track start time (for logging only; the timeout accumulates loop delays).
    start_time = time.time()
    waited = 0.0
    logging.debug(f"Waiting for idle class status: start time {start_time} and timeout {timeout}")
    while not self.free:
        # Give up once the accumulated wait reaches the timeout (if one was given).
        if timeout and waited >= timeout:
            logging.debug(f"Timed-out waiting for idle class status: start time={start_time} timeout={timeout}")
            return False
        await asyncio.sleep(loop_delay)
        waited += loop_delay
    logging.debug(f"Success! Free to run RPC protocols (start time={start_time} timeout={timeout})")
    return True
async def wait_for_homing_update(self, timeout=2.0)-
Expand source code
async def wait_for_homing_update(self, timeout=2.0):
    """Wait until the homed axes are updated"""
    # Dry runs never query the printer; report every axis as homed.
    if self.dry:
        logging.debug("Dry mode on. Returning all axes.")
        return "xyzabce"
    # Invalidate the cached value so only a fresh update is accepted.
    self.homed_axes = None

    async def _poll(delay):
        # Spin until another coroutine stores a fresh homed-axes value.
        while self.homed_axes is None:
            await asyncio.sleep(delay)

    # Bound the polling with a timeout.
    try:
        await asyncio.wait_for(_poll(self.controller.wait_default), timeout=timeout)
    except (asyncio.exceptions.TimeoutError, asyncio.CancelledError):
        logging.warning("Timed-out while waiting for a homed-axes query.")
        return None
    return self.homed_axes
async def wait_for_idle_printer(self, timeout=10.0, loop_delay=0.5)-
Expand source code
async def wait_for_idle_printer(self, timeout=10.0, loop_delay=0.5):
    """The 'idle_timeout' object reports the idle state of the printer.

    AFAIK, "Idle" is set by klipper (klippy/extras/idle_timeout.py) when the "idle timeout"
    is triggered successfully. Else, the printer may be idling and report a "Ready" state,
    or in a busy state, and report a "Printing" state.

    This method is similar to "wait_for_ready", except it will not restart Klipper (it only
    does queries to "idle"). It is only meant to wait for a "still" printer which is running normally.

    The "response" to the query can look like this:
    {'id': 'aa3ba74fd3e3061ca58075924edec8f1', 'jsonrpc': '2.0',
     'result': {'eventtime': 62094.128874252,
                'status': {'idle_timeout': {'printing_time': 5.842246050997346, 'state': 'Printing'}}}}

    Args:
        timeout (float, optional): Total wall-clock timeout across all query loops. Defaults to 10.0.
        loop_delay (float, optional): Minimum seconds between queries. Defaults to 0.5.

    Returns:
        bool: True once the printer reports an idle state, False on timeout.
    """
    # Early return for dry mode.
    if self.dry:
        logging.debug(f"Simulating waiting for idle printer (dry mode on). Returning true in {loop_delay} seconds.")
        await asyncio.sleep(loop_delay)
        return True
    # Track start time.
    start_time = time.time()
    # Make query command.
    idle_query_cmd = rpc_primitives.query_idle_timeout(id=None)
    elapsed = 0.0
    logging.debug(f"Waiting for idle printer with start_time={start_time}")
    while True:
        # Reset the command's unique ID (each iteration sends a distinct query).
        idle_query_cmd["id"] = self.hash_cmd(str(idle_query_cmd))
        # Track iteration time to subtract work time from the loop delay below.
        last_time = time.time()
        try:
            # Send the command and wait for a response.
            logging.debug(f"Sending query command idle_query_cmd={idle_query_cmd}")
            cmd_id, _ = await self.send_cmd(rpc_cmd=idle_query_cmd, wait=True, check=False, timeout=timeout)
            # Get the printer's status from the response.
            cmd_result = self.get_result_by_id(cmd_id)
            if cmd_result:
                status = cmd_result['status']['idle_timeout']["state"]
                logging.debug(f"Received response with status: {status}")
                # Check the response's result for printer idle.
                if status in self.printer_idle_states:
                    logging.debug("Printer now idle.")
                    return True
            else:
                logging.debug(f"No response yet for cmd_id {cmd_id}")
        except KeyError as e:
            # Malformed/partial responses are tolerated and retried.
            logging.debug(f"Key error when looking for idle state in response: {e}")
        # Check for a timeout (total elapsed time, all loops).
        if timeout:
            elapsed = time.time() - start_time
            if elapsed >= timeout:
                logging.debug("Wait for idle printer timed out.")
                return False
        # Wait for the actual remaining time (after the previous loop).
        elapsed_time = time.time() - last_time
        logging.debug(f"Elapsed time: {elapsed_time}")
        if elapsed_time < loop_delay:
            logging.debug(f"Waiting for {loop_delay - elapsed_time} seconds.")
            await asyncio.sleep(loop_delay - elapsed_time)
Else, the printer may be idling and report a "Ready" state, or in a busy state, and report a "Printing" state.
This method is similar to "wait_for_ready", except it will not restart Klipper (it only does queries to "idle"). It is only meant to wait for a "still" printer which is running normally.
The "response" to the query can look like this:
{'id': 'aa3ba74fd3e3061ca58075924edec8f1', 'jsonrpc': '2.0', 'result': {'eventtime': 62094.128874252, 'status': {'idle_timeout': {'printing_time': 5.842246050997346, 'state': 'Printing'}}}} async def wait_for_info_update(self, wait_time=None, timeout=4.0)-
Expand source code
async def wait_for_info_update(self, wait_time=None, timeout=4.0):
    """Wait for a new update from "check_printer_ready". Updates are requested automatically.

    This function is useful to ensure that the information is current.
    """
    # Cleared here; "check_printer_ready" flips it back to True when a fresh
    # status update arrives.
    self.new_info_response = False
    if wait_time is None:
        wait_time = self.controller.wait_default
    if self.dry:
        # TODO: Reconsider how dry mode works, which skips all commands, preventing
        # info updates too.
        logging.debug(f"Dry mode enabled, waiting for {wait_time} before returning.")
        await asyncio.sleep(wait_time)
    else:
        logging.debug(f"Waiting with timeout={timeout} and interval={wait_time}")
        # Poll until the flag flips, or the accumulated wait reaches the timeout.
        elapsed = 0.0
        while not self.new_info_response:
            await asyncio.sleep(wait_time)
            elapsed += wait_time
            if elapsed >= timeout:
                logging.debug("Timed out.")
                return False
    logging.debug("Success!")
    return True
async def wait_for_ready(self, reset=False, wait_time=1.1, timeout=8.0)-
Expand source code
async def wait_for_ready(self, reset=False, wait_time=1.1, timeout=8.0):
    """Wait until the printer becomes ready, optionally sending a "FIRMWARE_RESTART" command.

    This method is similar to "wait_for_idle_printer", but this one relies on regular
    status updates (not on object queries), and can restart klipper.

    Args:
        reset (bool, optional): Send a firmware restart if the printer is not ready. Defaults to False.
        wait_time (float, optional): Polling interval while waiting for printer ready. Defaults to 1.1.
        timeout (float, optional): Overall timeout, shared across both info-update waits. Defaults to 8.0.

    Returns:
        bool: True when the printer is ready, False on timeout.
    """
    logging.debug("Waiting...")
    if self.dry:
        # NOTE(review): the log says "sleeping for {wait_time} seconds" but the sleep
        # is hard-coded to 0.2 — presumably intentional to keep dry runs fast; confirm.
        logging.debug(f"Dry mode enabled, sleeping for {wait_time} seconds and returning.")
        await asyncio.sleep(0.2)
        logging.debug("Dry mode enabled, returning success.")
        return True
    # Track elapsed time.
    initial_time = time.time()
    # Force wait for a new update.
    if not await self.wait_for_info_update(timeout=timeout):
        logging.debug("No new info received 1/2.")
    else:
        logging.debug("New info received 1/2.")
        # Else, new status info was received.
    # Reset printer if requested and not ready.
    if not self.printer_ready and reset:
        logging.debug("Sending firmware_restart.")
        await self.firmware_restart()
    # Subtract the elapsed time from the timeout before running the next command.
    elapsed_time = time.time() - initial_time
    if elapsed_time >= timeout:
        second_timeout = 0.0
    else:
        second_timeout = timeout - elapsed_time
    # Wait for an update again.
    if not await self.wait_for_info_update(timeout=second_timeout):
        # Break here if no new info was received.
        logging.debug("No new info received 2/2.")
        return False
    else:
        logging.debug("New info received 2/2.")
    # Wait here for printer ready (the flag is updated by the status coroutine).
    while not self.printer_ready:
        await asyncio.sleep(wait_time)
        second_timeout -= wait_time
        if self.printer_ready:
            # Break if the printer became ready.
            break
        elif second_timeout <= 0:
            # If not ready, and the timeout is over, return False.
            logging.debug("Timed out.")
            return False
    # Ready! this part of the code is only reached when the printer is ready.
    assert self.printer_ready  # Check again just in case.
    logging.debug("Success!")
    return True
This method is similar to "wait_for_idle_printer", but this one relies on regular status updates (not on object queries), and can restart klipper.
async def wait_for_response(self, cmd_id, timeout=0.0, loop_delay=0.2)-
Expand source code
async def wait_for_response(self, cmd_id, timeout=0.0, loop_delay=0.2):
    """Wait for a 'response' to be added to a tracker entry."""
    # Thin convenience wrapper over the generic "wait_for" poller.
    result = await self.wait_for(
        cmd_id=cmd_id,
        what="response",
        timeout=timeout,
        loop_delay=loop_delay,
    )
    return result
async def wait_for_setup(self, timeout=5.0, wait_time=None, raise_error=False)-
Expand source code
async def wait_for_setup(self, timeout=5.0, wait_time=None, raise_error=False):
    """Wait for the Moonraker connection to be established.

    Args:
        timeout (float, optional): Maximum accumulated wait. Defaults to 5.0.
        wait_time (float, optional): Polling interval; defaults to the controller's wait_default.
        raise_error (bool, optional): Raise CommanderError on timeout instead of returning False.

    Returns:
        bool: True when the websocket is ready (or no address is configured), False on timeout.
    """
    # TODO: Reconsider un-commenting this. It depends on the meaning of "dry".
    # Does "dry" mean not sending but connecting? or neither?
    # if self.dry:
    #     logging.warning("wait_for_setup: dry mode enabled, returning True.")
    #     return True
    if not self.ws_address:
        logging.warning("wait_for_setup: WS address is empty, returning True after 500 ms.")
        await asyncio.sleep(0.5)
        return True
    # Track elapsed time.
    remaining = timeout
    if not wait_time:
        wait_time = self.controller.wait_default
    # Wait here for websocket ready.
    while not self.ws_ready:
        if remaining <= 0:
            # If not ready, and the timeout is over, fail.
            logging.debug("wait_for_setup: Timed out.")
            # BUG FIX: the raise was previously placed after the loop, where it was
            # unreachable (timeouts returned False before it, and the loop could
            # only exit with ws_ready=True). Raise here, on the actual timeout.
            if raise_error:
                raise CommanderError("Timed-out waiting for the connection to moonraker to be established.")
            return False
        await asyncio.sleep(wait_time)
        remaining -= wait_time
    return self.ws_ready
async def websocket_info(self, cmd_id_prefix='websocket_info', loop_time=None)-
Expand source code
async def websocket_info(self, cmd_id_prefix="websocket_info", loop_time=None):
    """Info request coroutine.

    Periodically sends "printer.info" requests to Moonraker.

    Args:
        cmd_id_prefix (str): Prefix used to build unique request IDs.
        loop_time (float): Target period, in seconds, between requests;
            defaults to 3x the controller's default wait.

    Raises:
        CommanderError: On any unhandled exception in the loop.
    """
    logging.info("Coroutine started.")
    if loop_time is None:
        # NOTE: 0.05 was too fast... setting it to x 3 the default.
        loop_time = self.controller.wait_default * 3
    try:
        while not self.controller.run:
            # BUGFIX: this sleep was previously not awaited, which never
            # actually slept (busy loop) and leaked an un-awaited coroutine.
            await asyncio.sleep(self.controller.wait_default)
        logging.info("Coroutine ready.")
        while not self.ws_ready and self.controller.run:
            # Wait here until the websocket is ready.
            await asyncio.sleep(self.controller.wait_default)
        # Loop while running.
        while self.controller.run:
            # Check spam flag or wait and skip iteration
            if not self.spam_info:
                logging.warning("Waiting until spam_info is set.")
                await asyncio.sleep(2.0)
                continue
            # Wait here until the websocket is ready.
            while not self.ws_ready and self.controller.run:
                logging.warning("Waiting until ws_ready and controller.run are set.")
                await asyncio.sleep(2.0)
            # Increment ID counter
            cid = int(self.cid)
            self.cid = self.cid + 1
            # Build a unique command ID per request.
            cmd_id = f"{cmd_id_prefix}_{cid}_{time.time()}"
            # Generate "{"jsonrpc": "2.0", "method": "printer.info", "id": cmd_id}"
            info_command = rpc_primitives.printer_info(cmd_id)
            # Track time
            initial_time = time.time()
            # Send command.
            await self.send_cmd(rpc_cmd=info_command, cmd_id=cmd_id, wait=False, check=False)
            # Wait for the actual remaining time (after the previous await).
            final_time = time.time()
            if final_time - initial_time <= loop_time:
                await asyncio.sleep(loop_time - (final_time - initial_time))
        logging.info("Coroutine loop ended: controller not running.")
    except asyncio.exceptions.CancelledError:
        logging.warning("Task cancelled.")
    except Exception as e:
        msg = f"Unhandled exception: {e}"
        logging.error(msg)
        print(msg)
        raise CommanderError(msg) from e
    logging.warning("Coroutine ended.")
async def websocket_reader(self)-
Expand source code
async def websocket_reader(self):
    """The "reader" co-routine.

    This asyncio co-routine reads from the websocket continuously.
    The "read" is awaited, which means that other co-routines can still
    run while the reader is checking for messages.

    Note: a "co-routine" is similar to the old "serial_reader" thread,
    made with in Threading.

    Raises:
        CommanderError: On any unhandled exception while reading.
    """
    logging.info("Coroutine started.")
    try:
        while not self.controller.run:
            # Wait here until the controller is ready.
            # BUGFIX: this sleep was previously not awaited, which never
            # actually slept (busy loop) and leaked an un-awaited coroutine.
            await asyncio.sleep(self.controller.wait_default)
        logging.info("Coroutine ready.")
        # Wait here until the websocket is ready.
        while not self.ws_ready and self.controller.run:
            await asyncio.sleep(self.controller.wait_default)
        # Loop while running.
        while self.controller.run:
            if not self.ws_ready:
                # Wait until the websocket is ready.
                await asyncio.sleep(self.controller.wait_default)
            else:
                # Receive data from the websocket.
                try:
                    logging.debug("Waiting for a websocket message.")
                    data = await self.websocket.recv()
                    self.new_info_flag = True  # To anyone listening.
                except ConnectionClosed:
                    logging.warning("Failed to get data, the websocket is closed.")
                    await asyncio.sleep(self.controller.wait_default)
                    continue
                # Convert the response to a dictionary.
                response = json.loads(data)
                # Skip "notify_proc_stat_update" messages.
                if response.get('method', None) == 'notify_proc_stat_update':
                    logging.debug("Received notify_proc_stat_update, continuing.")
                    continue
                logging.debug(f"Received response:\n{pformat(response)}")
                # Add key to tracker dict
                try:
                    # Track the response.
                    self.track_response(response)
                except KeyError:
                    logging.debug(f"Incoming message does not contain an 'id' field: {response}")
                    continue
                # Parse the response for important messages here.
                # The handlers may raise an exception, or stop the coroutines, etc.
                self.check_printer_ready(response)
                # NOTE: no sleep here on purpose — "recv" already awaits; an
                # extra sleep previously caused incoming messages to accumulate.
        logging.debug("Controller not running.")
    except asyncio.exceptions.CancelledError:
        logging.warning("Task cancelled.")
    except Exception as e:
        msg = f"Unhandled exception: {e}"
        logging.error(msg)
        print(msg)
        raise CommanderError(msg) from e
    # Close the "shelve" tracker object.
    # TODO: Think of a better place to put this.
    self.tracker_close()
    logging.warning("Coroutine ended.")
async def websocket_reconnect(self, ws=None, wait_time=None, ws_open_timeout=3.0)-
Expand source code
async def websocket_reconnect(self, ws=None, wait_time=None, ws_open_timeout=3.0):
    """This method handles reconnection on disconnect.

    Args:
        ws: Websocket object; defaults to the class's current websocket.
        wait_time (float): Default wait for reconnections; defaults to 5x
            the controller's default wait.
        ws_open_timeout (float): Seconds allowed for opening the connection,
            also waited between reconnection attempts.

    Raises:
        CommanderError: On any unhandled exception.

    TODO: try to catch the error when no websocket is available:
        OSError: Multiple exceptions: [Errno 111] Connect call failed ('::1', 7125, 0, 0),
        [Errno 111] Connect call failed ('127.0.0.1', 7125)
    """
    logging.info("Coroutine started.")
    # Set a default wait time for reconnections.
    if not wait_time:
        wait_time = self.controller.wait_default * 5
    # Get the class's websocket if it was not specified.
    if ws is None:
        ws = self.websocket
    try:
        while not self.controller.run:
            # BUGFIX: this sleep was previously not awaited, which never
            # actually slept (busy loop) and leaked an un-awaited coroutine.
            await asyncio.sleep(self.controller.wait_default)
        logging.info("Coroutine ready.")
        while self.controller.run and self.ws_address:
            # New asyncio API in websockets.
            # https://websockets.readthedocs.io/en/stable/reference/asyncio/client.html#websockets.asyncio.client.connect
            try:
                async with connect(uri=self.ws_address, open_timeout=ws_open_timeout) as ws:
                    while self.controller.run and self.ws_address:
                        # Attempt to send a message.
                        logging.debug("Websocket defined, awaiting pong.")
                        pong = await ws.ping()
                        latency = await pong
                        logging.debug(f"Connection latency is {latency} seconds")
                        # Save the working websocket object.
                        if not self.websocket:
                            self.websocket = ws
                        # Signal that the websocket is ready.
                        self.ws_ready = True
                        # Wait for a bit before the next ping.
                        await asyncio.sleep(self.controller.wait_default)
            except ConnectionClosed:
                # Signal that the websocket is failing.
                self.ws_ready = False
                self.websocket = None
                logging.error("The websocket connection is closed.")
                continue
            except ConnectionRefusedError as e:
                # Signal that the websocket is failing.
                self.ws_ready = False
                self.websocket = None
                msg = f"Connection refused: '{e}'\n" + traceback.format_exc()
                logging.error(msg)
                print(msg)
            # Wait before the next ping or reconnection.
            logging.warning(f"Waiting for {ws_open_timeout} seconds before reconnecting")
            await asyncio.sleep(ws_open_timeout)
        # Loop end.
        logging.debug("Controller not running, or websocket address unset.")
    except asyncio.CancelledError:
        # Update status.
        self.ws_ready = False
        # Try clean exit.
        logging.error("Coroutine cancelled. Attempting clean exit.")
        await self.close()
    except Exception as e:
        # Update status.
        self.ws_ready = False
        msg = f"Unhandled exception: {e}"
        logging.error(msg)
        print(msg)
        raise CommanderError(msg) from e
    logging.warning("Coroutine ended.")
Inherited members