37_speechToImageSynthesis.py 11 KB


  1. #!/usr/bin/env python
  2. #version : 2024.03.07
  3. #language : en
import base64
import json
import os
import shlex
import socket
import sys
import threading
import time
import uuid
import wave

import pyaudio
import requests
from maix import display
from maix import image
from maix import camera
from maix import mjpg
from maix import utils

# CocoPi lives under /root/, so the path must be extended before importing it.
sys.path.append('/root/')
from CocoPi import BUTTON
  23. key_B = BUTTON(8)
  24. key_C = BUTTON(13)
  25. key_D = BUTTON(7)
  26. camera.camera.config(size=(320,240))
  27. image.load_freetype("/root/preset/fonts/simhei.ttf")
  28. canvas = image.new(size = (320, 240))
  29. # 录音
  30. WAVE_OUTPUT_FILENAME = "/root/user/audio/record.wav"
  31. def streamVoice():
  32. global WAVE_OUTPUT_FILENAME
  33. print(os.path.exists(WAVE_OUTPUT_FILENAME))
  34. if(os.path.exists(WAVE_OUTPUT_FILENAME)):
  35. os.system("rm "+WAVE_OUTPUT_FILENAME)
  36. CHUNK = 1024
  37. FORMAT = pyaudio.paInt16
  38. CHANNELS = 1
  39. RATE = 16000
  40. RECORD_SECONDS = 10
  41. p = pyaudio.PyAudio()
  42. stream = p.open(format=FORMAT,channels=CHANNELS,rate=RATE,input=True,frames_per_buffer=CHUNK)
  43. canvas.clear()
  44. canvas.draw_string(0,0, "Start", scale = 1, color = (0,255,255) , thickness = 1)
  45. display.show(canvas)
  46. print("* recording")
  47. frames = []
  48. for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
  49. data = stream.read(CHUNK,exception_on_overflow = False)
  50. frames.append(data)
  51. print("* done recording")
  52. canvas.clear()
  53. canvas.draw_string(0,0, "End", scale = 1, color = (0,255,255) , thickness = 1)
  54. display.show(canvas)
  55. stream.stop_stream()
  56. stream.close()
  57. p.terminate()
  58. wf = wave.open(WAVE_OUTPUT_FILENAME, "wb")
  59. wf.setnchannels(CHANNELS)
  60. wf.setsampwidth(p.get_sample_size(FORMAT))
  61. wf.setframerate(RATE)
  62. wf.writeframes(b"".join(frames))
  63. wf.close()
# Fetch the GPT answer (text plus synthesized audio)
  65. def get_post_chatgpt(datas,uid):
  66. global list_num
  67. # 目标URL
  68. url = "https://gpt4.cocorobo.cn/v831AskForTopicNew"
  69. payload = json.dumps({"topic": datas})
  70. headers={
  71. "Content-Type":"application/json"
  72. }
  73. response=requests.request("POST", url, headers=headers, data=payload)
  74. if response.status_code == 200:
  75. result = json.loads(response.text)["FunctionResponse"]["result"]
  76. url = "https://gpt4.cocorobo.cn/getV831Audio"
  77. payload = json.dumps({"input": result,"response_format":"mp3","voice": "shimmer","uid":uid})
  78. headers={
  79. "Content-Type":"application/json"
  80. }
  81. response=requests.request("POST", url, headers=headers, data=payload)
  82. if response.status_code == 200:
  83. print(json.loads(response.text)["FunctionResponse"]["url"])
  84. a = "https://gpt4.cocorobo.cn" + json.loads(response.text)["FunctionResponse"]["url"]
  85. r = requests.get(a)
  86. with open("/root/preset/audio/" + uid+".wav","wb") as f:
  87. f.write(r.content)
  88. return result
  89. else:
  90. return ""
# Thread 1: background worker
  92. def thread_calsss_fun1():
  93. global isStateVoice,voiceResult
  94. while True:
  95. if isStateVoice==1:
  96. streamVoice()
  97. time.sleep(0.5)
  98. isStateVoice=0
  99. elif isStateVoice == 2:
  100. voiceResult = getVoiceResult()
  101. if len(voiceResult):
  102. getImage(voiceResult)
