r''' ========================================================= ______ ______ /\ _ \ /\ _ \ __ \ \ \/\_\ ____ ____ ___\ \ \_\ \/\_\ \ \ \/_/_ / __ \ / __ \ / __'\ \ __/\/_/_ \ \ \_\ \/\ \_\ \/\ \__//\ \_\ \ \ \/ /\ \ \ \____/\ \____/\ \____\ \____/\ \_\ \ \_\ \/___/ \/___/ \/____/\/___/ \/_/ \/_/ __ /\ \ __ ---------------- \ \ \ /\_\ ____ __ __ __ _ pi.cocorobo.cn \ \ \ \/_/_ / _ \/\ \/\ \/\ \/ \ ---------------- \ \ \____ /\ \/\ \/\ \ \ \_\ \/ '''

# NOTE(review): this copy of the file is extraction-damaged.  Everything
# between the banner above and the CRC comment below was lost: the end of the
# banner, the import statements, the AHT20_* address/command/status-bit
# constants and the original class statement.  The names ``SMBus``,
# ``AHT20_I2CADDR``, ``AHT20_CMD_SOFTRESET``, ``AHT20_CMD_INITIALIZE``,
# ``AHT20_CMD_MEASURE``, ``AHT20_STATUSBIT_BUSY`` and
# ``AHT20_STATUSBIT_CALIBRATED`` are still referenced below and must be
# restored from a pristine copy of the file.

# CRC-8 parameters used by the AHT20:
# G(x) = x8 + x5 + x4 + 1 -> 0x131(0x31), Initial value = 0xFF. No XOROUT.
N_DATA = 6
# 1 * 8 bits CRC
N_CRC = 1
# Initial value. Equal to bit negation the first data (status of AHT20)
INIT = 0xFF
# Useful value to help calculate
LAST_8_bit = 0xFF
# Divide number retrieved from CRC-8 MAXIM G(x) = x8 + x5 + x4 + 1
CRC_DEVIDE_NUMBER = 0x131


# NOTE(review): the class statement itself was lost; the name ``AHT20`` is
# reconstructed from the driver comment and the AHT20_* method names -
# confirm against the pristine file.
class AHT20:
    """I2C communication driver for AHT20, using only smbus2."""

    def __init__(self, BusNum=2):
        """Soft-reset the sensor and make sure it is calibrated.

        BusNum is the I2C bus number handed to ``SMBus``.
        """
        self.BusNum = BusNum
        self.cmd_soft_reset()

        # Check for calibration, if not done then do and wait 10 ms.
        # The original compared the bound method itself with 1 (missing call
        # parentheses), which is never equal, so the sensor was re-initialised
        # on every construction.
        if self.get_status_calibrated() != 1:
            self.cmd_initialize()
            while self.get_status_calibrated() != 1:
                time.sleep(0.01)

    def get_normalized_bit(self, value, bit_index):
        """Return only the bit of ``value`` located at ``bit_index`` (0 or 1)."""
        return (value >> bit_index) & 1

    def cmd_soft_reset(self):
        """Send the soft-reset command, wait 40 ms and return True."""
        with SMBus(self.BusNum) as i2c_bus:
            i2c_bus.write_i2c_block_data(AHT20_I2CADDR, 0x0, AHT20_CMD_SOFTRESET)
        time.sleep(0.04)  # Wait 40 ms after poweron
        return True

    def cmd_initialize(self):
        """Send the initialize (calibrate) command and return True."""
        with SMBus(self.BusNum) as i2c_bus:
            i2c_bus.write_i2c_block_data(AHT20_I2CADDR, 0x0, AHT20_CMD_INITIALIZE)
        return True

    def cmd_measure(self):
        """Send the measure command, wait 80 ms and return True."""
        with SMBus(self.BusNum) as i2c_bus:
            i2c_bus.write_i2c_block_data(AHT20_I2CADDR, 0, AHT20_CMD_MEASURE)
        time.sleep(0.08)  # Wait 80 ms after measure
        return True

    def get_status(self):
        """Return the full status byte read from the sensor."""
        with SMBus(self.BusNum) as i2c_bus:
            return i2c_bus.read_i2c_block_data(AHT20_I2CADDR, 0x0, 1)[0]

    def get_status_calibrated(self):
        """Return the calibrated bit of the status byte."""
        return self.get_normalized_bit(self.get_status(), AHT20_STATUSBIT_CALIBRATED)

    def get_status_busy(self):
        """Return the busy bit of the status byte."""
        return self.get_normalized_bit(self.get_status(), AHT20_STATUSBIT_BUSY)

    def get_measure(self):
        """Trigger a measurement and return the 7 raw bytes read back.

        No CRC check is done here; see get_measure_CRC8 for the checked variant.
        """
        self.cmd_measure()

        # Check if busy bit = 0, otherwise wait 80 ms and retry
        while self.get_status_busy() == 1:
            time.sleep(0.08)  # Wait 80 ms

        with SMBus(self.BusNum) as i2c_bus:
            return i2c_bus.read_i2c_block_data(AHT20_I2CADDR, 0x0, 7)

    def mod2_division_8bits(self, a, b, number_of_bytes, init_value):
        """Calculate mod-2 division in 8 bits: a mod b.

        ``a`` is the message as one integer of ``number_of_bytes`` bytes,
        ``b`` the 9-bit divisor and ``init_value`` the CRC-8 initial value.
        Returns the 8-bit remainder.
        """
        head_of_a = 0x80
        # Make room for the 8 remainder bits.
        a = a << 8
        # Align the probe bit, the divisor and the init value with the top
        # byte of the (shifted) message.
        for _ in range(number_of_bytes):
            head_of_a = head_of_a << 8
            b = b << 8
            init_value = init_value << 8
        a = a ^ init_value
        # Long division: walk the probe bit down; whenever it hits a 1 in
        # ``a``, subtract (XOR) the divisor aligned with that bit.
        while head_of_a > 0x80:
            hit = head_of_a & a
            head_of_a = head_of_a >> 1
            b = b >> 1
            if hit:
                a = a ^ b
        return a

    def AHT20_crc8_calculate(self, all_data_int):
        """Return the CRC-8 of the byte values in ``all_data_int``."""
        # Pack the bytes, first byte most significant, into one integer.
        data_from_AHT20 = 0x00
        for byte in all_data_int:
            data_from_AHT20 = (data_from_AHT20 << 8) | byte
        return self.mod2_division_8bits(
            data_from_AHT20, CRC_DEVIDE_NUMBER, len(all_data_int), INIT)

    def AHT20_crc8_check(self, all_data_int):
        """Return True when the last byte is the CRC-8 of the bytes before it.

        The input data should be:
        Status Humidity0 Humidity1 Humidity2|Temperature0 Temperature1
        Temperature2 CRCCode, as Python ints.
        """
        return self.AHT20_crc8_calculate(all_data_int[:-1]) == all_data_int[-1]

    def get_measure_CRC8(self):
        """Measure and verify the CRC of the answer.

        This function will calculate crc8 code with
        G(x) = x8 + x5 + x4 + 1 -> 0x131(0x31), Initial value = 0xFF. No XOROUT.

        return: all_data (1 byte status + 2.5 bytes humidity + 2.5 bytes
        temperature + 1 byte crc8 code), isCRC8_pass
        """
        all_data = self.get_measure()
        isCRC8_pass = self.AHT20_crc8_check(all_data)
        return all_data, isCRC8_pass

    @staticmethod
    def _temperature_from(measure):
        """Convert the raw measurement bytes into the temperature value."""
        raw = ((measure[3] & 0xF) << 16) | (measure[4] << 8) | measure[5]
        return raw / (pow(2, 20)) * 200 - 50

    @staticmethod
    def _humidity_from(measure):
        """Convert the raw measurement bytes into the humidity value."""
        raw = (measure[1] << 12) | (measure[2] << 4) | (measure[3] >> 4)
        return raw * 100 / pow(2, 20)

    def _measure_until_crc_ok(self):
        """Repeat get_measure_CRC8 (80 ms apart) until the CRC passes."""
        isCRC8Pass = False
        while not isCRC8Pass:
            measure, isCRC8Pass = self.get_measure_CRC8()
            time.sleep(80 * 10**-3)
        return measure

    def get_temperature(self):
        """Take one measurement and return the converted temperature."""
        return self._temperature_from(self.get_measure())

    def get_temperature_crc8(self):
        """Return the temperature from the first measurement whose CRC passes."""
        return self._temperature_from(self._measure_until_crc_ok())

    def get_humidity(self):
        """Take one measurement and return the converted humidity."""
        return self._humidity_from(self.get_measure())

    def get_humidity_crc8(self):
        """Return the humidity from the first measurement whose CRC passes."""
        return self._humidity_from(self._measure_until_crc_ok())


