Module pipettin-piper.piper.coroutines
Functions
async def klipper_commander(commands,
tracker={},
cid=123,
run=True,
spam_info=True,
min_interval=1,
background=False,
uds_address='/tmp/klippy_uds',
UNIX_BUFFER_LIMIT=20971520)-
Expand source code
async def klipper_commander(commands, tracker={}, # Initial "id" for commands. cid = 123, run = True, spam_info=True, min_interval=1, background=False, uds_address = "/tmp/klippy_uds", UNIX_BUFFER_LIMIT = 20 * 1024 * 1024): # Init output lists reader_out = [] writer_out = [] run = [run] cid = [cid] spam_info=[spam_info] # Open socket try: reader, writer = await asyncio.open_unix_connection( str(uds_address), limit=UNIX_BUFFER_LIMIT) except FileNotFoundError as e: print("klipper_commander: FileNotFoundError " + str(e)) return reader_out, tracker except ConnectionRefusedError as e: print("klipper_commander: ConnectionRefusedError " + str(e)) return reader_out, tracker # Define as dict for updating globally rw = {'reader': reader, 'writer': writer} # Schedule calls *concurrently*: try: await asyncio.gather( klipper_reader(rw=rw, msg_list=reader_out, run=run, tracker=tracker), klipper_reconnect(rw=rw, uds_address=uds_address, limit=UNIX_BUFFER_LIMIT, run=run), klipper_writer(rw=rw, commands=commands, cid = cid, min_interval=min_interval, background=background, run=run, tracker=tracker), klipper_info(rw=rw, cid=cid, min_interval=min_interval, run=run, spam_info=spam_info) ) except Exception as e: print("Unhandled exception at klipper_commander: " + str(e)) traceback.print_exc() run[0] = False # Close socket writer.close() # The method closes the stream and the underlying socket. try: await writer.wait_closed() # The method should be used along with the wait_closed() method. except Exception as e: print("Unhandled exception on writer.wait_closed() at klipper_commander" + str(e)) traceback.print_exc() return reader_out, tracker async def klipper_info(rw, cid=[123], min_interval=1, run=[True], spam_info=[True])-
Expand source code
async def klipper_info(rw, cid=[123], min_interval=1, run=[True], spam_info=[True]): while run[0] and spam_info[0]: print("klipper_info run value: " + str(run[0])) # Build command info_command = {"id": cid[0], "method": "info", "params": {}} # Track time last_time = time.time() # Send command print("Sending info command.") data = json.dumps(info_command).encode() + b"\x03" try: rw["writer"].write(data) cid[0] = cid[0] + 1 # Wait until it is appropriate to resume writing to the stream. await rw["writer"].drain() except ConnectionResetError as e: print("ConectionResetError at klipper_info: " + str(e)) except Exception as e: print("Unhandled exception at klipper_info: " + str(e)) traceback.print_exc() # Wait for the actual remaining time (after the previous await) new_time = time.time() if new_time - last_time <= min_interval: await asyncio.sleep(min_interval - (new_time - last_time)) print("klipper_info ended") async def klipper_reader(rw, msg_list=[], run=[True], tracker={})-
Expand source code
async def klipper_reader(rw, msg_list=[], run=[True], tracker={}): """ This asyncio co-routine reads from the socket continuously. The "read" is awaited, which means that other co-routines can still run while the reader is checking for messages. Note: a "co-routine" is similar to the old "serial_reader" thread, made with in Threading. """ print("klipper_reader started") while run[0]: print("klipper_reader run value: " + str(run[0])) try: data = await rw["reader"].readuntil(b'\x03') # Read data from the stream until separator is found. if rw["reader"].at_eof(): print("klipper_reader at EOF") break else: response = json.loads(data[:-1]) print("Received response:") pprint(response) msg_list.append(response) # Add key to tracker dict if str(response["id"]) not in tracker: tracker[str(response["id"])] = {} else: print("Warning, response found in tracker: " + str(response) + "\nIt will be overwritten.") tracker[str(response["id"])] |= {"warning": "overwriten"} # Update tracker dict tracker[str(response["id"])] |= {"status": "responded", "response": response} except asyncio.IncompleteReadError as e: print("IncompleteReadError in klipper_reader, possibly due to EOF signal from klipper_writer: " + str(e)) await asyncio.sleep(1) except ConnectionResetError as e: print("ConectionResetError at klipper_reader: " + str(e)) await asyncio.sleep(1) except Exception as e: print("Unhandled exception at klipper_reader: " + str(e)) traceback.print_exc() await asyncio.sleep(1) print("klipper_reader ended") return msg_listThis asyncio co-routine reads from the socket continuously. The "read" is awaited, which means that other co-routines can still run while the reader is checking for messages. Note: a "co-routine" is similar to the old "serial_reader" thread, made with in Threading.
async def klipper_reconnect(rw, uds_address, limit, run=[True])-
Expand source code
async def klipper_reconnect(rw, uds_address, limit, run=[True]): print("klipper_reconnect started") while run[0]: print("klipper_reconnect run value: " + str(run[0])) if run[0] and ( rw["writer"].is_closing() or rw["reader"].at_eof() ): print("klipper_reconnect reconnecting...") print("klipper_reconnect writer.is_closing(): " + str(rw["writer"].is_closing())) print("klipper_reconnect reader.at_eof(): " + str(rw["reader"].at_eof())) try: # Close rw["writer"].close() # Reopen socket reader, writer = await asyncio.open_unix_connection( str(uds_address), limit=limit ) # Update objects globally rw["writer"] = writer rw["reader"] = reader if writer.is_closing() or reader.at_eof(): print("klipper_reconnect unsuccessful.") else: print("klipper_reconnect successful.") except FileNotFoundError as e: print("UDS not found at klipper_reconnect: " + str(e)) run[0] = False except Exception as e: print("Unhandled exception at klipper_reconnect: " + str(e)) traceback.print_exc() # Wait before retry await asyncio.sleep(1) print("klipper_reconnect ended") async def klipper_writer(rw, commands, cid=[123], min_interval=1, background=False, run=[True], tracker={})-
Expand source code
async def klipper_writer(rw, commands, cid=[123], min_interval=1, background=False, run=[True], tracker={}): """ This asyncio co-routine writes commands from a list into the socket. There are two "awaits" here, that allow the serial reader to check for messages while this is running. First, with the "drain" await, and then with a 1 second "sleep" await. Any of those can cause a *handoff* to the reader co-routine. Note: a "co-routine" is similar to the old "serial_reader" thread, made with in Threading. """ print("klipper_writer started") # Main writer loop for command in commands: # Add key to tracker dict if str(command["id"]) not in tracker: tracker[str(command["id"])] = {} else: print("Warning, command found in tracker: " + str(command) + "\nIt will be overwritten.") tracker[str(command["id"])] |= {"warning": "overwriten"} # Update command tracker status tracker[str(command["id"])] |= {"status": "sending", "command": command} # Check for running state if not run[0]: break # Track time last_time = time.time() # Send command print("Sending command:") pprint(command) data = json.dumps(command).encode() + b"\x03" try: rw["writer"].write(data) tracker[str(command["id"])] |= {"status": "sent"} cid[0] = cid[0] + 1 # Wait until it is appropriate to resume writing to the stream. await rw["writer"].drain() except ConnectionResetError as e: print("ConectionResetError at klipper_writer: " + str(e)) except Exception as e: print("Unhandled exception at klipper_writer: " + str(e)) traceback.print_exc() # Wait for the actual remaining time (after the previous await) new_time = time.time() if new_time - last_time <= min_interval: await asyncio.sleep(min_interval - (new_time - last_time)) print("klipper_writer command list completed") # Secondary writer loop, responding to appends to the "commands" list if background: print("klipper_writer background loop started") n = len(commands) while background and run[0]: print("klipper_writer run value: " + str(run[0])) # Track time last_time = time.time() # Check if a new command is available if len(commands) > n: command = commands[n] n = n + 1 # Quit condition if command == "quit": print("Quit command received") run[0] = False break # Add key to tracker dict if str(command["id"]) not in tracker: tracker[str(command["id"])] = {} else: print("Warning, command found in tracker: " + str(command) + "\nIt will be overwritten.") tracker[str(command["id"])] |= {"warning": "overwriten"} # Update command tracker status tracker[str(command["id"])] |= {"status": "sending", "command": command} # Send command print("Sending new command:") pprint(command) data = json.dumps(command).encode() + b"\x03" try: rw["writer"].write(data) tracker[str(command["id"])] |= {"status": "sent"} cid[0] = cid[0] + 1 # Wait until it is appropriate to resume writing to the stream. await rw["writer"].drain() except ConnectionResetError as e: print("ConectionResetError at klipper_writer: " + str(e)) except Exception as e: print("Unhandled exception at klipper_writer: " + str(e)) traceback.print_exc() # Wait for the actual remaining time (after the previous await) new_time = time.time() if new_time - last_time <= min_interval: await asyncio.sleep(min_interval - (new_time - last_time)) print("klipper_writer secondary writer completed") # Signal the EOF to the reader, terminating it. try: print("klipper_writer cleaning up, signaling eof") run[0] = False # Just in case await asyncio.sleep(1) # Give a chance to finish coroutines rw["writer"].write_eof() # Wait until it is appropriate to resume writing to the stream. await rw["writer"].drain() except ConnectionResetError as e: print("ConectionResetError at klipper_writer cleanup: " + str(e)) except Exception as e: print("Unhandled exception at klipper_writer cleanup: " + str(e)) traceback.print_exc() print("klipper_writer ended")This asyncio co-routine writes commands from a list into the socket. There are two "awaits" here, that allow the serial reader to check for messages while this is running. First, with the "drain" await, and then with a 1 second "sleep" await. Any of those can cause a handoff to the reader co-routine. Note: a "co-routine" is similar to the old "serial_reader" thread, made with in Threading.