Module pipettin-piper.piper.datatools.datatools
Classes
class DataTools-
Expand source code
class DataTools(metaclass=ABCMeta):
    """A class holding and querying data about the machine, objects on it, and its configuration.

    Meant to be sub-classed to implement methods specific to the database backend.
    Subclasses must implement ``setup_db``, every ``list*`` method and
    ``pop_content_by_idx``; all other helpers here are built on top of those.
    """

    database_name: str
    db: dict

    # DATABASE SETUP ############

    @abstractmethod
    def setup_db(self, database_url, database_name):
        """Populate all properties with data or connections to the DB."""
        raise NotImplementedError("You must implement this method before using it.")

    def update_from_env_file(self, config: dict, env_file: str = None):
        """Override the database settings in ``config`` with values from a ``.env`` file.

        Only the ``DATABASE_URI`` and ``DATABASE_NAME`` variables are used; they are
        written to ``config["database"]["database_url"]`` and
        ``config["database"]["database_name"]`` respectively.

        Args:
            config (dict): The configuration dictionary to be updated in place. It must
                already contain a ``"database"`` dictionary.
            env_file (str, optional): Path to the ``.env`` file. If None, a file named
                ``.env`` in the current working directory is used.

        Raises:
            DataError: If the environment file does not exist.

        Returns:
            tuple: The (possibly updated) database URL and database name.
        """
        # Honour the documented default; previously None crashed in os.path.expanduser.
        if env_file is None:
            env_file = ".env"

        # Check that the file exists.
        env_path_norm = os.path.normpath(os.path.expanduser(env_file))
        if not os.path.isfile(env_path_norm):
            msg = f"Environment file not found at '{env_file}'. Current working directory is: {os.getcwd()}"
            logging.error(msg)
            raise DataError(msg)

        # Load values using "python-dotenv".
        env_vars = dotenv_values(dotenv_path=env_path_norm)

        # NOTE(review): this logs every value in the file, which may include
        # credentials embedded in DATABASE_URI -- confirm this is acceptable.
        logging.info(f"Overriding database configuration from '{env_file}' file: {str(env_vars)}")
        if "DATABASE_URI" in env_vars:
            config["database"]["database_url"] = env_vars["DATABASE_URI"]
        if "DATABASE_NAME" in env_vars:
            config["database"]["database_name"] = env_vars["DATABASE_NAME"]

        # The values were updated in place, but return them anyway.
        return config["database"]["database_url"], config["database"]["database_name"]

    # Main data properties ####

    @property
    def protocols(self) -> list:
        """Protocols, as returned by ``listProtocols`` on every access."""
        return self.listProtocols()

    @property
    def hl_protocols(self) -> list:
        """High-level protocols, as returned by ``listHlProtocols`` on every access."""
        return self.listHlProtocols()

    @property
    def workspaces(self) -> list:
        """Workspaces, as returned by ``listWorkspaces`` on every access."""
        return self.listWorkspaces()

    @property
    def platforms(self) -> list:
        """Platforms, as returned by ``listPlatforms`` on every access."""
        return self.listPlatforms()

    @property
    def containers(self) -> list:
        """Containers, as returned by ``listContainers`` on every access."""
        return self.listContainers()

    @property
    def tools(self) -> list:
        """Tools, as returned by ``listTools`` on every access."""
        return self.listTools()

    @property
    def settings(self):
        """Settings, as returned by ``listSettings`` on every access."""
        return self.listSettings()

    #### GET METHODS ####

    def get_db(self) -> dict:
        """Dump the database into a python dictionary keyed by ``database_name``."""
        return {self.database_name: {
            "protocols": self.protocols,
            "hLprotocols": self.hl_protocols,
            "workspaces": self.workspaces,
            "platforms": self.platforms,
            "containers": self.containers,
            "tools": self.tools,
            "settings": self.settings,
        }}

    @abstractmethod
    def listProtocols(self):
        """Function to get protocols from MongoDB as a list, log protocol names, and return them."""
        raise NotImplementedError("You must implement this method before using it.")

    @abstractmethod
    def listHlProtocols(self):
        """Function to get high-level protocols from MongoDB as a list, log protocol names, and return them."""
        raise NotImplementedError("You must implement this method before using it.")

    @abstractmethod
    def listWorkspaces(self):
        """Function to get workspaces from MongoDB as a list, log names, and return them."""
        raise NotImplementedError("You must implement this method before using it.")

    @abstractmethod
    def listPlatforms(self):
        """Function to get platforms from MongoDB as a list, log names, and return them."""
        raise NotImplementedError("You must implement this method before using it.")

    @abstractmethod
    def listContainers(self):
        """Function to get 'containers' from MongoDB as a list, log names, and return them."""
        raise NotImplementedError("You must implement this method before using it.")

    @abstractmethod
    def listTools(self):
        """Function to get tools from MongoDB as a list, log names, and return them."""
        raise NotImplementedError("You must implement this method before using it.")

    @abstractmethod
    def listSettings(self):
        """Function to get 'settings' from MongoDB as a list, log names, and return them."""
        raise NotImplementedError("You must implement this method before using it.")

    def getPlatformsInWorkspace(self, workspace):
        """Return the platform definitions referenced by the items of ``workspace``."""
        # Names of the platforms used by the workspace items.
        platform_names = [p['platform'] for p in workspace['items']]
        # Keep only the known platforms whose name is used in the workspace.
        return [p for p in self.platforms if p['name'] in platform_names]

    def getWorkspaceByName(self, workspace_name: str):
        """Get a workspace by name.

        Raises:
            DataError: If no workspace has that name.
        """
        # A default is required: a bare next() would leak StopIteration to the caller.
        result = next((d for d in self.workspaces if d["name"] == workspace_name), None)
        if result is None:
            raise DataError(f"Workspace not found: {workspace_name}")
        return result

    def getProtocolByName(self, protocol_name: str):
        """Get protocol data by name.

        Raises:
            DataError: If no protocol has that name (lists the available names).
        """
        try:
            result = next(d for d in self.protocols if d["name"] == protocol_name)
        except StopIteration as e:
            raise DataError(f"Protocol not found: {protocol_name}. " + \
                            f"Available protocols: {[p['name'] for p in self.protocols]}") from e
        return result

    def getContainerByName(self, container_name: str):
        """Get container data by name.

        Raises:
            DataError: If no container has that name.
        """
        result = next((d for d in self.containers if d["name"] == container_name), None)
        if result is None:
            raise DataError(f"Container not found: {container_name}")
        return result

    def getToolByName(self, tool_name: str):
        """Get tool data by name.

        Raises:
            DataError: If no tool has that name.
        """
        result = next((d for d in self.tools if d["name"] == tool_name), None)
        if result is None:
            raise DataError(f"Tool not found: {tool_name}")
        return result

    def getProtocolObjects(self, protocol_name: str):
        """Get all data objects for a given protocol by its name.

        Returns:
            tuple: ``(protocol, workspace, platforms_in_workspace)``.

        Raises:
            ValueError: If ``protocol_name`` is not a string.
        """
        if not isinstance(protocol_name, str):
            raise ValueError("protocol_name must be a string.")
        # Get protocol data by name.
        protocol = self.getProtocolByName(protocol_name=protocol_name)
        # Get the protocol's workspace data.
        workspace = self.getWorkspaceByName(workspace_name = protocol['workspace'])
        # Get the platforms used in that workspace.
        platforms_in_workspace = self.getPlatformsInWorkspace(workspace)
        return protocol, workspace, platforms_in_workspace

    def getWorkspaceItemByName(self, workspace: dict, item_name: str):
        """Return the single item in ``workspace`` named ``item_name``.

        Raises:
            DataError: If there is no such item, or more than one (``find_one`` returns None).
        """
        one = self.find_one(workspace["items"], "name", item_name)
        if one is None:
            msg = f"No item with name '{item_name}' was found in workspace '{workspace['name']}'."
            msg += f" Available items are named: {[i['name'] for i in workspace['items']]}"
            raise DataError(msg)
        return one

    def getPlatformByName(self, platformsInWorkspace: list, platform_item: dict):
        """Return the single platform whose name equals ``platform_item["platform"]``.

        Raises:
            DataError: If there is no such platform, or more than one.
        """
        one = self.find_one(platformsInWorkspace, "name", platform_item["platform"])
        if one is None:
            msg = f"No platform with name '{platform_item['platform']}' was found in the workspace."
            msg += f" Available platforms are: {[p['name'] for p in platformsInWorkspace]}"
            raise DataError(msg)
        return one

    def getContentByName(self, content_name: str, platform_item: dict):
        """Return the single content of ``platform_item`` named ``content_name``.

        Raises:
            DataError: If there is no such content, or more than one.
        """
        one = self.find_one(platform_item["content"], "name", content_name)
        if one is None:
            msg = f"No content with name '{content_name}' was found in item '{platform_item['name']}'."
            msg += f" Available contents are named: {[c['name'] for c in platform_item['content']]}"
            raise DataError(msg)
        return one

    def filterContentBy(self, content: dict, selector: dict, content_type: str = None) -> bool:
        """Check a content against a "selector" dict and, optionally, a content type.

        Example selector: {"by": "name", "value": "tube1"}
        """
        result = content[selector["by"]] == selector["value"]
        if content_type is not None:
            # Handle filtering also by content type.
            result = result and self.filterContentByType(content, content_type)
        return result

    def filterContentByType(self, content: dict, content_type: str):
        """Checks if a content is of the intended type (e.g. "tip", "tube", etc.)

        A ``content_type`` of None accepts any content.
        """
        if content_type is None:
            return True
        container = self.getContainerByName(content["container"])
        return container["type"] == content_type

    def getNextContentByName(self, platform_item:dict, content_name:str=None, content_type:str=None,
                             pop_from_item=False, pop_from_db=False, workspace_name:str = None):
        """Get the next content from a platform item by its name, optionally deleting it from the DB."""
        # Handle unspecified content name: no selector means "any content".
        selector = None if content_name is None else {"by": "name", "value": content_name}
        return self.getNextContent(
            workspace_name=workspace_name,
            platform_item=platform_item,
            selector=selector,
            content_type=content_type,
            pop_from_item=pop_from_item,
            pop_from_db=pop_from_db
        )

    def getNextContent(self, platform_item:dict, selector:dict=None, content_type:str=None,
                       pop_from_item=False, pop_from_db=False, workspace_name:str = None):
        """Retrieve the first content of a platform item that matches the given criteria.

        Args:
            platform_item (dict): The platform item dictionary from which to retrieve content.
            selector (dict, optional): Criteria for filtering the content (see ``filterContentBy``).
                Defaults to None, in which case the first content of a matching type is selected.
            content_type (str, optional): Type of content to retrieve. If None, any type is accepted.
            pop_from_item (bool, optional): If True, removes the selected content from the local
                platform item after retrieval. Defaults to False.
            pop_from_db (bool, optional): If True, removes the selected content from the database
                after retrieval. Defaults to False.
            workspace_name (str, optional): The name of the workspace containing the platform item,
                required by `pop_from_db`.

        Raises:
            DataError: If `pop_from_db` is requested without a workspace name, or if no content
                matching the selector or content type is found in the platform item.

        Returns:
            dict: The content item that matches the specified criteria.
        """
        # Validate up front, so a bad call can not leave the local item modified
        # while the database is left untouched.
        if pop_from_db and not workspace_name:
            msg = "A workspace name is required to pop a content from an item in the database."
            logging.error(msg)
            raise DataError(msg)

        logging.debug(f"Getting next content from item '{platform_item['name']}'.")
        item_contents: list = platform_item["content"]

        if selector is not None:
            # Match by selector (and by type, when given).
            def matches(content):
                return self.filterContentBy(content, selector, content_type)
            not_found_msg = f"No content with selector {selector} was found in item '{platform_item.get('name')}'."
        else:
            # Match any content of the requested type.
            def matches(content):
                return self.filterContentByType(content, content_type)
            not_found_msg = f"No contents available in '{platform_item.get('name')}'."

        try:
            i, next_content = next(
                (i, content) for i, content in enumerate(item_contents) if matches(content)
            )
        except StopIteration as e:
            raise DataError(not_found_msg) from e

        if pop_from_item:
            logging.info(f"Deleting content with index {i} from item '{platform_item['name']}' locally.")
            item_contents.pop(i)

        # Delete the content from the DB.
        if pop_from_db:
            logging.info(f"Deleting content with index {i} from item '{platform_item['name']}' in the database.")
            self.pop_content_by_idx(workspace_name, platform_item["name"], content_idx=i)

        return next_content

    def getActionDataBy(self, action_id: str, id_field: str):
        """Find an action by comparing ``str(action[id_field])`` with ``str(action_id)``.

        Returns:
            tuple: ``(protocol, action, index)`` for the first match, where ``index`` is the
            position of the action in the protocol's "actions" list, or
            ``(None, None, None)`` when nothing matches.
        """
        logging.debug(f"Getting action data for action_id={action_id}")
        for p in self.protocols:
            logging.debug(f"Searching for action in protocol '{p['name']}'.")
            for i, a in enumerate(p["actions"]):
                if str(a[id_field]) == str(action_id):
                    return p, a, i
        logging.warning(f"No data found for action {id_field}={action_id} returning None.")
        return None, None, None

    #### POP METHODS ####

    @abstractmethod
    def pop_content_by_idx(self, workspace_name: str, item_name: str, content_idx: int):
        """Remove a content from a platform item by index, deleting it from the DB."""
        raise NotImplementedError("You must implement this method before using it.")

    # Utility methods ####

    @staticmethod
    def find(data: list, key, value) -> list:
        """Filter a list of dictionaries by a key's value.

        Also works if the items in data are lists and key is a valid index.
        """
        return [d for d in data if d[key] == value]

    def find_one(self, data: list, key, value):
        """Filter a list of dictionaries by a key's value and get one value.

        'None' is returned on either no matches or multiple matches.
        """
        found = self.find(data, key, value)
        if len(found) == 1:
            return found[0]
        return None

    def is_subset(self, query, target):
        """Recursively checks if the query is a subset of the target.

        Args:
            query: A dictionary or list representing the query subset.
            target: A dictionary or list to be checked against the query.

        Returns:
            bool: True if query is a subset of target, False otherwise.

        - For dictionaries, each key-value pair in `query` must have a matching pair in `target`.
        - For lists, each item in `query` must be found in `target` in order.
        - For anything else, the types must be identical and the values equal.
        """
        if isinstance(query, dict) and isinstance(target, dict):
            # Ensure all items in the query dict are in the target dict.
            return all(key in target and self.is_subset(query[key], target[key]) for key in query)
        elif isinstance(query, list) and isinstance(target, list):
            # One shared iterator: each query item resumes the scan where the
            # previous one stopped, which enforces the ordering.
            it = iter(target)
            return all(any(self.is_subset(q_item, t_item) for t_item in it) for q_item in query)
        else:
            # Base case for non-dict, non-list types.
            return type(query) is type(target) and query == target

    def find_matches(self, query_dict, dicts):
        """Return one boolean per entry of ``dicts``: whether ``query_dict`` is a subset of it.

        Args:
            query_dict: A dictionary used as the subset criteria.
            dicts: A list of dictionaries to search.

        Returns:
            list: Booleans, in the same order as ``dicts``.
        """
        return [self.is_subset(query_dict, d) for d in dicts]

    def find_match(self, query_dict, dicts):
        """Return the entries of ``dicts`` that contain ``query_dict`` as a subset.

        Args:
            query_dict: A dictionary representing the query subset.
            dicts: A list of dictionaries to search.

        Returns:
            list: The matching dictionaries, in their original order.
        """
        return [d for d in dicts if self.is_subset(query_dict, d)]

    @staticmethod
    def update_nested(obj, selectors: list, new_value):
        """Updates a nested object using an array of selectors to reach the target.

        Args:
            obj (dict or list): The original dictionary or list to update.
            selectors (list): A list of keys/indices to access the nested target.
            new_value: The new value to set at the specified location.

        Returns:
            None: The original object is modified in place.

        Example usage:
            >>> data = {"a": [5, 6, {"k": 3}]}
            >>> DataTools.update_nested(data, ["a", 2, "k"], 10)
            >>> print(data)
            {'a': [5, 6, {'k': 10}]}
        """
        for key in selectors[:-1]:
            # Traverse dictionaries or lists.
            obj = obj[key]
        # Set the value at the deepest level.
        obj[selectors[-1]] = new_value