# Stop the thread
# CocoPiThread1.join()
  105. def getVoiceResult():
  106. global WAVE_OUTPUT_FILENAME,voiceResult
  107. voiceData = open(WAVE_OUTPUT_FILENAME,'rb')
  108. data = {"file": voiceData}
  109. param = {}
  110. res = requests.post("https://gpt4.cocorobo.cn/transcribe_file_stream", files=data, data=param)
  111. try:
  112. voiceResult = (json.loads(res.text))["FunctionResponse"]
  113. except:
  114. voiceResult = ""
  115. print("voiceResult:"+str(voiceResult))
  116. return voiceResult
  117. def getImage(text):
  118. global imgUrlStr,isStateVoice
  119. uid = str(uuid.uuid1().hex)
  120. param = {
  121. "size": "1024x1024",
  122. "quality": "hd",
  123. "n": 1,
  124. "prompt": text,
  125. "style":"natural",
  126. "uid": uid
  127. }
  128. res = requests.post("https://gpt4.cocorobo.cn/getImageV831", json=param)
  129. try:
  130. url = (json.loads(res.text))["FunctionResponse"]["image_url_list"][0]
  131. r = requests.get("https://gpt4.cocorobo.cn"+url)
  132. with open("/root/user/img/" + uid + ".png","wb") as f:
  133. f.write(r.content)
  134. imgUrlStr = "/root/user/img/" + uid + ".png"
  135. isStateVoice = 3
  136. except:
  137. isStateVoice = 3
  138. pass
