Module pipettin-piper.piper.commanders.sio_commander

Classes

class SioCommander (config: dict, controller: Controller)
Expand source code
class SioCommander():
    """Socket.IO side of the controller.

    Owns a ``socketio.AsyncClient``, registers the default event handlers on it,
    keeps the connection alive from the ``moon_sio`` coroutine, and offers helpers
    that emit the 'alert', 'tool_data' and 'human_intervention_required' events.
    """

    sio_ready: bool
    """Flag indicating whether the socket.io connection is online."""
    sio_address: str
    sio: socketio.AsyncClient

    @property
    def connected(self) -> bool:
        """Is the socketio client connected?"""
        return self.sio.connected

    def __init__(self, config: dict, controller: "Controller"):
        """Create the client, queue the connection coroutine and register handlers.

        Args:
            config: Configuration mapping. Only "sio_address" is read here; it
                defaults to "http://localhost:3333".
            controller: Owning controller. The ``moon_sio`` coroutine is appended
                to its ``coroutine_methods`` list.
        """
        logging.info("Initiating SioCommander class.")

        self.controller = controller
        self.config = config

        # NOTE: AsyncClient requires extra dependencies
        #       not installed (and not warned about) that do
        #       not produce visible errors.
        #self.sio = socketio.Client()  # Available as self.sio?
        self.sio = socketio.AsyncClient()  # Available as self.sio?

        # Status
        self.sio_ready = False

        # SIO server address.
        self.sio_address = config.get("sio_address", "http://localhost:3333")

        # Add socketio coroutine to the methods list (created by KlipperCommander).
        # NOTE: calling moon_sio() here only creates the coroutine object; it does
        #       not run until something awaits or schedules it.
        self.controller.coroutine_methods.extend([
            self.moon_sio()
        ])

        # Register default socketio event handlers.
        self.register_sio_callbacks()

    # SOCKET.IO CALLBACKS AND EVENTS SECTION ####
    def register_sio_callbacks(self):
        """Function to register socketio event callbacks, typically sent by the Pipettin GUI.

        In theory using this structure will allow callbacks to access the "self" object when they trigger.

        See:
        - https://github.com/miguelgrinberg/python-socketio/issues/390#issuecomment-787796871
        - https://clp-research.github.io/slurk/slurk_bots.html
        """

        @self.sio.event
        def connect():
            """This event runs when the controller connects to the SocketIO interface."""
            logging.info("Connected to the socket.io server.")
            try:
                # The result is not used yet (see the TODO below); the call is kept
                # because get_settings logs when settings are missing or duplicated.
                self.get_settings()
                # TODO: Move the "parse_settings" and "write_settings" calls
                #       somewhere where the settings are actually available.
                # self.parse_settings(settings)
                # self.write_settings(settings)
            except Exception as e:
                logging.error(f"Failed to process settings: {e}")

        @self.sio.event
        def disconnect():
            """This event runs when the client disconnects from the server."""
            logging.info("Disconnected from the socket.io server.")

        @self.sio.on('*')
        def catch_all(event, data=None):
            """Log any event that has no dedicated handler."""
            # See: https://python-socketio.readthedocs.io/en/latest/client.html#catch-all-event-handlers
            logging.warning(f"Unhandled socketio event '{event}' received with command data: " + str(data))

    @staticmethod
    def dummy_ack(*args, **kwargs):
        """Accept any arguments and do nothing."""
        pass

    #### Controller settings handler methods ####
    def get_settings(self):
        """Get settings from the DB.

        Selects only the first set of settings, and cleans it up before returning.

        Returns:
            A dict with the "workspace", "controller" and "database" entries of
            the first settings document (missing entries become ``{}``), or
            ``None`` when no settings are available.
        """
        settings = self.controller.database_tools.settings

        if not settings:
            logging.error(f"No settings retrieved in database {self.controller.database_tools.database_name}.")
            return None

        if len(settings) > 1:
            logging.warning("More than one set of settings was retreived from the database. Using the first only.")
        # Use the first entry.
        settings = settings[0]

        # Select only the relevant settings.
        settings = {k: settings.get(k, {}) for k in ["workspace", "controller", "database"]}

        # Remove non-serializable keys.
        # NOTE(review): after the selection above these keys can no longer be
        #       present at the top level, so the pops below never remove anything.
        settings.pop("_id", None)
        settings.pop("updatedAt", None)
        settings.pop("createdAt", None)

        return settings

    def parse_settings(self, settings: dict):
        """Parse settings from the UI.

        Compares the database name in ``settings["database"]["name"]`` with the
        one in ``self.config["database"]["database_name"]`` and logs the outcome.
        On a mismatch an alert is also scheduled for the UI.

        Args:
            settings: Settings mapping; ``None`` is treated as an empty dict.
        """
        # TODO: Validate the "uri" field too.
        # TODO: Use the info in ["controller"]["configuration"] to update the controllers config.
        if settings is None:
            settings = {}
        settings_db_name = settings.get("database", {}).get("name", None)
        config_db_name = self.config.get("database", {}).get("database_name", None)
        if not settings_db_name:
            logging.error(f"The parsed settings did not have the required database information: {settings}")
        elif settings_db_name != config_db_name:
            msg = "There is a DB mismatch between the controller and the UI."
            msg += f" The controller is using '{config_db_name}' and the UI '{settings_db_name}'."
            msg += " Please update the controller config.yml file or the UI's."
            # sio_emit_alert is a coroutine function: calling it without awaiting
            # only builds a coroutine object and never sends anything. This method
            # is synchronous, so the alert is scheduled on the running event loop.
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                logging.warning("No running event loop: the DB mismatch alert was not sent to the UI.")
            else:
                task = loop.create_task(self.sio_emit_alert(text=msg))
                # The event loop only keeps weak references to tasks; hold a strong
                # one until the task finishes so it cannot be collected mid-flight.
                pending = getattr(self, "_alert_tasks", None)
                if pending is None:
                    pending = self._alert_tasks = set()
                pending.add(task)
                task.add_done_callback(pending.discard)
            logging.error(msg)
        else:
            logging.info(f"The DB name in the controller's configuration '{config_db_name}' matches the UI's '{settings_db_name}'.")

    def write_settings(self, settings: dict):
        """Write settings to JSON file.

        The target path is ``self.controller.config["settings_file"]``. When it is
        unset, a new temporary ".json" file is created and used instead.

        Args:
            settings: JSON-serializable settings to write.
        """
        settings_file = self.controller.config.get("settings_file", None)
        if not settings_file:
            # delete=False keeps the file on disk after the handle is closed.
            # Closing the handle here avoids leaking the open descriptor that
            # tempfile.mkstemp would have returned.
            with tempfile.NamedTemporaryFile(prefix="settings_file-", suffix=".json", delete=False) as tmp:
                settings_file = tmp.name
            logging.info(f"Settings file undefined. Defaulting to temporary file at: {settings_file}")

        logging.info(f"Writing new settings to '{settings_file}'.")

        # Write settings to file.
        with open(settings_file, 'w', encoding='utf-8') as file:
            json.dump(settings, file, ensure_ascii=False, indent=4)

    # Alert pop-up event.
    alert_event_name = 'alert'
    alert_event_alarm = 'alarm'
    alert_event_error = 'error'
    async def sio_emit_alert(self, text="No message specified.", alert_type="alarm"):
        """Send 'alert' to the backend, the text would show up as a modal in the GUI.

        The type can be 'message', 'error', or 'alarm'.

        Example:
            data = {'text': 'text', 'type': 'error'}
        """
        data = {'text': text, 'type': alert_type}
        logging.debug(f"Sending alert to socket: {data}")
        await self.emit(self.alert_event_name, data)

    # Tool data event.
    tool_data_event_name = 'tool_data'
    async def sio_emit_position(self, position):
        """Send the robot's position as a 'tool_data' event.

        Example:
            data = {'position': {"x": 0, "y": 5, "z": 10}}
        """
        data = {'position': position}
        logging.debug(f"Sending position to socket: {pformat(data)}")
        await self.emit(self.tool_data_event_name, data)

    # Human intervention event.
    human_intervention_event_name = 'human_intervention_required'
    async def sio_emit_intervention(self, message: str):
        """Send 'message' to the 'human_intervention_required' event.

        Example output:
            data = {'data': {'text': 'message'}}
        """
        data = {"data": {"text": message}}
        logging.debug(f"Sending human intervention message to socket: '{data}'")
        await self.emit(self.human_intervention_event_name, data)

    # Main emit method.
    async def emit(self, event, data=None, callback=None):
        """Emit an event through the SocketIO interface.

        Nothing is sent (an error is logged instead) when the client is offline.

        Args:
            event (str): Name of the event.
            data (optional): Data attached to the event. Defaults to None.
            callback (optional): Forwarded unchanged to the client's ``emit``.
        """
        if self.sio.connected:
            logging.debug(f"Emitting '{event}' event.")
            await self.sio.emit(event=event, data=data, callback=callback)
        else:
            logging.error(f"Failed to emit '{event}' event. The socket is disconnected.")

    # MAIN COROUTINES method section ####
    async def moon_sio(self, check_interval=2):
        """Keep the socket.io connection alive while the controller runs.

        Waits until ``controller.run`` is truthy, then checks the connection every
        ``check_interval`` seconds, (re)connecting to ``sio_address`` when needed
        and updating ``sio_ready``. When the loop ends the client is disconnected.
        ``sio_ready`` is always False once the coroutine returns.

        Args:
            check_interval: Seconds to sleep between connection checks.
        """

        logging.info("Coroutine started.")

        try:
            # Wait until the controller is ready.
            while not self.controller.run:
                await asyncio.sleep(self.controller.wait_default)
            logging.info("Coroutine ready.")

            if self.sio_address:
                # Loop while the controller is set to run.
                while self.controller.run:
                    if not self.sio.connected:
                        self.sio_ready = False
                        # Checked again because the address may be changed while looping.
                        if self.sio_address is not None:
                            logging.warning(f"Socketio disconnected. Attempting (re)connection to address: {self.sio_address}")
                            try:
                                await self.sio.connect(self.sio_address)
                                await self.sio_emit_alert(text = "The controller has connected to the UI.", alert_type='message')
                                # await self.sio.wait()
                            except socketio.exceptions.ConnectionError as e:
                                logging.error(f"Socket.io ConnectionError: '{e}'. Retrying in {check_interval} seconds.")
                            if not self.sio.connected:
                                logging.warning(f"Could not reconnect. Retrying in {check_interval} seconds.")
                            else:
                                logging.warning("(re)connection successful!")
                    else:
                        # sio_ready only becomes True on the check after a
                        # successful connection, not in the same iteration.
                        self.sio_ready = True
                        logging.debug("Connection is live.")
                    await asyncio.sleep(check_interval)

                logging.info("Coroutine loop ended: controller not running.")

                # Disconnect after the while loop ends.
                if self.sio.connected:
                    try:
                        await self.sio.disconnect()
                    except CommanderError:
                        logging.error("Unhandled exception during disconnect.")
            else:
                logging.info("No socketio address configured. Skipping SocketIO connection setup.")

        except asyncio.exceptions.CancelledError:
            # Cancellation is absorbed here so the final state update below still runs.
            logging.warning("Coroutine task cancelled.")

        # Final state update.
        self.sio_ready = False

        logging.warning("Coroutine ended.")

    async def close_socketio(self, timeout=1.0):
        """Disconnect the client, waiting at most ``timeout`` seconds.

        Returns:
            False when the disconnect raised (including a timeout); True
            otherwise, also when there was nothing to disconnect.
        """
        # Close socketio
        if self.sio_address:
            if self.sio.connected:
                try:
                    await asyncio.wait_for(
                        self.sio.disconnect(),
                        timeout=timeout
                    )
                except Exception:
                    msg = "close_socketio: Unhandled exception during self.sio.disconnect:\n" + traceback.format_exc()
                    logging.error(msg)
                    return False
        return True

    async def close(self, timeout=3):
        """Close this commander; ``timeout`` is forwarded to ``close_socketio``."""
        logging.info("Closing SioCommander.")
        # Close socketio
        await self.close_socketio(timeout=timeout)

    async def status(self):
        """Gather information and compute the overall status of this class."""
        status = {
            "sio": {
                # NOTE: This "OK" will be matched by make_status in sio_status.py.
                "status": "OK" if self.sio.connected and self.sio_ready else "WARN",
                "connected": "OK" if self.sio.connected else "OFF",
                "ready": self.sio_ready  # TODO: may be redundant.
            }
        }
        return status

    async def test(self):
        """Placeholder self-test: logs a warning and sleeps briefly."""
        logging.warning("SioCommander test not implemented.")
        await asyncio.sleep(0.2)

