#!/usr/bin/env python
# -*- coding: utf-8 -*-
# BLE <-> WebSocket bridge script.
#
# Scans for a BLE device by address, subscribes to its notification
# characteristic, frames/encrypts outgoing commands taken from config.json and
# relays data to and from a message queue shared with a WebServer instance.
#
# NOTE(review): this file reached review with its line structure collapsed and
# with some text missing inside recv_handler() and ble_send() (marked below).
# Layout and indentation here are reconstructed from syntax; confirm against
# the original file before relying on nesting in ambiguous places.
import asyncio
from bleak import BleakClient, BleakScanner
from bleak.backends.characteristic import BleakGATTCharacteristic
import chardet
import time
import json
from inputimeout import inputimeout, TimeoutOccurred
import base64
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
import copy
from message_base import MessageBase
from websocket_server import WebServer
import sys
import atexit
import signal
import traceback
import logging

# Module logger: DEBUG level, written to stderr with a short timestamp format.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
s_h = logging.StreamHandler(sys.stderr)
# formatter = logging.Formatter('%(asctime)s.%(msecs)03d-%(name)s-%(filename)s-[line:%(lineno)d]'
#                               '-%(levelname)s-[log message]: %(message)s',
#                               datefmt='%Y-%m-%d,%H:%M:%S')
formatter = logging.Formatter('%(asctime)s-[%(lineno)d]'
                              '-%(levelname)s: %(message)s',
                              datefmt='%d %H:%M:%S')
s_h.setFormatter(formatter)
logger.addHandler(s_h)

# Path of the JSON configuration file (holds "def_cfg" and "cmd_list").
config_file="config.json"
# Device characteristic UUID
par_notification_characteristic="0000ffe2-0000-1000-8000-00805f9b34fb"
# Device characteristic UUID (has the Write property)
par_write_characteristic="0000ffe1-0000-1000-8000-00805f9b34fb"
# Device MAC address
# par_device_addr="D1:1D:6A:52:CD:F8"
par_device_addr="D5:2D:D4:9E:5C:3C"

# Key and IV offset used for CBC-mode encryption.
# These are the fallbacks used by get_aes_key() when config.json supplies no key.
EV_SOFT_BLE_SEND_CODE = '#MOIF@KHab%DECR$' # BLE send key
EV_SOFT_BLE_RECV_CODE = '$*@AB%DHJqopENC#' # BLE receive key
# EV_SOFT_BLE_SEND_CODE = 'XH23456789ABCDEF' # BLE send key
# EV_SOFT_BLE_RECV_CODE = 'XH23456789ABCDEF' # BLE receive key

# Parsed contents of config.json; replaced wholesale whenever the file changes.
g_config = {}
# Default per-write chunk size; main() overwrites it with client.mtu_size-3.
g_ble_mtu = 20
# Currently connected BleakClient, or None when disconnected.
g_ble_client = None
# State of a bulk configuration download; "Number" > 0 means one is in progress.
g_download_cfg = {"Key":"", "Total":0, "Number":0, "DataCrc16":0}
# NOTE(review): `mb` is not referenced anywhere else in the visible source,
# while `mq` (used via `global mq` below) is only assigned in the __main__
# guard. This looks like a typo for `mq` - confirm.
mb = None
ws = None

def calculate_time(func):
    """Decorator that prints the call duration when it is 0.5 s or longer.

    NOTE(review): the wrapper is a plain function and does not await. For the
    ``async def`` functions decorated with it in this file, ``func(...)`` only
    creates the coroutine object, so the measured time excludes the awaited
    work. The coroutine is still returned unchanged to the caller.
    """
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        if end_time - start_time >= 0.5 :
            print("函数 %s 运行时间为 %.3f 秒" % (func.__name__, end_time - start_time))
        return result
    return wrapper

