Module pipettin-piper.piper.datatools.datatools

Classes

class DataTools
Expand source code
class DataTools(metaclass=ABCMeta):
    """A class holding and querying data about the machine, objects on it, and its configuration.
    Meant to be sub-classed to implement methods specific to the database backend.
    """

    database_name: str
    db: dict

    # DATABASE SETUP ############

    @abstractmethod
    def setup_db(self, database_url, database_name):
        """Populate all properties with data or connections to the DB."""
        raise NotImplementedError("You must implement this method before using it.")

    # @abstractmethod
    def update_from_env_file(self, config: dict, env_file: str = None):
        """
        Override the database settings in `config` with values from a `.env` file.

        The file is parsed with python-dotenv's `dotenv_values`. Only two variables are
        used: `DATABASE_URI` replaces `config["database"]["database_url"]` and
        `DATABASE_NAME` replaces `config["database"]["database_name"]`. Variables that
        are absent from the file leave the corresponding entry untouched.

        Args:
            config (dict): Configuration to update in place. It must already contain a
                "database" dictionary. Its "database_url" and "database_name" entries
                must exist by the time the function returns (either beforehand or
                through the overrides), otherwise a `KeyError` is raised.
            env_file (str, optional): Path to the `.env` file ("~" is expanded). If None,
                a file named ".env" in the current working directory is used.

        Logs:
            The environment variables loaded from the `.env` file.

        Raises:
            DataError: If the environment file does not exist.

        Returns:
            tuple: A tuple containing:
                - The updated database URL (str).
                - The updated database name (str).
        """
        # The signature advertises None as a valid default, but os.path.expanduser(None)
        # raises TypeError. Fall back to a ".env" file in the working directory instead.
        if env_file is None:
            env_file = ".env"

        # Check that the file exists.
        env_path_norm = os.path.normpath(os.path.expanduser(env_file))
        if not os.path.isfile(env_path_norm):
            msg = f"Environment file not found at '{env_file}'. Current working directory is: {os.getcwd()}"
            logging.error(msg)
            raise DataError(msg)

        # Load values using "python-dotenv".
        env_vars = dotenv_values(dotenv_path=env_path_norm)

        # NOTE(review): this logs every value in the file verbatim. If DATABASE_URI can
        # embed credentials, consider logging only the variable names.
        logging.info(f"Overriding database configuration from '{env_file}' file: {str(env_vars)}")

        # Apply the overrides that are present in the file.
        database_config = config["database"]
        if "DATABASE_URI" in env_vars:
            database_config["database_url"] = env_vars["DATABASE_URI"]
        if "DATABASE_NAME" in env_vars:
            database_config["database_name"] = env_vars["DATABASE_NAME"]

        # The values were updated in place, but return them anyway.
        return config["database"]["database_url"], config["database"]["database_name"]

    # Main data properties ####

    @property
    def protocols(self) -> list:
        """protocols property"""
        return self.listProtocols()

    @property
    def hl_protocols(self) -> list:
        """High-level protocols property"""
        return self.listHlProtocols()

    @property
    def workspaces(self) -> list:
        """workspaces property"""
        return self.listWorkspaces()

    @property
    def platforms(self) -> list:
        """platforms property"""
        return self.listPlatforms()

    @property
    def containers(self) -> list:
        """containers property"""
        return self.listContainers()

    @property
    def tools(self) -> list:
        """tools property"""
        return self.listTools()

    @property
    def settings(self):
        """settings property"""
        return self.listSettings()

    #### GET METHODS ####

    def get_db(self) -> dict:
        """Dump the database into a python dictionary."""
        return {self.database_name: {
            "protocols": self.protocols,
            "hLprotocols": self.hl_protocols,
            "workspaces": self.workspaces,
            "platforms": self.platforms,
            "containers": self.containers,
            "tools": self.tools,
            "settings": self.settings,
        }}

    @abstractmethod
    def listProtocols(self):
        """Function to get protocols from MongoDB as a list, log protocol names, and return them."""
        raise NotImplementedError("You must implement this method before using it.")

    @abstractmethod
    def listHlProtocols(self):
        """Function to get high-level protocols from MongoDB as a list, log protocol names, and return them."""
        raise NotImplementedError("You must implement this method before using it.")

    @abstractmethod
    def listWorkspaces(self):
        """Function to get workspaces from MongoDB as a list, log names, and return them."""
        raise NotImplementedError("You must implement this method before using it.")

    @abstractmethod
    def listPlatforms(self):
        """Function to get platforms from MongoDB as a list, log names, and return them."""
        raise NotImplementedError("You must implement this method before using it.")

    @abstractmethod
    def listContainers(self):
        """Function to get 'containers' from MongoDB as a list, log names, and return them."""
        raise NotImplementedError("You must implement this method before using it.")

    @abstractmethod
    def listTools(self):
        """Function to get tools from MongoDB as a list, log names, and return them."""
        raise NotImplementedError("You must implement this method before using it.")

    @abstractmethod
    def listSettings(self):
        """Function to get 'settings' from MongoDB as a list, log names, and return them."""
        raise NotImplementedError("You must implement this method before using it.")

    def getPlatformsInWorkspace(self, workspace):
        # extract platform names in workspace
        platform_names = [p['platform'] for p in workspace['items']]
        # get platforms in workspace
        platforms_in_workspace = [p for p in self.platforms if p['name'] in platform_names]
        # done
        return platforms_in_workspace

    def getWorkspaceByName(self, workspace_name: str):
        """Get protocol's workspace data"""
        result = next(d for d in self.workspaces if d["name"] == workspace_name)
        if not result:
            raise DataError(f"Workspace not found: {workspace_name}")
        return result

    def getProtocolByName(self, protocol_name: str):
        """Get protocol data by name"""
        try:
            result = next(d for d in self.protocols if d["name"] == protocol_name)
        except StopIteration as e:
            raise DataError(f"Protocol not found: {protocol_name}. " + \
                            f"Available protocols: {[p['name'] for p in self.protocols]}") from e
        return result

    def getContainerByName(self, container_name: str):
        """Get protocol data by name"""
        result = next(d for d in self.containers if d["name"] == container_name)
        if not result:
            raise DataError(f"Container not found: {container_name}")
        return result

    def getToolByName(self, tool_name: str):
        """Get tool data by name"""
        result = next(d for d in self.tools if d["name"] == tool_name)
        if not result:
            raise DataError(f"Tool not found: {tool_name}")
        return result

    def getProtocolObjects(self, protocol_name: str):
        """Get all data objects for a given protocol by it's name."""

        if not isinstance(protocol_name, str):
            raise ValueError("protocol_name must be a string.")

        # get protocol data by name
        protocol = self.getProtocolByName(protocol_name=protocol_name)

        # get protocol's workspace data
        workspace = self.getWorkspaceByName(workspace_name = protocol['workspace'])

        # DATABASE QUERIES ############

        # get platforms in workspace
        platforms_in_workspace = self.getPlatformsInWorkspace(workspace)

        # return all
        return protocol, workspace, platforms_in_workspace

    def getWorkspaceItemByName(self, workspace: dict, item_name: str):
        """Iterate over items in the workspace looking for one who's name matches 'item_name'."""
        one = self.find_one(workspace["items"], "name", item_name)
        if one is None:
            msg = f"No item with name '{item_name}' was found in workspace '{workspace['name']}'."
            msg += f" Available items are named: {[i['name'] for i in workspace['items']]}"
            raise DataError(msg)
        return one

    def getPlatformByName(self, platformsInWorkspace: list, platform_item: dict):
        """Iterate over platforms in workspace looking for one who's name matches the platform in 'platform_item'."""
        one = self.find_one(platformsInWorkspace, "name", platform_item["platform"])
        if one is None:
            msg = f"No platform with name '{platform_item['platform']}' was found in the workspace."
            msg += f" Available platforms are: {[p['name'] for p in platformsInWorkspace]}"
            raise DataError(msg)
        return one

    def getContentByName(self, content_name: str, platform_item: dict):
        """Iterate over contents in a platform item, looking for a content by name."""
        one = self.find_one(platform_item["content"], "name", content_name)
        if one is None:
            msg = f"No content with name '{content_name}' was found in item '{platform_item['name']}'."
            msg += f" Available contents are named: {[c['name'] for c in platform_item['content']]}"
            raise DataError(msg)
        return one

    def filterContentBy(self, content: dict, selector: dict, content_type: str = None) -> bool:
        """Helper function to use the tube "selector" dict and, optionally, by content type.
        Example selector: {"by": "name", "value": "tube1"}
        """
        result = content[selector["by"]] == selector["value"]
        if content_type is not None:
            # Handle filtering also by content type.
            result = result and self.filterContentByType(content, content_type)
        return result

    def filterContentByType(self, content: dict, content_type: str):
        """Checks if a content is of the intended type (e.g. "tip", "tube", etc.)"""
        if content_type is None:
            return True
        container = self.getContainerByName(content["container"])
        return container["type"] == content_type

    def getNextContentByName(self, platform_item:dict,
                             content_name:str=None, content_type:str=None,
                             pop_from_item=False, pop_from_db=False,
                             workspace_name:str = None):
        """Get the next content from a platform item by its name, optionally deleting it from the DB."""

        # Handle unspecified content name.
        if content_name is None:
            selector = None
        else:
            selector = {"by": "name", "value": content_name}

        # Get the next content.
        return self.getNextContent(
            workspace_name=workspace_name, platform_item=platform_item,
            selector=selector, content_type=content_type,
            pop_from_item=pop_from_item, pop_from_db=pop_from_db
        )

    def getNextContent(self, platform_item:dict,
                       selector:dict=None, content_type:str=None,
                       pop_from_item=False, pop_from_db=False,
                       workspace_name:str = None):
        """
        Retrieve the next content from a specified platform item within a workspace, 
        with options to filter by criteria, delete locally, and/or delete from the database.

        Args:
            platform_item (dict): The platform item dictionary from which to retrieve content.
            selector (dict, optional): Criteria for filtering the content to retrieve. Defaults to None, in which case the next first content is selected.
            content_type (str, optional): Type of content to retrieve. If None, any content type is accepted. Defaults to None.
            pop_from_item (bool, optional): If True, removes the selected content from the local platform item after retrieval. Defaults to False.
            pop_from_db (bool, optional): If True, removes the selected content from the database after retrieval. Defaults to False.
            workspace_name (str, optional): The name of the workspace containing the platform item, required by `pop_from_db`.

        Raises:
            DataError: If no content matching the selector or content type is found in the platform item.

        Returns:
            dict: The content item that matches the specified criteria, if any.

        """
        logging.debug(f"Getting next content from item '{platform_item['name']}'.")

        item_contents: list = platform_item["content"]

        if selector is not None:
            # Get the content by name.
            try:
                i, next_content = next(
                    (i, content)
                    for i, content in enumerate(item_contents)
                    if self.filterContentBy(content, selector, content_type)
                )
            except StopIteration as e:
                msg = f"No content with selector {selector} was found in item '{platform_item.get('name')}'."
                raise DataError(msg) from e
            if pop_from_item:
                logging.info(f"Deleting content with index {i} from item '{platform_item['name']}' locally.")
                item_contents.pop(i)
        else:
            # Get any content.
            try:
                i, next_content = next(
                    (i, content)
                    for i, content in enumerate(item_contents)
                    if self.filterContentByType(content, content_type)
                )
            except StopIteration as e:
                msg = f"No contents available in '{platform_item.get('name')}'."
                raise DataError(msg) from e
            if pop_from_item:
                logging.info(f"Deleting content with index {i} from item '{platform_item['name']}' locally.")
                item_contents.pop(i)

        # Delete the content from the DB.
        if pop_from_db:
            if not workspace_name:
                msg = "A workspace name is required to pop a content from an item in the database."
                logging.error(msg)
                raise DataError(msg)
            logging.info(f"Deleting content with index {i} from item '{platform_item['name']}' in the database.")
            self.pop_content_by_idx(workspace_name, platform_item["name"], content_idx=i)

        return next_content

    def getActionDataBy(self, action_id: str, id_field: str):
        """Find an action by its MongoDB ObjectID, and return its content, index, and parent protocol."""
        logging.debug(f"Getting action data for action_id={action_id}")
        for p in self.protocols:
            logging.debug(f"Searching for action in protocol '{p['name']}'.")
            for i, a in enumerate(p["actions"]):
                if str(a[id_field]) == str(action_id):
                    return p, a, i
        logging.warning(f"No data found for action {id_field}={action_id} returning None.")
        return None, None, None

    #### POP METHODS ####
    @abstractmethod
    def pop_content_by_idx(self, workspace_name: str, item_name: str, content_idx: int):
        """Remove a content from a platform item by index, deleting it form the DB."""
        raise NotImplementedError("You must implement this method before using it.")

    # Utility methods ####

    @staticmethod
    def find(data: list, key, value) -> list:
        """Filter a list of dictionaries by a key's value
        Also works if the items in data are lists and key is a valid index.
        """
        return [d for d in data if d[key] == value]

    def find_one(self, data: list, key, value):
        """Filter a list of dictionaries by a key's value and get one value
        'None' is returned on either no matches or multiple matches.
        """
        found = self.find(data, key, value)
        if len(found) == 1:
            return found[0]
        return None

    def is_subset(self, query, target):
        """
        Recursively checks if the query is a subset of the target.

        Args:
            query: A dictionary or list representing the query subset.
            target: A dictionary or list to be checked against the query.

        Returns:
            bool: True if query is a subset of target, False otherwise.

        This method uses recursion to handle nested dictionaries and lists.
        It verifies that all elements in `query` exist in `target`:
        - For dictionaries, each key-value pair in `query` must have a matching pair in `target`.
        - For lists, each item in `query` must be found in `target` in order.
        - For basic data types, it directly compares for equality.
        """
        if isinstance(query, dict) and isinstance(target, dict):
            # Ensure all items in the query dict are in the target dict
            return all(key in target and self.is_subset(query[key], target[key]) for key in query)
        elif isinstance(query, list) and isinstance(target, list):
            # Ensure all items in the query list are in the target list, in the same order
            it = iter(target)
            return all(any(self.is_subset(q_item, t_item) for t_item in it) for q_item in query)
        else:
            # Base case for non-dict, non-list types
            return type(query) is type(target) and query == target

    def find_matches(self, query_dict, dicts):
        """
        Finds which dictionaries in a list match the query_dict structure.

        Args:
            query_dict: A dictionary used as the subset criteria.
            dicts: A list of dictionaries to search.

        Returns:
            list: A list containing boolean values where each value corresponds
                  to whether the respective dictionary in `dicts` matches the query_dict.

        This method returns a boolean match for each dictionary in `dicts`, indicating
        whether `query_dict` is a subset of each respective dictionary.
        """
        return [self.is_subset(query_dict, d) for d in dicts]

    def find_match(self, query_dict, dicts):
        """
        Filters and returns dictionaries from a list that contain query_dict as a subset.

        Args:
            query_dict: A dictionary representing the query subset.
            dicts: A list of dictionaries to search.

        Returns:
            list: A list of dictionaries from `dicts` that contain `query_dict` as a subset.

        This method uses `is_subset` to check each dictionary in `dicts` and returns only
        those dictionaries where `query_dict` is a subset.
        """
        return [d for d in dicts if self.is_subset(query_dict, d)]

    @staticmethod
    def update_nested(obj, selectors: list, new_value):
        """
        Updates a nested object using an array of selectors to reach the target.

        Args:
            obj (dict or list): The original dictionary or list to update.
            selectors (list): A list of keys/indices to access the nested target.
            new_value: The new value to set at the specified location.

        Returns:
            None: The original object is modified in place.

        Example usage:

        >>> data = {"a": [5, 6, {"k": 3}]}
        >>> selectors = ["a", 2, "k"]
        >>> update_nested(data, selectors, 10)
        >>> print(data)  # Output should be: {'a': [5, 6, {'k': 10}]}
        """
        for key in selectors[:-1]:
            # Traverse dictionaries or lists
            obj = obj[key]

        # Set the value at the deepest level
        obj[selectors[-1]] = new_value

A class holding and querying data about the machine, objects on it, and its configuration. Meant to be sub-classed to implement methods specific to the database backend.

Subclasses

Class variables

var database_name : str
var db : dict

Static methods

def find(data: list, key, value) ‑> list
Expand source code
@staticmethod
def find(data: list, key, value) -> list:
    """Return the elements of `data` whose `key` entry equals `value`.

    Elements are indexed with `element[key]`, so this also works when the elements
    are lists and `key` is a valid index. Input order is preserved.
    """
    selected = []
    for element in data:
        if element[key] == value:
            selected.append(element)
    return selected

Filter a list of dictionaries by a key's value. Also works if the items in data are lists and key is a valid index.

def update_nested(obj, selectors: list, new_value)
Expand source code
@staticmethod
def update_nested(obj, selectors: list, new_value):
    """
    Set a value deep inside nested dictionaries/lists, following a path of selectors.

    Args:
        obj (dict or list): The object to update.
        selectors (list): Keys/indices leading to the target; the last one is the
            key/index that gets assigned.
        new_value: The value to store at that location.

    Returns:
        None: The original object is modified in place.

    Example usage:

    >>> data = {"a": [5, 6, {"k": 3}]}
    >>> selectors = ["a", 2, "k"]
    >>> update_nested(data, selectors, 10)
    >>> print(data)  # Output should be: {'a': [5, 6, {'k': 10}]}
    """
    # Walk down to the container that holds the final key/index.
    cursor = obj
    for step in selectors[:-1]:
        cursor = cursor[step]

    # Assign at the deepest level.
    final_key = selectors[-1]
    cursor[final_key] = new_value

Updates a nested object using an array of selectors to reach the target.

Args

obj : dict or list
The original dictionary or list to update.
selectors : list
A list of keys/indices to access the nested target.
new_value
The new value to set at the specified location.

Returns

None
The original object is modified in place.

Example usage:

>>> data = {"a": [5, 6, {"k": 3}]}
>>> selectors = ["a", 2, "k"]
>>> update_nested(data, selectors, 10)
>>> print(data)  # Output should be: {'a': [5, 6, {'k': 10}]}

Instance variables

prop containers : list
Expand source code
@property
def containers(self) -> list:
    """Containers known to the backend.

    Each access calls `listContainers()` again; nothing is cached here.
    """
    fetched = self.listContainers()
    return fetched

containers property

prop hl_protocols : list
Expand source code
@property
def hl_protocols(self) -> list:
    """High-level protocols known to the backend.

    Each access calls `listHlProtocols()` again; nothing is cached here.
    """
    fetched = self.listHlProtocols()
    return fetched

High-level protocols property

prop platforms : list
Expand source code
@property
def platforms(self) -> list:
    """Platforms known to the backend.

    Each access calls `listPlatforms()` again; nothing is cached here.
    """
    fetched = self.listPlatforms()
    return fetched

platforms property

prop protocols : list
Expand source code
@property
def protocols(self) -> list:
    """Protocols known to the backend.

    Each access calls `listProtocols()` again; nothing is cached here.
    """
    fetched = self.listProtocols()
    return fetched

protocols property

prop settings
Expand source code
@property
def settings(self):
    """Settings known to the backend.

    Each access calls `listSettings()` again; nothing is cached here.
    """
    fetched = self.listSettings()
    return fetched

settings property

prop tools : list
Expand source code
@property
def tools(self) -> list:
    """Tools known to the backend.

    Each access calls `listTools()` again; nothing is cached here.
    """
    fetched = self.listTools()
    return fetched

tools property

prop workspaces : list
Expand source code
@property
def workspaces(self) -> list:
    """Workspaces known to the backend.

    Each access calls `listWorkspaces()` again; nothing is cached here.
    """
    fetched = self.listWorkspaces()
    return fetched

workspaces property

Methods

def filterContentBy(self, content: dict, selector: dict, content_type: str = None) ‑> bool
Expand source code
def filterContentBy(self, content: dict, selector: dict, content_type: str = None) -> bool:
    """Tell whether a content matches a selector and, optionally, a content type.

    Example selector: {"by": "name", "value": "tube1"}

    Args:
        content (dict): The content to test.
        selector (dict): "by" names the content key to compare, "value" the expected value.
        content_type (str, optional): When given, the content must also pass
            `filterContentByType`.
    """
    selector_matches = content[selector["by"]] == selector["value"]
    # The type check is only relevant if it was requested and the selector matched.
    if content_type is None or not selector_matches:
        return selector_matches
    return self.filterContentByType(content, content_type)

Helper function to use the tube "selector" dict and, optionally, by content type. Example selector: {"by": "name", "value": "tube1"}

def filterContentByType(self, content: dict, content_type: str)
Expand source code
def filterContentByType(self, content: dict, content_type: str):
    """Tell whether a content's container is of the given type (e.g. "tip", "tube").

    A `content_type` of None accepts any content. Otherwise the container named by
    `content["container"]` is fetched with `getContainerByName` and its "type" is
    compared against `content_type`.
    """
    if content_type is not None:
        container = self.getContainerByName(content["container"])
        return container["type"] == content_type
    return True

Checks if a content is of the intended type (e.g. "tip", "tube", etc.)

def find_match(self, query_dict, dicts)
Expand source code
def find_match(self, query_dict, dicts):
    """
    Return the dictionaries of a list that contain `query_dict` as a subset.

    Args:
        query_dict: A dictionary representing the query subset.
        dicts: A list of dictionaries to search.

    Returns:
        list: The elements of `dicts` for which `is_subset(query_dict, d)` holds,
        in their original order.
    """
    selected = []
    for candidate in dicts:
        if self.is_subset(query_dict, candidate):
            selected.append(candidate)
    return selected

Filters and returns dictionaries from a list that contain query_dict as a subset.

Args

query_dict
A dictionary representing the query subset.
dicts
A list of dictionaries to search.

Returns

list
A list of dictionaries from dicts that contain query_dict as a subset.

This method uses is_subset to check each dictionary in dicts and returns only those dictionaries where query_dict is a subset.

def find_matches(self, query_dict, dicts)
Expand source code
def find_matches(self, query_dict, dicts):
    """
    Flag which dictionaries in a list contain `query_dict` as a subset.

    Args:
        query_dict: A dictionary used as the subset criteria.
        dicts: A list of dictionaries to test.

    Returns:
        list: One `is_subset(query_dict, d)` result per element of `dicts`,
        in the same order.
    """
    flags = []
    for candidate in dicts:
        flags.append(self.is_subset(query_dict, candidate))
    return flags

Finds which dictionaries in a list match the query_dict structure.

Args

query_dict
A dictionary used as the subset criteria.
dicts
A list of dictionaries to search.

Returns

list
A list containing boolean values where each value corresponds to whether the respective dictionary in dicts matches the query_dict.

This method returns a boolean match for each dictionary in dicts, indicating whether query_dict is a subset of each respective dictionary.

def find_one(self, data: list, key, value)
Expand source code
def find_one(self, data: list, key, value):
    """Return the single element of `data` whose `key` entry equals `value`.

    None is returned both when nothing matches and when several elements match.
    """
    candidates = self.find(data, key, value)
    return candidates[0] if len(candidates) == 1 else None

Filter a list of dictionaries by a key's value and get one value. 'None' is returned on either no matches or multiple matches.

def getActionDataBy(self, action_id: str, id_field: str)
Expand source code
def getActionDataBy(self, action_id: str, id_field: str):
    """Find an action by the value of one of its ID fields.

    Every protocol's "actions" list is scanned, comparing `str(action[id_field])`
    with `str(action_id)`.

    Returns:
        tuple: `(protocol, action, index)` for the first match, where `index` is the
        action's position in the protocol's "actions". `(None, None, None)` if
        nothing matches (a warning is logged).
    """
    logging.debug(f"Getting action data for action_id={action_id}")
    for protocol in self.protocols:
        logging.debug(f"Searching for action in protocol '{protocol['name']}'.")
        for index, action in enumerate(protocol["actions"]):
            if str(action[id_field]) != str(action_id):
                continue
            return protocol, action, index

    logging.warning(f"No data found for action {id_field}={action_id} returning None.")
    return None, None, None

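Find an action by the value of one of its ID fields (e.g. its MongoDB ObjectID), and return its parent protocol, the action itself, and the action's index within that protocol. Three None values are returned if no action matches.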

def getContainerByName(self, container_name: str)
Expand source code
def getContainerByName(self, container_name: str):
    """Get container data by name.

    Args:
        container_name (str): Value to match against each container's "name".

    Returns:
        The first entry of `self.containers` whose "name" equals `container_name`.

    Raises:
        DataError: If no container has that name.
    """
    # Pass a default to next(): without it a miss raises StopIteration and the
    # DataError below is never reached.
    result = next((d for d in self.containers if d["name"] == container_name), None)
    if result is None:
        raise DataError(f"Container not found: {container_name}")
    return result

Get container data by name

def getContentByName(self, content_name: str, platform_item: dict)
Expand source code
def getContentByName(self, content_name: str, platform_item: dict):
    """Return the content of a platform item whose "name" equals `content_name`.

    The lookup uses `find_one`, so it only succeeds when exactly one content matches.

    Raises:
        DataError: If `find_one` returns None; the message lists the content names.
    """
    match = self.find_one(platform_item["content"], "name", content_name)
    if match is not None:
        return match

    msg = f"No content with name '{content_name}' was found in item '{platform_item['name']}'."
    msg = msg + f" Available contents are named: {[c['name'] for c in platform_item['content']]}"
    raise DataError(msg)

Iterate over contents in a platform item, looking for a content by name.

def getNextContent(self,
platform_item: dict,
selector: dict = None,
content_type: str = None,
pop_from_item=False,
pop_from_db=False,
workspace_name: str = None)
Expand source code
def getNextContent(self, platform_item:dict,
                   selector:dict=None, content_type:str=None,
                   pop_from_item=False, pop_from_db=False,
                   workspace_name:str = None):
    """
    Retrieve the next content from a platform item, with options to filter by
    criteria, delete it locally, and/or delete it from the database.

    Args:
        platform_item (dict): The platform item dictionary from which to retrieve content.
        selector (dict, optional): Criteria for filtering, e.g. {"by": "name", "value": "tube1"}.
            Defaults to None, in which case the first content of a suitable type is selected.
        content_type (str, optional): Type of content to retrieve. If None, any content type is accepted.
        pop_from_item (bool, optional): If True, removes the selected content from `platform_item["content"]`.
        pop_from_db (bool, optional): If True, removes the selected content from the database
            through `pop_content_by_idx`.
        workspace_name (str, optional): The name of the workspace containing the platform item,
            required by `pop_from_db`.

    Raises:
        DataError: If no content matches the selector or content type, or if `pop_from_db`
            is requested without a `workspace_name`. The arguments are validated before
            anything is removed, so a failed call leaves `platform_item` untouched.

    Returns:
        dict: The content item that matches the specified criteria.
    """
    logging.debug(f"Getting next content from item '{platform_item['name']}'.")

    # Validate the DB removal request first. Checking this after the local pop
    # would leave the item modified by a call that then fails.
    if pop_from_db and not workspace_name:
        msg = "A workspace name is required to pop a content from an item in the database."
        logging.error(msg)
        raise DataError(msg)

    item_contents: list = platform_item["content"]

    # Choose the matching rule and the error message for the requested search mode.
    if selector is not None:
        def is_match(content):
            return self.filterContentBy(content, selector, content_type)
        not_found_msg = f"No content with selector {selector} was found in item '{platform_item.get('name')}'."
    else:
        def is_match(content):
            return self.filterContentByType(content, content_type)
        not_found_msg = f"No contents available in '{platform_item.get('name')}'."

    # Take the first content that matches, remembering its index for the pops below.
    for i, next_content in enumerate(item_contents):
        if is_match(next_content):
            break
    else:
        raise DataError(not_found_msg)

    # Delete the content from the local item.
    if pop_from_item:
        logging.info(f"Deleting content with index {i} from item '{platform_item['name']}' locally.")
        item_contents.pop(i)

    # Delete the content from the DB.
    if pop_from_db:
        logging.info(f"Deleting content with index {i} from item '{platform_item['name']}' in the database.")
        self.pop_content_by_idx(workspace_name, platform_item["name"], content_idx=i)

    return next_content

Retrieve the next content from a specified platform item within a workspace, with options to filter by criteria, delete locally, and/or delete from the database.

Args

platform_item : dict
The platform item dictionary from which to retrieve content.
selector : dict, optional
Criteria for filtering the content to retrieve. Defaults to None, in which case the next first content is selected.
content_type : str, optional
Type of content to retrieve. If None, any content type is accepted. Defaults to None.
pop_from_item : bool, optional
If True, removes the selected content from the local platform item after retrieval. Defaults to False.
pop_from_db : bool, optional
If True, removes the selected content from the database after retrieval. Defaults to False.
workspace_name : str, optional
The name of the workspace containing the platform item, required by pop_from_db.

Raises

DataError
If no content matching the selector or content type is found in the platform item.

Returns

dict
The content item that matches the specified criteria, if any.
def getNextContentByName(self,
platform_item: dict,
content_name: str = None,
content_type: str = None,
pop_from_item=False,
pop_from_db=False,
workspace_name: str = None)
Expand source code
def getNextContentByName(self, platform_item:dict,
                         content_name:str=None, content_type:str=None,
                         pop_from_item=False, pop_from_db=False,
                         workspace_name:str = None):
    """Fetch the next content of a platform item, optionally restricted to a content name.

    Thin wrapper around `getNextContent`: a by-name selector is built from `content_name`
    (no selector at all when it is None) and every other argument is forwarded unchanged.
    """
    # A missing name means "no selector"; otherwise select on the "name" field.
    selector = {"by": "name", "value": content_name} if content_name is not None else None

    return self.getNextContent(
        platform_item=platform_item,
        workspace_name=workspace_name,
        selector=selector,
        content_type=content_type,
        pop_from_item=pop_from_item,
        pop_from_db=pop_from_db,
    )

Get the next content from a platform item by its name, optionally deleting it from the DB.

def getPlatformByName(self, platformsInWorkspace: list, platform_item: dict)
Expand source code
def getPlatformByName(self, platformsInWorkspace: list, platform_item: dict):
    """Return the platform referenced by the 'platform' field of 'platform_item'.

    The search is delegated to `find_one`, called with the platform list, the key "name"
    and the wanted platform name. A DataError listing the available platform names is
    raised when `find_one` returns None.
    """
    wanted_name = platform_item["platform"]
    match = self.find_one(platformsInWorkspace, "name", wanted_name)
    if match is not None:
        return match

    # Nothing matched: report what was requested and what is available.
    available = [p['name'] for p in platformsInWorkspace]
    raise DataError(
        f"No platform with name '{wanted_name}' was found in the workspace."
        f" Available platforms are: {available}"
    )

Iterate over platforms in workspace looking for one whose name matches the platform in 'platform_item'.

def getPlatformsInWorkspace(self, workspace)
Expand source code
def getPlatformsInWorkspace(self, workspace):
    """Return the entries of `self.platforms` that are used by items of the given workspace.

    A platform is kept when its 'name' equals the 'platform' field of at least one
    workspace item. The order of `self.platforms` is preserved.
    """
    # Platform names referenced by the workspace items (duplicates are harmless).
    used_names = []
    for item in workspace['items']:
        used_names.append(item['platform'])

    # Keep only the known platforms that the workspace refers to.
    selected = []
    for platform in self.platforms:
        if platform['name'] in used_names:
            selected.append(platform)
    return selected
def getProtocolByName(self, protocol_name: str)
Expand source code
def getProtocolByName(self, protocol_name: str):
    """Return the first entry of `self.protocols` whose 'name' equals `protocol_name`.

    Raises:
        DataError: If no protocol has that name; the message lists the available names.
    """
    matches = (entry for entry in self.protocols if entry["name"] == protocol_name)
    try:
        return next(matches)
    except StopIteration as not_found:
        # An exhausted generator means there was no match; report the known names.
        available = [p['name'] for p in self.protocols]
        raise DataError(
            f"Protocol not found: {protocol_name}. Available protocols: {available}"
        ) from not_found

Get protocol data by name

def getProtocolObjects(self, protocol_name: str)
Expand source code
def getProtocolObjects(self, protocol_name: str):
    """Collect a protocol, its workspace, and the platforms used in that workspace.

    Args:
        protocol_name (str): Name of the protocol to look up.

    Raises:
        ValueError: If `protocol_name` is not a string.

    Returns:
        tuple: (protocol, workspace, platforms_in_workspace).
    """
    if not isinstance(protocol_name, str):
        raise ValueError("protocol_name must be a string.")

    # Each lookup feeds the next one: protocol -> its workspace -> that workspace's platforms.
    protocol = self.getProtocolByName(protocol_name=protocol_name)
    workspace = self.getWorkspaceByName(workspace_name=protocol['workspace'])
    platforms_in_workspace = self.getPlatformsInWorkspace(workspace)

    return protocol, workspace, platforms_in_workspace

Get all data objects for a given protocol by its name.

def getToolByName(self, tool_name: str)
Expand source code
def getToolByName(self, tool_name: str):
    """Get tool data by name.

    Args:
        tool_name (str): Value matched against the 'name' field of the entries in `self.tools`.

    Raises:
        DataError: If no tool has that name.

    Returns:
        The first entry of `self.tools` whose 'name' equals `tool_name`.
    """
    try:
        # next() without a default raises StopIteration when nothing matches, so the
        # failed lookup is translated here into the DataError that callers expect.
        return next(d for d in self.tools if d["name"] == tool_name)
    except StopIteration as e:
        raise DataError(f"Tool not found: {tool_name}") from e

Get tool data by name

def getWorkspaceByName(self, workspace_name: str)
Expand source code
def getWorkspaceByName(self, workspace_name: str):
    """Get workspace data by name.

    Args:
        workspace_name (str): Value matched against the 'name' field of the entries
            in `self.workspaces`.

    Raises:
        DataError: If no workspace has that name.

    Returns:
        The first entry of `self.workspaces` whose 'name' equals `workspace_name`.
    """
    try:
        # next() without a default raises StopIteration when nothing matches, so the
        # failed lookup is translated here into the DataError that callers expect.
        return next(d for d in self.workspaces if d["name"] == workspace_name)
    except StopIteration as e:
        raise DataError(f"Workspace not found: {workspace_name}") from e

Get protocol's workspace data

def getWorkspaceItemByName(self, workspace: dict, item_name: str)
Expand source code
def getWorkspaceItemByName(self, workspace: dict, item_name: str):
    """Return the item of 'workspace' whose name is 'item_name'.

    The search is delegated to `find_one`, called with the workspace items, the key "name"
    and `item_name`. A DataError listing the available item names is raised when
    `find_one` returns None.
    """
    found = self.find_one(workspace["items"], "name", item_name)
    if found is not None:
        return found

    # Nothing matched: describe the failed lookup, then list the names that do exist.
    summary = f"No item with name '{item_name}' was found in workspace '{workspace['name']}'."
    item_names = [i['name'] for i in workspace['items']]
    raise DataError(summary + f" Available items are named: {item_names}")

Iterate over items in the workspace looking for one whose name matches 'item_name'.

def get_db(self) ‑> dict
Expand source code
def get_db(self) -> dict:
    """Export the loaded collections as a dictionary nested under the database name.

    Returns:
        dict: `{database_name: {collection_name: collection, ...}}`, where the collections
        are the objects currently held by this instance (no copies are made).
    """
    db_name = self.database_name

    collections = {}
    collections["protocols"] = self.protocols
    # Note the key spelling: the high-level protocols are stored under "hLprotocols".
    collections["hLprotocols"] = self.hl_protocols
    collections["workspaces"] = self.workspaces
    collections["platforms"] = self.platforms
    collections["containers"] = self.containers
    collections["tools"] = self.tools
    collections["settings"] = self.settings

    return {db_name: collections}

Dump the database into a python dictionary.

def is_subset(self, query, target)
Expand source code
def is_subset(self, query, target):
    """
    Recursively test whether `query` is contained in `target`.

    Args:
        query: A dictionary, list, or plain value describing what to look for.
        target: A dictionary, list, or plain value to search in.

    Returns:
        bool: True if query is a subset of target, False otherwise.

    Matching rules:
    - Two dictionaries: every key of `query` must exist in `target`, and its value must
      itself be a subset of the corresponding target value.
    - Two lists: every item of `query` must match some item of `target`, with matches
      found in order (each search resumes after the previous match).
    - Anything else (including mismatched container kinds): the two values must have
      exactly the same type and compare equal.
    """
    if isinstance(query, dict) and isinstance(target, dict):
        for key in query:
            if key not in target:
                return False
            if not self.is_subset(query[key], target[key]):
                return False
        return True

    if isinstance(query, list) and isinstance(target, list):
        # One shared iterator: items consumed while matching an earlier query item are
        # not revisited, which is what enforces the ordering requirement.
        remaining = iter(target)
        for wanted in query:
            for candidate in remaining:
                if self.is_subset(wanted, candidate):
                    break
            else:
                # The target ran out before this query item was matched.
                return False
        return True

    # Leaf comparison is strict on type (e.g. 1 does not match 1.0 or True).
    return type(query) is type(target) and query == target

Recursively checks if the query is a subset of the target.

Args

query
A dictionary or list representing the query subset.
target
A dictionary or list to be checked against the query.

Returns

bool
True if query is a subset of target, False otherwise.

This method uses recursion to handle nested dictionaries and lists. It verifies that all elements in query exist in target: for dictionaries, each key-value pair in query must have a matching pair in target; for lists, each item in query must be found in target, in order; for basic data types, the values are compared directly for equality.

def listContainers(self)
Expand source code
@abstractmethod
def listContainers(self):
    """Abstract hook: return the 'containers' from the database as a list, logging their names.

    This base version only raises NotImplementedError; the database-specific subclass
    must override it.
    """
    reason = "You must implement this method before using it."
    raise NotImplementedError(reason)

Function to get 'containers' from MongoDB as a list, log names, and return them.

def listHlProtocols(self)
Expand source code
@abstractmethod
def listHlProtocols(self):
    """Abstract hook: return the high-level protocols from the database as a list, logging their names.

    This base version only raises NotImplementedError; the database-specific subclass
    must override it.
    """
    reason = "You must implement this method before using it."
    raise NotImplementedError(reason)

Function to get high-level protocols from MongoDB as a list, log protocol names, and return them.

def listPlatforms(self)
Expand source code
@abstractmethod
def listPlatforms(self):
    """Abstract hook: return the platforms from the database as a list, logging their names.

    This base version only raises NotImplementedError; the database-specific subclass
    must override it.
    """
    reason = "You must implement this method before using it."
    raise NotImplementedError(reason)

Function to get platforms from MongoDB as a list, log names, and return them.

def listProtocols(self)
Expand source code
@abstractmethod
def listProtocols(self):
    """Abstract hook: return the protocols from the database as a list, logging their names.

    This base version only raises NotImplementedError; the database-specific subclass
    must override it.
    """
    reason = "You must implement this method before using it."
    raise NotImplementedError(reason)

Function to get protocols from MongoDB as a list, log protocol names, and return them.

def listSettings(self)
Expand source code
@abstractmethod
def listSettings(self):
    """Abstract hook: return the 'settings' from the database as a list, logging their names.

    This base version only raises NotImplementedError; the database-specific subclass
    must override it.
    """
    reason = "You must implement this method before using it."
    raise NotImplementedError(reason)

Function to get 'settings' from MongoDB as a list, log names, and return them.

def listTools(self)
Expand source code
@abstractmethod
def listTools(self):
    """Abstract hook: return the tools from the database as a list, logging their names.

    This base version only raises NotImplementedError; the database-specific subclass
    must override it.
    """
    reason = "You must implement this method before using it."
    raise NotImplementedError(reason)

Function to get tools from MongoDB as a list, log names, and return them.

def listWorkspaces(self)
Expand source code
@abstractmethod
def listWorkspaces(self):
    """Abstract hook: return the workspaces from the database as a list, logging their names.

    This base version only raises NotImplementedError; the database-specific subclass
    must override it.
    """
    reason = "You must implement this method before using it."
    raise NotImplementedError(reason)

Function to get workspaces from MongoDB as a list, log names, and return them.

def pop_content_by_idx(self, workspace_name: str, item_name: str, content_idx: int)
Expand source code
@abstractmethod
def pop_content_by_idx(self, workspace_name: str, item_name: str, content_idx: int):
    """Abstract hook: remove the content at `content_idx` from a platform item, deleting it from the DB.

    This base version only raises NotImplementedError; the database-specific subclass
    must override it.
    """
    reason = "You must implement this method before using it."
    raise NotImplementedError(reason)

Remove a content from a platform item by index, deleting it from the DB.

def setup_db(self, database_url, database_name)
Expand source code
@abstractmethod
def setup_db(self, database_url, database_name):
    """Abstract hook: populate all properties with data from, or connections to, the DB.

    This base version only raises NotImplementedError; the database-specific subclass
    must override it.
    """
    reason = "You must implement this method before using it."
    raise NotImplementedError(reason)

Populate all properties with data or connections to the DB.

def update_from_env_file(self, config: dict, env_file: str = None)
Expand source code
def update_from_env_file(self, config: dict, env_file: str = None):
    """
    Update the configuration dictionary with values from an environment file.

    This function loads environment variables from a specified `.env` file and uses them to override
    the database settings in the provided `config` dictionary. It specifically looks for `DATABASE_URI`
    and `DATABASE_NAME`; variables that are absent from the file leave the configuration untouched.

    Args:
        config (dict): The configuration dictionary to be updated. It must already contain a
                       `config["database"]` dictionary.
        env_file (str, optional): Path to the `.env` file containing environment variables ("~" is expanded).
                                  If None, a file named `.env` in the current working directory is used.

    Updates:
        - Sets `config["database"]["database_url"]` to the value of `DATABASE_URI`.
        - Sets `config["database"]["database_name"]` to the value of `DATABASE_NAME`.

    Logs:
        Logs the environment variables loaded from the `.env` file.

    Raises:
        DataError: If the environment file does not exist.
        KeyError: If `config["database"]` is missing, or if one of the two database keys is
                  neither in the file nor already present in the configuration.

    Returns:
        tuple: A tuple containing:
            - The updated database URL (str).
            - The updated database name (str).
    """
    # None is the declared default, but os.path.expanduser(None) raises TypeError.
    # Fall back to a ".env" file in the current working directory instead.
    if env_file is None:
        env_file = ".env"

    # Check that the file exists.
    env_path_norm = os.path.normpath(os.path.expanduser(env_file))
    if not os.path.isfile(env_path_norm):
        msg = f"Environment file not found at '{env_file}'. Current working directory is: {os.getcwd()}"
        logging.error(msg)
        raise DataError(msg)
    # Load values using "python-dotenv".
    env_vars = dotenv_values(dotenv_path=env_path_norm)
    # Parse values.
    # NOTE(review): this logs every value in the file verbatim; a DATABASE_URI with embedded
    # credentials would end up in the logs. Confirm this is acceptable.
    logging.info(f"Overriding database configuration from '{env_file}' file: {str(env_vars)}")
    if "DATABASE_URI" in env_vars:
        config["database"]["database_url"] = env_vars["DATABASE_URI"]
    if "DATABASE_NAME" in env_vars:
        config["database"]["database_name"] = env_vars["DATABASE_NAME"]

    # The values were updated in place, but return them anyway.
    return config["database"]["database_url"], config["database"]["database_name"]

Update the configuration dictionary with values from an environment file.

This function loads environment variables from a specified .env file and uses them to override certain keys in the provided config dictionary. It specifically looks for DATABASE_URI and DATABASE_NAME to update the database's URL and database_name in the configuration.

Args

config : dict
The configuration dictionary to be updated.
env_file : str, optional
Path to the .env file containing environment variables. If None, the default .env file location will be used.

Updates

  • Sets config["database"]["database_url"] to the value of DATABASE_URI.
  • Sets config["database"]["database_name"] to the value of DATABASE_NAME.

Logs

Logs the environment variables loaded from the .env file.

Returns

tuple
A tuple containing the updated database URL (str) followed by the updated database name (str).