##################################
#  2. MPU6050 six-axis gyroscope #
##################################
SLAVE_ADDR = 0x68
PWR_MGMT_1 = 0x6B
PWR_MGMT_2 = 0x6C
WHO_AM_I = 0x75
GYRO_X = 0x43
GYRO_Y = 0x45
GYRO_Z = 0x47
ACCL_X = 0x3B
ACCL_Y = 0x3D
ACCL_Z = 0x3F


class MPU6050:
    """MPU6050 accelerometer/gyroscope read over I2C with smbus2."""

    def __init__(self, BusNum=2):
        """Remember the bus number and write the power-management register."""
        self.BusNum = BusNum
        self.cmd_soft_reset()

    def cmd_soft_reset(self):
        """Write 0 to PWR_MGMT_1, then wait 40 ms."""
        with SMBus(self.BusNum) as bus:
            bus.write_byte_data(SLAVE_ADDR, PWR_MGMT_1, 0)
        time.sleep(0.04)  # Wait 40 ms after poweron

    def read_byte(self, addr):
        """Return the byte stored in register ``addr``."""
        with SMBus(self.BusNum) as bus:
            return bus.read_byte_data(SLAVE_ADDR, addr)

    def read_word(self, addr):
        """Return the unsigned 16-bit value at ``addr`` (high byte first)."""
        with SMBus(self.BusNum) as bus:
            high = bus.read_byte_data(SLAVE_ADDR, addr)
            low = bus.read_byte_data(SLAVE_ADDR, addr + 1)
        return (high << 8) + low

    def read_word_i2c(self, addr):
        """Return the 16-bit value at ``addr`` as a signed (two's complement) int."""
        val = self.read_word(addr)
        if val >= 0x8000:
            return -((65535 - val) + 1)
        return val

    def dist(self, x, y):
        """Return sqrt(x*x + y*y)."""
        return math.sqrt((x * x) + (y * y))

    def get_x_rotat(self, x, y, z):
        """Return atan2(y, dist(x, z)) in degrees."""
        rad = math.atan2(y, self.dist(x, z))
        return math.degrees(rad)

    def get_y_rotat(self, x, y, z):
        """Return minus atan2(x, dist(y, z)) in degrees."""
        rad = math.atan2(x, self.dist(y, z))
        return -math.degrees(rad)

    def read_gyro(self):
        """Return the three gyro words, each divided by 131, as (x, y, z)."""
        GYR_X = self.read_word_i2c(GYRO_X)
        GYR_Y = self.read_word_i2c(GYRO_Y)
        GYR_Z = self.read_word_i2c(GYRO_Z)
        return (GYR_X / 131), (GYR_Y / 131), (GYR_Z / 131)

    def read_acc(self):
        """Return (acc_x, acc_y, acc_z, x_rotation, y_rotation).

        The accelerations are the raw words divided by 16384.0; the rotations
        are derived from them with get_x_rotat / get_y_rotat.
        """
        CALC_ACC_X = self.read_word_i2c(ACCL_X) / 16384.0
        CALC_ACC_Y = self.read_word_i2c(ACCL_Y) / 16384.0
        CALC_ACC_Z = self.read_word_i2c(ACCL_Z) / 16384.0
        return (
            CALC_ACC_X,
            CALC_ACC_Y,
            CALC_ACC_Z,
            self.get_x_rotat(CALC_ACC_X, CALC_ACC_Y, CALC_ACC_Z),
            self.get_y_rotat(CALC_ACC_X, CALC_ACC_Y, CALC_ACC_Z),
        )

    def get_gyro_x(self):
        """Return the x component of read_gyro()."""
        return self.read_gyro()[0]

    def get_gyro_y(self):
        """Return the y component of read_gyro()."""
        return self.read_gyro()[1]

    def get_gyro_z(self):
        """Return the z component of read_gyro()."""
        return self.read_gyro()[2]

    def get_acc_x(self):
        """Return the x acceleration of read_acc()."""
        return self.read_acc()[0]

    def get_acc_y(self):
        """Return the y acceleration of read_acc()."""
        return self.read_acc()[1]

    def get_acc_z(self):
        """Return the z acceleration of read_acc()."""
        return self.read_acc()[2]

    def get_angle_x(self):
        """Return the x rotation (degrees) of read_acc()."""
        return self.read_acc()[3]

    def get_angle_y(self):
        """Return the y rotation (degrees) of read_acc()."""
        return self.read_acc()[4]


##################################
# 3. On-board buttons A, B, C, D #
##################################


class BUTTON:
    """One of the on-board push buttons, read through SUNXI_GPIO."""

    def __init__(self, gpioId):
        """Configure GPIO line ``224 + gpioId`` as an input."""
        self.gpio = 224 + gpioId
        SUNXI_GPIO.setcfg(self.gpio, SUNXI_GPIO.IN)

    def is_pressed(self):
        """Return True when the line reads anything other than 1."""
        self.getValue = SUNXI_GPIO.input(self.gpio)
        return self.getValue != 1


##################################
#    4. Light intensity sensor   #
##################################
class LIGHTINTENSITY:
    """Light reading obtained through the sunxi register dump file."""

    def __init__(self, addr=b"0x05070080") -> None:
        """Open the dump file and take a first reading into ``self.last``."""
        self.addr = addr
        self.path = "/sys/class/sunxi_dump/dump"
        self.file = open(self.path, "wb+")
        self.last = self.value()

    def value(self):
        """Write the register address to the dump file and parse the hex answer."""
        # The original always wrote the literal default address and ignored
        # the ``addr`` given to the constructor.
        self.file.write(self.addr)
        self.file.seek(0)
        # Drop the trailing byte of the answer before parsing it as base 16.
        return int(self.file.read()[:-1], 16)

    def __del__(self):
        # Best-effort cleanup: ``self.file`` is missing when open() failed in
        # __init__, and errors during interpreter shutdown are not actionable.
        try:
            if self.file:
                self.file.close()
                del self.file
        except Exception:
            pass


##################################
#         5. On-board LED        #
##################################
class LED:
    """On-board LED driven through SUNXI_GPIO."""

    def __init__(self, gpioId=69):
        """Configure GPIO line ``gpioId`` as an output."""
        self.gpioId = gpioId
        SUNXI_GPIO.setcfg(self.gpioId, SUNXI_GPIO.OUT)

    def out(self, value):
        """Output ``1 - value`` for value 0 or 1, and 0 for anything else."""
        if value == 0 or value == 1:
            self.value = 1 - value
        else:
            self.value = 0
        SUNXI_GPIO.output(self.gpioId, self.value)


