Thanks for all of the work here! If you don’t mind, I could use some help on a related problem. I’m streaming events over the H7 Plus, and while the event stream itself is consistent, I’m having trouble displaying it properly. When I try to display each packet and reconstruct the stream, I get jumpy behavior, where the stream will seem to skip frames. My overall goal is to get the same smoothness that would be seen from the OpenMV IDE but using the raw events over USB. Here are the scripts I’m using as well as the command I’m running to use them:
stream_test.py
#!/usr/bin/env python3
#
# This work is licensed under the MIT license.
# Copyright (c) 2013-2025 OpenMV LLC. All rights reserved.
# https://github.com/openmv/openmv/blob/master/LICENSE
#
# This example shows off using the genx320 event camera from Prophesee
# using event streaming mode and sending the data back to the PC.
#
# This script is meant to be run using https://github.com/openmv/openmv-python
# from a PC using desktop tools. Statistics on the events being transferred
# are printed to the console.
import sys
import os
import argparse
import time
import logging
import signal
import atexit
import numpy as np
from openmv.camera import Camera
import threading
from pathlib import Path
from einops import rearrange, reduce
import cv2
def events_to_image(events, width=320, height=320):
    """
    Converts raw GENX320 events (N, 6) into a displayable BGR image.

    Args:
        events: Array of shape (N, 6). Column 0 is the event type, column 4
            the X coordinate and column 5 the Y coordinate.
        width: Width of the output image in pixels.
        height: Height of the output image in pixels.

    Returns:
        A (height, width, 3) uint8 image with a gray (127) background, white
        pixels for events of type 1 and black pixels for events of type 0.
        Events of any other type, or with coordinates outside the image, are
        ignored.
    """
    # Create a neutral gray background
    img = np.full((height, width, 3), 127, dtype=np.uint8)

    events = np.asarray(events)
    if events.size == 0:
        # Nothing to draw (also covers inputs that are not 2-D yet).
        return img

    x = events[:, 4].astype(int)
    y = events[:, 5].astype(int)
    # Simple polarity: Type is usually in column [0]
    # (Check your GENX320 script, but typically 1 is ON, 0 is OFF)
    p = events[:, 0]

    # Filter for valid coordinates: an out-of-range value would raise
    # IndexError and a negative one would silently wrap around the image.
    valid = (x >= 0) & (x < width) & (y >= 0) & (y < height)

    # ON events are painted first so that an OFF event on the same pixel
    # wins, exactly as in the unfiltered version.
    on = valid & (p == 1)
    off = valid & (p == 0)
    img[y[on], x[on]] = [255, 255, 255]
    img[y[off], x[off]] = [0, 0, 0]
    return img
