Added test, fixed typo

This commit is contained in:
Conr86
2019-06-22 17:15:44 +10:00
parent 75457cdad8
commit e7cb94e364
3 changed files with 63 additions and 49 deletions

View File

@@ -1,23 +1,23 @@
class Crc8:
def __init__(s):
s.crc=255
s.crc = 255
def hash(s,int_list):
def hash(s, int_list):
for i in int_list:
s.addVal(i)
return s.crc
def addVal(s,n):
def addVal(s, n):
crc = s.crc
for bit in range(0,8):
if ( n ^ crc ) & 0x80:
crc = ( crc << 1 ) ^ 0x31
for bit in range(0, 8):
if (n ^ crc) & 0x80:
crc = (crc << 1) ^ 0x31
else:
crc = ( crc << 1 )
crc = (crc << 1)
n = n << 1
s.crc = crc & 0xFF
return s.crc
#print(Crc8().hash([1,144]))
# print(Crc8().hash([1,144]))
#print(hex(Crc8().hash([0xBE, 0xEF])))
#[1, 144, 76, 0, 6, 39]

View File

@@ -1,8 +1,8 @@
import smbus2
from smbus2 import SMBusWrapper, SMBus, i2c_msg
from collections import namedtuple
from collections import namedtuple
from functools import partial
from time import sleep, asctime,time
from time import sleep, asctime, time
import json
from copy import copy
import os.path
@@ -11,58 +11,60 @@ from .crc import Crc8
DEVICE_BUS = 1
BASELINE_FILENAME = os.path.expanduser("~/.sgp_config_data.txt")
class _cmds():
"""container class for mapping between human readable names and the command values used by the sgp"""
Sgp30Cmd = namedtuple("Sgp30Cmd",["commands","replylen","waittime"])
IAQ_INIT=Sgp30Cmd([0x20, 0x03],0,10)
IAQ_MEASURE=Sgp30Cmd([0x20, 0x08],6,12)
GET_BASELINE=Sgp30Cmd([0x20, 0x15],6,120)
SET_BASELINE=Sgp30Cmd([0x20, 0x1e],0,10)
SET_HUMIDITY=Sgp30Cmd([0x20, 0x61],0,10)
IAQ_SELFTEST=Sgp30Cmd([0x20, 0x32],3,520)
GET_FEATURES=Sgp30Cmd([0x20, 0x2f],3,3)
GET_SERIAL=Sgp30Cmd([0x36, 0x82],9,10)
SGP30Cmd = namedtuple("SGP30Cmd", ["commands", "replylen", "waittime"])
IAQ_INIT = SGP30Cmd([0x20, 0x03], 0, 10)
IAQ_MEASURE = SGP30Cmd([0x20, 0x08], 6, 12)
GET_BASELINE = SGP30Cmd([0x20, 0x15], 6, 120)
SET_BASELINE = SGP30Cmd([0x20, 0x1e], 0, 10)
SET_HUMIDITY = SGP30Cmd([0x20, 0x61], 0, 10)
IAQ_SELFTEST = SGP30Cmd([0x20, 0x32], 3, 520)
GET_FEATURES = SGP30Cmd([0x20, 0x2f], 3, 3)
GET_SERIAL = SGP30Cmd([0x36, 0x82], 9, 10)
@classmethod
def new_set_baseline(cls,baseline_data):
def new_set_baseline(cls, baseline_data):
cmd = cls.SET_BASELINE
return cls.Sgp30Cmd(cmd.commands +baseline_data,cmd.replylen,cmd.waittime)
return cls.SGP30Cmd(cmd.commands + baseline_data, cmd.replylen, cmd.waittime)
class Sgp30():
def __init__(self,bus,device_address = 0x58, baseline_filename=BASELINE_FILENAME):
class SGP30():
def __init__(self, bus, device_address=0x58, baseline_filename=BASELINE_FILENAME):
self._bus = bus
self._device_addr = device_address
self._start_time = time()
self._last_save_time = time()
self._baseline_filename=baseline_filename
self._baseline_filename = baseline_filename
Sgp30Answer = namedtuple("Sgp30Answer",["data","raw","crc_ok"])
def _raw_validate_crc(s,r):
a = list(zip(r[0::3],r[1::3]))
crc = r[2::3] == [Crc8().hash(i) for i in a ]
return(crc,a)
SGP30Answer = namedtuple("SGP30Answer", ["data", "raw", "crc_ok"])
def read_write(self,cmd):
write = i2c_msg.write(self._device_addr,cmd.commands)
if cmd.replylen <= 0 :
self._bus.i2c_rdwr(write)
def _raw_validate_crc(s, r):
a = list(zip(r[0::3], r[1::3]))
crc = r[2::3] == [Crc8().hash(i) for i in a]
return(crc, a)
def read_write(self, cmd):
write = i2c_msg.write(self._device_addr, cmd.commands)
if cmd.replylen <= 0:
self._bus.i2c_rdwr(write)
else:
read = i2c_msg.read(self._device_addr,cmd.replylen)
self._bus.i2c_rdwr(write)
read = i2c_msg.read(self._device_addr, cmd.replylen)
self._bus.i2c_rdwr(write)
sleep(cmd.waittime/1000.0)
self._bus.i2c_rdwr(read)
r = list(read)
crc_ok,a=self._raw_validate_crc(r)
answer = [i<<8 | j for i,j in a]
return self.Sgp30Answer(answer,r,crc_ok)
crc_ok, a = self._raw_validate_crc(r)
answer = [i << 8 | j for i, j in a]
return self.SGP30Answer(answer, r, crc_ok)
def store_baseline(self):
with open(self._baseline_filename,"w") as conf:
baseline=self.read_write(_cmds.GET_BASELINE)
with open(self._baseline_filename, "w") as conf:
baseline = self.read_write(_cmds.GET_BASELINE)
if baseline.crc_ok == True:
json.dump(baseline.raw,conf)
json.dump(baseline.raw, conf)
return True
else:
#print("Ignoring baseline due to invalid CRC")
@@ -70,14 +72,14 @@ class Sgp30():
def try_set_baseline(self):
try:
with open(self._baseline_filename,"r") as conf:
with open(self._baseline_filename, "r") as conf:
conf = json.load(conf)
except IOError:
pass
except ValueError:
pass
else:
crc,_ = self._raw_validate_crc(conf)
crc, _ = self._raw_validate_crc(conf)
if len(conf) == 6 and crc == True:
self.read_write(_cmds.new_set_baseline(conf))
return True
@@ -101,9 +103,9 @@ class Sgp30():
#print("Initializing SGP30")
self.read_write(_cmds.IAQ_INIT)
def i2c_geral_call(self):
def i2c_general_call(self):
"""This attempts to reset _ALL_ devices on the i2c buss
This command issues the i2c-general call RW command that should result
in all devices aborting any read/write operations and starting to listen
for new i2c-commands.
@@ -111,20 +113,21 @@ class Sgp30():
This will usually un-stick the SGP30, but might reset or otherwise
affect any device on the bus.
"""
self._bus.write_byte(0,0x06)
self._bus.write_byte(0, 0x06)
sleep(.1)
def main():
with SMBusWrapper(1) as bus:
sgp=Sgp30(bus,baseline_filename=BASELINE_FILENAME+".TESTING")
sgp = SGP30(bus, baseline_filename=BASELINE_FILENAME+".TESTING")
print("resetting all i2c devices")
sgp.i2c_geral_call()
sgp.i2c_general_call()
print(sgp.read_features())
print(sgp.read_serial())
sgp.init_sgp()
print(sgp.read_measurements())
bus.close()
if __name__ == "__main__":
main()

11
test.py Normal file
View File

@@ -0,0 +1,11 @@
from smbus2 import SMBusWrapper
from sgp30 import Sgp30
import time
with SMBusWrapper(1) as bus:
sgp=Sgp30(bus)
print("resetting all i2c devices")
sgp.i2c_geral_call()
print(sgp.read_features())
print(sgp.read_serial())
sgp.init_sgp()
print(sgp.read_measurements())