##################################
#           6. QMI8658           #
##################################
class QMI8658(object):
    """QMI8658 accelerometer/gyroscope read over I2C."""

    def __init__(self, smbus=2, address=0X6B):
        """Open bus ``smbus`` and configure the chip if WhoAmI() succeeds.

        When the identity check fails the object is left unconfigured.
        """
        self._address = address
        import smbus2
        self._bus = smbus2.SMBus(smbus)
        if not self.WhoAmI():
            return None
        self.Read_Revision()
        self.Config_apply()

    def _read_byte(self, cmd):
        """Return the single byte at register ``cmd``."""
        rec = self._bus.read_i2c_block_data(int(self._address), int(cmd), 1)
        return rec[0]

    def _read_block(self, reg, length=1):
        """Return ``length`` bytes starting at register ``reg``."""
        return self._bus.read_i2c_block_data(int(self._address), int(reg), length)

    def _read_u16(self, cmd):
        """Return the unsigned 16-bit value at ``cmd`` (low byte first)."""
        LSB = self._bus.read_i2c_block_data(int(self._address), int(cmd), 1)
        MSB = self._bus.read_i2c_block_data(int(self._address), int(cmd) + 1, 1)
        return (MSB[0] << 8) + LSB[0]

    def _write_byte(self, cmd, val):
        """Write the single byte ``val`` to register ``cmd``."""
        self._bus.write_i2c_block_data(int(self._address), int(cmd), bytes([int(val)]))

    def WhoAmI(self):
        """Return True when register 0x00 reads 0x05."""
        return 0x05 == self._read_byte(0x00)

    def Read_Revision(self):
        """Return the byte at register 0x01."""
        return self._read_byte(0x01)

    def Config_apply(self):
        """Write the fixed configuration bytes to registers 0x02-0x08."""
        self._write_byte(0x02, 0x40)
        self._write_byte(0x08, 0x03)
        self._write_byte(0x03, 0x04)
        self._write_byte(0x04, 0x64)
        self._write_byte(0x06, 0x11)

    def Read_Raw_XYZ(self):
        """Return the signed raw readings as a dict.

        Keys are "AcX", "AcY", "AcZ", "GyX", "GyY", "GyZ", decoded from the
        12 bytes starting at register 0x35 (low byte first).
        """
        # The timestamp registers are still read as in the original; the
        # value is not used.  The original also re-read 0x35 and 0x3b as two
        # separate 6-byte blocks whose results were never used; those
        # duplicates are dropped.
        self._read_block(0x30, 3)
        raw_xyz = self._read_block(0x35, 12)
        vals = {}
        keys = ("AcX", "AcY", "AcZ", "GyX", "GyY", "GyZ")
        for i, key in enumerate(keys):
            word = (raw_xyz[(i * 2) + 1] << 8) | raw_xyz[i * 2]
            # Two's complement: the original used ``>= 32767`` / ``- 65535``,
            # which is off by one (32767 became -32768 and 0xFFFF became 0).
            if word >= 32768:
                word -= 65536
            vals[key] = word
        return vals

    def Read_XYZ(self):
        """Return [ax, ay, az, gx, gy, gz] scaled from the raw readings."""
        raw_xyz = self.Read_Raw_XYZ()
        # QMI8658AccRange_8g
        acc_lsb_div = (1 << 12)
        # QMI8658GyrRange_512dps
        gyro_lsb_div = 64
        # Read_Raw_XYZ returns a dict keyed by name; the original indexed it
        # with integers and therefore always raised KeyError.
        acc = [raw_xyz[k] / acc_lsb_div for k in ("AcX", "AcY", "AcZ")]
        gyro = [raw_xyz[k] * 1.0 / gyro_lsb_div for k in ("GyX", "GyY", "GyZ")]
        return acc + gyro

    def get_accel(self, samples=10, calibration=None):
        """Return the per-key mean of ``samples`` raw readings.

        When ``calibration`` is given, its values are subtracted from the
        result for every key except "AcZ".
        """
        result = {}
        for _ in range(samples):
            reading = self.Read_Raw_XYZ()
            for key, val in reading.items():
                result[key] = result.get(key, 0) + val / samples
        if calibration:
            for key in calibration:
                if key != "AcZ":
                    result[key] -= calibration[key]
        return result

    def calibrate(self, threshold=50):
        """Return a 100-sample average once two consecutive ones agree.

        Two averages agree when every key differs by less than ``threshold``.
        """
        print('Calibrating QMI8658... ', end='')
        while True:
            v1 = self.get_accel(100)
            v2 = self.get_accel(100)
            if all(abs(v1[m] - v2[m]) < threshold for m in v1):
                print('Done.')
                return v1

    def getPitchYawRollGxGyGz(self, calibration):
        """Return [pitch, yaw, roll, gyroX, gyroY, gyroZ] from a 10-sample average."""
        caliData = self.get_accel(10, calibration)
        pitch = caliData["AcX"] / 182.04
        yaw = caliData["AcY"] / 182.04
        roll = caliData["AcZ"] / 182.04
        groX = caliData["GyX"]
        groY = caliData["GyY"]
        groZ = caliData["GyZ"]
        # NOTE(review): the nesting of the yaw branch under ``roll < 0`` is
        # inferred - the original indentation was lost.  Confirm.
        if roll < 0:
            if pitch >= 0:
                pitch = 180 - pitch
            else:
                pitch = (180 + pitch) * (-1)
            # The original tested ``pitch < 0`` here, so a negative yaw was
            # only corrected when pitch happened to be negative too.
            if yaw >= 0:
                yaw = 180 - yaw
            else:
                yaw = (180 + yaw) * (-1)
        return [pitch, yaw, roll, groX, groY, groZ]


##################################
#            7. STM8S            #
##################################
import smbus2
import time


class stm8s(object):
    """Register-level access to the STM8S co-processor over I2C."""

    bus = smbus2.SMBus(2)  # 2 indicates /dev/i2c-2
    address = 0x50
    # 0x00  trigger (apply) the configuration
    # 0x01  reset the configuration
    # 0x02  pwm0 stored configuration
    # 0x03  pwm1 stored configuration
    # Pin configuration modes: 1. pwm  2. gpio output / gpio input  3. adc
    # 4. ws2812_single  5. ws2812_multi

    def __init__(self):
        """Trigger the configuration and wait for the chip to settle."""
        self.reset()
        time.sleep(0.05)

    def clear(self):
        """Write 1 to register 1, then reset() and wait 50 ms."""
        self.write(1, 1)
        self.reset()
        time.sleep(0.05)

    def write(self, addr, val):
        """Write byte ``val`` to ``addr``; up to 3 attempts. Return success."""
        for _ in range(3):
            try:
                self.bus.write_byte_data(self.address, addr, val)
                time.sleep(0.001)  # 1ms
                return True
            except Exception:
                time.sleep(0.01)
        return False

    def fastWrite(self, addr, valBlock):
        """Write ``valBlock`` as one block starting at ``addr``. Return success."""
        try:
            self.bus.write_i2c_block_data(self.address, addr, valBlock)
            return True
        except Exception:
            return False

    def read(self, addr):
        """Read one byte from ``addr``; up to 3 attempts. Return it or None."""
        for _ in range(3):
            try:
                tmp = self.bus.read_byte_data(self.address, addr)
                time.sleep(0.001)  # 1ms
                return tmp
            except Exception:
                time.sleep(0.01)
        return None

    def reset(self):
        """Write 1 to register 0 and wait 50 ms."""
        self.write(0, 1)
        time.sleep(0.05)  # restarting and configuring takes time

    def dump(self):
        """Print registers 0-31."""
        for i in range(32):
            print(i, self.read(i))


class singleRgb(stm8s):
    """Single RGB LED attached to the STM8S."""

    def __init__(self):
        """Start with colour (0, 0, 0) at brightness 255 and push it out."""
        self.valR = 0
        self.valG = 0
        self.valB = 0
        self.brightness = 255
        self.show()

    def setColor(self, r, g, b):
        """Store the colour scaled by brightness; out-of-range channels become 0."""
        self.valR = int(r * self.brightness / 255) if 0 <= r <= 255 else 0
        self.valG = int(g * self.brightness / 255) if 0 <= g <= 255 else 0
        self.valB = int(b * self.brightness / 255) if 0 <= b <= 255 else 0

    def setBrightness(self, brightness):
        """Store brightness 0-255; any other value stores 0."""
        self.brightness = brightness if 0 <= brightness <= 255 else 0

    def show(self):
        """Write the stored colour to registers 31/30/29, then 6 to register 28."""
        self.write(31, self.valB)
        self.write(30, self.valG)
        self.write(29, self.valR)
        self.write(28, 6)


