#!/usr/bin/env python #version : 2024.03.07 #language : en from maix import display from maix import image from maix import camera import pyaudio import wave import os from maix import mjpg from maix import utils import base64 import time import requests import json import uuid import threading import socket import uuid import sys sys.path.append('/root/') from CocoPi import BUTTON key_B = BUTTON(8) key_C = BUTTON(13) key_D = BUTTON(7) camera.camera.config(size=(320,240)) image.load_freetype("/root/preset/fonts/simhei.ttf") canvas = image.new(size = (320, 240)) # 录音 WAVE_OUTPUT_FILENAME = "/root/user/audio/record.wav" def streamVoice(): global WAVE_OUTPUT_FILENAME print(os.path.exists(WAVE_OUTPUT_FILENAME)) if(os.path.exists(WAVE_OUTPUT_FILENAME)): os.system("rm "+WAVE_OUTPUT_FILENAME) CHUNK = 1024 FORMAT = pyaudio.paInt16 CHANNELS = 1 RATE = 16000 RECORD_SECONDS = 10 p = pyaudio.PyAudio() stream = p.open(format=FORMAT,channels=CHANNELS,rate=RATE,input=True,frames_per_buffer=CHUNK) canvas.clear() canvas.draw_string(0,0, "Start", scale = 1, color = (0,255,255) , thickness = 1) display.show(canvas) print("* recording") frames = [] for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)): data = stream.read(CHUNK,exception_on_overflow = False) frames.append(data) print("* done recording") canvas.clear() canvas.draw_string(0,0, "End", scale = 1, color = (0,255,255) , thickness = 1) display.show(canvas) stream.stop_stream() stream.close() p.terminate() wf = wave.open(WAVE_OUTPUT_FILENAME, "wb") wf.setnchannels(CHANNELS) wf.setsampwidth(p.get_sample_size(FORMAT)) wf.setframerate(RATE) wf.writeframes(b"".join(frames)) wf.close() # 获取GPT 回答结果 def get_post_chatgpt(datas,uid): global list_num # 目标URL url = "https://gpt4.cocorobo.cn/v831AskForTopicNew" payload = json.dumps({"topic": datas}) headers={ "Content-Type":"application/json" } response=requests.request("POST", url, headers=headers, data=payload) if response.status_code == 200: result = json.loads(response.text)["FunctionResponse"]["result"] url = "https://gpt4.cocorobo.cn/getV831Audio" payload = json.dumps({"input": result,"response_format":"mp3","voice": "shimmer","uid":uid}) headers={ "Content-Type":"application/json" } response=requests.request("POST", url, headers=headers, data=payload) if response.status_code == 200: print(json.loads(response.text)["FunctionResponse"]["url"]) a = "https://gpt4.cocorobo.cn" + json.loads(response.text)["FunctionResponse"]["url"] r = requests.get(a) with open("/root/preset/audio/" + uid+".wav","wb") as f: f.write(r.content) return result else: return "" # 线程1 def thread_calsss_fun1(): global isStateVoice,voiceResult while True: if isStateVoice==1: streamVoice() time.sleep(0.5) isStateVoice=0 elif isStateVoice == 2: voiceResult = getVoiceResult() if len(voiceResult): getImage(voiceResult) # 关闭线程 # CocoPiThread1.join() def getVoiceResult(): global WAVE_OUTPUT_FILENAME,voiceResult voiceData = open(WAVE_OUTPUT_FILENAME,'rb') data = {"file": voiceData} param = {} res = requests.post("https://gpt4.cocorobo.cn/transcribe_file_stream", files=data, data=param) try: voiceResult = (json.loads(res.text))["FunctionResponse"] except: voiceResult = "" print("voiceResult:"+str(voiceResult)) return voiceResult def getImage(text): global imgUrlStr,isStateVoice uid = str(uuid.uuid1().hex) param = { "size": "1024x1024", "quality": "hd", "n": 1, "prompt": text, "style":"natural", "uid": uid } res = requests.post("https://gpt4.cocorobo.cn/getImageV831", json=param) try: url = (json.loads(res.text))["FunctionResponse"]["image_url_list"][0] r = requests.get("https://gpt4.cocorobo.cn"+url) with open("/root/user/img/" + uid + ".png","wb") as f: f.write(r.content) imgUrlStr = "/root/user/img/" + uid + ".png" isStateVoice = 3 except: isStateVoice = 3 pass # 判断联网 def wifi_is_content(): global getDateNum cmd = "ifconfig" res = os.popen(cmd).read() data = False if res.rfind("inet addr:")!=48: data = True return data def getPrivateIp(): st = socket.socket(socket.AF_INET,socket.SOCK_DGRAM) try: st.connect(("10.255.255.255",1)) IP = st.getsockname()[0] except Exception: IP = "127.0.0.1" finally: st.close() return IP voiceResult = "" keyboardStr = [] isStateVoice = 0 # 变量为0开始录音,1 GPT 识别录音结果 ScreenOrientation = False CAMERAROTATE = +90 ssidInfo = "" passwordInfo = "" checkConnectState = False connectText = "" startConnect=False wifiConnectState = False # PublicIp="" PrivateIP="" runConnectSig=True connectSuccessSig=False while True: if wifi_is_content(): break else: canvas = camera.capture() if ssidInfo!="" and passwordInfo!="": startConnect=True #connectText = "Waitting for Connection..." #canvas.draw_string(10,40, ssidInfo+" "+str(len(ssidInfo))+" "+str(type(ssidInfo)), scale = 1.5, color = (0,0,0), thickness = 1) canvas.draw_string(10,50, "Connecting to wifi...", scale = 1.5, color = (0,0,0), thickness = 1) display.show(canvas) if startConnect==True: canvas_1 = image.new(size = (320, 320), color = (255,255,255), mode = "RGB") canvas.draw_image(canvas_1,0,0, alpha=0.4) if wifiConnectState == True: if key_B.is_pressed(): while (key_B.is_pressed() == True): time.sleep(0.001) startConnect= False connectText = "" checkConnectState = False connectSuccessSig = False wifiConnectState = False runConnectSig== True ssidInfo="" passwordInfo="" if checkConnectState == True: if connectSuccessSig == False: # PublicIp=getPublicIp() connectSuccessSig = True connectText = "Wifi connection successfully!" # canvas.draw_string(10,80, "The WLAN PUBLIC IP:" + PublicIp, scale = 1.5, color = (0,0,0), thickness = 1) canvas.draw_string(10,110, "THE WLAN PRIVATE IP:" + PrivateIP, scale = 1.5, color = (0,0,0), thickness = 1) # canvas.draw_image((image.open("/root/preset/img/restart_ff0000_24x24.png")).rotate(0, adjust=0),8,216,alpha=1) ssidInfo="" passwordInfo="" canvas.clear() break else: pass else: if checkConnectState == False: os.system("wifi_disconnect_ap_test") os.system('wifi_connect_chinese_ap_test '+ssidInfo+' '+passwordInfo+'') checkConnectState = True if checkConnectState == True: if key_B.is_pressed(): while (key_B.is_pressed() == True): time.sleep(0.001) startConnect= False connectText = "" checkConnectState = False connectSuccessSig = False wifiConnectState = False runConnectSig== True passwordInfo = "" ssidInfo = "" else: if runConnectSig== True: connectText = "Connecting to wifi,please wait..." runConnectSig= False else: passwordInfo = "" ssidInfo = "" connectText = "Wifi connection failed!" canvas.draw_image((image.open("/root/preset/img/restart_ff0000_24x24.png")).rotate(0, adjust=0),8,216,alpha=1) canvas.draw_string(10,50, connectText, scale = 1.5, color = (0,0,0) , thickness = 1) else: mks = canvas.find_qrcodes() for mk in mks: #外框数据 X = mk['x'] Y = mk['y'] W = mk['w'] H = mk['h'] #二维码信息 string = mk['payload'] codeData = string.split(";") ssidInfo = codeData[0].split(":")[1] passwordInfo = codeData[1].split(":")[1] #画外框 canvas.draw_rectangle(X, Y, X + W, Y + H, color=(0, 0, 255), thickness = 2) #打印信息 canvas.draw_string(int(X) , int(Y - 45) , "wifi name:"+ssidInfo, scale = 1, color = (255, 0, 0), thickness = 2) #内框ID canvas.draw_string(int(X) , int(Y - 25) , "password:"+passwordInfo, scale = 1, color = (255, 0, 0), thickness = 2) #内框ID canvas.draw_image((image.open("/root/preset/img/exit_ff0000_24x24.png")).rotate(0, adjust=0),288,216,alpha=1) display.show(canvas) imgUrlStr = "" # 线程1 CocoPiThread1 = threading.Thread(target=thread_calsss_fun1) # 开启线程 CocoPiThread1.start() while True: if key_C.is_pressed(): isStateVoice = 2 voiceResult = "" elif key_D.is_pressed(): isStateVoice = 1 imgUrlStr = "" if isStateVoice == 2: canvas.clear() canvas.draw_string(0,0, "Picture generation in...", scale = 1, color = (0,255,255) , thickness = 1) elif isStateVoice != 2 and isStateVoice != 1: canvas.clear() if len(imgUrlStr)>0: canvas.draw_image((image.open(imgUrlStr)).rotate(0, adjust=0),0,0,alpha=1) canvas.draw_image(image.open("/root/preset/img/recording.png"),0 , 2,alpha=1) canvas.draw_string(250,0, "C Chart", scale = 1, color = (0,255,255) , thickness = 1) display.show(canvas)