Subclasses
Class variables
var database_name : str
var db : dict
Static methods
def find(data: list, key, value) ‑> list-
Expand source code
@staticmethod
def find(data: list, key, value) -> list:
    """Collect every entry of ``data`` whose ``entry[key]`` equals ``value``.

    Entries may be dictionaries, or lists when ``key`` is a valid index.
    The result keeps the original order and is empty when nothing matches.
    """
    matches = []
    for entry in data:
        if entry[key] == value:
            matches.append(entry)
    return matches
def update_nested(obj, selectors: list, new_value)-
Expand source code
@staticmethod
def update_nested(obj, selectors: list, new_value):
    """Set a value deep inside nested dictionaries/lists, in place.

    Args:
        obj (dict or list): The object to modify.
        selectors (list): Keys/indices leading to the target; every selector but
            the last is used to descend, the last one is the slot assigned to.
        new_value: The value stored at the target location.

    Returns:
        None: ``obj`` is modified in place.

    Example usage:
        >>> data = {"a": [5, 6, {"k": 3}]}
        >>> update_nested(data, ["a", 2, "k"], 10)
        >>> print(data)
        {'a': [5, 6, {'k': 10}]}
    """
    container = obj
    # Walk down to the parent of the target slot.
    for depth in range(len(selectors) - 1):
        container = container[selectors[depth]]
    # Assign at the deepest level.
    container[selectors[-1]] = new_value