class dcMotor(stm8s):
    """DC motor outputs of the STM8S."""

    def __init__(self, id):
        self.id = id

    def m1a(self, val=0):
        """Write 5 to register 4 and ``val`` to register 7 (M1A)."""
        self.write(4, 5)
        self.write(7, val)

    def m1b(self, val=0):
        """Write 5 to register 8 and ``val`` to register 11 (M1B)."""
        self.write(8, 5)
        self.write(11, val)

    def m2a(self, val=0):
        """Write 5 to register 12 and ``val`` to register 15 (M2A)."""
        self.write(12, 5)
        self.write(15, val)

    def m2b(self, val=0):
        """Write 5 to register 16 and ``val`` to register 19 (M2B)."""
        self.write(16, 5)
        self.write(19, val)

    def dcMotorCtrl(self, dir, speed):
        """Direction/speed control (body truncated in this copy of the file)."""
        dir = 1 - dir
        # NOTE(review): extraction damage.  The rest of this method, the end
        # of class dcMotor and the statement/constructor of the class that
        # owns the methods below were lost.  The surviving source text is
        # kept verbatim on the next comment line; restore it from a pristine
        # copy of the file.
        # if(0=0) and (angle<=180)): self.write(23+self.id*4,angle) else: pass

    # NOTE(review): the following methods belong to a class whose header was
    # lost (they use self.id, self.brightness and self.rgbSerial, none of
    # which dcMotor sets up).  They are indented here only so that the file
    # parses; move them back under their original class statement.

    def digitalRead(self):
        """Return the byte at register ``23 + id*4``, or None when unreadable."""
        try:
            self.gpioVal = self.read(23 + self.id * 4)
            if self.gpioVal is not None:
                return self.gpioVal
        except Exception:
            print("ERROR 233")

    def digitalWrite(self, val):
        """Write ``val`` to register ``23 + id*4`` when it is within 0-1."""
        if 0 <= val <= 1:
            self.write(23 + self.id * 4, val)

    def analogRead(self):
        """Return the 16-bit ADC value, or None when a byte is unreadable."""
        try:
            self.adcValH = self.read(22 + self.id * 4)
            self.adcValL = self.read(23 + self.id * 4)
            if self.adcValH is not None and self.adcValL is not None:
                # Combine the high and low bytes into one 16-bit value.
                self.adcVal = self.adcValL + (self.adcValH << 8)
                return self.adcVal
        except Exception:
            print("ERROR 2333")

    def analogWrite(self, val):
        """Write ``val`` to register ``23 + id*4`` when it is within 0-255."""
        if 0 <= val <= 255:
            self.write(23 + self.id * 4, val)

    def beep(self, frequency):
        """Write ``frequency`` (20-12000) as high/low byte to registers 22/23 + id*4."""
        if 20 <= frequency <= 12000:
            self.write(22 + self.id * 4, frequency >> 8)
            self.write(23 + self.id * 4, frequency & 0b11111111)

    def pixelInit_(self):
        """Set all 16 pixels to (0, 0, 0) and trigger the strip."""
        for i in range(16):
            self.setPixelColor(i, 0, 0, 0)
            self.pixelShow()
        # NOTE(review): placed after the loop by analogy with rgbSerialShow;
        # the original indentation was lost - confirm.
        self.write(23 + self.id * 4, 1)

    def setBrightness(self, brightness):
        """Store brightness 0-255; any other value stores 0."""
        self.brightness = brightness if 0 <= brightness <= 255 else 0

    def _storePixel(self, rgbId, r, g, b):
        """Store pixel index (0-15, else 0) and the brightness-scaled colour."""
        self.rgbId = rgbId if 0 <= rgbId <= 15 else 0
        self.valR = int(r * self.brightness / 255) if 0 <= r <= 255 else 0
        self.valG = int(g * self.brightness / 255) if 0 <= g <= 255 else 0
        self.valB = int(b * self.brightness / 255) if 0 <= b <= 255 else 0

    def setPixelColor(self, rgbId, r, g, b):
        """Select pixel ``rgbId`` (0-15) and store its scaled colour."""
        self._storePixel(rgbId, r, g, b)

    # self.id is 0 or 1 and selects the strip buffer (pin S1 or S2).
    def pixelShow(self):
        """Write the stored colour of the selected pixel into the strip buffer."""
        base = 32 + self.id * 48 + self.rgbId * 3
        self.write(base, self.valR)
        self.write(base + 1, self.valG)
        self.write(base + 2, self.valB)

    def setRgbSerialColor(self, rgbId, r, g, b):
        """Store the scaled colour of pixel ``rgbId`` in ``self.rgbSerial``."""
        self._storePixel(rgbId, r, g, b)
        self.rgbSerial[self.rgbId * 3] = self.valR
        self.rgbSerial[self.rgbId * 3 + 1] = self.valG
        self.rgbSerial[self.rgbId * 3 + 2] = self.valB

    def rgbSerialShow(self):
        """Block-write ``self.rgbSerial`` to the strip buffer and trigger it."""
        self.fastWrite(32 + self.id * 48, self.rgbSerial)
        self.write(23 + self.id * 4, 1)


##################################
#           8. UvcVideo          #
##################################
class UvcVideo(camera.MaixVideo):
    """USB (UVC) camera source built on camera.MaixVideo."""

    def __init__(self, source="/dev/videoX"):
        """Remember ``source`` and make sure the USB port is in host mode."""
        self.source = source
        super(UvcVideo, self).__init__()
        import os, time
        usb_path = '/sys/devices/platform/soc/usbc0/otg_role'
        # Make sure usb_path contains usb_host; if not, set it.
        if os.popen('cat %s' % usb_path).read().strip() != 'usb_host':
            os.system('echo "usb_host" > %s' % usb_path)
            # About 2 s are needed before the port works, otherwise
            # VIDIOC_S_FMT fails.
            time.sleep(2)

    def config(self, size=None, video=2, horizontal=0, vertical=0):
        """(Re)create the camera; ``size`` defaults to (320, 240).

        On any error the exception is printed and ``self.cam`` is None.
        """
        if size is None:
            size = (320, 240)
        super(UvcVideo, self).config(size)
        print('[camera] config input size(%d, %d, %d)' %
              (self.width(), self.height(), video))
        if self.cam:
            self.cam = None
        try:
            import _coco_camera
            self.cam = _coco_camera.Camera(
                self.width(), self.height(), video, horizontal, vertical)
        except Exception as e:
            print(e)
            self.cam = None

    def read(self):
        """Return one frame (bytes), or None when no frame could be read.

        A failed read triggers a re-configuration for the next call.
        """
        if self.cam is None:
            self.config()
        if self.cam:
            ret, frame = self.cam.read()
            if ret:
                return frame  # bytes
            try:
                self.config()
            except Exception as e:
                print(e)
        return None

    def __del__(self):
        if self.cam:
            self.cam = None


##################################
#            9. DS3231           #
##################################
from math import floor


