Module pipettin-piper.piper.coroutines

Functions

async def klipper_commander(commands,
tracker={},
cid=123,
run=True,
spam_info=True,
min_interval=1,
background=False,
uds_address='/tmp/klippy_uds',
UNIX_BUFFER_LIMIT=20971520)
Expand source code
async def klipper_commander(commands, tracker=None,
                            # Initial "id" for commands.
                            cid=123,
                            run=True,
                            spam_info=True,
                            min_interval=1, background=False,
                            uds_address="/tmp/klippy_uds",
                            UNIX_BUFFER_LIMIT=20 * 1024 * 1024):
    """Open the unix socket at ``uds_address`` and run the reader, reconnect,
    writer and info coroutines concurrently until all of them finish.

    Args:
        commands: List of command dicts handed to ``klipper_writer``.
        tracker: Dict used to track the status of commands and responses,
            keyed by ``str(id)``. A fresh dict is created for each call when
            omitted, so independent calls never share tracking state.
        cid: Initial value of the shared command-id counter.
        run: Initial value of the shared "keep running" flag.
        spam_info: Initial value of the flag that keeps ``klipper_info``
            sending periodic "info" requests.
        min_interval: Minimum number of seconds between consecutive writes.
        background: Passed to ``klipper_writer``; when true it keeps polling
            ``commands`` for appended items after the initial list is sent.
        uds_address: Path of the unix domain socket to connect to.
        UNIX_BUFFER_LIMIT: Stream buffer limit passed to
            ``asyncio.open_unix_connection``.

    Returns:
        A ``(reader_out, tracker)`` tuple: the list of decoded messages
        collected by ``klipper_reader`` and the tracker dict. If the socket
        does not exist or refuses the connection, the list is empty.
    """
    if tracker is None:
        tracker = {}

    # Messages decoded by the reader are collected here.
    reader_out = []

    # Wrap the flags in one-element lists so that every coroutine shares,
    # and can update, the same state.
    run = [run]
    cid = [cid]
    spam_info = [spam_info]

    # Open socket
    try:
        reader, writer = await asyncio.open_unix_connection(
            str(uds_address), limit=UNIX_BUFFER_LIMIT)
    except FileNotFoundError as e:
        print("klipper_commander: FileNotFoundError " + str(e))
        return reader_out, tracker
    except ConnectionRefusedError as e:
        print("klipper_commander: ConnectionRefusedError " + str(e))
        return reader_out, tracker

    # Held in a dict so klipper_reconnect can swap the streams for everyone.
    rw = {'reader': reader, 'writer': writer}

    # Schedule calls *concurrently*:
    try:
        await asyncio.gather(
            klipper_reader(rw=rw, msg_list=reader_out, run=run, tracker=tracker),
            klipper_reconnect(rw=rw, uds_address=uds_address, limit=UNIX_BUFFER_LIMIT, run=run),
            klipper_writer(rw=rw, commands=commands,
                           cid=cid, min_interval=min_interval,
                           background=background, run=run,
                           tracker=tracker),
            klipper_info(rw=rw, cid=cid, min_interval=min_interval, run=run, spam_info=spam_info)
        )
    except Exception as e:
        print("Unhandled exception at klipper_commander: " + str(e))
        traceback.print_exc()
        run[0] = False
    finally:
        # Close the *current* writer: klipper_reconnect may have replaced the
        # one opened above, and closing only the original would leak the
        # replacement. Running this in "finally" also covers cancellation.
        current_writer = rw["writer"]
        current_writer.close()
        try:
            await current_writer.wait_closed()
        except Exception as e:
            print("Unhandled exception on writer.wait_closed() at klipper_commander" + str(e))
            traceback.print_exc()

    return reader_out, tracker