Args
obj:dict or list- The original dictionary or list to update.
selectors:list- A list of keys/indices to access the nested target.
new_value- The new value to set at the specified location.
Returns
None- The original object is modified in place.
Example usage:
>>> data = {"a": [5, 6, {"k": 3}]} >>> selectors = ["a", 2, "k"] >>> update_nested(data, selectors, 10) >>> print(data) # Output should be: {'a': [5, 6, {'k': 10}]}
Instance variables
prop containers : list-
Expand source code
@property
def containers(self) -> list:
    """Containers, obtained by calling ``listContainers`` on every access."""
    container_records = self.listContainers()
    return container_records
prop hl_protocols : list-
Expand source code
@property
def hl_protocols(self) -> list:
    """High-level protocols, obtained by calling ``listHlProtocols`` on every access."""
    hl_protocol_records = self.listHlProtocols()
    return hl_protocol_records
prop platforms : list-
Expand source code
@property
def platforms(self) -> list:
    """Platforms, obtained by calling ``listPlatforms`` on every access."""
    platform_records = self.listPlatforms()
    return platform_records
prop protocols : list-
Expand source code
@property
def protocols(self) -> list:
    """Protocols, obtained by calling ``listProtocols`` on every access."""
    protocol_records = self.listProtocols()
    return protocol_records