class DS3231(object):
    """DS3231 real-time clock accessed over I2C."""

    # create RTC instance
    def __init__(self):
        self.bus = smbus2.SMBus(2)  # 2 indicates /dev/i2c-2
        self.address = 0x68

    def bcd_to_int(self, bcd, n=2):
        """Decode n least significant packed binary coded decimal digits.

        Return binary result. n defaults to 2 (BCD digits).
        n=0 decodes all digits.
        """
        return int(('%x' % bcd)[-n:])

    def int_to_bcd(self, x, n=2):
        """Encode the n least significant decimal digits of x to packed BCD.

        Return packed BCD value. n defaults to 2 (digits).
        n=0 encodes all digits.
        """
        return int(str(x)[-n:], 0x10)

    def read(self, addr):
        """Read one byte from ``addr``; up to 3 attempts. Return it or None."""
        for _ in range(3):
            try:
                tmp = self.bus.read_byte_data(self.address, addr)
                time.sleep(0.001)  # 1ms
                return tmp
            except Exception:
                time.sleep(0.01)
        return None

    def write(self, addr, val):
        """Write byte ``val`` to ``addr``; up to 3 attempts. Return success."""
        for _ in range(3):
            try:
                self.bus.write_byte_data(self.address, addr, val)
                return True
            except Exception:
                time.sleep(0.001)
        return False

    # set times functions ----------------------------------------

    def setYear(self, year):
        """Store the last two digits of ``year``."""
        self.write(0x06, self.int_to_bcd(year % 100))

    def setMonth(self, month):
        """Store ``month``; raise ValueError outside 1-12."""
        if not 1 <= month <= 12:
            raise ValueError('Month is out of range [1,12].')
        self.write(0x05, self.int_to_bcd(month))

    def setDay(self, day):
        """Store ``day``; raise ValueError outside 1-31."""
        if not 1 <= day <= 31:
            raise ValueError('Day is out of range [1,31].')
        self.write(0x04, self.int_to_bcd(day))

    def setDayOfWeek(self, dayOfWeek):
        """Store ``dayOfWeek``; raise ValueError outside 1-7."""
        if not 1 <= dayOfWeek <= 7:
            raise ValueError('Day of week is out of range [1,7].')
        self.write(0x03, self.int_to_bcd(dayOfWeek))

    def setHour(self, hour):
        """Store ``hour``; raise ValueError outside 0-23."""
        # The original tested the undefined name ``hours`` (NameError).
        if not 0 <= hour < 24:
            raise ValueError('Hours is out of range [0,23].')
        self.write(0x02, self.int_to_bcd(hour) & 0x3F)

    def setMinutes(self, minutes):
        """Store ``minutes``; raise ValueError outside 0-59."""
        # The original used ``< 59`` and therefore rejected minute 59,
        # contradicting its own error message.
        if not 0 <= minutes < 60:
            raise ValueError('Minutes is out of range [0,59].')
        self.write(0x01, self.int_to_bcd(minutes))

    def setSeconds(self, seconds):
        """Store ``seconds``; raise ValueError outside 0-59."""
        if not 0 <= seconds < 60:
            raise ValueError('Seconds is out of range [0,59].')
        self.write(0x00, self.int_to_bcd(seconds))

    def setDateTime(self, year, month, day, dayOfWeek, hour, minutes, seconds):
        """Set all the date and time fields (year is last two digits of year)."""
        self.setYear(year)
        self.setMonth(month)
        self.setDay(day)
        self.setDayOfWeek(dayOfWeek)
        self.setHour(hour)
        self.setMinutes(minutes)
        self.setSeconds(seconds)

    # get times functions -------------------------------------------------

    def getYear(self):
        """Return the decoded value of register 0x06."""
        return self.bcd_to_int(self.read(0x06))

    def getMonth(self):
        """Return the decoded value of register 0x05 masked with 0x7F."""
        temp = self.read(0x05)
        return self.bcd_to_int((temp << 4 >> 4) & 0x7F)

    def getDay(self):
        """Return the decoded value of register 0x04."""
        return self.bcd_to_int(self.read(0x04))

    def getDayOfWeek(self):
        """Return the decoded value of register 0x03 (1 - 7)."""
        return self.bcd_to_int(self.read(0x03))

    def getHour(self):
        """Return the decoded value of register 0x02 masked with 0x3F."""
        temp = self.read(0x02)
        if temp == 0x64:  # if hour is 24:00, convert it to 00:00
            temp = 0x40
        return self.bcd_to_int((temp << 4 >> 4) & 0x3F)

    def getMinutes(self):
        """Return the decoded value of register 0x01."""
        return self.bcd_to_int(self.read(0x01))

    def getSeconds(self):
        """Return the decoded value of register 0x00."""
        return self.bcd_to_int(self.read(0x00))

    def getDateTime(self):
        """Return [year, month, day, day of week, hour, minutes, seconds].

        Year is the last two digits.
        """
        return [
            self.getYear(),
            self.getMonth(),
            self.getDay(),
            self.getDayOfWeek(),
            self.getHour(),
            self.getMinutes(),
            self.getSeconds(),
        ]

    def convertToByteType(self, number):
        """Return ``number`` as a one-byte ``bytes`` object."""
        return bytes([number])

    def decodeToDec(self, byte):
        """Decode the packed-BCD first element of ``byte`` to an int."""
        return ((byte[0] >> 4) * 10) + (byte[0] & 0x0F)

    def encodeToByte(self, dec):
        """Encode a two-digit decimal int as packed BCD."""
        tens = floor(dec / 10)
        ones = dec - tens * 10
        return (tens << 4) + ones

    def decodeAlarmType(self, alarmTime):
        """Return the alarm type name encoded in the mask bits of ``alarmTime``."""
        if len(alarmTime) > 4:
            m1Bit = (alarmTime[3] & 0x80) >> 7
        else:
            m1Bit = False
        m2Bit = (alarmTime[2] & 0x80) >> 7
        m3Bit = (alarmTime[1] & 0x80) >> 7
        m4Bit = (alarmTime[0] & 0x80) >> 7
        dayBit = (alarmTime[0] & 0x40) >> 6

        if m1Bit and m2Bit and m3Bit and m4Bit:
            return "everySecond"
        elif not m1Bit and m2Bit and m3Bit and m4Bit:
            return "everyMinute"
        elif not m1Bit and not m2Bit and m3Bit and m4Bit:
            return "everyHour"
        elif not m1Bit and not m2Bit and not m3Bit and m4Bit:
            return "everyDay"
        elif not dayBit and not m1Bit and not m2Bit and not m3Bit and not m4Bit:
            return "everyMonth"
        elif dayBit and not m1Bit and not m2Bit and not m3Bit and not m4Bit:
            return "everyWeek"
        else:
            return "noValidAlarmType"

    def decodeAlarmTime(self, alarmTime):
        """Strip the mask bits and BCD-decode ``alarmTime`` in place; return it."""
        # The original called decodeToDec/convertToByteType as bare names,
        # which raised NameError; they are methods of this class.
        alarmTime[0] = self.decodeToDec(self.convertToByteType(alarmTime[0] & 0x3F))
        alarmTime[1] = self.decodeToDec(self.convertToByteType(alarmTime[1] & 0x3F))
        alarmTime[2] = self.decodeToDec(self.convertToByteType(alarmTime[2] & 0x7F))
        if len(alarmTime) > 4:
            alarmTime[3] = self.decodeToDec(self.convertToByteType(alarmTime[3] & 0x7F))
        return alarmTime

    def encodeAlarmType(self, alarmType):
        """Return the 5-bit mask code of ``alarmType``; ValueError if unknown."""
        if alarmType == "everySecond":
            return 15  # 0b01111
        elif alarmType == "everyMinute":
            return 14  # 0b01110
        elif alarmType == "everyHour":
            return 12  # 0b01100
        elif alarmType == "everyDay":
            return 8  # 0b01000
        elif alarmType == "everyMonth":
            return 0  # 0b00000
        elif alarmType == "everyWeek":
            return 16  # 0b10000
        else:
            raise ValueError("""Not a supported alarmType. Options are: 'everySecond' (only Alarm 1), 'everyMinute', 'everyHour', 'everyDay', 'everyMonth', 'everyWeek'""")

    def encodeDateTime(self, day, hour, minutes, seconds, alarmType):
        """Return the four alarm register values for the given time and type."""
        # The original called encodeAlarmType/encodeToByte as bare names,
        # which raised NameError; they are methods of this class.
        alarmBits = self.encodeAlarmType(alarmType)
        alarmTime = [0, 0, 0, 0]
        alarmTime[0] = ((self.encodeToByte(day) & 0x3F)
                        | ((alarmBits & 0x10) << 2) | ((alarmBits & 0x08) << 4))
        alarmTime[1] = (self.encodeToByte(hour) & 0x3F) | ((alarmBits & 0x04) << 5)
        alarmTime[2] = (self.encodeToByte(minutes) & 0x7F) | ((alarmBits & 0x02) << 6)
        alarmTime[3] = (self.encodeToByte(seconds) & 0x7F) | ((alarmBits & 0x01) << 7)
        return alarmTime


##################################
#         10. SPI_WS2812         #
##################################
class SPI_WS2812(object):
    """WS2812 LED strip driven through the SPI bus."""

    from maix import spi
    spi = spi.SpiDev()

    def __init__(self, num):
        """Open SPI 0.0 and prepare a buffer for ``num`` pixels."""
        self.num = num
        self.spi.open(0, 0)
        self.spi.mode = 3
        self.spi.bits_per_word = 8
        self.spi.max_speed_hz = 20000000
        self.data = [[0, 0, 0] for _ in range(self.num)]
        self.brightness = 255

    def setRgbColor(self, id, r, g, b):
        """Store the brightness-scaled colour of pixel ``id`` (G, R, B order).

        Raises ValueError when ``id`` is not within 0..num-1.
        """
        # The original only checked the upper bound, so a negative id
        # silently wrapped around to a pixel counted from the end.
        if not 0 <= id < self.num:
            raise ValueError("Out of range!")
        self.data[id] = [int(g * self.brightness / 255),
                         int(r * self.brightness / 255),
                         int(b * self.brightness / 255)]

    def setBrightness(self, brightness):
        """Store brightness; raise ValueError outside 0-255."""
        if brightness > 255 or brightness < 0:
            raise ValueError("Out of range!")
        self.brightness = brightness

    def rgbShow(self):
        """Encode the pixel buffer (two data bits per SPI byte) and send it."""
        tx = []
        for rgb in self.data:
            for byte in rgb:
                for ibit in range(3, -1, -1):
                    tx.append(((byte >> (2 * ibit + 1)) & 1) * 0x60 +
                              ((byte >> (2 * ibit + 0)) & 1) * 0x06 +
                              0x88)
        print([hex(v) for v in tx])
        tx[0] = 0b1000000
        tx.append(0b1000000)
        self.spi.xfer(tx, int(4 / 1.05e-6))

    def __del__(self):
        # Send an all-zero buffer when the object goes away.
        self.data = [[0, 0, 0] for _ in range(self.num)]
        self.rgbShow()


##################################
#           11. HCSR04           #
##################################
class HCSR04:
    """HC-SR04 ultrasonic ranger read through the hcsr04 kernel module."""

    def __init__(self, Trig, Echo):
        """Load the kernel module and tell it the trigger and echo pins.

        ``Trig`` and ``Echo`` are pin names: "TXD", "RXD", "CLK", "MOSI",
        "MISO" or "CS".
        """
        pinNum = {"TXD": "198", "RXD": "199", "CLK": "224",
                  "MOSI": "225", "MISO": "226", "CS": "227"}
        os.system("insmod /home/drivers/hcsr04.ko")
        pinCmd = pinNum.get(Trig) + "," + pinNum.get(Echo) + ",23200,0"
        os.system('echo "' + pinCmd + '" > /sys/class/hcsr04/value')

    def getDistance(self):
        """Return the driver value divided by 58, or -1 after 1.2 s.

        Only results below 250 are accepted.
        """
        startTime = round(time.time(), 3)
        while True:
            # Close the pipe each round; the original leaked one per poll.
            with os.popen("cat /sys/class/hcsr04/value") as pipe:
                dis = pipe.read()
            if "-1" not in dis:
                dis = round(int(dis) / 58, 2)
                if dis < 250:
                    return dis
            if (round(time.time(), 3) - startTime) >= 1.200:  # timeout
                return -1

    def __del__(self):
        os.system("rmmod /home/drivers/hcsr04.ko")


