kinve 1 năm trước cách đây
mục cha
commit
dde1363658
3 tập tin đã thay đổi với 230 bổ sung116 xóa
  1. 210 105
      ble.py
  2. 15 6
      config.json
  3. 5 5
      websocket_server.py

+ 210 - 105
ble.py

@@ -12,6 +12,10 @@ from Crypto.Cipher import AES
 from Crypto.Util.Padding import pad, unpad
 import copy
 import os
+import threading
+import ctypes
+import inspect
+import re
 
 from message_base import MessageBase
 from websocket_server import WebServer
@@ -49,6 +53,9 @@ EV_SOFT_BLE_RECV_CODE = '$*@AB%DHJqopENC#'    #//蓝牙接收密钥
 # EV_SOFT_BLE_SEND_CODE = 'XH23456789ABCDEF'    #//蓝牙发送密钥
 # EV_SOFT_BLE_RECV_CODE = 'XH23456789ABCDEF'    #//蓝牙接收密钥 
 
+g_run = True
+asyncio_list = []
+thread_list = []
 g_config = {}
 g_ble_mtu = 20
 g_ble_client = None
@@ -76,7 +83,16 @@ def exit_handler():
  
  
 def sig_handler(signum, frame):
-    logger.info('catched singal: %d' % signum)
+    global g_run
+    logger.info('singal: %d' % signum)
+    logger.info("sig退出")
+    g_run = False
+    # 设置事件,通知所有线程退出
+    exit_event.set()
+    # for asy in asyncio_list:
+    #     asy.stop()    
+    # for thread in thread_list:
+    #     thread.stop()
     sys.exit(0)
 
 def shuncomdacode(data):
@@ -228,71 +244,24 @@ def PACK_LEN():
     C = 2 if g_config["def_cfg"]["checknum_type"]!="" else 0
     return H+L+C
 
-@calculate_time
-def recv_handler(data: bytearray, reve=False):
-    global recv_start_time
-    global g_download_cfg
-    if g_config["def_cfg"]["recv_detail_print"]:
-        logger.info("包接收:{}".format(bytes(data))) 
-
-    for d in data:   
-        recv_data.append(d)
-
-    if len(recv_data)>1500:
-        recv_data.clear()
-        return        
-
-    if len(recv_data)<PACK_LEN():
-        return
-
-    recv_data_str = str(bytes(recv_data))
-    HEAD_SEND = str(g_config["def_cfg"]["head_send"])
-    HEAD_RECV = str(g_config["def_cfg"]["head_recv"])
-    find_head = HEAD_SEND if not reve else HEAD_RECV
-    index = -1
-    if find_head != "":
-        index = recv_data_str.find(find_head)#'EV>'
-        if index < 2:
-            return
-    index -= 2
-    data_len = (recv_data[index+4]&0xff) | (recv_data[index+3]&0xff)<<8
-    if index+PACK_LEN()+data_len > len(recv_data):
-        return
-    
-    if g_config["def_cfg"]["recv_soc_data_print"]:
-        logger.info("接收:{}".format(bytes(recv_data).hex())) 
-
-    if recv_start_time:
-        recv_end_time = time.time()   
-        logger.info('接收耗时:%.3fs', recv_end_time - recv_start_time)  
-        recv_start_time = 0     
-
-    soc_data = recv_data[index+PACK_LEN():index+PACK_LEN()+data_len]
-    if g_config["def_cfg"]["checknum_type"]!="":
-        H = len(g_config["def_cfg"]["head_send"])
-        L = 2
-        get_crc = (recv_data[index+(H+L+1)]&0xff)| (recv_data[index+(H+L)]&0xff)<<8 
-        check_num = checknum_16(soc_data)
-        if get_crc != check_num:
-            logger.info("校验失败(%d):0x%04X 0x%04X", len(recv_data), get_crc, check_num)
-            return
+def batch_split_key_value(input_str):
+    # 正则表达式匹配key=value格式,支持多个键值对
+    pattern = re.compile(r'(\w+)=([\w.]+)')
+    result = {}
+    for match in pattern.finditer(input_str):
+        key, value = match.groups()
+        result[key] = value
+    return result
 
