95 lines
3.1 KiB
Python
95 lines
3.1 KiB
Python
import smbus2
|
|
from collections import namedtuple
|
|
import time
|
|
from .crc import CRC8
|
|
|
|
class _cmds():
    """Container mapping human-readable names to the command values used by the SGP.

    Every command is an ``SGP30Cmd`` named tuple with three fields:

    * ``commands`` -- list of byte values written to the sensor (the
      two-byte command word, optionally followed by payload bytes).
    * ``replylen`` -- number of bytes to read back; ``0`` marks a
      write-only command.
    * ``waittime`` -- delay associated with the command, in milliseconds.
    """

    SGP30Cmd = namedtuple("SGP30Cmd", ["commands", "replylen", "waittime"])

    IAQ_INIT = SGP30Cmd([0x20, 0x03], 0, 10)
    IAQ_MEASURE = SGP30Cmd([0x20, 0x08], 6, 12)
    GET_BASELINE = SGP30Cmd([0x20, 0x15], 6, 120)
    SET_BASELINE = SGP30Cmd([0x20, 0x1e], 0, 10)
    SET_HUMIDITY = SGP30Cmd([0x20, 0x61], 0, 10)
    IAQ_SELFTEST = SGP30Cmd([0x20, 0x32], 3, 520)
    GET_FEATURES = SGP30Cmd([0x20, 0x2f], 3, 3)
    GET_SERIAL = SGP30Cmd([0x36, 0x82], 9, 10)

    @classmethod
    def new_set_baseline(cls, baseline_data):
        """Build a SET_BASELINE command carrying ``baseline_data`` as payload.

        Args:
            baseline_data: iterable of byte values (list, tuple, bytes, ...)
                appended after the SET_BASELINE command word.

        Returns:
            A new ``SGP30Cmd``; the shared ``SET_BASELINE`` template is not
            modified.
        """
        template = cls.SET_BASELINE
        # list() lets callers pass any iterable; plain ``list + tuple`` or
        # ``list + bytes`` would raise TypeError.
        payload = template.commands + list(baseline_data)
        return cls.SGP30Cmd(payload, template.replylen, template.waittime)
|
|
|
|
|
|
class SGP30():
    """Driver for an SGP30 sensor reached through an smbus2-style I2C bus.

    Attributes:
        bus: bus object providing ``i2c_rdwr`` and ``write_byte``.
        address: I2C address of the sensor.
        baseline: raw baseline bytes (two data bytes plus one CRC byte per
            word, six bytes in total) used by ``set_baseline``.
    """

    def __init__(self, bus, address=0x58, baseline=None):
        """Store the bus handle, sensor address and optional baseline.

        Args:
            bus: bus object used for all transfers.
            address: I2C address of the sensor.
            baseline: raw baseline bytes; when omitted each instance gets its
                own empty list (a shared mutable default would leak state
                between instances).
        """
        self.bus = bus
        self.address = address
        self.baseline = [] if baseline is None else baseline

    # Result of a command with a reply: decoded 16-bit words, the raw bytes
    # as read from the bus, and whether every word passed its CRC check.
    SGP30Packet = namedtuple("SGP30Packet", ["data", "raw", "crc_ok"])

    def _raw_validate_crc(self, r):
        """Split raw bytes into words and check each word's CRC byte.

        ``r`` is read as consecutive 3-byte groups: two data bytes followed
        by one CRC byte.

        Args:
            r: sequence of byte values.

        Returns:
            ``(crc_ok, pairs)`` where ``pairs`` is the list of
            ``(byte0, byte1)`` data tuples and ``crc_ok`` is True only when
            every CRC byte equals ``CRC8().hash(pair)``.
        """
        pairs = list(zip(r[0::3], r[1::3]))
        # list() so tuple/bytes input compares element-wise with the list
        # of expected checksums instead of always comparing unequal.
        received = list(r[2::3])
        expected = [CRC8().hash(pair) for pair in pairs]
        return (received == expected, pairs)

    def read_write(self, cmd):
        """Send ``cmd`` to the sensor and decode the reply, if it has one.

        Args:
            cmd: command tuple with ``commands`` (bytes to write),
                ``replylen`` (bytes to read back) and ``waittime`` (ms).

        Returns:
            ``None`` for write-only commands (``replylen <= 0``); otherwise
            an ``SGP30Packet`` with the decoded big-endian 16-bit words, the
            raw reply bytes and the CRC verdict.
        """
        write = smbus2.i2c_msg.write(self.address, cmd.commands)
        self.bus.i2c_rdwr(write)
        # Wait after every command, write-only ones included, so that the
        # next transfer is not issued while the sensor is still busy.
        time.sleep(cmd.waittime / 1000.0)
        if cmd.replylen <= 0:
            return None

        read = smbus2.i2c_msg.read(self.address, cmd.replylen)
        self.bus.i2c_rdwr(read)
        raw = list(read)
        crc_ok, pairs = self._raw_validate_crc(raw)
        # First byte of each pair is the high byte of the 16-bit word.
        words = [high << 8 | low for high, low in pairs]
        return self.SGP30Packet(words, raw, crc_ok)

    def dump_baseline(self):
        """Read the baseline from the sensor and print it if its CRC is valid."""
        baseline = self.read_write(_cmds.GET_BASELINE)
        if baseline.crc_ok:
            print(baseline)
        else:
            print("Ignoring baseline due to invalid CRC")

    def set_baseline(self):
        """Write ``self.baseline`` to the sensor when it is well-formed.

        The baseline must be exactly six bytes with valid CRC bytes.

        Returns:
            True when the baseline was sent, False when it was rejected.
        """
        if len(self.baseline) != 6:
            return False
        crc_ok, _ = self._raw_validate_crc(self.baseline)
        if not crc_ok:
            return False
        self.read_write(_cmds.new_set_baseline(list(self.baseline)))
        return True

    def read_measurements(self):
        """Run IAQ_MEASURE and return the resulting ``SGP30Packet``."""
        return self.read_write(_cmds.IAQ_MEASURE)

    def read_selftest(self):
        """Run IAQ_SELFTEST and return the resulting ``SGP30Packet``."""
        return self.read_write(_cmds.IAQ_SELFTEST)

    def read_serial(self):
        """Run GET_SERIAL and return the resulting ``SGP30Packet``."""
        return self.read_write(_cmds.GET_SERIAL)

    def read_features(self):
        """Run GET_FEATURES and return the resulting ``SGP30Packet``."""
        return self.read_write(_cmds.GET_FEATURES)

    def init_sgp(self):
        """Send the IAQ_INIT command to the sensor."""
        self.read_write(_cmds.IAQ_INIT)

    def i2c_general_call(self):
        """This attempts to reset _ALL_ devices on the i2c bus

        This command issues the i2c-general call RW command that should result
        in all devices aborting any read/write operations and starting to listen
        for new i2c-commands.

        This will usually un-stick the SGP30, but might reset or otherwise
        affect any device on the bus.
        """
        # Address 0 is the I2C general-call address; 0x06 is the byte sent.
        self.bus.write_byte(0, 0x06)
        time.sleep(0.1)
|