# ANSI escape sequences wrapped around text read from the camera's stdout so
# it stands out from the host's own log lines.
COLOR_CAMERA = "\033[32m" # green
# Restores the terminal's default colour after the camera text.
COLOR_RESET = "\033[0m"
class GENX320(threading.Thread):
    """
    Thread that runs a script on an OpenMV camera and consumes its 'events' channel.

    Each packet read from the camera is optionally appended to a raw log
    file and accumulated until the accumulated events span at least
    ``window_duration_us`` microseconds of sensor time; the window is then
    rendered with OpenCV. Per-packet throughput statistics are logged.

    Event rows are 6 little-endian uint16 values:
        [0] Event type
        [1] Seconds timestamp
        [2] Milliseconds timestamp
        [3] Microseconds timestamp
        [4] X coordinate 0 to 319 for GENX320
        [5] Y coordinate 0 to 319 for GENX320
    """

    # Size of one event row on the wire: 6 x uint16.
    EVENT_ROW_BYTES = 6 * 2

    def __init__(self, args, zmq_pub_addr="tcp://127.0.0.1:5556", save_path: Path = None):
        """
        Args:
            args: Parsed command-line namespace; every attribute read below
                must be present on it.
            zmq_pub_addr: Accepted for interface compatibility; not used by
                this class.
            save_path: When given, raw event bytes are appended to this file.
        """
        super().__init__()

        # Extract GENX320 specific configuration
        self.port = args.port
        self.script = args.script
        self.poll = args.poll
        self.timeout = args.timeout
        self.debug = args.debug
        self.baudrate = args.baudrate
        self.crc = args.crc
        self.seq = args.seq
        self.ack = args.ack
        self.events = args.events
        self.max_retry = args.max_retry
        self.max_payload = args.max_payload
        self.drop_rate = args.drop_rate
        self.quiet = args.quiet
        self.alpha = args.alpha

        # Extract temporal accumulation configuration
        self.window_duration_us = args.window_duration_us

        # Variable to track whether we send events or not
        self.active = True

        # Raw log file handle; kept on the instance so the hard-exit path
        # can flush it before the process is terminated.
        self._log_file = None

        # Register signal handlers for clean exit. signal.signal() must be
        # called from the main thread, which is where __init__ is expected
        # to run.
        signal.signal(signal.SIGINT, self.signal_handler)
        signal.signal(signal.SIGTERM, self.signal_handler)
        atexit.register(self.cleanup_and_exit)

        # Configure logging
        if self.debug:
            log_level = logging.DEBUG
        elif not self.quiet:
            log_level = logging.INFO
        else:
            log_level = logging.ERROR
        logging.basicConfig(
            format="%(relativeCreated)010.3f - %(message)s",
            level=log_level,
        )

        # Enable file saving if a save_path is provided
        self.save_path = save_path

    def cleanup_and_exit(self):
        """Flush the raw log (if any) and terminate the process immediately."""
        try:
            # os._exit() skips buffered-file flushing, so close explicitly
            # or the tail of the log would be lost.
            if self._log_file is not None:
                self._log_file.close()
        finally:
            os._exit(0)

    def signal_handler(self, signum, frame):
        """Signal callback for SIGINT/SIGTERM: exit right away."""
        self.cleanup_and_exit()

    @staticmethod
    def _timestamps_us(events):
        """Return each event's timestamp in microseconds as int64.

        Signed arithmetic is used on purpose: differences between
        timestamps can then go negative instead of wrapping around to a
        huge unsigned value when the sensor clock steps backwards.
        """
        return (events[:, 1].astype(np.int64) * 1000000 +
                events[:, 2].astype(np.int64) * 1000 +
                events[:, 3].astype(np.int64))

    @staticmethod
    def _rate(interval_s):
        """Return 1/interval_s, or 0.0 when the interval is not positive."""
        return 1.0 / interval_s if interval_s > 0 else 0.0

    def _ema(self, previous, sample):
        """Blend ``sample`` into ``previous`` using ``self.alpha``.

        A ``previous`` of exactly 0.0 means "no estimate yet" and is
        replaced by the sample.
        """
        if previous == 0.0:
            return sample
        return previous * (1.0 - self.alpha) + sample * self.alpha

    def _idle(self):
        """Wait one poll interval (``--poll`` milliseconds) when there is nothing to read."""
        if self.poll and self.poll > 0:
            time.sleep(self.poll / 1000.0)

    def _show_window(self, window_events, duration_us, display_fps):
        """Render one accumulated window and log its size."""
        # Log stats only when we actually send a window
        logging.info(f"Published Window: {window_events.shape[0]} events over {duration_us/1000:.2f}ms")
        img = events_to_image(window_events)
        cv2.putText(img, f"Disp FPS: {display_fps:.1f}", (10, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.7, (0, 255, 0), 2)
        cv2.imshow("GENX320 Event Histogram", img)
        cv2.waitKey(1)

    def run(self):
        """Thread body: start the camera script and process event packets until stopped."""
        # Load script
        with open(self.script, 'r') as f:
            script = f.read()
        logging.info(f"Loaded script from {self.script}")

        if self.save_path:
            self._log_file = self.save_path.open("wb")
            logging.info(f"Logging raw events to {self.save_path}")
        log_file = self._log_file

        try:
            with Camera(self.port, baudrate=self.baudrate, crc=self.crc, seq=self.seq,
                        ack=self.ack, events=self.events,
                        timeout=self.timeout, max_retry=self.max_retry,
                        max_payload=self.max_payload, drop_rate=self.drop_rate) as camera:
                logging.info(f"Connected to OpenMV camera on {self.port}")

                # Stop any running script
                camera.stop()
                time.sleep(0.500)

                # Execute script
                camera.exec(script)
                logging.info("Script executed...")

                # Stat variables
                start_time = time.perf_counter()
                last_time = start_time
                total_events = 0
                event_rate_ema = 0.0
                mbps_ema = 0.0
                last_window_time = time.perf_counter()

                # Temporal accumulation variables
                accum_events = []
                accum_start_time_us = None

                while self.active:
                    t0 = time.perf_counter()

                    # Read camera status
                    status = camera.read_status()

                    # Read text output
                    if not self.quiet and status and status.get('stdout'):
                        if text := camera.read_stdout():
                            print(f"{COLOR_CAMERA}{text}{COLOR_RESET}", end='')

                    # Nothing to read yet: the channel is not registered,
                    # the status poll returned nothing, or no events are
                    # pending.
                    if (not camera.has_channel('events')
                            or not status
                            or not status.get('events')):
                        self._idle()
                        continue

                    size = camera.channel_size('events')
                    if size <= 0:
                        self._idle()
                        continue

                    now = time.perf_counter()
                    dt = now - last_time
                    last_time = now

                    data = camera.channel_read('events', size)

                    # Each event row: 6 x uint16 (little-endian)
                    if (len(data) % self.EVENT_ROW_BYTES) != 0:
                        print(f"Warning: misaligned packet: {len(data)} bytes (not multiple of 12)")
                        continue

                    events = np.frombuffer(data, dtype='<u2').reshape(-1, 6)
                    event_count = events.shape[0]
                    if event_count == 0:
                        # An empty read has no timestamps to window on.
                        continue

                    # put events into file if provided
                    if log_file:
                        log_file.write(data)

                    ts_chunk_us = self._timestamps_us(events)
                    if accum_start_time_us is None:
                        accum_start_time_us = ts_chunk_us[0]
                    accum_events.append(events)
                    current_duration = ts_chunk_us[-1] - accum_start_time_us

                    # publish events. A negative duration means the sensor
                    # timestamp went backwards (e.g. the 16-bit seconds
                    # field wrapped); flush so the next window starts from
                    # a consistent time base.
                    if current_duration >= self.window_duration_us or current_duration < 0:
                        # Concatenate and publish
                        full_packet = np.vstack(accum_events)

                        now_window = time.perf_counter()
                        window_fps = self._rate(now_window - last_window_time)
                        last_window_time = now_window

                        # Reset for next window
                        accum_events = []
                        accum_start_time_us = None

                        self._show_window(full_packet, current_duration, window_fps)

                    # Instantaneous rates
                    events_per_sec = event_count * self._rate(dt)
                    mb_per_sec = len(data) / 1048576.0 * self._rate(dt)

                    # EMA smoothing
                    event_rate_ema = self._ema(event_rate_ema, events_per_sec)
                    mbps_ema = self._ema(mbps_ema, mb_per_sec)

                    total_events += event_count

                    # perf_counter() is monotonic and high resolution; the
                    # guarded division avoids a ZeroDivisionError on
                    # platforms with a coarse clock.
                    fps = self._rate(time.perf_counter() - t0)
                    elapsed = now - start_time

                    logging.info(
                        f"events={event_count:6d} "
                        f"rate={event_rate_ema:9.0f} ev/s "
                        f"bw={mbps_ema:6.2f} MB/s "
                        f"total_events={total_events:10d} "
                        f"uptime={elapsed:7.1f}s "
                        f"FPS={fps:4.2f}"
                    )

        except KeyboardInterrupt:
            logging.info("Interrupted by user")
        except Exception as e:
            logging.error(f"Error: {e}")
            if self.debug:
                import traceback
                logging.error(f"{traceback.format_exc()}")
            # Inside a thread this only ends the thread; it does not set
            # the process exit status.
            sys.exit(1)
        finally:
            if log_file:
                log_file.close()

    def stop(self):
        """Ask the run loop to finish after its current iteration."""
        self.active = False
def str2bool(v):
    """Convert string to boolean for argparse

    Args:
        v: A bool (returned unchanged) or a value whose text form is one of
            yes/true/t/y/1 or no/false/f/n/0, in any letter case and with
            optional surrounding whitespace.

    Returns:
        The corresponding bool.

    Raises:
        argparse.ArgumentTypeError: If the value is not a recognised
            boolean spelling.
    """
    if isinstance(v, bool):
        return v
    # Normalise once. Going through str() means a non-string value is
    # reported as a bad argument instead of failing with AttributeError.
    text = str(v).strip().lower()
    if text in ('yes', 'true', 't', 'y', '1'):
        return True
    if text in ('no', 'false', 'f', 'n', '0'):
        return False
    raise argparse.ArgumentTypeError('Boolean value expected.')
def main():
    """Parse the command line and start the GENX320 streaming thread."""
    cli = argparse.ArgumentParser()

    # Connection and script selection.
    cli.add_argument('--port', default='/dev/ttyACM0',
                     help='Serial port (default: /dev/ttyACM0)')
    cli.add_argument("--script", required=True,
                     help="Script file")
    cli.add_argument('--poll', type=int, default=4,
                     help='Poll rate in ms (default: 4)')
    cli.add_argument('--timeout', type=float, default=1.0,
                     help='Protocol timeout in seconds')
    cli.add_argument('--debug', action='store_true',
                     help='Enable debug logging')
    cli.add_argument('--baudrate', type=int, default=921600,
                     help='Serial baudrate (default: 921600)')

    # Boolean protocol switches: a bare "--flag" means True, and an explicit
    # value ("--flag false") is parsed by str2bool.
    protocol_switches = (
        ('--crc', False, 'Enable CRC validation (default: false)'),
        ('--seq', True, 'Enable sequence number validation (default: true)'),
        ('--ack', False, 'Enable packet acknowledgment (default: false)'),
        ('--events', True, 'Enable event notifications (default: true)'),
    )
    for flag, initial, description in protocol_switches:
        cli.add_argument(flag, type=str2bool, nargs='?', const=True,
                         default=initial, help=description)

    # Transport tuning.
    cli.add_argument('--max-retry', type=int, default=3,
                     help='Maximum number of retries (default: 3)')
    cli.add_argument('--max-payload', type=int, default=4096,
                     help='Maximum payload size in bytes (default: 4096)')
    cli.add_argument('--drop-rate', type=float, default=0.0,
                     help='Packet drop simulation rate (0.0-1.0, default: 0.0)')

    # Output and statistics.
    cli.add_argument('--quiet', action='store_true',
                     help='Suppress script output text')
    cli.add_argument("--alpha", type=float, default=0.2,
                     help="EMA smoothing factor (0<alpha<=1). Default: 0.2")
    cli.add_argument("--window_duration_us", type=float, default=50000,
                     help="Duration of each event time slice in us. Default: 50000")

    options = cli.parse_args()
    streamer = GENX320(args=options)
    streamer.start()


if __name__ == '__main__':
    main()
genx320_stream.py:
# This work is licensed under the MIT license.
# Copyright (c) 2013-2025 OpenMV LLC. All rights reserved.
# https://github.com/openmv/openmv/blob/master/LICENSE
#
# This example shows off using the genx320 event camera from Prophesee
# using event streaming mode and sending the data back to the PC.
#
# This script is meant to be run using https://github.com/openmv/openmv-python
# from a PC using desktop tools. No visualization or text output is generated
# by this script for OpenMV IDE.
import csi
import protocol
# https://micropython-ulab.readthedocs.io/en/latest/index.html
from ulab import numpy as np
# Event buffers arrive from the camera and are stored in
# a fifo buffer before being processed by python.
# This value is passed to csi0.framebuffers() below.
CSI_FIFO_DEPTH = 8
# Raw events post-processed by python, put into another
# fifo buffer, and then sent to the PC.
# This is the number of slots in the events/event_counts/mv_events lists.
EVENT_FIFO_DEPTH = 8
# Stores camera events (EVENT_FIFO_DEPTH buffers)
# Shape: (EVT_res, 6) where EVT_res is the event resolution (4096 here)
# EVT_res: must be a power of two between 1024 and 65536.
# Columns:
# [0] Event type
# [1] Seconds timestamp
# [2] Milliseconds timestamp
# [3] Microseconds timestamp
# [4] X coordinate 0 to 319 for GENX320
# [5] Y coordinate 0 to 319 for GENX320
events = [np.zeros((4096, 6), dtype=np.uint16) for i in range(EVENT_FIFO_DEPTH)]
# Result of the READ_EVENTS ioctl for each buffer, stored by the main loop;
# the channel multiplies it by 12 (6 x uint16) to get the payload size in bytes.
event_counts = [0 for i in range(EVENT_FIFO_DEPTH)]
# ULAB .tobytes() creates shallow copy bytearrays. Wrap these in memoryviews
# for fast slicing without copies.
mv_events = [memoryview(events[i].tobytes()) for i in range(EVENT_FIFO_DEPTH)]
# Event buffer fifo pointers.
# wr_index is advanced by the main loop after a buffer is filled; rd_index is
# advanced by the channel once a buffer has been read to its end. Equal
# indices mean the fifo is empty.
wr_index = 0
rd_index = 0
def read_available():
    """Return the number of filled event buffers waiting to be read."""
    # Both indices stay within [0, EVENT_FIFO_DEPTH), so the modulo folds a
    # negative difference (writer wrapped, reader has not) back into range.
    return (wr_index - rd_index) % EVENT_FIFO_DEPTH
def write_available():
    """Return the number of event buffers that may still be filled."""
    pending = read_available()
    # One slot is always left unused so that equal read and write indices
    # can only mean "empty".
    return (EVENT_FIFO_DEPTH - 1) - pending
# Initialize the sensor.
csi0 = csi.CSI(cid=csi.GENX320)
csi0.reset()
# Switch to event mode; the last argument is the row count of one event
# buffer (4096), i.e. the EVT_res described above.
csi0.ioctl(csi.IOCTL_GENX320_SET_MODE, csi.GENX320_MODE_EVENT, events[0].shape[0])
csi0.framebuffers(CSI_FIFO_DEPTH)
# Set sensor biases. The values are collected in one dict so they can be
# tuned in a single place.
BIASES = {"DIFF_OFF": 30, "DIFF_ON": 30, "HPF": 40, "FO": 34, "REFR": 10}
csi0.ioctl(csi.IOCTL_GENX320_SET_BIAS, csi.GENX320_BIAS_DIFF_OFF, BIASES["DIFF_OFF"])
csi0.ioctl(csi.IOCTL_GENX320_SET_BIAS, csi.GENX320_BIAS_DIFF_ON, BIASES["DIFF_ON"])
csi0.ioctl(csi.IOCTL_GENX320_SET_BIAS, csi.GENX320_BIAS_HPF, BIASES["HPF"])
csi0.ioctl(csi.IOCTL_GENX320_SET_BIAS, csi.GENX320_BIAS_FO, BIASES["FO"])
csi0.ioctl(csi.IOCTL_GENX320_SET_BIAS, csi.GENX320_BIAS_REFR, BIASES["REFR"])
# Set AFK filter, band-stop from 50 to 500
# NOTE(review): the original comment gave the unit as "FPS"; presumably these
# are flicker frequencies in Hz - confirm against the csi ioctl documentation.
enable = 1
bsl = 50
bsh = 500
csi0.ioctl(csi.IOCTL_GENX320_SET_AFK, enable, bsl, bsh)
# Alternative call, left disabled (presumably turns the AFK filter off - confirm):
#csi0.ioctl(csi.IOCTL_GENX320_SET_AFK, 0)
class EventChannel:
    """Protocol channel backend that serves the oldest filled event buffer."""

    # Bytes per event row: 6 columns of uint16.
    ROW_BYTES = 12

    def size(self):
        """Return the byte length of the buffer at the read index, or 0 if none is queued."""
        if not read_available():
            return 0
        return event_counts[rd_index] * self.ROW_BYTES

    def readp(self, offset, size):
        """Return bytes [offset, offset + size) of the buffer at the read index.

        When no buffer is queued, ``size`` zero bytes are returned instead.
        """
        global rd_index
        if not read_available():
            return bytes(size)
        stop = offset + size
        chunk = mv_events[rd_index][offset:stop]
        # Free the buffer after all data has been read.
        if stop == event_counts[rd_index] * self.ROW_BYTES:
            rd_index = (rd_index + 1) % EVENT_FIFO_DEPTH
        return chunk

    def poll(self):
        """Return the number of queued buffers (non-zero means data is ready)."""
        return read_available()
# Expose the event fifo to the PC as the 'events' channel.
protocol.register(name='events', backend=EventChannel())

while True:
    if write_available():
        # Reads up to events[wr_index].shape[0] (4096) events from the camera.
        # Returns the number of valid events or a negative error code.
        # Note that old events in the buffer are not cleared to save CPU time.
        count = csi0.ioctl(csi.IOCTL_GENX320_READ_EVENTS, events[wr_index])
        # Only queue buffers that actually hold events. A buffer with a zero
        # or negative count reports a size <= 0 to the PC, which then never
        # reads it, so the read index would never advance and the fifo
        # would stall once it filled up.
        if count > 0:
            event_counts[wr_index] = count
            wr_index = (wr_index + 1) % EVENT_FIFO_DEPTH
To run these, I used the command:
python3 driver/stream_test.py --poll 1 --script driver/openmv_scripts/genx320_event_mode_streaming_on_cam.py --port /dev/ttyACM0 --crc True