Im having trouble keeping constant frame rate with the openmv camera (h7 plus). We are sending a subarea uncompressed images, while updating the region of interest every second. The ROI is updated based on full jpeg image, not just a subregion of the image. To do so, we send a 4 character string (either “roii” to adjust the offset parameter, or “jpeg” to get the jpeg image from the cam, or “snap” to get the uncompressed ROI image from the pc side).The problem is that after sending a string command from the pc side, the first read on the openmv is always very slow (around 3-5 FPS), every read afterwards runs at normal speed (15 fps for a 200x100 pixel roi approx). What can be a possible solution for that?
script to flash on Openmv cam:
import sensor, image, utime, omv, gc
from pyb import USB_VCP
def zfill(string, fill):
for i in range(0, fill - len(string)):
string = "0" + string
return string
def MVread(str_in): # READ CONFIG MESSAGE SENT BY PC
offX = ''
offY = ''
width = ''
height = ''
RGBx = ''
RGBy = ''
commaNum = 0
for j in range(0, len(str_in)):
if str_in[j] == ',':
commaNum += 1
else:
if commaNum == 0:
width += (str_in[j])
if commaNum == 1:
height += (str_in[j])
if commaNum == 2:
offX += (str_in[j])
if commaNum == 3:
offY += (str_in[j])
if commaNum == 4:
break
return int(width), int(height), int(offX), int(offY)
gc.enable()
usb = USB_VCP()
omv.disable_fb(True)
sensor.reset()
sensor.set_pixformat(sensor.RGB565)
# Set frame size to WQXGA2 (2592x1944) or QVGA (320x240)
offx = 0
offy = 0
quality = 30
#sensor.ioctl(sensor.IOCTL_SET_READOUT_WINDOW, (2400,1800))
#sensor.set_framesize(sensor.SVGA)
#sensor.ioctl(sensor.IOCTL_SET_READOUT_WINDOW, (1920,1440))
sensor.ioctl(sensor.IOCTL_SET_READOUT_WINDOW, (2592,1944))
sensor.set_framesize(sensor.VGA)
sensor.skip_frames(1)
while True:
gc.collect()
cmd = usb.recv(32, timeout=5000)
#send RGB image
if (cmd[0:4] == b'snap'):
frame = sensor.snapshot()
usb.send(frame)
#send JPEG image
if (cmd[0:4] == b'jpeg'):
frame = sensor.snapshot()
img = frame.compress(quality)
info = zfill(str(len(img.bytearray())), 9)
usb.send(info)
usb.send(img)
#adjust ROI values
elif (cmd[0:4] == b'roii'):
msg = usb.recv(32, timeout=5000)
width, height, offx, offy = MVread(msg.decode())
if width >= 800:
width = 800
if height >= 600:
height = 600
if offx >= 6000 and offx != 9000:
offx = (-1)*(offx-6000)
if offy >= 6000 and offx != 9000:
offy = (-1)*(offy-6000)
if offx == 9000:
sensor.set_pixformat(sensor.GRAYSCALE)
sensor.set_framesize(sensor.VGA)
sensor.ioctl(sensor.IOCTL_SET_READOUT_WINDOW, (2592,1944))
sensor.set_windowing((640,480))
sensor.skip_frames(1)
else:
ioX = offx
setX = 0
ioY = offy
setY = 0
CAPw = 800
CAPh = 600
if offx + CAPw < 2592:
ioX = offx
setX = 0
else:
ioX = 2592 - CAPw
setX = (offx - ioX)
setX = int(setX) - int(setX)%2
if offy + CAPh < 1944:
ioY = offy
setY = 0
else:
ioY = 1944 - CAPh
setY = (offy - ioY)
setY = int(setY) - int(setY)%2
sensor.set_pixformat(sensor.RGB565)
sensor.set_framesize(sensor.SVGA) #Moved above IOCTL so that offsets are (-896, -672) instead of VGA offsets
sensor.ioctl(sensor.IOCTL_SET_READOUT_WINDOW, (ioX,ioY,CAPw,CAPh))
sensor.set_windowing((setX,setY,width,height))
sensor.skip_frames(1)
script to run on your PC:
import serial, struct, time
import numpy as np
import pdb
import dlib
from imutils import face_utils
import PIL.Image as Image
import io
class openmv():
def __init__(self, portname, avg_mode=False):
self.serial_port = serial.Serial(portname, baudrate=115200, bytesize=serial.EIGHTBITS,
parity=serial.PARITY_NONE,
xonxoff=False, stopbits=serial.STOPBITS_ONE, timeout=None, rtscts=True,
dsrdtr=False)
self.focal_length = 1.2
self.roi_dim = {"width": 100, "height": 100, "offx": 0, "offy": 0}
self.face_detector = cv2.CascadeClassifier('inputstream/openmv/haarcascades_frontalface.xml')
self.landmark_detector = dlib.shape_predictor("inputstream/openmv/shape_predictor_68_face_landmarks.dat")
self.avg_mode = avg_mode
self.stream_open = False
self.adjustROI()
print("openmv cam ", portname, "connected")
def setRoiDim(self, x, y, offx, offy, set_variables=True):
if set_variables:
self.roi_dim = {"width": x, "height": y, "offx": offx, "offy": offy}
self.sendParametersToOpenMV("roii")
self.sendParametersToOpenMV([x, y, offx, offy])
self.serial_port.flush()
def read(self):
img = self._readImg()
return img
def _readImg(self, read_full_jpeg=False):
if read_full_jpeg:
# set capturing frame to full frame
self.setRoiDim(0, 0, 9000, 9000, set_variables=False)
# send command to read jpeg
self.sendParametersToOpenMV("jpeg")
# read length and image
length = self.serial_port.read(9).decode()
jpeglen = int(length)
imgbytes = self.serial_port.read(jpeglen)
# decode
image_jpeg = Image.open(io.BytesIO(imgbytes))
img = np.array(image_jpeg.convert('RGB'))
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
# set capturing frame to previous roi
self.setRoiDim(self.roi_dim["width"], self.roi_dim["height"], self.roi_dim["offx"], self.roi_dim["offy"])
return img
elif (self.roi_dim["width"] * self.roi_dim["height"] * 2) == 0:
return np.array([])
else:
self.sendParametersToOpenMV("snap")
buf = self.serial_port.read(self.roi_dim["width"] * self.roi_dim["height"] * 2)
img = decodeRGB(buf, self.roi_dim["width"], self.roi_dim["height"])
return img
def sendParametersToOpenMV(self, parameters):
msg = ""
if type(parameters).__name__ == 'list':
for par in parameters: msg += str(par) + ","
else:
msg = parameters
# encode msg as bytes
msg = msg.encode()
# openmv expecting a 32 bytes message
dummyvar = bytearray(32 - len(msg))
msg += dummyvar
self.serial_port.write(msg)
# print(msg)
self.serial_port.flush()
def adjustROI(self, img = np.array([])):
ts = time.time()
if not img.any():
img = self._readImg(read_full_jpeg=True)
# if img is empty, then return previous roi coord
if not img.any():
return self.roi_dim
# convert to gray space
grey = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
# detect face
bbox = self.face_detector.detectMultiScale(grey)
roi_coord = [0,0,0,0]
if len(bbox) != 0:
# find largest bbox in detected bboxs
face_coord = getLargestBboxCoord(bbox)
left, top, right, bottom = face_coord[0], face_coord[1], face_coord[0] + face_coord[2], face_coord[1] + \
face_coord[3]
rectangle = dlib.rectangle(left=left, top=top, right=right, bottom=bottom)
# find facial landmarks
landmarks = self.landmark_detector(img, rectangle)
landmarks = face_utils.shape_to_np(landmarks)
# get roi coordinates of forehead
bbox = (left, top, right, bottom)
roi_coord = getROICoordinates(bbox, landmarks)
# get roi coordinates
x_dim, y_dim = abs(roi_coord[2] - roi_coord[0]), abs(roi_coord[3] - roi_coord[1])
Bleft = roi_coord[0]
Btop = roi_coord[1]
# print('here', roi_coord)
SCALE_w = 2592 / 640
SCALE_h = 1944 / 480
w = x_dim * SCALE_w
h = y_dim * SCALE_h
# offx = Bleft * 3 - (2592-800)/2 + (2592-2400)/2
# offy = Btop * 3 - (1944-600)/2 + (1944-1800)/2
# offx = Bleft * 3 - (2592-800)/2 + (2592-1920)/2
# offy = Btop * 3 - (1944-600)/2 + (1944-1440)/2
offx = Bleft * SCALE_w - (2592 - 800) / 2 #- max((800 - w)/2, 0)
offy = Btop * SCALE_h - (1944 - 600) / 2 #- max((600 - h)/2, 0)
if offx < 0:
offx = (-1)*offx + 6000
if offy < 0:
offy = (-1)*offy + 6000
self.roi_dim = {"width": int(w) - int(w) % 2,
"height": int(h) - int(h) % 2,
"offx": int(offx) - int(offx) % 2,
"offy": int(offy) - int(offy) % 2}
omw_coord = [self.roi_dim["width"], self.roi_dim["height"], self.roi_dim["offx"], self.roi_dim["offy"]]
# send new ROI coord to openmv
self.setRoiDim(self.roi_dim["width"], self.roi_dim["height"], self.roi_dim["offx"], self.roi_dim["offy"])
print("fps adjust roi", 1 /(time.time()-ts))
return roi_coord, omw_coord
def beginAcquistion(self):
openmv.adjustROI()
self.stream_open = True
def endAcquistion(self):
self.stream_open = False
def getFocalLength(self):
return self.focal_length
def getROICoordinates(bbox, landmarks, roi_area="forehead"):
roi_points = [800, 600, 9000, 9000]
if roi_area == "forehead":
y_lower = [19, 24]
x_lower = 19
x_upper = 25
y_lm_max = float('inf')
if roi_area == "skin":
y_lower = [30]
x_lower = 2
x_upper = 16
y_lm_max = float('inf')
# ROI coordinates
for index, (x_lm, y_lm) in enumerate(landmarks):
# lower bound x line
if index == x_lower:
roi_points[0] = x_lm
# lower y bound
# if index == y_lower[0] or index == y_lower[1]:
if index in y_lower:
if y_lm_max > y_lm:
y_lm_max = y_lm
roi_points[1] = y_lm_max
# upper bound x line
if index == x_upper:
roi_points[2] = x_lm
# upper y bound
roi_points[3] = bbox[1]
roi_points[1], roi_points[3] = roi_points[3], roi_points[1]
return roi_points
def decodeRGB(bcolor, width, height):
flag1 = time.time()
MASK5 = 0b011111
MASK6 = 0b111111
im = np.empty([width, height], dtype=np.uint16)
i = 0
for y in range(0, height):
for x in range(0, width):
im[x, y] = int.from_bytes(bcolor[i + 1:i + 2] + bcolor[i:i + 1], "big")
i += 2
b = (im & MASK5) << 3
g = ((im >> 5) & MASK6) << 2
r = ((im >> (5 + 6)) & MASK5) << 3
bgr = np.dstack((b, g, r)).astype(np.uint8)
flag4 = time.time()
return bgr
def getLargestBboxCoord(bbox):
max_area = 0
max_coord = [0, 0, 0, 0]
for (x, y, w, h) in bbox:
area = (x + w) * (y + h)
if area > max_area:
max_area = area
max_coord = [x, y, w, h]
return max_coord
portname = "/dev/serial/by-id/usb-OpenMV_OpenMV_Virtual_Comm_Port_in_FS_Mode_3254366F3139-if00"
openmv = omv.openmv(portname, avg_mode=False)
dur=20
start = time.time()
n_frames = 0
openmv.adjustROI()
time_dif = 0
while time.time()-start < dur:
ts = time.time()
msg = openmv.read()
print("fps", 1/(time.time()-ts))
n_frames+= 1
# adjust omw avg ROI after OMW_UPDATE_TIME_ROI
if ts - time_dif > OMW_UPDATE_TIME_ROI:
time_dif = ts
t1 = time.time()
roi_coord = openmv.adjustROI()
print("fps adjust ROI", 1/ (time.time()-t1))
"""t2 = time.time()
msg = openmv.read()
print("fps first read", 1/ (time.time()-t2))"""