# Check whether the network is connected
  140. def wifi_is_content():
  141. global getDateNum
  142. cmd = "ifconfig"
  143. res = os.popen(cmd).read()
  144. data = False
  145. if res.rfind("inet addr:")!=48:
  146. data = True
  147. return data
  148. def getPrivateIp():
  149. st = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
  150. try:
  151. st.connect(("10.255.255.255",1))
  152. IP = st.getsockname()[0]
  153. except Exception:
  154. IP = "127.0.0.1"
  155. finally:
  156. st.close()
  157. return IP
  158. voiceResult = ""
  159. keyboardStr = []
  160. isStateVoice = 0 # 变量为0开始录音,1 GPT 识别录音结果
  161. ScreenOrientation = False
  162. CAMERAROTATE = +90
  163. ssidInfo = ""
  164. passwordInfo = ""
  165. checkConnectState = False
  166. connectText = ""
  167. startConnect=False
  168. wifiConnectState = False
  169. # PublicIp=""
  170. PrivateIP=""
  171. runConnectSig=True
  172. connectSuccessSig=False
  173. while True:
  174. if wifi_is_content():
  175. break
  176. else:
  177. canvas = camera.capture()
  178. if ssidInfo!="" and passwordInfo!="":
  179. startConnect=True
  180. #connectText = "Waitting for Connection..."
  181. #canvas.draw_string(10,40, ssidInfo+" "+str(len(ssidInfo))+" "+str(type(ssidInfo)), scale = 1.5, color = (0,0,0), thickness = 1)
  182. canvas.draw_string(10,50, "Connecting to wifi...", scale = 1.5, color = (0,0,0), thickness = 1)
  183. display.show(canvas)
  184. if startConnect==True:
  185. canvas_1 = image.new(size = (320, 320), color = (255,255,255), mode = "RGB")
  186. canvas.draw_image(canvas_1,0,0, alpha=0.4)
  187. if wifiConnectState == True:
  188. if key_B.is_pressed():
  189. while (key_B.is_pressed() == True):
  190. time.sleep(0.001)
  191. startConnect= False
  192. connectText = ""
  193. checkConnectState = False
  194. connectSuccessSig = False
  195. wifiConnectState = False
  196. runConnectSig== True
  197. ssidInfo=""
  198. passwordInfo=""
  199. if checkConnectState == True:
  200. if connectSuccessSig == False:
  201. # PublicIp=getPublicIp()
  202. connectSuccessSig = True
  203. connectText = "Wifi connection successfully!"
  204. # canvas.draw_string(10,80, "The WLAN PUBLIC IP:" + PublicIp, scale = 1.5, color = (0,0,0), thickness = 1)
  205. canvas.draw_string(10,110, "THE WLAN PRIVATE IP:" + PrivateIP, scale = 1.5, color = (0,0,0), thickness = 1)
  206. # canvas.draw_image((image.open("/root/preset/img/restart_ff0000_24x24.png")).rotate(0, adjust=0),8,216,alpha=1)
  207. ssidInfo=""
  208. passwordInfo=""
  209. canvas.clear()
  210. break
  211. else:
  212. pass
  213. else:
  214. if checkConnectState == False:
  215. os.system("wifi_disconnect_ap_test")
  216. os.system('wifi_connect_chinese_ap_test '+ssidInfo+' '+passwordInfo+'')
  217. checkConnectState = True
  218. if checkConnectState == True:
  219. if key_B.is_pressed():
  220. while (key_B.is_pressed() == True):
  221. time.sleep(0.001)
  222. startConnect= False
  223. connectText = ""
  224. checkConnectState = False
  225. connectSuccessSig = False
  226. wifiConnectState = False
  227. runConnectSig== True
  228. passwordInfo = ""
  229. ssidInfo = ""
  230. else:
  231. if runConnectSig== True:
  232. connectText = "Connecting to wifi,please wait..."
  233. runConnectSig= False
  234. else:
  235. passwordInfo = ""
  236. ssidInfo = ""
  237. connectText = "Wifi connection failed!"
  238. canvas.draw_image((image.open("/root/preset/img/restart_ff0000_24x24.png")).rotate(0, adjust=0),8,216,alpha=1)
  239. canvas.draw_string(10,50, connectText, scale = 1.5, color = (0,0,0) , thickness = 1)
  240. else:
  241. mks = canvas.find_qrcodes()
  242. for mk in mks:
  243. #外框数据
  244. X = mk['x']
  245. Y = mk['y']
  246. W = mk['w']
  247. H = mk['h']
  248. #二维码信息
  249. string = mk['payload']
  250. codeData = string.split(";")
  251. ssidInfo = codeData[0].split(":")[1]
  252. passwordInfo = codeData[1].split(":")[1]
  253. #画外框
  254. canvas.draw_rectangle(X, Y, X + W, Y + H, color=(0, 0, 255), thickness = 2)
  255. #打印信息
  256. canvas.draw_string(int(X) , int(Y - 45) , "wifi name:"+ssidInfo, scale = 1, color = (255, 0, 0), thickness = 2) #内框ID
  257. canvas.draw_string(int(X) , int(Y - 25) , "password:"+passwordInfo, scale = 1, color = (255, 0, 0), thickness = 2) #内框ID
  258. canvas.draw_image((image.open("/root/preset/img/exit_ff0000_24x24.png")).rotate(0, adjust=0),288,216,alpha=1)
  259. display.show(canvas)
  260. imgUrlStr = ""
  261. # 线程1
  262. CocoPiThread1 = threading.Thread(target=thread_calsss_fun1)
  263. # 开启线程
  264. CocoPiThread1.start()
  265. while True:
  266. if key_C.is_pressed():
  267. isStateVoice = 2
  268. voiceResult = ""
  269. elif key_D.is_pressed():
  270. isStateVoice = 1
  271. imgUrlStr = ""
  272. if isStateVoice == 2:
  273. canvas.clear()
  274. canvas.draw_string(0,0, "Picture generation in...", scale = 1, color = (0,255,255) , thickness = 1)
  275. elif isStateVoice != 2 and isStateVoice != 1:
  276. canvas.clear()
  277. if len(imgUrlStr)>0:
  278. canvas.draw_image((image.open(imgUrlStr)).rotate(0, adjust=0),0,0,alpha=1)
  279. canvas.draw_image(image.open("/root/preset/img/recording.png"),0 , 2,alpha=1)
  280. canvas.draw_string(250,0, "C Chart", scale = 1, color = (0,255,255) , thickness = 1)
  281. display.show(canvas)