"""CocoPi / MaixPy3 demo: record speech, transcribe it, and show a generated image.

Flow of the script:

1. If the device has no network address yet, scan a QR code carrying the WiFi
   SSID and password with the camera and try to join that network.
2. Start a worker thread that records audio (button D) and, on request
   (button C), uploads the recording for transcription and asks the backend
   for an image generated from the transcript.
3. The main loop polls the buttons and redraws the screen.

NOTE(review): this file was reconstructed from a copy whose line breaks had
been collapsed; block nesting was inferred from syntax and should be checked
against the original where marked.
"""
import base64
import json
import os
import socket
import subprocess
import sys
import threading
import time
import uuid
import wave

import pyaudio
import requests
from maix import camera
from maix import display
from maix import image
from maix import mjpg
from maix import utils

# CocoPi helpers live in /root/, which is not on the default module path.
sys.path.append('/root/')
from CocoPi import BUTTON

key_B = BUTTON(8)
key_C = BUTTON(13)
key_D = BUTTON(7)

camera.camera.config(size=(320, 240))
image.load_freetype("/root/preset/fonts/SourceHanSansCN-Regular.otf")
canvas = image.new(size=(320, 240))

# Recording
WAVE_OUTPUT_FILENAME = "/root/user/audio/record.wav"


def _show_status(text):
    """Clear the shared canvas, draw *text* in the top-left corner and show it."""
    canvas.clear()
    canvas.draw_string(0, 0, text, scale=1, color=(0, 255, 255), thickness=1)
    display.show(canvas)


def streamVoice():
    """Record 10 seconds of 16 kHz mono 16-bit audio to WAVE_OUTPUT_FILENAME.

    Any previous recording is deleted first. "Start" and "End" are shown on
    the display around the capture.
    """
    print(os.path.exists(WAVE_OUTPUT_FILENAME))
    # Remove the stale recording directly instead of spawning a shell `rm`.
    try:
        os.remove(WAVE_OUTPUT_FILENAME)
    except FileNotFoundError:
        pass

    chunk = 1024
    sample_format = pyaudio.paInt16
    channels = 1
    rate = 16000
    record_seconds = 10

    audio = pyaudio.PyAudio()
    try:
        sample_width = audio.get_sample_size(sample_format)
        stream = audio.open(format=sample_format, channels=channels, rate=rate,
                            input=True, frames_per_buffer=chunk)
        try:
            _show_status("Start")
            print("* recording")
            # Overflows are ignored so a slow UI frame does not abort capture.
            frames = [
                stream.read(chunk, exception_on_overflow=False)
                for _ in range(int(rate / chunk * record_seconds))
            ]
            print("* done recording")
            _show_status("End")
        finally:
            # Always release the audio device, even if capture failed.
            stream.stop_stream()
            stream.close()
    finally:
        audio.terminate()

    with wave.open(WAVE_OUTPUT_FILENAME, "wb") as wf:
        wf.setnchannels(channels)
        wf.setsampwidth(sample_width)
        wf.setframerate(rate)
        wf.writeframes(b"".join(frames))


# Fetch the GPT answer
def get_post_chatgpt(datas, uid):
    """Ask the backend about *datas* and download the spoken answer.

    The text answer is requested first; if that succeeds, a speech rendering
    is requested and saved as /root/preset/audio/<uid>.wav.

    Returns the answer text, or "" when the first request does not return
    HTTP 200.

    NOTE(review): the nesting of `return result` was inferred from a
    whitespace-collapsed source; it is assumed the text is returned even when
    the audio request fails - confirm against the original.
    """
    headers = {"Content-Type": "application/json"}

    # Target URL
    url = "https://gpt4.cocorobo.cn/v831AskForTopicNew"
    payload = json.dumps({"topic": datas})
    response = requests.request("POST", url, headers=headers, data=payload)
    if response.status_code != 200:
        return ""
    result = json.loads(response.text)["FunctionResponse"]["result"]

    url = "https://gpt4.cocorobo.cn/getV831Audio"
    payload = json.dumps({"input": result, "response_format": "mp3",
                          "voice": "shimmer", "uid": uid})
    response = requests.request("POST", url, headers=headers, data=payload)
    if response.status_code == 200:
        audio_path = json.loads(response.text)["FunctionResponse"]["url"]
        print(audio_path)
        r = requests.get("https://gpt4.cocorobo.cn" + audio_path)
        with open("/root/preset/audio/" + uid + ".wav", "wb") as f:
            f.write(r.content)
    return result


