#!/usr/bin/env python #version : 2023.09.07 #language : en from evdev import InputDevice, categorize, ecodes import asyncio import pty import os import sys from maix import display from maix import image from maix import camera from maix import mjpg from maix import utils import base64 import time import requests import json import threading camera.camera.config(size=(320,240)) image.load_freetype("/root/preset/fonts/simhei.ttf") canvas = image.new(size = (320, 240)) try: # 获取键盘设备 keyboard = InputDevice('/dev/input/event1') # 替换成实际的键盘设备路径 except: keyboard = "" if keyboard=="": canvas.draw_string(0,12, "Keyboard not detected", scale = 1.3, color = (255,255,255) , thickness = 1) display.show(canvas) time.sleep(3) sys.exit() def v831_display_show_canvas(displayShow): global _canvas_y,_canvas_x CANVASSHOWIMGAGE = "" if ScreenOrientation: displayShowCanvas = image.new(size = (240, 320)) displayShowCanvas.draw_rectangle(0,0,240,320, color=(0,0,0), thickness=-1) displayShowCanvas.draw_image(displayShow,_canvas_x,_canvas_y,alpha=1) displayShowVER = displayShowCanvas.crop(0,0,240,320) displayShowVER = displayShowVER.rotate(-90, adjust=1) CANVASSHOWIMGAGE = displayShowVER display.show(displayShowVER) else: displayShowCanvas = image.new(size = (320, 240)) displayShowCanvas.draw_rectangle(0,0,320,240, color=(0,0,0), thickness=-1) displayShowCanvas.draw_image(displayShow,_canvas_x,_canvas_y,alpha=1) CANVASSHOWIMGAGE = displayShowCanvas display.show(displayShowCanvas) # jpg = utils.rgb2jpg(CANVASSHOWIMGAGE.convert("RGB").tobytes(), CANVASSHOWIMGAGE.width, CANVASSHOWIMGAGE.height) # img_base64 = base64.b64encode(jpg) # img_base64_str = "data:image/jpg;base64," + str(img_base64)[2:-1] # time.sleep(0.1) # try: # if img_base64_str.rfind("data:image/jpg;base64,")>-1: # print(img_base64_str) # pass # except Exception as e: # #print(str(e)) # pass def get_post_chatgpt(datas): # 目标URL url = "https://gpt.cocorobo.cn/v831AskForTopic" payload = json.dumps({"topic":datas}) headers={ 'Content-Type':'application/json' } print("payload"+str(payload)) response=requests.request("POST",url,headers=headers,data=payload) print("response.text",response.text) if response.status_code == 200: result_num = len(json.loads(response.text)["FunctionResponse"]["result"]) # keyboardStr[list_num] = else: keyboardStr[list_num] = "fail" ScreenOrientation = False _canvas_x = 0 _canvas_y = 0 keyboardStr = [""] list_num = 0 line = b"" lines = [] process = None keyboard = InputDevice("/dev/input/event1") master, slave = pty.openpty() #等待控制台的输出 def read_lines(master): global lines, line,keyboardStr,list_num,isShow char = os.read(master, 1) if char == b"\n": # lines.append(line.decode().strip()) lines.append(line.decode().rstrip()) line = b"" else: line += char # 用来判断是不是终端有新的输出 isShow = "" # 输入字符串 inputStr = "" # 跟踪Ctrl键的状态 ctrl_pressed = False #读取键盘事件写入 def write_io(): global keyboardStr,list_num,isShow,inputStr,ctrl_pressed,lines for event in keyboard.read_loop(): if event.type == ecodes.EV_KEY: key_event = categorize(event) if key_event.keystate == key_event.key_down: try: input_command = "" if key_event.keycode == "KEY_LEFTCTRL" or key_event.keycode == "KEY_RIGHTCTRL": ctrl_pressed = True elif ctrl_pressed and key_event.keycode == "KEY_C": # print("Ctrl+C 组合键被按下") input_command = "\x1A\x03\x02\x03\x1D\x1A" # pass else: ctrl_pressed = False if ctrl_pressed == False: if key_event.keycode[4:] == "SPACE": inputStr = inputStr + " " input_command = " " lines[len(lines)-1] = "root@CocoPi:/# "+inputStr elif key_event.keycode[4:] == "SLASH": inputStr = inputStr + "/" input_command = "/" lines[len(lines)-1] = "root@CocoPi:/# "+inputStr elif key_event.keycode[4:] == "DOT": inputStr = inputStr + "." input_command = "." lines[len(lines)-1] = "root@CocoPi:/# "+inputStr elif key_event.keycode[4:] == "MINUS": inputStr = inputStr + "_" input_command = "_" lines[len(lines)-1] = "root@CocoPi:/# "+inputStr elif key_event.keycode[4:] == "BACKSPACE": inputStr = inputStr[:-1] input_command = "" lines[len(lines)-1] = "root@CocoPi:/# "+inputStr # if len(keyboardStr[list_num]) ==0 and list_num>0: # list_num = list_num - 1 elif key_event.keycode[4:] == "ENTER": inputStr = "" input_command = "\n" # time.sleep(1) # lines.append("root@CocoPi:/# ") else: input_command = key_event.keycode[4:].lower() inputStr = inputStr + key_event.keycode[4:].lower() # if list_num == len(lines): lines[len(lines)-1] = "root@CocoPi:/# "+inputStr print("\x1A\x03\x02\x03\x1D\x1A") print(input_command) os.write(master, input_command.encode()) except EOFError: break async def console_io(): global master, slave, process,list_num,keyboardStr,isShow process = await asyncio.create_subprocess_exec('adb_shell', stdin=slave, stdout=slave, stderr=slave) while True: #读取所有的行数,放在lines read_lines(master) #屏幕循环的显示所有的数据 # print(lines) #屏幕读取 if isShow.lower() != " ".join(lines).lower(): isShow = " ".join(lines) showTreminal(1) list_num = len(lines) else: pass os.close(master) os.close(slave) def showTreminal(val): global master, slave, process,list_num,keyboardStr,lines,inputStr for i,box in enumerate(lines): if len(box)>0: keyboardStr.append(box) # if box.rfind("root@CocoPi")>-1 and lines[i-1] == box: # pass # else: # keyboardStr.append(box) elif i == len(lines)-1: # lines.append("root@CocoPi:/# ") keyboardStr.append("root@CocoPi:/# ") list_num = len(keyboardStr)-1 canvas.clear() if len(keyboardStr)>20: for i in range(len(keyboardStr)-1,len(keyboardStr)-20,-1): canvas.draw_string(0,12*(i-(len(keyboardStr)-20)), str(keyboardStr[i]), scale = 0.785, color = (255,255,255) , thickness = 1) else: for i in range(len(keyboardStr)): canvas.draw_string(0,12*i, str(keyboardStr[i]), scale = 0.785, color = (255,255,255) , thickness = 1) v831_display_show_canvas(canvas) # display.show(canvas) def main(): thread = threading.Thread(target=write_io) thread.start() asyncio.run(console_io()) thread.join() main()