| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- import asyncio
- from bleak import BleakClient, BleakScanner
- from bleak.backends.characteristic import BleakGATTCharacteristic
- import chardet
- import time
- import json
- from inputimeout import inputimeout, TimeoutOccurred
- import base64
- from Crypto.Cipher import AES
- from Crypto.Util.Padding import pad, unpad
- import copy
- from message_base import MessageBase
- from websocket_server import WebServer
- import sys
- import atexit
- import signal
- import traceback
- import logging
- logger = logging.getLogger(__name__)
- logger.setLevel(logging.DEBUG)
- s_h = logging.StreamHandler(sys.stderr)
- # formatter = logging.Formatter('%(asctime)s.%(msecs)03d-%(name)s-%(filename)s-[line:%(lineno)d]'
- # '-%(levelname)s-[日志信息]: %(message)s',
- # datefmt='%Y-%m-%d,%H:%M:%S')
- formatter = logging.Formatter('%(asctime)s-[%(lineno)d]'
- '-%(levelname)s: %(message)s',
- datefmt='%d %H:%M:%S')
- s_h.setFormatter(formatter)
- logger.addHandler(s_h)
-
- config_file="config.json"
- #设备的Characteristic UUID
- par_notification_characteristic="0000ffe2-0000-1000-8000-00805f9b34fb"
- #设备的Characteristic UUID(具备写属性Write)
- par_write_characteristic="0000ffe1-0000-1000-8000-00805f9b34fb"
- #设备的MAC地址
- # par_device_addr="D1:1D:6A:52:CD:F8"
- par_device_addr="D5:2D:D4:9E:5C:3C"
- # 密钥(key), 密斯偏移量(iv) CBC模式加密
- EV_SOFT_BLE_SEND_CODE = '#MOIF@KHab%DECR$' #//蓝牙发送密钥
- EV_SOFT_BLE_RECV_CODE = '$*@AB%DHJqopENC#' #//蓝牙接收密钥
- # EV_SOFT_BLE_SEND_CODE = 'XH23456789ABCDEF' #//蓝牙发送密钥
- # EV_SOFT_BLE_RECV_CODE = 'XH23456789ABCDEF' #//蓝牙接收密钥
- g_config = {}
- g_ble_mtu = 20
- g_ble_client = None
- g_download_cfg = {"Key":"", "Total":0, "Number":0, "DataCrc16":0}
- mb = None
- ws = None
- def calculate_time(func):
- def wrapper(*args, **kwargs):
- start_time = time.time()
- result = func(*args, **kwargs)
- end_time = time.time()
- if end_time - start_time >= 0.5 :
- print("函数 %s 运行时间为 %.3f 秒" % (func.__name__, end_time - start_time))
- return result
- return wrapper
- @atexit.register
- def exit_handler():
- logger.info("异常退出")
- # 采用traceback模块查看异常,这个方法会打印出异常代码的行号
- exc_type, exc_value, exc_tb = sys.exc_info()
- logger.info(str(traceback.format_exception(exc_type, exc_value, exc_tb)))
-
-
- def sig_handler(signum, frame):
- logger.info('catched singal: %d' % signum)
- sys.exit(0)
- def shuncomdacode(data):
- """ 识别data的编码格式 """
- result = chardet.detect(data)
- print(result['encoding'])
- return (result['encoding'])
- def AES_Encrypt(aes_iv, aes_key, plain_text):
- """
- AES encrypt
- :param plain_text: bytes
- :param aes_key: bytes
- :param aes_iv: bytes
- :return: bytes
- """
- try:
- pad_data = pad(plain_text, AES.block_size)
- return AES.new(aes_key, AES.MODE_CBC, aes_iv).encrypt(pad_data)
- except Exception as e:
- logger.error('Error:{}'.format(e))
- return bytes()
- def AES_Decrypt(aes_iv, aes_key, plain_text):
- """
- AES decrypt
- :param plain_text: bytes
- :param aes_key: bytes, aes_key
- :param aes_iv: bytes, aes_iv
- :return: bytes
- """
- try:
- dec_data = AES.new(aes_key, AES.MODE_CBC, aes_iv).decrypt(plain_text)
- return unpad(dec_data, AES.block_size)
- except Exception as e:
- logger.error('Error:{}'.format(e))
- return bytes()
- def AES_Encrypt2(vi, key, data):
- enctext = bytes()
- try:
- # vi = '0102030405060708'
- pad = lambda s: s + (16 - len(s) % 16) * chr(16 - len(s) % 16)
- data = pad(data)
- # 字符串补位
- cipher = AES.new(key.encode('utf8'), AES.MODE_CBC, vi.encode('utf8'))
- encryptedbytes = cipher.encrypt(data.encode('utf8'))
- # 加密后得到的是bytes类型的数据
- encodestrs = base64.b64encode(encryptedbytes)
- # 使用Base64进行编码,返回byte字符串
- enctext = encodestrs.decode('utf8')
- # 对byte字符串按utf-8进行解码
- except Exception as e:
- logger.error('Error:{}'.format(e))
- return enctext
-
- def AES_Decrypt2(vi, key, data):
- text_decrypted = bytes()
- try:
- # vi = '0102030405060708'
- data = data.encode('utf8')
- encodebytes = base64.decodebytes(data)
- # 将加密数据转换位bytes类型数据
- cipher = AES.new(key.encode('utf8'), AES.MODE_CBC, vi.encode('utf8'))
- text_decrypted = cipher.decrypt(encodebytes)
- unpad = lambda s: s[0:-s[-1]]
- text_decrypted = unpad(text_decrypted)
- # 去补位
- text_decrypted = text_decrypted.decode('utf8')
- except Exception as e:
- logger.error('Error:{}'.format(e))
- return text_decrypted
- def sum_ccitt_16(data):
- total = sum(data)
- total &= 0xFFFF
- return total
- def crc_ccitt_16(data):
- crc = 0
- for byte in data:
- crc ^= (byte << 8)
- for _ in range(8):
- if crc & 0x8000:
- crc = (crc << 1) ^ 0x1021
- else:
- crc <<= 1
- crc &= 0xFFFF
- return crc
- def checknum_16(data):
- type = g_config["def_cfg"]["checknum_type"]
- if type == "sum16":
- return sum_ccitt_16(data)
- elif type == "crc16":
- return crc_ccitt_16(data)
- return 0
- def get_aes_key(type):
- if type == "send":
- if g_config["def_cfg"]["aes_cbc_key_send"]:
- return bytes(g_config["def_cfg"]["aes_cbc_key_send"], encoding='utf-8')
- else:
- return bytes(EV_SOFT_BLE_SEND_CODE, encoding='utf-8')
- else:
- if g_config["def_cfg"]["aes_cbc_key_recv"]:
- return bytes(g_config["def_cfg"]["aes_cbc_key_recv"], encoding='utf-8')
- else:
- return bytes(EV_SOFT_BLE_RECV_CODE, encoding='utf-8')
- # 初始化一个CRC16校验码计算函数,多项式为0x8005
- def ev_packing(data):
- data = bytes(data)
- # 加密
- if g_config["def_cfg"]["aes_cbc_enbable"]:
- logger.info("加密前:{}".format(bytes(data)))
- key = get_aes_key("send")
- data = AES_Encrypt(bytes(key), bytes(key), bytes(data))
- lenght = len(data)
- # crc16 = crc16_func(data) ## 计算数据的CRC-16校验码
- # checknum = crc16(data, 0, len(data))
- out = []
- # 头
- if g_config["def_cfg"]["head_send"] != "":
- HEAD_SEND = str(g_config["def_cfg"]["head_send"]).encode()
- out = list(HEAD_SEND)
- # 长度
- out = out + [ (lenght>>8)&0xFF, (lenght>>0)&0xFF]
- # 校验码
- if g_config["def_cfg"]["checknum_type"] != "":
- checknum = checknum_16(data)
- out = out + [ (checknum>>8)&0xFF, (checknum>>0)&0xFF]
- for d in bytes(data):
- out.append(d)
- return bytes(out)
- #监听回调函数,此处为打印消息
- # 记录数据
- recv_data = []
- recv_start_time = 0
- GetConfiguration = []
- def PACK_LEN():
- H = len(g_config["def_cfg"]["head_send"])
- L = 2
- C = 2 if g_config["def_cfg"]["checknum_type"]!="" else 0
- return H+L+C
- @calculate_time
- def recv_handler(data: bytearray, reve=False):
- global recv_start_time
- global g_download_cfg
- if g_config["def_cfg"]["recv_detail_print"]:
- logger.info("包接收:{}".format(bytes(data)))
- for d in data:
- recv_data.append(d)
- if len(recv_data)>1500:
- recv_data.clear()
- return
- if len(recv_data)<PACK_LEN():
- return
- recv_data_str = str(bytes(recv_data))
- HEAD_SEND = str(g_config["def_cfg"]["head_send"])
- HEAD_RECV = str(g_config["def_cfg"]["head_recv"])
- find_head = HEAD_SEND if not reve else HEAD_RECV
- index = -1
- if find_head != "":
- index = recv_data_str.find(find_head)#'EV>'
- if index < 2:
- return
- index -= 2
- data_len = (recv_data[index+4]&0xff) | (recv_data[index+3]&0xff)<<8
- if index+PACK_LEN()+data_len > len(recv_data):
- return
-
- if g_config["def_cfg"]["recv_soc_data_print"]:
- logger.info("接收:{}".format(bytes(recv_data).hex()))
- if recv_start_time:
- recv_end_time = time.time()
- logger.info('接收耗时:%.3fs', recv_end_time - recv_start_time)
- recv_start_time = 0
- soc_data = recv_data[index+PACK_LEN():index+PACK_LEN()+data_len]
- if g_config["def_cfg"]["checknum_type"]!="":
- H = len(g_config["def_cfg"]["head_send"])
- L = 2
- get_crc = (recv_data[index+(H+L+1)]&0xff)| (recv_data[index+(H+L)]&0xff)<<8
- check_num = checknum_16(soc_data)
- if get_crc != check_num:
- logger.info("校验失败(%d):0x%04X 0x%04X", len(recv_data), get_crc, check_num)
- return
- logger.info("校验成功")
- if g_config["def_cfg"]["aes_cbc_enbable"]:
- find_type= "recv" if not reve else 'send'
- key = get_aes_key(find_type) #"recv"
- dec_data = AES_Decrypt(bytes(key), bytes(key), bytes(soc_data))
- logger.info("解密后:{}".format(bytes(dec_data)))
- else:
- dec_data = soc_data
- # 清理
- recv_data.clear()
- def notification_handler(characteristic: BleakGATTCharacteristic, data: bytearray):
- global mq
- recv_handler(data)
- mq.add("ws_send", data)
- async def ble_send(client, data):
- global recv_start_time
- global g_ble_mtu
- frame_len = g_ble_mtu #244 #20 #244
- if g_config["def_cfg"]["ble_mtu"] > 0:
- frame_len = g_config["def_cfg"]["ble_mtu"]
- all_count = len(data)
- send_count = 0
- send_start_time = time.time()
- try:
- while send_count<all_count:
- cur_len = frame_len
- if all_count-send_count<frame_len:
- cur_len = all_count-send_count
- s = data[send_count:send_count+cur_len]
- await client.write_gatt_char(g_config["def_cfg"]["write_char"], s)
- send_count += cur_len
- except Exception as e:
- logger.error('Error:{}'.format(e))
-
- send_end_time = time.time()
- recv_start_time = time.time()
- logger.info('发送耗时:%.3fs', send_end_time - send_start_time)
- await asyncio.sleep(g_config["def_cfg"]["ble_send_wait"]) #每休眠1秒发送一次
-
- import sys, select
- def timeoutable_input(clue="",timeout=None):
- print(clue,end="")
- i, o, e = select.select([sys.stdin], [], [], timeout)
- return sys.stdin.readline() if len(i)>0 else None
- def print_data_list():
- global g_config
- try:
- for i in range(len(g_config["cmd_list"])):
- print(i, g_config["cmd_list"][i][2])
- except Exception as e:
- logger.error('Error:{}'.format(e))
- @calculate_time
- async def send_form_data_list(client, i, data=None):
- global g_config
- if data:
- json_data = data
- else:
- if i >= len(g_config["cmd_list"]):
- return None
- json_data = g_config["cmd_list"][i]
- if len(json_data) < 3:
- return
-
- byte_sequence = b''
- if len(json_data[0]) > 4:
- if json_data[0][:4] == "hex:":
- byte_sequence += bytes.fromhex(json_data[0][4:])
- elif json_data[0][:4] == "str:":
- byte_sequence += bytes(json_data[0][4:], 'utf-8')
- elif json_data[0][:4] == "bin:":
- byte_value = int(json_data[0][4:], 2) # 将二进制字符串转换为整数
- byte_array = bytes([byte_value]) # 将整数转换为单字节的字节串
- byte_sequence += byte_array
- if len(json_data[1]) > 4:
- if json_data[1][:4] == "hex:":
- byte_sequence += bytes.fromhex(json_data[1][4:])
- elif json_data[1][:4] == "str:":
- byte_sequence += bytes(json_data[1][4:], 'utf-8')
- elif json_data[1][:4] == "bin:":
- byte_value = int(json_data[1][4:], 2) # 将二进制字符串转换为整数
- byte_array = bytes([byte_value]) # 将整数转换为单字节的字节串
- byte_sequence += byte_array
- # data_str = json.dumps(json_data)
- # 字符串编码为字节序列
- # byte_sequence = str.encode(data_str)
- # 字节序列转换为bytearray类型
- byte_array = bytearray(byte_sequence)
- byte_array = ev_packing(byte_array)
- send_data = bytes(byte_array)
- logger.info('发送{}:{}'.format(json_data[2], send_data.hex()))
- await ble_send(client, send_data)
- async def task():
- global g_config
- await asyncio.sleep(3)
- while True:
- await asyncio.sleep(1)
- @calculate_time
- def read_config_call():
- global g_config
- # 读取更新json
- with open(config_file, "r", encoding="utf-8") as f:
- send_list_new = json.load(f)
- if g_config != send_list_new:
- g_config = send_list_new
- logger.info("json内容改变,内容如下:")
- print_data_list()
- @calculate_time
- async def heart_beat_call(client):
- global g_config
- # 心跳发送:维持蓝牙通讯
- logger.info("执行心跳发送任务..")
- await send_form_data_list(client, g_config["def_cfg"]["heart_beat_sel"])
- @calculate_time
- async def input_call(client):
- global g_config
- # 输入
- userinput = None
- if g_download_cfg["Number"] == 0: #非批量获取配置的状态下
- try:
- userinput = inputimeout(prompt='请命令序号:', timeout=g_config["def_cfg"]["input_interval"])
- except TimeoutOccurred:
- userinput = None
- if userinput:
- # 判断类型: 数字int 字符串str 列表list 元组tuple 字典dict
- # isinstance(userinput, str)
- if not userinput.isalpha():
- await send_form_data_list(client, int(userinput))
- elif str(userinput)=='l':
- # 读取更新json
- with open(config_file, "r", encoding="utf-8") as f:
- send_list_new = json.load(f)
- logger.info("json内容如下:")
- print_data_list()
- @calculate_time
- async def auto_getcfg_call(client):
- global g_config
- global g_download_cfg
- # 自动发送任务
- try:
- if g_download_cfg["Number"]>0:
- new_one = []
- for i, v in enumerate(g_config["cmd_list"]):
- if v[2] == "GetConfigurationNumber":
- new_one = copy.deepcopy(v) #深拷贝
- break
- if new_one:
- Number = g_download_cfg["Number"]
- Total = g_download_cfg["Total"]
- new_one[3]["Number"] = Number
- await send_form_data_list(client, 0, new_one)
- g_download_cfg["Number"] = Number+1 if Number < Total else 0
- except Exception as e:
- logger.error('Error:{}'.format(e))
- async def main():
- global g_ble_client
- global g_config
- global g_ble_mtu
- global mq
- logger.info("读取配置...")
- with open(config_file, "r", encoding="utf-8") as f:
- g_config = json.load(f)
- print_data_list()
- logger.info("快速显示列表, 可输入l")
- while True:
- logger.info("开始扫描...")
- #基于MAC地址查找设备
- device = await BleakScanner.find_device_by_address(
- g_config["def_cfg"]["ble_mac"], cb=dict(use_bdaddr=False) #use_bdaddr判断是否是MOC系统
- )
- if device is None:
- logger.error("无法找到设备({})".format(g_config["def_cfg"]["ble_mac"]))
- continue
- #事件定义
- disconnected_event = asyncio.Event()
- #断开连接事件回调
- def disconnected_callback(client):
- global g_ble_client
- logger.info("断开回调!")
- disconnected_event.set()
- g_ble_client = None
- logger.info("尝试连接设备({})...".format(g_config["def_cfg"]["ble_mac"]))
- async with BleakClient(device,disconnected_callback=disconnected_callback) as client:
- g_ble_client = client
- logger.info("已连接(mtu=%d)", client.mtu_size)
- g_ble_mtu = client.mtu_size-3
- try:
- await client.start_notify(g_config["def_cfg"]["notif_char"], notification_handler)
- except Exception as e:
- logger.error('Error:{}'.format(e))
- g_download_cfg["Number"] = 0
- read_config_time = 0
- heart_beart_time = 0
- input_wait_time = 0
- while client and client.is_connected:
- if time.time()-read_config_time >= 3:
- read_config_time = time.time()
- read_config_call()
- if g_config["def_cfg"]["auto_send_GetConfigurationNumber"] and g_download_cfg["Number"]>0:
- await auto_getcfg_call(client)
- elif g_config["def_cfg"]["heart_beat_interval"] >0 and time.time()-heart_beart_time>=g_config["def_cfg"]["heart_beat_interval"]:
- heart_beart_time = time.time()
- await heart_beat_call(client)
- elif g_config["def_cfg"]["input_interval"]>0 and time.time()-input_wait_time>=g_config["def_cfg"]["input_interval"]:
- input_wait_time = time.time()
- await input_call(client)
- try:
- # 开始根据设备即功能处理消息
- ws_data = mq.get(device)
- if ws_data:
- ble_send(client, ws_data)
- except Exception as err:
- pass
- time.sleep(0.1)
- def test_aes():
- key = EV_SOFT_BLE_SEND_CODE
- data = b'[2, "123456789012", "KeepAlive", {}]' #需要加密的内容
- AES_Encrypt(key, key, data)
- enctext = AES_Encrypt(key, key, data)
- print(enctext)
- text_decrypted = AES_Decrypt(key, key, enctext)
- print(text_decrypted)
- import array,struct
- def test_base64():
- in_data = b'1234567890\0_+,.//abcdefg'
- print(len(in_data), in_data)
- base64_bytes = base64.b64encode(in_data)
- print(len(base64_bytes),base64_bytes)
- out = base64.b64decode(base64_bytes)
- print(len(out),out)
- def test_recv():
- with open(config_file, "r", encoding="utf-8") as f:
- g_config = json.load(f)
- print_data_list()
- str_a = '45 56 3C 00 40 EB AE E3 1F 7B FA 9A 61 86 22 BE 36 0D 0D 09 FE 12 C6 60 27 A8 74 2A 32 08 0D D1 54 B4 4C E1 69 10 4B FC DD 00 23 58 14 74 C1 3F 64 48 9C 88 12 41 17 59 6B 82 F7 90 E7 89 A1 71 80 5A 07 FF 4D 4D 99'
- recv_handler(bytes.fromhex(str_a), True)
- str_b = '45 56 3C 00 40 B2 E6 73 BD E7 EE B8 1E D4 DE B7 BF 64 19 80 95 24 94 D2 F3 4E 45 97 99 A3 B5 F7 29 E0 86 1A 60 07 96 77 C4 73 DC C0 4B 2B FB 7E 73 D2 A1 CE 17 26 CF 12 FF 06 DC 83 B7 5E 65 C9 18 0B 87 16 EB 75 0F'
- recv_handler(bytes.fromhex(str_b), True)
- def run(tasks = []):
- # tasks.append(task())
- loop = asyncio.get_event_loop()
- loop.run_until_complete(asyncio.wait(tasks))
- loop.close()
- #---------------------------------------------------------
- if __name__ == "__main__":
- signal.signal(signal.SIGTERM, sig_handler) # kill pid
- signal.signal(signal.SIGINT, sig_handler) # ctrl -c
- # test_aes()
- # test_base64()
- # test_recv()
- mq = MessageBase()
- ws = WebServer("0.0.0.0", 11100, mq)
- ws.run()
- run([main(),])
- sys.exit(0)
|