# Thread 1
def thread_calsss_fun1():
    """Worker loop driven by the global isStateVoice.

    State 1: record audio, then go back to state 0.
    State 2: transcribe the recording and, if text came back, generate an
             image (getImage moves the state to 3).
    Any other state: idle.
    """
    global isStateVoice, voiceResult
    while True:
        if isStateVoice == 1:
            streamVoice()
            time.sleep(0.5)
            isStateVoice = 0
        elif isStateVoice == 2:
            voiceResult = getVoiceResult()
            if len(voiceResult):
                getImage(voiceResult)
            else:
                # Nothing was transcribed: leave the "generating" state
                # instead of re-uploading the same recording forever.
                isStateVoice = 3
        else:
            # Yield the CPU while idle so the UI loop is not starved.
            time.sleep(0.05)


def getVoiceResult():
    """Upload the recording for transcription and return the transcript.

    Returns the "FunctionResponse" field of the JSON reply, or "" when the
    recording is missing, the request fails, or the reply cannot be parsed.
    The global voiceResult is updated with the same value.
    """
    global voiceResult
    try:
        # `with` closes the recording even if the upload raises.
        with open(WAVE_OUTPUT_FILENAME, 'rb') as voice_file:
            res = requests.post(
                "https://gpt4.cocorobo.cn/transcribe_file_stream",
                files={"file": voice_file}, data={})
        voiceResult = json.loads(res.text)["FunctionResponse"]
    except (OSError, requests.RequestException,
            ValueError, KeyError, TypeError) as err:
        print("transcription failed: " + str(err))
        voiceResult = ""
    print("voiceResult:" + str(voiceResult))
    return voiceResult


def getImage(text):
    """Generate an image for *text* and store it under /root/user/img/.

    On success the global imgUrlStr is set to the saved PNG path. Whether or
    not generation succeeds, isStateVoice is set to 3 so the UI leaves the
    "generating" screen.
    """
    global imgUrlStr, isStateVoice
    uid = str(uuid.uuid1().hex)
    param = {
        "size": "1024x1024",
        "quality": "hd",
        "n": 1,
        "prompt": text,
        "style": "natural",
        "uid": uid,
    }
    try:
        res = requests.post("https://gpt4.cocorobo.cn/getImageV831", json=param)
        url = json.loads(res.text)["FunctionResponse"]["image_url_list"][0]
        r = requests.get("https://gpt4.cocorobo.cn" + url)
        img_path = "/root/user/img/" + uid + ".png"
        with open(img_path, "wb") as f:
            f.write(r.content)
        imgUrlStr = img_path
    except Exception as err:
        # Best effort: report the failure and keep the worker thread alive.
        print("image generation failed: " + str(err))
    finally:
        isStateVoice = 3


# Check whether the device is online
def wifi_is_content():
    """Return True when `ifconfig` suggests an interface other than lo has an IP.

    The check looks at where the last "inet addr:" occurs in the output;
    offset 48 is treated as "not connected".
    NOTE(review): presumably 48 is where the loopback entry's address lands
    when it is the only interface with one - confirm on the target image.
    If "inet addr:" is absent altogether, rfind gives -1 and this still
    returns True.
    """
    res = os.popen("ifconfig").read()
    return res.rfind("inet addr:") != 48


def getPrivateIp():
    """Return the local IP used for outbound traffic, or "127.0.0.1" on failure."""
    st = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        # Connecting a UDP socket sends nothing; it only selects a route.
        st.connect(("10.255.255.255", 1))
        return st.getsockname()[0]
    except Exception:
        return "127.0.0.1"
    finally:
        st.close()


def _run_tool(argv):
    """Run an external command without a shell; report but tolerate failure."""
    try:
        subprocess.run(argv)
    except OSError as err:
        print("command failed: " + str(argv[0]) + ": " + str(err))


def _parse_wifi_qr(payload):
    """Split a "<key>:<ssid>;<key>:<password>" QR payload.

    Returns (ssid, password), or None when the payload does not have two
    ';'-separated fields that each contain a ':'. Everything after the first
    ':' of a field is taken as its value.
    """
    fields = payload.split(";")
    if len(fields) < 2:
        return None
    _, ssid_sep, ssid = fields[0].partition(":")
    _, pwd_sep, password = fields[1].partition(":")
    if not ssid_sep or not pwd_sep:
        return None
    return ssid, password


def _reset_wifi_connect_state():
    """Return the WiFi provisioning state machine to "scan for a QR code"."""
    global startConnect, connectText, checkConnectState, connectSuccessSig
    global wifiConnectState, runConnectSig, ssidInfo, passwordInfo
    startConnect = False
    connectText = ""
    checkConnectState = False
    connectSuccessSig = False
    wifiConnectState = False
    # The original wrote `runConnectSig== True` (a no-op comparison here).
    runConnectSig = True
    ssidInfo = ""
    passwordInfo = ""


def _wait_for_release(button):
    """Block until *button* is no longer pressed."""
    while button.is_pressed():
        time.sleep(0.001)