-    logger.info("校验成功")
-
-    if g_config["def_cfg"]["aes_cbc_enbable"]:
-        find_type= "recv" if not reve else 'send'
-        key = get_aes_key(find_type) #"recv"
-        dec_data = AES_Decrypt(bytes(key), bytes(key), bytes(soc_data))
-        logger.info("解密后:{}".format(bytes(dec_data))) 
-    else:
-        dec_data = soc_data
 
-    # 清理
-    recv_data.clear()
+@calculate_time
+def recv_handler(data: bytearray, reve=False):
+    mq.add("ble_recv", data)
 
 def notification_handler(characteristic: BleakGATTCharacteristic, data: bytearray):
     global mq
-    recv_handler(data)
+    # recv_handler(data)
+    mq.add("ble_recv", data)
     mq.add("ws_send", data)
 
 async def ble_send(client, data):
@@ -340,7 +309,7 @@ def print_data_list():
         logger.error("Err in {}[{}]:\n {}".format(os.path.basename(e.__traceback__.tb_frame.f_globals["__file__"]), e.__traceback__.tb_lineno, e))
 
 @calculate_time
-async def send_form_data_list(client, i, data=None):
+def send_form_data_list(client, i, data=None):
     global g_config    
     if data:
         json_data = data
@@ -380,7 +349,8 @@ async def send_form_data_list(client, i, data=None):
     byte_array = bytearray(byte_sequence)
     byte_array = ev_packing(byte_array)
     send_data = bytes(byte_array)
-    await ble_send(client, send_data)
+    # await ble_send(client, send_data)
+    mq.add("ble_send", send_data)
 
 async def task():
     global g_config
@@ -402,14 +372,21 @@ def read_config_call():
 
 
 @calculate_time
-async def heart_beat_call(client):
+def heart_beat_call(client):
     global g_config
     # 心跳发送:维持蓝牙通讯
     logger.info("执行心跳发送任务..")
-    await send_form_data_list(client, g_config["def_cfg"]["heart_beat_sel"])
+    send_form_data_list(client, g_config["def_cfg"]["heart_beat_sel"])
+
+def extract_key_value(text):
+    pattern = r"(\w+)=([^;]+)"  # 匹配形如 key=value 的字符串
+    matches = re.findall(pattern, text)
+    for key, value in matches:
+        print(f"Key: {key}, Value: {value}")    
+    return matches
 
 @calculate_time
-async def input_call(client):
+def input_call(client):
     global g_config
     # 输入
     userinput = None
@@ -421,17 +398,18 @@ async def input_call(client):
     if userinput:
         # 判断类型: 数字int 字符串str 列表list 元组tuple 字典dict
         # isinstance(userinput, str)
-        if not userinput.isalpha():    
-            await send_form_data_list(client, int(userinput)) 
+        if not userinput.isalpha():       
+            send_form_data_list(client, int(userinput)) 
         elif str(userinput)=='l':
             # 读取更新json
             with open(config_file, "r", encoding="utf-8") as f:
                 send_list_new = json.load(f)
                 logger.info("json内容如下:")
-                print_data_list()       
+                print_data_list()    
+          
 
 @calculate_time
-async def auto_getcfg_call(client):
+def auto_getcfg_call(client):
     global g_config
     global g_download_cfg
     # 自动发送任务
@@ -446,25 +424,22 @@ async def auto_getcfg_call(client):
                 Number = g_download_cfg["Number"] 
                 Total = g_download_cfg["Total"] 
                 new_one[3]["Number"] =  Number
-                await send_form_data_list(client, 0, new_one)
+                send_form_data_list(client, 0, new_one)
                 g_download_cfg["Number"] = Number+1 if Number < Total else 0
 
     except Exception as e: 
         logger.error("Err in {}[{}]:\n {}".format(os.path.basename(e.__traceback__.tb_frame.f_globals["__file__"]), e.__traceback__.tb_lineno, e))
 
