Posting this here in the event anyone wants to go down the route I needed to. i hope this is allowed, and wish i had a better way to present this.
I have a ton of ESP32’s laying around, so why not use one as a co-processor as both Adafruit & Arduino do. don’t be confused, the ESP32 is a Slave device. the OpenMV Cam H7 is a master.
Nina-FW project here: GitHub - arduino/nina-fw: Firmware for u-blox NINA W102 WiFi/BT module
Adafruit’s ESP32SPI libraries for the circuit python project here: GitHub - adafruit/Adafruit_CircuitPython_ESP32SPI: ESP32 as wifi with SPI interface
Adafruit’s minimqtt library: GitHub - adafruit/Adafruit_CircuitPython_MiniMQTT: MQTT Client Library for CircuitPython
based on their fine work, i present the (ugly, but functional) ports for anyone else in need or interested.
A few things. i did, and learned along the way. hopefully it helps if you try this, its taken me 6 months off and on to nail it all down. my use case was to use a Lolin D32 Pro, so i could offload alot of work i wanted a separate device to do on its own; lights, battery management, charging, and monitoring, wifi, and an LCD.
- yes, its ugly. but it’s completely functional
- i still can’t get my head around github, so its not posted there.
- this isn’t a 1:1 port. a few things had to be done, including:
a. borrowing code from some of their base libraries and incorporating it and some other modifications because of the minute code differences between micropython and circuit python.
b. changed some of it to make it non-blocking on wifi manager.
c. added a few of my own inclusions
d. i’d strong recommend whenever using the socket to call the “settimeout” function with any value other than the 0 it starts with. else, any internet delay will cause it to raise an exception. its not forgiving.
e. i renamed the libraries, and as such, their references. - put the libraries on the SD card directly. do not under any circumstance allow openmv’s IDE to transfer them to the SD card. the IDE transfers them fine to the computer, but something it does when transferring them to the SD card breaks them, and nothing works.
- i myself cannot get the SPI speed above 19.999999 Mhz. at 20 Mhz, everything goes haywire. I tested this with both a WROOM and a WROVER module with both VSPI and HSPI and regardless of polarity and phase, and it just can’t do 20mhz or higher.
- the reset pin needs to be set to “IN” in order to flash the ESP32 if hardwired.
- Polarity and phase must both be set to 1. it won’t work under any other combination. i bought an analyzer to check this.
- this works with version 1.5 of nina-fw.
If you have any suggestions, please let me know.
without further ado.
ESP32SPI.py
"""
`esp32spi`
================================================================================
CircuitPython driver library for using ESP32 as WiFi co-processor using SPI ported to the
OpenMV Cam.
Original source here: https://github.com/adafruit/Adafruit_CircuitPython_ESP32SPI
* Author(s): ladyada
* Forked by: Nezra
Implementation Notes
--------------------
**Hardware:**
ESP32 variant of your choice with the Nina-FW.
OpenMV Cam
"""
import struct
import utime
import time
from micropython import const
from machine import Pin
from digitalio import Direction
__version__ = "0.0.0-auto.0"
# pylint: disable=bad-whitespace
_SET_NET_CMD = const(0x10)
_SET_PASSPHRASE_CMD = const(0x11)
_SET_AP_NET_CMD = const(0x18)
_SET_AP_PASSPHRASE_CMD = const(0x19)
_SET_DEBUG_CMD = const(0x1A)
_GET_CONN_STATUS_CMD = const(0x20)
_GET_IPADDR_CMD = const(0x21)
_GET_MACADDR_CMD = const(0x22)
_GET_CURR_SSID_CMD = const(0x23)
_GET_CURR_BSSID_CMD = const(0x24)
_GET_CURR_RSSI_CMD = const(0x25)
_GET_CURR_ENCT_CMD = const(0x26)
_SCAN_NETWORKS = const(0x27)
_START_SERVER_TCP_CMD = const(0x28)
_GET_SOCKET_CMD = const(0x3F)
_GET_STATE_TCP_CMD = const(0x29)
_DATA_SENT_TCP_CMD = const(0x2A)
_AVAIL_DATA_TCP_CMD = const(0x2B)
_GET_DATA_TCP_CMD = const(0x2C)
_START_CLIENT_TCP_CMD = const(0x2D)
_STOP_CLIENT_TCP_CMD = const(0x2E)
_GET_CLIENT_STATE_TCP_CMD = const(0x2F)
_DISCONNECT_CMD = const(0x30)
_GET_IDX_RSSI_CMD = const(0x32)
_GET_IDX_ENCT_CMD = const(0x33)
_REQ_HOST_BY_NAME_CMD = const(0x34)
_GET_HOST_BY_NAME_CMD = const(0x35)
_START_SCAN_NETWORKS = const(0x36)
_GET_FW_VERSION_CMD = const(0x37)
_GET_TIME = const(0x3B)
_GET_IDX_BSSID_CMD = const(0x3C)
_GET_IDX_CHAN_CMD = const(0x3D)
_PING_CMD = const(0x3E)
_SEND_DATA_TCP_CMD = const(0x44)
_GET_DATABUF_TCP_CMD = const(0x45)
_SET_ENT_IDENT_CMD = const(0x4A)
_SET_ENT_UNAME_CMD = const(0x4B)
_SET_ENT_PASSWD_CMD = const(0x4C)
_SET_ENT_ENABLE_CMD = const(0x4F)
_SET_CLI_CERT = const(0x40)
_SET_PK = const(0x41)
_SET_PIN_MODE_CMD = const(0x50)
_SET_DIGITAL_WRITE_CMD = const(0x51)
_SET_ANALOG_WRITE_CMD = const(0x52)
_SET_DIGITAL_READ_CMD = const(0x53)
_SET_ANALOG_READ_CMD = const(0x54)
_START_CMD = const(0xE0)
_END_CMD = const(0xEE)
_ERR_CMD = const(0xEF)
_REPLY_FLAG = const(1<<7)
_CMD_FLAG = const(0)
SOCKET_CLOSED = const(0)
SOCKET_LISTEN = const(1)
SOCKET_SYN_SENT = const(2)
SOCKET_SYN_RCVD = const(3)
SOCKET_ESTABLISHED = const(4)
SOCKET_FIN_WAIT_1 = const(5)
SOCKET_FIN_WAIT_2 = const(6)
SOCKET_CLOSE_WAIT = const(7)
SOCKET_CLOSING = const(8)
SOCKET_LAST_ACK = const(9)
SOCKET_TIME_WAIT = const(10)
WL_NO_SHIELD = const(0xFF)
WL_NO_MODULE = const(0xFF)
WL_IDLE_STATUS = const(0)
WL_NO_SSID_AVAIL = const(1)
WL_SCAN_COMPLETED = const(2)
WL_CONNECTED = const(3)
WL_CONNECT_FAILED = const(4)
WL_CONNECTION_LOST = const(5)
WL_DISCONNECTED = const(6)
WL_AP_LISTENING = const(7)
WL_AP_CONNECTED = const(8)
WL_AP_FAILED = const(9)
ADC_ATTEN_DB_0 = const(0)
ADC_ATTEN_DB_2_5 = const(1)
ADC_ATTEN_DB_6 = const(2)
ADC_ATTEN_DB_11 = const(3)
class SPIDevice:
def __init__(self, spi, chip_select=None, *,
baudrate=100000, polarity=0, phase=0, extra_clocks=0):
self.spi = spi
self.baudrate = baudrate
self.polarity = polarity
self.phase = phase
self.extra_clocks = extra_clocks
self.chip_select = chip_select
if self.chip_select:
self.chip_select.high()
def __enter__(self):
if self.chip_select:
self.chip_select.low()
return self.spi
def __exit__(self, *exc):
if self.chip_select:
self.chip_select.high()
if self.extra_clocks > 0:
buf = bytearray(1)
buf[0] = 0xff
clocks = self.extra_clocks // 8
if self.extra_clocks % 8 != 0:
clocks += 1
for _ in range(clocks):
self.spi.write(buf)
return False
class ESP_SPIcontrol:
"""A class that will talk to an ESP32 module programmed with special firmware
that lets it act as a fast an efficient WiFi co-processor"""
TCP_MODE = const(0)
UDP_MODE = const(1)
TLS_MODE = const(2)
def __init__(self, spi, cs_pin, ready_pin, reset_pin, gpio0_pin=None, *, debug=False):
self._debug = debug
self.set_psk = False
self.set_crt = False
self._buffer = bytearray(10)
self._pbuf = bytearray(1)
self._sendbuf = bytearray(256)
self._socknum_ll = [[0]]
self._spi_device = SPIDevice(spi, cs_pin)#, baudrate=8000000)
self._cs = cs_pin
self._ready = ready_pin
self._reset = reset_pin
self._gpio0 = gpio0_pin
#self._cs.direction = Direction.OUTPUT
#self._ready.direction = Direction.INPUT
#self._reset.direction = Direction.OUTPUT
#if self._gpio0:
#self._gpio0.direction = Direction.INPUT
self.reset()
def reset(self):
"""Hard reset the ESP32 using the reset pin"""
if self._debug:
print("Reset ESP32")
self._cs.high()
self._reset.low()
utime.sleep_ms(100)
self._reset.high()
utime.sleep(1)
def _wait_for_ready(self):
"""Wait until the ready pin goes low"""
if self._debug:
print("Wait for ESP32 ready", end='')
print(self._ready.value())
times = time.ticks()
while (time.ticks() - times) < 10000:
if not self._ready.value():
break
if self._debug:
print('.', end='')
utime.sleep_ms(50)
else:
raise RuntimeError("ESP32 not responding")
if self._debug:
print()
def _send_command(self, cmd, params=None, *, param_len_16=False):
"""Send over a command with a list of parameters"""
if not params:
params = ()
packet_len = 4
for i, param in enumerate(params):
packet_len += len(param)
packet_len += 1
if param_len_16:
packet_len += 1
while packet_len % 4 != 0:
packet_len += 1
if packet_len > len(self._sendbuf):
self._sendbuf = bytearray(packet_len)
self._sendbuf[0] = _START_CMD
self._sendbuf[1] = cmd & ~_REPLY_FLAG
self._sendbuf[2] = len(params)
# handle parameters here
ptr = 3
for i, param in enumerate(params):
if self._debug:
print("\tSending param %d is %d bytes long" % (i, len(param)))
if param_len_16:
self._sendbuf[ptr] = (len(param) >> 8) & 0xFF
ptr += 1
self._sendbuf[ptr] = len(param) & 0xFF
ptr += 1
for j, par in enumerate(param):
self._sendbuf[ptr+j] = par
ptr += len(param)
self._sendbuf[ptr] = _END_CMD
self._wait_for_ready()
with self._spi_device as spi:
times = time.ticks()
while (time.ticks() - times) < 1000:
if self._ready.value():
break
else:
raise RuntimeError("ESP32 timed out on SPI select")
spi.write(self._sendbuf) #, start=0, end=packet_len)
if self._debug:
print("Wrote: ", [hex(b) for b in self._sendbuf[0:packet_len]])
def _read_byte(self, spi):
"""Read one byte from SPI"""
spi.readinto(self._pbuf)
if self._debug:
print("Read:", hex(self._pbuf[0]))
return self._pbuf[0]
def _read_bytes(self, spi, buffer, start=0, end=None):
"""Read many bytes from SPI"""
if not end:
end = len(buffer)
spi.readinto(buffer) #, start=start,end=end)
if self._debug:
print("\t\tRead:", [hex(i) for i in buffer])
def _wait_spi_char(self, spi, desired):
"""Read a byte with a time-out, and if we get it, check that its what we expect"""
times = time.ticks()
while (time.ticks() - times) < 100:
r = self._read_byte(spi)
if r == _ERR_CMD:
raise RuntimeError("Error response to command")
if r == desired:
return True
raise RuntimeError("Timed out waiting for SPI char")
def _check_data(self, spi, desired):
"""Read a byte and verify its the value we want"""
r = self._read_byte(spi)
if r != desired:
raise RuntimeError("Expected %02X but got %02X" % (desired, r))
def _wait_response_cmd(self, cmd, num_responses=None, *, param_len_16=False):
"""Wait for ready, then parse the response"""
self._wait_for_ready()
responses = []
with self._spi_device as spi:
times = time.ticks()
while (time.ticks() - times) < 1000:
if self._ready.value():
break
else:
raise RuntimeError("ESP32 timed out on SPI select")
self._wait_spi_char(spi, _START_CMD)
self._check_data(spi, cmd | _REPLY_FLAG)
if num_responses is not None:
self._check_data(spi, num_responses)
else:
num_responses = self._read_byte(spi)
for num in range(num_responses):
param_len = self._read_byte(spi)
if param_len_16:
param_len <<= 8
param_len |= self._read_byte(spi)
if self._debug:
print("\tParameter %d length is %d" % (num, param_len))
response = bytearray(param_len)
self._read_bytes(spi, response)
responses.append(response)
self._check_data(spi, _END_CMD)
if self._debug:
print("Read %d: " % len(responses[0]), responses)
return responses
def _send_command_get_response(self, cmd, params=None, *,
reply_params=1, sent_param_len_16=False,
recv_param_len_16=False):
"""Send a high level SPI command, wait and return the response"""
self._send_command(cmd, params, param_len_16=sent_param_len_16)
return self._wait_response_cmd(cmd, reply_params, param_len_16=recv_param_len_16)
@property
def status(self):
"""The status of the ESP32 WiFi core. Can be WL_NO_SHIELD or WL_NO_MODULE
(not found), WL_IDLE_STATUS, WL_NO_SSID_AVAIL, WL_SCAN_COMPLETED,
WL_CONNECTED, WL_CONNECT_FAILED, WL_CONNECTION_LOST, WL_DISCONNECTED,
WL_AP_LISTENING, WL_AP_CONNECTED, WL_AP_FAILED"""
if self._debug:
print("Connection status")
resp = self._send_command_get_response(_GET_CONN_STATUS_CMD)
if self._debug:
print("Conn status:", resp[0][0])
return resp[0][0]
@property
def firmware_version(self):
"""A string of the firmware version on the ESP32"""
if self._debug:
print("Firmware version")
resp = self._send_command_get_response(_GET_FW_VERSION_CMD)
return resp[0]
@property
def MAC_address(self):
"""A bytearray containing the MAC address of the ESP32"""
if self._debug:
print("MAC address")
resp = self._send_command_get_response(_GET_MACADDR_CMD, [b'\xFF'])
return resp[0]
def start_scan_networks(self):
"""Begin a scan of visible access points. Follow up with a call
to 'get_scan_networks' for response"""
if self._debug:
print("Start scan")
resp = self._send_command_get_response(_START_SCAN_NETWORKS)
if resp[0][0] != 1:
raise RuntimeError("Failed to start AP scan")
def get_scan_networks(self):
"""The results of the latest SSID scan. Returns a list of dictionaries with
'ssid', 'rssi' and 'encryption' entries, one for each AP found"""
self._send_command(_SCAN_NETWORKS)
names = self._wait_response_cmd(_SCAN_NETWORKS)
APs = []
for i, name in enumerate(names):
a_p = {'ssid': name}
rssi = self._send_command_get_response(_GET_IDX_RSSI_CMD, ((i,),))[0]
a_p['rssi'] = struct.unpack('<i', rssi)[0]
encr = self._send_command_get_response(_GET_IDX_ENCT_CMD, ((i,),))[0]
a_p['encryption'] = encr[0]
bssid = self._send_command_get_response(_GET_IDX_BSSID_CMD, ((i,),))[0]
a_p['bssid'] = bssid
chan = self._send_command_get_response(_GET_IDX_CHAN_CMD, ((i,),))[0]
a_p['channel'] = chan[0]
APs.append(a_p)
return APs
def scan_networks(self):
"""Scan for visible access points, returns a list of access point details.
Returns a list of dictionaries with 'ssid', 'rssi' and 'encryption' entries,
one for each AP found"""
self.start_scan_networks()
for _ in range(10):
utime.sleep(2)
APs = self.get_scan_networks()
if APs:
return APs
return None
def wifi_set_network(self, ssid):
"""Tells the ESP32 to set the access point to the given ssid"""
resp = self._send_command_get_response(_SET_NET_CMD, [ssid])
if resp[0][0] != 1:
raise RuntimeError("Failed to set network")
def wifi_set_passphrase(self, ssid, passphrase):
"""Sets the desired access point ssid and passphrase"""
resp = self._send_command_get_response(_SET_PASSPHRASE_CMD, [ssid, passphrase])
if resp[0][0] != 1:
raise RuntimeError("Failed to set passphrase")
def wifi_set_entidentity(self, ident):
"""Sets the WPA2 Enterprise anonymous identity"""
resp = self._send_command_get_response(_SET_ENT_IDENT_CMD, [ident])
if resp[0][0] != 1:
raise RuntimeError("Failed to set enterprise anonymous identity")
def wifi_set_entusername(self, username):
"""Sets the desired WPA2 Enterprise username"""
resp = self._send_command_get_response(_SET_ENT_UNAME_CMD, [username])
if resp[0][0] != 1:
raise RuntimeError("Failed to set enterprise username")
def wifi_set_entpassword(self, password):
"""Sets the desired WPA2 Enterprise password"""
resp = self._send_command_get_response(_SET_ENT_PASSWD_CMD, [password])
if resp[0][0] != 1:
raise RuntimeError("Failed to set enterprise password")
def wifi_set_entenable(self):
"""Enables WPA2 Enterprise mode"""
resp = self._send_command_get_response(_SET_ENT_ENABLE_CMD)
if resp[0][0] != 1:
raise RuntimeError("Failed to enable enterprise mode")
def _wifi_set_ap_network(self, ssid, channel):
"""Creates an Access point with SSID and Channel"""
resp = self._send_command_get_response(_SET_AP_NET_CMD, [ssid, channel])
if resp[0][0] != 1:
raise RuntimeError("Failed to setup AP network")
def _wifi_set_ap_passphrase(self, ssid, passphrase, channel):
"""Creates an Access point with SSID, passphrase, and Channel"""
resp = self._send_command_get_response(_SET_AP_PASSPHRASE_CMD, [ssid, passphrase, channel])
if resp[0][0] != 1:
raise RuntimeError("Failed to setup AP password")
@property
def ssid(self):
"""The name of the access point we're connected to"""
resp = self._send_command_get_response(_GET_CURR_SSID_CMD, [b'\xFF'])
return resp[0]
@property
def bssid(self):
"""The MAC-formatted service set ID of the access point we're connected to"""
resp = self._send_command_get_response(_GET_CURR_BSSID_CMD, [b'\xFF'])
return resp[0]
@property
def rssi(self):
"""The receiving signal strength indicator for the access point we're
connected to"""
resp = self._send_command_get_response(_GET_CURR_RSSI_CMD, [b'\xFF'])
return struct.unpack('<i', resp[0])[0]
@property
def network_data(self):
"""A dictionary containing current connection details such as the 'ip_addr',
'netmask' and 'gateway'"""
resp = self._send_command_get_response(_GET_IPADDR_CMD, [b'\xFF'], reply_params=3)
return {'ip_addr': resp[0], 'netmask': resp[1], 'gateway': resp[2]}
@property
def ip_address(self):
"""Our local IP address"""
return self.network_data['ip_addr']
@property
def is_connected(self):
"""Whether the ESP32 is connected to an access point"""
try:
return self.status == WL_CONNECTED
except RuntimeError:
self.reset()
return False
@property
def ap_listening(self):
"""Returns if the ESP32 is in access point mode and is listening for connections"""
try:
return self.status == WL_AP_LISTENING
except RuntimeError:
self.reset()
return False
def connect(self, secrets):
"""Connect to an access point using a secrets dictionary
that contains a 'ssid' and 'password' entry"""
self.connect_AP(secrets['ssid'], secrets['password'])
def connect_AP(self, ssid, password, timeout_s=10):
"""
Connect to an access point with given name and password.
Will wait until specified timeout seconds and return on success
or raise an exception on failure.
:param ssid: the SSID to connect to
:param passphrase: the password of the access point
:param timeout_s: number of seconds until we time out and fail to create AP
"""
if self._debug:
print("Connect to AP", ssid, password)
if isinstance(ssid, str):
ssid = bytes(ssid, 'utf-8')
if password:
if isinstance(password, str):
password = bytes(password, 'utf-8')
self.wifi_set_passphrase(ssid, password)
else:
self.wifi_set_network(ssid)
times = time.ticks()
while (time.ticks() - times) < (timeout_s*1000):
stat = self.status
if stat == WL_CONNECTED:
return stat
utime.sleep_ms(50)
if stat in (WL_CONNECT_FAILED, WL_CONNECTION_LOST, WL_DISCONNECTED):
raise RuntimeError("Failed to connect to ssid", ssid)
if stat == WL_NO_SSID_AVAIL:
raise RuntimeError("No such ssid", ssid)
raise RuntimeError("Unknown error 0x%02X" % stat)
def create_AP(self, ssid, password, channel=1, timeout=10): # pylint: disable=invalid-name
"""
Create an access point with the given name, password, and channel.
Will wait until specified timeout seconds and return on success
or raise an exception on failure.
:param str ssid: the SSID of the created Access Point. Must be less than 32 chars.
:param str password: the password of the created Access Point. Must be 8-63 chars.
:param int channel: channel of created Access Point (1 - 14).
:param int timeout: number of seconds until we time out and fail to create AP
"""
if len(ssid) > 32:
raise RuntimeError("ssid must be no more than 32 characters")
if password and (len(password) < 8 or len(password) > 64):
raise RuntimeError("password must be 8 - 63 characters")
if channel < 1 or channel > 14:
raise RuntimeError("channel must be between 1 and 14")
if isinstance(channel, int):
channel = bytes(channel)
if isinstance(ssid, str):
ssid = bytes(ssid, 'utf-8')
if password:
if isinstance(password, str):
password = bytes(password, 'utf-8')
self._wifi_set_ap_passphrase(ssid, password, channel)
else:
self._wifi_set_ap_network(ssid, channel)
times = time.ticks()
while (time.ticks() - times) < (timeout*1000): # wait up to timeout
stat = self.status
if stat == WL_AP_LISTENING:
return stat
utime.sleep_ms(50)
if stat == WL_AP_FAILED:
raise RuntimeError("Failed to create AP", ssid)
raise RuntimeError("Unknown error 0x%02x" % stat)
def pretty_ip(self, ip):
"""Converts a bytearray IP address to a dotted-quad string for printing"""
return "%d.%d.%d.%d" % (ip[0], ip[1], ip[2], ip[3])
def unpretty_ip(self, ip):
"""Converts a dotted-quad string to a bytearray IP address"""
octets = [int(x) for x in ip.split('.')]
return bytes(octets)
def get_host_by_name(self, hostname):
"""Convert a hostname to a packed 4-byte IP address. Returns
a 4 bytearray"""
if self._debug:
print("*** Get host by name")
if isinstance(hostname, str):
hostname = bytes(hostname, 'utf-8')
resp = self._send_command_get_response(_REQ_HOST_BY_NAME_CMD, (hostname,))
if resp[0][0] != 1:
raise RuntimeError("Failed to request hostname")
resp = self._send_command_get_response(_GET_HOST_BY_NAME_CMD)
return resp[0]
def ping(self, dest, ttl=250):
"""Ping a destination IP address or hostname, with a max time-to-live
(ttl). Returns a millisecond timing value"""
if isinstance(dest, str):
dest = self.get_host_by_name(dest)
ttl = max(0, min(ttl, 255))
resp = self._send_command_get_response(_PING_CMD, (dest, (ttl,)))
return struct.unpack('<H', resp[0])[0]
def get_socket(self):
"""Request a socket from the ESP32, will allocate and return a number that
can then be passed to the other socket commands"""
if self._debug:
print("*** Get socket")
resp = self._send_command_get_response(_GET_SOCKET_CMD)
resp = resp[0][0]
if resp == 255:
raise RuntimeError("No sockets available")
if self._debug:
print("Allocated socket %d" % resp)
return resp
def socket_open(self, socket_num, dest, port, conn_mode=TCP_MODE):
"""Open a socket to a destination IP address or hostname
using the ESP32's internal reference number. By default we use
'conn_mode' TCP_MODE but can also use UDP_MODE or TLS_MODE
(dest must be hostname for TLS_MODE!)"""
self._socknum_ll[0][0] = socket_num
if self._debug:
print("*** Open socket")
port_param = struct.pack('>H', port)
if isinstance(dest, str):
dest = bytes(dest, 'utf-8')
resp = self._send_command_get_response(_START_CLIENT_TCP_CMD,
(dest, b'\x00\x00\x00\x00',
port_param,
self._socknum_ll[0],
(conn_mode,)))
else:
resp = self._send_command_get_response(_START_CLIENT_TCP_CMD,
(dest, port_param,
self._socknum_ll[0],
(conn_mode,)))
if resp[0][0] != 1:
raise RuntimeError("Could not connect to remote server")
def socket_status(self, socket_num):
"""Get the socket connection status, can be SOCKET_CLOSED, SOCKET_LISTEN,
SOCKET_SYN_SENT, SOCKET_SYN_RCVD, SOCKET_ESTABLISHED, SOCKET_FIN_WAIT_1,
SOCKET_FIN_WAIT_2, SOCKET_CLOSE_WAIT, SOCKET_CLOSING, SOCKET_LAST_ACK, or
SOCKET_TIME_WAIT"""
self._socknum_ll[0][0] = socket_num
resp = self._send_command_get_response(_GET_CLIENT_STATE_TCP_CMD, self._socknum_ll)
return resp[0][0]
def socket_connected(self, socket_num):
"""Test if a socket is connected to the destination, returns boolean true/false"""
return self.socket_status(socket_num) == SOCKET_ESTABLISHED
def socket_write(self, socket_num, buffer):
"""Write the bytearray buffer to a socket"""
if self._debug:
print("Writing:", buffer)
self._socknum_ll[0][0] = socket_num
sent = 0
for chunk in range((len(buffer) // 64)+1):
resp = self._send_command_get_response(_SEND_DATA_TCP_CMD,
(self._socknum_ll[0],
memoryview(buffer)[(chunk*64):((chunk+1)*64)]),
sent_param_len_16=True)
sent += resp[0][0]
if sent != len(buffer):
raise RuntimeError("Failed to send %d bytes (sent %d)" % (len(buffer), sent))
resp = self._send_command_get_response(_DATA_SENT_TCP_CMD, self._socknum_ll)
if resp[0][0] != 1:
raise RuntimeError("Failed to verify data sent")
def socket_available(self, socket_num):
"""Determine how many bytes are waiting to be read on the socket"""
self._socknum_ll[0][0] = socket_num
resp = self._send_command_get_response(_AVAIL_DATA_TCP_CMD, self._socknum_ll)
reply = struct.unpack('<H', resp[0])[0]
if self._debug:
print("ESPSocket: %d bytes available" % reply)
return reply
def socket_read(self, socket_num, size):
"""Read up to 'size' bytes from the socket number. Returns a bytearray"""
if self._debug:
print("Reading %d bytes from ESP socket with status %d" %
(size, self.socket_status(socket_num)))
self._socknum_ll[0][0] = socket_num
resp = self._send_command_get_response(_GET_DATABUF_TCP_CMD,
(self._socknum_ll[0],
(size & 0xFF, (size >> 8) & 0xFF)),
sent_param_len_16=True,
recv_param_len_16=True)
return bytes(resp[0])
def socket_connect(self, socket_num, dest, port, conn_mode=TCP_MODE):
"""Open and verify we connected a socket to a destination IP address or hostname
using the ESP32's internal reference number. By default we use
'conn_mode' TCP_MODE but can also use UDP_MODE or TLS_MODE (dest must
be hostname for TLS_MODE!)"""
if self._debug:
print("*** Socket connect mode", conn_mode)
self.socket_open(socket_num, dest, port, conn_mode=conn_mode)
times = time.ticks()
while (time.ticks() - times) < 3000:
if self.socket_connected(socket_num):
return True
time.sleep_ms(10)
raise RuntimeError("Failed to establish connection")
def socket_close(self, socket_num):
"""Close a socket using the ESP32's internal reference number"""
if self._debug:
print("*** Closing socket %d" % socket_num)
self._socknum_ll[0][0] = socket_num
resp = self._send_command_get_response(_STOP_CLIENT_TCP_CMD, self._socknum_ll)
if resp[0][0] != 1:
raise RuntimeError("Failed to close socket")
def start_server(self, port, socket_num, conn_mode=TCP_MODE, ip=None): # pylint: disable=invalid-name
"""Opens a server on the specified port, using the ESP32's internal reference number"""
if self._debug:
print("*** starting server")
self._socknum_ll[0][0] = socket_num
params = [struct.pack('>H', port), self._socknum_ll[0], (conn_mode,)]
if ip:
params.insert(0, ip)
resp = self._send_command_get_response(_START_SERVER_TCP_CMD, params)
if resp[0][0] != 1:
raise RuntimeError("Could not start server")
def server_state(self, socket_num):
"""Get the state of the ESP32's internal reference server socket number"""
self._socknum_ll[0][0] = socket_num
resp = self._send_command_get_response(_GET_STATE_TCP_CMD, self._socknum_ll)
return resp[0][0]
def set_esp_debug(self, enabled):
"""Enable/disable debug mode on the ESP32. Debug messages will be
written to the ESP32's UART."""
resp = self._send_command_get_response(_SET_DEBUG_CMD, ((bool(enabled),),))
if resp[0][0] != 1:
raise RuntimeError("Failed to set debug mode")
def set_pin_mode(self, pin, mode):
"""
Set the io mode for a GPIO pin.
:param int pin: ESP32 GPIO pin to set.
:param value: direction for pin, digitalio.Direction or integer (0=input, 1=output).
"""
pin_mode = mode
resp = self._send_command_get_response(_SET_PIN_MODE_CMD,
((pin,), (pin_mode,)))
if resp[0][0] != 1:
raise RuntimeError("Failed to set pin mode")
def set_digital_write(self, pin, value):
"""
Set the digital output value of pin.
:param int pin: ESP32 GPIO pin to write to.
:param bool value: Value for the pin.
"""
resp = self._send_command_get_response(_SET_DIGITAL_WRITE_CMD,
((pin,), (value,)))
if resp[0][0] != 1:
raise RuntimeError("Failed to write to pin")
def set_analog_write(self, pin, analog_value):
"""
Set the analog output value of pin, using PWM.
:param int pin: ESP32 GPIO pin to write to.
:param float value: 0=off 1.0=full on
"""
value = int(255 * analog_value)
resp = self._send_command_get_response(_SET_ANALOG_WRITE_CMD,
((pin,), (value,)))
if resp[0][0] != 1:
raise RuntimeError("Failed to write to pin")
def set_digital_read(self, pin):
"""
Get the digital input value of pin. Returns the boolean value of the pin.
:param int pin: ESP32 GPIO pin to read from.
"""
# Verify nina-fw => 1.5.0
fw_semver_maj = bytes(self.firmware_version).decode("utf-8")[2]
assert int(fw_semver_maj) >= 5, "Please update nina-fw to 1.5.0 or above."
resp = self._send_command_get_response(_SET_DIGITAL_READ_CMD,
((pin,),))[0]
if resp[0] == 0:
return False
elif resp[0] == 1:
return True
else:
raise ValueError("_SET_DIGITAL_READ response error: response is not boolean", resp[0])
def set_analog_read(self, pin, atten=ADC_ATTEN_DB_11):
"""
Get the analog input value of pin. Returns an int between 0 and 65536.
:param int pin: ESP32 GPIO pin to read from.
:param int atten: attenuation constant
"""
# Verify nina-fw => 1.5.0
fw_semver_maj = bytes(self.firmware_version).decode("utf-8")[2]
assert int(fw_semver_maj) >= 5, "Please update nina-fw to 1.5.0 or above."
resp = self._send_command_get_response(_SET_ANALOG_READ_CMD,
((pin,), (atten,)))
resp_analog = struct.unpack('<i', resp[0])
if resp_analog[0] < 0:
raise ValueError("_SET_ANALOG_READ parameter error: invalid pin", resp_analog[0])
if self._debug:
print(resp, resp_analog, resp_analog[0], 16 * resp_analog[0])
return 16 * resp_analog[0]
def get_time(self):
"""The current unix timestamp"""
if self.status == WL_CONNECTED:
resp = self._send_command_get_response(_GET_TIME)
resp_time = struct.unpack('<i', resp[0])
if resp_time == (0,):
raise ValueError("_GET_TIME returned 0")
return resp_time
if self.status in (WL_AP_LISTENING, WL_AP_CONNECTED):
raise RuntimeError("Cannot obtain NTP while in AP mode, must be connected to internet")
raise RuntimeError("Must be connected to WiFi before obtaining NTP.")
def set_certificate(self, client_certificate):
"""Sets client certificate. Must be called
BEFORE a network connection is established.
:param str client_certificate: User-provided .PEM certificate up to 1300 bytes.
"""
if self._debug:
print("** Setting client certificate")
if self.status == WL_CONNECTED:
raise RuntimeError("set_certificate must be called BEFORE a connection is established.")
if isinstance(client_certificate, str):
client_certificate = bytes(client_certificate, 'utf-8')
if "-----BEGIN CERTIFICATE" not in client_certificate:
raise TypeError(".PEM must start with -----BEGIN CERTIFICATE")
assert len(client_certificate) < 1300, ".PEM must be less than 1300 bytes."
resp = self._send_command_get_response(_SET_CLI_CERT, (client_certificate,))
if resp[0][0] != 1:
raise RuntimeError("Failed to set client certificate")
self.set_crt = True
return resp[0]
def set_private_key(self, private_key):
"""Sets private key. Must be called
BEFORE a network connection is established.
:param str private_key: User-provided .PEM file up to 1700 bytes.
"""
if self._debug:
print("** Setting client's private key.")
if self.status == WL_CONNECTED:
raise RuntimeError("set_private_key must be called BEFORE a connection is established.")
if isinstance(private_key, str):
private_key = bytes(private_key, 'utf-8')
if "-----BEGIN RSA" not in private_key:
raise TypeError(".PEM must start with -----BEGIN RSA")
assert len(private_key) < 1700, ".PEM must be less than 1700 bytes."
resp = self._send_command_get_response(_SET_PK, (private_key,))
if resp[0][0] != 1:
raise RuntimeError("Failed to set private key.")
self.set_psk = True
return resp[0]