chao 2 months ago
parent
commit
ce1f25a4fd
1 changed files with 83 additions and 191 deletions
  1. 83 191
      CocoPi.py

+ 83 - 191
CocoPi.py

@@ -105,7 +105,7 @@
 #########################################################
 
 
-from smbus2 import SMBus
+from smbus2 import SMBus, i2c_msg
 import time
 import math
 from maix import gpio
@@ -122,6 +122,7 @@ sys.path.append('/root/preset/server')
 #     1、AHT20温湿度传感器       #
 ##################################
 
+# 20250715
 AHT20_I2CADDR = 0x38
 AHT20_CMD_SOFTRESET = [0xBA]
 AHT20_CMD_INITIALIZE = [0xBE, 0x08, 0x00]
@@ -130,178 +131,76 @@ AHT20_STATUSBIT_BUSY = 7                    # The 7th bit is the Busy indication
 AHT20_STATUSBIT_CALIBRATED = 3              # The 3rd bit is the CAL (calibration) Enable bit. 1 = Calibrated, 0 = not
 
 class AHT20:
-    # Usage: AHT20 crc8 checker. 
-    # A total of 6 * 8 bits data need to check. G(x) = x8 + x5 + x4 + 1 -> 0x131(0x31), Initial value = 0xFF. No XOROUT. 
-    N_DATA = 6
-    # 1 * 8 bits CRC
-    N_CRC = 1
-    # Initial value. Equal to bit negation the first data (status of AHT20)
-    INIT = 0xFF
-    # Useful value to help calculate
-    LAST_8_bit = 0xFF
-    # Devide number retrieve from CRC-8 MAXIM G(x) = x8 + x5 + x4 + 1
-    CRC_DEVIDE_NUMBER = 0x131
-    
-    # I2C communication driver for AHT20, using only smbus2
-    def __init__(self, BusNum=2):
-        # Initialize AHT20
-        self.BusNum = BusNum
-        self.cmd_soft_reset()
+    def __init__(self, bus_num=2):
+        self.bus_num = bus_num
+        self.bus = SMBus(bus_num)
+        # self._initialize_sensor()
+    
+    def _initialize_sensor(self):
 
-        # Check for calibration, if not done then do and wait 10 ms
-        if not self.get_status_calibrated == 1:
-            self.cmd_initialize()
-            while not self.get_status_calibrated() == 1:
-                time.sleep(0.01)
-                
-    def get_normalized_bit(self,value, bit_index):
-        # Return only one bit from value indicated in bit_index
-        return (value >> bit_index) & 1
+        self._write_command([0xBA])
+        time.sleep(0.04)
         
-    def cmd_soft_reset(self):
-        # Send the command to soft reset
-        with SMBus(self.BusNum) as i2c_bus:
-            i2c_bus.write_i2c_block_data(AHT20_I2CADDR, 0x0, AHT20_CMD_SOFTRESET)
-        time.sleep(0.04)    # Wait 40 ms after poweron
-        return True
-
-    def cmd_initialize(self):
-        # Send the command to initialize (calibrate)
-        with SMBus(self.BusNum) as i2c_bus:
-            i2c_bus.write_i2c_block_data(AHT20_I2CADDR, 0x0 , AHT20_CMD_INITIALIZE)
-        return True
-
-    def cmd_measure(self):
-        # Send the command to measure
-        with SMBus(self.BusNum) as i2c_bus:
-            i2c_bus.write_i2c_block_data(AHT20_I2CADDR, 0, AHT20_CMD_MEASURE)
-        time.sleep(0.08)    # Wait 80 ms after measure
-        return True
-
-    def get_status(self):
-        # Get the full status byte
-        with SMBus(self.BusNum) as i2c_bus:
-            return i2c_bus.read_i2c_block_data(AHT20_I2CADDR, 0x0, 1)[0]
-        return True
-
-    def get_status_calibrated(self):
-        # Get the calibrated bit
-        return self.get_normalized_bit(self.get_status(), AHT20_STATUSBIT_CALIBRATED)
-
-    def get_status_busy(self):
-        # Get the busy bit
-        return self.get_normalized_bit(self.get_status(), AHT20_STATUSBIT_BUSY)
-            
-    def get_measure(self):
-        # Get the full measure
-
-        # Command a measure
-        self.cmd_measure()
-
-        # Check if busy bit = 0, otherwise wait 80 ms and retry
-        while self.get_status_busy() == 1:
-            time.sleep(0.08) # Wait 80 ns
+        self._write_command([0xBE, 0x08, 0x00])
+        time.sleep(0.01)
         