async def klipper_info(rw, cid=[123], min_interval=1, run=[True], spam_info=[True])
Expand source code
async def klipper_info(rw, cid=None, min_interval=1, run=None, spam_info=None):
    """Periodically send an "info" request through ``rw["writer"]``.

    The loop keeps going while both ``run[0]`` and ``spam_info[0]`` are true.

    Args:
        rw: Dict holding the current stream objects; only ``rw["writer"]``
            is used here.
        cid: One-element list with the next command id. It is incremented in
            place after each write. Defaults to a fresh ``[123]`` per call.
        min_interval: Minimum number of seconds between two requests.
        run: One-element list used as a shared "keep running" flag.
            Defaults to a fresh ``[True]`` per call.
        spam_info: One-element list used as a shared "keep sending" flag.
            Defaults to a fresh ``[True]`` per call.
    """
    # Build the defaults per call: mutable default arguments would be shared
    # between calls, so the id counter would keep drifting.
    cid = [123] if cid is None else cid
    run = [True] if run is None else run
    spam_info = [True] if spam_info is None else spam_info

    while run[0] and spam_info[0]:
        print("klipper_info run value: " + str(run[0]))
        # Build command
        info_command = {"id": cid[0], "method": "info", "params": {}}
        # Track time with a monotonic clock so that wall-clock adjustments
        # cannot distort the pacing.
        started = time.monotonic()
        # Send command; messages are JSON terminated by an ETX (0x03) byte.
        print("Sending info command.")
        data = json.dumps(info_command).encode() + b"\x03"
        try:
            rw["writer"].write(data)
            cid[0] += 1
            # Wait until it is appropriate to resume writing to the stream.
            await rw["writer"].drain()

        except ConnectionResetError as e:
            print("ConectionResetError at klipper_info: " + str(e))

        except Exception as e:
            print("Unhandled exception at klipper_info: " + str(e))
            traceback.print_exc()

        # Wait for the actual remaining time (after the previous await)
        elapsed = time.monotonic() - started
        if elapsed <= min_interval:
            await asyncio.sleep(min_interval - elapsed)

    print("klipper_info ended")
async def klipper_reader(rw, msg_list=[], run=[True], tracker={})
Expand source code
async def klipper_reader(rw, msg_list=None, run=None, tracker=None):
    """
    This asyncio co-routine reads from the socket continuously.
    The "read" is awaited, which means that other co-routines
    can still run while the reader is checking for messages.
    Note: a "co-routine" is similar to the old "serial_reader" thread,
    which was implemented with threading.

    Args:
        rw: Dict holding the current stream objects; only ``rw["reader"]``
            is used here.
        msg_list: List that every decoded message is appended to.
            Defaults to a fresh list per call.
        run: One-element list used as a shared "keep running" flag.
            Defaults to a fresh ``[True]`` per call.
        tracker: Dict keyed by ``str(id)`` that is updated with the status
            and content of each response. Defaults to a fresh dict per call.

    Returns:
        ``msg_list``, with all the messages decoded during this call appended.
    """
    # Build the defaults per call: mutable default arguments would make
    # unrelated calls share (and keep accumulating into) the same objects.
    msg_list = [] if msg_list is None else msg_list
    run = [True] if run is None else run
    tracker = {} if tracker is None else tracker

    print("klipper_reader started")
    while run[0]:
        print("klipper_reader run value: " + str(run[0]))
        try:
            # Messages are JSON documents terminated by an ETX (0x03) byte.
            data = await rw["reader"].readuntil(b'\x03')
            response = json.loads(data[:-1])
            print("Received response:")
            pprint(response)
            msg_list.append(response)

            # Only messages carrying an "id" can be matched to a command;
            # anything else is kept in msg_list but left out of the tracker.
            if isinstance(response, dict) and "id" in response:
                key = str(response["id"])
                # Add key to tracker dict
                if key not in tracker:
                    tracker[key] = {}
                else:
                    # NOTE(review): this branch also runs when the writer
                    # registered the command under the same id beforehand.
                    print("Warning, response found in tracker: " + str(response) + "\nIt will be overwritten.")
                    tracker[key] |= {"warning": "overwriten"}

                # Update tracker dict
                tracker[key] |= {"status": "responded", "response": response}

            # Check for EOF only after handling the message: a complete
            # message returned by readuntil() is valid even when the stream
            # ended right behind it.
            if rw["reader"].at_eof():
                print("klipper_reader at EOF")
                break

        except asyncio.IncompleteReadError as e:
            print("IncompleteReadError in klipper_reader, possibly due to EOF signal from klipper_writer: " + str(e))
            await asyncio.sleep(1)

        except ConnectionResetError as e:
            print("ConectionResetError at klipper_reader: " + str(e))
            await asyncio.sleep(1)

        except Exception as e:
            print("Unhandled exception at klipper_reader: " + str(e))
            traceback.print_exc()
            await asyncio.sleep(1)

    print("klipper_reader ended")
    return msg_list

