Migrated to arduino

This commit is contained in:
2025-12-25 15:37:12 +01:00
parent 692320a635
commit ce6a2b9185
6 changed files with 126 additions and 0 deletions

View File

@@ -0,0 +1,13 @@
from machine import Pin, SPI
from sx127x import SX127x
# SPI bus
spi = SPI(1, baudrate=1000000, polarity=0, phase=0,
sck=Pin(18), mosi=Pin(23), miso=Pin(19))
# LoRa module
lora = SX127x(spi, cs=Pin(14), reset=Pin(26), irq=Pin(33),
freq=868E6, # or 915E6 depending on region
sf=7, # spreading factor
bw=125E3, # bandwidth
cr=5) # coding rate

View File

@@ -0,0 +1,43 @@
import time
import _thread
from machine import Pin, ADC, I2C
from screen import Screen
light_adc = ADC(Pin(1))
light_adc.atten(ADC.ATTN_11DB)
light_adc.width(ADC.WIDTH_12BIT)
screen = Screen()
def read_light(samples=8):
total = 0
for _ in range(samples):
total += light_adc.read()
time.sleep_ms(5)
return total // samples
def sensor_task():
while True:
raw = read_light()
percent = int((raw / 2439) * 100)
advice = None
if percent > 80:
advice = "Too bright !"
elif percent < 15:
advice = "Too dark !"
else :
advice = "I'm fine :)"
lines = [
"Light Sensor",
f"Level: {percent}%",
f"{advice}"
]
screen.display_lines(lines, 0)
time.sleep(1)
_thread.start_new_thread(sensor_task, ())
print("Main thread running")

View File

@@ -0,0 +1,39 @@
from machine import Pin, SoftI2C
from ssd1306 import SSD1306_I2C
import time
class Screen:
def __init__(self):
# OLED pins on Heltec V3
self.sda = Pin(17)
self.scl = Pin(18)
self.rst = Pin(21, Pin.OUT)
self.__reset_oled()
self.i2c = SoftI2C(scl=self.scl, sda=self.sda, freq=100000)
self.oled = SSD1306_I2C(128, 64, self.i2c, addr=0x3C)
self.clear()
def __reset_oled(self):
self.rst.value(0)
time.sleep_ms(150)
self.rst.value(1)
time.sleep_ms(150)
def clear(self):
self.oled.fill(0)
self.oled.show()
def display(self, message, col=0, row=0):
self.clear()
self.oled.text(message, col, row)
self.oled.show()
def display_lines(self, lines, col):
self.clear()
for i, line in enumerate(lines):
self.oled.text(line, col, i * 20)
self.oled.show()

View File

@@ -0,0 +1,71 @@
# MicroPython SSD1306 OLED driver, I2C interface
from micropython import const
import framebuf
SET_CONTRAST = const(0x81)
SET_ENTIRE_ON = const(0xA4)
SET_NORM_INV = const(0xA6)
SET_DISP = const(0xAE)
SET_MEM_ADDR = const(0x20)
SET_COL_ADDR = const(0x21)
SET_PAGE_ADDR = const(0x22)
SET_DISP_START_LINE = const(0x40)
SET_SEG_REMAP = const(0xA0)
SET_MUX_RATIO = const(0xA8)
SET_COM_OUT_DIR = const(0xC0)
SET_DISP_OFFSET = const(0xD3)
SET_COM_PIN_CFG = const(0xDA)
SET_DISP_CLK_DIV = const(0xD5)
SET_PRECHARGE = const(0xD9)
SET_VCOM_DESEL = const(0xDB)
SET_CHARGE_PUMP = const(0x8D)
class SSD1306_I2C(framebuf.FrameBuffer):
def __init__(self, width, height, i2c, addr=0x3C):
self.width = width
self.height = height
self.i2c = i2c
self.addr = addr
self.pages = self.height // 8
self.buffer = bytearray(self.pages * self.width)
super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB)
self.init_display()
def write_cmd(self, cmd):
self.i2c.writeto(self.addr, bytearray([0x80, cmd]))
def write_data(self, buf):
self.i2c.writeto(self.addr, bytearray([0x40]) + buf)
def init_display(self):
for cmd in (
SET_DISP | 0x00,
SET_MEM_ADDR, 0x00,
SET_DISP_START_LINE | 0x00,
SET_SEG_REMAP | 0x01,
SET_MUX_RATIO, self.height - 1,
SET_COM_OUT_DIR | 0x08,
SET_DISP_OFFSET, 0x00,
SET_COM_PIN_CFG, 0x12,
SET_DISP_CLK_DIV, 0x80,
SET_PRECHARGE, 0xF1,
SET_VCOM_DESEL, 0x30,
SET_CONTRAST, 0xFF,
SET_ENTIRE_ON,
SET_NORM_INV,
SET_CHARGE_PUMP, 0x14,
SET_DISP | 0x01,
):
self.write_cmd(cmd)
self.fill(0)
self.show()
def show(self):
self.write_cmd(SET_COL_ADDR)
self.write_cmd(0)
self.write_cmd(self.width - 1)
self.write_cmd(SET_PAGE_ADDR)
self.write_cmd(0)
self.write_cmd(self.pages - 1)
self.write_data(self.buffer)

