Module pipettin-piper.piper.commanders.tracker
Functions
def open_shelve_tracker(config)

def open_shelve_tracker(config):
    """Setup the command tracker as a disk-based dictionary.

    There are several implementations of this idea. I'm using "shelve"
    because it is a python built-in.
    """
    tracker_directory = config.get("tracker_directory", None)
    tracker_file = config.get("tracker_file", None)

    # NOTE: Because 'tempfile.mkstemp' creates an empty file without format, the DB must be recreated.
    # Meaning of flag types: https://docs.python.org/3/library/dbm.html#dbm.open
    if tracker_file is None:
        # Always create a new, empty database, open for reading and writing.
        open_flag = 'n'
        logging.debug("Using new shelf database.")
    else:
        # Open existing database for reading and writing, creating it if it doesn't exist.
        open_flag = 'c'
        logging.debug("Using existing shelf database.")

    if tracker_file is None:
        # Create a new file with a unique name.
        current_time = datetime.now().strftime('%y-%m-%d-%H-%M-%S')
        _, shelve_path = tempfile.mkstemp(prefix=f"tracker-{current_time}-",
                                          dir=tracker_directory,
                                          suffix=".shelf")
        logging.debug(f"Created new shelf file: {shelve_path}")
    else:
        # Use the system's temporary directory if none was provided.
        if tracker_directory is None:
            tracker_directory = tempfile.gettempdir()
        # Make the full path to the file.
        shelve_path = Path(tracker_directory) / tracker_file

    # Save the path to the class attribute.
    logging.info(f"Opening shelve database with path '{shelve_path}' and flag '{open_flag}'.")

    # Open the shelve.
    tracker_shelve = shelve.open(filename=shelve_path, writeback=True, flag=open_flag)

    logging.info("Shelf setup complete.")
    return tracker_shelve, shelve_path

Setup the command tracker as a disk-based dictionary. There are several implementations of this idea. I'm using "shelve" because it is a python built-in.
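A minimal usage sketch (not part of the module): the config keys shown are the ones read by the function above, and the paths are placeholders.

config = {"tracker_directory": "/tmp", "tracker_file": "tracker.shelf"}
tracker_shelve, shelve_path = open_shelve_tracker(config)
tracker_shelve["some-command-id"] = {"status": "sent"}
tracker_shelve.sync()    # writeback=True, so sync flushes cached entries to disk
tracker_shelve.close()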
def open_sqlite_tracker(config)

def open_sqlite_tracker(config):
    """Setup the command tracker as a disk-based dictionary using SQLite."""
    tracker_directory = config.get("tracker_directory", None)
    tracker_file = config.get("tracker_file", None)

    if tracker_file is None:
        # Create a new file with a unique name.
        current_time = datetime.now().strftime('%y-%m-%d-%H-%M-%S')
        _, sqlite_path = tempfile.mkstemp(
            prefix=f"tracker-{current_time}-",
            dir=tracker_directory,
            suffix=".sqlite"
        )
        logging.info(f"Creating new SQLite database file: {sqlite_path}")
    else:
        # Use the system's temporary directory if none was provided.
        if tracker_directory is None:
            tracker_directory = tempfile.gettempdir()
        # Make the full path to the file.
        sqlite_path = Path(tracker_directory) / tracker_file

    # Log the SQLite database path.
    logging.info(f"Opening SQLite database at path: '{sqlite_path}'")

    # Initialize the SQLiteDict.
    tracker_db = SQLiteDict(db_path=sqlite_path)

    logging.info("SQLiteDict setup complete.")
    return tracker_db, sqlite_path

Setup the command tracker as a disk-based dictionary using SQLite.
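An equivalent sketch for the SQLite backend; again the config keys match those read above and the file path is illustrative.

config = {"tracker_directory": "/tmp", "tracker_file": "tracker.sqlite"}
tracker_db, sqlite_path = open_sqlite_tracker(config)
tracker_db["some-command-id"] = {"status": "sent"}   # pickled and stored in the SQLite file
print(len(tracker_db), list(tracker_db.keys()))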
Classes
class ExpiringDict (data=None, time_threshold=900, cleanup_on_access=False)

class ExpiringDict:
    """Wrapper for a dictionary-like object which tracks access times and data expiration"""

    def __init__(self, data=None, time_threshold=15 * 60, cleanup_on_access=False):
        """
        Initializes the ExpiringDict object.

        :param data: Dictionary-like object (e.g. a python dictionary, SQLiteDict, or DbfilenameShelf).
        :param time_threshold: Time in seconds (default 15 minutes) after which unused keys will be deleted.
        :param cleanup_on_access: If True, all update_time calls will also cleanup expired keys.
        """
        self.cleanup_on_access = cleanup_on_access
        self._data = data if data is not None else {}
        self._last_access = {}
        self.time_threshold = time_threshold
        self.can_sync = isinstance(self._data, shelve.DbfilenameShelf) or isinstance(self._data, SQLiteDict)
        # Initialize access times.
        for key in self._data.keys():
            self.update_time(key)

    def update_entry(self, key, value: dict):
        """Utility to easily and correctly update the contents of an entry by its key."""
        self[key] = self[key] | value

    def close(self):
        """Call the close method of a shelf object"""
        if isinstance(self._data, shelve.DbfilenameShelf):
            self._data.close()

    def sync(self):
        """Call the sync method of a shelf object"""
        if isinstance(self._data, shelve.DbfilenameShelf):
            self._data.sync()
        elif isinstance(self._data, SQLiteDict):
            logging.debug("Sync skipped. The SQLiteDict backend is context managed.")
        else:
            logging.debug("Sync skipped. Unsupported backend.")

    def update_time(self, key, cleanup_too=False):
        """Update the last access time for the key.

        Cleanup automatically if "cleanup_on_access" or "cleanup_too" are set.
        """
        self._last_access[key] = time.time()
        if self.cleanup_on_access or cleanup_too:
            self.cleanup()

    def __len__(self):
        return self._data.__len__()

    def __iter__(self):
        return self._data.__iter__()

    def __contains__(self, key):
        """Method used by Python to check if a key is in a dictionary.

        This does not update access time, because the data itself is not accessed.
        """
        return self._data.__contains__(key)

    def __setitem__(self, key, value):
        """
        Intercept write operations to add or update a key in the dictionary.
        Updates the last access time for the key.
        """
        key = str(key)
        self._data[key] = value
        self.update_time(key)

    def __getitem__(self, key):
        """
        Intercept read operations to get the value of a key.
        Updates the last access time for the key.
        """
        key = str(key)
        value = self._data[key]
        self.update_time(key)
        return value

    def __delitem__(self, key):
        """
        Intercept delete operations to remove a key from the dictionary.
        Deletes the last access time for the key.
        """
        key = str(key)
        if key in self._data:
            del self._data[key]
            del self._last_access[key]
        else:
            raise KeyError(f"Key '{key}' not found.")

    def get(self, key, default=None):
        """
        Safe method to get a key from the dictionary, updates last access time.
        """
        if key in self._data:
            self.update_time(key)
            return self._data[key]
        return default

    def update(self, other):
        """
        Updates the dictionary with another dictionary or iterable of key-value pairs.
        Updates the last access time for all updated keys.
        """
        for key, value in other.items():
            self[key] = value

    def cleanup(self):
        """
        Deletes keys that have not been accessed within the time threshold.
        """
        # Get the current time.
        current_time = time.time()
        # Find expired keys.
        expired_keys = [key for key, last_access in self._last_access.items()
                        if current_time - last_access > self.time_threshold]
        logging.debug(f"Expiring {len(expired_keys)} of {len(self._last_access)} keys from the tracker.")
        for key in expired_keys:
            del self[key]

    def items(self):
        """
        Returns an iterator over the key-value pairs in the dictionary.
        Updates the last access time for each key.
        """
        for key in list(self._data.keys()):
            # Update access time and yield each item.
            self.update_time(key)
            yield key, self._data[key]

    def __repr__(self):
        return f"ExpiringDict({self._data})"

Wrapper for a dictionary-like object which tracks access times and data expiration.

Initializes the ExpiringDict object.

:param data: Dictionary-like object (e.g. a python dictionary, SQLiteDict, or DbfilenameShelf).
:param time_threshold: Time in seconds (default 15 minutes) after which unused keys will be deleted.
:param cleanup_on_access: If True, all update_time calls will also cleanup expired keys.
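A short sketch of the wrapper in use, backed by a plain dictionary; the threshold and keys are illustrative.

tracker = ExpiringDict(data={}, time_threshold=60)       # expire keys unused for 60 seconds
tracker["cmd-1"] = {"status": "sent"}                     # __setitem__ records the access time
tracker.update_entry("cmd-1", {"status": "done"})         # merge new fields into the existing entry
entry = tracker.get("cmd-1")                              # .get() refreshes the access time
tracker.cleanup()                                         # drop entries idle longer than the threshold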
Methods
def cleanup(self)

def cleanup(self):
    """
    Deletes keys that have not been accessed within the time threshold.
    """
    # Get the current time.
    current_time = time.time()
    # Find expired keys.
    expired_keys = [key for key, last_access in self._last_access.items()
                    if current_time - last_access > self.time_threshold]
    logging.debug(f"Expiring {len(expired_keys)} of {len(self._last_access)} keys from the tracker.")
    for key in expired_keys:
        del self[key]

Deletes keys that have not been accessed within the time threshold.
def close(self)

def close(self):
    """Call the close method of a shelf object"""
    if isinstance(self._data, shelve.DbfilenameShelf):
        self._data.close()

Call the close method of a shelf object.
def get(self, key, default=None)

def get(self, key, default=None):
    """
    Safe method to get a key from the dictionary, updates last access time.
    """
    if key in self._data:
        self.update_time(key)
        return self._data[key]
    return default

Safe method to get a key from the dictionary, updates last access time.
def items(self)

def items(self):
    """
    Returns an iterator over the key-value pairs in the dictionary.
    Updates the last access time for each key.
    """
    for key in list(self._data.keys()):
        # Update access time and yield each item.
        self.update_time(key)
        yield key, self._data[key]

Returns an iterator over the key-value pairs in the dictionary. Updates the last access time for each key.
def sync(self)

def sync(self):
    """Call the sync method of a shelf object"""
    if isinstance(self._data, shelve.DbfilenameShelf):
        self._data.sync()
    elif isinstance(self._data, SQLiteDict):
        logging.debug("Sync skipped. The SQLiteDict backend is context managed.")
    else:
        logging.debug("Sync skipped. Unsupported backend.")

Call the sync method of a shelf object.
def update(self, other)

def update(self, other):
    """
    Updates the dictionary with another dictionary or iterable of key-value pairs.
    Updates the last access time for all updated keys.
    """
    for key, value in other.items():
        self[key] = value

Updates the dictionary with another dictionary or iterable of key-value pairs. Updates the last access time for all updated keys.
def update_entry(self, key, value: dict)

def update_entry(self, key, value: dict):
    """Utility to easily and correctly update the contents of an entry by its key."""
    self[key] = self[key] | value

Utility to easily and correctly update the contents of an entry by its key.
def update_time(self, key, cleanup_too=False)

def update_time(self, key, cleanup_too=False):
    """Update the last access time for the key.

    Cleanup automatically if "cleanup_on_access" or "cleanup_too" are set.
    """
    self._last_access[key] = time.time()
    if self.cleanup_on_access or cleanup_too:
        self.cleanup()

Update the last access time for the key. Cleanup automatically if "cleanup_on_access" or "cleanup_too" are set.
class MessageTracker (config: dict, controller: Controller, tracker=None)

class MessageTracker:
    """Main message tracker class"""

    tracker: "ExpiringDict"
    default_time_threshold = 15 * 60  # 15 minutes

    def __init__(self, config: dict, controller: "Controller", tracker=None):
        self.controller = controller
        self.data_path: str = None
        # Sync interval for the tracker.
        self.tracker_sync_interval: float = config.get("tracker_sync_interval", 900)
        self.tracker_type: str = config.get("tracker_type", "dictionary")

        # Get the expiration time from the config.
        time_threshold = config.get("message_expiration_time", self.default_time_threshold)

        # Setup the command tracker as a disk-based dictionary.
        if tracker is None:
            logging.info(f"Instantiating a new tracker object with time_threshold={time_threshold}")
            # Connect to the database backend.
            data, self.data_path = tracker_loaders[self.tracker_type](config)
            # Instantiate the expiring dict wrapper.
            self.tracker = ExpiringDict(data=data, time_threshold=time_threshold)
        else:
            logging.info("Using the provided tracker object.")
            # self.tracker = tracker
            self.tracker = ExpiringDict(data=tracker, time_threshold=time_threshold)

    @classmethod
    def shelve_to_yaml(cls, shelve_file, yaml_file):
        """Convert a Python shelve database to a pretty YAML document.

        First read the data from the shelve file, and then write it to a YAML file
        using the yaml module.
        """
        # Check if the shelve file exists.
        if not Path(shelve_file).exists():
            raise ValueError(f"Error: The shelve file '{shelve_file}' does not exist.")
        # Open the shelve file.
        with shelve.open(shelve_file) as db:
            # Convert the shelve data to a dictionary.
            data = dict(db)
        # Write the dictionary to a YAML file.
        with open(yaml_file, 'w', encoding='utf-8') as file:
            yaml.dump(data, file, default_flow_style=False, sort_keys=False)

    def tracker_close(self):
        """Write changes to disk, and close the DB (releasing the file for re-opening)."""
        try:
            self.tracker.close()
        except AttributeError:
            pass
        except ValueError as e:
            logging.error(f"Failed to close shelf with error: {e}")

    async def tracker_sync(self, write_interval: float = None):
        """Write tracker changes to disk.

        Write back all entries in the cache if the shelf was opened with writeback
        set to True. Also empty the cache and synchronize the persistent dictionary
        on disk, if feasible.

        See: https://docs.python.org/3/library/shelve.html#shelve.Shelf.sync
        """
        if write_interval is None:
            write_interval = self.tracker_sync_interval

        logging.info("Coroutine started.")
        try:
            while not self.controller.run:
                await asyncio.sleep(write_interval)
            logging.info("Coroutine ready.")

            while self.controller.run:
                logging.debug(f"Sleeping for {write_interval} before next tracker sync.")
                # Wait for a bit.
                await asyncio.sleep(write_interval)
                # Sync.
                await self.do_sync()
            logging.info("Coroutine loop ended: controller not running.")

        except asyncio.CancelledError:
            logging.error("Coroutine cancelled. Attempting clean exit.")
            # Prevent blocking call by using to_thread.
            await asyncio.to_thread(self.tracker.close)

        except Exception as e:
            msg = f"Unhandled exception: '{e}'"
            logging.error(msg)
            print(msg)
            raise CommanderError(msg) from e

        logging.info("Tracker loop ended.")

    async def do_sync(self):
        # Cleanup old entries.
        logging.debug("Cleaning up before sync.")
        self.tracker.cleanup()

        # Check if un-syncable.
        if not self.tracker.can_sync:
            logging.debug("Can't save the tracker, it is not supported by its backend.")
            return

        # Sync.
        logging.debug("Synchronizing tracker to disk.")
        try:
            # Prevent blocking call by using to_thread.
            # https://stackoverflow.com/a/65319037
            await asyncio.to_thread(self.tracker.sync)
            logging.debug("Tracker synchronized to disk.")
        except ValueError:
            # Operations on a closed shelf will fail with a ValueError.
            logging.error("Failed to sync with ValueError. The shelf is likely closed.")

    def get_result_by_id(self, cmd_id):
        """Lookup a response's "result", returning it, or None if not found."""
        response = self.get_response_by_id(cmd_id)
        if response:
            try:
                return response["result"]
            except KeyError:
                logging.debug(f"Failed to find result with cmd_id={cmd_id}")
        return None

    def get_response_by_id(self, cmd_id: str, what: str = "response"):
        """Lookup a "response", returning it, or None if not found."""
        try:
            response = self.tracker[str(cmd_id)][what]
            logging.debug(f"Got '{what}' for cmd_id={cmd_id} with content={response}")
            return response
        except KeyError as e:
            logging.debug(f"Failed to find '{what}' with cmd_id={cmd_id} ({e})")
            return None

    def get_command_by_id(self, cmd_id, not_found_default=None):
        """Get an entry from the tracker dictionary by its ID."""
        return self.tracker.get(cmd_id, not_found_default)

Main message tracker class.
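A usage sketch under stated assumptions: "controller" stands in for the project's Controller object (only the attributes used by this class matter here), and passing tracker={} takes the "provided tracker" branch of __init__ so no database backend is opened.

tracker = MessageTracker(config={}, controller=controller, tracker={})
tracker.tracker["cmd-1"] = {"command": "home"}                          # create the entry
tracker.tracker.update_entry("cmd-1", {"response": {"result": "ok"}})   # merge in the response
print(tracker.get_result_by_id("cmd-1"))                                # -> "ok"
tracker.tracker_close()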
Subclasses
Class variables
var default_time_threshold
var tracker : pipettin-piper.piper.commanders.tracker.ExpiringDict
Static methods
def shelve_to_yaml(shelve_file, yaml_file)
Convert a Python shelve database to a pretty YAML document. First read the data from the shelve file, and then write it to a YAML file using the yaml module.
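An illustrative call (the file paths are placeholders); the shelve file must already exist at the given path, otherwise a ValueError is raised.

MessageTracker.shelve_to_yaml(shelve_file="/tmp/tracker.shelf", yaml_file="/tmp/tracker.yaml")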
Methods
async def do_sync(self)

async def do_sync(self):
    # Cleanup old entries.
    logging.debug("Cleaning up before sync.")
    self.tracker.cleanup()

    # Check if un-syncable.
    if not self.tracker.can_sync:
        logging.debug("Can't save the tracker, it is not supported by its backend.")
        return

    # Sync.
    logging.debug("Synchronizing tracker to disk.")
    try:
        # Prevent blocking call by using to_thread.
        # https://stackoverflow.com/a/65319037
        await asyncio.to_thread(self.tracker.sync)
        logging.debug("Tracker synchronized to disk.")
    except ValueError:
        # Operations on a closed shelf will fail with a ValueError.
        logging.error("Failed to sync with ValueError. The shelf is likely closed.")

def get_command_by_id(self, cmd_id, not_found_default=None)
def get_command_by_id(self, cmd_id, not_found_default=None):
    """Get an entry from the tracker dictionary by its ID."""
    return self.tracker.get(cmd_id, not_found_default)

Get an entry from the tracker dictionary by its ID.
def get_response_by_id(self, cmd_id: str, what: str = 'response')

def get_response_by_id(self, cmd_id: str, what: str = "response"):
    """Lookup a "response", returning it, or None if not found."""
    try:
        response = self.tracker[str(cmd_id)][what]
        logging.debug(f"Got '{what}' for cmd_id={cmd_id} with content={response}")
        return response
    except KeyError as e:
        logging.debug(f"Failed to find '{what}' with cmd_id={cmd_id} ({e})")
        return None

Lookup a "response", returning it, or None if not found.
def get_result_by_id(self, cmd_id)

def get_result_by_id(self, cmd_id):
    """Lookup a response's "result", returning it, or None if not found."""
    response = self.get_response_by_id(cmd_id)
    if response:
        try:
            return response["result"]
        except KeyError:
            logging.debug(f"Failed to find result with cmd_id={cmd_id}")
    return None

Lookup a response's "result", returning it, or None if not found.
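For illustration, assuming "tracker" is a MessageTracker instance: entries are expected to nest the result under a "response" key (the entry contents below are hypothetical).

tracker.tracker["cmd-42"] = {"response": {"result": [0.0, 1.5, 3.0]}}
tracker.get_response_by_id("cmd-42")   # -> {"result": [0.0, 1.5, 3.0]}
tracker.get_result_by_id("cmd-42")     # -> [0.0, 1.5, 3.0]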
def tracker_close(self)

def tracker_close(self):
    """Write changes to disk, and close the DB (releasing the file for re-opening)."""
    try:
        self.tracker.close()
    except AttributeError:
        pass
    except ValueError as e:
        logging.error(f"Failed to close shelf with error: {e}")

Write changes to disk, and close the DB (releasing the file for re-opening).
async def tracker_sync(self, write_interval: float = None)

async def tracker_sync(self, write_interval: float = None):
    """Write tracker changes to disk.

    Write back all entries in the cache if the shelf was opened with writeback
    set to True. Also empty the cache and synchronize the persistent dictionary
    on disk, if feasible.

    See: https://docs.python.org/3/library/shelve.html#shelve.Shelf.sync
    """
    if write_interval is None:
        write_interval = self.tracker_sync_interval

    logging.info("Coroutine started.")
    try:
        while not self.controller.run:
            await asyncio.sleep(write_interval)
        logging.info("Coroutine ready.")

        while self.controller.run:
            logging.debug(f"Sleeping for {write_interval} before next tracker sync.")
            # Wait for a bit.
            await asyncio.sleep(write_interval)
            # Sync.
            await self.do_sync()
        logging.info("Coroutine loop ended: controller not running.")

    except asyncio.CancelledError:
        logging.error("Coroutine cancelled. Attempting clean exit.")
        # Prevent blocking call by using to_thread.
        await asyncio.to_thread(self.tracker.close)

    except Exception as e:
        msg = f"Unhandled exception: '{e}'"
        logging.error(msg)
        print(msg)
        raise CommanderError(msg) from e

    logging.info("Tracker loop ended.")

Write tracker changes to disk. Write back all entries in the cache if the shelf was opened with writeback set to True. Also empty the cache and synchronize the persistent dictionary on disk, if feasible. See: https://docs.python.org/3/library/shelve.html#shelve.Shelf.sync
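A hedged sketch of running the sync loop as a background task; "tracker" is assumed to be an existing MessageTracker whose controller.run flag is True while the system operates, and the interval and workload are placeholders.

import asyncio

async def main():
    # Start the periodic sync alongside other work (interval in seconds).
    sync_task = asyncio.create_task(tracker.tracker_sync(write_interval=5.0))
    await asyncio.sleep(30)   # placeholder for the real workload
    sync_task.cancel()        # cancellation triggers the clean-exit branch, which closes the shelf

asyncio.run(main())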
class SQLiteDict (db_path, table_name='key_value_store')

class SQLiteDict:
    """
    A dictionary-like object backed by SQLite.
    Supports string keys and pickled Python objects as values.
    """

    def __init__(self, db_path, table_name="key_value_store"):
        """
        Initialize the SQLiteDict.

        :param db_path: Path to the SQLite database file.
        :param table_name: Name of the table used to store key-value pairs.
        """
        self.db_path = db_path
        self.table_name = table_name
        self._init_db()

    def _init_db(self):
        """Initialize the SQLite database and table."""
        with self._get_connection() as conn:
            conn.execute(f"""
                CREATE TABLE IF NOT EXISTS {self.table_name} (
                    key TEXT PRIMARY KEY,
                    value BLOB
                )
            """)

    def _get_connection(self):
        """Create and return a database connection."""
        return sqlite3.connect(self.db_path)

    def __getitem__(self, key):
        """Get an item by key."""
        with self._get_connection() as conn:
            cursor = conn.execute(f"SELECT value FROM {self.table_name} WHERE key = ?", (key,))
            row = cursor.fetchone()
            if row is None:
                raise KeyError(key)
            return pickle.loads(row[0])

    def __setitem__(self, key, value):
        """Set a key-value pair."""
        with self._get_connection() as conn:
            conn.execute(
                f"INSERT OR REPLACE INTO {self.table_name} (key, value) VALUES (?, ?)",
                (key, pickle.dumps(value)),
            )

    def __delitem__(self, key):
        """Delete an item by key."""
        with self._get_connection() as conn:
            cursor = conn.execute(f"DELETE FROM {self.table_name} WHERE key = ?", (key,))
            if cursor.rowcount == 0:
                raise KeyError(key)

    def __contains__(self, key):
        """Check if a key exists."""
        with self._get_connection() as conn:
            cursor = conn.execute(f"SELECT 1 FROM {self.table_name} WHERE key = ?", (key,))
            return cursor.fetchone() is not None

    def __iter__(self):
        """Iterate over keys."""
        with self._get_connection() as conn:
            cursor = conn.execute(f"SELECT key FROM {self.table_name}")
            for row in cursor:
                yield row[0]

    def __len__(self):
        """Get the number of items."""
        with self._get_connection() as conn:
            cursor = conn.execute(f"SELECT COUNT(*) FROM {self.table_name}")
            return cursor.fetchone()[0]

    def items(self):
        """Iterate over key-value pairs."""
        with self._get_connection() as conn:
            cursor = conn.execute(f"SELECT key, value FROM {self.table_name}")
            for key, value in cursor:
                yield key, pickle.loads(value)

    def keys(self):
        """Iterate over keys."""
        return iter(self)

    def values(self):
        """Iterate over values."""
        with self._get_connection() as conn:
            cursor = conn.execute(f"SELECT value FROM {self.table_name}")
            for row in cursor:
                yield pickle.loads(row[0])

    def clear(self):
        """Clear all items."""
        with self._get_connection() as conn:
            conn.execute(f"DELETE FROM {self.table_name}")

    def close(self):
        """Close the database connection (optional, managed automatically)."""
        pass  # SQLite connections are managed using context managers.

    def __del__(self):
        """Destructor to ensure the connection is closed."""
        self.close()

A dictionary-like object backed by SQLite. Supports string keys and pickled Python objects as values.
Initialize the SQLiteDict.
:param db_path: Path to the SQLite database file.
:param table_name: Name of the table used to store key-value pairs.
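A small sketch (the database path is a placeholder): SQLiteDict behaves like a dict persisted to a SQLite file.

db = SQLiteDict(db_path="/tmp/tracker.sqlite")
db["cmd-1"] = {"status": "sent"}          # values are pickled into a BLOB column
print("cmd-1" in db, len(db))
for key, value in db.items():
    print(key, value)
db.clear()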
Methods
def clear(self)

def clear(self):
    """Clear all items."""
    with self._get_connection() as conn:
        conn.execute(f"DELETE FROM {self.table_name}")

Clear all items.
def close(self)

def close(self):
    """Close the database connection (optional, managed automatically)."""
    pass  # SQLite connections are managed using context managers.

Close the database connection (optional, managed automatically).
def items(self)

def items(self):
    """Iterate over key-value pairs."""
    with self._get_connection() as conn:
        cursor = conn.execute(f"SELECT key, value FROM {self.table_name}")
        for key, value in cursor:
            yield key, pickle.loads(value)

Iterate over key-value pairs.
def keys(self)

def keys(self):
    """Iterate over keys."""
    return iter(self)

Iterate over keys.
def values(self)

def values(self):
    """Iterate over values."""
    with self._get_connection() as conn:
        cursor = conn.execute(f"SELECT value FROM {self.table_name}")
        for row in cursor:
            yield pickle.loads(row[0])

Iterate over values.