Class variables

var alert_event_alarm
var alert_event_error
var alert_event_name
var human_intervention_event_name
var sio : socketio.async_client.AsyncClient
var sio_address : str
var sio_ready : bool

Flag indicating whether the socket.io connection is online.

var tool_data_event_name

Static methods

def dummy_ack(*args, **kwargs)
Expand source code
@staticmethod
def dummy_ack(*args, **kwargs):
    pass

Instance variables

prop connected : bool
Expand source code
@property
def connected(self) -> bool:
    """Report the ``connected`` attribute of the wrapped socketio client."""
    client = self.sio
    return client.connected

Is the socketio client connected?

Methods

async def close(self, timeout=3)
Expand source code
async def close(self, timeout=3):
    """Shut this commander down.

    Logs the shutdown and delegates to ``close_socketio``, forwarding
    ``timeout``. The flag returned by ``close_socketio`` is not propagated.
    """
    logging.info("Closing SioCommander.")
    disconnect_attempt = self.close_socketio(timeout=timeout)
    await disconnect_attempt
async def close_socketio(self, timeout=1.0)
Expand source code
async def close_socketio(self, timeout=1.0):
    """Disconnect the socketio client, waiting at most ``timeout`` seconds.

    Returns:
        False when the disconnect raised (a timeout included); True otherwise,
        which also covers the cases where there was nothing to disconnect.
    """
    # Nothing to do without a configured address or with an offline client.
    if not self.sio_address or not self.sio.connected:
        return True

    try:
        await asyncio.wait_for(self.sio.disconnect(), timeout=timeout)
    except Exception:
        details = traceback.format_exc()
        logging.error("close_socketio: Unhandled exception during self.sio.disconnect:\n" + details)
        return False

    return True
