ble.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import asyncio
  4. from bleak import BleakClient, BleakScanner
  5. from bleak.backends.characteristic import BleakGATTCharacteristic
  6. import chardet
  7. import time
  8. import json
  9. from inputimeout import inputimeout, TimeoutOccurred
  10. import base64
  11. from Crypto.Cipher import AES
  12. from Crypto.Util.Padding import pad, unpad
  13. import copy
  14. import os
  15. import threading
  16. import ctypes
  17. import inspect
  18. import re
  19. import base64
  20. from message_base import MessageBase
  21. from websocket_server import WebServer
  22. import sys
  23. import atexit
  24. import signal
  25. import traceback
  26. import logging
  27. logger = logging.getLogger(__name__)
  28. logger.setLevel(logging.DEBUG)
  29. s_h = logging.StreamHandler(sys.stderr)
  30. # formatter = logging.Formatter('%(asctime)s.%(msecs)03d-%(name)s-%(filename)s-[line:%(lineno)d]'
  31. # '-%(levelname)s-[日志信息]: %(message)s',
  32. # datefmt='%Y-%m-%d,%H:%M:%S')
  33. formatter = logging.Formatter('%(asctime)s-[%(lineno)d]'
  34. '-%(levelname)s: %(message)s',
  35. datefmt='%d %H:%M:%S')
  36. s_h.setFormatter(formatter)
  37. logger.addHandler(s_h)
  38. config_file="config.json"
  39. #设备的Characteristic UUID
  40. par_notification_characteristic="0000ffe2-0000-1000-8000-00805f9b34fb"
  41. #设备的Characteristic UUID(具备写属性Write)
  42. par_write_characteristic="0000ffe1-0000-1000-8000-00805f9b34fb"
  43. #设备的MAC地址
  44. # par_device_addr="D1:1D:6A:52:CD:F8"
  45. par_device_addr="D5:2D:D4:9E:5C:3C"
  46. # 密钥(key), 密斯偏移量(iv) CBC模式加密
  47. EV_SOFT_BLE_SEND_CODE = '#MOIF@KHab%DECR$' #//蓝牙发送密钥
  48. EV_SOFT_BLE_RECV_CODE = '$*@AB%DHJqopENC#' #//蓝牙接收密钥
  49. # EV_SOFT_BLE_SEND_CODE = 'XH23456789ABCDEF' #//蓝牙发送密钥
  50. # EV_SOFT_BLE_RECV_CODE = 'XH23456789ABCDEF' #//蓝牙接收密钥
  51. g_run = True
  52. asyncio_list = []
  53. thread_list = []
  54. g_config = {}
  55. g_ble_mtu = 20
  56. g_ble_client = None
  57. g_download_cfg = {"Key":"", "Total":0, "Number":0, "DataCrc16":0}
  58. g_recv_count = 0
  59. mb = None
  60. ws = None
  61. def calculate_time(func):
  62. def wrapper(*args, **kwargs):
  63. start_time = time.time()
  64. result = func(*args, **kwargs)
  65. end_time = time.time()
  66. if end_time - start_time >= 0.5 :
  67. print("函数 %s 运行时间为 %.3f 秒" % (func.__name__, end_time - start_time))
  68. return result
  69. return wrapper
  70. @atexit.register
  71. def exit_handler():
  72. logger.info("异常退出")
  73. # 采用traceback模块查看异常,这个方法会打印出异常代码的行号
  74. exc_type, exc_value, exc_tb = sys.exc_info()
  75. logger.info(str(traceback.format_exception(exc_type, exc_value, exc_tb)))
  76. def sig_handler(signum, frame):
  77. global g_run
  78. logger.info('singal: %d' % signum)
  79. logger.info("sig退出")
  80. g_run = False
  81. # 设置事件,通知所有线程退出
  82. exit_event.set()
  83. # for asy in asyncio_list:
  84. # asy.stop()
  85. # for thread in thread_list:
  86. # thread.stop()
  87. sys.exit(0)
  88. def shuncomdacode(data):
  89. """ 识别data的编码格式 """
  90. result = chardet.detect(data)
  91. print(result['encoding'])
  92. return (result['encoding'])
  93. def AES_Encrypt(aes_iv, aes_key, plain_text):
  94. """
  95. AES encrypt
  96. :param plain_text: bytes
  97. :param aes_key: bytes
  98. :param aes_iv: bytes
  99. :return: bytes
  100. """
  101. try:
  102. pad_data = pad(plain_text, AES.block_size)
  103. return AES.new(aes_key, AES.MODE_CBC, aes_iv).encrypt(pad_data)
  104. except Exception as e:
  105. logger.error("Err in {}[{}]:\n {}".format(os.path.basename(e.__traceback__.tb_frame.f_globals["__file__"]), e.__traceback__.tb_lineno, e))
  106. return bytes()
  107. def AES_Decrypt(aes_iv, aes_key, plain_text):
  108. """
  109. AES decrypt
  110. :param plain_text: bytes
  111. :param aes_key: bytes, aes_key
  112. :param aes_iv: bytes, aes_iv
  113. :return: bytes
  114. """
  115. try:
  116. dec_data = AES.new(aes_key, AES.MODE_CBC, aes_iv).decrypt(plain_text)
  117. return unpad(dec_data, AES.block_size)
  118. except Exception as e:
  119. logger.error("Err in {}[{}]:\n {}".format(os.path.basename(e.__traceback__.tb_frame.f_globals["__file__"]), e.__traceback__.tb_lineno, e))
  120. return bytes()
  121. def AES_Encrypt2(vi, key, data):
  122. enctext = bytes()
  123. try:
  124. # vi = '0102030405060708'
  125. pad = lambda s: s + (16 - len(s) % 16) * chr(16 - len(s) % 16)
  126. data = pad(data)
  127. # 字符串补位
  128. cipher = AES.new(key.encode('utf8'), AES.MODE_CBC, vi.encode('utf8'))
  129. encryptedbytes = cipher.encrypt(data.encode('utf8'))
  130. # 加密后得到的是bytes类型的数据
  131. encodestrs = base64.b64encode(encryptedbytes)
  132. # 使用Base64进行编码,返回byte字符串
  133. enctext = encodestrs.decode('utf8')
  134. # 对byte字符串按utf-8进行解码
  135. except Exception as e:
  136. logger.error("Err in {}[{}]:\n {}".format(os.path.basename(e.__traceback__.tb_frame.f_globals["__file__"]), e.__traceback__.tb_lineno, e))
  137. return enctext
  138. def AES_Decrypt2(vi, key, data):
  139. text_decrypted = bytes()
  140. try:
  141. # vi = '0102030405060708'
  142. data = data.encode('utf8')
  143. encodebytes = base64.decodebytes(data)
  144. # 将加密数据转换位bytes类型数据
  145. cipher = AES.new(key.encode('utf8'), AES.MODE_CBC, vi.encode('utf8'))
  146. text_decrypted = cipher.decrypt(encodebytes)
  147. unpad = lambda s: s[0:-s[-1]]
  148. text_decrypted = unpad(text_decrypted)
  149. # 去补位
  150. text_decrypted = text_decrypted.decode('utf8')
  151. except Exception as e:
  152. logger.error("Err in {}[{}]:\n {}".format(os.path.basename(e.__traceback__.tb_frame.f_globals["__file__"]), e.__traceback__.tb_lineno, e))
  153. return text_decrypted
  154. def sum_ccitt_16(data):
  155. total = sum(data)
  156. total &= 0xFFFF
  157. return total
  158. def crc_ccitt_16(data):
  159. crc = 0
  160. for byte in data:
  161. crc ^= (byte << 8)
  162. for _ in range(8):
  163. if crc & 0x8000:
  164. crc = (crc << 1) ^ 0x1021
  165. else:
  166. crc <<= 1
  167. crc &= 0xFFFF
  168. return crc
  169. def checknum_16(data):
  170. type = g_config["def_cfg"]["checknum_type"]
  171. if type == "sum16":
  172. return sum_ccitt_16(data)
  173. elif type == "crc16":
  174. return crc_ccitt_16(data)
  175. return 0
  176. def get_aes_key(type):
  177. if type == "send":
  178. if g_config["def_cfg"]["aes_cbc_key_send"]:
  179. return bytes(g_config["def_cfg"]["aes_cbc_key_send"], encoding='utf-8')
  180. else:
  181. return bytes(EV_SOFT_BLE_SEND_CODE, encoding='utf-8')
  182. else:
  183. if g_config["def_cfg"]["aes_cbc_key_recv"]:
  184. return bytes(g_config["def_cfg"]["aes_cbc_key_recv"], encoding='utf-8')
  185. else:
  186. return bytes(EV_SOFT_BLE_RECV_CODE, encoding='utf-8')
  187. # 初始化一个CRC16校验码计算函数,多项式为0x8005
  188. def ev_packing(data):
  189. data = bytes(data)
  190. # 加密
  191. if g_config["def_cfg"]["aes_cbc_enbable"]:
  192. logger.info("加密前:{}".format(bytes(data)))
  193. key = get_aes_key("send")
  194. data = AES_Encrypt(bytes(key), bytes(key), bytes(data))
  195. lenght = len(data)
  196. # crc16 = crc16_func(data) ## 计算数据的CRC-16校验码
  197. # checknum = crc16(data, 0, len(data))
  198. out = []
  199. # 头
  200. if g_config["def_cfg"]["head_send"] != "":
  201. HEAD_SEND = str(g_config["def_cfg"]["head_send"]).encode()
  202. out = list(HEAD_SEND)
  203. # 长度
  204. out = out + [ (lenght>>8)&0xFF, (lenght>>0)&0xFF]
  205. # 校验码
  206. if g_config["def_cfg"]["checknum_type"] != "":
  207. checknum = checknum_16(data)
  208. out = out + [ (checknum>>8)&0xFF, (checknum>>0)&0xFF]
  209. for d in bytes(data):
  210. out.append(d)
  211. return bytes(out)
  212. #监听回调函数,此处为打印消息
  213. # 记录数据
  214. recv_data = []
  215. recv_start_time = 0
  216. GetConfiguration = []
  217. def PACK_LEN():
  218. H = len(g_config["def_cfg"]["head_send"])
  219. L = 2
  220. C = 2 if g_config["def_cfg"]["checknum_type"]!="" else 0
  221. return H+L+C
  222. def batch_split_key_value(input_str):
  223. # 使用split()方法按';'切割字符串,获取键值对列表
  224. pairs = input_str.split(';')
  225. # 初始化一个字典来存储结果
  226. result = {}
  227. # 遍历键值对列表
  228. for pair in pairs:
  229. # 去除空白字符
  230. pair = pair.strip()
  231. # 检查是否为空字符串
  232. if pair:
  233. # 使用split()方法按'='切割键值对,获取键和值字符串
  234. key, values_str = pair.split('=', 1) # 限制分割次数为1
  235. # 使用split()方法按','切割值字符串,获取值列表
  236. values = values_str.split(',')
  237. # 将键和值列表存储到字典中
  238. result[key] = values
  239. return result
  240. @calculate_time
  241. def recv_handler(data: bytearray, reve=False):
  242. mq.add("ble_recv", data)
  243. # 记录数据
  244. def notification_handler(characteristic: BleakGATTCharacteristic, data: bytearray):
  245. global mq
  246. mq.add("ble_recv", data)
  247. if ws:
  248. mq.add("ws_send", data)
  249. async def ble_send(client, data):
  250. global recv_start_time
  251. global g_ble_mtu
  252. logger.info('发送:{}'.format(bytes(data).hex()))
  253. frame_len = g_ble_mtu #244 #20 #244
  254. if g_config["def_cfg"]["ble_mtu"] > 0:
  255. frame_len = g_config["def_cfg"]["ble_mtu"]
  256. all_count = len(data)
  257. send_count = 0
  258. send_start_time = time.time()
  259. try:
  260. while send_count<all_count:
  261. cur_len = frame_len if all_count-send_count>=frame_len else all_count-send_count
  262. s = data[send_count:send_count+cur_len]
  263. await client.write_gatt_char(g_config["def_cfg"]["write_char"], s)
  264. await asyncio.sleep(0.050)
  265. send_count += cur_len
  266. logger.info('发送包:{}/{}'.format(send_count, all_count))
  267. except Exception as e:
  268. logger.error("Err in {}[{}]:\n {}".format(os.path.basename(e.__traceback__.tb_frame.f_globals["__file__"]), e.__traceback__.tb_lineno, e))
  269. send_end_time = time.time()
  270. recv_start_time = time.time()
  271. logger.info('发送耗时:%.3fs', send_end_time - send_start_time)
  272. # await asyncio.sleep(g_config["def_cfg"]["ble_send_wait"]) #每休眠1秒发送一次
  273. import sys, select
  274. def timeoutable_input(clue="",timeout=None):
  275. print(clue,end="")
  276. i, o, e = select.select([sys.stdin], [], [], timeout)
  277. return sys.stdin.readline() if len(i)>0 else None
  278. def print_data_list():
  279. global g_config
  280. try:
  281. for i in range(len(g_config["cmd_list"])):
  282. # print(i, g_config["cmd_list"][i][2])
  283. print(i, g_config["cmd_list"][i]["note"])
  284. except Exception as e:
  285. logger.error("Err in {}[{}]:\n {}".format(os.path.basename(e.__traceback__.tb_frame.f_globals["__file__"]), e.__traceback__.tb_lineno, e))
  286. @calculate_time
  287. def send_form_data_list(client, i, data=None):
  288. global g_config
  289. if data:
  290. json_data = data
  291. else:
  292. if i >= len(g_config["cmd_list"]):
  293. return None
  294. json_data = g_config["cmd_list"][i]
  295. # if len(json_data) < 3:
  296. # return
  297. # byte_sequence = b''
  298. # if len(json_data[0]) > 4:
  299. # if json_data[0][:4] == "hex:":
  300. # byte_sequence += bytes.fromhex(json_data[0][4:])
  301. # elif json_data[0][:4] == "str:":
  302. # byte_sequence += bytes(json_data[0][4:], 'utf-8')
  303. # elif json_data[0][:4] == "bin:":
  304. # byte_value = int(json_data[0][4:], 2) # 将二进制字符串转换为整数
  305. # byte_array = bytes([byte_value]) # 将整数转换为单字节的字节串
  306. # byte_sequence += byte_array
  307. # if len(json_data[1]) > 4:
  308. # if json_data[1][:4] == "hex:":
  309. # byte_sequence += bytes.fromhex(json_data[1][4:])
  310. # elif json_data[1][:4] == "str:":
  311. # byte_sequence += bytes(json_data[1][4:], 'utf-8')
  312. # elif json_data[1][:4] == "bin:":
  313. # byte_value = int(json_data[1][4:], 2) # 将二进制字符串转换为整数
  314. # byte_array = bytes([byte_value]) # 将整数转换为单字节的字节串
  315. # byte_sequence += byte_array
  316. byte_sequence = json_data
  317. logger.info("发送前:{}".format(byte_sequence))
  318. data_str = json.dumps(json_data)
  319. # 字符串编码为字节序列
  320. byte_sequence = str.encode(data_str)
  321. # 字节序列转换为bytearray类型
  322. byte_array = bytearray(byte_sequence)
  323. byte_array = ev_packing(byte_array)
  324. send_data = bytes(byte_array)
  325. # await ble_send(client, send_data)
  326. mq.add("ble_send", send_data)
  327. async def task():
  328. global g_config
  329. await asyncio.sleep(3)
  330. while True:
  331. await asyncio.sleep(1)
  332. @calculate_time
  333. def read_config_call():
  334. global g_config
  335. # 读取更新json
  336. with open(config_file, "r", encoding="utf-8") as f:
  337. send_list_new = json.load(f)
  338. if g_config != send_list_new:
  339. g_config = send_list_new
  340. logger.info("json内容改变,内容如下:")
  341. print_data_list()
  342. @calculate_time
  343. def heart_beat_call(client):
  344. global g_config
  345. # 心跳发送:维持蓝牙通讯
  346. logger.info("执行心跳发送任务..")
  347. send_form_data_list(client, g_config["def_cfg"]["heart_beat_sel"])
  348. def extract_key_value(text):
  349. pattern = r"(\w+)=([^;]+)" # 匹配形如 key=value 的字符串
  350. matches = re.findall(pattern, text)
  351. for key, value in matches:
  352. print(f"Key: {key}, Value: {value}")
  353. return matches
  354. @calculate_time
  355. def input_call(client):
  356. global g_config
  357. # 输入
  358. userinput = None
  359. if g_download_cfg["Number"] == 0: #非批量获取配置的状态下
  360. try:
  361. userinput = inputimeout(prompt='请命令序号:', timeout=g_config["def_cfg"]["input_interval"])
  362. except TimeoutOccurred:
  363. userinput = None
  364. if userinput:
  365. # 判断类型: 数字int 字符串str 列表list 元组tuple 字典dict
  366. # isinstance(userinput, str)
  367. if not userinput.isalpha():
  368. send_form_data_list(client, int(userinput))
  369. elif str(userinput)=='l':
  370. # 读取更新json
  371. with open(config_file, "r", encoding="utf-8") as f:
  372. send_list_new = json.load(f)
  373. logger.info("json内容如下:")
  374. print_data_list()
  375. async def ble_main():
  376. global g_run
  377. global g_ble_client
  378. global g_config
  379. global g_ble_mtu
  380. global mq
  381. logger.info("快速显示列表, 可输入l")
  382. while g_run:
  383. logger.info("开始扫描...")
  384. #基于MAC地址查找设备
  385. device = await BleakScanner.find_device_by_address(
  386. g_config["def_cfg"]["ble_mac"], cb=dict(use_bdaddr=False) #use_bdaddr判断是否是MOC系统
  387. )
  388. if device is None:
  389. logger.error("无法找到设备({})".format(g_config["def_cfg"]["ble_mac"]))
  390. continue
  391. #事件定义
  392. disconnected_event = asyncio.Event()
  393. #断开连接事件回调
  394. def disconnected_callback(client):
  395. global g_ble_client
  396. logger.info("断开回调!")
  397. disconnected_event.set()
  398. g_ble_client = None
  399. logger.info("尝试连接设备({})...".format(g_config["def_cfg"]["ble_mac"]))
  400. async with BleakClient(device,disconnected_callback=disconnected_callback) as client:
  401. try:
  402. g_ble_client = client
  403. logger.info("已连接(mtu=%d)", client.mtu_size)
  404. g_ble_mtu = client.mtu_size-3
  405. # 是否连接
  406. # if not client.is_connected:
  407. # client.connect()
  408. # logger.info(f"连接状态: {client.is_connected}")
  409. # 是否配对
  410. # paired = await client.pair(protection_level=2)
  411. # logger.info(f"配对: {paired}")
  412. # 开启通知的接收
  413. await client.start_notify(g_config["def_cfg"]["notif_char"], notification_handler)
  414. # 循环发送指令
  415. g_download_cfg["Number"] = 0
  416. while client.is_connected and g_run:
  417. ble_data = mq.get("ble_send")
  418. if ble_data:
  419. await ble_send(client, ble_data)
  420. # 开始根据设备即功能处理消息
  421. ws_data = mq.get("ws_recv")
  422. if ws_data:
  423. await ble_send(client, ws_data)
  424. logger.info('断开连接')
  425. except Exception as e:
  426. logger.error("Err in {}[{}]:\n {}".format(os.path.basename(e.__traceback__.tb_frame.f_globals["__file__"]), e.__traceback__.tb_lineno, e))
  427. finally:
  428. # try:
  429. # if client.is_connected:
  430. # # 结束监听
  431. # await client.stop_notify(g_config["def_cfg"]["notif_char"])
  432. # # 断开与蓝牙设备的连接
  433. # await client.disconnect()
  434. # except Exception as e:
  435. # logger.error("Err in {}[{}]:\n {}".format(os.path.basename(e.__traceback__.tb_frame.f_globals["__file__"]), e.__traceback__.tb_lineno, e))
  436. logger.info("结束")
  437. logger.info("结束:{}".format(inspect.currentframe().f_code.co_name))
  438. def test_aes():
  439. key = EV_SOFT_BLE_SEND_CODE
  440. data = b'[2, "123456789012", "KeepAlive", {}]' #需要加密的内容
  441. AES_Encrypt(key, key, data)
  442. enctext = AES_Encrypt(key, key, data)
  443. print(enctext)
  444. text_decrypted = AES_Decrypt(key, key, enctext)
  445. print(text_decrypted)
  446. import array,struct
  447. def test_base64():
  448. in_data = b'1234567890\0_+,.//abcdefg'
  449. print(len(in_data), in_data)
  450. base64_bytes = base64.b64encode(in_data)
  451. print(len(base64_bytes),base64_bytes)
  452. out = base64.b64decode(base64_bytes)
  453. print(len(out),out)
  454. def test_recv():
  455. with open(config_file, "r", encoding="utf-8") as f:
  456. g_config = json.load(f)
  457. print_data_list()
  458. str_a = '45 56 3C 00 40 EB AE E3 1F 7B FA 9A 61 86 22 BE 36 0D 0D 09 FE 12 C6 60 27 A8 74 2A 32 08 0D D1 54 B4 4C E1 69 10 4B FC DD 00 23 58 14 74 C1 3F 64 48 9C 88 12 41 17 59 6B 82 F7 90 E7 89 A1 71 80 5A 07 FF 4D 4D 99'
  459. recv_handler(bytes.fromhex(str_a), True)
  460. str_b = '45 56 3C 00 40 B2 E6 73 BD E7 EE B8 1E D4 DE B7 BF 64 19 80 95 24 94 D2 F3 4E 45 97 99 A3 B5 F7 29 E0 86 1A 60 07 96 77 C4 73 DC C0 4B 2B FB 7E 73 D2 A1 CE 17 26 CF 12 FF 06 DC 83 B7 5E 65 C9 18 0B 87 16 EB 75 0F'
  461. recv_handler(bytes.fromhex(str_b), True)
  462. def asyncio_handler():
  463. # tasks.append(task())
  464. # loop = asyncio.get_event_loop()
  465. # loop.run_until_complete(asyncio.wait([ble_main]))
  466. # loop.close()
  467. loop = asyncio.new_event_loop()
  468. loop.run_until_complete(asyncio.wait([ble_main()]))
  469. def ble_handler(dec_str):
  470. try:
  471. out = json.loads(str(dec_str))
  472. if "key" in out and "value" in out and out["key"] == "fwreq":
  473. file = str(out["value"]["file"])
  474. addr = int(out["value"]["addr"])
  475. size = int(out["value"]["size"])
  476. filesize = os.stat(file).st_size
  477. with open(file, "rb") as f:
  478. f.seek(addr) #位移到最后 SEEK_END(值为2) SEEK_CUR(值为1) SEEK_SET(值为0)
  479. datas = f.read(size)
  480. f.close()
  481. # 发送
  482. fwdata = {
  483. "type": "set",
  484. "data": [
  485. {
  486. "key":"fwdata",
  487. "value":{"cur":addr, "total":filesize, "data": base64.b64encode(datas).decode("utf-8")}
  488. }
  489. ],
  490. "note":"set_fwdata"
  491. }
  492. byte_array = json.dumps(fwdata).encode("utf-8")
  493. byte_array = ev_packing(byte_array)
  494. send_data = bytes(byte_array)
  495. mq.add("ble_send", send_data)
  496. logger.info("add event:fwdata" )
  497. except Exception as e:
  498. logger.error("Err in {}[{}]:\n {}".format(os.path.basename(e.__traceback__.tb_frame.f_globals["__file__"]), e.__traceback__.tb_lineno, e))
  499. def ble_unpack(data):
  500. global recv_start_time
  501. global g_download_cfg
  502. global g_recv_count
  503. global recv_data
  504. HEAD_SEND = str(g_config["def_cfg"]["head_send"])
  505. HEAD_RECV = str(g_config["def_cfg"]["head_recv"])
  506. # 如果当前包头是'EV>',直接清除之前的数据
  507. if str(bytes(data)).startswith(HEAD_RECV):
  508. recv_data.clear()
  509. for d in data:
  510. recv_data.append(d)
  511. if g_config["def_cfg"]["recv_detail_print"]:
  512. logger.info("包接收({}),总长度({}):{}".format(len(data), len(recv_data),bytes(data)))
  513. if len(recv_data)>g_config["def_cfg"]["ble_pack_size"]:
  514. recv_data.clear()
  515. return
  516. if len(recv_data)<PACK_LEN():
  517. return
  518. g_recv_count = g_config["def_cfg"]["heart_beat_interval"]
  519. recv_data_str = str(bytes(recv_data))
  520. find_head = HEAD_RECV
  521. index = -1
  522. if find_head != "":
  523. index = recv_data_str.find(find_head)#'EV>'
  524. if index < 2:
  525. return
  526. index -= 2
  527. data_len = (recv_data[index+4]&0xff) | (recv_data[index+3]&0xff)<<8
  528. if len(recv_data) < data_len+index+PACK_LEN():
  529. logger.info("数据不够%d < %d(index(%d),head(%d),len(%d):0x%02x%02x)", len(recv_data), index+PACK_LEN()+data_len, index,PACK_LEN(),data_len, recv_data[index+3], recv_data[index+4])
  530. return
  531. if g_config["def_cfg"]["recv_soc_data_print"]:
  532. logger.info("接收:{}".format(bytes(recv_data).hex()))
  533. if recv_start_time:
  534. recv_end_time = time.time()
  535. logger.info('接收耗时:%.3fs', recv_end_time - recv_start_time)
  536. recv_start_time = 0
  537. soc_data = recv_data[index+PACK_LEN():index+PACK_LEN()+data_len]
  538. if g_config["def_cfg"]["checknum_type"]!="":
  539. H = len(g_config["def_cfg"]["head_send"])
  540. L = 2
  541. get_crc = (recv_data[index+(H+L+1)]&0xff)| (recv_data[index+(H+L)]&0xff)<<8
  542. check_num = checknum_16(soc_data)
  543. if get_crc != check_num:
  544. logger.info("校验失败(%d):0x%04X 0x%04X", len(recv_data), get_crc, check_num)
  545. recv_data.clear()
  546. return
  547. logger.info("校验成功")
  548. if g_config["def_cfg"]["aes_cbc_enbable"]:
  549. key = get_aes_key("recv") #"recv" 'send'
  550. dec_data = AES_Decrypt(bytes(key), bytes(key), bytes(soc_data))
  551. logger.info("解密后:{}".format(bytes(dec_data)))
  552. else:
  553. dec_data = soc_data
  554. dec_str = bytes(dec_data).decode('gbk')
  555. logger.info("解析:{}".format(dec_str) )
  556. # 特殊处理:升级
  557. # out = batch_split_key_value(dec_str)
  558. ble_handler(dec_str)
  559. # 清理
  560. recv_data.clear()
  561. def ble_recv_handler():
  562. global g_run
  563. while g_run:
  564. data = mq.get("ble_recv")
  565. if data:
  566. ble_unpack(data)
  567. time.sleep(0.1)
  568. logger.info("结束:{}".format(inspect.currentframe().f_code.co_name))
  569. def key_handler():
  570. global g_run
  571. input_wait_time = 0
  572. while g_run:
  573. if g_config["def_cfg"]["input_interval"]>0 and time.time()-input_wait_time>=g_config["def_cfg"]["input_interval"]:
  574. input_wait_time = time.time()
  575. input_call(None)
  576. time.sleep(0.1)
  577. logger.info("结束:{}".format(inspect.currentframe().f_code.co_name))
  578. def main_handler():
  579. global g_run
  580. global g_recv_count
  581. read_config_time = 0
  582. sec_count = 0
  583. while g_run:
  584. if time.time()-sec_count >= 1:
  585. sec_count = time.time()
  586. if g_recv_count > 0:
  587. g_recv_count -= 1
  588. if time.time()-read_config_time >= 5:
  589. read_config_time = time.time()
  590. read_config_call()
  591. if g_ble_client and g_ble_client.is_connected:
  592. if g_recv_count == 0:
  593. g_recv_count = g_config["def_cfg"]["heart_beat_interval"]
  594. heart_beat_call(None)
  595. time.sleep(0.1)
  596. logger.info("结束:{}".format(inspect.currentframe().f_code.co_name))
  597. #---------------------------------------------------------
  598. if __name__ == "__main__":
  599. signal.signal(signal.SIGTERM, sig_handler) # kill pid
  600. signal.signal(signal.SIGINT, sig_handler) # ctrl -c
  601. logger.info("读取配置...")
  602. with open(config_file, "r", encoding="utf-8") as f:
  603. g_config = json.load(f)
  604. print_data_list()
  605. exit_event = threading.Event()
  606. asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
  607. # test_aes()
  608. # test_base64()
  609. # test_recv()
  610. import websockets
  611. mq = MessageBase()
  612. thread_list.append(threading.Thread(target=main_handler, args=()))
  613. thread_list.append(threading.Thread(target=key_handler, args=()))
  614. thread_list.append(threading.Thread(target=ble_recv_handler, args=()))
  615. for thread in thread_list:
  616. thread.start()
  617. # ws = WebServer("0.0.0.0", 11100, mq)
  618. # ws_serve = websockets.serve(ws.echo, ws.host, ws.port)
  619. # asyncio.get_event_loop().run_until_complete(ws_serve)
  620. task = asyncio.get_event_loop().create_task(ble_main())
  621. asyncio.get_event_loop().run_until_complete(task)
  622. # 等待所有线程完成
  623. for thread in thread_list:
  624. thread.join()
  625. sys.exit(0)