# Copyright (c) 2019 and onward National Institute for Subatomic Physics Nikhef
# SPDX-License-Identifier: MIT
import threading
from spidr4 import rpc
import sys
import queue
import socket
import time
import numpy as np
from collections import defaultdict
import atexit
[docs]
class PacketReaderThread:
    """
    Background reader that copies packets from a gRPC readout stream into a queue.

    Constructing an instance starts a daemon thread that calls
    ``tpxStub.PacketReadout2`` and puts every packet of every response on
    ``queue`` (as ``pkt``, or as ``(tag, pkt)`` when ``tag`` is given). ``None``
    is put on the queue when the stream ends with an error other than a
    cancellation. Call :meth:`stop` to cancel the stream and join the thread;
    ``stop`` is also registered with ``atexit``.

    :param tpxStub: Object providing ``PacketReadout2(options)``
    :param queue: Object providing ``put(item)``; receives the packets
    :param chipIndex: Passed as ``idx`` in the readout options (default 0)
    :param top: Whether to read the top half
    :param bottom: Whether to read the bottom half
    :param forward_empty: Passed through in the readout options
    :param tag: Optional tag; when not None, items are ``(tag, pkt)`` tuples
    :raises RuntimeError: If both ``top`` and ``bottom`` are false
    :raises Exception: Whatever the reader thread raised while opening the
        stream (previously such a failure left the constructor blocked forever)
    """

    def __init__(self, tpxStub, queue, chipIndex=0, top=True, bottom=True, forward_empty=True, tag=None):
        # Validate in the caller's thread: raising inside the worker would only
        # print a traceback while the constructor kept waiting for a start-up
        # signal that never arrives.
        if not (top or bottom):
            raise RuntimeError("No packets to read, at least top or bottom should be true")

        self._resp = None            # gRPC call object, set by the reader thread
        self._startup_error = None   # exception raised while opening the stream
        self._ready = threading.Event()
        self._thread = threading.Thread(
            target=self._reader,
            args=(tpxStub, queue, chipIndex, top, bottom, forward_empty, tag),
        )
        # The interpreter joins non-daemon threads *before* it runs atexit
        # handlers, so a non-daemon reader could never be stopped by the
        # handler registered below and would block interpreter exit.
        self._thread.daemon = True
        atexit.register(self.stop)
        self._thread.start()
        self._ready.wait()

        if self._startup_error is not None:
            error = self._startup_error
            self.stop()
            raise error

        # HACK just to make 99% sure the control app thread is active
        time.sleep(0.1)

    def _reader(self, tpxStub, queue, chipIndex, top=True, bottom=True, forward_empty=True, tag=None):
        """Thread body: open the readout stream and forward its packets to ``queue``."""
        try:
            if top and bottom:
                half = rpc.TPX4_BOTH
            elif top:
                half = rpc.TPX4_TOP
            elif bottom:
                half = rpc.TPX4_BOTTOM
            else:
                raise RuntimeError("No packets to read, at least top or bottom should be true")
            options = rpc.Tpx4ReadoutOptions(idx=chipIndex, half=half, forward_empty=forward_empty)
            self._resp = tpxStub.PacketReadout2(options)
        except Exception as e:
            # Hand the failure to the constructor instead of dying silently.
            self._startup_error = e
            return
        finally:
            # Always release the constructor, whether start-up worked or not.
            self._ready.set()

        try:
            # Two separate loops keep the tag check out of the per-packet path.
            if tag is None:
                for pkts in self._resp:
                    for pkt in pkts.packets:
                        queue.put(pkt)
            else:
                for pkts in self._resp:
                    for pkt in pkts.packets:
                        queue.put((tag, pkt))
        except rpc.grpc.RpcError as r:
            # cancelled is the expected error
            # NOTE(review): the None marker is only queued for non-cancel
            # errors, matching its "indicate error" intent - confirm.
            if r.code() != rpc.grpc.StatusCode.CANCELLED:
                print("GRPC error: %s (%s)" % (r.details(), r.code()), file=sys.stdout)
                queue.put(None)  # Place None to indicate error in stream
        except Exception as e:
            print("Packet read thread exits: %s" % e, file=sys.stdout)
            queue.put(None)  # Place none to indicate error in stream

    def stop(self):
        """Cancel the readout stream and wait for the reader thread; safe to call twice."""
        if self._thread is None:
            return
        atexit.unregister(self.stop)
        if self._resp is not None:
            try:
                self._resp.cancel()
            except Exception:
                # Best effort: the call may already be finished or broken, and
                # the join below must happen regardless.
                pass
        self._thread.join()
        self._thread = None