@atexit.register
def exit_handler():
    """Log a message and the current exception info at interpreter exit.

    Registered with atexit, so it runs on every interpreter shutdown that
    atexit covers, not only after errors.
    """
    logger.info("异常退出")
    # Use the traceback module to inspect the exception; this prints the line
    # number of the failing code.
    # NOTE(review): presumably sys.exc_info() is usually (None, None, None) by
    # the time atexit handlers run - verify that this logs anything useful.
    exc_type, exc_value, exc_tb = sys.exc_info()
    logger.info(str(traceback.format_exception(exc_type, exc_value, exc_tb)))

def sig_handler(signum, frame):
    """Signal handler: log the signal number and exit with status 0."""
    logger.info('catched singal: %d' % signum)
    sys.exit(0)

def shuncomdacode(data):
    """Detect the encoding of ``data`` with chardet, print it and return it."""
    result = chardet.detect(data)
    print(result['encoding'])
    return (result['encoding'])

def AES_Encrypt(aes_iv, aes_key, plain_text):
    """Encrypt ``plain_text`` with AES in CBC mode.

    The input is padded to AES.block_size with Crypto.Util.Padding.pad before
    encryption.

    :param aes_iv: bytes, initialisation vector
    :param aes_key: bytes, AES key
    :param plain_text: bytes, data to encrypt
    :return: bytes, the ciphertext; empty bytes if any exception is raised
             (the error is logged, not propagated)
    """
    try:
        pad_data = pad(plain_text, AES.block_size)
        return AES.new(aes_key, AES.MODE_CBC, aes_iv).encrypt(pad_data)
    except Exception as e:
        logger.error('Error:{}'.format(e))
    return bytes()

def AES_Decrypt(aes_iv, aes_key, plain_text):
    """Decrypt AES-CBC data and strip its padding.

    :param aes_iv: bytes, initialisation vector
    :param aes_key: bytes, AES key
    :param plain_text: bytes, the data passed to decrypt() (i.e. the
                       ciphertext, despite the parameter name)
    :return: bytes, the unpadded plaintext; empty bytes if any exception is
             raised (the error is logged, not propagated)
    """
    try:
        dec_data = AES.new(aes_key, AES.MODE_CBC, aes_iv).decrypt(plain_text)
        return unpad(dec_data, AES.block_size)
    except Exception as e:
        logger.error('Error:{}'.format(e))
    return bytes()

def AES_Encrypt2(vi, key, data):
    """Encrypt a str with AES-CBC and return the result Base64-encoded as str.

    ``vi``, ``key`` and ``data`` are all str and are UTF-8 encoded internally.
    Padding is done by hand to a 16-character multiple before encoding.
    On any exception the error is logged and the initial empty ``bytes()``
    value is returned instead of a str.
    """
    enctext = bytes()
    try:
        # vi = '0102030405060708'
        # Local lambda shadows the imported Crypto.Util.Padding.pad inside this function.
        pad = lambda s: s + (16 - len(s) % 16) * chr(16 - len(s) % 16)
        data = pad(data) # pad the string
        cipher = AES.new(key.encode('utf8'), AES.MODE_CBC, vi.encode('utf8'))
        encryptedbytes = cipher.encrypt(data.encode('utf8')) # encryption yields bytes
        encodestrs = base64.b64encode(encryptedbytes) # Base64-encode; returns a bytes string
        enctext = encodestrs.decode('utf8') # decode the bytes string as UTF-8
    except Exception as e:
        logger.error('Error:{}'.format(e))
    return enctext