-async def main():
+async def ble_main():
+    global g_run
     global g_ble_client
     global g_config
     global g_ble_mtu
     global mq
 
-    logger.info("读取配置...")
-    with open(config_file, "r", encoding="utf-8") as f:
-        g_config = json.load(f)
-        print_data_list()  
     logger.info("快速显示列表, 可输入l")     
 
-    while True:
+    while g_run:
         logger.info("开始扫描...")
         #基于MAC地址查找设备
         device = await BleakScanner.find_device_by_address(
@@ -496,35 +471,22 @@ async def main():
                 logger.error("Err in {}[{}]:\n {}".format(os.path.basename(e.__traceback__.tb_frame.f_globals["__file__"]), e.__traceback__.tb_lineno, e))
 
             g_download_cfg["Number"] = 0
-            read_config_time = 0
-            heart_beart_time = 0
-            input_wait_time = 0
-            while client and client.is_connected:
-
-                if time.time()-read_config_time >= 3:
-                    read_config_time = time.time()
-                    read_config_call()
-
-                if g_config["def_cfg"]["auto_send_GetConfigurationNumber"] and g_download_cfg["Number"]>0:  
-                    await auto_getcfg_call(client)   
-
-                elif g_config["def_cfg"]["heart_beat_interval"] >0 and time.time()-heart_beart_time>=g_config["def_cfg"]["heart_beat_interval"]:
-                    heart_beart_time = time.time()
-                    await heart_beat_call(client)
-
-                elif g_config["def_cfg"]["input_interval"]>0 and time.time()-input_wait_time>=g_config["def_cfg"]["input_interval"]:
-                    input_wait_time = time.time()                      
-                    await input_call(client)
 
+            while client and client.is_connected and g_run:
                 try:
+                    ble_data = mq.get("ble_send")
+                    if ble_data:
+                        await ble_send(client, ble_data)     
+
                     # 开始根据设备即功能处理消息
                     ws_data = mq.get("ws_recv")
                     if ws_data:
                         await ble_send(client, ws_data)        
-                except Exception as err:
-                    pass
+                except Exception as e:
+                    logger.error("Err in {}[{}]:\n {}".format(os.path.basename(e.__traceback__.tb_frame.f_globals["__file__"]), e.__traceback__.tb_lineno, e))
 
-                time.sleep(0.1)
+                time.sleep(0.01)
+    logger.info("结束:{}".format(inspect.currentframe().f_code.co_name)) 
 
 def test_aes():
     key = EV_SOFT_BLE_SEND_CODE
@@ -554,25 +516,168 @@ def test_recv():
     recv_handler(bytes.fromhex(str_b), True)    
 
 
-def run(tasks = []):
+def asyncio_handler():
     # tasks.append(task())
-    loop = asyncio.get_event_loop()
-    loop.run_until_complete(asyncio.wait(tasks))
-    loop.close()
+    # loop = asyncio.get_event_loop()
+    # loop.run_until_complete(asyncio.wait([ble_main]))
+    # loop.close()
+    loop = asyncio.new_event_loop()
+    loop.run_until_complete(asyncio.wait([ble_main()]))
 
 
+def ble_recv_handler():
+    global g_run
+    global recv_start_time
+    global g_download_cfg
+    while g_run:
+        data = mq.get("ble_recv")
+        if not data:
+            continue
+
+        if g_config["def_cfg"]["recv_detail_print"]:
+            logger.info("包接收:{}".format(bytes(data))) 
+
+        for d in data:   
+            recv_data.append(d)
+
+        if len(recv_data)>1500:
+            recv_data.clear()
+            return        
+
+        if len(recv_data)<PACK_LEN():
+            return
+
+        recv_data_str = str(bytes(recv_data))
+        HEAD_SEND = str(g_config["def_cfg"]["head_send"])
+        HEAD_RECV = str(g_config["def_cfg"]["head_recv"])
+        reve=False
+        find_head = HEAD_SEND if not reve else HEAD_RECV
+        index = -1
+        if find_head != "":
+            index = recv_data_str.find(find_head)#'EV>'
+            if index < 2:
+                return
+        index -= 2
+        data_len = (recv_data[index+4]&0xff) | (recv_data[index+3]&0xff)<<8
+        if index+PACK_LEN()+data_len > len(recv_data):
+            return
+        
+        if g_config["def_cfg"]["recv_soc_data_print"]:
+            logger.info("接收:{}".format(bytes(recv_data).hex())) 
+
+        if recv_start_time:
+            recv_end_time = time.time()   
+            logger.info('接收耗时:%.3fs', recv_end_time - recv_start_time)  
+            recv_start_time = 0     
+
+        soc_data = recv_data[index+PACK_LEN():index+PACK_LEN()+data_len]
+        if g_config["def_cfg"]["checknum_type"]!="":
+            H = len(g_config["def_cfg"]["head_send"])
+            L = 2
+            get_crc = (recv_data[index+(H+L+1)]&0xff)| (recv_data[index+(H+L)]&0xff)<<8 
+            check_num = checknum_16(soc_data)
+            if get_crc != check_num:
+                logger.info("校验失败(%d):0x%04X 0x%04X", len(recv_data), get_crc, check_num)
+                return
+
+        logger.info("校验成功")
+
+        if g_config["def_cfg"]["aes_cbc_enbable"]:
+            find_type= "recv" if not reve else 'send'
+            key = get_aes_key(find_type) #"recv"
+            dec_data = AES_Decrypt(bytes(key), bytes(key), bytes(soc_data))
+            logger.info("解密后:{}".format(bytes(dec_data))) 
+        else:
+            dec_data = soc_data
+
+
+        # 特殊处理:升级
+        out = batch_split_key_value(dec_data)
+        for key in out:
+            if key == "upgrade_req":
+                values = out[key].split(",")
+                if len(values) == 2:
+                    file = values[0]
+                    page = values[1]
+                    with open(file, "rb", encoding="utf-8") as f:
+                        f.seek(page*(1024+4))  #位移到最后    SEEK_END(值为2)     SEEK_CUR(值为1)  SEEK_SET(值为0)
+                        datas = f.read(1024+4)
+                        f.close()
+
+                        # 发送
+                        byte_array = bytearray("set:upgrade_data=")+bytearray(datas)
+                        byte_array = ev_packing(byte_array)
+                        send_data = bytes(byte_array)
+                        mq.add("ble_send", send_data)
+
+        # 清理
+        recv_data.clear()
+
+        time.sleep(0.1)  
+
+    logger.info("结束:{}".format(inspect.currentframe().f_code.co_name))     
+
+def main_key_handler():
+    global g_run
+    read_config_time = 0
+    heart_beart_time = 0
+    input_wait_time = 0
+
+    while g_run:
+        if time.time()-read_config_time >= 3:
+            read_config_time = time.time()
+            read_config_call()
+
+        if g_config["def_cfg"]["auto_send_GetConfigurationNumber"] and g_download_cfg["Number"]>0:  
+            auto_getcfg_call(None)   
+
+        elif g_config["def_cfg"]["heart_beat_interval"] >0 and time.time()-heart_beart_time>=g_config["def_cfg"]["heart_beat_interval"]:
+            heart_beart_time = time.time()
+            heart_beat_call(None)
+
+        elif g_config["def_cfg"]["input_interval"]>0 and time.time()-input_wait_time>=g_config["def_cfg"]["input_interval"]:
+            input_wait_time = time.time()                      
+            input_call(None)
+
+        time.sleep(0.1)    
+
+    logger.info("结束:{}".format(inspect.currentframe().f_code.co_name)) 
+
 #---------------------------------------------------------
 if __name__ == "__main__":
     signal.signal(signal.SIGTERM, sig_handler)  # kill pid
     signal.signal(signal.SIGINT, sig_handler)  # ctrl -c
 
+    logger.info("读取配置...")
+    with open(config_file, "r", encoding="utf-8") as f:
+        g_config = json.load(f)
+        print_data_list()  
+
+
+    exit_event = threading.Event()
+    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())    
     # test_aes()
     # test_base64()
     # test_recv()
