37_speechToImageSynthesis.py 11 KB


  1. from maix import display
  2. from maix import image
  3. from maix import camera
  4. import pyaudio
  5. import wave
  6. import os
  7. from maix import mjpg
  8. from maix import utils
  9. import base64
  10. import time
  11. import requests
  12. import json
  13. import uuid
  14. import threading
  15. import socket
  16. import uuid
  17. import sys
  18. sys.path.append('/root/')
  19. from CocoPi import BUTTON
  20. key_B = BUTTON(8)
  21. key_C = BUTTON(13)
  22. key_D = BUTTON(7)
  23. camera.camera.config(size=(320,240))
  24. image.load_freetype("/root/preset/fonts/SourceHanSansCN-Regular.otf")
  25. canvas = image.new(size = (320, 240))
  26. # 录音
  27. WAVE_OUTPUT_FILENAME = "/root/user/audio/record.wav"
  28. def streamVoice():
  29. global WAVE_OUTPUT_FILENAME
  30. print(os.path.exists(WAVE_OUTPUT_FILENAME))
  31. if(os.path.exists(WAVE_OUTPUT_FILENAME)):
  32. os.system("rm "+WAVE_OUTPUT_FILENAME)
  33. CHUNK = 1024
  34. FORMAT = pyaudio.paInt16
  35. CHANNELS = 1
  36. RATE = 16000
  37. RECORD_SECONDS = 10
  38. p = pyaudio.PyAudio()
  39. stream = p.open(format=FORMAT,channels=CHANNELS,rate=RATE,input=True,frames_per_buffer=CHUNK)
  40. canvas.clear()
  41. canvas.draw_string(0,0, "Start", scale = 1, color = (0,255,255) , thickness = 1)
  42. display.show(canvas)
  43. print("* recording")
  44. frames = []
  45. for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
  46. data = stream.read(CHUNK,exception_on_overflow = False)
  47. frames.append(data)
  48. print("* done recording")
  49. canvas.clear()
  50. canvas.draw_string(0,0, "End", scale = 1, color = (0,255,255) , thickness = 1)
  51. display.show(canvas)
  52. stream.stop_stream()
  53. stream.close()
  54. p.terminate()
  55. wf = wave.open(WAVE_OUTPUT_FILENAME, "wb")
  56. wf.setnchannels(CHANNELS)
  57. wf.setsampwidth(p.get_sample_size(FORMAT))
  58. wf.setframerate(RATE)
  59. wf.writeframes(b"".join(frames))
  60. wf.close()
  61. # Get the GPT answer for a topic and download an audio rendering of it.
  # Parameters: `datas` is sent as the "topic" field of the first request;
  # `uid` is sent with the audio request and used as the saved file's name.
  # The first POST returns FunctionResponse.result; that text is then POSTed
  # to the audio endpoint and the returned URL is downloaded into
  # /root/preset/audio/<uid>.wav.
  # NOTE(review): the audio is requested with response_format "mp3" but saved
  # with a ".wav" extension - confirm the player accepts this.
  # NOTE(review): indentation was lost in this listing, so it is unclear
  # whether `return result` / `else: return ""` belong to the first or the
  # second status check; some paths may return None implicitly. Confirm
  # against the original file before relying on the return value.
  # NOTE(review): `list_num` is declared global but is neither used here nor
  # defined in the visible part of the file.
  # NOTE(review): no function in the visible file calls get_post_chatgpt.
  62. def get_post_chatgpt(datas,uid):
  63. global list_num
  64. # Target URL
  65. url = "https://gpt4.cocorobo.cn/v831AskForTopicNew"
  66. payload = json.dumps({"topic": datas})
  67. headers={
  68. "Content-Type":"application/json"
  69. }
  70. response=requests.request("POST", url, headers=headers, data=payload)
  71. if response.status_code == 200:
  72. result = json.loads(response.text)["FunctionResponse"]["result"]
  73. url = "https://gpt4.cocorobo.cn/getV831Audio"
  74. payload = json.dumps({"input": result,"response_format":"mp3","voice": "shimmer","uid":uid})
  75. headers={
  76. "Content-Type":"application/json"
  77. }
  78. response=requests.request("POST", url, headers=headers, data=payload)
  79. if response.status_code == 200:
  80. print(json.loads(response.text)["FunctionResponse"]["url"])
  81. a = "https://gpt4.cocorobo.cn" + json.loads(response.text)["FunctionResponse"]["url"]
  82. r = requests.get(a)
  83. with open("/root/preset/audio/" + uid+".wav","wb") as f:
  84. f.write(r.content)
  85. return result
  86. else:
  87. return ""
  88. # 线程1
  89. def thread_calsss_fun1():
  90. global isStateVoice,voiceResult
  91. while True:
  92. if isStateVoice==1:
  93. streamVoice()
  94. time.sleep(0.5)
  95. isStateVoice=0
  96. elif isStateVoice == 2:
  97. voiceResult = getVoiceResult()
  98. if len(voiceResult):
  99. getImage(voiceResult)
  100. # 关闭线程
  101. # CocoPiThread1.join()
  102. def getVoiceResult():
  103. global WAVE_OUTPUT_FILENAME,voiceResult
  104. voiceData = open(WAVE_OUTPUT_FILENAME,'rb')
  105. data = {"file": voiceData}
  106. param = {}
  107. res = requests.post("https://gpt4.cocorobo.cn/transcribe_file_stream", files=data, data=param)
  108. try:
  109. voiceResult = (json.loads(res.text))["FunctionResponse"]
  110. except:
  111. voiceResult = ""
  112. print("voiceResult:"+str(voiceResult))
  113. return voiceResult
  114. def getImage(text):
  115. global imgUrlStr,isStateVoice
  116. uid = str(uuid.uuid1().hex)
  117. param = {
  118. "size": "1024x1024",
  119. "quality": "hd",
  120. "n": 1,
  121. "prompt": text,
  122. "style":"natural",
  123. "uid": uid
  124. }
  125. res = requests.post("https://gpt4.cocorobo.cn/getImageV831", json=param)
  126. try:
  127. url = (json.loads(res.text))["FunctionResponse"]["image_url_list"][0]
  128. r = requests.get("https://gpt4.cocorobo.cn"+url)
  129. with open("/root/user/img/" + uid + ".png","wb") as f:
  130. f.write(r.content)
  131. imgUrlStr = "/root/user/img/" + uid + ".png"
  132. isStateVoice = 3
  133. except:
  134. isStateVoice = 3
  135. pass
  136. # 判断联网
  137. def wifi_is_content():
  138. global getDateNum
  139. cmd = "ifconfig"
  140. res = os.popen(cmd).read()
  141. data = False
  142. if res.rfind("inet addr:")!=48:
  143. data = True
  144. return data
  145. def getPrivateIp():
  146. st = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
  147. try:
  148. st.connect(("10.255.255.255",1))
  149. IP = st.getsockname()[0]
  150. except Exception:
  151. IP = "127.0.0.1"
  152. finally:
  153. st.close()
  154. return IP
  155. voiceResult = ""
  156. keyboardStr = []
  157. isStateVoice = 0 # 变量为0开始录音,1 GPT 识别录音结果
  158. ScreenOrientation = False
  159. CAMERAROTATE = +90
  160. ssidInfo = ""
  161. passwordInfo = ""
  162. checkConnectState = False
  163. connectText = ""
  164. startConnect=False
  165. wifiConnectState = False
  166. # PublicIp=""
  167. PrivateIP=""
  168. runConnectSig=True
  169. connectSuccessSig=False
  # Wi-Fi provisioning loop. Exits as soon as wifi_is_content() returns True.
  # Until then each pass captures a camera frame and either scans it for a
  # QR code carrying credentials (when startConnect is False) or runs the
  # connect commands with the scanned credentials (when startConnect is True).
  # The QR payload is split on ";" and then ":", i.e. it is expected to look
  # like "<key>:<ssid>;<key>:<password>".
  # NOTE(review): indentation was lost in this listing; the nesting of the
  # branches below must be confirmed against the original file.
  # NOTE(review): no visible line sets wifiConnectState to True, so in the
  # visible code the "connected" branch never runs and the loop can only end
  # through the wifi_is_content() check at the top.
  170. while True:
  171. if wifi_is_content():
  172. break
  173. else:
  174. canvas = camera.capture()
  175. if ssidInfo!="" and passwordInfo!="":
  176. startConnect=True
  177. #connectText = "Waitting for Connection..."
  178. #canvas.draw_string(10,40, ssidInfo+" "+str(len(ssidInfo))+" "+str(type(ssidInfo)), scale = 1.5, color = (0,0,0), thickness = 1)
  179. canvas.draw_string(10,50, "正在连接WiFi...", scale = 1.5, color = (0,0,0), thickness = 1)
  180. display.show(canvas)
  181. if startConnect==True:
  # Blend a white layer over the camera frame so the status text is readable.
  182. canvas_1 = image.new(size = (320, 320), color = (255,255,255), mode = "RGB")
  183. canvas.draw_image(canvas_1,0,0, alpha=0.4)
  184. if wifiConnectState == True:
  185. if key_B.is_pressed():
  # Wait for key B to be released before resetting the connection state.
  186. while (key_B.is_pressed() == True):
  187. time.sleep(0.001)
  188. startConnect= False
  189. connectText = ""
  190. checkConnectState = False
  191. connectSuccessSig = False
  192. wifiConnectState = False
  # NOTE(review): `==` is a comparison, so the next statement has no effect;
  # an assignment (`runConnectSig = True`) was probably intended.
  193. runConnectSig== True
  194. ssidInfo=""
  195. passwordInfo=""
  196. if checkConnectState == True:
  197. if connectSuccessSig == False:
  198. # PublicIp=getPublicIp()
  199. connectSuccessSig = True
  200. connectText = "Wifi connection successfully!"
  201. # canvas.draw_string(10,80, "The WLAN PUBLIC IP:" + PublicIp, scale = 1.5, color = (0,0,0), thickness = 1)
  202. canvas.draw_string(10,110, "局域网IP:" + PrivateIP, scale = 1.5, color = (0,0,0), thickness = 1)
  203. # canvas.draw_image((image.open("/root/preset/img/restart_ff0000_24x24.png")).rotate(0, adjust=0),8,216,alpha=1)
  204. ssidInfo=""
  205. passwordInfo=""
  206. canvas.clear()
  207. break
  208. else:
  209. pass
  210. else:
  211. if checkConnectState == False:
  # NOTE(review): the SSID and password come straight from the scanned QR
  # code and are concatenated into a shell command without quoting; spaces
  # or shell metacharacters in them will break or alter the command.
  212. os.system("wifi_disconnect_ap_test")
  213. os.system('wifi_connect_chinese_ap_test '+ssidInfo+' '+passwordInfo+'')
  214. checkConnectState = True
  215. if checkConnectState == True:
  216. if key_B.is_pressed():
  # Wait for key B to be released before resetting the connection state.
  217. while (key_B.is_pressed() == True):
  218. time.sleep(0.001)
  219. startConnect= False
  220. connectText = ""
  221. checkConnectState = False
  222. connectSuccessSig = False
  223. wifiConnectState = False
  # NOTE(review): comparison with no effect, same as above; `=` was probably
  # intended.
  224. runConnectSig== True
  225. passwordInfo = ""
  226. ssidInfo = ""
  227. else:
  228. if runConnectSig== True:
  229. connectText = "正在连接WiFi,请等待..."
  230. runConnectSig= False
  231. else:
  232. passwordInfo = ""
  233. ssidInfo = ""
  234. connectText = "WiFi连接失败!"
  235. canvas.draw_image((image.open("/root/preset/img/restart_ff0000_24x24.png")).rotate(0, adjust=0),8,216,alpha=1)
  236. canvas.draw_string(10,50, connectText, scale = 1.5, color = (0,0,0) , thickness = 1)
  237. else:
  238. mks = canvas.find_qrcodes()
  239. for mk in mks:
  240. # Bounding box of the detected QR code
  241. X = mk['x']
  242. Y = mk['y']
  243. W = mk['w']
  244. H = mk['h']
  245. # QR code payload
  # NOTE(review): a payload without ";" or ":" raises IndexError here, and an
  # SSID or password containing ":" or ";" is truncated by the splits.
  246. string = mk['payload']
  247. codeData = string.split(";")
  248. ssidInfo = codeData[0].split(":")[1]
  249. passwordInfo = codeData[1].split(":")[1]
  250. # Draw the bounding box
  251. canvas.draw_rectangle(X, Y, X + W, Y + H, color=(0, 0, 255), thickness = 2)
  252. # Show the decoded credentials
  253. canvas.draw_string(int(X) , int(Y - 45) , "wifi name:"+ssidInfo, scale = 1, color = (255, 0, 0), thickness = 2) # SSID label
  254. canvas.draw_string(int(X) , int(Y - 25) , "password:"+passwordInfo, scale = 1, color = (255, 0, 0), thickness = 2) # password label
  255. canvas.draw_image((image.open("/root/preset/img/exit_ff0000_24x24.png")).rotate(0, adjust=0),288,216,alpha=1)
  256. display.show(canvas)
  257. imgUrlStr = ""
  258. # 线程1
  259. CocoPiThread1 = threading.Thread(target=thread_calsss_fun1)
  260. # 开启线程
  261. CocoPiThread1.start()
  # Main UI loop: polls the keys, updates the shared state that drives the
  # worker thread, and redraws the screen.
  #   key C -> isStateVoice = 2 (transcribe the recording and generate an image)
  #   key D -> isStateVoice = 1 (start a new recording) and forget the last image
  # While the state is 1 neither drawing branch below runs; streamVoice()
  # draws "Start"/"End" on the same canvas from the worker thread.
  # NOTE(review): indentation was lost in this listing; whether the
  # recording.png icon and the "C Chart" label are drawn only when an image
  # exists, or on every idle pass, must be confirmed against the original.
  262. while True:
  263. if key_C.is_pressed():
  264. isStateVoice = 2
  265. voiceResult = ""
  266. elif key_D.is_pressed():
  267. isStateVoice = 1
  268. imgUrlStr = ""
  269. if isStateVoice == 2:
  270. canvas.clear()
  271. canvas.draw_string(0,0, "图片生成中...", scale = 1, color = (0,255,255) , thickness = 1)
  # States other than 1 and 2 (0 and 3 in the visible code): idle screen.
  272. elif isStateVoice != 2 and isStateVoice != 1:
  273. canvas.clear()
  274. if len(imgUrlStr)>0:
  # The generated image is re-opened from disk on every pass through here.
  275. canvas.draw_image((image.open(imgUrlStr)).rotate(0, adjust=0),0,0,alpha=1)
  276. canvas.draw_image(image.open("/root/preset/img/recording.png"),0 , 2,alpha=1)
  277. canvas.draw_string(250,0, "C Chart", scale = 1, color = (0,255,255) , thickness = 1)
  278. display.show(canvas)