def AES_Decrypt2(vi, key, data):
    """Decrypt a Base64 str produced in the style of AES_Encrypt2 and return a str.

    ``vi``, ``key`` and ``data`` are all str. The padding is removed by
    dropping as many trailing bytes as the value of the last byte.
    On any exception the error is logged and whatever ``text_decrypted`` holds
    at that point (initially empty ``bytes()``) is returned.
    """
    text_decrypted = bytes()
    try:
        # vi = '0102030405060708'
        data = data.encode('utf8')
        encodebytes = base64.decodebytes(data) # convert the encrypted data to bytes
        cipher = AES.new(key.encode('utf8'), AES.MODE_CBC, vi.encode('utf8'))
        text_decrypted = cipher.decrypt(encodebytes)
        # Local lambda shadows the imported Crypto.Util.Padding.unpad inside this function.
        unpad = lambda s: s[0:-s[-1]]
        text_decrypted = unpad(text_decrypted) # strip the padding
        text_decrypted = text_decrypted.decode('utf8')
    except Exception as e:
        logger.error('Error:{}'.format(e))
    return text_decrypted

def sum_ccitt_16(data):
    """Return the sum of all items in ``data`` truncated to 16 bits."""
    total = sum(data)
    total &= 0xFFFF
    return total

def crc_ccitt_16(data):
    """Return a 16-bit CRC of ``data``.

    Bitwise, most-significant-bit first, polynomial 0x1021, initial value 0,
    no final XOR.
    """
    crc = 0
    for byte in data:
        # Fold the next byte into the high byte of the register.
        crc ^= (byte << 8)
        for _ in range(8):
            if crc & 0x8000:
                crc = (crc << 1) ^ 0x1021
            else:
                crc <<= 1
            # Keep the register at 16 bits.
            crc &= 0xFFFF
    return crc

def checknum_16(data):
    """Compute the 16-bit check value selected by def_cfg["checknum_type"].

    Returns sum_ccitt_16(data) for "sum16", crc_ccitt_16(data) for "crc16",
    and 0 for any other value.
    """
    # NOTE(review): the local name `type` shadows the builtin inside this function.
    type = g_config["def_cfg"]["checknum_type"]
    if type == "sum16":
        return sum_ccitt_16(data)
    elif type == "crc16":
        return crc_ccitt_16(data)
    return 0

def get_aes_key(type):
    """Return the AES key as UTF-8 bytes for the given direction.

    ``type == "send"`` selects the send key; any other value selects the
    receive key. A non-empty key in def_cfg ("aes_cbc_key_send" /
    "aes_cbc_key_recv") takes precedence over the built-in
    EV_SOFT_BLE_SEND_CODE / EV_SOFT_BLE_RECV_CODE constants.
    """
    if type == "send":
        if g_config["def_cfg"]["aes_cbc_key_send"]:
            return bytes(g_config["def_cfg"]["aes_cbc_key_send"], encoding='utf-8')
        else:
            return bytes(EV_SOFT_BLE_SEND_CODE, encoding='utf-8')
    else:
        if g_config["def_cfg"]["aes_cbc_key_recv"]:
            return bytes(g_config["def_cfg"]["aes_cbc_key_recv"], encoding='utf-8')
        else:
            return bytes(EV_SOFT_BLE_RECV_CODE, encoding='utf-8')

# Initialise a CRC16 check-code function with polynomial 0x8005.
# NOTE(review): this comment looks stale - no such function follows, and
# crc_ccitt_16() above uses polynomial 0x1021.
def ev_packing(data):
    """Build an outgoing frame around ``data`` and return it as bytes.

    Frame layout, in order:
      * def_cfg["head_send"] encoded as bytes (omitted when it is "")
      * payload length, 2 bytes, high byte first
      * check value from checknum_16(), 2 bytes, high byte first
        (omitted when def_cfg["checknum_type"] is "")
      * the payload

    When def_cfg["aes_cbc_enbable"] is truthy the payload is first encrypted
    with AES_Encrypt, using the send key as both IV and key; length and check
    value are then computed over the encrypted payload.
    """
    data = bytes(data)
    # Encrypt
    if g_config["def_cfg"]["aes_cbc_enbable"]:
        logger.info("加密前:{}".format(bytes(data)))
        key = get_aes_key("send")
        data = AES_Encrypt(bytes(key), bytes(key), bytes(data))
    lenght = len(data)
    # crc16 = crc16_func(data) ## compute the CRC-16 check code of the data
    # checknum = crc16(data, 0, len(data))
    out = []
    # Head
    if g_config["def_cfg"]["head_send"] != "":
        HEAD_SEND = str(g_config["def_cfg"]["head_send"]).encode()
        out = list(HEAD_SEND)
    # Length
    out = out + [ (lenght>>8)&0xFF, (lenght>>0)&0xFF]
    # Check value
    if g_config["def_cfg"]["checknum_type"] != "":
        checknum = checknum_16(data)
        out = out + [ (checknum>>8)&0xFF, (checknum>>0)&0xFF]
    for d in bytes(data):
        out.append(d)
    return bytes(out)