prop settings-
Expand source code
@property
def settings(self):
    """Settings, obtained by calling ``listSettings`` on every access."""
    settings_records = self.listSettings()
    return settings_records
prop tools : list-
Expand source code
@property
def tools(self) -> list:
    """Tools, obtained by calling ``listTools`` on every access."""
    tool_records = self.listTools()
    return tool_records
prop workspaces : list-
Expand source code
@property
def workspaces(self) -> list:
    """Workspaces, obtained by calling ``listWorkspaces`` on every access."""
    workspace_records = self.listWorkspaces()
    return workspace_records
Methods
def filterContentBy(self, content: dict, selector: dict, content_type: str = None) ‑> bool-
Expand source code
def filterContentBy(self, content: dict, selector: dict, content_type: str = None) -> bool:
    """Check a content against a "selector" dict and, optionally, a content type.

    The selector names a field of the content and the value it must equal.
    Example selector: {"by": "name", "value": "tube1"}

    When ``content_type`` is given, the content must additionally pass
    ``filterContentByType``.
    """
    selector_matches = content[selector["by"]] == selector["value"]
    if content_type is None:
        # No type restriction requested.
        return selector_matches
    return selector_matches and self.filterContentByType(content, content_type)
def filterContentByType(self, content: dict, content_type: str)-
Expand source code
def filterContentByType(self, content: dict, content_type: str):
    """Tell whether a content is of the intended type (e.g. "tip", "tube", etc.).

    The type is read from the container named in ``content["container"]``.
    A ``content_type`` of None accepts every content without any lookup.
    """
    if content_type is not None:
        container_type = self.getContainerByName(content["container"])["type"]
        return container_type == content_type
    return True
def find_match(self, query_dict, dicts)-
Expand source code
def find_match(self, query_dict, dicts):
    """Return the entries of ``dicts`` that contain ``query_dict`` as a subset.

    Args:
        query_dict: A dictionary representing the query subset.
        dicts: A list of dictionaries to search.

    Returns:
        list: The entries for which ``is_subset(query_dict, entry)`` is true,
        in their original order.
    """
    selected = []
    for candidate in dicts:
        if self.is_subset(query_dict, candidate):
            selected.append(candidate)
    return selected