[docs]
def packet_read_thread(tpxStub, queue, chipIndex=0, top=True, bottom=True, forward_empty=True, tag=None):
    """
    Create a PacketReaderThread with the given arguments and return it.

    All arguments are handed to the PacketReaderThread constructor unchanged.

    :param tpxStub: The gRPC Timepix4 stub
    :param queue: The queue to place packets in
    :param chipIndex: The index of the chip (default = 0)
    :param top: Whether or not to stream the top
    :param bottom: Whether or not to stream the bottom
    :param forward_empty: Whether or not to forward empty packets
    :param tag: The optional tag to attach to items placed in the queue
    :return: The newly constructed PacketReaderThread
    """
    reader_options = {
        "chipIndex": chipIndex,
        "top": top,
        "bottom": bottom,
        "forward_empty": forward_empty,
        "tag": tag,
    }
    return PacketReaderThread(tpxStub, queue, **reader_options)
[docs]
class UDPReaderThread:
    """
    Receive UDP datagrams and forward their 64-bit words to a queue.

    Two daemon threads are started by the constructor:

    * a receive thread that binds a UDP socket to ``(addr, port)`` and pushes
      every datagram onto an internal buffer queue;
    * a queue thread that interprets each datagram as little-endian unsigned
      64-bit words and puts them on ``q`` as Python ints (or as
      ``(tag, word)`` tuples when ``tag`` is not None).

    ``None`` is put on ``q`` when either thread fails.

    :param addr: Local address to bind to
    :param port: Local UDP port to bind to
    :param q: Object providing ``put(item)``; receives the words
    :param tag: Optional tag; when not None, items are ``(tag, word)`` tuples
    :param rx_buffer_size: Requested socket receive buffer size (SO_RCVBUF) in bytes
    :raises Exception: Whatever the receive thread raised while creating or
        binding the socket (previously this left the constructor blocked forever)
    """

    def __init__(self, addr, port, q, tag=None, rx_buffer_size=1024 * 1024 * 32):
        self._buff_queue = queue.Queue()
        self._stop = False
        self._rx_buffer_size = rx_buffer_size
        self._rx_error = None            # set by the receive thread on a start-up failure
        self._ready = threading.Event()  # set once the socket is bound (or failed to)

        self._rx_thread = threading.Thread(target=self._reader, args=(addr, port))
        self._rx_thread.daemon = True
        self._q_thread = threading.Thread(target=self._qthread, args=(q, tag))
        self._q_thread.daemon = True

        self._q_thread.start()
        self._rx_thread.start()
        self._ready.wait()

        if self._rx_error is not None:
            error = self._rx_error
            self.stop()
            raise error

        time.sleep(0.1)

    def _qthread(self, q, tag):
        """Thread body: unpack buffered datagrams into 64-bit words and put them on ``q``."""
        dt = np.dtype(np.uint64).newbyteorder('<')
        started = time.time()
        total_bytes = 0
        try:
            while not self._stop:
                try:
                    # Short timeout so the stop flag is re-checked regularly.
                    data = self._buff_queue.get(True, 0.1)
                except queue.Empty:
                    continue
                if data is None:
                    # The receive thread reported a failure; pass it on.
                    q.put(None)
                    break
                total_bytes += len(data)
                if len(data) % 8 != 0:
                    print("WARNING: Received UDP packet size not mutliple of 8, data corrupted?", file=sys.stderr)
                packet_count = len(data) // 8
                if packet_count == 0:
                    # Nothing to decode (e.g. an empty datagram).
                    continue
                # tolist() converts the whole array to Python ints in one pass;
                # trailing bytes beyond a multiple of 8 are ignored via count.
                words = np.frombuffer(data, dtype=dt, count=packet_count).tolist()
                if tag is None:
                    for word in words:
                        q.put(word)
                else:
                    for word in words:
                        q.put((tag, word))
        except Exception as e:
            print("Packet queue thread failed, exits with: %s" % e, file=sys.stdout)
            q.put(None)  # Place none to indicate error in stream
        elapsed = time.time() - started
        print(f"Received {total_bytes} bytes in {elapsed} seconds")

    def _reader(self, addr, port):
        """Thread body: bind the UDP socket and push received datagrams onto the buffer queue."""
        ssock = None
        try:
            ssock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            # Honour the size requested by the caller instead of a fixed value.
            ssock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, self._rx_buffer_size)
            ssock.bind((addr, port))
            ssock.settimeout(0.1)
        except Exception as e:
            # Hand the failure to the constructor instead of dying silently.
            self._rx_error = e
            if ssock is not None:
                ssock.close()
            return
        finally:
            # Always release the constructor, whether the bind worked or not.
            self._ready.set()

        # The context manager closes the socket however the loop ends.
        with ssock:
            try:
                while not self._stop:
                    try:
                        data = ssock.recv(32 * 1024)
                    except socket.timeout:
                        # Timeout only exists so the stop flag is re-checked.
                        continue
                    self._buff_queue.put(data)
            except Exception as e:
                print("Packet read thread failed, exits with: %s" % e, file=sys.stdout)
                # Tell the queue thread (and through it the consumer) that
                # no more data will arrive.
                self._buff_queue.put(None)

    def stop(self):
        """Ask both threads to stop and wait up to 0.5 s for each; safe to call twice."""
        if self._rx_thread is None:
            return
        self._stop = True
        self._rx_thread.join(0.5)
        self._rx_thread = None
        self._q_thread.join(0.5)
        self._q_thread = None