import struct
import math
import os


##################################
#          12. MCP23017          #
##################################
class MCP23017(object):
    """MCP23017 16-bit I/O expander accessed over I2C."""

    bus = smbus2.SMBus(2)  # 2 indicates /dev/i2c-2
    address = 0x20
    iodira = 0x00  # port A direction register; reset value 0xFF (inputs)
    iodirb = 0x01  # port B direction register; reset value 0xFF (inputs)
    gppua = 0x0c   # port A pull-up register; reset value 0x00 (no pull-up)
    gppub = 0x0d   # port B pull-up register; reset value 0x00 (no pull-up)
    gpioa = 0x12   # port A data register; reset value 0x00 (low)
    gpiob = 0x13   # port B data register; reset value 0x00 (low)
    DIR_A_V = 0xff   # initial direction value, port A
    DIR_B_V = 0xff   # initial direction value, port B
    GPPU_A_V = 0x00  # initial pull-up value, port A
    GPPU_B_V = 0x00  # initial pull-up value, port B
    GPIO_A_V = 0x00  # initial level value, port A
    GPIO_B_V = 0x00  # initial level value, port B

    def __init__(self):
        self.reset()

    def reset(self):
        """Restore the shadow registers and the chip to the initial values."""
        self.DIR_A = self.DIR_A_V
        self.DIR_B = self.DIR_B_V
        self.GPPU_A = self.GPPU_A_V
        self.GPPU_B = self.GPPU_B_V
        self.GPIO_A = self.GPIO_A_V
        self.GPIO_B = self.GPIO_B_V
        self.byteWrite(self.iodira, self.DIR_A_V)
        self.byteWrite(self.iodirb, self.DIR_B_V)
        self.byteWrite(self.gppua, self.GPPU_A_V)
        self.byteWrite(self.gppub, self.GPPU_B_V)
        self.byteWrite(self.gpioa, self.GPIO_A_V)
        self.byteWrite(self.gpiob, self.GPIO_B_V)
        print("mcp23017 reset completed!")

    def byteWrite(self, addr, val):
        """Write byte ``val`` to register ``addr``. Return success."""
        try:
            self.bus.write_byte_data(self.address, addr, val)
            return True
        except Exception:
            # Deliberately best-effort; report the failure instead of the
            # implicit None the original returned.
            return False

    def byteRead(self, addr):
        """Return the byte at register ``addr``."""
        return self.bus.read_byte_data(self.address, addr)

    def bitWrite(self, reg, num, val):
        """Return ``reg`` with bit ``num`` (0-7) replaced by ``val`` (0/1)."""
        return (reg & ~(1 << num)) | (val << num)

    def bitRead(self, reg, num):
        """Return bit ``num`` (0-7) of the byte ``reg``."""
        return (reg >> num) & 1

    def setGpioDir(self, channel, pinId, direction):
        """Set a pin direction.

        channel: "A" or "B"; pinId: 0~7;
        direction: [0:output],[1:input(default)]
        """
        if channel == "A":
            self.DIR_A = self.bitWrite(self.DIR_A, pinId, direction)
            print("Dir_A=" + str(self.DIR_A))
            self.byteWrite(self.iodira, self.DIR_A)
        elif channel == "B":
            self.DIR_B = self.bitWrite(self.DIR_B, pinId, direction)
            self.byteWrite(self.iodirb, self.DIR_B)
        else:
            raise ValueError("Error Channel!")

    def setGppuEnable(self, channel, pinId, pullup):
        """Switch a pin pull-up on or off.

        channel: "A" or "B"; pinId: 0~7;
        pullup: [0:disable],[1:enable(default)]
        """
        if channel == "A":
            self.GPPU_A = self.bitWrite(self.GPPU_A, pinId, pullup)
            self.byteWrite(self.gppua, self.GPPU_A)
        elif channel == "B":
            self.GPPU_B = self.bitWrite(self.GPPU_B, pinId, pullup)
            self.byteWrite(self.gppub, self.GPPU_B)
        else:
            raise ValueError("Error Channel!")

    def setGpioLevel(self, channel, pinId, level):
        """Set a pin output level.

        channel: "A" or "B"; pinId: 0~7;
        level: [0:low(default)],[1:high]
        """
        if channel == "A":
            self.GPIO_A = self.bitWrite(self.GPIO_A, pinId, level)
            self.byteWrite(self.gpioa, self.GPIO_A)
        elif channel == "B":
            self.GPIO_B = self.bitWrite(self.GPIO_B, pinId, level)
            self.byteWrite(self.gpiob, self.GPIO_B)
        else:
            raise ValueError("Error Channel!")

    def getGpioLevel(self, channel, pinId):
        """Return a pin level (0 low, 1 high).

        channel: "A" or "B"; pinId: 0~7
        """
        if channel == "A":
            return self.bitRead(self.byteRead(self.gpioa), pinId)
        elif channel == "B":
            return self.bitRead(self.byteRead(self.gpiob), pinId)
        else:
            raise ValueError("Error Channel!")

    def __del__(self):
        self.reset()
        time.sleep(0.001)
        print("del mcp23017")


