ble.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import asyncio
  4. from bleak import BleakClient, BleakScanner
  5. from bleak.backends.characteristic import BleakGATTCharacteristic
  6. import chardet
  7. import time
  8. import json
  9. from inputimeout import inputimeout, TimeoutOccurred
  10. import base64
  11. from Crypto.Cipher import AES
  12. from Crypto.Util.Padding import pad, unpad
  13. import copy
  14. from message_base import MessageBase
  15. from websocket_server import WebServer
  16. import sys
  17. import atexit
  18. import signal
  19. import traceback
  20. import logging
# Module-level logger: DEBUG level, records written to stderr.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
s_h = logging.StreamHandler(sys.stderr)
# More verbose alternative format, kept for reference:
# formatter = logging.Formatter('%(asctime)s.%(msecs)03d-%(name)s-%(filename)s-[line:%(lineno)d]'
# '-%(levelname)s-[log message]: %(message)s',
# datefmt='%Y-%m-%d,%H:%M:%S')
# Active format: day-of-month and time, source line number, level, message.
formatter = logging.Formatter('%(asctime)s-[%(lineno)d]'
                              '-%(levelname)s: %(message)s',
                              datefmt='%d %H:%M:%S')
s_h.setFormatter(formatter)
logger.addHandler(s_h)
# JSON configuration file; the relative path is resolved against the working directory.
config_file="config.json"
# Characteristic UUID of the device (notifications).
# NOTE(review): the par_* constants below are not referenced in the visible
# source; the code reads "notif_char", "write_char" and "ble_mac" from the
# JSON configuration instead.
par_notification_characteristic="0000ffe2-0000-1000-8000-00805f9b34fb"
# Characteristic UUID of the device (has the Write property).
par_write_characteristic="0000ffe1-0000-1000-8000-00805f9b34fb"
# MAC address of the device.
# par_device_addr="D1:1D:6A:52:CD:F8"
par_device_addr="D5:2D:D4:9E:5C:3C"
# Key and IV for AES CBC-mode encryption. These are the fallbacks used by
# get_aes_key() when the configuration does not provide a key.
# NOTE(review): the keys are hard-coded in source.
EV_SOFT_BLE_SEND_CODE = '#MOIF@KHab%DECR$' # Bluetooth send key
EV_SOFT_BLE_RECV_CODE = '$*@AB%DHJqopENC#' # Bluetooth receive key
# EV_SOFT_BLE_SEND_CODE = 'XH23456789ABCDEF' # Bluetooth send key
# EV_SOFT_BLE_RECV_CODE = 'XH23456789ABCDEF' # Bluetooth receive key
# Parsed contents of config_file; filled in by main() and read_config_call().
g_config = {}
# Default chunk size for BLE writes; main() overwrites it with mtu_size - 3 after connecting.
g_ble_mtu = 20
# Currently connected BleakClient, or None when disconnected.
g_ble_client = None
# Progress of a batched configuration download; "Number" > 0 means a download is in progress.
g_download_cfg = {"Key":"", "Total":0, "Number":0, "DataCrc16":0}
# NOTE(review): `mb` is not referenced in the visible source, while `mq` is
# used by main() and notification_handler() but only assigned under
# `if __name__ == "__main__"` -- possibly `mb` was meant to be `mq`; confirm.
mb = None
ws = None
  51. def calculate_time(func):
  52. def wrapper(*args, **kwargs):
  53. start_time = time.time()
  54. result = func(*args, **kwargs)
  55. end_time = time.time()
  56. if end_time - start_time >= 0.5 :
  57. print("函数 %s 运行时间为 %.3f 秒" % (func.__name__, end_time - start_time))
  58. return result
  59. return wrapper
  60. @atexit.register
  61. def exit_handler():
  62. logger.info("异常退出")
  63. # 采用traceback模块查看异常,这个方法会打印出异常代码的行号
  64. exc_type, exc_value, exc_tb = sys.exc_info()
  65. logger.info(str(traceback.format_exception(exc_type, exc_value, exc_tb)))
  66. def sig_handler(signum, frame):
  67. logger.info('catched singal: %d' % signum)
  68. sys.exit(0)
  69. def shuncomdacode(data):
  70. """ 识别data的编码格式 """
  71. result = chardet.detect(data)
  72. print(result['encoding'])
  73. return (result['encoding'])
  74. def AES_Encrypt(aes_iv, aes_key, plain_text):
  75. """
  76. AES encrypt
  77. :param plain_text: bytes
  78. :param aes_key: bytes
  79. :param aes_iv: bytes
  80. :return: bytes
  81. """
  82. try:
  83. pad_data = pad(plain_text, AES.block_size)
  84. return AES.new(aes_key, AES.MODE_CBC, aes_iv).encrypt(pad_data)
  85. except Exception as e:
  86. logger.error('Error:{}'.format(e))
  87. return bytes()
  88. def AES_Decrypt(aes_iv, aes_key, plain_text):
  89. """
  90. AES decrypt
  91. :param plain_text: bytes
  92. :param aes_key: bytes, aes_key
  93. :param aes_iv: bytes, aes_iv
  94. :return: bytes
  95. """
  96. try:
  97. dec_data = AES.new(aes_key, AES.MODE_CBC, aes_iv).decrypt(plain_text)
  98. return unpad(dec_data, AES.block_size)
  99. except Exception as e:
  100. logger.error('Error:{}'.format(e))
  101. return bytes()
  102. def AES_Encrypt2(vi, key, data):
  103. enctext = bytes()
  104. try:
  105. # vi = '0102030405060708'
  106. pad = lambda s: s + (16 - len(s) % 16) * chr(16 - len(s) % 16)
  107. data = pad(data)
  108. # 字符串补位
  109. cipher = AES.new(key.encode('utf8'), AES.MODE_CBC, vi.encode('utf8'))
  110. encryptedbytes = cipher.encrypt(data.encode('utf8'))
  111. # 加密后得到的是bytes类型的数据
  112. encodestrs = base64.b64encode(encryptedbytes)
  113. # 使用Base64进行编码,返回byte字符串
  114. enctext = encodestrs.decode('utf8')
  115. # 对byte字符串按utf-8进行解码
  116. except Exception as e:
  117. logger.error('Error:{}'.format(e))
  118. return enctext
  119. def AES_Decrypt2(vi, key, data):
  120. text_decrypted = bytes()
  121. try:
  122. # vi = '0102030405060708'
  123. data = data.encode('utf8')
  124. encodebytes = base64.decodebytes(data)
  125. # 将加密数据转换位bytes类型数据
  126. cipher = AES.new(key.encode('utf8'), AES.MODE_CBC, vi.encode('utf8'))
  127. text_decrypted = cipher.decrypt(encodebytes)
  128. unpad = lambda s: s[0:-s[-1]]
  129. text_decrypted = unpad(text_decrypted)
  130. # 去补位
  131. text_decrypted = text_decrypted.decode('utf8')
  132. except Exception as e:
  133. logger.error('Error:{}'.format(e))
  134. return text_decrypted
  135. def sum_ccitt_16(data):
  136. total = sum(data)
  137. total &= 0xFFFF
  138. return total
  139. def crc_ccitt_16(data):
  140. crc = 0
  141. for byte in data:
  142. crc ^= (byte << 8)
  143. for _ in range(8):
  144. if crc & 0x8000:
  145. crc = (crc << 1) ^ 0x1021
  146. else:
  147. crc <<= 1
  148. crc &= 0xFFFF
  149. return crc
  150. def checknum_16(data):
  151. type = g_config["def_cfg"]["checknum_type"]
  152. if type == "sum16":
  153. return sum_ccitt_16(data)
  154. elif type == "crc16":
  155. return crc_ccitt_16(data)
  156. return 0
  157. def get_aes_key(type):
  158. if type == "send":
  159. if g_config["def_cfg"]["aes_cbc_key_send"]:
  160. return bytes(g_config["def_cfg"]["aes_cbc_key_send"], encoding='utf-8')
  161. else:
  162. return bytes(EV_SOFT_BLE_SEND_CODE, encoding='utf-8')
  163. else:
  164. if g_config["def_cfg"]["aes_cbc_key_recv"]:
  165. return bytes(g_config["def_cfg"]["aes_cbc_key_recv"], encoding='utf-8')
  166. else:
  167. return bytes(EV_SOFT_BLE_RECV_CODE, encoding='utf-8')
  168. # 初始化一个CRC16校验码计算函数,多项式为0x8005
  169. def ev_packing(data):
  170. data = bytes(data)
  171. # 加密
  172. if g_config["def_cfg"]["aes_cbc_enbable"]:
  173. logger.info("加密前:{}".format(bytes(data)))
  174. key = get_aes_key("send")
  175. data = AES_Encrypt(bytes(key), bytes(key), bytes(data))
  176. lenght = len(data)
  177. # crc16 = crc16_func(data) ## 计算数据的CRC-16校验码
  178. # checknum = crc16(data, 0, len(data))
  179. out = []
  180. # 头
  181. if g_config["def_cfg"]["head_send"] != "":
  182. HEAD_SEND = str(g_config["def_cfg"]["head_send"]).encode()
  183. out = list(HEAD_SEND)
  184. # 长度
  185. out = out + [ (lenght>>8)&0xFF, (lenght>>0)&0xFF]
  186. # 校验码
  187. if g_config["def_cfg"]["checknum_type"] != "":
  188. checknum = checknum_16(data)
  189. out = out + [ (checknum>>8)&0xFF, (checknum>>0)&0xFF]
  190. for d in bytes(data):
  191. out.append(d)
  192. return bytes(out)