# Notification callback section; here it prints the message.
# Recorded data: bytes received so far that have not yet formed a complete frame.
recv_data = []
# Non-zero while a receive-duration measurement is pending (see recv_handler).
recv_start_time = 0
GetConfiguration = []

def PACK_LEN():
    """Return the frame header length in bytes.

    That is len(def_cfg["head_send"]) + 2 length bytes + 2 check bytes
    (the check bytes only count when def_cfg["checknum_type"] is not "").
    """
    H = len(g_config["def_cfg"]["head_send"])
    L = 2
    C = 2 if g_config["def_cfg"]["checknum_type"]!="" else 0
    return H+L+C

@calculate_time
def recv_handler(data: bytearray, reve=False):
    """Accumulate received bytes and process a frame once it is complete.

    Appends ``data`` to the module-level ``recv_data`` buffer, then (once
    enough bytes are buffered) extracts the payload, verifies its check value,
    optionally decrypts it and clears the buffer.

    :param data: newly received bytes
    :param reve: when True the "send" AES key is used for decryption instead
                 of the "recv" key (test_recv() calls it this way)
    """
    global recv_start_time
    global g_download_cfg
    if g_config["def_cfg"]["recv_detail_print"]:
        logger.info("包接收:{}".format(bytes(data)))
    for d in data:
        recv_data.append(d)
    # Drop everything if the buffer grows past 1500 bytes without a frame.
    if len(recv_data)>1500:
        recv_data.clear()
        return
    # NOTE(review): source text is missing on the next line. Between
    # `len(recv_data)` and `'` something was lost (presumably a minimum-length
    # check and the statement that assigns `index`); the tokens are kept
    # exactly as found. Restore from the original file - this line is not
    # valid Python as it stands.
    if len(recv_data)' if index < 2:
        return
    index -= 2
    # Payload length: high byte at index+3, low byte at index+4.
    # NOTE(review): the offsets 3 and 4 are hard-coded, which assumes a 3-byte
    # head - confirm against def_cfg["head_send"].
    data_len = (recv_data[index+4]&0xff) | (recv_data[index+3]&0xff)<<8
    # Wait for more data until header + payload are fully buffered.
    if index+PACK_LEN()+data_len > len(recv_data):
        return
    if g_config["def_cfg"]["recv_soc_data_print"]:
        logger.info("接收:{}".format(bytes(recv_data).hex()))
    if recv_start_time:
        recv_end_time = time.time()
        logger.info('接收耗时:%.3fs', recv_end_time - recv_start_time)
        recv_start_time = 0
    soc_data = recv_data[index+PACK_LEN():index+PACK_LEN()+data_len]
    if g_config["def_cfg"]["checknum_type"]!="":
        H = len(g_config["def_cfg"]["head_send"])
        L = 2
        # Check value stored in the frame: high byte first, right after the length.
        get_crc = (recv_data[index+(H+L+1)]&0xff)| (recv_data[index+(H+L)]&0xff)<<8
        check_num = checknum_16(soc_data)
        if get_crc != check_num:
            # On mismatch the buffer is left untouched and the frame is not processed.
            logger.info("校验失败(%d):0x%04X 0x%04X", len(recv_data), get_crc, check_num)
            return
        logger.info("校验成功")
    if g_config["def_cfg"]["aes_cbc_enbable"]:
        find_type= "recv" if not reve else 'send'
        key = get_aes_key(find_type) #"recv"
        # The key is used as both IV and key, mirroring ev_packing().
        dec_data = AES_Decrypt(bytes(key), bytes(key), bytes(soc_data))
        logger.info("解密后:{}".format(bytes(dec_data)))
    else:
        dec_data = soc_data
    # Clean up
    recv_data.clear()

