Source code for spidr4.stream

# Copyright (c) 2019 and onward National Institute for Subatomic Physics Nikhef
# SPDX-License-Identifier: MIT

import threading
from spidr4 import rpc
import sys
import queue
import socket
import time
import numpy as np
from collections import defaultdict
import atexit


[docs] class PacketReaderThread: def __init__(self, tpxStub, queue, chipIndex=0, top=True, bottom=True, forward_empty=True, tag=None): self._thread = threading.Thread(target=self._reader, args=(tpxStub, queue, chipIndex, top, bottom, forward_empty, tag)) self._wl = threading.Condition(threading.Lock()) atexit.register(self.stop) with self._wl: self._thread.start() self._wl.wait() # HACK just to make 99% sure the control app thread is active time.sleep(0.1) def _reader(self, tpxStub, queue, chipIndex, top=True, bottom=True, forward_empty=True, tag=None): if top and bottom: half = rpc.TPX4_BOTH elif top: half = rpc.TPX4_TOP elif bottom: half = rpc.TPX4_BOTTOM else: raise RuntimeError("No packets to read, at least top or bottom should be true") options = rpc.Tpx4ReadoutOptions(idx=chipIndex, half=half, forward_empty=forward_empty) self._resp = tpxStub.PacketReadout2(options) with self._wl: self._wl.notify() try: if tag is None: for pkts in self._resp: for pkt in pkts.packets: queue.put(pkt) else: for pkts in self._resp: for pkt in pkts.packets: queue.put((tag, pkt)) except rpc.grpc.RpcError as r: # cancelled is the expected error if r.code() != rpc.grpc.StatusCode.CANCELLED: print("GRPC error: %s (%s)" % (r.details(), r.code()), file=sys.stdout) queue.put(None) # Place None to indicate error in stream except Exception as e: print("Packet read thread exits: %s" % e, file=sys.stdout) queue.put(None) # Place none to indicate error in stream
[docs] def stop(self): if self._thread is None: return atexit.unregister(self.stop) try: self._resp.cancel() except Exception as _: pass self._thread.join() self._thread = None
[docs] def packet_read_thread(tpxStub, queue, chipIndex=0, top=True, bottom=True, forward_empty=True, tag=None): """ Start read thread starts a thread retrieving pixel data from the ScDaq using PacketReadout. This thread will run for the entire duration of the program. :param tpxStub: The gRPC Timepix4 stub :param queue: The queue to place packets in :param chipIndex: the index of the chip (default = 0) :param top: Whether or not to stream the top :param bottom: Whether or not to stream the bottom :param forward_empty: Whether or not to forward empty packets :param tag: The optional tag to put the queue :return: PacketReaderThread """ return PacketReaderThread(tpxStub, queue, chipIndex=chipIndex, top=top, bottom=bottom, forward_empty=forward_empty, tag=tag)
[docs] class UDPReaderThread: def __init__(self, addr, port, q, tag=None, rx_buffer_size = 1024*1024*32): self._cond = threading.Condition() self._rx_thread = threading.Thread(target=self._reader, args=(addr, port)) self._rx_thread.daemon = True self._q_thread = threading.Thread(target=self._qthread, args=(q, tag)) self._q_thread.daemon = True self._buff_queue = queue.Queue() self._stop = False self._rx_buffer_size = rx_buffer_size self._q_thread.start() with self._cond: self._rx_thread.start() self._cond.wait() time.sleep(0.1) def _qthread(self, q, tag): dt = np.dtype(np.uint64).newbyteorder('<') ts = time.time() bytes = 0 try: while not self._stop: try: data = self._buff_queue.get(True, 0.1) except queue.Empty as _: continue if data is None: q.put(None) break bytes += len (data) if len(data) % 8 != 0: print("WARNING: Received UDP packet size not mutliple of 8, data corrupted?", file=sys.stderr) packet_count = len(data) // 8 pkts = np.frombuffer(data, dtype=dt, count=packet_count) if tag is None: for i in range(len(pkts)): q.put(int(pkts[i])) else: for i in range(len(pkts)): q.put((tag, int(pkts[i]))) except Exception as e: print("Packet queue thread failed, exits with: %s" % e, file=sys.stdout) q.put(None) # Place none to indicate error in stream ts = time.time() - ts print(f"Received {bytes} bytes in {ts} seconds") def _reader(self, addr, port): ssock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) ssock.bind((addr, port)) ssock.settimeout(0.1) ssock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 16*1024*1024) # ssock.setsockopt() try: with self._cond: self._cond.notify() while not self._stop: try: data = ssock.recv(32*1024) self._buff_queue.put(data) except socket.timeout: pass except Exception as e: print("Packet read thread failed, exits with: %s" % e, file=sys.stdout)
[docs] def stop(self): if self._rx_thread is None: return self._stop = True self._rx_thread.join(0.5) self._rx_thread = None self._q_thread.join(0.5) self._q_thread = None
[docs] def udp_read_thread(addr, port, q, tag=None): return UDPReaderThread(addr, port, q, tag)
[docs] def framefilter(pkt_gen): in_frame = False for pkt in pkt_gen: if pkt == 0 and not in_frame: continue elif pkt == 0x79f8000000000000: in_frame = True elif pkt == 0x7908000000000000: in_frame = False yield pkt
[docs] def framefilter_with_tag(pkt_gen): in_frame = defaultdict(lambda: False) for tag, pkt in pkt_gen: if pkt == 0 and not in_frame[tag]: continue elif pkt == 0x79f8000000000000: in_frame[tag] = True elif pkt == 0x7908000000000000: in_frame[tag] = False yield tag, pkt
[docs] def frame8b_generator(q, top_tag='top', bottom_tag='bot', frame_factory=lambda: np.zeros((512, 448))): """ Generator for complete frames. :param q: A tagged queue for bottom and top. 0 :param top_tag: :param bottom_tag: :param frame_factory: :return: """
[docs] def queue_generator(q, timeout=0.05): """ Generator for retrieving packets from a queue. Data can be placed in the queue from the packet_read_thread. :param q: The queue to read from :param timeout: Optional timeout, default 40 ms """ try: while True: v = q.get(True, timeout) if v is None: raise RuntimeError("Packet read thread stopped") yield v except queue.Empty as _: return
[docs] def unwrap_packet_stream(pktstream): return (pkt for pkts in pktstream for pkt in pkts.packets)