Module pipettin-piper.piper.datatools.mongo
Functions
def load_datatools(controller: Controller)-
Expand source code
def load_datatools(controller: Controller): if controller.database_tools: raise DataError("Database tools already set in controller to: " + str(controller.database_tools)) # Set the databse_tools property to an instance of MongoObjects. controller.database_tools = MongoObjects(controller=controller) return controller.database_tools def load_from_config(**db_config: dict)-
Expand source code
def load_from_config(**db_config : dict): return MongoObjects(**db_config)
Classes
class MongoObjects (database_url=None, database_name=None, env_file=None, controller=None, verbose=True)-
Expand source code
class MongoObjects(DataTools): """A class holding and querying data about the machine, objects on it, and its configuration. Requires MongoDB as a data backend. """ verbose: bool = False controller: Controller database_url: str = 'localhost:27017' database_name: str = 'pipettin' env_file: str """Path to the UI's '.env' file, containing DATABASE_NAME and DATABASE_URI variables.""" def __init__(self, database_url=None, database_name=None, env_file=None, controller=None, verbose=True): self.verbose = verbose self.config = {"database": { "database_url": self.database_url, "database_name": self.database_name } } # Get the config from the controller, if any. if controller: self.controller = controller self.verbose = controller.verbose # Update connection details. self.config["database"].update(controller.config["database"]) # Use input argument if set, else use the config. if env_file: self.env_file = env_file else: self.env_file = self.config["database"].get("env_file", None) # Load variables from ".env" file. if self.env_file is not None: self.update_from_env_file(self.config, self.env_file) # Set "database_url" from config, defaulting to input arguments. self.database_url = self.config["database"]["database_url"] self.database_name = self.config["database"]["database_name"] # Override connection details from init arguments. if database_url: self.database_url = database_url if database_name: self.database_name = database_name self.setup_db(self.database_url, self.database_name) def make_mongo_url(self, database_url): """Compose a MongoDB connection URL from the database URL""" return f'mongodb://{database_url}/' @property def mongo_url(self): """Current MongoDB connection URL""" return self.make_mongo_url(self.database_url) # DATABASE SETUP ############ def setup_db(self, database_url, database_name): """Establish a connection to the MongoDB database and configure collections. This method sets up the connection to a MongoDB instance using the provided or default `database_url` and `database_name`. It initializes various collections, ensuring that the appropriate indexes are created. If the `database_url` or `database_name` is not provided, the method uses the default values stored in the instance. Args: database_url (str): The MongoDB connection address. database_name (str): The name of the database to connect to. Raises: pymongo.errors.InvalidName: Raised if the provided `database_name` is invalid. pymongo.errors.DuplicateKeyError: Raised when duplicate keys are found while creating indexes. DataError: Raised if there are errors during the connection or index creation process. """ try: # Connect and select database name. mongo_url = self.make_mongo_url(database_url) logging.info(f"Connecting to mongodb at: '{mongo_url}'") self.client = pymongo.MongoClient(mongo_url) logging.info(f"Connected to database: '{database_name}'") # The databases in the MongoClient are accessed with a dict-style syntax. self.db = self.client[database_name] # Collection names. self.collectionProtocols: Collection = self.db['protocols'] self.collectionHlProtocols: Collection = self.db['hLprotocols'] self.collectionWorkspaces: Collection = self.db['workspaces'] self.collectionPlatforms: Collection = self.db['platforms'] self.collectionContainers: Collection = self.db['containers'] self.collectionTools: Collection = self.db['tools'] self.collectionSettings: Collection = self.db['settings'] try: # Create a unique index by name and workspace on "hLprotocols". # See: https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.create_index self.collectionHlProtocols.create_index( keys=[('name', pymongo.ASCENDING), ('workspace', pymongo.ASCENDING)], unique=True) # Ensure unique indexes in the rest of the collections. self.collectionProtocols.create_index([('name', pymongo.ASCENDING)], unique=True) self.collectionWorkspaces.create_index([('name', pymongo.ASCENDING)], unique=True) self.collectionPlatforms.create_index([('name', pymongo.ASCENDING)], unique=True) self.collectionContainers.create_index([('name', pymongo.ASCENDING)], unique=True) self.collectionTools.create_index([('name', pymongo.ASCENDING)], unique=True) self.collectionSettings.create_index([('name', pymongo.ASCENDING)], unique=True) except pymongo.errors.DuplicateKeyError as e: logging.error(f"Failed to ensure indexes on database {database_name}. You have duplicate object IDs: {e}") except Exception as e: msg = f"Failed to ensure index with error: {e}" logging.error(msg) raise DataError(msg) from e except pymongo.errors.InvalidName as e: msg = f"Invalid database name '{database_name}' ({e})." logging.error(msg) except DataError as e: logging.error(f"Failed connect to the database or to load objects from it: {e}\n" + traceback.format_exc()) raise e # Main class methods #### #### POP METHODS #### def pop_content_by_idx(self, workspace_name: str, item_name: str, content_idx: int): """Remove a content from a platform item by index, deleting it form the DB.""" try: workspace = self.getWorkspaceByName(workspace_name) item = self.getWorkspaceItemByName(workspace=workspace, item_name=item_name) content = item["content"][content_idx] # Delete. # mongo_id = content["_id"] # result = self.db.workspaces.find_one_and_delete( # # {"_id": ObjectId(str(mongo_id))} # # { "items.content._id": ObjectId(str(mongo_id)) } # {"name": workspace_name, "items.name": item_name, "items.content.name": "tip1"} # ) # Delete by update. result = self.db.workspaces.update_one( { "name": workspace_name, "items.name": item_name }, { "$pull": { "items.$.content": content } } ) result = result.modified_count # Find one and delete by update. # result = self.db.workspaces.find_one_and_update( # { "name": workspace_name, "items.name": item_name }, # { "$pull": { "items.$.content": content } }, # return_document=True # This returns the document after the update # ) if result: logging.debug(f"Deleted content from the DB: {content}") else: logging.warning(f"Failed to delete content: {content}") except Exception as e: msg = f"Failed to pull content {content_idx}" msg += f" from item '{item_name}' in workspace '{workspace_name}'. Error: {e}" logging.error(msg) raise DataError(msg) from e return result #### GET METHODS #### def listProtocols(self): """Method to get protocols from MongoDB as a list, log protocol names, and return them.""" protocols = list(self.collectionProtocols.find()) logging.debug(f"Found the following protocols: {[p['name'] for p in protocols]}") return protocols def listHlProtocols(self): """Method to get high-level protocols from MongoDB as a list, log protocol names, and return them.""" hl_protocols = list(self.collectionHlProtocols.find()) logging.debug(f"Found the following high-level protocols: {[p['name'] for p in hl_protocols]}") return hl_protocols def listWorkspaces(self): """Method to get workspaces from MongoDB as a list, log names, and return them.""" workspaces = list(self.collectionWorkspaces.find()) logging.debug(f"Found the following workspaces: {[w['name'] for w in workspaces]}") return workspaces def listPlatforms(self): """Method to get platforms from MongoDB as a list, log names, and return them.""" platforms = list(self.collectionPlatforms.find()) logging.debug(f"Found the following platforms: {[pl['name'] for pl in platforms]}") return platforms def listContainers(self): """Method to get 'containers' from MongoDB as a list, log names, and return them.""" containers = list(self.collectionContainers.find()) logging.debug(f"Found the following containers: {[t['name'] for t in containers]}") return containers def listTools(self): """Method to get tools from MongoDB as a list, log names, and return them.""" tools = list(self.collectionTools.find()) logging.debug(f"Found the following tools: {[t['name'] for t in tools]}") return tools def listSettings(self): """Method to get 'settings' from MongoDB as a list, log names, and return them.""" settings = list(self.collectionSettings.find()) logging.debug(f"Found {len(settings)} set(s) of settings.") if settings is None: pass elif len(settings) > 1: logging.debug("More than one set of settings was retreived from the database.") return settings #### CUSTOM UPDATE METHODS #### def updateObject(self, collection_name: str, document_id: str, field: str, data): """Find and update a field in an object from a MongoDB collection, by the objects MongoDB ID Test: m.updateObject( collection="protocols", oid=action["_id"], field="actions.0.sarasa", data={"hola": 3} ) References: - https://www.w3schools.com/python/python_mongodb_update.asp - https://stackoverflow.com/a/4374288/11524079 - https://www.mongodb.com/docs/manual/reference/operator/update/set/ - https://www.mongodb.com/docs/manual/core/document/#std-label-document-dot-notation - https://stackoverflow.com/a/46608956/11524079 """ # Get the collection. collection = self.db[collection_name] # Alt2: collection.find_one_and_update({"_id": ObjectId(str(document_id))}, {"$set": {field: data}}) def updateActionBy(self, action: dict, protocol_name: str, new_data: dict, field: str = None, find_by: str = "index"): """Update an action's data""" logging.info(f"Updating action with {find_by}={action[find_by]} from protocol '{protocol_name}' with new_data={new_data} and field={field}") # Get the collection. protocols = self.db["protocols"] # Target field (mongo syntax). target = "actions.$" if field is not None: target += f".{field}" # Update. result = protocols.update_one( { "name": protocol_name, f"actions.{find_by}": action[find_by] }, {"$set": {target: new_data}} ) # Handle failure to update. if not result.modified_count: msg = f"Failed to update action by {find_by}={action[find_by]}." logging.error(msg) raise DataError(msg) def updateObjectBy(self, query: dict, collection_name: str, data: dict): """Find one object in a collection by the 'query' selector and update its data""" # Get the collection. collection: Collection = self.db[collection_name] # Find one object and update it. collection.find_one_and_update(query, {'$set': data}) def updateObjectByName(self, collection_name: str, document_name: str, data: dict): """Find one object in a collection by the 'name' key and update its data""" # Select an object by its "name" key and update it. self.updateObjectBy(query={"name": document_name}, collection_name=collection_name, data=data) def updateToolByName(self, tool_name: str, tool_data: dict): """Find one tool by the 'name' key and update its data""" # Select a tool by its "name" key and update it. self.updateObjectByName(collection_name="tools", document_name=tool_name, data=tool_data)A class holding and querying data about the machine, objects on it, and its configuration. Requires MongoDB as a data backend.
Ancestors
Class variables
var controller : Controllervar database_name : strvar database_url : strvar env_file : str-
Path to the UI's '.env' file, containing DATABASE_NAME and DATABASE_URI variables.
var verbose : bool
Instance variables
prop mongo_url-
Expand source code
@property def mongo_url(self): """Current MongoDB connection URL""" return self.make_mongo_url(self.database_url)Current MongoDB connection URL
Methods
def listContainers(self)-
Expand source code
def listContainers(self): """Method to get 'containers' from MongoDB as a list, log names, and return them.""" containers = list(self.collectionContainers.find()) logging.debug(f"Found the following containers: {[t['name'] for t in containers]}") return containersMethod to get 'containers' from MongoDB as a list, log names, and return them.
def listHlProtocols(self)-
Expand source code
def listHlProtocols(self): """Method to get high-level protocols from MongoDB as a list, log protocol names, and return them.""" hl_protocols = list(self.collectionHlProtocols.find()) logging.debug(f"Found the following high-level protocols: {[p['name'] for p in hl_protocols]}") return hl_protocolsMethod to get high-level protocols from MongoDB as a list, log protocol names, and return them.
def listPlatforms(self)-
Expand source code
def listPlatforms(self): """Method to get platforms from MongoDB as a list, log names, and return them.""" platforms = list(self.collectionPlatforms.find()) logging.debug(f"Found the following platforms: {[pl['name'] for pl in platforms]}") return platformsMethod to get platforms from MongoDB as a list, log names, and return them.
def listProtocols(self)-
Expand source code
def listProtocols(self): """Method to get protocols from MongoDB as a list, log protocol names, and return them.""" protocols = list(self.collectionProtocols.find()) logging.debug(f"Found the following protocols: {[p['name'] for p in protocols]}") return protocolsMethod to get protocols from MongoDB as a list, log protocol names, and return them.
def listSettings(self)-
Expand source code
def listSettings(self): """Method to get 'settings' from MongoDB as a list, log names, and return them.""" settings = list(self.collectionSettings.find()) logging.debug(f"Found {len(settings)} set(s) of settings.") if settings is None: pass elif len(settings) > 1: logging.debug("More than one set of settings was retreived from the database.") return settingsMethod to get 'settings' from MongoDB as a list, log names, and return them.
def listTools(self)-
Expand source code
def listTools(self): """Method to get tools from MongoDB as a list, log names, and return them.""" tools = list(self.collectionTools.find()) logging.debug(f"Found the following tools: {[t['name'] for t in tools]}") return toolsMethod to get tools from MongoDB as a list, log names, and return them.
def listWorkspaces(self)-
Expand source code
def listWorkspaces(self): """Method to get workspaces from MongoDB as a list, log names, and return them.""" workspaces = list(self.collectionWorkspaces.find()) logging.debug(f"Found the following workspaces: {[w['name'] for w in workspaces]}") return workspacesMethod to get workspaces from MongoDB as a list, log names, and return them.
def make_mongo_url(self, database_url)-
Expand source code
def make_mongo_url(self, database_url): """Compose a MongoDB connection URL from the database URL""" return f'mongodb://{database_url}/'Compose a MongoDB connection URL from the database URL
def setup_db(self, database_url, database_name)-
Expand source code
def setup_db(self, database_url, database_name): """Establish a connection to the MongoDB database and configure collections. This method sets up the connection to a MongoDB instance using the provided or default `database_url` and `database_name`. It initializes various collections, ensuring that the appropriate indexes are created. If the `database_url` or `database_name` is not provided, the method uses the default values stored in the instance. Args: database_url (str): The MongoDB connection address. database_name (str): The name of the database to connect to. Raises: pymongo.errors.InvalidName: Raised if the provided `database_name` is invalid. pymongo.errors.DuplicateKeyError: Raised when duplicate keys are found while creating indexes. DataError: Raised if there are errors during the connection or index creation process. """ try: # Connect and select database name. mongo_url = self.make_mongo_url(database_url) logging.info(f"Connecting to mongodb at: '{mongo_url}'") self.client = pymongo.MongoClient(mongo_url) logging.info(f"Connected to database: '{database_name}'") # The databases in the MongoClient are accessed with a dict-style syntax. self.db = self.client[database_name] # Collection names. self.collectionProtocols: Collection = self.db['protocols'] self.collectionHlProtocols: Collection = self.db['hLprotocols'] self.collectionWorkspaces: Collection = self.db['workspaces'] self.collectionPlatforms: Collection = self.db['platforms'] self.collectionContainers: Collection = self.db['containers'] self.collectionTools: Collection = self.db['tools'] self.collectionSettings: Collection = self.db['settings'] try: # Create a unique index by name and workspace on "hLprotocols". # See: https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.create_index self.collectionHlProtocols.create_index( keys=[('name', pymongo.ASCENDING), ('workspace', pymongo.ASCENDING)], unique=True) # Ensure unique indexes in the rest of the collections. self.collectionProtocols.create_index([('name', pymongo.ASCENDING)], unique=True) self.collectionWorkspaces.create_index([('name', pymongo.ASCENDING)], unique=True) self.collectionPlatforms.create_index([('name', pymongo.ASCENDING)], unique=True) self.collectionContainers.create_index([('name', pymongo.ASCENDING)], unique=True) self.collectionTools.create_index([('name', pymongo.ASCENDING)], unique=True) self.collectionSettings.create_index([('name', pymongo.ASCENDING)], unique=True) except pymongo.errors.DuplicateKeyError as e: logging.error(f"Failed to ensure indexes on database {database_name}. You have duplicate object IDs: {e}") except Exception as e: msg = f"Failed to ensure index with error: {e}" logging.error(msg) raise DataError(msg) from e except pymongo.errors.InvalidName as e: msg = f"Invalid database name '{database_name}' ({e})." logging.error(msg) except DataError as e: logging.error(f"Failed connect to the database or to load objects from it: {e}\n" + traceback.format_exc()) raise eEstablish a connection to the MongoDB database and configure collections.
This method sets up the connection to a MongoDB instance using the provided or default
database_urlanddatabase_name. It initializes various collections, ensuring that the appropriate indexes are created. If thedatabase_urlordatabase_nameis not provided, the method uses the default values stored in the instance.Args
database_url:str- The MongoDB connection address.
database_name:str- The name of the database to connect to.
Raises
pymongo.errors.InvalidName- Raised if the provided
database_nameis invalid. pymongo.errors.DuplicateKeyError- Raised when duplicate keys are found while creating indexes.
DataError- Raised if there are errors during the connection or index creation process.
def updateActionBy(self,
action: dict,
protocol_name: str,
new_data: dict,
field: str = None,
find_by: str = 'index')-
Expand source code
def updateActionBy(self, action: dict, protocol_name: str, new_data: dict, field: str = None, find_by: str = "index"): """Update an action's data""" logging.info(f"Updating action with {find_by}={action[find_by]} from protocol '{protocol_name}' with new_data={new_data} and field={field}") # Get the collection. protocols = self.db["protocols"] # Target field (mongo syntax). target = "actions.$" if field is not None: target += f".{field}" # Update. result = protocols.update_one( { "name": protocol_name, f"actions.{find_by}": action[find_by] }, {"$set": {target: new_data}} ) # Handle failure to update. if not result.modified_count: msg = f"Failed to update action by {find_by}={action[find_by]}." logging.error(msg) raise DataError(msg)Update an action's data
def updateObject(self, collection_name: str, document_id: str, field: str, data)-
Expand source code
def updateObject(self, collection_name: str, document_id: str, field: str, data): """Find and update a field in an object from a MongoDB collection, by the objects MongoDB ID Test: m.updateObject( collection="protocols", oid=action["_id"], field="actions.0.sarasa", data={"hola": 3} ) References: - https://www.w3schools.com/python/python_mongodb_update.asp - https://stackoverflow.com/a/4374288/11524079 - https://www.mongodb.com/docs/manual/reference/operator/update/set/ - https://www.mongodb.com/docs/manual/core/document/#std-label-document-dot-notation - https://stackoverflow.com/a/46608956/11524079 """ # Get the collection. collection = self.db[collection_name] # Alt2: collection.find_one_and_update({"_id": ObjectId(str(document_id))}, {"$set": {field: data}})Find and update a field in an object from a MongoDB collection, by the objects MongoDB ID
Test
m.updateObject( collection="protocols", oid=action["_id"], field="actions.0.sarasa", data={"hola": 3} )
References: - https://www.w3schools.com/python/python_mongodb_update.asp - https://stackoverflow.com/a/4374288/11524079 - https://www.mongodb.com/docs/manual/reference/operator/update/set/ - https://www.mongodb.com/docs/manual/core/document/#std-label-document-dot-notation - https://stackoverflow.com/a/46608956/11524079
def updateObjectBy(self, query: dict, collection_name: str, data: dict)-
Expand source code
def updateObjectBy(self, query: dict, collection_name: str, data: dict): """Find one object in a collection by the 'query' selector and update its data""" # Get the collection. collection: Collection = self.db[collection_name] # Find one object and update it. collection.find_one_and_update(query, {'$set': data})Find one object in a collection by the 'query' selector and update its data
def updateObjectByName(self, collection_name: str, document_name: str, data: dict)-
Expand source code
def updateObjectByName(self, collection_name: str, document_name: str, data: dict): """Find one object in a collection by the 'name' key and update its data""" # Select an object by its "name" key and update it. self.updateObjectBy(query={"name": document_name}, collection_name=collection_name, data=data)Find one object in a collection by the 'name' key and update its data
def updateToolByName(self, tool_name: str, tool_data: dict)-
Expand source code
def updateToolByName(self, tool_name: str, tool_data: dict): """Find one tool by the 'name' key and update its data""" # Select a tool by its "name" key and update it. self.updateObjectByName(collection_name="tools", document_name=tool_name, data=tool_data)Find one tool by the 'name' key and update its data
Inherited members
DataTools:containersfilterContentByfilterContentByTypefindfind_matchfind_matchesfind_onegetActionDataBygetContainerByNamegetContentByNamegetNextContentgetNextContentByNamegetPlatformByNamegetProtocolByNamegetProtocolObjectsgetToolByNamegetWorkspaceByNamegetWorkspaceItemByNameget_dbhl_protocolsis_subsetplatformspop_content_by_idxprotocolssettingstoolsupdate_from_env_fileupdate_nestedworkspaces