This asyncio co-routine reads from the socket continuously. The "read" is awaited, which means that other co-routines can still run while the reader is checking for messages. Note: a "co-routine" is similar to the old "serial_reader" thread, which was implemented with threading.

async def klipper_reconnect(rw, uds_address, limit, run=[True])
Expand source code
async def klipper_reconnect(rw, uds_address, limit, run=None):
    """Watch the shared streams and reopen the unix socket when they go down.

    Once per second, while ``run[0]`` is true, the streams in ``rw`` are
    checked. If the writer is closing or the reader is at EOF, the writer is
    closed and a new connection to ``uds_address`` is opened; on success the
    new streams replace the old ones in ``rw``.

    Args:
        rw: Dict with the current ``"reader"`` and ``"writer"`` stream
            objects. It is updated in place after a reconnection.
        uds_address: Path of the unix domain socket to reconnect to.
        limit: Stream buffer limit passed to
            ``asyncio.open_unix_connection``.
        run: One-element list used as a shared "keep running" flag. It is set
            to ``False`` if the socket path no longer exists. Defaults to a
            fresh ``[True]`` per call.
    """
    # Build the default per call: this coroutine writes to run[0], so a
    # shared mutable default would stay False for every later call.
    run = [True] if run is None else run

    print("klipper_reconnect started")
    while run[0]:
        print("klipper_reconnect run value: " + str(run[0]))

        if rw["writer"].is_closing() or rw["reader"].at_eof():
            print("klipper_reconnect reconnecting...")
            print("klipper_reconnect writer.is_closing(): " + str(rw["writer"].is_closing()))
            print("klipper_reconnect reader.at_eof(): " + str(rw["reader"].at_eof()))
            try:
                # Close
                rw["writer"].close()
                # Reopen socket
                reader, writer = await asyncio.open_unix_connection(
                    str(uds_address),
                    limit=limit
                )
                # Update objects globally
                rw["writer"] = writer
                rw["reader"] = reader

                if writer.is_closing() or reader.at_eof():
                    print("klipper_reconnect unsuccessful.")
                else:
                    print("klipper_reconnect successful.")
            except FileNotFoundError as e:
                # The socket file is gone: stop every coroutine sharing "run".
                print("UDS not found at klipper_reconnect: " + str(e))
                run[0] = False
            except Exception as e:
                print("Unhandled exception at klipper_reconnect: " + str(e))
                traceback.print_exc()

        # Wait before retry
        await asyncio.sleep(1)
    print("klipper_reconnect ended")
async def klipper_writer(rw, commands, cid=[123], min_interval=1, background=False, run=[True], tracker={})
Expand source code
def _klipper_writer_track(tracker, command):
    """Register ``command`` in ``tracker`` under ``str(command["id"])`` with status "sending"."""
    key = str(command["id"])
    if key not in tracker:
        tracker[key] = {}
    else:
        print("Warning, command found in tracker: " + str(command) + "\nIt will be overwritten.")
        tracker[key] |= {"warning": "overwriten"}
    tracker[key] |= {"status": "sending", "command": command}


async def _klipper_writer_send(rw, command, cid, tracker):
    """Write one command to ``rw["writer"]``, mark it "sent" and bump the id counter."""
    # Messages are JSON documents terminated by an ETX (0x03) byte.
    data = json.dumps(command).encode() + b"\x03"
    try:
        rw["writer"].write(data)
        tracker[str(command["id"])] |= {"status": "sent"}
        cid[0] += 1
        # Wait until it is appropriate to resume writing to the stream.
        await rw["writer"].drain()
    except ConnectionResetError as e:
        print("ConectionResetError at klipper_writer: " + str(e))
    except Exception as e:
        print("Unhandled exception at klipper_writer: " + str(e))
        traceback.print_exc()


async def _klipper_writer_pace(started, min_interval):
    """Sleep for whatever is left of ``min_interval`` seconds since ``started``."""
    elapsed = time.monotonic() - started
    if elapsed <= min_interval:
        await asyncio.sleep(min_interval - elapsed)