-        # TODO: do CRC check
-
-        # Read data and return it
-        with SMBus(self.BusNum) as i2c_bus:
-            return i2c_bus.read_i2c_block_data(AHT20_I2CADDR, 0x0, 7)
-
-    def mod2_division_8bits(self,a, b, number_of_bytes, init_value):
-        "calculate mod2 division in 8 bits. a mod b. init_value is for crc8 init value."
-        head_of_a = 0x80
-        # Processiong a
-        a = a << 8
-        # Preprocessing head_of_a
-        for i in range(0, number_of_bytes):
-            head_of_a = head_of_a << 8
-            b = b << 8
-            init_value = init_value << 8
-        a = a ^ init_value
-        while (head_of_a > 0x80):
-            # Find a 1
-            if (head_of_a & a):
-                head_of_a = head_of_a >> 1
-                b = b >> 1
-                a = a ^ b
-            else:
-                head_of_a = head_of_a >> 1
-                b = b >> 1
-            # This will show calculate the remainder
-            # print("a:{0}\thead of a:{1}\tb:{2}".format(
-            #     bin(a), bin(head_of_a), bin(b)))
-        return a
-
-    def AHT20_crc8_calculate(self,all_data_int):
-        init_value = INIT
-        # Preprocess all the data and CRCCode from AHT20
-        data_from_AHT20 = 0x00
-        # Preprocessing the first data (status)
-        # print(bin(data_from_AHT20))
-        for i_data in range(0, len(all_data_int)):
-            data_from_AHT20 = (data_from_AHT20 << 8) | all_data_int[i_data]
-        # print(bin(data_from_AHT20))
-        mod_value = self.mod2_division_8bits(
-            data_from_AHT20, CRC_DEVIDE_NUMBER, len(all_data_int), init_value)
-        # print(mod_value)
-        return mod_value
-
-
-    def AHT20_crc8_check(self,all_data_int):
-        """
-        The input data shoule be:
-        Status Humidity0 Humidity1 Humidity2|Temperature0 Temperature1 Temperature2 CRCCode.
-        In python's int64.
-        """
-        mod_value = self.AHT20_crc8_calculate(all_data_int[:-1])
-        if (mod_value == all_data_int[-1]):
-            return True
-        else:
-            return False
-
-    def get_measure_CRC8(self):
-        """
-        This function will calculate crc8 code with G(x) = x8 + x5 + x4 + 1 -> 0x131(0x31), Initial value = 0xFF. No XOROUT.
-        return: all_data (1 bytes status + 2.5 byes humidity + 2.5 bytes temperature + 1 bytes crc8 code), isCRC8_pass
-        """
-        all_data = self.get_measure()
-        isCRC8_pass = self.AHT20_crc8_check(all_data)
-
-        return all_data, isCRC8_pass
-
-    def get_temperature(self):
-        # Get a measure, select proper bytes, return converted data
-        measure = self.get_measure()
-        measure = ((measure[3] & 0xF) << 16) | (measure[4] << 8) | measure[5]
-        measure = measure / (pow(2,20))*200-50
-        return measure
+        while not self._is_calibrated():
+            time.sleep(0.01)
+    
+    def _write_command(self, command):
+        msg = i2c_msg.write(0x38, command)
+        self.bus.i2c_rdwr(msg)
+    
+    def _read_data(self, length):
+        msg = i2c_msg.read(0x38, length)
+        self.bus.i2c_rdwr(msg)
+        return list(msg)
+    
+    def _is_calibrated(self):
+        status = self._read_data(1)[0]
+        return (status >> 3) & 1
+    
+    def _trigger_measurement(self):
+        self._write_command([0xAC, 0x33, 0x00])
+        start_time = time.time()
+        time.sleep(85 * 10**-3)
+        while True:
+            data = self._read_data(7)
+            if not (data[0] >> 7) & 1:  # Check busy bit
+                return data
+            if time.time() - start_time > 0.08:
+                raise TimeoutError("AHT20 measurement timeout")
+            time.sleep(0.01)
+    
+    def _crc8(self, data):
+        crc = 0xFF
+        for byte in data:
+            crc ^= byte
+            for _ in range(8):
+                if crc & 0x80:
+                    crc = (crc << 1) ^ 0x31
+                else:
+                    crc <<= 1
+                crc &= 0xFF
+        return crc
+    
+    def read_raw_data(self):
+        data = self._trigger_measurement()
+        crc = self._crc8(data[:6])
+        if crc != data[6]:
+            raise ValueError("CRC check failed")
         
