deadlinequeue
Overview
A queue of items sorted by the time they are due.
An item is due when the deadline expires.
To add an item that is due immediately, use a deadline of 0.
The queue consists of tuples of the form (deadline, idx, item). The lowest deadline is sorted to the front of the queue.
Deadlines are compared against time.perf_counter(), so they are values in seconds on that clock.
The idx value orders items that share the same deadline. Because idx increases with every insertion, items with equal deadlines are sorted in the order in which they were inserted.
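The ordering described above can be reproduced with Python's standard heapq module. The following is a minimal sketch, not the library's implementation; the names heap, counter, and push are chosen here purely for illustration.

```python
import heapq
import itertools

heap = []                    # illustrative stand-in for the queue's storage
counter = itertools.count()  # source of the constantly increasing idx


def push(deadline, item):
    """Store (deadline, idx, item); idx breaks ties between equal deadlines."""
    heapq.heappush(heap, (deadline, next(counter), item))


push(5.0, 'late')
push(0, 'first immediate')
push(0, 'second immediate')

# Pop everything: lowest deadline first, insertion order among equal deadlines.
order = [heapq.heappop(heap) for _ in range(3)]
print([item for _, _, item in order])
```

Because every idx is unique, tuple comparison is always decided by the deadline or the idx and never reaches the item itself, so the items do not need to be comparable with each other.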
Classes
- class pymisclib.deadlinequeue.DeadlineQueue(logger: ~logging.Logger = <Logger pymisclib.deadlinequeue (WARNING)>)
A queue of items sorted by the time they are due.
An item is due when the deadline expires.
To add an item that is due immediately, use a deadline of 0.
The queue consists of tuples of the form (deadline, idx, item). The lowest deadline is sorted to the front of the queue.
Deadlines are compared against time.perf_counter(), so they are values in seconds on that clock.
The idx value orders items that share the same deadline. Because idx increases with every insertion, items with equal deadlines are sorted in the order in which they were inserted.
- get() → tuple[float, int, Any]
Return the first element in the queue and remove it.
The element is a tuple (deadline, idx, item).
- Returns:
First element in the queue.
- Return type:
tuple[float, int, Any]
- peek() → tuple[float, int, Any]
Return the first element in the queue without removing it.
The element is a tuple (deadline, idx, item).
- Returns:
First element in the queue.
- Return type:
tuple[float, int, Any]
- put(item_tuple: tuple[float, Any])
Place a tuple in the queue.
The item tuple is (deadline, item). A lower deadline is sorted earlier in the queue.
- Parameters:
item_tuple (tuple[float, Any]) – An item to place in the queue with a deadline in seconds.
- qsize() → int
Return the number of elements in the queue.
- Returns:
Number of elements in the queue.
- Return type:
int
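To make the method contracts concrete, here is a small stand-in class that mirrors the four documented methods. It is a hypothetical sketch (SketchDeadlineQueue is not part of pymisclib); it assumes a heapq-backed list and leaves out logging and thread safety, which the real class may handle differently.

```python
import heapq
import itertools
import time
from typing import Any


class SketchDeadlineQueue:
    """Hypothetical stand-in mirroring the documented method signatures."""

    def __init__(self) -> None:
        self._heap = []                    # entries are (deadline, idx, item)
        self._counter = itertools.count()  # constantly increasing idx

    def put(self, item_tuple: tuple[float, Any]) -> None:
        """Accept (deadline, item) and store it as (deadline, idx, item)."""
        deadline, item = item_tuple
        heapq.heappush(self._heap, (deadline, next(self._counter), item))

    def peek(self) -> tuple[float, int, Any]:
        """Return the front entry without removing it."""
        return self._heap[0]

    def get(self) -> tuple[float, int, Any]:
        """Return the front entry and remove it."""
        return heapq.heappop(self._heap)

    def qsize(self) -> int:
        """Return the number of stored entries."""
        return len(self._heap)


q = SketchDeadlineQueue()
q.put((time.perf_counter() + 60.0, 'due in a minute'))
q.put((0, 'due immediately'))

front = q.peek()             # does not change the queue
size_after_peek = q.qsize()
taken = q.get()              # removes the same entry that peek() returned
size_after_get = q.qsize()
```

Note the asymmetry in the interface: put() takes a two-element tuple (deadline, item), while peek() and get() return the three-element tuple (deadline, idx, item).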
Example
import sys
import time
from threading import Barrier, BrokenBarrierError, Event, Thread

from pymisclib.deadlinequeue import DeadlineQueue

start_barrier = Barrier(2, timeout=1)  # Start running producer and consumer.
end_barrier = Barrier(3, timeout=60)  # Wait to terminate.
terminate_event = Event()
# Object that signals shutdown.
_sentinel = object()
# The time offsets (in seconds) at which an event becomes due; 0 means immediately.
event_times = [0, 0.5, 0.2, 0.1, 0, 0.01, 0.02, 0, 0.099]


def producer(dq_out):
    """Place one data item into the queue for each entry in event_times."""
    start_barrier.wait()
    index = 0
    # Base time (in seconds) for all non-zero offsets, 10 ms from now.
    start_s = time.perf_counter() + 0.010
    for et in event_times:
        if et != 0:
            et = start_s + et
        # Produce some data
        data = f'Some value {index}'
        dq_out.put((et, data))
        print(f'Put: {data} @ {et:.6f} s')
        index = index + 1
    dq_out.put((0, 'abc'))
    print('Put: sentinel')
    # The sentinel gets the largest possible deadline so it sorts last.
    dq_out.put((sys.float_info.max, _sentinel))
    print(f'Queue: {dq_out}')
    print(f'Producer is done, created {index} elements (plus sentinel).')
    try:
        end_barrier.wait()
    except BrokenBarrierError:
        print('producer() end_barrier broken')


def consumer(dq_in):
    """Get data items from the queue once they are due and process them."""
    start_barrier.wait()
    items = 0
    while True:
        # The behavior of peek() on an empty queue is not documented here,
        # so wait until the producer has queued something.
        if dq_in.qsize() == 0:
            if terminate_event.is_set():
                break
            continue
        # Peek at the data.
        (deadline, idx, data) = dq_in.peek()
        current_s = time.perf_counter()
        if deadline > current_s:
            # Not due yet. The sentinel's deadline never expires, so the
            # loop is left here once terminate_event is set.
            if terminate_event.is_set():
                break
            continue
        # Get the first element in the queue.
        (deadline, idx, data) = dq_in.get()
        items = items + 1
        if data is _sentinel:
            print('Get: sentinel')
            break
        print(f'Get: {data} @ {deadline:.6f} s (qsize {dq_in.qsize()})'
              f' delta_s {current_s - deadline:.6f}')
    # Indicate completion
    print(f'Consumer is done, qsize {dq_in.qsize()}.')
    print(f'Queue: {dq_in}')
    try:
        end_barrier.wait()
    except BrokenBarrierError:
        print('consumer() end_barrier broken')


q = DeadlineQueue()
t1 = Thread(target=consumer, args=(q,))
t2 = Thread(target=producer, args=(q,))
t1.start()
t2.start()
# Wait for all produced items to be consumed
try:
    end_barrier.wait(timeout=5)
except BrokenBarrierError:
    print('demo() end_barrier broken')
terminate_event.set()
print('Both threads are done. Terminating')
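
The consumer in the example polls peek() in a tight loop until the front item is due. As a possible alternative, the sketch below sleeps for the remaining time instead of spinning. It is a single-threaded illustration that uses a plain heapq list of (deadline, idx, item) tuples in place of a DeadlineQueue; seconds_until_due is a hypothetical helper, not part of pymisclib.

```python
import heapq
import time


def seconds_until_due(deadline: float, now: float) -> float:
    """Return how long to wait before an entry with this deadline is due."""
    return max(0.0, deadline - now)


# Illustrative heap of (deadline, idx, item) entries.
start_s = time.perf_counter()
heap = [(0, 0, 'immediate'), (start_s + 0.05, 1, 'delayed')]
heapq.heapify(heap)

handled = []
while heap:
    deadline, idx, item = heap[0]  # peek at the front entry
    delay_s = seconds_until_due(deadline, time.perf_counter())
    if delay_s > 0:
        time.sleep(delay_s)        # sleep instead of spinning
        continue                   # re-check the deadline after waking up
    heapq.heappop(heap)
    handled.append(item)

print(handled)
```

With a separate producer thread this approach would need more care: an item with an earlier deadline could arrive while the consumer sleeps, so a wait that can be interrupted (for example a threading.Condition with a timeout) would be a better fit than time.sleep().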