async def klipper_writer(rw, commands, cid=None, min_interval=1,
                         background=False, run=None,
                         tracker=None):
    """
    This asyncio co-routine writes commands from a list into the socket.
    There are two "awaits" here, that allow the reader to check for messages
    while this is running. First, with the "drain" await, and then with a
    "sleep" await of up to ``min_interval`` seconds. Any of those can cause a
    *handoff* to the reader co-routine.
    Note: a "co-routine" is similar to the old "serial_reader" thread,
    which was implemented with threading.

    Args:
        rw: Dict holding the current stream objects; only ``rw["writer"]``
            is used here.
        commands: List of command dicts, each with an ``"id"`` key. The
            string ``"quit"`` stops the writer and clears ``run[0]``.
        cid: One-element list with a command-id counter, incremented in place
            after each write. Defaults to a fresh ``[123]`` per call.
        min_interval: Minimum number of seconds between two writes.
        background: When true, keep polling ``commands`` for appended items
            after the initial list is done, until ``run[0]`` becomes false.
        run: One-element list used as a shared "keep running" flag. It is set
            to ``False`` when the writer finishes. Defaults to a fresh
            ``[True]`` per call.
        tracker: Dict keyed by ``str(id)`` that is updated with the status of
            each command. Defaults to a fresh dict per call.
    """
    # Build the defaults per call: this coroutine writes to run[0] and
    # cid[0], so shared mutable defaults would leak state into later calls
    # (a default "run" would stay False forever after the first call).
    cid = [123] if cid is None else cid
    run = [True] if run is None else run
    tracker = {} if tracker is None else tracker

    print("klipper_writer started")

    # Main writer loop
    for command in commands:
        # Items appended while this loop is still running are also seen
        # here, so the quit marker must be honoured in this loop too.
        if command == "quit":
            print("Quit command received")
            run[0] = False
            break

        # Add the command to the tracker and mark it as "sending".
        _klipper_writer_track(tracker, command)

        # Check for running state
        if not run[0]:
            break

        # Track time (monotonic, so wall-clock changes cannot distort pacing)
        started = time.monotonic()

        # Send command
        print("Sending command:")
        pprint(command)
        await _klipper_writer_send(rw, command, cid, tracker)

        # Wait for the actual remaining time (after the previous await)
        await _klipper_writer_pace(started, min_interval)

    print("klipper_writer command list completed")

    # Secondary writer loop, responding to appends to the "commands" list
    if background:
        print("klipper_writer background loop started")
    # Index of the next command that has not been looked at yet.
    n = len(commands)
    while background and run[0]:
        print("klipper_writer run value: " + str(run[0]))

        # Track time
        started = time.monotonic()

        # Check if a new command is available
        if len(commands) > n:
            command = commands[n]
            n += 1

            # Quit condition
            if command == "quit":
                print("Quit command received")
                run[0] = False
                break

            # Add the command to the tracker and mark it as "sending".
            _klipper_writer_track(tracker, command)

            # Send command
            print("Sending new command:")
            pprint(command)
            await _klipper_writer_send(rw, command, cid, tracker)

        # Wait for the actual remaining time (after the previous await)
        await _klipper_writer_pace(started, min_interval)

    print("klipper_writer secondary writer completed")

    # Signal the EOF to the reader, terminating it.
    try:
        print("klipper_writer cleaning up, signaling eof")
        run[0] = False  # Just in case
        await asyncio.sleep(1)  # Give a chance to finish coroutines
        rw["writer"].write_eof()
        # Wait until it is appropriate to resume writing to the stream.
        await rw["writer"].drain()

    except ConnectionResetError as e:
        print("ConectionResetError at klipper_writer cleanup: " + str(e))

    except Exception as e:
        print("Unhandled exception at klipper_writer cleanup: " + str(e))
        traceback.print_exc()

    print("klipper_writer ended")

This asyncio co-routine writes commands from a list into the socket. There are two "awaits" here that allow the reader to check for messages while this is running: first the "drain" await, and then a "sleep" await that lasts up to min_interval seconds (1 second by default). Either of them can cause a handoff to the reader co-routine. Note: a "co-routine" is similar to the old "serial_reader" thread, which was implemented with threading.