#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Interactive BLE test tool.

Reads ``config.json``, connects to the BLE device whose address is configured
there, sends framed (optionally AES-CBC encrypted) JSON commands and decodes
the frames received through GATT notifications.

Frame layout produced by :func:`ev_packing` and expected by
:func:`ble_unpack`::

    [head][len_hi][len_lo][chk_hi][chk_lo][payload ...]

``head`` and the two checksum bytes are optional (controlled by the config).
"""
import array
import asyncio
import atexit
import base64
import copy
import ctypes
import functools
import inspect
import json
import logging
import os
import re
import select
import signal
import struct
import sys
import threading
import time
import traceback

from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
import chardet
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from inputimeout import inputimeout, TimeoutOccurred

from message_base import MessageBase
from websocket_server import WebServer

logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
s_h = logging.StreamHandler(sys.stderr)
formatter = logging.Formatter('%(asctime)s-[%(lineno)d]'
                              '-%(levelname)s: %(message)s',
                              datefmt='%d %H:%M:%S')
s_h.setFormatter(formatter)
logger.addHandler(s_h)

config_file = "config.json"

# Notification characteristic UUID of the device.
par_notification_characteristic = "0000ffe2-0000-1000-8000-00805f9b34fb"
# Writable characteristic UUID of the device.
par_write_characteristic = "0000ffe1-0000-1000-8000-00805f9b34fb"
# MAC address of the device (alternative unit: "D1:1D:6A:52:CD:F8").
par_device_addr = "D5:2D:D4:9E:5C:3C"

# Default AES-CBC keys; the callers in this file also use the key as the IV.
EV_SOFT_BLE_SEND_CODE = '#MOIF@KHab%DECR$'  # key for outgoing BLE data
EV_SOFT_BLE_RECV_CODE = '$*@AB%DHJqopENC#'  # key for incoming BLE data
# Alternative key that was used for both directions: 'XH23456789ABCDEF'

g_run = True
asyncio_list = []
thread_list = []
g_config = {}
g_ble_mtu = 20
g_ble_client = None
g_download_cfg = {"Key": "", "Total": 0, "Number": 0, "DataCrc16": 0}
g_recv_count = 0
mb = None
ws = None
# Message queue shared by all threads; replaced by a MessageBase in __main__.
mq = None
# Set by sig_handler. Created at import time so the handler can never run
# before the event exists.
exit_event = threading.Event()

# Reassembly buffer (list of ints) for fragmented incoming frames.
recv_data = []
# Timestamp taken after the last send; 0 when no reply is being timed.
recv_start_time = 0
GetConfiguration = []


def _log_exception(e):
    """Log *e* together with the file name and line number where it was raised.

    ``stacklevel=2`` keeps the ``%(lineno)d`` field of the log format pointing
    at the caller's ``except`` block rather than at this helper.
    """
    tb = e.__traceback__
    file_name = "?"
    line_no = -1
    if tb is not None:
        file_name = os.path.basename(tb.tb_frame.f_globals.get("__file__", "?"))
        line_no = tb.tb_lineno
    logger.error("Err in {}[{}]:\n {}".format(file_name, line_no, e),
                 stacklevel=2)


def calculate_time(func):
    """Decorator that prints a notice when *func* takes 0.5 s or longer."""
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        elapsed = time.time() - start_time
        if elapsed >= 0.5:
            print("函数 %s 运行时间为 %.3f 秒" % (func.__name__, elapsed))
        return result
    return wrapper


@atexit.register
def exit_handler():
    """Log the exception information that is current at interpreter exit."""
    logger.info("异常退出")
    # traceback.format_exception includes the line numbers of the failure.
    exc_type, exc_value, exc_tb = sys.exc_info()
    logger.info(str(traceback.format_exception(exc_type, exc_value, exc_tb)))


def sig_handler(signum, frame):
    """Signal handler: ask every worker loop to stop, then exit the process."""
    global g_run
    logger.info('singal: %d' % signum)
    logger.info("sig退出")
    g_run = False
    # Notify anything waiting on the event that the program is shutting down.
    exit_event.set()
    sys.exit(0)


def shuncomdacode(data):
    """Detect, print and return the text encoding of *data* using chardet."""
    result = chardet.detect(data)
    print(result['encoding'])
    return result['encoding']


def AES_Encrypt(aes_iv, aes_key, plain_text):
    """Encrypt *plain_text* with AES-CBC after padding it to the block size.

    :param aes_iv: bytes, initialisation vector
    :param aes_key: bytes, key
    :param plain_text: bytes, data to encrypt
    :return: bytes, the cipher text; empty bytes if anything fails
    """
    try:
        pad_data = pad(plain_text, AES.block_size)
        return AES.new(aes_key, AES.MODE_CBC, aes_iv).encrypt(pad_data)
    except Exception as e:
        _log_exception(e)
        return bytes()


def AES_Decrypt(aes_iv, aes_key, plain_text):
    """Decrypt AES-CBC data and strip the block padding.

    :param aes_iv: bytes, initialisation vector
    :param aes_key: bytes, key
    :param plain_text: bytes, the cipher text to decrypt
    :return: bytes, the decrypted data; empty bytes if anything fails
    """
    try:
        dec_data = AES.new(aes_key, AES.MODE_CBC, aes_iv).decrypt(plain_text)
        return unpad(dec_data, AES.block_size)
    except Exception as e:
        _log_exception(e)
        return bytes()


def AES_Encrypt2(vi, key, data):
    """Encrypt the string *data* with AES-CBC and return it Base64 encoded.

    :param vi: str, initialisation vector
    :param key: str, key
    :param data: str, text to encrypt
    :return: str, Base64 text of the cipher text; empty bytes if anything fails
    """
    enctext = bytes()
    try:
        # Pad the encoded bytes, not the characters: the two lengths differ
        # for non-ASCII text and the cipher needs whole 16-byte blocks.
        padded = pad(data.encode('utf8'), AES.block_size)
        cipher = AES.new(key.encode('utf8'), AES.MODE_CBC, vi.encode('utf8'))
        encryptedbytes = cipher.encrypt(padded)
        enctext = base64.b64encode(encryptedbytes).decode('utf8')
    except Exception as e:
        _log_exception(e)
    return enctext


def AES_Decrypt2(vi, key, data):
    """Decrypt Base64 encoded AES-CBC text produced by :func:`AES_Encrypt2`.

    :param vi: str, initialisation vector
    :param key: str, key
    :param data: str, Base64 text of the cipher text
    :return: str, the decrypted text; empty bytes if decoding fails early
    """
    text_decrypted = bytes()
    try:
        encodebytes = base64.decodebytes(data.encode('utf8'))
        cipher = AES.new(key.encode('utf8'), AES.MODE_CBC, vi.encode('utf8'))
        text_decrypted = cipher.decrypt(encodebytes)
        # The last byte holds the number of padding bytes to drop.
        text_decrypted = text_decrypted[0:-text_decrypted[-1]]
        text_decrypted = text_decrypted.decode('utf8')
    except Exception as e:
        _log_exception(e)
    return text_decrypted


def sum_ccitt_16(data):
    """Return the sum of all byte values in *data*, truncated to 16 bits."""
    return sum(data) & 0xFFFF


def crc_ccitt_16(data):
    """Return a 16-bit CRC of *data* (polynomial 0x1021, initial value 0)."""
    crc = 0
    for byte in data:
        crc ^= (byte << 8)
        for _ in range(8):
            if crc & 0x8000:
                crc = (crc << 1) ^ 0x1021
            else:
                crc <<= 1
            crc &= 0xFFFF
    return crc


def checknum_16(data):
    """Return the checksum of *data* selected by ``def_cfg.checknum_type``.

    Supported types are ``"sum16"`` and ``"crc16"``; any other value gives 0.
    """
    check_type = g_config["def_cfg"]["checknum_type"]
    if check_type == "sum16":
        return sum_ccitt_16(data)
    if check_type == "crc16":
        return crc_ccitt_16(data)
    return 0


def get_aes_key(type):
    """Return the AES key as bytes for direction *type*.

    ``"send"`` selects the sending key; any other value selects the receiving
    key. A non-empty key in the config overrides the built-in default.
    """
    if type == "send":
        configured = g_config["def_cfg"]["aes_cbc_key_send"]
        fallback = EV_SOFT_BLE_SEND_CODE
    else:
        configured = g_config["def_cfg"]["aes_cbc_key_recv"]
        fallback = EV_SOFT_BLE_RECV_CODE
    return bytes(configured if configured else fallback, encoding='utf-8')


def ev_packing(data):
    """Wrap *data* into a frame: head + 2-byte length + 2-byte checksum + body.

    The body is AES-CBC encrypted first when ``def_cfg.aes_cbc_enbable`` is
    set (the key doubles as the IV). Length and checksum are big-endian and
    are computed over the (possibly encrypted) body. Head and checksum are
    omitted when their config values are empty strings.

    :param data: bytes-like payload
    :return: bytes, the complete frame
    """
    cfg = g_config["def_cfg"]
    payload = bytes(data)
    if cfg["aes_cbc_enbable"]:
        logger.info("加密前:{}".format(payload))
        key = get_aes_key("send")
        payload = AES_Encrypt(bytes(key), bytes(key), payload)

    frame = bytearray()
    if cfg["head_send"] != "":
        frame += str(cfg["head_send"]).encode()
    frame += (len(payload) & 0xFFFF).to_bytes(2, "big")
    if cfg["checknum_type"] != "":
        frame += (checknum_16(payload) & 0xFFFF).to_bytes(2, "big")
    frame += payload
    return bytes(frame)


def PACK_LEN():
    """Return the frame overhead in bytes: head + length + optional checksum."""
    H = len(g_config["def_cfg"]["head_send"])
    L = 2
    C = 2 if g_config["def_cfg"]["checknum_type"] != "" else 0
    return H + L + C


def batch_split_key_value(input_str):
    """Parse ``"k1=a,b;k2=c"`` into ``{"k1": ["a", "b"], "k2": ["c"]}``.

    Pairs are separated by ``;`` and stripped of surrounding whitespace; empty
    pairs are skipped. Only the first ``=`` of a pair separates key and values.

    :raises ValueError: if a non-empty pair contains no ``=``
    """
    result = {}
    for pair in input_str.split(';'):
        pair = pair.strip()
        if pair:
            key, values_str = pair.split('=', 1)
            result[key] = values_str.split(',')
    return result


@calculate_time
def recv_handler(data: bytearray, reve=False):
    """Queue *data* on ``ble_recv`` as if it had arrived over BLE."""
    mq.add("ble_recv", data)


def notification_handler(characteristic: BleakGATTCharacteristic, data: bytearray):
    """GATT notification callback: queue the fragment for decoding.

    The fragment is also queued on ``ws_send`` when a WebSocket server is set.
    """
    mq.add("ble_recv", data)
    if ws:
        mq.add("ws_send", data)


async def ble_send(client, data):
    """Write *data* to the configured write characteristic in MTU-sized chunks.

    The chunk size is ``def_cfg.ble_mtu`` when positive, otherwise the value
    derived from the negotiated MTU. Errors are logged, not raised.
    """
    global recv_start_time
    logger.info('发送:{}'.format(bytes(data).hex()))
    frame_len = g_ble_mtu
    if g_config["def_cfg"]["ble_mtu"] > 0:
        frame_len = g_config["def_cfg"]["ble_mtu"]
    if frame_len <= 0:
        # A non-positive chunk size would make the loop below spin forever.
        frame_len = 20
    all_count = len(data)
    send_count = 0
    send_start_time = time.time()
    try:
        while send_count < all_count:
            remaining = all_count - send_count
            cur_len = frame_len if remaining >= frame_len else remaining
            chunk = data[send_count:send_count + cur_len]
            await client.write_gatt_char(g_config["def_cfg"]["write_char"], chunk)
            await asyncio.sleep(0.050)
            send_count += cur_len
            logger.info('发送包:{}/{}'.format(send_count, all_count))
    except Exception as e:
        _log_exception(e)
    send_end_time = time.time()
    # Start timing the reply from the moment the request has gone out.
    recv_start_time = time.time()
    logger.info('发送耗时:%.3fs', send_end_time - send_start_time)


def timeoutable_input(clue="", timeout=None):
    """Prompt with *clue* and read one line from stdin, or None on timeout.

    Uses ``select`` on ``sys.stdin``.
    """
    print(clue, end="")
    readable, _, _ = select.select([sys.stdin], [], [], timeout)
    return sys.stdin.readline() if len(readable) > 0 else None


def print_data_list():
    """Print the index and ``note`` of every entry in ``g_config["cmd_list"]``."""
    try:
        for i, cmd in enumerate(g_config["cmd_list"]):
            print(i, cmd["note"])
    except Exception as e:
        _log_exception(e)


@calculate_time
def send_form_data_list(client, i, data=None):
    """Serialise a command to JSON, frame it and queue it on ``ble_send``.

    :param client: unused; kept for interface compatibility
    :param i: index into ``g_config["cmd_list"]``, used when *data* is falsy
    :param data: optional command object sent instead of the list entry
    :return: None
    """
    if data:
        json_data = data
    else:
        if i >= len(g_config["cmd_list"]):
            return None
        json_data = g_config["cmd_list"][i]

    logger.info("发送前:{}".format(json_data))
    encoded = str.encode(json.dumps(json_data))
    send_data = bytes(ev_packing(bytearray(encoded)))
    mq.add("ble_send", send_data)


async def task():
    """Idle coroutine: wait 3 s, then sleep in 1 s steps forever."""
    await asyncio.sleep(3)
    while True:
        await asyncio.sleep(1)


@calculate_time
def read_config_call():
    """Reload the config file and adopt it when its content has changed.

    A missing or unparsable file (for example one that is being saved right
    now) is logged and the previous configuration stays in effect.
    """
    global g_config
    try:
        with open(config_file, "r", encoding="utf-8") as f:
            send_list_new = json.load(f)
    except (OSError, ValueError) as e:
        _log_exception(e)
        return
    if g_config != send_list_new:
        g_config = send_list_new
        logger.info("json内容改变,内容如下:")
        print_data_list()


@calculate_time
def heart_beat_call(client):
    """Send the command selected by ``def_cfg.heart_beat_sel`` as a keep-alive."""
    logger.info("执行心跳发送任务..")
    send_form_data_list(client, g_config["def_cfg"]["heart_beat_sel"])


def extract_key_value(text):
    """Print and return all ``key=value`` pairs of *text* (values end at ';')."""
    pattern = r"(\w+)=([^;]+)"
    matches = re.findall(pattern, text)
    for key, value in matches:
        print(f"Key: {key}, Value: {value}")
    return matches


@calculate_time
def input_call(client):
    """Prompt for a command index and send that command.

    Entering ``l`` prints the command list instead. Nothing is read while
    ``g_download_cfg["Number"]`` is non-zero.
    """
    userinput = None
    if g_download_cfg["Number"] == 0:
        try:
            userinput = inputimeout(prompt='请命令序号:',
                                    timeout=g_config["def_cfg"]["input_interval"])
        except TimeoutOccurred:
            userinput = None
    if not userinput:
        return
    if not userinput.isalpha():
        try:
            index = int(userinput)
        except ValueError:
            # Input such as "1a" must not take down the input thread.
            logger.warning("invalid command index: %r", userinput)
            return
        send_form_data_list(client, index)
    elif str(userinput) == 'l':
        logger.info("json内容如下:")
        print_data_list()


async def ble_main():
    """Scan for the configured device, connect and pump the send queues.

    Runs until ``g_run`` is cleared. After a disconnect or a failed connection
    attempt the scan starts again.
    """
    global g_ble_client
    global g_ble_mtu
    logger.info("快速显示列表, 可输入l")
    while g_run:
        logger.info("开始扫描...")
        # Look the device up by its address.
        device = await BleakScanner.find_device_by_address(
            g_config["def_cfg"]["ble_mac"],
            cb=dict(use_bdaddr=False)  # backend option, presumably macOS-only
        )
        if device is None:
            logger.error("无法找到设备({})".format(g_config["def_cfg"]["ble_mac"]))
            continue

        disconnected_event = asyncio.Event()

        def disconnected_callback(client):
            global g_ble_client
            logger.info("断开回调!")
            disconnected_event.set()
            g_ble_client = None

        logger.info("尝试连接设备({})...".format(g_config["def_cfg"]["ble_mac"]))
        try:
            async with BleakClient(device, disconnected_callback=disconnected_callback) as client:
                try:
                    g_ble_client = client
                    logger.info("已连接(mtu=%d)", client.mtu_size)
                    # Three bytes of the MTU are not usable for payload.
                    g_ble_mtu = client.mtu_size - 3
                    await client.start_notify(g_config["def_cfg"]["notif_char"],
                                              notification_handler)
                    g_download_cfg["Number"] = 0
                    while client.is_connected and g_run:
                        ble_data = mq.get("ble_send")
                        if ble_data:
                            await ble_send(client, ble_data)
                        ws_data = mq.get("ws_recv")
                        if ws_data:
                            await ble_send(client, ws_data)
                        # Always yield: without an await the event loop cannot
                        # deliver notifications or the disconnect callback
                        # while both queues are empty.
                        await asyncio.sleep(0.01)
                    logger.info('断开连接')
                except Exception as e:
                    _log_exception(e)
                finally:
                    logger.info("结束")
        except Exception as e:
            # Connecting or disconnecting failed; log it and scan again.
            _log_exception(e)
    logger.info("结束:{}".format(inspect.currentframe().f_code.co_name))


def test_aes():
    """Manual check: encrypt and decrypt a sample frame with the send key."""
    # AES.new needs bytes for both key and IV.
    key = EV_SOFT_BLE_SEND_CODE.encode('utf-8')
    data = b'[2, "123456789012", "KeepAlive", {}]'
    enctext = AES_Encrypt(key, key, data)
    print(enctext)
    text_decrypted = AES_Decrypt(key, key, enctext)
    print(text_decrypted)


def test_base64():
    """Manual check: Base64 round trip of a sample containing a NUL byte."""
    in_data = b'1234567890\0_+,.//abcdefg'
    print(len(in_data), in_data)
    base64_bytes = base64.b64encode(in_data)
    print(len(base64_bytes), base64_bytes)
    out = base64.b64decode(base64_bytes)
    print(len(out), out)


def test_recv():
    """Manual check: load the config and feed two captured frames to the queue."""
    global g_config
    with open(config_file, "r", encoding="utf-8") as f:
        g_config = json.load(f)
    print_data_list()
    str_a = '45 56 3C 00 40 EB AE E3 1F 7B FA 9A 61 86 22 BE 36 0D 0D 09 FE 12 C6 60 27 A8 74 2A 32 08 0D D1 54 B4 4C E1 69 10 4B FC DD 00 23 58 14 74 C1 3F 64 48 9C 88 12 41 17 59 6B 82 F7 90 E7 89 A1 71 80 5A 07 FF 4D 4D 99'
    recv_handler(bytes.fromhex(str_a), True)
    str_b = '45 56 3C 00 40 B2 E6 73 BD E7 EE B8 1E D4 DE B7 BF 64 19 80 95 24 94 D2 F3 4E 45 97 99 A3 B5 F7 29 E0 86 1A 60 07 96 77 C4 73 DC C0 4B 2B FB 7E 73 D2 A1 CE 17 26 CF 12 FF 06 DC 83 B7 5E 65 C9 18 0B 87 16 EB 75 0F'
    recv_handler(bytes.fromhex(str_b), True)


def asyncio_handler():
    """Run :func:`ble_main` to completion on a private event loop."""
    loop = asyncio.new_event_loop()
    try:
        # asyncio.wait() no longer accepts bare coroutines (Python 3.11+),
        # so the coroutine is run directly.
        loop.run_until_complete(ble_main())
    finally:
        loop.close()


def ble_handler(dec_str):
    """Handle a decoded message; answers firmware requests (``key == "fwreq"``).

    For a firmware request the requested slice of the named file is read,
    Base64 encoded and queued as a ``fwdata`` frame. Other messages are
    ignored. Errors are logged, not raised.
    """
    try:
        out = json.loads(str(dec_str))
        if not isinstance(out, dict):
            return
        if "key" in out and "value" in out and out["key"] == "fwreq":
            # NOTE(security): the file path comes from the remote peer and is
            # opened as-is; restrict it to a firmware directory if the peer
            # is not trusted.
            file = str(out["value"]["file"])
            addr = int(out["value"]["addr"])
            size = int(out["value"]["size"])
            filesize = os.stat(file).st_size
            with open(file, "rb") as f:
                f.seek(addr)
                datas = f.read(size)
            fwdata = {
                "type": "set",
                "data": [
                    {
                        "key": "fwdata",
                        "value": {
                            "cur": addr,
                            "total": filesize,
                            "data": base64.b64encode(datas).decode("utf-8"),
                        },
                    }
                ],
                "note": "set_fwdata",
            }
            send_data = bytes(ev_packing(json.dumps(fwdata).encode("utf-8")))
            mq.add("ble_send", send_data)
            logger.info("add event:fwdata")
    except Exception as e:
        _log_exception(e)


def ble_unpack(data):
    """Append a received fragment to the buffer and decode a complete frame.

    Once a whole frame is buffered its checksum is verified (if configured),
    the body is decrypted (if configured), decoded as GBK text and passed to
    :func:`ble_handler`; the buffer is then cleared.

    NOTE(review): the short-buffer check and the header search were damaged
    in the source this function was restored from; they are reconstructed
    here and should be confirmed against the device protocol.
    """
    global recv_start_time
    cfg = g_config["def_cfg"]
    head = str(cfg["head_recv"]).encode()

    # A fragment that starts with the frame head begins a new frame, so drop
    # whatever was buffered before it.
    if head and bytes(data).startswith(head):
        recv_data.clear()
    recv_data.extend(data)

    if cfg["recv_detail_print"]:
        logger.info("包接收({}),总长度({}):{}".format(len(data), len(recv_data), bytes(data)))
    if len(recv_data) > cfg["ble_pack_size"]:
        recv_data.clear()
        return

    overhead = PACK_LEN()
    if len(recv_data) < overhead:
        return
    index = bytes(recv_data).find(head)
    if index < 0:
        return
    if len(recv_data) < index + overhead:
        # Head found, but the length/checksum bytes have not arrived yet.
        return

    # Same head length that PACK_LEN() uses; assumes head_send and head_recv
    # have equal length -- TODO confirm.
    head_len = len(cfg["head_send"])
    len_hi = recv_data[index + head_len]
    len_lo = recv_data[index + head_len + 1]
    data_len = (len_lo & 0xff) | (len_hi & 0xff) << 8
    if len(recv_data) < data_len + index + overhead:
        logger.info("数据不够%d < %d(index(%d),head(%d),len(%d):0x%02x%02x)",
                    len(recv_data), index + overhead + data_len, index,
                    overhead, data_len, len_hi, len_lo)
        return

    if cfg["recv_soc_data_print"]:
        logger.info("接收:{}".format(bytes(recv_data).hex()))
    if recv_start_time:
        recv_end_time = time.time()
        logger.info('接收耗时:%.3fs', recv_end_time - recv_start_time)
        recv_start_time = 0

    soc_data = recv_data[index + overhead:index + overhead + data_len]
    if cfg["checknum_type"] != "":
        crc_pos = index + head_len + 2
        get_crc = (recv_data[crc_pos + 1] & 0xff) | (recv_data[crc_pos] & 0xff) << 8
        check_num = checknum_16(soc_data)
        if get_crc != check_num:
            logger.info("校验失败(%d):0x%04X 0x%04X", len(recv_data), get_crc, check_num)
            recv_data.clear()
            return
        logger.info("校验成功")

    if cfg["aes_cbc_enbable"]:
        key = get_aes_key("recv")
        dec_data = AES_Decrypt(bytes(key), bytes(key), bytes(soc_data))
        logger.info("解密后:{}".format(bytes(dec_data)))
    else:
        dec_data = soc_data

    try:
        dec_str = bytes(dec_data).decode('gbk')
    except UnicodeDecodeError as e:
        # An undecodable body must not terminate the receiving thread.
        _log_exception(e)
        recv_data.clear()
        return
    logger.info("解析:{}".format(dec_str))
    # Special handling, e.g. firmware upgrade requests.
    ble_handler(dec_str)
    recv_data.clear()


def ble_recv_handler():
    """Worker thread: take fragments from ``ble_recv`` and decode them."""
    while g_run:
        data = mq.get("ble_recv")
        if data:
            try:
                ble_unpack(data)
            except Exception as e:
                # Keep the thread alive when a single frame is malformed.
                _log_exception(e)
        time.sleep(0.1)
    logger.info("结束:{}".format(inspect.currentframe().f_code.co_name))


def key_handler():
    """Worker thread: prompt for a command every ``input_interval`` seconds."""
    input_wait_time = 0
    while g_run:
        interval = g_config["def_cfg"]["input_interval"]
        if interval > 0 and time.time() - input_wait_time >= interval:
            input_wait_time = time.time()
            input_call(None)
        time.sleep(0.1)
    logger.info("结束:{}".format(inspect.currentframe().f_code.co_name))


def main_handler():
    """Worker thread: reload the config every 5 s and send the heartbeat.

    ``g_recv_count`` is decremented once per second; when it reaches 0 while
    connected, a heartbeat is sent and the counter is reloaded from
    ``def_cfg.heart_beat_interval``.
    """
    global g_recv_count
    read_config_time = 0
    sec_count = 0
    while g_run:
        now = time.time()
        if now - sec_count >= 1:
            sec_count = now
            if g_recv_count > 0:
                g_recv_count -= 1
        if now - read_config_time >= 5:
            read_config_time = now
            read_config_call()
        if g_ble_client and g_ble_client.is_connected:
            if g_recv_count == 0:
                g_recv_count = g_config["def_cfg"]["heart_beat_interval"]
                heart_beat_call(None)
        time.sleep(0.1)
    logger.info("结束:{}".format(inspect.currentframe().f_code.co_name))


# ---------------------------------------------------------
if __name__ == "__main__":
    signal.signal(signal.SIGTERM, sig_handler)  # kill <pid>
    signal.signal(signal.SIGINT, sig_handler)   # Ctrl-C

    logger.info("读取配置...")
    with open(config_file, "r", encoding="utf-8") as f:
        g_config = json.load(f)
    print_data_list()

    # The selector policy class is only defined on Windows.
    if hasattr(asyncio, "WindowsSelectorEventLoopPolicy"):
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())

    # Only needed by the WebSocket bridge (WebServer), which is not started.
    import websockets

    mq = MessageBase()
    thread_list.append(threading.Thread(target=main_handler, args=()))
    thread_list.append(threading.Thread(target=key_handler, args=()))
    thread_list.append(threading.Thread(target=ble_recv_handler, args=()))
    for thread in thread_list:
        thread.start()

    try:
        asyncio.run(ble_main())
    finally:
        # Stop the workers even if ble_main ended with an error; otherwise
        # the joins below would wait forever.
        g_run = False
        exit_event.set()
        for thread in thread_list:
            thread.join()
    sys.exit(0)