[docs]
def udp_read_thread(addr, port, q, tag=None):
    """
    Create a UDPReaderThread for the given address and port and return it.

    :param addr: Address handed to UDPReaderThread
    :param port: Port handed to UDPReaderThread
    :param q: The queue handed to UDPReaderThread
    :param tag: The optional tag handed to UDPReaderThread
    :return: The newly constructed UDPReaderThread
    """
    reader = UDPReaderThread(addr, port, q, tag=tag)
    return reader
[docs]
def framefilter(pkt_gen):
    """
    Yield the packets of ``pkt_gen``, dropping zero packets outside a frame.

    A packet equal to 0x79f8000000000000 switches the filter to "in frame" and
    a packet equal to 0x7908000000000000 switches it back; both are yielded.
    Packets equal to 0 are yielded only while "in frame"; every other packet
    is always yielded.

    :param pkt_gen: Iterable of packets
    """
    frame_open = False
    for word in pkt_gen:
        if word == 0x79f8000000000000:
            frame_open = True
        elif word == 0x7908000000000000:
            frame_open = False
        elif word == 0 and not frame_open:
            # Zero packet outside of a frame: drop it.
            continue
        yield word
[docs]
def framefilter_with_tag(pkt_gen):
    """
    Tagged variant of the frame filter: the "in frame" state is kept per tag.

    For each ``(tag, pkt)`` item, a packet equal to 0x79f8000000000000 marks
    that tag as "in frame" and a packet equal to 0x7908000000000000 clears the
    mark; both are yielded. A packet equal to 0 is yielded only while its tag
    is "in frame"; every other packet is always yielded.

    :param pkt_gen: Iterable of ``(tag, pkt)`` pairs
    """
    open_tags = set()  # tags that are currently inside a frame
    for item in pkt_gen:
        tag, word = item
        if word == 0x79f8000000000000:
            open_tags.add(tag)
        elif word == 0x7908000000000000:
            open_tags.discard(tag)
        elif word == 0 and tag not in open_tags:
            # Zero packet outside of a frame for this tag: drop it.
            continue
        yield tag, word
[docs]
def frame8b_generator(q, top_tag='top', bottom_tag='bot', frame_factory=lambda: np.zeros((512, 448))):
    """
    Generator for complete frames.

    NOTE(review): only this docstring is visible as the function body here. As
    written, calling the function returns None and none of the parameters are
    used - confirm whether the implementation is missing from this listing.

    :param q: A tagged queue for bottom and top
    :param top_tag: Defaults to 'top'; presumably the tag of the top half - TODO confirm
    :param bottom_tag: Defaults to 'bot'; presumably the tag of the bottom half - TODO confirm
    :param frame_factory: Callable taking no arguments; the default returns np.zeros((512, 448))
    :return: Not determinable from the visible code
    """
[docs]
def queue_generator(q, timeout=0.05):
    """
    Yield items taken from a queue until it stays empty for ``timeout`` seconds.

    The generator finishes normally when a blocking ``q.get`` times out. A
    ``None`` item is treated as the producer's failure marker and raises
    instead of being yielded.

    :param q: The queue to read from
    :param timeout: Seconds to wait for each item, default 0.05 (50 ms)
    :raises RuntimeError: When ``None`` is taken from the queue
    """
    try:
        item = q.get(True, timeout)
        while item is not None:
            yield item
            item = q.get(True, timeout)
    except queue.Empty:
        # No item within the timeout: end of iteration.
        return
    # Only reached when the loop stopped on a None item.
    raise RuntimeError("Packet read thread stopped")
[docs]
def unwrap_packet_stream(pktstream):
    """
    Flatten a stream of packet batches into a single lazy stream of packets.

    :param pktstream: Iterable of objects that each expose a ``packets`` iterable
    :return: Generator yielding every element of every ``packets``, in order
    """
    flattened = (
        single
        for batch in pktstream
        for single in batch.packets
    )
    return flattened