-    def get_temperature_crc8(self):
-        isCRC8Pass = False
-        while (not isCRC8Pass): 
-            measure, isCRC8Pass = self.get_measure_CRC8()
-            time.sleep(80 * 10**-3)
-        measure = ((measure[3] & 0xF) << 16) | (measure[4] << 8) | measure[5]
-        measure = measure / (pow(2,20))*200-50
-        return measure
-
+        return data
+    
     def get_humidity(self):
-        # Get a measure, select proper bytes, return converted data
-        measure = self.get_measure()
-        measure = (measure[1] << 12) | (measure[2] << 4) | (measure[3] >> 4)
-        measure = measure * 100 / pow(2,20)
-        return measure
-
-    def get_humidity_crc8(self):
-        isCRC8Pass = False
-        while (not isCRC8Pass): 
-            measure, isCRC8Pass = self.get_measure_CRC8()
-            time.sleep(80 * 10**-3)
-        measure = (measure[1] << 12) | (measure[2] << 4) | (measure[3] >> 4)
-        measure = measure * 100 / pow(2,20)
-        return measure
-
+        data = self.read_raw_data()
+        raw_humidity = ((data[1] << 16) | (data[2] << 8) | data[3]) >> 4
+        return (raw_humidity * 100.0) / (2**20)
+    
+    def get_temperature(self):
+        data = self.read_raw_data()
+        raw_temp = ((data[3] & 0x0F) << 16) | (data[4] << 8) | data[5]
+        return (raw_temp * 200.0) / (2**20) - 50.0
 ##################################
 #   2、MPU6050陀螺仪六轴传感器   #
 ##################################
@@ -1278,7 +1177,7 @@ class MCP23017(object):
         self.byteWrite(self.gppub,self.GPPU_B_V)
         self.byteWrite(self.gpioa,self.GPIO_A_V)
         self.byteWrite(self.gpiob,self.GPIO_B_V)
-        # print("mcp23017 reset completed!")
+        print("mcp23017 reset completed!")
         
     def byteWrite(self, addr, val):
         try:
@@ -1303,7 +1202,7 @@ class MCP23017(object):
         #direction:[0:output],[1:input(default)]
         if channel=="A":
             self.DIR_A=self.bitWrite(self.DIR_A,pinId,direction)
-            # print("Dir_A="+str(self.DIR_A))
+            print("Dir_A="+str(self.DIR_A))
             self.byteWrite(self.iodira,self.DIR_A)
         elif channel=="B":
             self.DIR_B=self.bitWrite(self.DIR_B,pinId,direction)
@@ -1441,16 +1340,10 @@ class ADS1115(object):
     
     def swap(self,val):
         return (((val&0xff00)>>8)|((val&0x00ff)<<8))                    #交换高8位与低8位
