From e7cb94e36415d333a97ae7d8f4a6bebbdf6ad6b4 Mon Sep 17 00:00:00 2001 From: Conr86 Date: Sat, 22 Jun 2019 17:15:44 +1000 Subject: [PATCH] Added test, fixed typo --- sgp30/crc.py | 16 +++++----- sgp30/sgp30.py | 85 ++++++++++++++++++++++++++------------------------ test.py | 11 +++++++ 3 files changed, 63 insertions(+), 49 deletions(-) create mode 100644 test.py diff --git a/sgp30/crc.py b/sgp30/crc.py index fa28007..2252483 100644 --- a/sgp30/crc.py +++ b/sgp30/crc.py @@ -1,23 +1,23 @@ class Crc8: def __init__(s): - s.crc=255 + s.crc = 255 - def hash(s,int_list): + def hash(s, int_list): for i in int_list: s.addVal(i) return s.crc - def addVal(s,n): + def addVal(s, n): crc = s.crc - for bit in range(0,8): - if ( n ^ crc ) & 0x80: - crc = ( crc << 1 ) ^ 0x31 + for bit in range(0, 8): + if (n ^ crc) & 0x80: + crc = (crc << 1) ^ 0x31 else: - crc = ( crc << 1 ) + crc = (crc << 1) n = n << 1 s.crc = crc & 0xFF return s.crc -#print(Crc8().hash([1,144])) +# print(Crc8().hash([1,144])) #print(hex(Crc8().hash([0xBE, 0xEF]))) #[1, 144, 76, 0, 6, 39] diff --git a/sgp30/sgp30.py b/sgp30/sgp30.py index 601763a..256a066 100644 --- a/sgp30/sgp30.py +++ b/sgp30/sgp30.py @@ -1,8 +1,8 @@ import smbus2 from smbus2 import SMBusWrapper, SMBus, i2c_msg -from collections import namedtuple +from collections import namedtuple from functools import partial -from time import sleep, asctime,time +from time import sleep, asctime, time import json from copy import copy import os.path @@ -11,58 +11,60 @@ from .crc import Crc8 DEVICE_BUS = 1 BASELINE_FILENAME = os.path.expanduser("~/.sgp_config_data.txt") + class _cmds(): """container class for mapping between human readable names and the command values used by the sgp""" - Sgp30Cmd = namedtuple("Sgp30Cmd",["commands","replylen","waittime"]) - IAQ_INIT=Sgp30Cmd([0x20, 0x03],0,10) - IAQ_MEASURE=Sgp30Cmd([0x20, 0x08],6,12) - GET_BASELINE=Sgp30Cmd([0x20, 0x15],6,120) - SET_BASELINE=Sgp30Cmd([0x20, 0x1e],0,10) - SET_HUMIDITY=Sgp30Cmd([0x20, 0x61],0,10) - IAQ_SELFTEST=Sgp30Cmd([0x20, 0x32],3,520) - GET_FEATURES=Sgp30Cmd([0x20, 0x2f],3,3) - GET_SERIAL=Sgp30Cmd([0x36, 0x82],9,10) + SGP30Cmd = namedtuple("SGP30Cmd", ["commands", "replylen", "waittime"]) + IAQ_INIT = SGP30Cmd([0x20, 0x03], 0, 10) + IAQ_MEASURE = SGP30Cmd([0x20, 0x08], 6, 12) + GET_BASELINE = SGP30Cmd([0x20, 0x15], 6, 120) + SET_BASELINE = SGP30Cmd([0x20, 0x1e], 0, 10) + SET_HUMIDITY = SGP30Cmd([0x20, 0x61], 0, 10) + IAQ_SELFTEST = SGP30Cmd([0x20, 0x32], 3, 520) + GET_FEATURES = SGP30Cmd([0x20, 0x2f], 3, 3) + GET_SERIAL = SGP30Cmd([0x36, 0x82], 9, 10) @classmethod - def new_set_baseline(cls,baseline_data): + def new_set_baseline(cls, baseline_data): cmd = cls.SET_BASELINE - return cls.Sgp30Cmd(cmd.commands +baseline_data,cmd.replylen,cmd.waittime) + return cls.SGP30Cmd(cmd.commands + baseline_data, cmd.replylen, cmd.waittime) -class Sgp30(): - def __init__(self,bus,device_address = 0x58, baseline_filename=BASELINE_FILENAME): +class SGP30(): + + def __init__(self, bus, device_address=0x58, baseline_filename=BASELINE_FILENAME): self._bus = bus self._device_addr = device_address self._start_time = time() self._last_save_time = time() - self._baseline_filename=baseline_filename + self._baseline_filename = baseline_filename - Sgp30Answer = namedtuple("Sgp30Answer",["data","raw","crc_ok"]) - - def _raw_validate_crc(s,r): - a = list(zip(r[0::3],r[1::3])) - crc = r[2::3] == [Crc8().hash(i) for i in a ] - return(crc,a) + SGP30Answer = namedtuple("SGP30Answer", ["data", "raw", "crc_ok"]) - def read_write(self,cmd): - write = i2c_msg.write(self._device_addr,cmd.commands) - if cmd.replylen <= 0 : - self._bus.i2c_rdwr(write) + def _raw_validate_crc(s, r): + a = list(zip(r[0::3], r[1::3])) + crc = r[2::3] == [Crc8().hash(i) for i in a] + return(crc, a) + + def read_write(self, cmd): + write = i2c_msg.write(self._device_addr, cmd.commands) + if cmd.replylen <= 0: + self._bus.i2c_rdwr(write) else: - read = i2c_msg.read(self._device_addr,cmd.replylen) - self._bus.i2c_rdwr(write) + read = i2c_msg.read(self._device_addr, cmd.replylen) + self._bus.i2c_rdwr(write) sleep(cmd.waittime/1000.0) self._bus.i2c_rdwr(read) r = list(read) - crc_ok,a=self._raw_validate_crc(r) - answer = [i<<8 | j for i,j in a] - return self.Sgp30Answer(answer,r,crc_ok) + crc_ok, a = self._raw_validate_crc(r) + answer = [i << 8 | j for i, j in a] + return self.SGP30Answer(answer, r, crc_ok) def store_baseline(self): - with open(self._baseline_filename,"w") as conf: - baseline=self.read_write(_cmds.GET_BASELINE) + with open(self._baseline_filename, "w") as conf: + baseline = self.read_write(_cmds.GET_BASELINE) if baseline.crc_ok == True: - json.dump(baseline.raw,conf) + json.dump(baseline.raw, conf) return True else: #print("Ignoring baseline due to invalid CRC") @@ -70,14 +72,14 @@ class Sgp30(): def try_set_baseline(self): try: - with open(self._baseline_filename,"r") as conf: + with open(self._baseline_filename, "r") as conf: conf = json.load(conf) except IOError: pass except ValueError: pass else: - crc,_ = self._raw_validate_crc(conf) + crc, _ = self._raw_validate_crc(conf) if len(conf) == 6 and crc == True: self.read_write(_cmds.new_set_baseline(conf)) return True @@ -101,9 +103,9 @@ class Sgp30(): #print("Initializing SGP30") self.read_write(_cmds.IAQ_INIT) - def i2c_geral_call(self): + def i2c_general_call(self): """This attempts to reset _ALL_ devices on the i2c buss - + This command issues the i2c-general call RW command that should result in all devices aborting any read/write operations and starting to listen for new i2c-commands. @@ -111,20 +113,21 @@ class Sgp30(): This will usually un-stick the SGP30, but might reset or otherwise affect any device on the bus. """ - self._bus.write_byte(0,0x06) + self._bus.write_byte(0, 0x06) sleep(.1) def main(): with SMBusWrapper(1) as bus: - sgp=Sgp30(bus,baseline_filename=BASELINE_FILENAME+".TESTING") + sgp = SGP30(bus, baseline_filename=BASELINE_FILENAME+".TESTING") print("resetting all i2c devices") - sgp.i2c_geral_call() + sgp.i2c_general_call() print(sgp.read_features()) print(sgp.read_serial()) sgp.init_sgp() print(sgp.read_measurements()) bus.close() + if __name__ == "__main__": main() diff --git a/test.py b/test.py new file mode 100644 index 0000000..cc26a40 --- /dev/null +++ b/test.py @@ -0,0 +1,11 @@ +from smbus2 import SMBusWrapper +from sgp30 import Sgp30 +import time +with SMBusWrapper(1) as bus: + sgp=Sgp30(bus) + print("resetting all i2c devices") + sgp.i2c_geral_call() + print(sgp.read_features()) + print(sgp.read_serial()) + sgp.init_sgp() + print(sgp.read_measurements()) \ No newline at end of file