def notification_handler(characteristic: BleakGATTCharacteristic, data: bytearray):
    """BLE notification callback.

    Feeds ``data`` into recv_handler() and then adds the same raw data to the
    message queue under "ws_send".
    """
    global mq
    recv_handler(data)
    mq.add("ws_send", data)

async def ble_send(client, data):
    """Send ``data`` to the BLE device.

    The chunk size is def_cfg["ble_mtu"] when that is greater than 0,
    otherwise the module-level g_ble_mtu.

    NOTE(review): the body of this function is truncated in the source (see
    the marker below); the write loop and error handling cannot be documented
    from what is visible.
    """
    global recv_start_time
    global g_ble_mtu
    logger.info('发送:{}'.format(bytes(data).hex()))
    frame_len = g_ble_mtu #244 #20 #244
    if g_config["def_cfg"]["ble_mtu"] > 0:
        frame_len = g_config["def_cfg"]["ble_mtu"]
    all_count = len(data)
    send_count = 0
    send_start_time = time.time()
    try:
        # NOTE(review): source text is missing on the next line. Everything
        # after `send_count` up to `0 else None` was lost - the rest of this
        # loop, the matching except/finally clause and possibly one or more
        # whole definitions. The tokens are kept exactly as found. Restore
        # from the original file - this line is not valid Python as it stands.
        while send_count0 else None

def print_data_list():
    """Print the index and third element of every entry in g_config["cmd_list"].

    Any exception (for example a missing "cmd_list" key) is logged and
    swallowed.
    """
    global g_config
    try:
        for i in range(len(g_config["cmd_list"])):
            print(i, g_config["cmd_list"][i][2])
    except Exception as e:
        logger.error('Error:{}'.format(e))

@calculate_time
async def send_form_data_list(client, i, data=None):
    """Build a command from a cmd_list entry, frame it and send it over BLE.

    :param client: BLE client passed through to ble_send()
    :param i: index into g_config["cmd_list"]; ignored when ``data`` is truthy
    :param data: optional command entry to send instead of cmd_list[i]
    :return: None (also returned early when ``i`` is out of range or the entry
             has fewer than 3 elements)

    Elements 0 and 1 of the entry are each converted to bytes when longer than
    4 characters, according to their 4-character prefix:
      * "hex:" - the rest is parsed with bytes.fromhex
      * "str:" - the rest is UTF-8 encoded
      * "bin:" - the rest is parsed as a base-2 integer and emitted as one byte
    Elements with any other prefix contribute nothing. The concatenation is
    wrapped by ev_packing() and sent with ble_send().
    """
    global g_config
    if data:
        json_data = data
    else:
        if i >= len(g_config["cmd_list"]):
            return None
        json_data = g_config["cmd_list"][i]
    if len(json_data) < 3:
        return
    byte_sequence = b''
    if len(json_data[0]) > 4:
        if json_data[0][:4] == "hex:":
            byte_sequence += bytes.fromhex(json_data[0][4:])
        elif json_data[0][:4] == "str:":
            byte_sequence += bytes(json_data[0][4:], 'utf-8')
        elif json_data[0][:4] == "bin:":
            byte_value = int(json_data[0][4:], 2) # convert the binary string to an integer
            byte_array = bytes([byte_value]) # convert the integer to a single-byte bytes object
            byte_sequence += byte_array
    if len(json_data[1]) > 4:
        if json_data[1][:4] == "hex:":
            byte_sequence += bytes.fromhex(json_data[1][4:])
        elif json_data[1][:4] == "str:":
            byte_sequence += bytes(json_data[1][4:], 'utf-8')
        elif json_data[1][:4] == "bin:":
            byte_value = int(json_data[1][4:], 2) # convert the binary string to an integer
            byte_array = bytes([byte_value]) # convert the integer to a single-byte bytes object
            byte_sequence += byte_array
    # data_str = json.dumps(json_data)
    # Encode the string as a byte sequence
    # byte_sequence = str.encode(data_str)
    # Convert the byte sequence to a bytearray
    byte_array = bytearray(byte_sequence)
    byte_array = ev_packing(byte_array)
    send_data = bytes(byte_array)
    await ble_send(client, send_data)

