deadlinequeue

Overview

A queue of items sorted by the time they are due.

An item is due when the deadline expires.

To add an element that will expire immediately, use timedue==0.

The queue consists of tuples of the form (deadline, idx, item). The lowest deadline is sorted to the front of the queue.

The deadline is based on time.perf_counter().

The role of the idx variable is to properly order items with the same priority level. By keeping a constantly increasing idx, the items will be sorted according to the order in which they were inserted.

Classes

class pymisclib.deadlinequeue.DeadlineQueue(logger: ~logging.Logger = <Logger pymisclib.deadlinequeue (WARNING)>)

A queue of items sorted by the time they are due.

An item is due when the deadline expires.

To add an element that will expire immediately, use timedue==0.

The queue consists of tuples of the form (deadline, idx, item). The lowest deadline is sorted to the front of the queue.

The deadline is based on time.perf_counter().

The role of the idx variable is to properly order items with the same priority level. By keeping a constantly increasing idx, the items will be sorted according to the order in which they were inserted.

get() tuple[float, int, Any]

Return the first element in the queue and remove it.

The element will be tuple (deadline, count, item).

Returns:

First element in the queue.

Rtype tuple[float, int, Any]:

peek() tuple[float, int, Any]

Returns the first element in the queue without removing it.

The element will be tuple (deadline, count, item).

Returns:

First element in the queue.

Rtype tuple[float, int, Any]:

put(item_tuple: tuple[float, Any])

Place a tuple in the queue.

The item tuple is (deadline, item). A lower deadline is sorted earlier in the queue.

Parameters:

Any] (item_tuple tuple[float,) – An item to place in the queue with a deadline in seconds.

qsize() int

Return the number of elements in the queue.

Returns:

Number of elements in the queue.

Rtype int:

Example

import heapq
import sys
import time
from threading import Barrier, BrokenBarrierError, Event, Thread

from pymisclib.deadlinequeue import DeadlineQueue

start_barrier = Barrier(2, timeout=1)   # Start running producer and consumer.
end_barrier = Barrier(3, timeout=60)    # Wait to terminate.
terminate_event = Event()

# Object that signals shutdown
_sentinel = object()

# The time offsets (in microseconds) when an event will be fired.
event_times = [0, 0.5, 0.2, 0.1, 0, 0.01, 0.02, 0, 0.099]

def producer(dq_out):
    """Place a new data item into the queue every interval seconds for number items."""
    start_barrier.wait()
    index = 0
    start_micros = time.perf_counter() + 0.010
    for et in event_times:
        if et != 0:
            et = start_micros + et
        # Produce some data
        data = f'Some value {index}'
        dq_out.put((et, data))
        print(f'Put: {data} @ {et:.6f} s')
        index = index + 1
    dq_out.put((0, 'abc'))
    print('Put: sentinel')
    dq_out.put((sys.float_info.max, _sentinel))
    print(f'Queue: {dq_out}')
    print(f'Producer is done, created {index} elements (plus sentinel).')
    try:
        end_barrier.wait()
    except BrokenBarrierError:
        print('producer() end_barrier broken')

def consumer(dq_in):
    """Get data items from the queue and process them."""
    start_barrier.wait()
    items = 0
    while True:
        # Peek at the data.
        (deadline, idx, data) = dq_in.peek()
        current_s = time.perf_counter()
        if deadline > current_s:
            if terminate_event.is_set():
                break
            continue

        # Get the first element in the queue.
        (deadline, idx, data) = dq_in.get()
        items = items + 1

        if data is _sentinel:
            print('Get: sentinel')
            break
        print(f'Get: {data} @ {deadline:.6f} s (qsize {dq_in.qsize()})'
              f' delta_s {current_s - deadline:.6f}')

    # Indicate completion
    print(f'Consumer is done, qsize {dq_in.qsize()}.')
    print(f'Queue: {dq_in}')
    try:
        end_barrier.wait()
    except BrokenBarrierError:
        print('consumer() end_barrier broken')

q = DeadlineQueue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()
# Wait for all produced items to be consumed
try:
    end_barrier.wait(timeout=5)
except BrokenBarrierError:
    print('demo() end_barrier broken')
    terminate_event.set()
print('Both threads are done. Terminating')