View File

@@ -0,0 +1,499 @@
from time import sleep, ticks_ms
from machine import SPI, Pin
from micropython import const
import gc
PA_OUTPUT_RFO_PIN = const(0)
PA_OUTPUT_PA_BOOST_PIN = const(1)
# registers
REG_FIFO = const(0x00)
REG_OP_MODE = const(0x01)
REG_FRF_MSB = const(0x06)
REG_FRF_MID = const(0x07)
REG_FRF_LSB = const(0x08)
REG_PA_CONFIG = const(0x09)
REG_LNA = const(0x0C)
REG_FIFO_ADDR_PTR = const(0x0D)
REG_FIFO_TX_BASE_ADDR = const(0x0E)
FifoTxBaseAddr = const(0x00)
REG_FIFO_RX_BASE_ADDR = const(0x0F)
FifoRxBaseAddr = const(0x00)
REG_FIFO_RX_CURRENT_ADDR = const(0x10)
REG_IRQ_FLAGS_MASK = const(0x11)
REG_IRQ_FLAGS = const(0x12)
REG_RX_NB_BYTES = const(0x13)
REG_PKT_RSSI_VALUE = const(0x1A)
REG_PKT_SNR_VALUE = const(0x19)
REG_MODEM_CONFIG_1 = const(0x1D)
REG_MODEM_CONFIG_2 = const(0x1E)
REG_PREAMBLE_MSB = const(0x20)
REG_PREAMBLE_LSB = const(0x21)
REG_PAYLOAD_LENGTH = const(0x22)
REG_FIFO_RX_BYTE_ADDR = const(0x25)
REG_MODEM_CONFIG_3 = const(0x26)
REG_RSSI_WIDEBAND = const(0x2C)
REG_DETECTION_OPTIMIZE = const(0x31)
REG_DETECTION_THRESHOLD = const(0x37)
REG_SYNC_WORD = const(0x39)
REG_DIO_MAPPING_1 = const(0x40)
REG_VERSION = const(0x42)
# invert IQ
REG_INVERTIQ = const(0x33)
RFLR_INVERTIQ_RX_MASK = const(0xBF)
RFLR_INVERTIQ_RX_OFF = const(0x00)
RFLR_INVERTIQ_RX_ON = const(0x40)
RFLR_INVERTIQ_TX_MASK = const(0xFE)
RFLR_INVERTIQ_TX_OFF = const(0x01)
RFLR_INVERTIQ_TX_ON = const(0x00)
REG_INVERTIQ2 = const(0x3B)
RFLR_INVERTIQ2_ON = const(0x19)
RFLR_INVERTIQ2_OFF = const(0x1D)
# modes
# bit 7: 1 => LoRa mode
MODE_LONG_RANGE_MODE = const(0x80)
MODE_SLEEP = const(0x00)
MODE_STDBY = const(0x01)
MODE_TX = const(0x03)
MODE_RX_CONTINUOUS = const(0x05)
MODE_RX_SINGLE = const(0x06)
# PA config
PA_BOOST = const(0x80)
# IRQ masks
IRQ_TX_DONE_MASK = const(0x08)
IRQ_PAYLOAD_CRC_ERROR_MASK = const(0x20)
IRQ_RX_DONE_MASK = const(0x40)
IRQ_RX_TIME_OUT_MASK = const(0x80)
# Buffer size
MAX_PKT_LENGTH = const(255)
class SX127x:
default_parameters = {
"frequency": 869525000,
"frequency_offset": 0,
"tx_power_level": 14,
"signal_bandwidth": 125e3,
"spreading_factor": 9,
"coding_rate": 5,
"preamble_length": 8,
"implicitHeader": False,
"sync_word": 0x12,
"enable_CRC": True,
"invert_IQ": False,
}
def __init__(self, spi, pins, parameters={}):
self.spi = spi
self.pins = pins
self.parameters = parameters
self.pin_ss = Pin(self.pins["ss"], Pin.OUT)
self.lock = False
self.implicit_header_mode = None
self.parameters = SX127x.default_parameters
if parameters:
self.parameters.update(parameters)
# check version
version = None
for i in range(5):
version = self.readRegister(REG_VERSION)
if version:
break
# debug output
print("SX version: {}".format(version))
# put in LoRa and sleep mode
self.sleep()
# config
self.setFrequency(self.parameters["frequency"])
self.setSignalBandwidth(self.parameters["signal_bandwidth"])
# set LNA boost
self.writeRegister(REG_LNA, self.readRegister(REG_LNA) | 0x03)
# set auto AGC
self.writeRegister(REG_MODEM_CONFIG_3, 0x04)
self.setTxPower(self.parameters["tx_power_level"])
self.implicitHeaderMode(self.parameters["implicitHeader"])
self.setSpreadingFactor(self.parameters["spreading_factor"])
self.setCodingRate(self.parameters["coding_rate"])
self.setPreambleLength(self.parameters["preamble_length"])
self.setSyncWord(self.parameters["sync_word"])
self.enableCRC(self.parameters["enable_CRC"])
self.invertIQ(self.parameters["invert_IQ"])
# set LowDataRateOptimize flag if symbol time > 16ms (default disable on reset)
# self.writeRegister(REG_MODEM_CONFIG_3, self.readRegister(REG_MODEM_CONFIG_3) & 0xF7) # default disable on reset
bw = self.parameters["signal_bandwidth"]
sf = self.parameters["spreading_factor"]
if 1000 / bw / 2 ** sf > 16:
self.writeRegister(
REG_MODEM_CONFIG_3, self.readRegister(REG_MODEM_CONFIG_3) | 0x08
)
# set base addresses
self.writeRegister(REG_FIFO_TX_BASE_ADDR, FifoTxBaseAddr)
self.writeRegister(REG_FIFO_RX_BASE_ADDR, FifoRxBaseAddr)
self.standby()
def beginPacket(self, implicitHeaderMode=False):
self.standby()
self.implicitHeaderMode(implicitHeaderMode)
# reset FIFO address and payload length
self.writeRegister(REG_FIFO_ADDR_PTR, FifoTxBaseAddr)
self.writeRegister(REG_PAYLOAD_LENGTH, 0)
def endPacket(self):
# put in TX mode
self.writeRegister(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_TX)
# wait for TX done, standby automatically on TX_DONE
while (self.readRegister(REG_IRQ_FLAGS) & IRQ_TX_DONE_MASK) == 0:
pass
# clear IRQ's
self.writeRegister(REG_IRQ_FLAGS, IRQ_TX_DONE_MASK)
def write(self, buffer):
currentLength = self.readRegister(REG_PAYLOAD_LENGTH)
size = len(buffer)
# check size
size = min(size, (MAX_PKT_LENGTH - FifoTxBaseAddr - currentLength))
# write data
for i in range(size):
self.writeRegister(REG_FIFO, buffer[i])
# update length
self.writeRegister(REG_PAYLOAD_LENGTH, currentLength + size)
return size
def aquirelock(self, lock=False):
self.lock = False
def println(self, message, implicitHeader=False, repeat=1):
# wait until RX_Done, lock and begin writing
self.aquirelock(True)
if isinstance(message, str):
message = message.encode()
self.beginPacket(implicitHeader)
self.write(message)
for i in range(repeat):
self.endPacket()
# unlock when done writing
self.aquirelock(False)
self.collectGarbage()
def getIrqFlags(self):
irqFlags = self.readRegister(REG_IRQ_FLAGS)
self.writeRegister(REG_IRQ_FLAGS, irqFlags)
return irqFlags
def packetRssi(self, rfi="hf"):
packet_rssi = self.readRegister(REG_PKT_RSSI_VALUE)
return packet_rssi - (157 if rfi == "hf" else 164)
def packetSnr(self):
return (self.readRegister(REG_PKT_SNR_VALUE)) * 0.25
def standby(self):
self.writeRegister(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_STDBY)
def sleep(self):
self.writeRegister(REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_SLEEP)
def setTxPower(self, level, outputPin=PA_OUTPUT_PA_BOOST_PIN):
self.parameters["tx_power_level"] = level
if outputPin == PA_OUTPUT_RFO_PIN:
# RFO
level = min(max(level, 0), 14)
self.writeRegister(REG_PA_CONFIG, 0x70 | level)
else:
# PA BOOST
level = min(max(level, 2), 17)
self.writeRegister(REG_PA_CONFIG, PA_BOOST | (level - 2))
def setFrequency(self, frequency):
# TODO min max limit
frequency = int(frequency)
self.parameters["frequency"] = frequency
frequency += self.parameters["frequency_offset"]
frf = (frequency << 19) // 32000000
self.writeRegister(REG_FRF_MSB, (frf >> 16) & 0xFF)
self.writeRegister(REG_FRF_MID, (frf >> 8) & 0xFF)
self.writeRegister(REG_FRF_LSB, (frf >> 0) & 0xFF)
def setSpreadingFactor(self, sf):
sf = min(max(sf, 6), 12)
self.writeRegister(REG_DETECTION_OPTIMIZE, 0xC5 if sf == 6 else 0xC3)
self.writeRegister(REG_DETECTION_THRESHOLD, 0x0C if sf == 6 else 0x0A)
self.writeRegister(
REG_MODEM_CONFIG_2,
(self.readRegister(REG_MODEM_CONFIG_2) & 0x0F) | ((sf << 4) & 0xF0),
)
def setSignalBandwidth(self, sbw):
bins = (
7.8e3,
10.4e3,
15.6e3,
20.8e3,
31.25e3,
41.7e3,
62.5e3,
125e3,
250e3,
)
bw = 9
if sbw < 10:
bw = sbw
else:
for i in range(len(bins)):
if sbw <= bins[i]:
bw = i
break
self.writeRegister(
REG_MODEM_CONFIG_1,
(self.readRegister(REG_MODEM_CONFIG_1) & 0x0F) | (bw << 4),
)
def setCodingRate(self, denominator):
denominator = min(max(denominator, 5), 8)
cr = denominator - 4
self.writeRegister(
REG_MODEM_CONFIG_1,
(self.readRegister(REG_MODEM_CONFIG_1) & 0xF1) | (cr << 1),
)
def setPreambleLength(self, length):
self.writeRegister(REG_PREAMBLE_MSB, (length >> 8) & 0xFF)
self.writeRegister(REG_PREAMBLE_LSB, (length >> 0) & 0xFF)
def enableCRC(self, enable_CRC=False):
modem_config_2 = self.readRegister(REG_MODEM_CONFIG_2)
config = modem_config_2 | 0x04 if enable_CRC else modem_config_2 & 0xFB
self.writeRegister(REG_MODEM_CONFIG_2, config)
def invertIQ(self, invertIQ):
self.parameters["invertIQ"] = invertIQ
if invertIQ:
self.writeRegister(
REG_INVERTIQ,
(
(
self.readRegister(REG_INVERTIQ)
& RFLR_INVERTIQ_TX_MASK
& RFLR_INVERTIQ_RX_MASK
)
| RFLR_INVERTIQ_RX_ON
| RFLR_INVERTIQ_TX_ON
),
)
self.writeRegister(REG_INVERTIQ2, RFLR_INVERTIQ2_ON)
else:
self.writeRegister(
REG_INVERTIQ,
(
(
self.readRegister(REG_INVERTIQ)
& RFLR_INVERTIQ_TX_MASK
& RFLR_INVERTIQ_RX_MASK
)
| RFLR_INVERTIQ_RX_OFF
| RFLR_INVERTIQ_TX_OFF
),
)
self.writeRegister(REG_INVERTIQ2, RFLR_INVERTIQ2_OFF)
def setSyncWord(self, sw):
self.writeRegister(REG_SYNC_WORD, sw)
def setChannel(self, parameters):
self.standby()
for key in parameters:
if key == "frequency":
self.setFrequency(parameters[key])
continue
if key == "invert_IQ":
self.invertIQ(parameters[key])
continue
if key == "tx_power_level":
self.setTxPower(parameters[key])
continue
def dumpRegisters(self):
# TODO end=''
for i in range(128):
print("0x{:02X}: {:02X}".format(i, self.readRegister(i)), end="")
if (i + 1) % 4 == 0:
print()
else:
print(" | ", end="")
def implicitHeaderMode(self, implicitHeaderMode=False):
if (
self.implicit_header_mode != implicitHeaderMode
): # set value only if different.
self.implicit_header_mode = implicitHeaderMode
modem_config_1 = self.readRegister(REG_MODEM_CONFIG_1)
config = (
modem_config_1 | 0x01
if implicitHeaderMode
else modem_config_1 & 0xFE
)
self.writeRegister(REG_MODEM_CONFIG_1, config)
def receive(self, size=0):
self.implicitHeaderMode(size > 0)
if size > 0:
self.writeRegister(REG_PAYLOAD_LENGTH, size & 0xFF)
# The last packet always starts at FIFO_RX_CURRENT_ADDR
# no need to reset FIFO_ADDR_PTR
self.writeRegister(
REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_CONTINUOUS
)
def listen(self, time=1000):
time = min(max(time, 0), 10000)
self.receive()
start = ticks_ms()
while True:
if self.receivedPacket():
return self.readPayload()
if ticks_ms() - start > time:
return None
def onReceive(self, callback):
self.onReceive = callback
if "dio_0" in self.pins:
self.pin_rx_done = Pin(self.pins["dio_0"], Pin.IN)
if self.pin_rx_done:
if callback:
self.writeRegister(REG_DIO_MAPPING_1, 0x00)
self.pin_rx_done.irq(
trigger=Pin.IRQ_RISING, handler=self.handleOnReceive
)
else:
pass
# TODO detach irq
def handleOnReceive(self, event_source):
# lock until TX_Done
self.aquirelock(True)
irqFlags = self.getIrqFlags()
# RX_DONE only, irqFlags should be 0x40
if irqFlags & IRQ_RX_DONE_MASK == IRQ_RX_DONE_MASK:
# automatically standby when RX_DONE
if self.onReceive:
payload = self.readPayload()
self.onReceive(self, payload)
elif self.readRegister(REG_OP_MODE) != (
MODE_LONG_RANGE_MODE | MODE_RX_SINGLE
):
# no packet received.
# reset FIFO address / # enter single RX mode
self.writeRegister(REG_FIFO_ADDR_PTR, FifoRxBaseAddr)
self.writeRegister(
REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_SINGLE
)
self.aquirelock(False) # unlock in any case.
self.collectGarbage()
return True
def receivedPacket(self, size=0):
irqFlags = self.getIrqFlags()
self.implicitHeaderMode(size > 0)
if size > 0:
self.writeRegister(REG_PAYLOAD_LENGTH, size & 0xFF)
# if (irqFlags & IRQ_RX_DONE_MASK) and \
# (irqFlags & IRQ_RX_TIME_OUT_MASK == 0) and \
# (irqFlags & IRQ_PAYLOAD_CRC_ERROR_MASK == 0):
if (
irqFlags == IRQ_RX_DONE_MASK
): # RX_DONE only, irqFlags should be 0x40
# automatically standby when RX_DONE
return True
elif self.readRegister(REG_OP_MODE) != (
MODE_LONG_RANGE_MODE | MODE_RX_SINGLE
):
# no packet received.
# reset FIFO address / # enter single RX mode
self.writeRegister(REG_FIFO_ADDR_PTR, FifoRxBaseAddr)
self.writeRegister(
REG_OP_MODE, MODE_LONG_RANGE_MODE | MODE_RX_SINGLE
)
def readPayload(self):
# set FIFO address to current RX address
# fifo_rx_current_addr = self.readRegister(REG_FIFO_RX_CURRENT_ADDR)
self.writeRegister(
REG_FIFO_ADDR_PTR, self.readRegister(REG_FIFO_RX_CURRENT_ADDR)
)
# read packet length
packet_length = 0
if self.implicit_header_mode:
packet_length = self.readRegister(REG_PAYLOAD_LENGTH)
else:
packet_length = self.readRegister(REG_RX_NB_BYTES)
payload = bytearray()
for i in range(packet_length):
payload.append(self.readRegister(REG_FIFO))
self.collectGarbage()
return bytes(payload)
def readRegister(self, address, byteorder="big", signed=False):
response = self.transfer(address & 0x7F)
return int.from_bytes(response, byteorder)
def writeRegister(self, address, value):
self.transfer(address | 0x80, value)
def transfer(self, address, value=0x00):
response = bytearray(1)
self.pin_ss.value(0)
self.spi.write(bytes([address]))
self.spi.write_readinto(bytes([value]), response)
self.pin_ss.value(1)
return response
def collectGarbage(self):
gc.collect()
# print('[Mem aft - free: {} allocated: {}]'.format(gc.mem_free(), gc.mem_alloc()))