async def emit(self, event, data=None, callback=None)
Expand source code
async def emit(self, event, data=None, callback=None):
    """Send an event through the socketio client, if it is connected.

    When the client is offline nothing is sent and an error is logged.

    Args:
        event (str): Name of the event.
        data (optional): Payload attached to the event. Defaults to None.
        callback (optional): Forwarded unchanged to the client's ``emit``.
    """
    if not self.sio.connected:
        logging.error(f"Failed to emit '{event}' event. The socket is disconnected.")
        return

    logging.debug(f"Emitting '{event}' event.")
    await self.sio.emit(event=event, data=data, callback=callback)

Emit an event through the SocketIO interface.

Args

event : str
Name of the event.
data : optional
Data attached to the event. Defaults to None.
def get_settings(self)
Expand source code
def get_settings(self):
    """Fetch the settings held by the controller's database tools.

    Only the first settings entry is used. From it, the "workspace",
    "controller" and "database" items are copied into a new dict (a missing
    item becomes ``{}``). Returns ``None`` when no settings are available.
    """
    db_tools = self.controller.database_tools
    entries = db_tools.settings

    if not entries:
        logging.error(f"No settings retrieved in database {self.controller.database_tools.database_name}.")
        return None

    if len(entries) > 1:
        logging.warning("More than one set of settings was retreived from the database. Using the first only.")
    first = entries[0]

    # Keep only the relevant sections.
    selected = {}
    for section in ("workspace", "controller", "database"):
        selected[section] = first.get(section, {})

    # Drop non-serializable keys.
    # NOTE(review): after the selection above these keys can no longer be
    #       present at the top level, so nothing is ever removed here.
    for unwanted in ("_id", "updatedAt", "createdAt"):
        selected.pop(unwanted, None)

    return selected