class ADS1115(object):
    """ADS1115 16-bit ADC accessed over I2C (single-shot reads)."""

    bus = smbus2.SMBus(2)  # 2 indicates /dev/i2c-2

    # Registers in the ADS1115 (the device has four registers)
    DEVICE_REG_CONVERSION = 0x00  # conversion register
    DEVICE_REG_CONFIG = 0x01      # configuration register
    DEVICE_REG_LO_THRESH = 0x02   # low threshold register
    DEVICE_REG_HI_THRESH = 0x03   # high threshold register

    # Configuration register fields (16 bits in total)

    # Operational status, bit 15
    CONFIG_OS = 0X8000                           # mask / no effect
    CONFIG_OS_START = 0X8000                     # start a single conversion (when powered down)
    CONFIG_OS_PERFORMING_CONVERSION = 0X0000     # conversion in progress
    CONFIG_OS_NOT_PERFORMING_OPERATION = 0X8000  # idle

    # Differential measurements, bits 12-14
    # NOTE(review): these names/comments do not appear to match the MUX table
    # of the TI datasheet (0x1000 = AIN0/AIN3, 0x2000 = AIN1/AIN3,
    # 0x3000 = AIN2/AIN3) - verify before using differential mode.
    CONFIG_MUX_AIN0P_AIN1N = 0X0000  # (default) AIN0 differential input, AIN1 reference
    CONFIG_MUX_AIN1P_AIN3N = 0X1000  # AIN1 differential input, AIN3 reference
    CONFIG_MUX_AIN2P_AIN3N = 0X2000  # AIN2 differential input, AIN3 reference
    CONFIG_MUX_AIN3P_AIN3N = 0X3000  # AIN3 differential input, AIN3 reference
    # Single ended measurements
    CONFIG_MUX_AIN0P_GNDN = 0X4000  # AIN0 single-ended
    CONFIG_MUX_AIN1P_GNDN = 0X5000  # AIN1 single-ended
    CONFIG_MUX_AIN2P_GNDN = 0X6000  # AIN2 single-ended
    CONFIG_MUX_AIN3P_GNDN = 0X7000  # AIN3 single-ended

    # Programmable gain amplifier configuration (full-scale range), bits 9-11
    CONFIG_FSR_6V144 = 0X0000  # the range CocoPi has to use
    CONFIG_FSR_4V096 = 0X0200
    CONFIG_FSR_2V048 = 0X0400  # (default)
    CONFIG_FSR_1V024 = 0X0600
    CONFIG_FSR_0V512 = 0X0800
    # Same name assigned three times in the original; the last value wins.
    CONFIG_FSR_0V256 = 0X0A00
    CONFIG_FSR_0V256 = 0X0C00
    CONFIG_FSR_0V256 = 0X0E00

    # Continuous or single shot mode, bit 8
    CONFIG_MODE_CONTINUOUS = 0X0000   # continuous conversion
    CONFIG_MODE_SINGLE_SHOT = 0X0100  # (default) single conversion

    # Data rate, bits 5-7 (SPS: samples per second)
    CONFIG_DATA_RATE_8SPS = 0X0000
    CONFIG_DATA_RATE_16SPS = 0X0020
    CONFIG_DATA_RATE_32SPS = 0X0040
    CONFIG_DATA_RATE_64SPS = 0X0060
    CONFIG_DATA_RATE_128SPS = 0X0080  # (default)
    CONFIG_DATA_RATE_2508SPS = 0X00A0  # misspelt name kept for compatibility
    CONFIG_DATA_RATE_250SPS = CONFIG_DATA_RATE_2508SPS  # correctly named alias
    CONFIG_DATA_RATE_475SPS = 0X00C0
    CONFIG_DATA_RATE_860SPS = 0X00E0

    # Comparator mode, bit 4
    CONFIG_COMP_MODE_TRADITIONAL = 0X0000  # (default) traditional comparator
    CONFIG_COMP_MODE_WINDOW = 0X0010       # window comparator

    # Comparator polarity, bit 3: polarity of the ALERT/RDY pin
    CONFIG_COMP_POL_ACTIVE_LOW = 0X0000   # (default) active low
    CONFIG_COMP_POL_ACTIVE_HIGH = 0X0008  # active high

    # Comparator latching, bit 2: whether ALERT/RDY latches once asserted or
    # clears when conversions are back within the threshold range
    CONFIG_COMP_LAT = 0X0004
    CONFIG_COMP_LAT_NON_LATCHING = 0X0000  # (default) non-latching
    CONFIG_COMP_LAT_LATCHING = 0X0004

    # Comparator queue and disable
    CONFIG_COMP_QUE = 0X0003
    CONFIG_COMP_QUE_1_CONV = 0X0000   # assert after one conversion
    CONFIG_COMP_QUE_2_CONV = 0X0001   # assert after two conversions
    CONFIG_COMP_QUE_4_CONV = 0X0002   # assert after four conversions
    CONFIG_COMP_QUE_DISABLE = 0X0003  # (default) comparator disabled, ALERT/RDY high-impedance

    # Address for the device
    # addr=0x48  # ADDR tied to GND
    addr = 0x49  # ADDR tied to VDD
    # addr=0x4A  # ADDR tied to SDA
    # addr=0x4B  # ADDR tied to SCL

    def __init__(self):
        self.address = 0x49
        self.config = 0x0000

    def swap(self, val):
        """Return the 16-bit ``val`` with its high and low bytes exchanged."""
        return ((val & 0xff00) >> 8) | ((val & 0x00ff) << 8)

    def readAdc(self, channel):
        """Run one single-ended conversion on ``channel`` (0-3).

        Returns ``[result, volt]`` where ``result`` is the signed 16-bit
        conversion value and ``volt`` is ``result * 6.144 / 32768`` rounded
        to 3 digits, or False when ``channel`` is out of range.
        """
        if (channel > 3) or (channel < 0):
            return False

        self.config = (self.CONFIG_OS_START +              # start conversion
                       ((channel + 4) << 12) +             # single-ended channel select
                       self.CONFIG_FSR_6V144 +             # (5v signal)
                       self.CONFIG_MODE_SINGLE_SHOT +      # single conversion and shutdown
                       self.CONFIG_DATA_RATE_128SPS +      # data rate
                       self.CONFIG_COMP_MODE_TRADITIONAL + # comp conventional
                       self.CONFIG_COMP_POL_ACTIVE_LOW +   # comp active low
                       self.CONFIG_COMP_LAT_NON_LATCHING + # comp non latching
                       self.CONFIG_COMP_QUE_DISABLE)       # comp disabled

        self.bus.write_word_data(self.address, self.DEVICE_REG_CONFIG,
                                 self.swap(self.config))

        while True:
            # read status (note byte swap)
            status = self.swap(
                self.bus.read_word_data(self.address, self.DEVICE_REG_CONFIG))
            # when the Operational Status is no longer performing a conversion
            # we can break out of this wait loop
            if (status & self.CONFIG_OS) != self.CONFIG_OS_PERFORMING_CONVERSION:
                break

        # read result (note byte swap)
        result = self.swap(
            self.bus.read_word_data(self.address, self.DEVICE_REG_CONVERSION))
        # The conversion register is two's complement; the original treated
        # it as unsigned, so a slightly negative reading came back as ~12 V.
        if result >= 0x8000:
            result -= 0x10000
        volt = round((result * 6.144 / 32768), 3)
        return [result, volt]


class PCA9685(object):
    """PCA9685 16-channel PWM controller accessed over I2C."""

    bus = smbus2.SMBus(2)

    def __init__(self, freq=400, min_us=460, max_us=2400, address=0x40, degrees=180):
        """Set the PWM frequency and reset the mode register."""
        self.address = address
        self.period = 1000000 / freq
        self.min_duty = self._us2duty(min_us)
        self.max_duty = self._us2duty(max_us)
        self.freq(freq)
        self.reset()
        print("Pca9685 init")

    def write(self, addr, val):
        """Write byte ``val`` to ``addr``; up to 2 attempts. Return success."""
        for _ in range(2):
            try:
                self.bus.write_byte_data(self.address, addr, val)
                return True
            except Exception:
                time.sleep(0.001)
        return False

    def read(self, addr):
        """Read one byte from ``addr``; up to 3 attempts. Return it or None."""
        for _ in range(3):
            try:
                return self.bus.read_byte_data(self.address, addr)
            except Exception:
                time.sleep(0.01)
        return None

    def reset(self):
        """Write 0x00 to register 0x00 (initialise)."""
        self.write(0x00, 0x00)

    def freq(self, freq=None):
        """Return the frequency derived from the prescaler, or set ``freq``."""
        if freq is None:
            return int(25000000.0 / 4096 / (self.read(0xfe) - 0.5))
        # prescale = int(25000000.0 / (4096.0 * freq) + 0.5)
        prescale = int(25000000.0 / 4096 / freq + 0.5)
        self.write(0x00, 0x10)      # put the pca9685 to sleep
        self.write(0xfe, prescale)  # set the frequency
        self.reset()
        time.sleep(0.01)
        self.write(0x00, 0xa1)      # wake the pca9685 up

    def pwm(self, index, on=None, off=None):
        """Read or write the on/off counters of channel ``index`` (0-15).

        With ``on`` or ``off`` missing, return the 4 raw register bytes;
        otherwise write both counters (they set the PWM duty cycle).
        Raises ValueError when ``index`` is out of range.
        """
        if not 0 <= index <= 15:
            raise ValueError("Pin ID out of range!")
        if on is None or off is None:
            return self.bus.read_i2c_block_data(self.address, 0x06 + index * 4, 4)
        data = [on & 0xff, (on >> 8) & 0xff, off & 0xff, (off >> 8) & 0xff]
        for i, byte in enumerate(data):
            self.write(0x06 + i + index * 4, byte)

    def duty(self, index, value=None):
        """Read (value None) or set the duty 0-4095 of channel ``index``.

        Raises ValueError when ``value`` is out of range.
        """
        if value is None:
            return self.pwm(index)
        elif not 0 <= value <= 4095:
            raise ValueError("Out of range!")
        elif value == 0:
            self.pwm(index, 0, 4096)
        elif value == 4095:
            self.pwm(index, 4096, 0)
        else:
            self.pwm(index, 0, value)

    def _us2duty(self, value):
        """Convert a pulse width in microseconds into a duty value."""
        return 4095 * value / self.period

    def __del__(self):
        print("del pac9685")
        time.sleep(1)
        for i in range(16):
            self.duty(i, 0)
            time.sleep(0.001)