Args
query_dict- A dictionary representing the query subset.
dicts- A list of dictionaries to search.
Returns
list- A list of dictionaries from `dicts` that contain `query_dict` as a subset.
This method uses `is_subset` to check each dictionary in `dicts` and returns only those dictionaries where `query_dict` is a subset.
def find_matches(self, query_dict, dicts)-
Expand source code
def find_matches(self, query_dict, dicts):
    """Report, for each entry of ``dicts``, whether ``query_dict`` is a subset of it.

    Args:
        query_dict: A dictionary used as the subset criteria.
        dicts: A list of dictionaries to search.

    Returns:
        list: One ``is_subset`` result per entry of ``dicts``, in the same order.
    """
    flags = []
    for candidate in dicts:
        flags.append(self.is_subset(query_dict, candidate))
    return flags
Args
query_dict- A dictionary used as the subset criteria.
dicts- A list of dictionaries to search.
Returns
list- A list containing boolean values where each value corresponds to whether the respective dictionary in `dicts` matches the query_dict.
This method returns a boolean match for each dictionary in `dicts`, indicating whether `query_dict` is a subset of each respective dictionary.
def find_one(self, data: list, key, value)-
Expand source code
def find_one(self, data: list, key, value):
    """Return the only entry of ``data`` whose ``entry[key]`` equals ``value``.

    The lookup is delegated to ``find``. None is returned both when nothing
    matches and when more than one entry matches.
    """
    candidates = self.find(data, key, value)
    return candidates[0] if len(candidates) == 1 else None
def getActionDataBy(self, action_id: str, id_field: str)-
Expand source code
def getActionDataBy(self, action_id: str, id_field: str):
    """Find an action by comparing ``str(action[id_field])`` with ``str(action_id)``.

    Every protocol's "actions" list is scanned in order and the first match wins.

    Returns:
        tuple: ``(protocol, action, index)``, where ``index`` is the position of the
        action inside the protocol's "actions" list, or ``(None, None, None)``
        when no action matches (a warning is logged in that case).
    """
    logging.debug(f"Getting action data for action_id={action_id}")
    for protocol in self.protocols:
        logging.debug(f"Searching for action in protocol '{protocol['name']}'.")
        position = 0
        for action in protocol["actions"]:
            if str(action[id_field]) == str(action_id):
                return protocol, action, position
            position += 1
    logging.warning(f"No data found for action {id_field}={action_id} returning None.")
    return None, None, None
def getContainerByName(self, container_name: str)-
Expand source code
def getContainerByName(self, container_name: str):
    """Get container data by name.

    Returns:
        The first entry of ``self.containers`` whose "name" equals ``container_name``.

    Raises:
        DataError: If no container has that name.
    """
    # A default is required here: a bare next() raises StopIteration on a miss,
    # which made the DataError below unreachable.
    result = next((d for d in self.containers if d["name"] == container_name), None)
    if result is None:
        raise DataError(f"Container not found: {container_name}")
    return result
def getContentByName(self, content_name: str, platform_item: dict)-
Expand source code
def getContentByName(self, content_name: str, platform_item: dict):
    """Return the single content of ``platform_item`` named ``content_name``.

    Raises:
        DataError: If ``find_one`` yields nothing, i.e. there is no content with
            that name or there is more than one. The message lists the names of
            the available contents.
    """
    match = self.find_one(platform_item["content"], "name", content_name)
    if match is not None:
        return match

    available_names = [c['name'] for c in platform_item['content']]
    problem = f"No content with name '{content_name}' was found in item '{platform_item['name']}'."
    hint = f" Available contents are named: {available_names}"
    raise DataError(problem + hint)
def getNextContent(self,
platform_item: dict,
selector: dict = None,
content_type: str = None,
pop_from_item=False,
pop_from_db=False,
workspace_name: str = None)-
Expand source code
def getNextContent(self, platform_item:dict, selector:dict=None, content_type:str=None,
                   pop_from_item=False, pop_from_db=False, workspace_name:str = None):
    """Retrieve the first content of a platform item that matches the given criteria.

    Args:
        platform_item (dict): The platform item dictionary from which to retrieve content.
        selector (dict, optional): Criteria for filtering the content (see ``filterContentBy``).
            Defaults to None, in which case the first content of a matching type is selected.
        content_type (str, optional): Type of content to retrieve. If None, any type is accepted.
        pop_from_item (bool, optional): If True, removes the selected content from the local
            platform item after retrieval. Defaults to False.
        pop_from_db (bool, optional): If True, removes the selected content from the database
            after retrieval. Defaults to False.
        workspace_name (str, optional): The name of the workspace containing the platform item,
            required by `pop_from_db`.

    Raises:
        DataError: If `pop_from_db` is requested without a workspace name, or if no content
            matching the selector or content type is found in the platform item.

    Returns:
        dict: The content item that matches the specified criteria.
    """
    # Validate up front. This check used to run after the local pop, so a bad
    # call left the local item modified while the database was untouched.
    if pop_from_db and not workspace_name:
        msg = "A workspace name is required to pop a content from an item in the database."
        logging.error(msg)
        raise DataError(msg)

    logging.debug(f"Getting next content from item '{platform_item['name']}'.")
    item_contents: list = platform_item["content"]

    if selector is not None:
        # Match by selector (and by type, when given).
        def matches(content):
            return self.filterContentBy(content, selector, content_type)
        not_found_msg = f"No content with selector {selector} was found in item '{platform_item.get('name')}'."
    else:
        # Match any content of the requested type.
        def matches(content):
            return self.filterContentByType(content, content_type)
        not_found_msg = f"No contents available in '{platform_item.get('name')}'."

    try:
        i, next_content = next(
            (i, content) for i, content in enumerate(item_contents) if matches(content)
        )
    except StopIteration as e:
        raise DataError(not_found_msg) from e

    if pop_from_item:
        logging.info(f"Deleting content with index {i} from item '{platform_item['name']}' locally.")
        item_contents.pop(i)

    # Delete the content from the DB.
    if pop_from_db:
        logging.info(f"Deleting content with index {i} from item '{platform_item['name']}' in the database.")
        self.pop_content_by_idx(workspace_name, platform_item["name"], content_idx=i)

    return next_content
