Module pipettin-piper.piper.commanders.sio_commander
Classes
class SioCommander (config: dict, controller: Controller)-
Expand source code
class SioCommander(): sio_ready: bool sio_address: str sio: socketio.AsyncClient @property def connected(self) -> bool: """Is the socketio client connected?""" return self.sio.connected def __init__(self, config: dict, controller: "Controller"): logging.info("Initiating SioCommander class.") self.controller = controller self.config = config # NOTE: AsyncClient requires extra dependencies # not installed (and not warned about) that do # not produce visible errors. #self.sio = socketio.Client() # Available as self.sio? self.sio = socketio.AsyncClient() # Available as self.sio? # Status self.sio_ready = False # SIO server address. self.sio_address = config.get("sio_address", "http://localhost:3333") # Add socketio coroutine to the methods list (created by KlipperCommander). self.controller.coroutine_methods.extend([ self.moon_sio() ]) # Register default socketio event handlers. self.register_sio_callbacks() # SOCKET.IO CALLBACKS AND EVENTS SECTION #### def register_sio_callbacks(self): """Function to register socketio event callbacks, typically sent by the Pipettin GUI. In theory using this structure will allow callbacks to access the "self" object when they trigger. See: - https://github.com/miguelgrinberg/python-socketio/issues/390#issuecomment-787796871 - https://clp-research.github.io/slurk/slurk_bots.html """ @self.sio.event def connect(): """This event runs when the controller connects to the SocketIO interface.""" logging.info("Connected to the socket.io server.") try: settings = self.get_settings() # TODO: Move the "parse_settings" and "write_settings" calls # somewhere where the settings are actually available. # self.parse_settings(settings) # self.write_settings(settings) except Exception as e: logging.error(f"Failed to process settings: {e}") @self.sio.event def disconnect(): logging.info("Disconnected from the socket.io server.") @self.sio.on('*') def catch_all(event, data=None): # See: https://python-socketio.readthedocs.io/en/latest/client.html#catch-all-event-handlers logging.warning(f"Unhandled socketio event '{event}' received with command data: " + str(data)) @staticmethod def dummy_ack(*args, **kwargs): pass #### Controller settings handler methods #### def get_settings(self): """Get settings from the DB. Selects only the first set of settings, and cleans it up before returning. """ settings = self.controller.database_tools.settings if not settings: logging.error(f"No settings retrieved in database {self.controller.database_tools.database_name}.") return if len(settings) > 1: logging.warning("More than one set of settings was retreived from the database. Using the first only.") # Use the first entry. settings = settings[0] # Select only the relevant settings. settings = {k: settings.get(k, {}) for k in ["workspace", "controller", "database"]} # Remove non-serializable keys. settings.pop("_id", None) settings.pop("updatedAt", None) settings.pop("createdAt", None) return settings def parse_settings(self, settings: dict): """Parse settings from the UI.""" # TODO: Validate the "uri" field too. # TODO: Use the info in ["controller"]["configuration"] to update the controllers config. if settings is None: settings = {} settings_db_name = settings.get("database", {}).get("name", None) config_db_name = self.config.get("database", {}).get("database_name", None) if not settings_db_name: logging.error(f"The parsed settings did not have the required database information: {settings}") elif settings_db_name != config_db_name: msg = "There is a DB mismatch between the controller and the UI." msg += f" The controller is using '{config_db_name}' and the UI '{settings_db_name}'." msg += " Please update the controller config.yml file or the UI's." self.sio_emit_alert(text=msg) logging.error(msg) else: logging.info(f"The DB name in the controller's configuration '{config_db_name}' matches the UI's '{settings_db_name}'.") def write_settings(self, settings: dict): """Write settings to JSON file.""" settings_file = self.controller.config.get("settings_file", None) if not settings_file: _, settings_file = tempfile.mkstemp(prefix="settings_file-", suffix=".json", dir=None) logging.info(f"Settings file undefined. Defaulting to temporary file at: {settings_file}") logging.info(f"Writing new settings to '{settings_file}'.") # Write settings to file. with open(settings_file, 'w', encoding='utf-8') as file: json.dump(settings, file, ensure_ascii=False, indent=4) # Alert pop-up event. alert_event_name = 'alert' alert_event_alarm = 'alarm' alert_event_error = 'error' async def sio_emit_alert(self, text="No message specified.", alert_type="alarm"): """Send 'alert' to the backend, the text would show up as a modal in the GUI. The type can be 'message', 'error', or 'alarm'. Example: data = {'text': 'text', 'type': 'error'} """ data = {'text': text, 'type': alert_type} logging.debug(f"Sending alert to socket: {data}") await self.emit(self.alert_event_name, data) # Tool data event. tool_data_event_name = 'tool_data' async def sio_emit_position(self, position): """Send the robot's position as a 'tool_data' event. Example: data = {'position': {"x": 0, "y": 5, "z": 10}} """ data = {'position': position} logging.debug(f"Sending position to socket: {pformat(data)}") await self.emit(self.tool_data_event_name, data) # Human intervention event. human_intervention_event_name = 'human_intervention_required' async def sio_emit_intervention(self, message: str): """Send 'message' to the 'human_intervention_required' event. Example output: data = {'text': 'message'} """ data = {"data": {"text": message}} logging.debug(f"Sending human intervention message to socket: '{data}'") await self.emit(self.human_intervention_event_name, data) # Main emit method. async def emit(self, event, data=None, callback=None): """Emit an event through the SocketIO interface. Args: event (str): Name of the event. data (optional): Data attached to the event. Defaults to None. """ if self.sio.connected: logging.debug(f"Emitting '{event}' event.") await self.sio.emit(event=event, data=data, callback=callback) else: logging.error(f"Failed to emit '{event}' event. The socket is disconnected.") # MAIN COROUTINES method section #### sio_ready: bool """Flag indicating wether the socket.io connection is online.""" async def moon_sio(self, check_interval=2): logging.info("Coroutine started.") try: # Wait until the controller is ready. while not self.controller.run: await asyncio.sleep(self.controller.wait_default) logging.info("Coroutine ready.") if self.sio_address: # Loop while the controller is set to run. while self.controller.run: if not self.sio.connected: self.sio_ready = False if self.sio_address is not None: logging.warning(f"Socketio disconnected. Attempting (re)connection to address: {self.sio_address}") try: await self.sio.connect(self.sio_address) await self.sio_emit_alert(text = "The controller has connected to the UI.", alert_type='message') # await self.sio.wait() except socketio.exceptions.ConnectionError as e: logging.error(f"Socket.io ConnectionError: '{e}'. Retrying in {check_interval} seconds.") if not self.sio.connected: logging.warning(f"Could not reconnect. Retrying in {check_interval} seconds.") else: logging.warning("(re)connection successful!") else: self.sio_ready = True logging.debug("Connection is live.") await asyncio.sleep(check_interval) logging.info("Coroutine loop ended: controller not running.") # Disconnect after the while loop ends. if self.sio.connected: try: await self.sio.disconnect() except CommanderError: logging.error("Unhandled exception during disconnect.") else: logging.info("No socketio address configured. Skipping SocketIO connection setup.") except asyncio.exceptions.CancelledError: logging.warning("Coroutine task cancelled.") # Final state update. self.sio_ready = False logging.warning("Coroutine ended.") async def close_socketio(self, timeout=1.0): # Close socketio if self.sio_address: if self.sio.connected: try: await asyncio.wait_for( self.sio.disconnect(), timeout=timeout ) except Exception: msg = "close_socketio: Unhandled exception during self.sio.disconnect:\n" + traceback.format_exc() logging.error(msg) return False return True async def close(self, timeout=3): logging.info("Closing SioCommander.") # Close socketio await self.close_socketio(timeout=timeout) async def status(self): """Gather information and compute the overall status of this class.""" status = { "sio": { # NOTE: This "OK" will be matched by make_status in sio_status.py. "status": "OK" if self.sio.connected and self.sio_ready else "WARN", "connected": "OK" if self.sio.connected else "OFF", "ready": self.sio_ready # TODO: may be redundant. } } return status async def test(self): logging.warning("SioCommander test not implemented.") await asyncio.sleep(0.2)Class variables
var alert_event_alarmvar alert_event_errorvar alert_event_namevar human_intervention_event_namevar sio : socketio.async_client.AsyncClientvar sio_address : strvar sio_ready : bool-
Flag indicating wether the socket.io connection is online.
var tool_data_event_name
Static methods
def dummy_ack(*args, **kwargs)-
Expand source code
@staticmethod def dummy_ack(*args, **kwargs): pass
Instance variables
prop connected : bool-
Expand source code
@property def connected(self) -> bool: """Is the socketio client connected?""" return self.sio.connectedIs the socketio client connected?
Methods
async def close(self, timeout=3)-
Expand source code
async def close(self, timeout=3): logging.info("Closing SioCommander.") # Close socketio await self.close_socketio(timeout=timeout) async def close_socketio(self, timeout=1.0)-
Expand source code
async def close_socketio(self, timeout=1.0): # Close socketio if self.sio_address: if self.sio.connected: try: await asyncio.wait_for( self.sio.disconnect(), timeout=timeout ) except Exception: msg = "close_socketio: Unhandled exception during self.sio.disconnect:\n" + traceback.format_exc() logging.error(msg) return False return True async def emit(self, event, data=None, callback=None)-
Expand source code
async def emit(self, event, data=None, callback=None): """Emit an event through the SocketIO interface. Args: event (str): Name of the event. data (optional): Data attached to the event. Defaults to None. """ if self.sio.connected: logging.debug(f"Emitting '{event}' event.") await self.sio.emit(event=event, data=data, callback=callback) else: logging.error(f"Failed to emit '{event}' event. The socket is disconnected.")Emit an event through the SocketIO interface.
Args
event:str- Name of the event.
data:optional- Data attached to the event. Defaults to None.
def get_settings(self)-
Expand source code
def get_settings(self): """Get settings from the DB. Selects only the first set of settings, and cleans it up before returning. """ settings = self.controller.database_tools.settings if not settings: logging.error(f"No settings retrieved in database {self.controller.database_tools.database_name}.") return if len(settings) > 1: logging.warning("More than one set of settings was retreived from the database. Using the first only.") # Use the first entry. settings = settings[0] # Select only the relevant settings. settings = {k: settings.get(k, {}) for k in ["workspace", "controller", "database"]} # Remove non-serializable keys. settings.pop("_id", None) settings.pop("updatedAt", None) settings.pop("createdAt", None) return settingsGet settings from the DB. Selects only the first set of settings, and cleans it up before returning.
async def moon_sio(self, check_interval=2)-
Expand source code
async def moon_sio(self, check_interval=2): logging.info("Coroutine started.") try: # Wait until the controller is ready. while not self.controller.run: await asyncio.sleep(self.controller.wait_default) logging.info("Coroutine ready.") if self.sio_address: # Loop while the controller is set to run. while self.controller.run: if not self.sio.connected: self.sio_ready = False if self.sio_address is not None: logging.warning(f"Socketio disconnected. Attempting (re)connection to address: {self.sio_address}") try: await self.sio.connect(self.sio_address) await self.sio_emit_alert(text = "The controller has connected to the UI.", alert_type='message') # await self.sio.wait() except socketio.exceptions.ConnectionError as e: logging.error(f"Socket.io ConnectionError: '{e}'. Retrying in {check_interval} seconds.") if not self.sio.connected: logging.warning(f"Could not reconnect. Retrying in {check_interval} seconds.") else: logging.warning("(re)connection successful!") else: self.sio_ready = True logging.debug("Connection is live.") await asyncio.sleep(check_interval) logging.info("Coroutine loop ended: controller not running.") # Disconnect after the while loop ends. if self.sio.connected: try: await self.sio.disconnect() except CommanderError: logging.error("Unhandled exception during disconnect.") else: logging.info("No socketio address configured. Skipping SocketIO connection setup.") except asyncio.exceptions.CancelledError: logging.warning("Coroutine task cancelled.") # Final state update. self.sio_ready = False logging.warning("Coroutine ended.") def parse_settings(self, settings: dict)-
Expand source code
def parse_settings(self, settings: dict): """Parse settings from the UI.""" # TODO: Validate the "uri" field too. # TODO: Use the info in ["controller"]["configuration"] to update the controllers config. if settings is None: settings = {} settings_db_name = settings.get("database", {}).get("name", None) config_db_name = self.config.get("database", {}).get("database_name", None) if not settings_db_name: logging.error(f"The parsed settings did not have the required database information: {settings}") elif settings_db_name != config_db_name: msg = "There is a DB mismatch between the controller and the UI." msg += f" The controller is using '{config_db_name}' and the UI '{settings_db_name}'." msg += " Please update the controller config.yml file or the UI's." self.sio_emit_alert(text=msg) logging.error(msg) else: logging.info(f"The DB name in the controller's configuration '{config_db_name}' matches the UI's '{settings_db_name}'.")Parse settings from the UI.
def register_sio_callbacks(self)-
Expand source code
def register_sio_callbacks(self): """Function to register socketio event callbacks, typically sent by the Pipettin GUI. In theory using this structure will allow callbacks to access the "self" object when they trigger. See: - https://github.com/miguelgrinberg/python-socketio/issues/390#issuecomment-787796871 - https://clp-research.github.io/slurk/slurk_bots.html """ @self.sio.event def connect(): """This event runs when the controller connects to the SocketIO interface.""" logging.info("Connected to the socket.io server.") try: settings = self.get_settings() # TODO: Move the "parse_settings" and "write_settings" calls # somewhere where the settings are actually available. # self.parse_settings(settings) # self.write_settings(settings) except Exception as e: logging.error(f"Failed to process settings: {e}") @self.sio.event def disconnect(): logging.info("Disconnected from the socket.io server.") @self.sio.on('*') def catch_all(event, data=None): # See: https://python-socketio.readthedocs.io/en/latest/client.html#catch-all-event-handlers logging.warning(f"Unhandled socketio event '{event}' received with command data: " + str(data))Function to register socketio event callbacks, typically sent by the Pipettin GUI.
In theory using this structure will allow callbacks to access the "self" object when they trigger.
See: - https://github.com/miguelgrinberg/python-socketio/issues/390#issuecomment-787796871 - https://clp-research.github.io/slurk/slurk_bots.html
async def sio_emit_alert(self, text='No message specified.', alert_type='alarm')-
Expand source code
async def sio_emit_alert(self, text="No message specified.", alert_type="alarm"): """Send 'alert' to the backend, the text would show up as a modal in the GUI. The type can be 'message', 'error', or 'alarm'. Example: data = {'text': 'text', 'type': 'error'} """ data = {'text': text, 'type': alert_type} logging.debug(f"Sending alert to socket: {data}") await self.emit(self.alert_event_name, data)Send 'alert' to the backend, the text would show up as a modal in the GUI.
The type can be 'message', 'error', or 'alarm'.
Example
data = {'text': 'text', 'type': 'error'}
async def sio_emit_intervention(self, message: str)-
Expand source code
async def sio_emit_intervention(self, message: str): """Send 'message' to the 'human_intervention_required' event. Example output: data = {'text': 'message'} """ data = {"data": {"text": message}} logging.debug(f"Sending human intervention message to socket: '{data}'") await self.emit(self.human_intervention_event_name, data)Send 'message' to the 'human_intervention_required' event. Example output: data = {'text': 'message'}
async def sio_emit_position(self, position)-
Expand source code
async def sio_emit_position(self, position): """Send the robot's position as a 'tool_data' event. Example: data = {'position': {"x": 0, "y": 5, "z": 10}} """ data = {'position': position} logging.debug(f"Sending position to socket: {pformat(data)}") await self.emit(self.tool_data_event_name, data)Send the robot's position as a 'tool_data' event.
Example
data = {'position': {"x": 0, "y": 5, "z": 10}}
async def status(self)-
Expand source code
async def status(self): """Gather information and compute the overall status of this class.""" status = { "sio": { # NOTE: This "OK" will be matched by make_status in sio_status.py. "status": "OK" if self.sio.connected and self.sio_ready else "WARN", "connected": "OK" if self.sio.connected else "OFF", "ready": self.sio_ready # TODO: may be redundant. } } return statusGather information and compute the overall status of this class.
async def test(self)-
Expand source code
async def test(self): logging.warning("SioCommander test not implemented.") await asyncio.sleep(0.2) def write_settings(self, settings: dict)-
Expand source code
def write_settings(self, settings: dict): """Write settings to JSON file.""" settings_file = self.controller.config.get("settings_file", None) if not settings_file: _, settings_file = tempfile.mkstemp(prefix="settings_file-", suffix=".json", dir=None) logging.info(f"Settings file undefined. Defaulting to temporary file at: {settings_file}") logging.info(f"Writing new settings to '{settings_file}'.") # Write settings to file. with open(settings_file, 'w', encoding='utf-8') as file: json.dump(settings, file, ensure_ascii=False, indent=4)Write settings to JSON file.