async def task():
    """Idle coroutine: sleeps 3 s, then loops forever sleeping 1 s at a time.

    Only referenced from a commented-out line in run().
    """
    global g_config
    await asyncio.sleep(3)
    while True:
        await asyncio.sleep(1)

@calculate_time
def read_config_call():
    """Reload config.json and replace g_config if the contents changed.

    When a change is detected the new command list is printed.
    """
    global g_config
    # Re-read the JSON file
    with open(config_file, "r", encoding="utf-8") as f:
        send_list_new = json.load(f)
    if g_config != send_list_new:
        g_config = send_list_new
        logger.info("json内容改变,内容如下:")
        print_data_list()

@calculate_time
async def heart_beat_call(client):
    """Send the command selected by def_cfg["heart_beat_sel"] as a heartbeat."""
    global g_config
    # Heartbeat: keeps the BLE link alive
    logger.info("执行心跳发送任务..")
    await send_form_data_list(client, g_config["def_cfg"]["heart_beat_sel"])

@calculate_time
async def input_call(client):
    """Prompt for a command index on the console and act on the answer.

    The prompt is skipped while a bulk configuration download is running
    (g_download_cfg["Number"] != 0). Input that is not purely alphabetic is
    converted with int() and sent via send_form_data_list(); the single
    letter 'l' re-reads config.json and prints the command list.

    NOTE(review): inputimeout() appears to be a blocking call, which would
    stall the event loop for up to def_cfg["input_interval"] seconds - confirm.
    NOTE(review): int(userinput) raises ValueError for mixed input such as
    "1a"; nothing here catches it.
    """
    global g_config
    # Input
    userinput = None
    if g_download_cfg["Number"] == 0: # only when not in a bulk configuration download
        try:
            userinput = inputimeout(prompt='请命令序号:', timeout=g_config["def_cfg"]["input_interval"])
        except TimeoutOccurred:
            userinput = None
    if userinput:
        # Determine the type: int, str, list, tuple, dict
        # isinstance(userinput, str)
        if not userinput.isalpha():
            await send_form_data_list(client, int(userinput))
        elif str(userinput)=='l':
            # Re-read the JSON file
            # NOTE(review): the loaded value is not stored into g_config here,
            # so the list printed below is the previously loaded one.
            with open(config_file, "r", encoding="utf-8") as f:
                send_list_new = json.load(f)
            logger.info("json内容如下:")
            print_data_list()