voiceResult = ""
keyboardStr = []
# Worker state: 0 idle, 1 record requested, 2 transcribe + generate requested,
# 3 generation finished (successfully or not).
isStateVoice = 0
ScreenOrientation = False
CAMERAROTATE = +90
ssidInfo = ""
passwordInfo = ""
checkConnectState = False
connectText = ""
startConnect = False
wifiConnectState = False
PrivateIP = ""
runConnectSig = True
connectSuccessSig = False

# WiFi provisioning: loop until wifi_is_content() reports a connection.
while True:
    if wifi_is_content():
        break

    canvas = camera.capture()
    if ssidInfo != "" and passwordInfo != "":
        startConnect = True
        canvas.draw_string(10, 50, "正在连接WiFi...", scale=1.5,
                           color=(0, 0, 0), thickness=1)
        # Show the message now; the connect command below blocks.
        display.show(canvas)

    if startConnect:
        # Dim the camera frame behind the status text.
        canvas_1 = image.new(size=(320, 320), color=(255, 255, 255), mode="RGB")
        canvas.draw_image(canvas_1, 0, 0, alpha=0.4)
        if wifiConnectState:
            # NOTE(review): nothing in this file sets wifiConnectState to
            # True, so this branch looks unreachable here.
            if key_B.is_pressed():
                _wait_for_release(key_B)
                _reset_wifi_connect_state()
            if checkConnectState:
                if not connectSuccessSig:
                    connectSuccessSig = True
                connectText = "Wifi connection successfully!"
                canvas.draw_string(10, 110, "局域网IP:" + PrivateIP, scale=1.5,
                                   color=(0, 0, 0), thickness=1)
                ssidInfo = ""
                passwordInfo = ""
                canvas.clear()
                break
        else:
            if not checkConnectState:
                # SSID and password come from a scanned QR code (untrusted):
                # pass them as separate arguments, never through a shell.
                _run_tool(["wifi_disconnect_ap_test"])
                _run_tool(["wifi_connect_chinese_ap_test", ssidInfo, passwordInfo])
                checkConnectState = True
            if checkConnectState:
                if key_B.is_pressed():
                    _wait_for_release(key_B)
                    _reset_wifi_connect_state()
                elif runConnectSig:
                    connectText = "正在连接WiFi,请等待..."
                    runConnectSig = False
                else:
                    # Still offline one pass after the attempt: report
                    # failure and offer the restart button.
                    passwordInfo = ""
                    ssidInfo = ""
                    connectText = "WiFi连接失败!"
                    canvas.draw_image(
                        (image.open("/root/preset/img/restart_ff0000_24x24.png")).rotate(0, adjust=0),
                        8, 216, alpha=1)
        canvas.draw_string(10, 50, connectText, scale=1.5,
                           color=(0, 0, 0), thickness=1)
    else:
        for mk in canvas.find_qrcodes():
            credentials = _parse_wifi_qr(mk['payload'])
            if credentials is None:
                # Not a WiFi provisioning code; ignore it instead of crashing.
                continue
            ssidInfo, passwordInfo = credentials
            # Bounding box of the detected code
            X = mk['x']
            Y = mk['y']
            W = mk['w']
            H = mk['h']
            canvas.draw_rectangle(X, Y, X + W, Y + H, color=(0, 0, 255), thickness=2)
            # Echo the decoded credentials above the box
            canvas.draw_string(int(X), int(Y - 45), "wifi name:" + ssidInfo,
                               scale=1, color=(255, 0, 0), thickness=2)
            canvas.draw_string(int(X), int(Y - 25), "password:" + passwordInfo,
                               scale=1, color=(255, 0, 0), thickness=2)

    canvas.draw_image(
        (image.open("/root/preset/img/exit_ff0000_24x24.png")).rotate(0, adjust=0),
        288, 216, alpha=1)
    display.show(canvas)

imgUrlStr = ""

# Thread 1
CocoPiThread1 = threading.Thread(target=thread_calsss_fun1)
# Start the thread
CocoPiThread1.start()

# Main UI loop: poll the buttons and redraw.
while True:
    if key_C.is_pressed():
        isStateVoice = 2
        voiceResult = ""
    elif key_D.is_pressed():
        isStateVoice = 1
        imgUrlStr = ""

    if isStateVoice == 2:
        canvas.clear()
        canvas.draw_string(0, 0, "图片生成中...", scale=1,
                           color=(0, 255, 255), thickness=1)
    elif isStateVoice != 1:
        # Idle or finished: show the last generated image (if any) plus hints.
        # While recording (state 1) streamVoice owns the canvas contents.
        canvas.clear()
        if len(imgUrlStr) > 0:
            canvas.draw_image((image.open(imgUrlStr)).rotate(0, adjust=0), 0, 0, alpha=1)
        canvas.draw_image(image.open("/root/preset/img/recording.png"), 0, 2, alpha=1)
        canvas.draw_string(250, 0, "C Chart", scale=1,
                           color=(0, 255, 255), thickness=1)
    display.show(canvas)