37_speechToImageSynthesis.py 12 KB


  1. from maix import display
  2. from maix import image
  3. from maix import camera
  4. import pyaudio
  5. import wave
  6. import os
  7. from maix import mjpg
  8. from maix import utils
  9. import base64
  10. import time
  11. import requests
  12. import json
  13. import uuid
  14. import threading
  15. import socket
  16. import uuid
  17. import sys
  18. sys.path.append('/root/')
  19. from CocoPi import BUTTON
  20. import os
  21. ScreenOrientation = False
  22. try:
  23. if os.path.exists("/etc/cameraSize.cfg"):
  24. cameraSize = True
  25. else:
  26. cameraSize = False
  27. except:
  28. cameraSize = False
  29. def getLcdRotation(cameraCapture):
  30. global cameraSize
  31. if cameraSize:
  32. return lcdRotationNew(cameraCapture)
  33. else:
  34. return lcdRotation(cameraCapture)
  35. def lcdRotationNew(inputImg):
  36. global cameraSize,ScreenOrientation
  37. imageRotationBuffer = inputImg.crop(0, 0, 320, 240)
  38. if ScreenOrientation:
  39. imgRotationAim = image.new(size = (240, 320))
  40. rotationAngle = 90
  41. GETROTATION = imageRotationBuffer.rotate(+rotationAngle, adjust=1)
  42. else:
  43. imgRotationAim = image.new(size = (320, 240))
  44. GETROTATION = imageRotationBuffer
  45. GETROTATION = imgRotationAim.draw_image(GETROTATION,0,0,alpha=1)
  46. return GETROTATION
  47. def lcdRotation(inputImg):
  48. global cameraSize,ScreenOrientation
  49. imageRotationBuffer = inputImg.crop(0, 0, 240, 320)
  50. if ScreenOrientation:
  51. imgRotationAim = image.new(size = (240, 320))
  52. rotationAngle = 180
  53. else:
  54. imgRotationAim = image.new(size = (320, 240))
  55. rotationAngle = 90
  56. GETROTATION = imageRotationBuffer.rotate(+rotationAngle, adjust=1)
  57. GETROTATION = imgRotationAim.draw_image(GETROTATION,0,0,alpha=1)
  58. return GETROTATION
  59. key_B = BUTTON(8)
  60. key_C = BUTTON(13)
  61. key_D = BUTTON(7)
  62. camera.camera.config(size=(240,320))
  63. image.load_freetype("/root/preset/fonts/SourceHanSansCN-Regular.otf")
  64. canvas = image.new(size = (320, 240))
  65. # 录音
  66. WAVE_OUTPUT_FILENAME = "/root/user/audio/record.wav"
  67. def streamVoice():
  68. global WAVE_OUTPUT_FILENAME
  69. print(os.path.exists(WAVE_OUTPUT_FILENAME))
  70. if(os.path.exists(WAVE_OUTPUT_FILENAME)):
  71. os.system("rm "+WAVE_OUTPUT_FILENAME)
  72. CHUNK = 1024
  73. FORMAT = pyaudio.paInt16
  74. CHANNELS = 1
  75. RATE = 16000
  76. RECORD_SECONDS = 10
  77. p = pyaudio.PyAudio()
  78. stream = p.open(format=FORMAT,channels=CHANNELS,rate=RATE,input=True,frames_per_buffer=CHUNK)
  79. canvas.clear()
  80. canvas.draw_string(0,0, "Start", scale = 1, color = (0,255,255) , thickness = 1)
  81. display.show(canvas)
  82. print("* recording")
  83. frames = []
  84. for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
  85. data = stream.read(CHUNK,exception_on_overflow = False)
  86. frames.append(data)
  87. print("* done recording")
  88. canvas.clear()
  89. canvas.draw_string(0,0, "End", scale = 1, color = (0,255,255) , thickness = 1)
  90. display.show(canvas)
  91. stream.stop_stream()
  92. stream.close()
  93. p.terminate()
  94. wf = wave.open(WAVE_OUTPUT_FILENAME, "wb")
  95. wf.setnchannels(CHANNELS)
  96. wf.setsampwidth(p.get_sample_size(FORMAT))
  97. wf.setframerate(RATE)
  98. wf.writeframes(b"".join(frames))
  99. wf.close()
  100. # 获取GPT 回答结果
  101. def get_post_chatgpt(datas,uid):
  102. global list_num
  103. # 目标URL
  104. url = "https://gpt4.cocorobo.cn/v831AskForTopicNew"
  105. payload = json.dumps({"topic": datas})
  106. headers={
  107. "Content-Type":"application/json"
  108. }
  109. response=requests.request("POST", url, headers=headers, data=payload)
  110. if response.status_code == 200:
  111. result = json.loads(response.text)["FunctionResponse"]["result"]
  112. url = "https://gpt4.cocorobo.cn/getV831Audio"
  113. payload = json.dumps({"input": result,"response_format":"mp3","voice": "shimmer","uid":uid})
  114. headers={
  115. "Content-Type":"application/json"
  116. }
  117. response=requests.request("POST", url, headers=headers, data=payload)
  118. if response.status_code == 200:
  119. print(json.loads(response.text)["FunctionResponse"]["url"])
  120. a = "https://gpt4.cocorobo.cn" + json.loads(response.text)["FunctionResponse"]["url"]
  121. r = requests.get(a)
  122. with open("/root/preset/audio/" + uid+".wav","wb") as f:
  123. f.write(r.content)
  124. return result
  125. else:
  126. return ""
  127. # 线程1
  128. def thread_calsss_fun1():
  129. global isStateVoice,voiceResult
  130. while True:
  131. if isStateVoice==1:
  132. streamVoice()
  133. time.sleep(0.5)
  134. isStateVoice=0
  135. elif isStateVoice == 2:
  136. voiceResult = getVoiceResult()
  137. if len(voiceResult):
  138. getImage(voiceResult)
  139. # 关闭线程
  140. # CocoPiThread1.join()
  141. def getVoiceResult():
  142. global WAVE_OUTPUT_FILENAME,voiceResult
  143. voiceData = open(WAVE_OUTPUT_FILENAME,'rb')
  144. data = {"file": voiceData}
  145. param = {}
  146. res = requests.post("https://gpt4.cocorobo.cn/transcribe_file_stream", files=data, data=param)
  147. try:
  148. voiceResult = (json.loads(res.text))["FunctionResponse"]
  149. except:
  150. voiceResult = ""
  151. print("voiceResult:"+str(voiceResult))
  152. return voiceResult
  153. def getImage(text):
  154. global imgUrlStr,isStateVoice
  155. uid = str(uuid.uuid1().hex)
  156. param = {
  157. "size": "1024x1024",
  158. "quality": "hd",
  159. "n": 1,
  160. "prompt": text,
  161. "style":"natural",
  162. "uid": uid
  163. }
  164. res = requests.post("https://gpt4.cocorobo.cn/getImageV831", json=param)
  165. try:
  166. url = (json.loads(res.text))["FunctionResponse"]["image_url_list"][0]
  167. r = requests.get("https://gpt4.cocorobo.cn"+url)
  168. with open("/root/user/img/" + uid + ".png","wb") as f:
  169. f.write(r.content)
  170. imgUrlStr = "/root/user/img/" + uid + ".png"
  171. isStateVoice = 3
  172. except:
  173. isStateVoice = 3
  174. pass
  175. # 判断联网
  176. def wifi_is_content():
  177. global getDateNum
  178. cmd = "ifconfig"
  179. res = os.popen(cmd).read()
  180. data = False
  181. if res.rfind("inet addr:")!=48:
  182. data = True
  183. return data
  184. def getPrivateIp():
  185. st = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
  186. try:
  187. st.connect(("10.255.255.255",1))
  188. IP = st.getsockname()[0]
  189. except Exception:
  190. IP = "127.0.0.1"
  191. finally:
  192. st.close()
  193. return IP
  194. voiceResult = ""
  195. keyboardStr = []
  196. isStateVoice = 0 # 变量为0开始录音,1 GPT 识别录音结果
  197. ScreenOrientation = False
  198. CAMERAROTATE = +90
  199. ssidInfo = ""
  200. passwordInfo = ""
  201. checkConnectState = False
  202. connectText = ""
  203. startConnect=False
  204. wifiConnectState = False
  205. # PublicIp=""
  206. PrivateIP=""
  207. runConnectSig=True
  208. connectSuccessSig=False
  209. while True:
  210. if wifi_is_content():
  211. break
  212. else:
  213. canvas = getLcdRotation(camera.capture())
  214. if ssidInfo!="" and passwordInfo!="":
  215. startConnect=True
  216. #connectText = "Waitting for Connection..."
  217. #canvas.draw_string(10,40, ssidInfo+" "+str(len(ssidInfo))+" "+str(type(ssidInfo)), scale = 1.5, color = (0,0,0), thickness = 1)
  218. canvas.draw_string(10,50, "正在连接WiFi...", scale = 1.5, color = (0,0,0), thickness = 1)
  219. display.show(canvas)
  220. if startConnect==True:
  221. canvas_1 = image.new(size = (320, 320), color = (255,255,255), mode = "RGB")
  222. canvas.draw_image(canvas_1,0,0, alpha=0.4)
  223. if wifiConnectState == True:
  224. if key_B.is_pressed():
  225. while (key_B.is_pressed() == True):
  226. time.sleep(0.001)
  227. startConnect= False
  228. connectText = ""
  229. checkConnectState = False
  230. connectSuccessSig = False
  231. wifiConnectState = False
  232. runConnectSig== True
  233. ssidInfo=""
  234. passwordInfo=""
  235. if checkConnectState == True:
  236. if connectSuccessSig == False:
  237. # PublicIp=getPublicIp()
  238. connectSuccessSig = True
  239. connectText = "Wifi connection successfully!"
  240. # canvas.draw_string(10,80, "The WLAN PUBLIC IP:" + PublicIp, scale = 1.5, color = (0,0,0), thickness = 1)
  241. canvas.draw_string(10,110, "局域网IP:" + PrivateIP, scale = 1.5, color = (0,0,0), thickness = 1)
  242. # canvas.draw_image((image.open("/root/preset/img/restart_ff0000_24x24.png")).rotate(0, adjust=0),8,216,alpha=1)
  243. ssidInfo=""
  244. passwordInfo=""
  245. canvas.clear()
  246. break
  247. else:
  248. pass
  249. else:
  250. if checkConnectState == False:
  251. os.system("wifi_disconnect_ap_test")
  252. os.system('wifi_connect_chinese_ap_test '+ssidInfo+' '+passwordInfo+'')
  253. checkConnectState = True
  254. if checkConnectState == True:
  255. if key_B.is_pressed():
  256. while (key_B.is_pressed() == True):
  257. time.sleep(0.001)
  258. startConnect= False
  259. connectText = ""
  260. checkConnectState = False
  261. connectSuccessSig = False
  262. wifiConnectState = False
  263. runConnectSig== True
  264. passwordInfo = ""
  265. ssidInfo = ""
  266. else:
  267. if runConnectSig== True:
  268. connectText = "正在连接WiFi,请等待..."
  269. runConnectSig= False
  270. else:
  271. passwordInfo = ""
  272. ssidInfo = ""
  273. connectText = "WiFi连接失败!"
  274. canvas.draw_image((image.open("/root/preset/img/restart_ff0000_24x24.png")).rotate(0, adjust=0),8,216,alpha=1)
  275. canvas.draw_string(10,50, connectText, scale = 1.5, color = (0,0,0) , thickness = 1)
  276. else:
  277. mks = canvas.find_qrcodes()
  278. for mk in mks:
  279. #外框数据
  280. X = mk['x']
  281. Y = mk['y']
  282. W = mk['w']
  283. H = mk['h']
  284. #二维码信息
  285. string = mk['payload']
  286. codeData = string.split(";")
  287. ssidInfo = codeData[0].split(":")[1]
  288. passwordInfo = codeData[1].split(":")[1]
  289. #画外框
  290. canvas.draw_rectangle(X, Y, X + W, Y + H, color=(0, 0, 255), thickness = 2)
  291. #打印信息
  292. canvas.draw_string(int(X) , int(Y - 45) , "wifi name:"+ssidInfo, scale = 1, color = (255, 0, 0), thickness = 2) #内框ID
  293. canvas.draw_string(int(X) , int(Y - 25) , "password:"+passwordInfo, scale = 1, color = (255, 0, 0), thickness = 2) #内框ID
  294. canvas.draw_image((image.open("/root/preset/img/exit_ff0000_24x24.png")).rotate(0, adjust=0),288,216,alpha=1)
  295. display.show(canvas)
  296. imgUrlStr = ""
  297. # 线程1
  298. CocoPiThread1 = threading.Thread(target=thread_calsss_fun1)
  299. # 开启线程
  300. CocoPiThread1.start()
  301. while True:
  302. if key_C.is_pressed():
  303. isStateVoice = 2
  304. voiceResult = ""
  305. elif key_D.is_pressed():
  306. isStateVoice = 1
  307. imgUrlStr = ""
  308. if isStateVoice == 2:
  309. canvas.clear()
  310. canvas.draw_string(0,0, "图片生成中...", scale = 1, color = (0,255,255) , thickness = 1)
  311. elif isStateVoice != 2 and isStateVoice != 1:
  312. canvas.clear()
  313. if len(imgUrlStr)>0:
  314. canvas.draw_image((image.open(imgUrlStr)).rotate(0, adjust=0),0,0,alpha=1)
  315. canvas.draw_image(image.open("/root/preset/img/recording.png"),0 , 2,alpha=1)
  316. canvas.draw_string(250,0, "C Chart", scale = 1, color = (0,255,255) , thickness = 1)
  317. display.show(canvas)