37_speechToImageSynthesis.py 12 KB


  1. from maix import display
  2. from maix import image
  3. from maix import camera
  4. import pyaudio
  5. import wave
  6. import os
  7. from maix import mjpg
  8. from maix import utils
  9. import base64
  10. import time
  11. import requests
  12. import json
  13. import uuid
  14. import threading
  15. import socket
  16. import uuid
  17. import sys
  18. sys.path.append('/root/')
  19. from CocoPi import BUTTON
  20. import os
  21. ScreenOrientation = False
  22. try:
  23. if os.path.exists("/etc/cameraSize.cfg"):
  24. cameraSize = True
  25. else:
  26. cameraSize = False
  27. except:
  28. cameraSize = False
  29. def getLcdRotation(cameraCapture):
  30. global cameraSize
  31. if cameraSize:
  32. return lcdRotationNew(cameraCapture)
  33. else:
  34. return lcdRotation(cameraCapture)
  35. def lcdRotationNew(inputImg):
  36. global cameraSize,ScreenOrientation
  37. imageRotationBuffer = inputImg.crop(0, 0, 320, 240)
  38. if ScreenOrientation:
  39. imgRotationAim = image.new(size = (240, 320))
  40. rotationAngle = 90
  41. GETROTATION = imageRotationBuffer.rotate(+rotationAngle, adjust=1)
  42. else:
  43. imgRotationAim = image.new(size = (320, 240))
  44. GETROTATION = imageRotationBuffer
  45. GETROTATION = imgRotationAim.draw_image(GETROTATION,0,0,alpha=1)
  46. return GETROTATION
  47. def lcdRotation(inputImg):
  48. global cameraSize,ScreenOrientation
  49. imageRotationBuffer = inputImg.crop(0, 0, 240, 320)
  50. if ScreenOrientation:
  51. imgRotationAim = image.new(size = (240, 320))
  52. rotationAngle = 180
  53. else:
  54. imgRotationAim = image.new(size = (320, 240))
  55. rotationAngle = 90
  56. GETROTATION = imageRotationBuffer.rotate(+rotationAngle, adjust=1)
  57. GETROTATION = imgRotationAim.draw_image(GETROTATION,0,0,alpha=1)
  58. return GETROTATION
  59. key_B = BUTTON(8)
  60. key_C = BUTTON(13)
  61. key_D = BUTTON(7)
  62. if cameraSize==True:
  63. camera.camera.config(size=(320,240))
  64. else:
  65. camera.camera.config(size=(240,320))
  66. image.load_freetype("/root/preset/fonts/SourceHanSansCN-Regular.otf")
  67. canvas = image.new(size = (320, 240))
  68. # 录音
  69. WAVE_OUTPUT_FILENAME = "/root/user/audio/record.wav"
  70. def streamVoice():
  71. global WAVE_OUTPUT_FILENAME
  72. print(os.path.exists(WAVE_OUTPUT_FILENAME))
  73. if(os.path.exists(WAVE_OUTPUT_FILENAME)):
  74. os.system("rm "+WAVE_OUTPUT_FILENAME)
  75. CHUNK = 1024
  76. FORMAT = pyaudio.paInt16
  77. CHANNELS = 1
  78. RATE = 16000
  79. RECORD_SECONDS = 10
  80. p = pyaudio.PyAudio()
  81. stream = p.open(format=FORMAT,channels=CHANNELS,rate=RATE,input=True,frames_per_buffer=CHUNK)
  82. canvas.clear()
  83. canvas.draw_string(0,0, "Start", scale = 1, color = (0,255,255) , thickness = 1)
  84. display.show(canvas)
  85. print("* recording")
  86. frames = []
  87. for i in range(0, int(RATE / CHUNK * RECORD_SECONDS)):
  88. data = stream.read(CHUNK,exception_on_overflow = False)
  89. frames.append(data)
  90. print("* done recording")
  91. canvas.clear()
  92. canvas.draw_string(0,0, "End", scale = 1, color = (0,255,255) , thickness = 1)
  93. display.show(canvas)
  94. stream.stop_stream()
  95. stream.close()
  96. p.terminate()
  97. wf = wave.open(WAVE_OUTPUT_FILENAME, "wb")
  98. wf.setnchannels(CHANNELS)
  99. wf.setsampwidth(p.get_sample_size(FORMAT))
  100. wf.setframerate(RATE)
  101. wf.writeframes(b"".join(frames))
  102. wf.close()
  103. # 获取GPT 回答结果
  104. def get_post_chatgpt(datas,uid):
  105. global list_num
  106. # 目标URL
  107. url = "https://gpt4.cocorobo.cn/v831AskForTopicNew"
  108. payload = json.dumps({"topic": datas})
  109. headers={
  110. "Content-Type":"application/json"
  111. }
  112. response=requests.request("POST", url, headers=headers, data=payload)
  113. if response.status_code == 200:
  114. result = json.loads(response.text)["FunctionResponse"]["result"]
  115. url = "https://gpt4.cocorobo.cn/getV831Audio"
  116. payload = json.dumps({"input": result,"response_format":"mp3","voice": "shimmer","uid":uid})
  117. headers={
  118. "Content-Type":"application/json"
  119. }
  120. response=requests.request("POST", url, headers=headers, data=payload)
  121. if response.status_code == 200:
  122. print(json.loads(response.text)["FunctionResponse"]["url"])
  123. a = "https://gpt4.cocorobo.cn" + json.loads(response.text)["FunctionResponse"]["url"]
  124. r = requests.get(a)
  125. with open("/root/preset/audio/" + uid+".wav","wb") as f:
  126. f.write(r.content)
  127. return result
  128. else:
  129. return ""
  130. # 线程1
  131. def thread_calsss_fun1():
  132. global isStateVoice,voiceResult
  133. while True:
  134. if isStateVoice==1:
  135. streamVoice()
  136. time.sleep(0.5)
  137. isStateVoice=0
  138. elif isStateVoice == 2:
  139. voiceResult = getVoiceResult()
  140. if len(voiceResult):
  141. getImage(voiceResult)
  142. # 关闭线程
  143. # CocoPiThread1.join()
  144. def getVoiceResult():
  145. global WAVE_OUTPUT_FILENAME,voiceResult
  146. voiceData = open(WAVE_OUTPUT_FILENAME,'rb')
  147. data = {"file": voiceData}
  148. param = {}
  149. res = requests.post("https://gpt4.cocorobo.cn/transcribe_file_stream", files=data, data=param)
  150. try:
  151. voiceResult = (json.loads(res.text))["FunctionResponse"]
  152. except:
  153. voiceResult = ""
  154. print("voiceResult:"+str(voiceResult))
  155. return voiceResult
  156. def getImage(text):
  157. global imgUrlStr,isStateVoice
  158. uid = str(uuid.uuid1().hex)
  159. param = {
  160. "size": "1024x1024",
  161. "quality": "hd",
  162. "n": 1,
  163. "prompt": text,
  164. "style":"natural",
  165. "uid": uid
  166. }
  167. res = requests.post("https://gpt4.cocorobo.cn/getImageV831", json=param)
  168. try:
  169. url = (json.loads(res.text))["FunctionResponse"]["image_url_list"][0]
  170. r = requests.get("https://gpt4.cocorobo.cn"+url)
  171. with open("/root/user/img/" + uid + ".png","wb") as f:
  172. f.write(r.content)
  173. imgUrlStr = "/root/user/img/" + uid + ".png"
  174. isStateVoice = 3
  175. except:
  176. isStateVoice = 3
  177. pass
  178. # 判断联网
  179. def wifi_is_content():
  180. global getDateNum
  181. cmd = "ifconfig"
  182. res = os.popen(cmd).read()
  183. data = False
  184. if res.rfind("inet addr:")!=48:
  185. data = True
  186. return data
  187. def getPrivateIp():
  188. st = socket.socket(socket.AF_INET,socket.SOCK_DGRAM)
  189. try:
  190. st.connect(("10.255.255.255",1))
  191. IP = st.getsockname()[0]
  192. except Exception:
  193. IP = "127.0.0.1"
  194. finally:
  195. st.close()
  196. return IP
  197. voiceResult = ""
  198. keyboardStr = []
  199. isStateVoice = 0 # 变量为0开始录音,1 GPT 识别录音结果
  200. ScreenOrientation = False
  201. CAMERAROTATE = +90
  202. ssidInfo = ""
  203. passwordInfo = ""
  204. checkConnectState = False
  205. connectText = ""
  206. startConnect=False
  207. wifiConnectState = False
  208. # PublicIp=""
  209. PrivateIP=""
  210. runConnectSig=True
  211. connectSuccessSig=False
  212. while True:
  213. if wifi_is_content():
  214. break
  215. else:
  216. canvas = getLcdRotation(camera.capture())
  217. if ssidInfo!="" and passwordInfo!="":
  218. startConnect=True
  219. #connectText = "Waitting for Connection..."
  220. #canvas.draw_string(10,40, ssidInfo+" "+str(len(ssidInfo))+" "+str(type(ssidInfo)), scale = 1.5, color = (0,0,0), thickness = 1)
  221. canvas.draw_string(10,50, "Connect WiFi...", scale = 1.5, color = (0,0,0), thickness = 1)
  222. display.show(canvas)
  223. if startConnect==True:
  224. canvas_1 = image.new(size = (320, 320), color = (255,255,255), mode = "RGB")
  225. canvas.draw_image(canvas_1,0,0, alpha=0.4)
  226. if wifiConnectState == True:
  227. if key_B.is_pressed():
  228. while (key_B.is_pressed() == True):
  229. time.sleep(0.001)
  230. startConnect= False
  231. connectText = ""
  232. checkConnectState = False
  233. connectSuccessSig = False
  234. wifiConnectState = False
  235. runConnectSig== True
  236. ssidInfo=""
  237. passwordInfo=""
  238. if checkConnectState == True:
  239. if connectSuccessSig == False:
  240. # PublicIp=getPublicIp()
  241. connectSuccessSig = True
  242. connectText = "Wifi connection successfully!"
  243. # canvas.draw_string(10,80, "The WLAN PUBLIC IP:" + PublicIp, scale = 1.5, color = (0,0,0), thickness = 1)
  244. canvas.draw_string(10,110, "IP:" + PrivateIP, scale = 1.5, color = (0,0,0), thickness = 1)
  245. # canvas.draw_image((image.open("/root/preset/img/restart_ff0000_24x24.png")).rotate(0, adjust=0),8,216,alpha=1)
  246. ssidInfo=""
  247. passwordInfo=""
  248. canvas.clear()
  249. break
  250. else:
  251. pass
  252. else:
  253. if checkConnectState == False:
  254. os.system("wifi_disconnect_ap_test")
  255. os.system('wifi_connect_chinese_ap_test '+ssidInfo+' '+passwordInfo+'')
  256. checkConnectState = True
  257. if checkConnectState == True:
  258. if key_B.is_pressed():
  259. while (key_B.is_pressed() == True):
  260. time.sleep(0.001)
  261. startConnect= False
  262. connectText = ""
  263. checkConnectState = False
  264. connectSuccessSig = False
  265. wifiConnectState = False
  266. runConnectSig== True
  267. passwordInfo = ""
  268. ssidInfo = ""
  269. else:
  270. if runConnectSig== True:
  271. connectText = "Connect WiFi..."
  272. runConnectSig= False
  273. else:
  274. passwordInfo = ""
  275. ssidInfo = ""
  276. connectText = "WiFi Connect Failed!"
  277. canvas.draw_image((image.open("/root/preset/img/restart_ff0000_24x24.png")).rotate(0, adjust=0),8,216,alpha=1)
  278. canvas.draw_string(10,50, connectText, scale = 1.5, color = (0,0,0) , thickness = 1)
  279. else:
  280. mks = canvas.find_qrcodes()
  281. for mk in mks:
  282. #外框数据
  283. X = mk['x']
  284. Y = mk['y']
  285. W = mk['w']
  286. H = mk['h']
  287. #二维码信息
  288. string = mk['payload']
  289. codeData = string.split(";")
  290. ssidInfo = codeData[0].split(":")[1]
  291. passwordInfo = codeData[1].split(":")[1]
  292. #画外框
  293. canvas.draw_rectangle(X, Y, X + W, Y + H, color=(0, 0, 255), thickness = 2)
  294. #打印信息
  295. canvas.draw_string(int(X) , int(Y - 45) , "wifi name:"+ssidInfo, scale = 1, color = (255, 0, 0), thickness = 2) #内框ID
  296. canvas.draw_string(int(X) , int(Y - 25) , "password:"+passwordInfo, scale = 1, color = (255, 0, 0), thickness = 2) #内框ID
  297. canvas.draw_image((image.open("/root/preset/img/exit_ff0000_24x24.png")).rotate(0, adjust=0),288,216,alpha=1)
  298. display.show(canvas)
  299. imgUrlStr = ""
  300. # 线程1
  301. CocoPiThread1 = threading.Thread(target=thread_calsss_fun1)
  302. # 开启线程
  303. CocoPiThread1.start()
  304. while True:
  305. if key_C.is_pressed():
  306. isStateVoice = 2
  307. voiceResult = ""
  308. elif key_D.is_pressed():
  309. isStateVoice = 1
  310. imgUrlStr = ""
  311. if isStateVoice == 2:
  312. canvas.clear()
  313. canvas.draw_string(0,0, "Image generation in progress...", scale = 1, color = (0,255,255) , thickness = 1)
  314. elif isStateVoice != 2 and isStateVoice != 1:
  315. canvas.clear()
  316. if len(imgUrlStr)>0:
  317. canvas.draw_image((image.open(imgUrlStr)).rotate(0, adjust=0),0,0,alpha=1)
  318. canvas.draw_image(image.open("/root/preset/img/recording.png"),0 , 2,alpha=1)
  319. canvas.draw_string(250,0, "C Chart", scale = 1, color = (0,255,255) , thickness = 1)
  320. display.show(canvas)