Args
platform_item:dict- The platform item dictionary from which to retrieve content.
selector:dict, optional- Criteria for filtering the content to retrieve. Defaults to None, in which case the next first content is selected.
content_type:str, optional- Type of content to retrieve. If None, any content type is accepted. Defaults to None.
pop_from_item:bool, optional- If True, removes the selected content from the local platform item after retrieval. Defaults to False.
pop_from_db:bool, optional- If True, removes the selected content from the database after retrieval. Defaults to False.
workspace_name:str, optional- The name of the workspace containing the platform item, required by `pop_from_db`.
Raises
DataError- If no content matching the selector or content type is found in the platform item.
Returns
dict- The content item that matches the specified criteria, if any.
def getNextContentByName(self,
platform_item: dict,
content_name: str = None,
content_type: str = None,
pop_from_item=False,
pop_from_db=False,
workspace_name: str = None)-
Expand source code
def getNextContentByName(self, platform_item:dict, content_name:str=None, content_type:str=None,
                         pop_from_item=False, pop_from_db=False, workspace_name:str = None):
    """Get the next content from a platform item by its name, optionally deleting it from the DB.

    A thin wrapper around ``getNextContent``: a given ``content_name`` is turned
    into a ``{"by": "name", "value": ...}`` selector, while None means "no
    selector". All other arguments are forwarded unchanged.
    """
    if content_name is not None:
        name_selector = {"by": "name", "value": content_name}
    else:
        # Unspecified content name: let getNextContent pick any content.
        name_selector = None

    return self.getNextContent(
        platform_item=platform_item,
        selector=name_selector,
        content_type=content_type,
        pop_from_item=pop_from_item,
        pop_from_db=pop_from_db,
        workspace_name=workspace_name,
    )
def getPlatformByName(self, platformsInWorkspace: list, platform_item: dict)-
Expand source code
def getPlatformByName(self, platformsInWorkspace: list, platform_item: dict):
    """Return the single platform whose name equals ``platform_item["platform"]``.

    Raises:
        DataError: If ``find_one`` yields nothing, i.e. there is no platform with
            that name or there is more than one. The message lists the names of
            the available platforms.
    """
    match = self.find_one(platformsInWorkspace, "name", platform_item["platform"])
    if match is not None:
        return match

    available_names = [p['name'] for p in platformsInWorkspace]
    problem = f"No platform with name '{platform_item['platform']}' was found in the workspace."
    hint = f" Available platforms are: {available_names}"
    raise DataError(problem + hint)
def getPlatformsInWorkspace(self, workspace)-
Expand source code
def getPlatformsInWorkspace(self, workspace):
    """Return the platform definitions referenced by the items of ``workspace``.

    Each workspace item names its platform under the "platform" key; the result
    is every entry of ``self.platforms`` whose "name" is one of those, in the
    order of ``self.platforms``.
    """
    # Names of the platforms used by the workspace items.
    used_names = []
    for item in workspace['items']:
        used_names.append(item['platform'])

    # Keep only the known platforms that the workspace uses.
    selected = []
    for platform in self.platforms:
        if platform['name'] in used_names:
            selected.append(platform)
    return selected
Expand source code
def getProtocolByName(self, protocol_name: str):
    """Get protocol data by name.

    Returns:
        The first entry of ``self.protocols`` whose "name" equals ``protocol_name``.

    Raises:
        DataError: If no protocol has that name; the message lists the names of
            the available protocols.
    """
    candidates = (entry for entry in self.protocols if entry["name"] == protocol_name)
    try:
        return next(candidates)
    except StopIteration as not_found:
        known_names = [p['name'] for p in self.protocols]
        raise DataError(
            f"Protocol not found: {protocol_name}. " + f"Available protocols: {known_names}"
        ) from not_found
def getProtocolObjects(self, protocol_name: str)-
Expand source code
def getProtocolObjects(self, protocol_name: str):
    """Get all data objects for a given protocol by its name.

    Looks up the protocol, then the workspace named in ``protocol['workspace']``,
    then the platforms used by that workspace.

    Returns:
        tuple: ``(protocol, workspace, platforms_in_workspace)``.

    Raises:
        ValueError: If ``protocol_name`` is not a string.
    """
    if isinstance(protocol_name, str):
        # Protocol data, by name.
        protocol_data = self.getProtocolByName(protocol_name=protocol_name)
        # The workspace the protocol runs on.
        workspace_data = self.getWorkspaceByName(workspace_name=protocol_data['workspace'])
        # The platforms placed on that workspace.
        workspace_platforms = self.getPlatformsInWorkspace(workspace_data)
        return protocol_data, workspace_data, workspace_platforms
    raise ValueError("protocol_name must be a string.")
def getToolByName(self, tool_name: str)-
Expand source code
def getToolByName(self, tool_name: str):
    """Get tool data by name.

    Returns:
        The first entry of ``self.tools`` whose "name" equals ``tool_name``.

    Raises:
        DataError: If no tool has that name.
    """
    # A default is required here: a bare next() raises StopIteration on a miss,
    # which made the DataError below unreachable.
    result = next((d for d in self.tools if d["name"] == tool_name), None)
    if result is None:
        raise DataError(f"Tool not found: {tool_name}")
    return result