@calculate_time
async def auto_getcfg_call(client):
    """Send the next "GetConfigurationNumber" request of a bulk download.

    Does nothing unless g_download_cfg["Number"] > 0. Looks up the first
    cmd_list entry whose third element is "GetConfigurationNumber", deep-copies
    it, stores the current number in element 3 under "Number", sends it, and
    advances g_download_cfg["Number"] (reset to 0 once it reaches "Total").
    Any exception is logged and swallowed.

    NOTE(review): send_form_data_list() only reads elements 0 and 1 of the
    entry, so the "Number" written into element 3 does not reach the device
    through the visible code - confirm intent.
    """
    global g_config
    global g_download_cfg
    # Automatic send task
    try:
        if g_download_cfg["Number"]>0:
            new_one = []
            for i, v in enumerate(g_config["cmd_list"]):
                if v[2] == "GetConfigurationNumber":
                    new_one = copy.deepcopy(v) # deep copy so the config entry is not mutated
                    break
            if new_one:
                Number = g_download_cfg["Number"]
                Total = g_download_cfg["Total"]
                new_one[3]["Number"] = Number
                await send_form_data_list(client, 0, new_one)
                g_download_cfg["Number"] = Number+1 if Number < Total else 0
    except Exception as e:
        logger.error('Error:{}'.format(e))

async def main():
    """Main coroutine: load the config, then scan, connect and service the device forever.

    Per connection it subscribes to def_cfg["notif_char"] and loops while the
    client is connected, doing in each pass:
      * reload config.json every 3 s
      * exactly one of: bulk-download step, heartbeat, console prompt
        (checked in that order)
      * forward one pending "ws_recv" message from the queue to the device
    When the connection drops, the outer loop scans again.
    """
    global g_ble_client
    global g_config
    global g_ble_mtu
    global mq
    logger.info("读取配置...")
    with open(config_file, "r", encoding="utf-8") as f:
        g_config = json.load(f)
    print_data_list()
    logger.info("快速显示列表, 可输入l")
    while True:
        logger.info("开始扫描...")
        # Look the device up by its MAC address
        device = await BleakScanner.find_device_by_address(
            g_config["def_cfg"]["ble_mac"], cb=dict(use_bdaddr=False) # use_bdaddr: whether this is a "MOC" system (sic; presumably macOS)
        )
        if device is None:
            logger.error("无法找到设备({})".format(g_config["def_cfg"]["ble_mac"]))
            continue
        # Event definition
        # NOTE(review): this event is set in the callback but never awaited in
        # the visible code.
        disconnected_event = asyncio.Event()
        # Disconnect callback
        def disconnected_callback(client):
            global g_ble_client
            logger.info("断开回调!")
            disconnected_event.set()
            g_ble_client = None
        logger.info("尝试连接设备({})...".format(g_config["def_cfg"]["ble_mac"]))
        async with BleakClient(device,disconnected_callback=disconnected_callback) as client:
            g_ble_client = client
            logger.info("已连接(mtu=%d)", client.mtu_size)
            # Default chunk size for ble_send(): MTU minus 3 bytes.
            g_ble_mtu = client.mtu_size-3
            try:
                await client.start_notify(g_config["def_cfg"]["notif_char"], notification_handler)
            except Exception as e:
                logger.error('Error:{}'.format(e))
            g_download_cfg["Number"] = 0
            # Timestamps of the last config reload / heartbeat / prompt; 0 forces
            # each of them to be due on the first pass.
            read_config_time = 0
            heart_beart_time = 0
            input_wait_time = 0
            while client and client.is_connected:
                if time.time()-read_config_time >= 3:
                    read_config_time = time.time()
                    read_config_call()
                if g_config["def_cfg"]["auto_send_GetConfigurationNumber"] and g_download_cfg["Number"]>0:
                    await auto_getcfg_call(client)
                elif g_config["def_cfg"]["heart_beat_interval"] >0 and time.time()-heart_beart_time>=g_config["def_cfg"]["heart_beat_interval"]:
                    heart_beart_time = time.time()
                    await heart_beat_call(client)
                elif g_config["def_cfg"]["input_interval"]>0 and time.time()-input_wait_time>=g_config["def_cfg"]["input_interval"]:
                    input_wait_time = time.time()
                    await input_call(client)
                try:
                    # Start handling messages according to device/function
                    ws_data = mq.get("ws_recv")
                    if ws_data:
                        await ble_send(client, ws_data)
                except Exception as err:
                    # NOTE(review): every error from the queue or the send is
                    # silently discarded here.
                    pass
                # NOTE(review): time.sleep() blocks the event loop for 0.1 s per
                # pass; `await asyncio.sleep(0.1)` would yield instead.
                time.sleep(0.1)