# NOTE(review): the definition of class extDcMotor continues beyond the part
# of the file visible here, so it is left unmodified.  Its visible beginning
# is preserved verbatim on the next comment line and has to be re-joined with
# its continuation:
# class extDcMotor(PCA9685): def __init__(self,motorId): PCA9685.__init__(self) self.motorPin=[11,12,13,10,9,8,5,6,7,4,3,2] self.motorId=motorId pass def speedControl(self,speed): self.speed=abs(speed) if not -255<= speed<=255: raise ValueError("Out of range!") if self.motorId == "C": if speed<0: self.duty(self.motorPin[0],4095) self.duty(self.motorPin[1],0) self.duty(self.motorPin[2],int(self.speed*16)) else:
self.duty(self.motorPin[0],0) self.duty(self.motorPin[1],4095) self.duty(self.motorPin[2],int(self.speed*16)) elif self.motorId == "D": if speed<0: self.duty(self.motorPin[3],4095) self.duty(self.motorPin[4],0) self.duty(self.motorPin[5],int(self.speed*16)) else: self.duty(self.motorPin[3],0) self.duty(self.motorPin[4],4095) self.duty(self.motorPin[5],int(self.speed*16)) elif self.motorId == "E": if speed<0: self.duty(self.motorPin[6],4095) self.duty(self.motorPin[7],0) self.duty(self.motorPin[8],int(self.speed*16)) else: self.duty(self.motorPin[6],0) self.duty(self.motorPin[7],4095) self.duty(self.motorPin[8],int(self.speed*16)) elif self.motorId == "F": if speed<0: self.duty(self.motorPin[9],4095) self.duty(self.motorPin[10],0) self.duty(self.motorPin[11],int(self.speed*16)) else: self.duty(self.motorPin[9],0) self.duty(self.motorPin[10],4095) self.duty(self.motorPin[11],int(self.speed*16)) else: pass def __del__(self): if self.motorId == "C": self.duty(self.motorPin[0],0) self.duty(self.motorPin[1],0) self.duty(self.motorPin[2],0) elif self.motorId == "D": self.duty(self.motorPin[3],0) self.duty(self.motorPin[4],0) self.duty(self.motorPin[5],0) elif self.motorId == "E": self.duty(self.motorPin[6],0) self.duty(self.motorPin[7],0) self.duty(self.motorPin[8],0) elif self.motorId == "F": self.duty(self.motorPin[9],0) self.duty(self.motorPin[10],0) self.duty(self.motorPin[11],0) else: pass time.sleep(0.001) class extServo(PCA9685): def __init__(self,servoId): PCA9685.__init__(self) self.servoId=servoId self.servoPin=[14,15,1,0] self.degrees=180 pass def position(self,degrees=None): #index:0,1,2,3 span = self.max_duty - self.min_duty duty = self.min_duty + span * degrees / self.degrees duty = int(min(self.max_duty, max(self.min_duty, int(duty)))) self.duty(self.servoPin[self.servoId], duty) def release(self): self.duty(self.servoPin[self.servoId],0) def __del__(self): self.duty(self.servoPin[self.servoId],0) class extOutputPin(PCA9685): def __init__(self): 
PCA9685.__init__(self) self.pin=[14,15,1,0] self.pinId=0 pass def digitalSet(self,pinId,val): #val:0/1 self.pinId=pinId if val==0: self.duty(self.pin[self.pinId],0) else: self.duty(self.pin[self.pinId],4095) def pwmWrite(self,pinId,val): #val:0~255 self.pinId=pinId # print("pinid"+str(self.pin[self.pinId])) self.duty(self.pin[self.pinId],val*16) def __del__(self): self.duty(self.pin[self.pinId],0) print("del extPin") class extGpio(MCP23017,ADS1115,extOutputPin): def __init__(self): MCP23017.__init__(self) ADS1115.__init__(self) extOutputPin.__init__(self) self.address=0x20 #初始默认为mcp23017的地址 def pinMode(self,pinId,mode): #控制mcp23017 self.address=0x20 #---------- #pinId: #p0~p3:0~3 #a0~a3:4~7 #--------- #mode: #output:1 #input:0 if pinId>=0 and pinId<=3: if mode==0: if pinId==0: #p0 self.setGpioDir("B",2,0) #output self.setGpioLevel("B",2,1) #high self.setGpioDir("B",3,1) #input elif pinId==1: #p1 self.setGpioDir("B",1,0) self.setGpioLevel("B",1,1) self.setGpioDir("B",0,1) elif pinId==2: #p2 self.setGpioDir("B",6,0) self.setGpioLevel("B",6,1) self.setGpioDir("A",1,1) elif pinId==3: #p3 self.setGpioDir("B",5,0) self.setGpioLevel("B",5,1) self.setGpioDir("B",4,1) else: pass elif mode==1: if pinId==0: self.setGpioDir("B",2,0) #output self.setGpioLevel("B",2,1) #high self.setGpioDir("B",3,0) #output elif pinId==1: self.setGpioDir("B",1,0) self.setGpioLevel("B",1,1) self.setGpioDir("B",0,0) elif pinId==2: self.setGpioDir("B",6,0) self.setGpioLevel("B",6,1) self.setGpioDir("A",1,0) elif pinId==3: self.setGpioDir("B",5,0) self.setGpioLevel("B",5,1) self.setGpioDir("B",4,0) # print("a") time.sleep(1) else: pass else: pass elif pinId>=4 and pinId<=7: if mode==0: if pinId==4: #a0 self.setGpioDir("A",4,0) #output self.setGpioLevel("A",4,1) #high self.setGpioDir("A",5,1) #input elif pinId==5: #a1 self.setGpioDir("A",3,0) self.setGpioLevel("A",3,1) self.setGpioDir("A",7,1) elif pinId==6: #a2 self.setGpioDir("A",2,0) self.setGpioLevel("A",2,1) self.setGpioDir("A",6,1) elif 
pinId==7: #a3 self.setGpioDir("B",7,0) self.setGpioLevel("B",7,1) self.setGpioDir("A",0,1) else: pass elif mode==1: if pinId==4: #a0 self.setGpioDir("A",4,0) #output self.setGpioLevel("A",4,1) #high self.setGpioDir("A",5,0) #output elif pinId==5: #a1 self.setGpioDir("A",3,0) self.setGpioLevel("A",3,1) self.setGpioDir("A",7,0) elif pinId==6: #a2 self.setGpioDir("A",2,0) self.setGpioLevel("A",2,1) self.setGpioDir("A",6,0) elif pinId==7: #a3 self.setGpioDir("B",7,0) self.setGpioLevel("B",7,1) self.setGpioDir("A",0,0) else: pass else: pass def digitalWrite(self,pinId,level): #控制mcp23017 #---------- #pinId: #p0~p3:0~3 #a0~a3:4~7 #--------- #level: #5v:1 #0v:0 self.address=0x20 if pinId>=0 and pinId<=3: if pinId==0: # print("digital write level") self.setGpioLevel("B",3,level) elif pinId==1: self.setGpioLevel("B",0,level) elif pinId==2: self.setGpioLevel("A",1,level) elif pinId==3: self.setGpioLevel("B",4,level) else: pass elif pinId>=4 and pinId<=7: if pinId==4: #a0 self.setGpioLevel("A",5,level) elif pinId==5: #a1 self.setGpioLevel("A",7,level) elif pinId==6: #a2 self.setGpioLevel("A",6,level) elif pinId==7: #a3 self.setGpioLevel("A",0,level) else: pass else: pass def digitalRead(self,pinId): #控制mcp23017 self.address=0x20 #---------- #pinId: #p0~p3:0~3 #a0~a3:4~7 if pinId>=0 and pinId<=3: if pinId==0: # print("get digital write level") return self.getGpioLevel("B",3) elif pinId==1: return self.getGpioLevel("B",0) elif pinId==2: return self.getGpioLevel("A",1) elif pinId==3: return self.getGpioLevel("B",4) else: pass elif pinId>=4 and pinId<=7: if pinId==4: #a0 return self.getGpioLevel("A",5) elif pinId==5: #a1 return self.getGpioLevel("A",7) elif pinId==6: #a2 return self.getGpioLevel("A",6) elif pinId==7: #a3 return self.getGpioLevel("A",0) else: pass else: pass def analogWrite(self,pinId,pwm): #控制PCA9685,但是需要Mcp23017进行通道选择 self.address=0x20 #先用mcp23017进行通道选择 #select X channel if pinId==0: #p0 self.setGpioDir("B",2,1) #output self.setGpioLevel("B",2,0) #low elif 
pinId==1: #p1 self.setGpioDir("B",1,1) self.setGpioLevel("B",1,0) elif pinId==2: #p2 self.setGpioDir("B",6,1) self.setGpioLevel("B",6,0) elif pinId==3: #p3 self.setGpioDir("B",5,1) self.setGpioLevel("B",5,0) else: pass self.address=0x40 #再用PCA9685进行控制 self.pwmWrite(pinId,pwm) def analogRead(self,pinId): #控制ADS1115,但是需要Mcp23017进行通道选择 self.adcPin=[3,1,0,2] self.address=0x20 #先用mcp23017进行通道选择 #select X channel if pinId==4: #a0 self.setGpioDir("A",4,1) #input self.setGpioLevel("A",4,0) #low elif pinId==5: #a1 self.setGpioDir("A",3,1) self.setGpioLevel("A",3,0) elif pinId==6: #a2 self.setGpioDir("A",2,1) self.setGpioLevel("A",2,0) elif pinId==7: #a3 self.setGpioDir("B",7,0) self.setGpioLevel("B",7,0) else: pass self.address=0x49 #再用Ads1115进行ADC读取 return self.readAdc(self.adcPin[pinId-4]) def __del__(self): self.address=0x40 extOutputPin.__del__(self) self.address=0x20 MCP23017.__del__(self) #最先init的配置最后再del print("del finished!")