-    def numberMap(self,value, leftMin, leftMax, rightMin, rightMax):
-        leftSpan = leftMax - leftMin
-        rightSpan = rightMax - rightMin
-        valueScaled = float(value - leftMin) / float(leftSpan)
-        return rightMin + (valueScaled * rightSpan)
     ###
     def readAdc(self,channel):
-        # print("111111111111111",(channel>3) or (channel<0))
-        # if (channel>3) or (channel<0):
-        #     return False
+        if (channel>3) or (channel<0):
+            return False
         self.config=(self.CONFIG_OS_START +                                # start conversion 
             #self.CONFIG_MUX_AIN0P_GNDN  +                         # single ended conversion
             ((channel+4)<<12) +                                         # select channel
@@ -1474,9 +1367,8 @@ class ADS1115(object):
         # read result (note byte swap)
         result=self.swap(self.bus.read_word_data(self.address,self.DEVICE_REG_CONVERSION))
         # return 16 bit integer A2D result for the specified channel
-        # volt=round((result*6.144/32768),3)
-        result = int(self.numberMap(result,0,32768,0,1023))
-        return result
+        volt=round((result*6.144/32768),3)
+        return [result,volt]
 
 class PCA9685(object):
     bus=smbus2.SMBus(2)
@@ -1489,7 +1381,7 @@ class PCA9685(object):
         self.reset()
         #for i in range(0,16):
             #self.duty(i,0)
-        # print("Pca9685 init")
+        print("Pca9685 init")
         
     def write(self, addr, val):
         for i in range(0, 2):
@@ -1681,7 +1573,7 @@ class extOutputPin(PCA9685):
     
     def __del__(self):
         self.duty(self.pin[self.pinId],0)
-        print("del extPin")
+        # print("del extPin")
         
 class extGpio(MCP23017,ADS1115,extOutputPin):
     def __init__(self):
@@ -1858,16 +1750,16 @@ class extGpio(MCP23017,ADS1115,extOutputPin):
         #select X channel
         if pinId==0:            #p0
             self.setGpioDir("B",2,1)    #output
-            self.setGpioLevel("B",2,1)    #low
+            self.setGpioLevel("B",2,0)    #low
         elif pinId==1:            #p1
             self.setGpioDir("B",1,1)
-            self.setGpioLevel("B",1,1)
+            self.setGpioLevel("B",1,0)
         elif pinId==2:            #p2
             self.setGpioDir("B",6,1)
-            self.setGpioLevel("B",6,1)
+            self.setGpioLevel("B",6,0)
         elif pinId==3:            #p3
             self.setGpioDir("B",5,1)
-            self.setGpioLevel("B",5,1)
+            self.setGpioLevel("B",5,0)
         else:
             pass
         self.address=0x40                #再用PCA9685进行控制
@@ -1878,13 +1770,13 @@ class extGpio(MCP23017,ADS1115,extOutputPin):
         self.address=0x20                #先用mcp23017进行通道选择
         #select X channel
         if pinId==4:            #a0
-            self.setGpioDir("A",4,0)    #input
+            self.setGpioDir("A",4,1)    #input
             self.setGpioLevel("A",4,0)    #low
         elif pinId==5:            #a1
-            self.setGpioDir("A",3,0)
+            self.setGpioDir("A",3,1)
             self.setGpioLevel("A",3,0)
         elif pinId==6:            #a2
-            self.setGpioDir("A",2,0)
+            self.setGpioDir("A",2,1)
             self.setGpioLevel("A",2,0)
         elif pinId==7:            #a3
             self.setGpioDir("B",7,0)
@@ -1898,4 +1790,4 @@ class extGpio(MCP23017,ADS1115,extOutputPin):
         extOutputPin.__del__(self)
         self.address=0x20
         MCP23017.__del__(self)        #最先init的配置最后再del
-        # print("del fini
+        print("del finished!")