def test_aes():
    """Manual check: encrypt and decrypt a sample payload and print both results.

    NOTE(review): ``key`` is the str constant, while AES_Encrypt/AES_Decrypt
    document bytes for key and IV - presumably this only logs an error and
    prints empty bytes; confirm.
    """
    key = EV_SOFT_BLE_SEND_CODE
    data = b'[2, "123456789012", "KeepAlive", {}]' # content to encrypt
    AES_Encrypt(key, key, data)
    enctext = AES_Encrypt(key, key, data)
    print(enctext)
    text_decrypted = AES_Decrypt(key, key, enctext)
    print(text_decrypted)

import array,struct

def test_base64():
    """Manual check: Base64 round-trip of a sample byte string, printing each stage."""
    in_data = b'1234567890\0_+,.//abcdefg'
    print(len(in_data), in_data)
    base64_bytes = base64.b64encode(in_data)
    print(len(base64_bytes),base64_bytes)
    out = base64.b64decode(base64_bytes)
    print(len(out),out)

def test_recv():
    """Manual check: feed two captured frames through recv_handler() with reve=True.

    NOTE(review): there is no ``global g_config`` here, so the loaded config is
    bound to a local name and the module-level g_config read by
    print_data_list() and recv_handler() is left unchanged.
    """
    with open(config_file, "r", encoding="utf-8") as f:
        g_config = json.load(f)
    print_data_list()
    str_a = '45 56 3C 00 40 EB AE E3 1F 7B FA 9A 61 86 22 BE 36 0D 0D 09 FE 12 C6 60 27 A8 74 2A 32 08 0D D1 54 B4 4C E1 69 10 4B FC DD 00 23 58 14 74 C1 3F 64 48 9C 88 12 41 17 59 6B 82 F7 90 E7 89 A1 71 80 5A 07 FF 4D 4D 99'
    recv_handler(bytes.fromhex(str_a), True)
    str_b = '45 56 3C 00 40 B2 E6 73 BD E7 EE B8 1E D4 DE B7 BF 64 19 80 95 24 94 D2 F3 4E 45 97 99 A3 B5 F7 29 E0 86 1A 60 07 96 77 C4 73 DC C0 4B 2B FB 7E 73 D2 A1 CE 17 26 CF 12 FF 06 DC 83 B7 5E 65 C9 18 0B 87 16 EB 75 0F'
    recv_handler(bytes.fromhex(str_b), True)

def run(tasks = []):
    """Run the given awaitables on the event loop until all complete, then close the loop.

    NOTE(review): ``tasks`` has a mutable default argument.
    NOTE(review): asyncio.wait() no longer accepts bare coroutine objects on
    Python 3.11+; the caller passes ``main()`` directly - confirm the target
    Python version or wrap the coroutines in tasks.
    """
    # tasks.append(task())
    loop = asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

#---------------------------------------------------------
if __name__ == "__main__":
    # Script entry point: install the exit-on-signal handler, start the
    # WebSocket server on a shared message queue, then run the BLE main loop.
    # SIGTERM is delivered by `kill <pid>`, SIGINT by Ctrl-C.
    for _signum in (signal.SIGTERM, signal.SIGINT):
        signal.signal(_signum, sig_handler)

    # test_aes()
    # test_base64()
    # test_recv()

    # The queue is shared between the WebSocket server and the BLE side
    # (main() and notification_handler() reach it through the global `mq`).
    mq = MessageBase()
    ws = WebServer("0.0.0.0", 11100, mq)
    ws.run()

    run([main()])
    sys.exit(0)