Get settings from the DB. Selects only the first set of settings, and cleans it up before returning.

async def moon_sio(self, check_interval=2)
Expand source code
async def moon_sio(self, check_interval=2):
    """Keep the socket.io connection alive while the controller runs.

    Waits until ``controller.run`` is truthy, then checks the connection every
    ``check_interval`` seconds, (re)connecting to ``sio_address`` when needed
    and updating ``sio_ready``. When the loop ends the client is disconnected.
    ``sio_ready`` is always False once the coroutine returns.

    Args:
        check_interval: Seconds to sleep between connection checks.
    """

    logging.info("Coroutine started.")

    try:
        # Hold here until the controller flags that it is running.
        while not self.controller.run:
            await asyncio.sleep(self.controller.wait_default)
        logging.info("Coroutine ready.")

        if not self.sio_address:
            logging.info("No socketio address configured. Skipping SocketIO connection setup.")
        else:
            # Supervise the connection for as long as the controller runs.
            while self.controller.run:
                if self.sio.connected:
                    self.sio_ready = True
                    logging.debug("Connection is live.")
                else:
                    self.sio_ready = False
                    if self.sio_address is not None:
                        logging.warning(f"Socketio disconnected. Attempting (re)connection to address: {self.sio_address}")
                        try:
                            await self.sio.connect(self.sio_address)
                            await self.sio_emit_alert(text="The controller has connected to the UI.", alert_type='message')
                        except socketio.exceptions.ConnectionError as e:
                            logging.error(f"Socket.io ConnectionError: '{e}'. Retrying in {check_interval} seconds.")
                        # Report the outcome of this attempt.
                        if self.sio.connected:
                            logging.warning("(re)connection successful!")
                        else:
                            logging.warning(f"Could not reconnect. Retrying in {check_interval} seconds.")
                await asyncio.sleep(check_interval)

            logging.info("Coroutine loop ended: controller not running.")

            # Leave the server cleanly once the controller has stopped.
            if self.sio.connected:
                try:
                    await self.sio.disconnect()
                except CommanderError:
                    logging.error("Unhandled exception during disconnect.")

    except asyncio.exceptions.CancelledError:
        # Cancellation is absorbed so the final state update below still runs.
        logging.warning("Coroutine task cancelled.")

    # Whatever happened above, the connection is no longer supervised.
    self.sio_ready = False

    logging.warning("Coroutine ended.")
