I have an H7 camera, and I would like to use the RPC library to allow the camera to accept input from, and send data to my PC, while the camera is running.
On my PC side, I was able to read the image snapshot as a byte array with no problem. When I printed the result on my PC side, I got a nice big object like
The RPC library exclusively sends bytes() on the link. What you wrote for the camera is fine. That should work. Note, the second .bytearray() call isn’t needed. Same for the image object.
It’s probably and issue on the PC side with figuring out what the object is. Maybe:
Yeah, that sounds right. The bytearray(byte_object) was me grasping at straws, but I’ve done the same just passing the byte_object. I’m trying to figure out how to tell my PC side what to expect so that I can read the string.
If I just try something like string_result = result.decode('utf-8')
on my PC side I get an error for:
AttributeError: 'memoryview' object has no attribute 'decode'
These are the ways I’ve tried to decode on my PC side, and the results I get:
Ok, I’ve got it working now. I think I actually had an issue with my callback order? I changed so many things that I am not sure what the actual fix was.
Here’s my PC code
import io, rpc, serial, serial.tools.list_ports, socket, sys, os
#from tkinter import *
# Fix Python 2.x.
try: input = raw_input
except NameError: pass
print("\nAvailable Ports:\n")
for port, desc, hwid in serial.tools.list_ports.comports():
print("{} : {} [{}]".format(port, desc, hwid))
sys.stdout.write("\nPlease enter a port name: ")
sys.stdout.flush()
interface = rpc.rpc_usb_vcp_master(port=input())
sys.stdout.flush()
# This will be called with the bytes() object generated by the slave device.
def frame_buffer_cb(data):
print(data)
sys.stdout.flush()
while(True):
sys.stdout.flush()
# Call the openmv
result = interface.call("send_data_to_stream")
print('result= ',result)
if result is not None:
interface.stream_reader(frame_buffer_cb, queue_depth=8)
Here is my camera code:
import omv
import rpc
import sensor
sensor.reset()
sensor.set_pixformat(sensor.GRAYSCALE)
sensor.set_framesize(sensor.QVGA)
sensor.skip_frames(time=2000)
omv.disable_fb(True)
interface = rpc.rpc_usb_vcp_slave()
# This is called repeatedly by interface.stream_writer().
def test_for_nonbackground_pixels_cb():
string_to_encode = "pass"
byte_object = string_to_encode.encode('utf-8')
return byte_object
# Transmits a stream of bytes()'s generated by stream_generator_cb to the master device.
def stream_cb():
interface.stream_writer(test_for_nonbackground_pixels_cb)
def send_data_to_stream():
interface.schedule_callback(stream_cb)
return bytes()
interface.register_callback(send_data_to_stream)
interface.loop()