# State shared by the notification callback and the send path.
# Received byte values buffered by recv_handler() until a whole frame is available.
recv_data = []
# Set by ble_send() after a transmission; recv_handler() uses it to log the
# time until a complete frame arrives and then resets it to 0.
recv_start_time = 0
# NOTE(review): not referenced anywhere else in the visible source.
GetConfiguration = []
  198. def PACK_LEN():
  199. H = len(g_config["def_cfg"]["head_send"])
  200. L = 2
  201. C = 2 if g_config["def_cfg"]["checknum_type"]!="" else 0
  202. return H+L+C
  203. @calculate_time
  204. def recv_handler(data: bytearray, reve=False):
  205. global recv_start_time
  206. global g_download_cfg
  207. if g_config["def_cfg"]["recv_detail_print"]:
  208. logger.info("包接收:{}".format(bytes(data)))
  209. for d in data:
  210. recv_data.append(d)
  211. if len(recv_data)>1500:
  212. recv_data.clear()
  213. return
  214. if len(recv_data)<PACK_LEN():
  215. return
  216. recv_data_str = str(bytes(recv_data))
  217. HEAD_SEND = str(g_config["def_cfg"]["head_send"])
  218. HEAD_RECV = str(g_config["def_cfg"]["head_recv"])
  219. find_head = HEAD_SEND if not reve else HEAD_RECV
  220. index = -1
  221. if find_head != "":
  222. index = recv_data_str.find(find_head)#'EV>'
  223. if index < 2:
  224. return
  225. index -= 2
  226. data_len = (recv_data[index+4]&0xff) | (recv_data[index+3]&0xff)<<8
  227. if index+PACK_LEN()+data_len > len(recv_data):
  228. return
  229. if g_config["def_cfg"]["recv_soc_data_print"]:
  230. logger.info("接收:{}".format(bytes(recv_data).hex()))
  231. if recv_start_time:
  232. recv_end_time = time.time()
  233. logger.info('接收耗时:%.3fs', recv_end_time - recv_start_time)
  234. recv_start_time = 0
  235. soc_data = recv_data[index+PACK_LEN():index+PACK_LEN()+data_len]
  236. if g_config["def_cfg"]["checknum_type"]!="":
  237. H = len(g_config["def_cfg"]["head_send"])
  238. L = 2
  239. get_crc = (recv_data[index+(H+L+1)]&0xff)| (recv_data[index+(H+L)]&0xff)<<8
  240. check_num = checknum_16(soc_data)
  241. if get_crc != check_num:
  242. logger.info("校验失败(%d):0x%04X 0x%04X", len(recv_data), get_crc, check_num)
  243. return
  244. logger.info("校验成功")
  245. if g_config["def_cfg"]["aes_cbc_enbable"]:
  246. find_type= "recv" if not reve else 'send'
  247. key = get_aes_key(find_type) #"recv"
  248. dec_data = AES_Decrypt(bytes(key), bytes(key), bytes(soc_data))
  249. logger.info("解密后:{}".format(bytes(dec_data)))
  250. else:
  251. dec_data = soc_data
  252. # 清理
  253. recv_data.clear()
  254. def notification_handler(characteristic: BleakGATTCharacteristic, data: bytearray):
  255. global mq
  256. recv_handler(data)
  257. mq.add("ws_send", data)
  258. async def ble_send(client, data):
  259. global recv_start_time
  260. global g_ble_mtu
  261. frame_len = g_ble_mtu #244 #20 #244
  262. if g_config["def_cfg"]["ble_mtu"] > 0:
  263. frame_len = g_config["def_cfg"]["ble_mtu"]
  264. all_count = len(data)
  265. send_count = 0
  266. send_start_time = time.time()
  267. try:
  268. while send_count<all_count:
  269. cur_len = frame_len
  270. if all_count-send_count<frame_len:
  271. cur_len = all_count-send_count
  272. s = data[send_count:send_count+cur_len]
  273. await client.write_gatt_char(g_config["def_cfg"]["write_char"], s)
  274. send_count += cur_len
  275. except Exception as e:
  276. logger.error('Error:{}'.format(e))
  277. send_end_time = time.time()
  278. recv_start_time = time.time()
  279. logger.info('发送耗时:%.3fs', send_end_time - send_start_time)
  280. await asyncio.sleep(g_config["def_cfg"]["ble_send_wait"]) #每休眠1秒发送一次
  281. import sys, select
  282. def timeoutable_input(clue="",timeout=None):
  283. print(clue,end="")
  284. i, o, e = select.select([sys.stdin], [], [], timeout)
  285. return sys.stdin.readline() if len(i)>0 else None
  286. def print_data_list():
  287. global g_config
  288. try:
  289. for i in range(len(g_config["cmd_list"])):
  290. print(i, g_config["cmd_list"][i][2])
  291. except Exception as e:
  292. logger.error('Error:{}'.format(e))
  293. @calculate_time
  294. async def send_form_data_list(client, i, data=None):
  295. global g_config
  296. if data:
  297. json_data = data
  298. else:
  299. if i >= len(g_config["cmd_list"]):
  300. return None
  301. json_data = g_config["cmd_list"][i]
  302. if len(json_data) < 3:
  303. return
  304. byte_sequence = b''
  305. if len(json_data[0]) > 4:
  306. if json_data[0][:4] == "hex:":
  307. byte_sequence += bytes.fromhex(json_data[0][4:])
  308. elif json_data[0][:4] == "str:":
  309. byte_sequence += bytes(json_data[0][4:], 'utf-8')
  310. elif json_data[0][:4] == "bin:":
  311. byte_value = int(json_data[0][4:], 2) # 将二进制字符串转换为整数
  312. byte_array = bytes([byte_value]) # 将整数转换为单字节的字节串
  313. byte_sequence += byte_array
  314. if len(json_data[1]) > 4:
  315. if json_data[1][:4] == "hex:":
  316. byte_sequence += bytes.fromhex(json_data[1][4:])
  317. elif json_data[1][:4] == "str:":
  318. byte_sequence += bytes(json_data[1][4:], 'utf-8')
  319. elif json_data[1][:4] == "bin:":
  320. byte_value = int(json_data[1][4:], 2) # 将二进制字符串转换为整数
  321. byte_array = bytes([byte_value]) # 将整数转换为单字节的字节串
  322. byte_sequence += byte_array
  323. # data_str = json.dumps(json_data)
  324. # 字符串编码为字节序列
  325. # byte_sequence = str.encode(data_str)
  326. # 字节序列转换为bytearray类型
  327. byte_array = bytearray(byte_sequence)
  328. byte_array = ev_packing(byte_array)
  329. send_data = bytes(byte_array)
  330. logger.info('发送{}:{}'.format(json_data[2], send_data.hex()))
  331. await ble_send(client, send_data)
  332. async def task():
  333. global g_config
  334. await asyncio.sleep(3)
  335. while True:
  336. await asyncio.sleep(1)
  337. @calculate_time
  338. def read_config_call():
  339. global g_config
  340. # 读取更新json
  341. with open(config_file, "r", encoding="utf-8") as f:
  342. send_list_new = json.load(f)
  343. if g_config != send_list_new:
  344. g_config = send_list_new
  345. logger.info("json内容改变,内容如下:")
  346. print_data_list()
  347. @calculate_time
  348. async def heart_beat_call(client):
  349. global g_config
  350. # 心跳发送:维持蓝牙通讯
  351. logger.info("执行心跳发送任务..")
  352. await send_form_data_list(client, g_config["def_cfg"]["heart_beat_sel"])
  353. @calculate_time
  354. async def input_call(client):
  355. global g_config
  356. # 输入
  357. userinput = None
  358. if g_download_cfg["Number"] == 0: #非批量获取配置的状态下
  359. try:
  360. userinput = inputimeout(prompt='请命令序号:', timeout=g_config["def_cfg"]["input_interval"])
  361. except TimeoutOccurred:
  362. userinput = None
  363. if userinput:
  364. # 判断类型: 数字int 字符串str 列表list 元组tuple 字典dict
  365. # isinstance(userinput, str)
  366. if not userinput.isalpha():
  367. await send_form_data_list(client, int(userinput))
  368. elif str(userinput)=='l':
  369. # 读取更新json
  370. with open(config_file, "r", encoding="utf-8") as f:
  371. send_list_new = json.load(f)
  372. logger.info("json内容如下:")
  373. print_data_list()
  374. @calculate_time
  375. async def auto_getcfg_call(client):
  376. global g_config
  377. global g_download_cfg
  378. # 自动发送任务
  379. try:
  380. if g_download_cfg["Number"]>0:
  381. new_one = []
  382. for i, v in enumerate(g_config["cmd_list"]):
  383. if v[2] == "GetConfigurationNumber":
  384. new_one = copy.deepcopy(v) #深拷贝
  385. break
  386. if new_one:
  387. Number = g_download_cfg["Number"]
  388. Total = g_download_cfg["Total"]
  389. new_one[3]["Number"] = Number
  390. await send_form_data_list(client, 0, new_one)
  391. g_download_cfg["Number"] = Number+1 if Number < Total else 0
  392. except Exception as e:
  393. logger.error('Error:{}'.format(e))
  394. async def main():
  395. global g_ble_client
  396. global g_config
  397. global g_ble_mtu
  398. global mq
  399. logger.info("读取配置...")
  400. with open(config_file, "r", encoding="utf-8") as f:
  401. g_config = json.load(f)
  402. print_data_list()
  403. logger.info("快速显示列表, 可输入l")
  404. while True:
  405. logger.info("开始扫描...")
  406. #基于MAC地址查找设备
  407. device = await BleakScanner.find_device_by_address(
  408. g_config["def_cfg"]["ble_mac"], cb=dict(use_bdaddr=False) #use_bdaddr判断是否是MOC系统
  409. )
  410. if device is None:
  411. logger.error("无法找到设备({})".format(g_config["def_cfg"]["ble_mac"]))
  412. continue
  413. #事件定义
  414. disconnected_event = asyncio.Event()
  415. #断开连接事件回调
  416. def disconnected_callback(client):
  417. global g_ble_client
  418. logger.info("断开回调!")
  419. disconnected_event.set()
  420. g_ble_client = None
  421. logger.info("尝试连接设备({})...".format(g_config["def_cfg"]["ble_mac"]))
  422. async with BleakClient(device,disconnected_callback=disconnected_callback) as client:
  423. g_ble_client = client
  424. logger.info("已连接(mtu=%d)", client.mtu_size)
  425. g_ble_mtu = client.mtu_size-3
  426. try:
  427. await client.start_notify(g_config["def_cfg"]["notif_char"], notification_handler)
  428. except Exception as e:
  429. logger.error('Error:{}'.format(e))
  430. g_download_cfg["Number"] = 0
  431. read_config_time = 0
  432. heart_beart_time = 0
  433. input_wait_time = 0
  434. while client and client.is_connected:
  435. if time.time()-read_config_time >= 3:
  436. read_config_time = time.time()
  437. read_config_call()
  438. if g_config["def_cfg"]["auto_send_GetConfigurationNumber"] and g_download_cfg["Number"]>0:
  439. await auto_getcfg_call(client)
  440. elif g_config["def_cfg"]["heart_beat_interval"] >0 and time.time()-heart_beart_time>=g_config["def_cfg"]["heart_beat_interval"]:
  441. heart_beart_time = time.time()
  442. await heart_beat_call(client)
  443. elif g_config["def_cfg"]["input_interval"]>0 and time.time()-input_wait_time>=g_config["def_cfg"]["input_interval"]:
  444. input_wait_time = time.time()
  445. await input_call(client)
  446. try:
  447. # 开始根据设备即功能处理消息
  448. ws_data = mq.get(device)
  449. if ws_data:
  450. ble_send(client, ws_data)
  451. except Exception as err:
  452. pass
  453. time.sleep(0.1)
  454. def test_aes():
  455. key = EV_SOFT_BLE_SEND_CODE
  456. data = b'[2, "123456789012", "KeepAlive", {}]' #需要加密的内容
  457. AES_Encrypt(key, key, data)
  458. enctext = AES_Encrypt(key, key, data)
  459. print(enctext)
  460. text_decrypted = AES_Decrypt(key, key, enctext)
  461. print(text_decrypted)
  462. import array,struct
  463. def test_base64():
  464. in_data = b'1234567890\0_+,.//abcdefg'
  465. print(len(in_data), in_data)
  466. base64_bytes = base64.b64encode(in_data)
  467. print(len(base64_bytes),base64_bytes)
  468. out = base64.b64decode(base64_bytes)
  469. print(len(out),out)
  470. def test_recv():
  471. with open(config_file, "r", encoding="utf-8") as f:
  472. g_config = json.load(f)
  473. print_data_list()
  474. str_a = '45 56 3C 00 40 EB AE E3 1F 7B FA 9A 61 86 22 BE 36 0D 0D 09 FE 12 C6 60 27 A8 74 2A 32 08 0D D1 54 B4 4C E1 69 10 4B FC DD 00 23 58 14 74 C1 3F 64 48 9C 88 12 41 17 59 6B 82 F7 90 E7 89 A1 71 80 5A 07 FF 4D 4D 99'
  475. recv_handler(bytes.fromhex(str_a), True)
  476. str_b = '45 56 3C 00 40 B2 E6 73 BD E7 EE B8 1E D4 DE B7 BF 64 19 80 95 24 94 D2 F3 4E 45 97 99 A3 B5 F7 29 E0 86 1A 60 07 96 77 C4 73 DC C0 4B 2B FB 7E 73 D2 A1 CE 17 26 CF 12 FF 06 DC 83 B7 5E 65 C9 18 0B 87 16 EB 75 0F'
  477. recv_handler(bytes.fromhex(str_b), True)
  478. def run(tasks = []):
  479. # tasks.append(task())
  480. loop = asyncio.get_event_loop()
  481. loop.run_until_complete(asyncio.wait(tasks))
  482. loop.close()
#---------------------------------------------------------
# Script entry point: install signal handlers, create the message queue and
# the websocket server, then run the BLE main loop.
if __name__ == "__main__":
    # Both signals are routed to sig_handler, which logs and exits with status 0.
    signal.signal(signal.SIGTERM, sig_handler) # kill pid
    signal.signal(signal.SIGINT, sig_handler) # ctrl -c
    # Manual checks, enable as needed:
    # test_aes()
    # test_base64()
    # test_recv()
    # `mq` becomes the module-level name used by main() and notification_handler().
    mq = MessageBase()
    ws = WebServer("0.0.0.0", 11100, mq)
    # NOTE(review): presumably ws.run() returns promptly (e.g. starts the
    # server in the background); if it blocked, run() below would never be
    # reached -- confirm against websocket_server.
    ws.run()
    run([main(),])
    sys.exit(0)