+    import websockets
     mq = MessageBase()
     ws = WebServer("0.0.0.0", 11100, mq)
-    ws.run()
-    run([main(),])
+
+    thread_list.append(threading.Thread(target=main_key_handler, args=()))
+    thread_list.append(threading.Thread(target=ble_recv_handler, args=()))
+    for thread in thread_list:
+        thread.start()
+
+
+    ws_serve = websockets.serve(ws.echo, ws.host, ws.port)
+    asyncio.get_event_loop().run_until_complete(ws_serve)
+    task = asyncio.get_event_loop().create_task(ble_main())
+    asyncio.get_event_loop().run_until_complete(task)
+
+
+    # 等待所有线程完成
+    for thread in thread_list:
+        thread.join()
 
     sys.exit(0)
 

+ 15 - 6
config.json

@@ -1,19 +1,22 @@
 {
 "def_cfg":{
-	"ble_mac": "7c:b9:4c:da:af:5c",
+	"ble_mac": "D5:4F:62:FB:C7:C8",
+	"ble_mac4": "7c:b9:4c:da:af:5c",
 	"ble_mac7": "7C:B9:4C:DA:61:7C",
 	"ble_mac2": "E9:8B:FC:41:3F:CF",
 	"ble_mac3": "DB:57:46:A6:14:22",
 	"heart_beat_sel": 1,
 	"input_interval": 5,
 	"heart_beat_interval": 25,	
-	"notif_char": "49535343-8841-43f4-a8d4-ecbe34729bb3",
-	"write_char": "49535343-1e4d-4bd9-ba61-23c647249616",
+	"notif_char": "0000ffe2-0000-1000-8000-00805f9b34fb",
+	"write_char": "0000ffe1-0000-1000-8000-00805f9b34fb",
+	"notif_char1": "49535343-8841-43f4-a8d4-ecbe34729bb3",
+	"write_char1": "49535343-1e4d-4bd9-ba61-23c647249616",
 	"ble_mtu": 0,
 	"ble_send_wait": 1.5,
-	"head_send": "",
-	"head_recv": "",
-	"checknum_type": "",
+	"head_send": "YL<",
+	"head_recv": "YL>",
+	"checknum_type": "sum16",
 	"checknum_type1": "crc16",
 	"aes_cbc_enbable": false,
 	"aes_cbc_key_send": "",
@@ -112,6 +115,12 @@
 		"",
 		"set:net=WIFI24,12345678,qq.com,1883,user,123456",
 		"set_net"
+	],
+
+	[
+		"",
+		"set:upgrade=Template.bin.sum32[008000c9],008000c9,84800",
+		"set_upgrade"
 	]
 
 ]

+ 5 - 5
websocket_server.py

@@ -13,7 +13,7 @@ class WebServer:
         self.port = port
         self.clients = []
         self.message_base = message_base
- 
+
     async def echo(self, websocket, path):
         self.clients.append(websocket)
         client_ip, client_port = websocket.remote_address
@@ -47,10 +47,10 @@ class WebServer:
                 break
             except Exception as e:
                 print("Err in {}[{}]:\n {}".format(os.path.basename(e.__traceback__.tb_frame.f_globals["__file__"]), e.__traceback__.tb_lineno, e))
- 
+        print("客户结束:{client_ip}:{client_port}");
+
+    
     def connect(self):
-        print("连接成功!")
-        asyncio.set_event_loop(asyncio.new_event_loop())
         start_server = websockets.serve(self.echo, self.host, self.port)
         asyncio.get_event_loop().run_until_complete(start_server)
         asyncio.get_event_loop().run_forever()
@@ -58,7 +58,7 @@ class WebServer:
     def run(self):
         t = threading.Thread(target=self.connect)
         t.start()
-        print("已启动!")
+        print("ws服务已启动!")
  
  
 if __name__ == '__main__':