Reorganised repo

This commit is contained in:
Conr86
2019-07-09 13:36:59 +10:00
parent de2e70136a
commit 411f02dcff
4 changed files with 19 additions and 18 deletions

View File

@@ -1,2 +0,0 @@
from .sgp30 import SGP30
from .crc import Crc8

View File

@@ -1,23 +0,0 @@
class CRC8:
def __init__(s):
s.crc = 255
def hash(s, int_list):
for i in int_list:
s.addVal(i)
return s.crc
def addVal(s, n):
crc = s.crc
for bit in range(0, 8):
if (n ^ crc) & 0x80:
crc = (crc << 1) ^ 0x31
else:
crc = (crc << 1)
n = n << 1
s.crc = crc & 0xFF
return s.crc
#print(CRC8().hash([1,144]))
#print(hex(CRC8().hash([0xBE, 0xEF])))
#[1, 144, 76, 0, 6, 39]

View File

@@ -1,120 +0,0 @@
import smbus2
from smbus2 import SMBusWrapper, SMBus, i2c_msg
from collections import namedtuple
from functools import partial
from time import sleep, asctime, time
from copy import copy
from .crc import CRC8
DEVICE_BUS = 1
class _cmds():
"""container class for mapping between human readable names and the command values used by the sgp"""
SGP30Cmd = namedtuple("SGP30Cmd", ["commands", "replylen", "waittime"])
IAQ_INIT = SGP30Cmd([0x20, 0x03], 0, 10)
IAQ_MEASURE = SGP30Cmd([0x20, 0x08], 6, 12)
GET_BASELINE = SGP30Cmd([0x20, 0x15], 6, 120)
SET_BASELINE = SGP30Cmd([0x20, 0x1e], 0, 10)
SET_HUMIDITY = SGP30Cmd([0x20, 0x61], 0, 10)
IAQ_SELFTEST = SGP30Cmd([0x20, 0x32], 3, 520)
GET_FEATURES = SGP30Cmd([0x20, 0x2f], 3, 3)
GET_SERIAL = SGP30Cmd([0x36, 0x82], 9, 10)
@classmethod
def new_set_baseline(cls, baseline_data):
cmd = cls.SET_BASELINE
return cls.SGP30Cmd(cmd.commands + baseline_data, cmd.replylen, cmd.waittime)
class SGP30():
def __init__(self, bus, device_address=0x58, baseline=[]):
self._bus = bus
self._device_addr = device_address
self._start_time = time()
self._last_save_time = time()
self._baseline = baseline
SGP30Packet = namedtuple("SGP30Packet", ["data", "raw", "crc_ok"])
def _raw_validate_crc(s, r):
a = list(zip(r[0::3], r[1::3]))
crc = r[2::3] == [CRC8().hash(i) for i in a]
return(crc, a)
def read_write(self, cmd):
write = i2c_msg.write(self._device_addr, cmd.commands)
if cmd.replylen <= 0:
self._bus.i2c_rdwr(write)
else:
read = i2c_msg.read(self._device_addr, cmd.replylen)
self._bus.i2c_rdwr(write)
sleep(cmd.waittime/1000.0)
self._bus.i2c_rdwr(read)
r = list(read)
crc_ok, a = self._raw_validate_crc(r)
answer = [i << 8 | j for i, j in a]
return self.SGP30Packet(answer, r, crc_ok)
def dump_baseline(self):
baseline = self.read_write(_cmds.GET_BASELINE)
if baseline.crc_ok == True:
print(baseline)
else:
print("Ignoring baseline due to invalid CRC")
def set_baseline(self):
crc, _ = self._raw_validate_crc(self._baseline)
if len(self._baseline) == 6 and crc == True:
self.read_write(_cmds.new_set_baseline(self._baseline))
return True
else:
#print("Failed to load baseline, invalid data")
return False
def read_measurements(self):
return self.read_write(_cmds.IAQ_MEASURE)
def read_selftest(self):
return self.read_write(_cmds.IAQ_SELFTEST)
def read_serial(self):
return self.read_write(_cmds.GET_SERIAL)
def read_features(self):
return self.read_write(_cmds.GET_FEATURES)
def init_sgp(self):
self.read_write(_cmds.IAQ_INIT)
def i2c_general_call(self):
"""This attempts to reset _ALL_ devices on the i2c buss
This command issues the i2c-general call RW command that should result
in all devices aborting any read/write operations and starting to listen
for new i2c-commands.
This will usually un-stick the SGP30, but might reset or otherwise
affect any device on the bus.
"""
self._bus.write_byte(0, 0x06)
sleep(0.1)
def main():
with SMBusWrapper(1) as bus:
sgp = SGP30(bus, baseline_filename=BASELINE_FILENAME+".TESTING")
print("resetting all i2c devices")
sgp.i2c_general_call()
print(sgp.read_features())
print(sgp.read_serial())
sgp.init_sgp()
for i in range(300):
print(sgp.read_measurements())
time.sleep(0.1)
sgp.store_baseline()
bus.close()
if __name__ == "__main__":
main()