''' ========================================================= ______ ______ /\ _ \ /\ _ \ __ \ \ \/\_\ ____ ____ ___\ \ \_\ \/\_\ \ \ \/_/_ / __ \ / __ \ / __'\ \ __/\/_/_ \ \ \_\ \/\ \_\ \/\ \__//\ \_\ \ \ \/ /\ \ \ \____/\ \____/\ \____\ \____/\ \_\ \ \_\ \/___/ \/___/ \/____/\/___/ \/_/ \/_/ __ /\ \ __ ---------------- \ \ \ /\_\ ____ __ __ __ _ pi.cocorobo.cn \ \ \ \/_/_ / _ \/\ \/\ \/\ \/ \ ---------------- \ \ \____ /\ \/\ \/\ \ \ \_\ \/> 0x131(0x31), Initial value = 0xFF. No XOROUT. N_DATA = 6 # 1 * 8 bits CRC N_CRC = 1 # Initial value. Equal to bit negation the first data (status of AHT20) INIT = 0xFF # Useful value to help calculate LAST_8_bit = 0xFF # Devide number retrieve from CRC-8 MAXIM G(x) = x8 + x5 + x4 + 1 CRC_DEVIDE_NUMBER = 0x131 # I2C communication driver for AHT20, using only smbus2 def __init__(self, BusNum=2): # Initialize AHT20 self.BusNum = BusNum self.cmd_soft_reset() # Check for calibration, if not done then do and wait 10 ms if not self.get_status_calibrated == 1: self.cmd_initialize() while not self.get_status_calibrated() == 1: time.sleep(0.01) def get_normalized_bit(self,value, bit_index): # Return only one bit from value indicated in bit_index return (value >> bit_index) & 1 def cmd_soft_reset(self): # Send the command to soft reset with SMBus(self.BusNum) as i2c_bus: i2c_bus.write_i2c_block_data(AHT20_I2CADDR, 0x0, AHT20_CMD_SOFTRESET) time.sleep(0.04) # Wait 40 ms after poweron return True def cmd_initialize(self): # Send the command to initialize (calibrate) with SMBus(self.BusNum) as i2c_bus: i2c_bus.write_i2c_block_data(AHT20_I2CADDR, 0x0 , AHT20_CMD_INITIALIZE) return True def cmd_measure(self): # Send the command to measure with SMBus(self.BusNum) as i2c_bus: i2c_bus.write_i2c_block_data(AHT20_I2CADDR, 0, AHT20_CMD_MEASURE) time.sleep(0.08) # Wait 80 ms after measure return True def get_status(self): # Get the full status byte with SMBus(self.BusNum) as i2c_bus: return i2c_bus.read_i2c_block_data(AHT20_I2CADDR, 0x0, 1)[0] return True def get_status_calibrated(self): # Get the calibrated bit return self.get_normalized_bit(self.get_status(), AHT20_STATUSBIT_CALIBRATED) def get_status_busy(self): # Get the busy bit return self.get_normalized_bit(self.get_status(), AHT20_STATUSBIT_BUSY) def get_measure(self): # Get the full measure # Command a measure self.cmd_measure() # Check if busy bit = 0, otherwise wait 80 ms and retry while self.get_status_busy() == 1: time.sleep(0.08) # Wait 80 ns # TODO: do CRC check # Read data and return it with SMBus(self.BusNum) as i2c_bus: return i2c_bus.read_i2c_block_data(AHT20_I2CADDR, 0x0, 7) def mod2_division_8bits(self,a, b, number_of_bytes, init_value): "calculate mod2 division in 8 bits. a mod b. init_value is for crc8 init value." head_of_a = 0x80 # Processiong a a = a << 8 # Preprocessing head_of_a for i in range(0, number_of_bytes): head_of_a = head_of_a << 8 b = b << 8 init_value = init_value << 8 a = a ^ init_value while (head_of_a > 0x80): # Find a 1 if (head_of_a & a): head_of_a = head_of_a >> 1 b = b >> 1 a = a ^ b else: head_of_a = head_of_a >> 1 b = b >> 1 # This will show calculate the remainder # print("a:{0}\thead of a:{1}\tb:{2}".format( # bin(a), bin(head_of_a), bin(b))) return a def AHT20_crc8_calculate(self,all_data_int): init_value = INIT # Preprocess all the data and CRCCode from AHT20 data_from_AHT20 = 0x00 # Preprocessing the first data (status) # print(bin(data_from_AHT20)) for i_data in range(0, len(all_data_int)): data_from_AHT20 = (data_from_AHT20 << 8) | all_data_int[i_data] # print(bin(data_from_AHT20)) mod_value = self.mod2_division_8bits( data_from_AHT20, CRC_DEVIDE_NUMBER, len(all_data_int), init_value) # print(mod_value) return mod_value def AHT20_crc8_check(self,all_data_int): """ The input data shoule be: Status Humidity0 Humidity1 Humidity2|Temperature0 Temperature1 Temperature2 CRCCode. In python's int64. """ mod_value = self.AHT20_crc8_calculate(all_data_int[:-1]) if (mod_value == all_data_int[-1]): return True else: return False def get_measure_CRC8(self): """ This function will calculate crc8 code with G(x) = x8 + x5 + x4 + 1 -> 0x131(0x31), Initial value = 0xFF. No XOROUT. return: all_data (1 bytes status + 2.5 byes humidity + 2.5 bytes temperature + 1 bytes crc8 code), isCRC8_pass """ all_data = self.get_measure() isCRC8_pass = self.AHT20_crc8_check(all_data) return all_data, isCRC8_pass def get_temperature(self): # Get a measure, select proper bytes, return converted data measure = self.get_measure() measure = ((measure[3] & 0xF) << 16) | (measure[4] << 8) | measure[5] measure = measure / (pow(2,20))*200-50 return measure def get_temperature_crc8(self): isCRC8Pass = False while (not isCRC8Pass): measure, isCRC8Pass = self.get_measure_CRC8() time.sleep(80 * 10**-3) measure = ((measure[3] & 0xF) << 16) | (measure[4] << 8) | measure[5] measure = measure / (pow(2,20))*200-50 return measure def get_humidity(self): # Get a measure, select proper bytes, return converted data measure = self.get_measure() measure = (measure[1] << 12) | (measure[2] << 4) | (measure[3] >> 4) measure = measure * 100 / pow(2,20) return measure def get_humidity_crc8(self): isCRC8Pass = False while (not isCRC8Pass): measure, isCRC8Pass = self.get_measure_CRC8() time.sleep(80 * 10**-3) measure = (measure[1] << 12) | (measure[2] << 4) | (measure[3] >> 4) measure = measure * 100 / pow(2,20) return measure ################################## # 2、MPU6050陀螺仪六轴传感器 # ################################## SLAVE_ADDR = 0x68 PWR_MGMT_1 = 0x6B PWR_MGMT_2 = 0x6C WHO_AM_I = 0x75 GYRO_X = 0x43 GYRO_Y = 0x45 GYRO_Z = 0x47 ACCL_X = 0x3B ACCL_Y = 0x3D ACCL_Z = 0x3F class MPU6050: def __init__(self, BusNum=2): # Initialize mpu6050 self.BusNum = BusNum self.cmd_soft_reset() def cmd_soft_reset(self): # Send the command to soft reset with SMBus(self.BusNum) as bus: bus.write_byte_data(SLAVE_ADDR, PWR_MGMT_1, 0) time.sleep(0.04) # Wait 40 ms after poweron def read_byte(self,addr): with SMBus(self.BusNum) as bus: return bus.read_byte_data(SLAVE_ADDR,addr) def read_word(self,addr): with SMBus(self.BusNum) as bus: h = bus.read_byte_data(SLAVE_ADDR, addr) l = bus.read_byte_data(SLAVE_ADDR, addr+1) val = (h << 8) + l return val def read_word_i2c(self,addr): val = self.read_word(addr) if (val >= 0x8000): return -((65535 - val) + 1) else: return val def dist(self,x, y): return math.sqrt((x*x) + (y*y)) def get_x_rotat(self,x,y, z): rad = math.atan2(y,self.dist(x, z)) return math.degrees(rad) def get_y_rotat(self,x, y, z): rad = math.atan2(x,self.dist(y, z)) return -math.degrees(rad) def read_gyro(self): GYR_X = self.read_word_i2c(GYRO_X) GYR_Y = self.read_word_i2c(GYRO_Y) GYR_Z = self.read_word_i2c(GYRO_Z) #print ("GYRO -> X:{:04.2f} Y:{:04.2f} Z:{:04.2f}".format((GYR_X/131), (GYR_Y/131), (GYR_Z/131))) return (GYR_X/131), (GYR_Y/131), (GYR_Z/131) def read_acc(self): ACC_X = self.read_word_i2c(ACCL_X) ACC_Y = self.read_word_i2c(ACCL_Y) ACC_Z = self.read_word_i2c(ACCL_Z) CALC_ACC_X = ACC_X/16384.0 CALC_ACC_Y = ACC_Y/16384.0 CALC_ACC_Z = ACC_Z/16384.0 #print ("ACCL -> X:{:04.2f} Y:{:04.2f} Z:{:04.2f}".format(CALC_ACC_X, CALC_ACC_Y, CALC_ACC_Z)) #print("ROTATE -> X:{:04.2f} Y:{:04.2f}\n".format(get_x_rotat(CALC_ACC_X, CALC_ACC_Y, CALC_ACC_Z), get_y_rotat(CALC_ACC_X, CALC_ACC_Y, CALC_ACC_Z))) return CALC_ACC_X, CALC_ACC_Y, CALC_ACC_Z,self.get_x_rotat(CALC_ACC_X, CALC_ACC_Y, CALC_ACC_Z),self.get_y_rotat(CALC_ACC_X, CALC_ACC_Y, CALC_ACC_Z) def get_gyro_x(self): return self.read_gyro()[0] def get_gyro_y(self): return self.read_gyro()[1] def get_gyro_z(self): return self.read_gyro()[2] def get_acc_x(self): return self.read_acc()[0] def get_acc_y(self): return self.read_acc()[1] def get_acc_z(self): return self.read_acc()[2] def get_angle_x(self): return self.read_acc()[3] def get_angle_y(self): return self.read_acc()[4] ''' ################################## # 3、板载4颗按钮A、B、C、D # ################################## class BUTTON: import os def __init__(self, gpioId): sendMsg_1='echo ' sendMsg_2=' > /sys/class/gpio/export' sendMsg_3=' > /sys/class/gpio/unexport' sendMsg_4='echo "in" > /sys/class/gpio/gpio' sendMsg_5='/direction' sendMsg_6='/sys/class/gpio/gpio' sendMsg_7='/value' self.gpio=str(224+gpioId) self.msgStart=sendMsg_1+self.gpio+sendMsg_2 self.msgMode=sendMsg_4+self.gpio+sendMsg_5 self.msgDel=sendMsg_1+self.gpio+sendMsg_3 self.msgGet=sendMsg_6+self.gpio+sendMsg_7 def is_pressed(self): import os if(os.access(self.msgGet, os.F_OK) is False): os.system(self.msgStart) os.system(self.msgMode) with open(self.msgGet, "rb") as self.file: self.getValue=int(self.file.read()) if self.getValue != 1: return True else: return False def __del__(self): os.system(self.msgDel) ''' ################################## # 3、板载4颗按钮A、B、C、D # ################################## class BUTTON: def __init__(self, gpioId): self.gpio=224+gpioId SUNXI_GPIO.setcfg(self.gpio, SUNXI_GPIO.IN) def is_pressed(self): self.getValue=SUNXI_GPIO.input(self.gpio) if self.getValue != 1: return True else: return False ################################## # 4、光线强度传感器 # ################################## class LIGHTINTENSITY: def __init__(self, addr=b"0x05070080") -> None: self.addr = addr self.path = "/sys/class/sunxi_dump/dump" self.file = open(self.path, "wb+") self.last = self.value() def value(self): self.file.write(b"0x05070080") self.file.seek(0) return int(self.file.read()[:-1], 16) def __del__(self): try: if self.file: self.file.close() del self.file except Exception as e: pass ''' ################################## # 5、板载LED灯 # ################################## class LED: import os def __init__(self, gpioId=14): sendMsg_1='echo ' sendMsg_2=' > /sys/class/gpio/export' sendMsg_3=' > /sys/class/gpio/unexport' sendMsg_4='echo "out" > /sys/class/gpio/gpio' sendMsg_5='/direction' self.sendMsg_6=' > /sys/class/gpio/gpio' self.sendMsg_7='/value' self.sendMsg_8='echo ' self.gpio=str(224+gpioId) self.msgStart=sendMsg_1+self.gpio+sendMsg_2 self.msgMode=sendMsg_4+self.gpio+sendMsg_5 self.msgDel=sendMsg_1+self.gpio+sendMsg_3 self.msgGet=self.sendMsg_6+self.gpio+self.sendMsg_7 def out(self,value): import os os.system(self.msgStart) os.system(self.msgMode) if value==0 or value==1: self.value=value else: self.value=0 self.msgSet=self.sendMsg_8+str(self.value)+self.sendMsg_6+self.gpio+self.sendMsg_7 os.system(self.msgSet) def __del__(self): os.system(self.msgDel) ''' ################################## # 5、板载LED灯 # ################################## class LED: def __init__(self, gpioId=69): self.gpioId=gpioId SUNXI_GPIO.setcfg(self.gpioId, SUNXI_GPIO.OUT) def out(self,value): if value==0 or value==1: self.value=1-value else: self.value=0 SUNXI_GPIO.output(self.gpioId,self.value) ''' ################################## # 6、直流电机 # ################################## class DCMOTOR: def __init__(self,dcMotorID="M1"): self.motorId=dcMotorID os.system('i2cset -y 2 0x40 0x00 0x00') #初始化 #设定频率freq=50,预分频prescale=int(25000000.0 / 4096.0 / freq + 0.5) os.system('i2cset -y 2 0x40 0x00 0x10') #设定pca9685为睡眠模式 os.system('i2cset -y 2 0x40 0xfe 0x7a') #设定预分频 os.system('i2cset -y 2 0x40 0x00 0x00') #重新初始化 time.sleep(0.01) os.system('i2cset -y 2 0x40 0x00 0xa1') #设定pca9685为活跃模式 def numberMap(self,value, leftMin, leftMax, rightMin, rightMax): leftSpan = leftMax - leftMin rightSpan = rightMax - rightMin valueScaled = float(value - leftMin) / float(leftSpan) return rightMin + (valueScaled * rightSpan) def dcMotorOffRegisterValue(self,dcMotorSetSpeed): dcMotorOffRegister = int(self.numberMap(dcMotorSetSpeed,0,255,0,4095)) dcMotorOffRegister_L = hex(dcMotorOffRegister & 0xff) dcMotorOffRegister_H = hex((dcMotorOffRegister >> 8) & 0xff) return dcMotorOffRegister_L, dcMotorOffRegister_H def dcMotorControl(self,dcMotorSetSpeed=0,dcMotorSetRotationDirection=True): if self.motorId=="M1": if dcMotorSetRotationDirection == True: dcMotorOffRegisterValue6_L ='i2cset -y 2 0x40 0x20 '+str(self.dcMotorOffRegisterValue(dcMotorSetSpeed)[0]) dcMotorOffRegisterValue6_H ='i2cset -y 2 0x40 0x21 '+str(self.dcMotorOffRegisterValue(dcMotorSetSpeed)[1]) os.system('i2cset -y 2 0x40 0x1e 0x00') #设定6通道ON_L为0 os.system('i2cset -y 2 0x40 0x1f 0x00') #设定6通道ON_H为0 os.system(dcMotorOffRegisterValue6_L) #设定6通道OFF_L为速度寄存器值低位 os.system(dcMotorOffRegisterValue6_H) #设定6通道OFF_H为速度寄存器值高位 os.system('i2cset -y 2 0x40 0x22 0x00') #设定7通道ON_L为0 os.system('i2cset -y 2 0x40 0x23 0x00') #设定7通道ON_H为0 os.system('i2cset -y 2 0x40 0x24 0x00') #设定7通道OFF_L为0 os.system('i2cset -y 2 0x40 0x25 0x00') #设定7通道OFF_H为0 elif dcMotorSetRotationDirection == False: dcMotorOffRegisterValue7_L ='i2cset -y 2 0x40 0x24 '+str(self.dcMotorOffRegisterValue(dcMotorSetSpeed)[0]) dcMotorOffRegisterValue7_H ='i2cset -y 2 0x40 0x25 '+str(self.dcMotorOffRegisterValue(dcMotorSetSpeed)[1]) os.system('i2cset -y 2 0x40 0x1e 0x00') #设定6通道ON_L为0 os.system('i2cset -y 2 0x40 0x1f 0x00') #设定6通道ON_H为0 os.system('i2cset -y 2 0x40 0x20 0x00') #设定6通道OFF_L为0 os.system('i2cset -y 2 0x40 0x21 0x00') #设定6通道OFF_H为0 os.system('i2cset -y 2 0x40 0x22 0x00') #设定7通道ON_L为0 os.system('i2cset -y 2 0x40 0x23 0x00') #设定7通道ON_H为0 os.system(dcMotorOffRegisterValue7_L) #设定7通道OFF_L为速度寄存器值低位 os.system(dcMotorOffRegisterValue7_H) #设定7通道OFF_H为速度寄存器值高位 else: pass elif self.motorId=="M2": if dcMotorSetRotationDirection == True: dcMotorOffRegisterValue9_L ='i2cset -y 2 0x40 0x2c '+str(self.dcMotorOffRegisterValue(dcMotorSetSpeed)[0]) dcMotorOffRegisterValue9_H ='i2cset -y 2 0x40 0x2d '+str(self.dcMotorOffRegisterValue(dcMotorSetSpeed)[1]) os.system('i2cset -y 2 0x40 0x2a 0x00') #设定9通道ON_L为0 os.system('i2cset -y 2 0x40 0x2b 0x00') #设定9通道ON_H为0 os.system(dcMotorOffRegisterValue9_L) #设定9通道OFF_L为速度寄存器值低位 os.system(dcMotorOffRegisterValue9_H) #设定9通道OFF_H为速度寄存器值高位 os.system('i2cset -y 2 0x40 0x2e 0x00') #设定10通道ON_L为0 os.system('i2cset -y 2 0x40 0x2f 0x00') #设定10通道ON_H为0 os.system('i2cset -y 2 0x40 0x30 0x00') #设定10通道OFF_L为0 os.system('i2cset -y 2 0x40 0x31 0x00') #设定10通道OFF_H为0 elif dcMotorSetRotationDirection == False: dcMotorOffRegisterValue10_L ='i2cset -y 2 0x40 0x30 '+str(self.dcMotorOffRegisterValue(dcMotorSetSpeed)[0]) dcMotorOffRegisterValue10_H ='i2cset -y 2 0x40 0x31 '+str(self.dcMotorOffRegisterValue(dcMotorSetSpeed)[1]) os.system('i2cset -y 2 0x40 0x2a 0x00') #设定9通道ON_L为0 os.system('i2cset -y 2 0x40 0x2b 0x00') #设定9通道ON_H为0 os.system('i2cset -y 2 0x40 0x2c 0x00') #设定9通道OFF_L为0 os.system('i2cset -y 2 0x40 0x2d 0x00') #设定9通道OFF_H为0 os.system('i2cset -y 2 0x40 0x2e 0x00') #设定10通道ON_L为0 os.system('i2cset -y 2 0x40 0x2f 0x00') #设定10通道ON_H为0 os.system(dcMotorOffRegisterValue10_L) #设定10通道OFF_L为速度寄存器值低位 os.system(dcMotorOffRegisterValue10_H) #设定10通道OFF_H为速度寄存器值高位 else: pass else: pass def __del__(self): if self.motorId=="M1": os.system('i2cset -y 2 0x40 0x1e 0x00') #设定6通道ON_L为0 os.system('i2cset -y 2 0x40 0x1f 0x00') #设定6通道ON_H为0 os.system('i2cset -y 2 0x40 0x20 0x00') #设定6通道OFF_L为0 os.system('i2cset -y 2 0x40 0x21 0x00') #设定6通道OFF_H为0 os.system('i2cset -y 2 0x40 0x22 0x00') #设定7通道ON_L为0 os.system('i2cset -y 2 0x40 0x23 0x00') #设定7通道ON_H为0 os.system('i2cset -y 2 0x40 0x24 0x00') #设定7通道OFF_L为0 os.system('i2cset -y 2 0x40 0x25 0x00') #设定7通道OFF_H为0 elif self.motorId=="M2": os.system('i2cset -y 2 0x40 0x2a 0x00') #设定9通道ON_L为0 os.system('i2cset -y 2 0x40 0x2b 0x00') #设定9通道ON_H为0 os.system('i2cset -y 2 0x40 0x2c 0x00') #设定9通道OFF_L为0 os.system('i2cset -y 2 0x40 0x2d 0x00') #设定9通道OFF_H为0 os.system('i2cset -y 2 0x40 0x2e 0x00') #设定10通道ON_L为0 os.system('i2cset -y 2 0x40 0x2f 0x00') #设定10通道ON_H为0 os.system('i2cset -y 2 0x40 0x30 0x00') #设定10通道OFF_H为0 os.system('i2cset -y 2 0x40 0x31 0x00') #设定10通道OFF_L为0 else: pass ''' ''' ################################## # 7、模拟舵机 # ################################## class SERVO: def __init__(self,servoID="S1"): self.servoId=servoID os.system('i2cset -y 2 0x40 0x00 0x00') #初始化 #设定频率freq=50,预分频prescale=int(25000000.0 / 4096.0 / freq + 0.5) os.system('i2cset -y 2 0x40 0x00 0x10') #设定pca9685为睡眠模式 os.system('i2cset -y 2 0x40 0xfe 0x7a') #设定预分频 os.system('i2cset -y 2 0x40 0x00 0x00') #重新初始化 time.sleep(0.01) os.system('i2cset -y 2 0x40 0x00 0xa1') #设定pca9685为活跃模式 def servoOffRegisterValue(self,servoSetDegree): servoPwmus = (servoSetDegree * 2000 / 180 + 500) # 0.6 ~ 2.4——【2023.03.14更改:0.5~2.5】 servoOffRegister = int(servoPwmus * 4096 / 20000) servoOffRegister_L = hex(servoOffRegister & 0xff) servoOffRegister_H = hex((servoOffRegister >> 8) & 0xff) return servoOffRegister_L, servoOffRegister_H def servoControl(self,servoSetDegree=0): if self.servoId=="S1": servoOffRegisterCmd5_L ="i2cset -y 2 0x40 0x1c "+str(self.servoOffRegisterValue(servoSetDegree)[0]) servoOffRegisterCmd5_H ="i2cset -y 2 0x40 0x1d "+str(self.servoOffRegisterValue(servoSetDegree)[1]) os.system('i2cset -y 2 0x40 0x1a 0x00') #设定5通道ON_L为0 os.system('i2cset -y 2 0x40 0x1b 0x00') #设定5通道ON_H为0 os.system(servoOffRegisterCmd5_L) #设定5通道OFF_L为 os.system(servoOffRegisterCmd5_H) #设定5通道OFF_H为 elif self.servoId=="S2": servoOffRegisterCmd11_L ="i2cset -y 2 0x40 0x34 "+str(self.servoOffRegisterValue(servoSetDegree)[0]) servoOffRegisterCmd11_H ="i2cset -y 2 0x40 0x35 "+str(self.servoOffRegisterValue(servoSetDegree)[1]) os.system('i2cset -y 2 0x40 0x32 0x00') #设定11通道ON_L为0 os.system('i2cset -y 2 0x40 0x33 0x00') #设定11通道ON_H为0 os.system(servoOffRegisterCmd11_L) #设定11通道OFF_L为 os.system(servoOffRegisterCmd11_H) #设定11通道OFF_H为 else: pass def __del__(self): time.sleep(0.01) ''' ################################## # 7、QMI8658 # ################################## class QMI8658(object): def __init__(self, smbus=2, address=0X6B): self._address = address import smbus2 self._bus = smbus2.SMBus(smbus) bRet = self.WhoAmI() if bRet: self.Read_Revision() else: return None self.Config_apply() def _read_byte(self, cmd): rec = self._bus.read_i2c_block_data(int(self._address), int(cmd), 1) return rec[0] def _read_block(self, reg, length=1): rec = self._bus.read_i2c_block_data(int(self._address), int(reg), length) return rec def _read_u16(self, cmd): LSB = self._bus.read_i2c_block_data(int(self._address), int(cmd), 1) MSB = self._bus.read_i2c_block_data(int(self._address), int(cmd)+1, 1) return (MSB[0] << 8) + LSB[0] def _write_byte(self, cmd, val): self._bus.write_i2c_block_data(int(self._address), int(cmd), bytes([int(val)])) def WhoAmI(self): bRet = False if (0x05) == self._read_byte(0x00): bRet = True return bRet def Read_Revision(self): return self._read_byte(0x01) def Config_apply(self): # REG CTRL1 self._write_byte(0x02, 0x60) # REG CTRL2 : QMI8658AccRange_8g and QMI8658AccOdr_1000Hz self._write_byte(0x03, 0x23) # REG CTRL3 : QMI8658GyrRange_512dps and QMI8658GyrOdr_1000Hz self._write_byte(0x04, 0x53) # REG CTRL4 : No self._write_byte(0x05, 0x00) # REG CTRL5 : Enable Gyroscope And Accelerometer Low-Pass Filter self._write_byte(0x06, 0x11) # REG CTRL6 : Disables Motion on Demand. self._write_byte(0x07, 0x00) # REG CTRL7 : Enable Gyroscope And Accelerometer self._write_byte(0x08, 0x03) def Read_Raw_XYZ(self): xyz = [0, 0, 0, 0, 0, 0] vals={} raw_timestamp = self._read_block(0x30, 3) raw_acc_xyz = self._read_block(0x35, 6) raw_gyro_xyz = self._read_block(0x3b, 6) raw_xyz = self._read_block(0x35, 12) timestamp = (raw_timestamp[2] << 16) | ( raw_timestamp[1] << 8) | (raw_timestamp[0]) for i in range(6): xyz[i] = (raw_xyz[(i*2)+1] << 8) | (raw_xyz[i*2]) if xyz[i] >= 32767: xyz[i] = xyz[i]-65535 vals["AcX"]=xyz[0] vals["AcY"]=xyz[1] vals["AcZ"]=xyz[2] vals["GyX"]=xyz[3] vals["GyY"]=xyz[4] vals["GyZ"]=xyz[5] return vals def Read_XYZ(self): xyz = [0, 0, 0, 0, 0, 0] raw_xyz = self.Read_Raw_XYZ() #QMI8658AccRange_8g acc_lsb_div = (1 << 12) #QMI8658GyrRange_512dps gyro_lsb_div = 64 for i in range(3): xyz[i] = raw_xyz[i]/acc_lsb_div # (acc_lsb_div/1000.0) xyz[i+3] = raw_xyz[i+3]*1.0/gyro_lsb_div return xyz def get_accel(self, samples=10, calibration=None): result = {} for _ in range(samples): v = self.Read_Raw_XYZ() for m in v.keys(): result[m] = result.get(m, 0) + v[m] / samples if calibration: for m in calibration.keys(): if m == "AcZ": pass else: result[m] -= calibration[m] return result def calibrate(self, threshold=50): print('Calibrating QMI8658... ', end='') while True: v1 = self.get_accel(100) v2 = self.get_accel(100) if all(abs(v1[m] - v2[m]) < threshold for m in v1.keys()): print('Done.') return v1 def getPitchYawRollGxGyGz(self,calibration): caliData=self.get_accel(10, calibration) pitch=caliData["AcX"]/45.51 yaw=caliData["AcY"]/45.51 roll=caliData["AcZ"] groX=caliData["GyX"] groY=caliData["GyY"] groZ=caliData["GyZ"] if roll<0: if pitch >=0: pitch=180-pitch elif pitch <0: pitch=(180+pitch)*(-1) if yaw >=0: yaw=180-yaw elif pitch <0: yaw=(180+yaw)*(-1) return [pitch,yaw,roll,groX,groY,groZ] import smbus2 import time class stm8s(object): bus = smbus2.SMBus(2) # 2 indicates /dev/i2c-2 address = 0x50 # 0x00 # 触发配置 # 0x01 # 重置配置 # 0x02 # pwm0 历史配置 # 0x03 # pwm1 历史配置 # 引脚配置模式有 1. pwm 2. gpio ouput gpio input 3. adc 4.ws2812_singe 5 ws2812_multi def __init__(self): self.reset() time.sleep(0.05) pass def clear(self): self.write(1, 1) self.reset() time.sleep(0.05) def write(self, addr, val): for i in range(0, 3): try: self.bus.write_byte_data(self.address, addr, val) time.sleep(0.001) # 1ms # print(addr, val) # debug return True except Exception: time.sleep(0.01) continue return False def read(self, addr): for i in range(0, 3): try: tmp = self.bus.read_byte_data(self.address, addr) time.sleep(0.001) # 1ms # print(addr, tmp) # debug return tmp except Exception: time.sleep(0.01) continue return None def reset(self): self.write(0, 1) time.sleep(0.05) # 重启并配置需要时间 def dump(self): for i in range(0, 32): print(i, self.read(i)) class singleRgb(stm8s): def __init__(self): self.valR=0 self.valG=0 self.valB=0 self.brightness=255 self.show() def setColor(self,r,g,b): if(r>=0 and r<=255): self.valR=int(r*self.brightness/255) else: self.valR=0 if(g>=0 and g<=255): self.valG=int(g*self.brightness/255) else: self.valG=0 if(b>=0 and b<=255): self.valB=int(b*self.brightness/255) else: self.valB=0 def setBrightness(self,brightness): if(brightness>=0 and brightness<=255): self.brightness=brightness else: self.brightness=0 def show(self): self.write(31, self.valB) self.write(30, self.valG) self.write(29, self.valR) self.write(28, 6) class dcMotor(stm8s): def __init__(self,id): self.id=id def m1a(self,val=0): self.write(4,5) #M1A #self.write(5,0) #self.write(6,0) self.write(7,val) def m1b(self,val=0): self.write(8,5) #M1B #self.write(9,0) #self.write(10,0) self.write(11,val) def m2a(self,val=0): self.write(12,5) #M2A #self.write(13,0) #self.write(14,0) self.write(15,val) def m2b(self,val=0): self.write(16,5) #M2B #self.write(17,0) #self.write(18,0) self.write(19,val) def dcMotorCtrl(self,dir,speed): #假如不加下列代码,会出现以下现象: #当输入为0~9或253~255时,电机运动时会发生突然变速的抖动 dir=1-dir if(0<=speed and speed<=255): if speed==253 or speed==254 or speed==255: speed=252 elif speed >=0 and speed<=10: speed=10 else: speed=speed if(self.id ==1): #Motor1 if(dir==0): self.m1a(speed) self.m1b(255) else: self.m1a(255) self.m1b(speed) elif(self.id ==2): if(dir==0): self.m2a(speed) self.m2b(255) else: self.m2a(255) self.m2b(speed) else: pass class multiFuncGpio(stm8s): def __init__(self,id=0,mode=0): self.id=id self.rgbId=0 self.brightness=255 self.mode=mode self.write(20+self.id*4,self.mode) self.reset() time.sleep(0.05) ####################################################### # mode | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 #------------------------------------------------------ # Func |free |servo|INPUT| OUT | ADC | PWM | BEEP|PIXEL ####################################################### # mode=0,为空闲状态,引脚不执行任何功能 # mode=1,为控制舵机模式,值控制范围为0°~180° # mode=2,为数字输入模式,返回值0或1 # mode=3,为数字输出模式,输出值为0或1 # mode=4,为ADC采样模式,返回值为0~1023 # mode=5,为PWM输出模式,输出值范围为0~255 # mode=6,为控制蜂鸣器模式,控制范围为20hz~12000hz # mode=7,为控制灯带模式,每个引脚可控制16颗RGB灯 # id=0,控制多功能引脚1,向20号寄存器写入模式数据 # id=1,控制多功能引脚2,向24号寄存器写入模式数据 def servoCtrl(self,angle): if ((angle>=0) and (angle<=180)): self.write(23+self.id*4,angle) else: pass def digitalRead(self): try: self.gpioVal= self.read(23+ self.id*4) if(self.gpioVal != None): return 1-self.gpioVal except: print("ERROR 233") def digitalWrite(self,val): if ((val>=0) and (val<=1)): self.write(23+self.id*4,val) else: pass def analogRead(self): try: self.adcValH= self.read(22+ self.id*4) self.adcValL= self.read(23+ self.id*4) if (self.adcValH != None and self.adcValL != None): # adcValH adcValL 大小端合并 16bit self.adcVal = self.adcValL + (self.adcValH << 8) return self.adcVal except: print("ERROR 2333") def analogWrite(self,val): if ((val>=0) and (val<=255)): self.write(23+self.id*4,val) else: pass def beep(self,frequency): if ((frequency>=20) and (frequency<=12000)): self.write(22+self.id*4,frequency>>8) self.write(23+self.id*4,frequency&0b11111111) else: pass def pixelInit_(self): for i in range(16): self.setPixelColor(i,0,0,0) self.pixelShow() def setBrightness(self,brightness): if(brightness>=0 and brightness<=255): self.brightness=brightness else: self.brightness=0 def setPixelColor(self,rgbId,r,g,b): if(rgbId >=0 and rgbId <=15): self.rgbId=rgbId #rgbId范围:0~15 else: self.rgbId=0 if(r>=0 and r<=255): self.valR=int(r*self.brightness/255) else: self.valR=0 if(g>=0 and g<=255): self.valG=int(g*self.brightness/255) else: self.valG=0 if(b>=0 and b<=255): self.valB=int(b*self.brightness/255) else: self.valB=0 #self.id的值为0或1,用于控制灯带缓冲区(S1或者S2引脚) def pixelShow(self): self.write(32+ self.id*48+self.rgbId*3, self.valR) self.write(33+ self.id*48+self.rgbId*3, self.valG) self.write(34+ self.id*48+self.rgbId*3, self.valB) class UvcVideo(camera.MaixVideo): def __init__(self, source="/dev/videoX"): self.source = source super(UvcVideo, self).__init__() import os, time usb_path = '/sys/devices/platform/soc/usbc0/otg_role' # 确认 usb_path 内容为 usb_host 如果不是,自动设置为 usb_host if os.popen('cat %s' % usb_path).read().strip() != 'usb_host': os.system('echo "usb_host" > %s' % usb_path) time.sleep(2) # 要设置 2s 左右才能初始化工作,否则会报 VIDIOC_S_FMT 失败。 def config(self, size=None, video=2, horizontal=0, vertical=0): if size == None: size = (320, 240) super(UvcVideo, self).config(size) print('[camera] config input size(%d, %d, %d)' % (self.width(), self.height(), video)) if self.cam: self.cam = None try: import _coco_camera self.cam = _coco_camera.Camera(self.width(), self.height(), video, horizontal, vertical) except Exception as e: print(e) self.cam = None def read(self): if self.cam == None: self.config() if self.cam: ret, frame = self.cam.read() if ret: return frame # bytes else: try: self.config() except Exception as e: print(e) return None def __del__(self): if self.cam: self.cam = None