def parse_settings(self, settings: dict)
Expand source code
def parse_settings(self, settings: dict):
    """Parse settings from the UI.

    Compares the database name in ``settings["database"]["name"]`` with the
    one in ``self.config["database"]["database_name"]`` and logs the outcome.
    On a mismatch an alert is also scheduled for the UI.

    Args:
        settings: Settings mapping; ``None`` is treated as an empty dict.
    """
    # TODO: Validate the "uri" field too.
    # TODO: Use the info in ["controller"]["configuration"] to update the controllers config.
    if settings is None:
        settings = {}
    settings_db_name = settings.get("database", {}).get("name", None)
    config_db_name = self.config.get("database", {}).get("database_name", None)
    if not settings_db_name:
        logging.error(f"The parsed settings did not have the required database information: {settings}")
    elif settings_db_name != config_db_name:
        msg = "There is a DB mismatch between the controller and the UI."
        msg += f" The controller is using '{config_db_name}' and the UI '{settings_db_name}'."
        msg += " Please update the controller config.yml file or the UI's."
        # sio_emit_alert is a coroutine function: calling it without awaiting
        # only builds a coroutine object and never sends anything. This function
        # is synchronous, so the alert is scheduled on the running event loop.
        try:
            loop = asyncio.get_running_loop()
        except RuntimeError:
            logging.warning("No running event loop: the DB mismatch alert was not sent to the UI.")
        else:
            task = loop.create_task(self.sio_emit_alert(text=msg))
            # The event loop only keeps weak references to tasks; hold a strong
            # one until the task finishes so it cannot be collected mid-flight.
            pending = getattr(self, "_alert_tasks", None)
            if pending is None:
                pending = self._alert_tasks = set()
            pending.add(task)
            task.add_done_callback(pending.discard)
        logging.error(msg)
    else:
        logging.info(f"The DB name in the controller's configuration '{config_db_name}' matches the UI's '{settings_db_name}'.")

Parse settings from the UI.

def register_sio_callbacks(self)
Expand source code
def register_sio_callbacks(self):
    """Attach the default socketio event handlers to ``self.sio``.

    The handlers are closures, so they can reach this object through ``self``
    when they fire. Three are registered: 'connect', 'disconnect' and a
    catch-all ('*') that logs events without a dedicated handler.

    See:
    - https://github.com/miguelgrinberg/python-socketio/issues/390#issuecomment-787796871
    - https://clp-research.github.io/slurk/slurk_bots.html
    """
    client = self.sio

    @client.event
    def connect():
        """Runs when the controller connects to the SocketIO interface."""
        logging.info("Connected to the socket.io server.")
        try:
            # Not used yet: parsing and writing the settings is still pending.
            # TODO: Move the "parse_settings" and "write_settings" calls
            #       somewhere where the settings are actually available.
            retrieved = self.get_settings()
        except Exception as e:
            logging.error(f"Failed to process settings: {e}")

    @client.event
    def disconnect():
        """Runs when the client disconnects from the server."""
        logging.info("Disconnected from the socket.io server.")

    @client.on('*')
    def log_unhandled(event, data=None):
        """Log any event that has no dedicated handler."""
        # See: https://python-socketio.readthedocs.io/en/latest/client.html#catch-all-event-handlers
        logging.warning(f"Unhandled socketio event '{event}' received with command data: {data!s}")