def getWorkspaceByName(self, workspace_name: str)-
Expand source code
def getWorkspaceByName(self, workspace_name: str):
    """Get a workspace's data by name.

    Returns:
        The first entry of ``self.workspaces`` whose "name" equals ``workspace_name``.

    Raises:
        DataError: If no workspace has that name.
    """
    # A default is required here: a bare next() raises StopIteration on a miss,
    # which made the DataError below unreachable.
    result = next((d for d in self.workspaces if d["name"] == workspace_name), None)
    if result is None:
        raise DataError(f"Workspace not found: {workspace_name}")
    return result
def getWorkspaceItemByName(self, workspace: dict, item_name: str)-
Expand source code
def getWorkspaceItemByName(self, workspace: dict, item_name: str):
    """Find the workspace item whose name matches ``item_name``.

    Args:
        workspace (dict): Workspace data; its ``"items"`` are searched and its
            ``"name"`` is used in the error message.
        item_name (str): Name of the item to look for.

    Returns:
        The item returned by ``self.find_one`` for the ``"name"`` key.

    Raises:
        DataError: If ``self.find_one`` returns ``None``. The message lists
            the names of the items present in the workspace.
    """
    found = self.find_one(workspace["items"], "name", item_name)
    if found is not None:
        return found

    # A None result is treated as "no such item": report the names that exist.
    msg = "".join([
        f"No item with name '{item_name}' was found in workspace '{workspace['name']}'.",
        f" Available items are named: {[i['name'] for i in workspace['items']]}",
    ])
    raise DataError(msg)
def get_db(self) ‑> dict-
Expand source code
def get_db(self) -> dict:
    """Dump the database into a python dictionary.

    Returns:
        dict: A single-key dictionary mapping ``self.database_name`` to a
        dictionary with one entry per collection: ``"protocols"``,
        ``"hLprotocols"``, ``"workspaces"``, ``"platforms"``,
        ``"containers"``, ``"tools"`` and ``"settings"``.
    """
    name = self.database_name

    # (key in the dump, property read from self), in the order they are read.
    sections = (
        ("protocols", "protocols"),
        ("hLprotocols", "hl_protocols"),
        ("workspaces", "workspaces"),
        ("platforms", "platforms"),
        ("containers", "containers"),
        ("tools", "tools"),
        ("settings", "settings"),
    )
    return {name: {key: getattr(self, attr) for key, attr in sections}}
def is_subset(self, query, target)-
Expand source code
def is_subset(self, query, target):
    """Recursively check whether ``query`` is contained in ``target``.

    Args:
        query: A dictionary, list, or plain value describing what to look for.
        target: The dictionary, list, or plain value to check against.

    Returns:
        bool: True if ``query`` is a subset of ``target``, False otherwise.

    Matching rules:
        - Two dictionaries: every key of ``query`` must be in ``target`` and
          the associated values must match recursively.
        - Two lists: every item of ``query`` must match some item of
          ``target``, in the same relative order. Target items are consumed
          as they are scanned, so one target item satisfies at most one
          query item.
        - Anything else: both values must have exactly the same type and
          compare equal.
    """
    if isinstance(query, dict) and isinstance(target, dict):
        for key in query:
            if key not in target:
                return False
            if not self.is_subset(query[key], target[key]):
                return False
        return True

    if isinstance(query, list) and isinstance(target, list):
        # A single shared iterator enforces ordering: the search for each
        # query item resumes where the previous match stopped.
        remaining = iter(target)
        for wanted in query:
            for candidate in remaining:
                if self.is_subset(wanted, candidate):
                    break
            else:
                # Target exhausted without matching this query item.
                return False
        return True

    # Base case for non-dict, non-list values (or mismatched containers).
    return type(query) is type(target) and query == target
Args
query- A dictionary or list representing the query subset.
target- A dictionary or list to be checked against the query.
Returns
bool- True if query is a subset of target, False otherwise.
This method uses recursion to handle nested dictionaries and lists. It verifies that all elements in
`query` exist in `target`: - For dictionaries, each key-value pair in `query` must have a matching pair in `target`. - For lists, each item in `query` must be found in `target` in order. - For basic data types, it directly compares for equality.
def listContainers(self)-
Expand source code
@abstractmethod
def listContainers(self):
    """Get 'containers' from the database as a list, log names, and return them.

    Abstract: subclasses supply the backend-specific implementation (the
    original wording names MongoDB). This base version always raises
    ``NotImplementedError``.
    """
    raise NotImplementedError("You must implement this method before using it.")
def listHlProtocols(self)-
Expand source code
@abstractmethod
def listHlProtocols(self):
    """Get high-level protocols from the database as a list, log protocol names, and return them.

    Abstract: subclasses supply the backend-specific implementation (the
    original wording names MongoDB). This base version always raises
    ``NotImplementedError``.
    """
    raise NotImplementedError("You must implement this method before using it.")
def listPlatforms(self)-
Expand source code
@abstractmethod
def listPlatforms(self):
    """Get platforms from the database as a list, log names, and return them.

    Abstract: subclasses supply the backend-specific implementation (the
    original wording names MongoDB). This base version always raises
    ``NotImplementedError``.
    """
    raise NotImplementedError("You must implement this method before using it.")
