#!/usr/bin/env python # -*- coding: utf-8 -*- import asyncio from bleak import BleakClient, BleakScanner from bleak.backends.characteristic import BleakGATTCharacteristic import chardet import time import json from inputimeout import inputimeout, TimeoutOccurred import base64 from Crypto.Cipher import AES from Crypto.Util.Padding import pad, unpad import copy import os import threading import ctypes import inspect import re from message_base import MessageBase from websocket_server import WebServer import sys import atexit import signal import traceback import logging logger = logging.getLogger(__name__) logger.setLevel(logging.DEBUG) s_h = logging.StreamHandler(sys.stderr) # formatter = logging.Formatter('%(asctime)s.%(msecs)03d-%(name)s-%(filename)s-[line:%(lineno)d]' # '-%(levelname)s-[日志信息]: %(message)s', # datefmt='%Y-%m-%d,%H:%M:%S') formatter = logging.Formatter('%(asctime)s-[%(lineno)d]' '-%(levelname)s: %(message)s', datefmt='%d %H:%M:%S') s_h.setFormatter(formatter) logger.addHandler(s_h) config_file="config.json" #设备的Characteristic UUID par_notification_characteristic="0000ffe2-0000-1000-8000-00805f9b34fb" #设备的Characteristic UUID(具备写属性Write) par_write_characteristic="0000ffe1-0000-1000-8000-00805f9b34fb" #设备的MAC地址 # par_device_addr="D1:1D:6A:52:CD:F8" par_device_addr="D5:2D:D4:9E:5C:3C" # 密钥(key), 密斯偏移量(iv) CBC模式加密 EV_SOFT_BLE_SEND_CODE = '#MOIF@KHab%DECR$' #//蓝牙发送密钥 EV_SOFT_BLE_RECV_CODE = '$*@AB%DHJqopENC#' #//蓝牙接收密钥 # EV_SOFT_BLE_SEND_CODE = 'XH23456789ABCDEF' #//蓝牙发送密钥 # EV_SOFT_BLE_RECV_CODE = 'XH23456789ABCDEF' #//蓝牙接收密钥 g_run = True asyncio_list = [] thread_list = [] g_config = {} g_ble_mtu = 20 g_ble_client = None g_download_cfg = {"Key":"", "Total":0, "Number":0, "DataCrc16":0} mb = None ws = None def calculate_time(func): def wrapper(*args, **kwargs): start_time = time.time() result = func(*args, **kwargs) end_time = time.time() if end_time - start_time >= 0.5 : print("函数 %s 运行时间为 %.3f 秒" % (func.__name__, end_time - start_time)) return result return wrapper @atexit.register def exit_handler(): logger.info("异常退出") # 采用traceback模块查看异常,这个方法会打印出异常代码的行号 exc_type, exc_value, exc_tb = sys.exc_info() logger.info(str(traceback.format_exception(exc_type, exc_value, exc_tb))) def sig_handler(signum, frame): global g_run logger.info('singal: %d' % signum) logger.info("sig退出") g_run = False # 设置事件,通知所有线程退出 exit_event.set() # for asy in asyncio_list: # asy.stop() # for thread in thread_list: # thread.stop() sys.exit(0) def shuncomdacode(data): """ 识别data的编码格式 """ result = chardet.detect(data) print(result['encoding']) return (result['encoding']) def AES_Encrypt(aes_iv, aes_key, plain_text): """ AES encrypt :param plain_text: bytes :param aes_key: bytes :param aes_iv: bytes :return: bytes """ try: pad_data = pad(plain_text, AES.block_size) return AES.new(aes_key, AES.MODE_CBC, aes_iv).encrypt(pad_data) except Exception as e: logger.error("Err in {}[{}]:\n {}".format(os.path.basename(e.__traceback__.tb_frame.f_globals["__file__"]), e.__traceback__.tb_lineno, e)) return bytes() def AES_Decrypt(aes_iv, aes_key, plain_text): """ AES decrypt :param plain_text: bytes :param aes_key: bytes, aes_key :param aes_iv: bytes, aes_iv :return: bytes """ try: dec_data = AES.new(aes_key, AES.MODE_CBC, aes_iv).decrypt(plain_text) return unpad(dec_data, AES.block_size) except Exception as e: logger.error("Err in {}[{}]:\n {}".format(os.path.basename(e.__traceback__.tb_frame.f_globals["__file__"]), e.__traceback__.tb_lineno, e)) return bytes() def AES_Encrypt2(vi, key, data): enctext = bytes() try: # vi = '0102030405060708' pad = lambda s: s + (16 - len(s) % 16) * chr(16 - len(s) % 16) data = pad(data) # 字符串补位 cipher = AES.new(key.encode('utf8'), AES.MODE_CBC, vi.encode('utf8')) encryptedbytes = cipher.encrypt(data.encode('utf8')) # 加密后得到的是bytes类型的数据 encodestrs = base64.b64encode(encryptedbytes) # 使用Base64进行编码,返回byte字符串 enctext = encodestrs.decode('utf8') # 对byte字符串按utf-8进行解码 except Exception as e: logger.error("Err in {}[{}]:\n {}".format(os.path.basename(e.__traceback__.tb_frame.f_globals["__file__"]), e.__traceback__.tb_lineno, e)) return enctext def AES_Decrypt2(vi, key, data): text_decrypted = bytes() try: # vi = '0102030405060708' data = data.encode('utf8') encodebytes = base64.decodebytes(data) # 将加密数据转换位bytes类型数据 cipher = AES.new(key.encode('utf8'), AES.MODE_CBC, vi.encode('utf8')) text_decrypted = cipher.decrypt(encodebytes) unpad = lambda s: s[0:-s[-1]] text_decrypted = unpad(text_decrypted) # 去补位 text_decrypted = text_decrypted.decode('utf8') except Exception as e: logger.error("Err in {}[{}]:\n {}".format(os.path.basename(e.__traceback__.tb_frame.f_globals["__file__"]), e.__traceback__.tb_lineno, e)) return text_decrypted def sum_ccitt_16(data): total = sum(data) total &= 0xFFFF return total def crc_ccitt_16(data): crc = 0 for byte in data: crc ^= (byte << 8) for _ in range(8): if crc & 0x8000: crc = (crc << 1) ^ 0x1021 else: crc <<= 1 crc &= 0xFFFF return crc def checknum_16(data): type = g_config["def_cfg"]["checknum_type"] if type == "sum16": return sum_ccitt_16(data) elif type == "crc16": return crc_ccitt_16(data) return 0 def get_aes_key(type): if type == "send": if g_config["def_cfg"]["aes_cbc_key_send"]: return bytes(g_config["def_cfg"]["aes_cbc_key_send"], encoding='utf-8') else: return bytes(EV_SOFT_BLE_SEND_CODE, encoding='utf-8') else: if g_config["def_cfg"]["aes_cbc_key_recv"]: return bytes(g_config["def_cfg"]["aes_cbc_key_recv"], encoding='utf-8') else: return bytes(EV_SOFT_BLE_RECV_CODE, encoding='utf-8') # 初始化一个CRC16校验码计算函数,多项式为0x8005 def ev_packing(data): data = bytes(data) # 加密 if g_config["def_cfg"]["aes_cbc_enbable"]: logger.info("加密前:{}".format(bytes(data))) key = get_aes_key("send") data = AES_Encrypt(bytes(key), bytes(key), bytes(data)) lenght = len(data) # crc16 = crc16_func(data) ## 计算数据的CRC-16校验码 # checknum = crc16(data, 0, len(data)) out = [] # 头 if g_config["def_cfg"]["head_send"] != "": HEAD_SEND = str(g_config["def_cfg"]["head_send"]).encode() out = list(HEAD_SEND) # 长度 out = out + [ (lenght>>8)&0xFF, (lenght>>0)&0xFF] # 校验码 if g_config["def_cfg"]["checknum_type"] != "": checknum = checknum_16(data) out = out + [ (checknum>>8)&0xFF, (checknum>>0)&0xFF] for d in bytes(data): out.append(d) return bytes(out) #监听回调函数,此处为打印消息 # 记录数据 recv_data = [] recv_start_time = 0 GetConfiguration = [] def PACK_LEN(): H = len(g_config["def_cfg"]["head_send"]) L = 2 C = 2 if g_config["def_cfg"]["checknum_type"]!="" else 0 return H+L+C def batch_split_key_value(input_str): # 正则表达式匹配key=value格式,支持多个键值对 pattern = re.compile(r'(\w+)=([\w.]+)') result = {} for match in pattern.finditer(input_str): key, value = match.groups() result[key] = value return result @calculate_time def recv_handler(data: bytearray, reve=False): mq.add("ble_recv", data) def notification_handler(characteristic: BleakGATTCharacteristic, data: bytearray): global mq # recv_handler(data) mq.add("ble_recv", data) mq.add("ws_send", data) async def ble_send(client, data): global recv_start_time global g_ble_mtu logger.info('发送:{}'.format(bytes(data).hex())) frame_len = g_ble_mtu #244 #20 #244 if g_config["def_cfg"]["ble_mtu"] > 0: frame_len = g_config["def_cfg"]["ble_mtu"] all_count = len(data) send_count = 0 send_start_time = time.time() try: while send_count0 else None def print_data_list(): global g_config try: for i in range(len(g_config["cmd_list"])): print(i, g_config["cmd_list"][i][2]) except Exception as e: logger.error("Err in {}[{}]:\n {}".format(os.path.basename(e.__traceback__.tb_frame.f_globals["__file__"]), e.__traceback__.tb_lineno, e)) @calculate_time def send_form_data_list(client, i, data=None): global g_config if data: json_data = data else: if i >= len(g_config["cmd_list"]): return None json_data = g_config["cmd_list"][i] if len(json_data) < 3: return byte_sequence = b'' if len(json_data[0]) > 4: if json_data[0][:4] == "hex:": byte_sequence += bytes.fromhex(json_data[0][4:]) elif json_data[0][:4] == "str:": byte_sequence += bytes(json_data[0][4:], 'utf-8') elif json_data[0][:4] == "bin:": byte_value = int(json_data[0][4:], 2) # 将二进制字符串转换为整数 byte_array = bytes([byte_value]) # 将整数转换为单字节的字节串 byte_sequence += byte_array if len(json_data[1]) > 4: if json_data[1][:4] == "hex:": byte_sequence += bytes.fromhex(json_data[1][4:]) elif json_data[1][:4] == "str:": byte_sequence += bytes(json_data[1][4:], 'utf-8') elif json_data[1][:4] == "bin:": byte_value = int(json_data[1][4:], 2) # 将二进制字符串转换为整数 byte_array = bytes([byte_value]) # 将整数转换为单字节的字节串 byte_sequence += byte_array # data_str = json.dumps(json_data) # 字符串编码为字节序列 # byte_sequence = str.encode(data_str) # 字节序列转换为bytearray类型 byte_array = bytearray(byte_sequence) byte_array = ev_packing(byte_array) send_data = bytes(byte_array) # await ble_send(client, send_data) mq.add("ble_send", send_data) async def task(): global g_config await asyncio.sleep(3) while True: await asyncio.sleep(1) @calculate_time def read_config_call(): global g_config # 读取更新json with open(config_file, "r", encoding="utf-8") as f: send_list_new = json.load(f) if g_config != send_list_new: g_config = send_list_new logger.info("json内容改变,内容如下:") print_data_list() @calculate_time def heart_beat_call(client): global g_config # 心跳发送:维持蓝牙通讯 logger.info("执行心跳发送任务..") send_form_data_list(client, g_config["def_cfg"]["heart_beat_sel"]) def extract_key_value(text): pattern = r"(\w+)=([^;]+)" # 匹配形如 key=value 的字符串 matches = re.findall(pattern, text) for key, value in matches: print(f"Key: {key}, Value: {value}") return matches @calculate_time def input_call(client): global g_config # 输入 userinput = None if g_download_cfg["Number"] == 0: #非批量获取配置的状态下 try: userinput = inputimeout(prompt='请命令序号:', timeout=g_config["def_cfg"]["input_interval"]) except TimeoutOccurred: userinput = None if userinput: # 判断类型: 数字int 字符串str 列表list 元组tuple 字典dict # isinstance(userinput, str) if not userinput.isalpha(): send_form_data_list(client, int(userinput)) elif str(userinput)=='l': # 读取更新json with open(config_file, "r", encoding="utf-8") as f: send_list_new = json.load(f) logger.info("json内容如下:") print_data_list() @calculate_time def auto_getcfg_call(client): global g_config global g_download_cfg # 自动发送任务 try: if g_download_cfg["Number"]>0: new_one = [] for i, v in enumerate(g_config["cmd_list"]): if v[2] == "GetConfigurationNumber": new_one = copy.deepcopy(v) #深拷贝 break if new_one: Number = g_download_cfg["Number"] Total = g_download_cfg["Total"] new_one[3]["Number"] = Number send_form_data_list(client, 0, new_one) g_download_cfg["Number"] = Number+1 if Number < Total else 0 except Exception as e: logger.error("Err in {}[{}]:\n {}".format(os.path.basename(e.__traceback__.tb_frame.f_globals["__file__"]), e.__traceback__.tb_lineno, e)) async def ble_main(): global g_run global g_ble_client global g_config global g_ble_mtu global mq logger.info("快速显示列表, 可输入l") while g_run: logger.info("开始扫描...") #基于MAC地址查找设备 device = await BleakScanner.find_device_by_address( g_config["def_cfg"]["ble_mac"], cb=dict(use_bdaddr=False) #use_bdaddr判断是否是MOC系统 ) if device is None: logger.error("无法找到设备({})".format(g_config["def_cfg"]["ble_mac"])) continue #事件定义 disconnected_event = asyncio.Event() #断开连接事件回调 def disconnected_callback(client): global g_ble_client logger.info("断开回调!") disconnected_event.set() g_ble_client = None logger.info("尝试连接设备({})...".format(g_config["def_cfg"]["ble_mac"])) async with BleakClient(device,disconnected_callback=disconnected_callback) as client: g_ble_client = client logger.info("已连接(mtu=%d)", client.mtu_size) g_ble_mtu = client.mtu_size-3 try: await client.start_notify(g_config["def_cfg"]["notif_char"], notification_handler) except Exception as e: logger.error("Err in {}[{}]:\n {}".format(os.path.basename(e.__traceback__.tb_frame.f_globals["__file__"]), e.__traceback__.tb_lineno, e)) g_download_cfg["Number"] = 0 while client and client.is_connected and g_run: try: ble_data = mq.get("ble_send") if ble_data: await ble_send(client, ble_data) # 开始根据设备即功能处理消息 ws_data = mq.get("ws_recv") if ws_data: await ble_send(client, ws_data) except Exception as e: logger.error("Err in {}[{}]:\n {}".format(os.path.basename(e.__traceback__.tb_frame.f_globals["__file__"]), e.__traceback__.tb_lineno, e)) time.sleep(0.01) logger.info("结束:{}".format(inspect.currentframe().f_code.co_name)) def test_aes(): key = EV_SOFT_BLE_SEND_CODE data = b'[2, "123456789012", "KeepAlive", {}]' #需要加密的内容 AES_Encrypt(key, key, data) enctext = AES_Encrypt(key, key, data) print(enctext) text_decrypted = AES_Decrypt(key, key, enctext) print(text_decrypted) import array,struct def test_base64(): in_data = b'1234567890\0_+,.//abcdefg' print(len(in_data), in_data) base64_bytes = base64.b64encode(in_data) print(len(base64_bytes),base64_bytes) out = base64.b64decode(base64_bytes) print(len(out),out) def test_recv(): with open(config_file, "r", encoding="utf-8") as f: g_config = json.load(f) print_data_list() str_a = '45 56 3C 00 40 EB AE E3 1F 7B FA 9A 61 86 22 BE 36 0D 0D 09 FE 12 C6 60 27 A8 74 2A 32 08 0D D1 54 B4 4C E1 69 10 4B FC DD 00 23 58 14 74 C1 3F 64 48 9C 88 12 41 17 59 6B 82 F7 90 E7 89 A1 71 80 5A 07 FF 4D 4D 99' recv_handler(bytes.fromhex(str_a), True) str_b = '45 56 3C 00 40 B2 E6 73 BD E7 EE B8 1E D4 DE B7 BF 64 19 80 95 24 94 D2 F3 4E 45 97 99 A3 B5 F7 29 E0 86 1A 60 07 96 77 C4 73 DC C0 4B 2B FB 7E 73 D2 A1 CE 17 26 CF 12 FF 06 DC 83 B7 5E 65 C9 18 0B 87 16 EB 75 0F' recv_handler(bytes.fromhex(str_b), True) def asyncio_handler(): # tasks.append(task()) # loop = asyncio.get_event_loop() # loop.run_until_complete(asyncio.wait([ble_main])) # loop.close() loop = asyncio.new_event_loop() loop.run_until_complete(asyncio.wait([ble_main()])) def ble_recv_handler(): global g_run global recv_start_time global g_download_cfg while g_run: data = mq.get("ble_recv") if not data: continue if g_config["def_cfg"]["recv_detail_print"]: logger.info("包接收:{}".format(bytes(data))) for d in data: recv_data.append(d) if len(recv_data)>1500: recv_data.clear() return if len(recv_data)' if index < 2: return index -= 2 data_len = (recv_data[index+4]&0xff) | (recv_data[index+3]&0xff)<<8 if index+PACK_LEN()+data_len > len(recv_data): return if g_config["def_cfg"]["recv_soc_data_print"]: logger.info("接收:{}".format(bytes(recv_data).hex())) if recv_start_time: recv_end_time = time.time() logger.info('接收耗时:%.3fs', recv_end_time - recv_start_time) recv_start_time = 0 soc_data = recv_data[index+PACK_LEN():index+PACK_LEN()+data_len] if g_config["def_cfg"]["checknum_type"]!="": H = len(g_config["def_cfg"]["head_send"]) L = 2 get_crc = (recv_data[index+(H+L+1)]&0xff)| (recv_data[index+(H+L)]&0xff)<<8 check_num = checknum_16(soc_data) if get_crc != check_num: logger.info("校验失败(%d):0x%04X 0x%04X", len(recv_data), get_crc, check_num) return logger.info("校验成功") if g_config["def_cfg"]["aes_cbc_enbable"]: find_type= "recv" if not reve else 'send' key = get_aes_key(find_type) #"recv" dec_data = AES_Decrypt(bytes(key), bytes(key), bytes(soc_data)) logger.info("解密后:{}".format(bytes(dec_data))) else: dec_data = soc_data # 特殊处理:升级 out = batch_split_key_value(dec_data) for key in out: if key == "upgrade_req": values = out[key].split(",") if len(values) == 2: file = values[0] page = values[1] with open(file, "rb", encoding="utf-8") as f: f.seek(page*(1024+4)) #位移到最后 SEEK_END(值为2) SEEK_CUR(值为1) SEEK_SET(值为0) datas = f.read(1024+4) f.close() # 发送 byte_array = bytearray("set:upgrade_data=")+bytearray(datas) byte_array = ev_packing(byte_array) send_data = bytes(byte_array) mq.add("ble_send", send_data) # 清理 recv_data.clear() time.sleep(0.1) logger.info("结束:{}".format(inspect.currentframe().f_code.co_name)) def main_key_handler(): global g_run read_config_time = 0 heart_beart_time = 0 input_wait_time = 0 while g_run: if time.time()-read_config_time >= 3: read_config_time = time.time() read_config_call() if g_config["def_cfg"]["auto_send_GetConfigurationNumber"] and g_download_cfg["Number"]>0: auto_getcfg_call(None) elif g_config["def_cfg"]["heart_beat_interval"] >0 and time.time()-heart_beart_time>=g_config["def_cfg"]["heart_beat_interval"]: heart_beart_time = time.time() heart_beat_call(None) elif g_config["def_cfg"]["input_interval"]>0 and time.time()-input_wait_time>=g_config["def_cfg"]["input_interval"]: input_wait_time = time.time() input_call(None) time.sleep(0.1) logger.info("结束:{}".format(inspect.currentframe().f_code.co_name)) #--------------------------------------------------------- if __name__ == "__main__": signal.signal(signal.SIGTERM, sig_handler) # kill pid signal.signal(signal.SIGINT, sig_handler) # ctrl -c logger.info("读取配置...") with open(config_file, "r", encoding="utf-8") as f: g_config = json.load(f) print_data_list() exit_event = threading.Event() asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) # test_aes() # test_base64() # test_recv() import websockets mq = MessageBase() ws = WebServer("0.0.0.0", 11100, mq) thread_list.append(threading.Thread(target=main_key_handler, args=())) thread_list.append(threading.Thread(target=ble_recv_handler, args=())) for thread in thread_list: thread.start() ws_serve = websockets.serve(ws.echo, ws.host, ws.port) asyncio.get_event_loop().run_until_complete(ws_serve) task = asyncio.get_event_loop().create_task(ble_main()) asyncio.get_event_loop().run_until_complete(task) # 等待所有线程完成 for thread in thread_list: thread.join() sys.exit(0)