Function to register socketio event callbacks, typically sent by the Pipettin GUI.

In theory using this structure will allow callbacks to access the "self" object when they trigger.

See: - https://github.com/miguelgrinberg/python-socketio/issues/390#issuecomment-787796871 - https://clp-research.github.io/slurk/slurk_bots.html

async def sio_emit_alert(self, text='No message specified.', alert_type='alarm')
Expand source code
async def sio_emit_alert(self, text="No message specified.", alert_type="alarm"):
    """Emit an 'alert' event; the text would show up as a modal in the GUI.

    Args:
        text: Message to display.
        alert_type: One of 'message', 'error', or 'alarm'.

    Example payload:
        {'text': 'text', 'type': 'error'}
    """
    payload = dict(text=text, type=alert_type)
    logging.debug(f"Sending alert to socket: {payload}")
    event_name = self.alert_event_name
    await self.emit(event_name, payload)

Send 'alert' to the backend, the text would show up as a modal in the GUI.

The type can be 'message', 'error', or 'alarm'.

Example

data = {'text': 'text', 'type': 'error'}

async def sio_emit_intervention(self, message: str)
Expand source code
async def sio_emit_intervention(self, message: str):
    """Emit a 'human_intervention_required' event carrying ``message``.

    Example payload:
        {'data': {'text': 'message'}}
    """
    # The text is nested under a "data" key in the emitted payload.
    inner = {"text": message}
    data = {"data": inner}
    logging.debug(f"Sending human intervention message to socket: '{data}'")
    event_name = self.human_intervention_event_name
    await self.emit(event_name, data)

Send 'message' to the 'human_intervention_required' event. Example output: data = {'data': {'text': 'message'}}

async def sio_emit_position(self, position)
Expand source code
async def sio_emit_position(self, position):
    """Emit the robot's position as a 'tool_data' event.

    Example payload:
        {'position': {"x": 0, "y": 5, "z": 10}}
    """
    payload = dict(position=position)
    logging.debug(f"Sending position to socket: {pformat(payload)}")
    event_name = self.tool_data_event_name
    await self.emit(event_name, payload)

Send the robot's position as a 'tool_data' event.

Example

data = {'position': {"x": 0, "y": 5, "z": 10}}

async def status(self)
Expand source code
async def status(self):
    """Summarize the state of the socketio connection.

    Returns:
        ``{"sio": {...}}`` with "status" ("OK" only when the client is both
        connected and ready, else "WARN"), "connected" ("OK" or "OFF") and
        "ready" (the ``sio_ready`` flag).
    """
    link_up = self.sio.connected
    ready = self.sio_ready

    # NOTE: This "OK" will be matched by make_status in sio_status.py.
    overall = "OK" if (link_up and ready) else "WARN"
    link_label = "OK" if link_up else "OFF"

    return {
        "sio": {
            "status": overall,
            "connected": link_label,
            "ready": ready,  # TODO: may be redundant.
        }
    }

Gather information and compute the overall status of this class.

async def test(self)
Expand source code
async def test(self):
    """Placeholder self-test: logs that no test exists, then pauses briefly."""
    logging.warning("SioCommander test not implemented.")
    pause_seconds = 0.2
    await asyncio.sleep(pause_seconds)
def write_settings(self, settings: dict)
Expand source code
def write_settings(self, settings: dict):
    """Write settings to JSON file.

    The target path is ``self.controller.config["settings_file"]``. When it is
    unset, a new temporary ".json" file is created and used instead.

    Args:
        settings: JSON-serializable settings to write.
    """
    settings_file = self.controller.config.get("settings_file", None)
    if not settings_file:
        # delete=False keeps the file on disk after the handle is closed.
        # Closing the handle here avoids leaking the open descriptor that
        # tempfile.mkstemp would have returned.
        with tempfile.NamedTemporaryFile(prefix="settings_file-", suffix=".json", delete=False) as tmp:
            settings_file = tmp.name
        logging.info(f"Settings file undefined. Defaulting to temporary file at: {settings_file}")

    logging.info(f"Writing new settings to '{settings_file}'.")

    # Write settings to file.
    with open(settings_file, 'w', encoding='utf-8') as file:
        json.dump(settings, file, ensure_ascii=False, indent=4)

Write settings to JSON file.