def listProtocols(self)-
Expand source code
@abstractmethod
def listProtocols(self):
    """Get protocols from the database as a list, log protocol names, and return them.

    Abstract: subclasses supply the backend-specific implementation (the
    original wording names MongoDB). This base version always raises
    ``NotImplementedError``.
    """
    raise NotImplementedError("You must implement this method before using it.")
def listSettings(self)-
Expand source code
@abstractmethod
def listSettings(self):
    """Get 'settings' from the database as a list, log names, and return them.

    Abstract: subclasses supply the backend-specific implementation (the
    original wording names MongoDB). This base version always raises
    ``NotImplementedError``.
    """
    raise NotImplementedError("You must implement this method before using it.")
def listTools(self)-
Expand source code
@abstractmethod
def listTools(self):
    """Get tools from the database as a list, log names, and return them.

    Abstract: subclasses supply the backend-specific implementation (the
    original wording names MongoDB). This base version always raises
    ``NotImplementedError``.
    """
    raise NotImplementedError("You must implement this method before using it.")
def listWorkspaces(self)-
Expand source code
@abstractmethod
def listWorkspaces(self):
    """Get workspaces from the database as a list, log names, and return them.

    Abstract: subclasses supply the backend-specific implementation (the
    original wording names MongoDB). This base version always raises
    ``NotImplementedError``.
    """
    raise NotImplementedError("You must implement this method before using it.")
def pop_content_by_idx(self, workspace_name: str, item_name: str, content_idx: int)-
Expand source code
@abstractmethod
def pop_content_by_idx(self, workspace_name: str, item_name: str, content_idx: int):
    """Remove a content from a platform item by index, deleting it from the DB.

    Abstract: subclasses supply the backend-specific implementation. This
    base version always raises ``NotImplementedError``.

    Args:
        workspace_name (str): Name of the workspace holding the item.
        item_name (str): Name of the platform item holding the content.
        content_idx (int): Index of the content to remove.
            NOTE(review): whether this is a list position or a stored
            'index' field is decided by the implementation - confirm.
    """
    raise NotImplementedError("You must implement this method before using it.")
def setup_db(self, database_url, database_name)-
Expand source code
@abstractmethod
def setup_db(self, database_url, database_name):
    """Populate all properties with data or connections to the DB.

    Abstract: subclasses supply the backend-specific implementation. This
    base version always raises ``NotImplementedError``.

    Args:
        database_url: Location of the database.
            NOTE(review): presumably a connection URL or path, depending
            on the backend - confirm against the subclass.
        database_name: Name of the database to use.
    """
    raise NotImplementedError("You must implement this method before using it.")
def update_from_env_file(self, config: dict, env_file: str = None)-
Expand source code
def update_from_env_file(self, config: dict, env_file: str = None):
    """Update the configuration dictionary with values from an environment file.

    Loads the variables defined in a ``.env`` file and uses ``DATABASE_URI``
    and ``DATABASE_NAME``, when present, to override the database settings
    in ``config``.

    Args:
        config (dict): The configuration dictionary to be updated in place.
            It must contain a ``"database"`` dictionary.
        env_file (str, optional): Path to the ``.env`` file. A leading ``~``
            is expanded. If None, ``".env"`` relative to the current working
            directory is used.

    Updates:
        - Sets ``config["database"]["database_url"]`` to ``DATABASE_URI``.
        - Sets ``config["database"]["database_name"]`` to ``DATABASE_NAME``.

    Logs:
        The names (not the values) of the variables found in the file, since
        a database URI may embed credentials.

    Returns:
        tuple: A tuple containing:
            - The (possibly updated) database URL.
            - The (possibly updated) database name.

    Raises:
        DataError: If the environment file does not exist.
        KeyError: If ``config`` has no ``"database"`` entry, or if a returned
            key is neither already in ``config["database"]`` nor provided by
            the file.
    """
    # Honor the documented default instead of failing inside expanduser(None).
    if env_file is None:
        env_file = ".env"

    # Check that the file exists.
    env_path_norm = os.path.normpath(os.path.expanduser(env_file))
    if not os.path.isfile(env_path_norm):
        msg = f"Environment file not found at '{env_file}'. Current working directory is: {os.getcwd()}"
        logging.error(msg)
        raise DataError(msg)

    # Load values using "python-dotenv".
    env_vars = dotenv_values(dotenv_path=env_path_norm)

    # Log variable names only: the values may contain secrets.
    logging.info(
        "Overriding database configuration from '%s' file. Variables found: %s",
        env_file,
        sorted(env_vars),
    )

    # Parse values.
    database_config = config["database"]
    if "DATABASE_URI" in env_vars:
        database_config["database_url"] = env_vars["DATABASE_URI"]
    if "DATABASE_NAME" in env_vars:
        database_config["database_name"] = env_vars["DATABASE_NAME"]

    # The values were updated in place, but return them anyway.
    return database_config["database_url"], database_config["database_name"]
This function loads environment variables from a specified
`.env` file and uses them to override certain keys in the provided `config` dictionary. It specifically looks for `DATABASE_URI` and `DATABASE_NAME` to update the database's URL and `database_name` in the configuration.
Args
config:dict- The configuration dictionary to be updated.
env_file: str, optional - Path to the
`.env` file containing environment variables. If None, the default `.env` file location will be used.
Updates
- Sets
`config["database"]["database_url"]` to the value of `DATABASE_URI`. - Sets
`config["database"]["database_name"]` to the value of `DATABASE_NAME`.
Logs
Logs the environment variables loaded from the
`.env` file.
Returns
tuple- A tuple containing: - The updated database URL (str). - The updated database name (str).