ble.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. import asyncio
  4. from bleak import BleakClient, BleakScanner
  5. from bleak.backends.characteristic import BleakGATTCharacteristic
  6. import chardet
  7. import time
  8. import json
  9. from inputimeout import inputimeout, TimeoutOccurred
  10. import base64
  11. from Crypto.Cipher import AES
  12. from Crypto.Util.Padding import pad, unpad
  13. import copy
  14. import sys
  15. import atexit
  16. import signal
  17. import traceback
  18. import logging
  19. logger = logging.getLogger(__name__)
  20. logger.setLevel(logging.DEBUG)
  21. s_h = logging.StreamHandler(sys.stderr)
  22. # formatter = logging.Formatter('%(asctime)s.%(msecs)03d-%(name)s-%(filename)s-[line:%(lineno)d]'
  23. # '-%(levelname)s-[日志信息]: %(message)s',
  24. # datefmt='%Y-%m-%d,%H:%M:%S')
  25. formatter = logging.Formatter('%(asctime)s-[%(lineno)d]'
  26. '-%(levelname)s: %(message)s',
  27. datefmt='%d %H:%M:%S')
  28. s_h.setFormatter(formatter)
  29. logger.addHandler(s_h)
  30. config_file="config.json"
  31. #设备的Characteristic UUID
  32. par_notification_characteristic="0000ffe2-0000-1000-8000-00805f9b34fb"
  33. #设备的Characteristic UUID(具备写属性Write)
  34. par_write_characteristic="0000ffe1-0000-1000-8000-00805f9b34fb"
  35. #设备的MAC地址
  36. # par_device_addr="D1:1D:6A:52:CD:F8"
  37. par_device_addr="D5:2D:D4:9E:5C:3C"
  38. # 密钥(key), 密斯偏移量(iv) CBC模式加密
  39. EV_SOFT_BLE_SEND_CODE = '#MOIF@KHab%DECR$' #//蓝牙发送密钥
  40. EV_SOFT_BLE_RECV_CODE = '$*@AB%DHJqopENC#' #//蓝牙接收密钥
  41. # EV_SOFT_BLE_SEND_CODE = 'XH23456789ABCDEF' #//蓝牙发送密钥
  42. # EV_SOFT_BLE_RECV_CODE = 'XH23456789ABCDEF' #//蓝牙接收密钥
  43. g_config = {}
  44. g_ble_mtu = 20
  45. g_ble_client = None
  46. g_download_cfg = {"Key":"", "Total":0, "Number":0, "DataCrc16":0}
  47. def calculate_time(func):
  48. def wrapper(*args, **kwargs):
  49. start_time = time.time()
  50. result = func(*args, **kwargs)
  51. end_time = time.time()
  52. if end_time - start_time >= 0.5 :
  53. print("函数 %s 运行时间为 %.3f 秒" % (func.__name__, end_time - start_time))
  54. return result
  55. return wrapper
  56. @atexit.register
  57. def exit_handler():
  58. logger.info("异常退出")
  59. # 采用traceback模块查看异常,这个方法会打印出异常代码的行号
  60. exc_type, exc_value, exc_tb = sys.exc_info()
  61. logger.info(str(traceback.format_exception(exc_type, exc_value, exc_tb)))
  62. def sig_handler(signum, frame):
  63. logger.info('catched singal: %d' % signum)
  64. sys.exit(0)
  65. def shuncomdacode(data):
  66. """ 识别data的编码格式 """
  67. result = chardet.detect(data)
  68. print(result['encoding'])
  69. return (result['encoding'])
  70. def AES_Encrypt(aes_iv, aes_key, plain_text):
  71. """
  72. AES encrypt
  73. :param plain_text: bytes
  74. :param aes_key: bytes
  75. :param aes_iv: bytes
  76. :return: bytes
  77. """
  78. try:
  79. pad_data = pad(plain_text, AES.block_size)
  80. return AES.new(aes_key, AES.MODE_CBC, aes_iv).encrypt(pad_data)
  81. except Exception as e:
  82. logger.error('Error:{}'.format(e))
  83. return bytes()
  84. def AES_Decrypt(aes_iv, aes_key, plain_text):
  85. """
  86. AES decrypt
  87. :param plain_text: bytes
  88. :param aes_key: bytes, aes_key
  89. :param aes_iv: bytes, aes_iv
  90. :return: bytes
  91. """
  92. try:
  93. dec_data = AES.new(aes_key, AES.MODE_CBC, aes_iv).decrypt(plain_text)
  94. return unpad(dec_data, AES.block_size)
  95. except Exception as e:
  96. logger.error('Error:{}'.format(e))
  97. return bytes()
  98. def AES_Encrypt2(vi, key, data):
  99. enctext = bytes()
  100. try:
  101. # vi = '0102030405060708'
  102. pad = lambda s: s + (16 - len(s) % 16) * chr(16 - len(s) % 16)
  103. data = pad(data)
  104. # 字符串补位
  105. cipher = AES.new(key.encode('utf8'), AES.MODE_CBC, vi.encode('utf8'))
  106. encryptedbytes = cipher.encrypt(data.encode('utf8'))
  107. # 加密后得到的是bytes类型的数据
  108. encodestrs = base64.b64encode(encryptedbytes)
  109. # 使用Base64进行编码,返回byte字符串
  110. enctext = encodestrs.decode('utf8')
  111. # 对byte字符串按utf-8进行解码
  112. except Exception as e:
  113. logger.error('Error:{}'.format(e))
  114. return enctext
  115. def AES_Decrypt2(vi, key, data):
  116. text_decrypted = bytes()
  117. try:
  118. # vi = '0102030405060708'
  119. data = data.encode('utf8')
  120. encodebytes = base64.decodebytes(data)
  121. # 将加密数据转换位bytes类型数据
  122. cipher = AES.new(key.encode('utf8'), AES.MODE_CBC, vi.encode('utf8'))
  123. text_decrypted = cipher.decrypt(encodebytes)
  124. unpad = lambda s: s[0:-s[-1]]
  125. text_decrypted = unpad(text_decrypted)
  126. # 去补位
  127. text_decrypted = text_decrypted.decode('utf8')
  128. except Exception as e:
  129. logger.error('Error:{}'.format(e))
  130. return text_decrypted
  131. def sum_ccitt_16(data):
  132. total = sum(data)
  133. total &= 0xFFFF
  134. return total
  135. def crc_ccitt_16(data):
  136. crc = 0
  137. for byte in data:
  138. crc ^= (byte << 8)
  139. for _ in range(8):
  140. if crc & 0x8000:
  141. crc = (crc << 1) ^ 0x1021
  142. else:
  143. crc <<= 1
  144. crc &= 0xFFFF
  145. return crc
  146. def checknum_16(data):
  147. type = g_config["def_cfg"]["checknum_type"]
  148. if type == "sum16":
  149. return sum_ccitt_16(data)
  150. elif type == "crc16":
  151. return crc_ccitt_16(data)
  152. return 0
  153. def get_aes_key(type):
  154. if type == "send":
  155. if g_config["def_cfg"]["aes_cbc_key_send"]:
  156. return bytes(g_config["def_cfg"]["aes_cbc_key_send"], encoding='utf-8')
  157. else:
  158. return bytes(EV_SOFT_BLE_SEND_CODE, encoding='utf-8')
  159. else:
  160. if g_config["def_cfg"]["aes_cbc_key_recv"]:
  161. return bytes(g_config["def_cfg"]["aes_cbc_key_recv"], encoding='utf-8')
  162. else:
  163. return bytes(EV_SOFT_BLE_RECV_CODE, encoding='utf-8')
  164. # 初始化一个CRC16校验码计算函数,多项式为0x8005
  165. def ev_packing(data):
  166. data = bytes(data)
  167. # 加密
  168. if g_config["def_cfg"]["aes_cbc_enbable"]:
  169. logger.info("加密前:{}".format(bytes(data)))
  170. key = get_aes_key("send")
  171. data = AES_Encrypt(bytes(key), bytes(key), bytes(data))
  172. lenght = len(data)
  173. # crc16 = crc16_func(data) ## 计算数据的CRC-16校验码
  174. # checknum = crc16(data, 0, len(data))
  175. checknum = checknum_16(data)
  176. HEAD_SEND = str(g_config["def_cfg"]["head_send"]).encode()
  177. out = list(HEAD_SEND) + [ (lenght>>8)&0xFF, (lenght>>0)&0xFF, (checknum>>8)&0xFF, (checknum>>0)&0xFF]
  178. for d in bytes(data):
  179. out.append(d)
  180. return bytes(out)
  181. #监听回调函数,此处为打印消息
  182. # 记录数据
  183. recv_data = []
  184. recv_start_time = 0
  185. GetConfiguration = []
  186. @calculate_time
  187. def recv_handler(data: bytearray, reve=False):
  188. global recv_start_time
  189. global g_download_cfg
  190. if g_config["def_cfg"]["recv_detail_print"]:
  191. logger.info("包接收:{}".format(bytes(data)))
  192. for d in data:
  193. recv_data.append(d)
  194. if len(recv_data)>1500:
  195. recv_data.clear()
  196. return
  197. if len(recv_data)<7:
  198. return
  199. recv_data_str = str(bytes(recv_data))
  200. HEAD_SEND = str(g_config["def_cfg"]["head_send"])
  201. HEAD_RECV = str(g_config["def_cfg"]["head_recv"])
  202. find_head = HEAD_SEND if not reve else HEAD_RECV
  203. index = recv_data_str.find(find_head)#'EV>'
  204. if index < 2:
  205. return
  206. index -= 2
  207. data_len = (recv_data[index+4]&0xff) | (recv_data[index+3]&0xff)<<8
  208. if index+7+data_len > len(recv_data):
  209. return
  210. if g_config["def_cfg"]["recv_soc_data_print"]:
  211. logger.info("接收:{}".format(bytes(recv_data).hex()))
  212. if recv_start_time:
  213. recv_end_time = time.time()
  214. logger.info('接收耗时:%.3fs', recv_end_time - recv_start_time)
  215. recv_start_time = 0
  216. soc_data = recv_data[index+7:index+7+data_len]
  217. get_crc = (recv_data[index+6]&0xff)| (recv_data[index+5]&0xff)<<8
  218. check_num = checknum_16(soc_data)
  219. if get_crc != check_num:
  220. logger.info("校验失败(%d):0x%04X 0x%04X", len(recv_data), get_crc, check_num)
  221. return
  222. logger.info("校验成功")
  223. if g_config["def_cfg"]["aes_cbc_enbable"]:
  224. find_type= "recv" if not reve else 'send'
  225. key = get_aes_key(find_type) #"recv"
  226. dec_data = AES_Decrypt(bytes(key), bytes(key), bytes(soc_data))
  227. logger.info("解密后:{}".format(bytes(dec_data)))
  228. else:
  229. dec_data = soc_data
  230. # 清理
  231. recv_data.clear()
  232. def notification_handler(characteristic: BleakGATTCharacteristic, data: bytearray):
  233. recv_handler(data)
  234. async def ble_send(client, data):
  235. global recv_start_time
  236. global g_ble_mtu
  237. frame_len = g_ble_mtu #244 #20 #244
  238. if g_config["def_cfg"]["ble_mtu"] > 0:
  239. frame_len = g_config["def_cfg"]["ble_mtu"]
  240. all_count = len(data)
  241. send_count = 0
  242. send_start_time = time.time()
  243. try:
  244. while send_count<all_count:
  245. cur_len = frame_len
  246. if all_count-send_count<frame_len:
  247. cur_len = all_count-send_count
  248. s = data[send_count:send_count+cur_len]
  249. await client.write_gatt_char(g_config["def_cfg"]["write_char"], s)
  250. send_count += cur_len
  251. except Exception as e:
  252. logger.error('Error:{}'.format(e))
  253. send_end_time = time.time()
  254. recv_start_time = time.time()
  255. logger.info('发送耗时:%.3fs', send_end_time - send_start_time)
  256. await asyncio.sleep(g_config["def_cfg"]["ble_send_wait"]) #每休眠1秒发送一次
  257. import sys, select
  258. def timeoutable_input(clue="",timeout=None):
  259. print(clue,end="")
  260. i, o, e = select.select([sys.stdin], [], [], timeout)
  261. return sys.stdin.readline() if len(i)>0 else None
  262. def print_data_list():
  263. global g_config
  264. try:
  265. for i in range(len(g_config["cmd_list"])):
  266. print(i, g_config["cmd_list"][i][2])
  267. except Exception as e:
  268. logger.error('Error:{}'.format(e))
  269. @calculate_time
  270. async def send_form_data_list(client, i, data=None):
  271. global g_config
  272. if data:
  273. json_data = data
  274. else:
  275. if i >= len(g_config["cmd_list"]):
  276. return None
  277. json_data = g_config["cmd_list"][i]
  278. if len(json_data) < 3:
  279. return
  280. byte_sequence = b''
  281. if len(json_data[0]) > 4:
  282. if json_data[0][:4] == "hex:":
  283. byte_sequence += bytes.fromhex(json_data[0][4:])
  284. elif json_data[0][:4] == "str:":
  285. byte_sequence += bytes(json_data[0][4:], 'utf-8')
  286. elif json_data[0][:4] == "bin:":
  287. byte_value = int(json_data[0][4:], 2) # 将二进制字符串转换为整数
  288. byte_array = bytes([byte_value]) # 将整数转换为单字节的字节串
  289. byte_sequence += byte_array
  290. if len(json_data[1]) > 4:
  291. if json_data[1][:4] == "hex:":
  292. byte_sequence += bytes.fromhex(json_data[1][4:])
  293. elif json_data[1][:4] == "str:":
  294. byte_sequence += bytes(json_data[1][4:], 'utf-8')
  295. elif json_data[1][:4] == "bin:":
  296. byte_value = int(json_data[1][4:], 2) # 将二进制字符串转换为整数
  297. byte_array = bytes([byte_value]) # 将整数转换为单字节的字节串
  298. byte_sequence += byte_array
  299. # data_str = json.dumps(json_data)
  300. # 字符串编码为字节序列
  301. # byte_sequence = str.encode(data_str)
  302. # 字节序列转换为bytearray类型
  303. byte_array = bytearray(byte_sequence)
  304. byte_array = ev_packing(byte_array)
  305. send_data = bytes(byte_array)
  306. logger.info('发送{}:{}'.format(json_data[2], send_data.hex()))
  307. await ble_send(client, send_data)
  308. async def task():
  309. global g_config
  310. await asyncio.sleep(3)
  311. while True:
  312. await asyncio.sleep(1)
  313. @calculate_time
  314. def read_config_call():
  315. global g_config
  316. # 读取更新json
  317. with open(config_file, "r", encoding="utf-8") as f:
  318. send_list_new = json.load(f)
  319. if g_config != send_list_new:
  320. g_config = send_list_new
  321. logger.info("json内容改变,内容如下:")
  322. print_data_list()
  323. @calculate_time
  324. async def heart_beat_call(client):
  325. global g_config
  326. # 心跳发送:维持蓝牙通讯
  327. logger.info("执行心跳发送任务..")
  328. await send_form_data_list(client, g_config["def_cfg"]["heart_beat_sel"])
  329. @calculate_time
  330. async def input_call(client):
  331. global g_config
  332. # 输入
  333. userinput = None
  334. if g_download_cfg["Number"] == 0: #非批量获取配置的状态下
  335. try:
  336. userinput = inputimeout(prompt='请命令序号:', timeout=g_config["def_cfg"]["input_interval"])
  337. except TimeoutOccurred:
  338. userinput = None
  339. if userinput:
  340. # 判断类型: 数字int 字符串str 列表list 元组tuple 字典dict
  341. # isinstance(userinput, str)
  342. if not userinput.isalpha():
  343. await send_form_data_list(client, int(userinput))
  344. elif str(userinput)=='l':
  345. # 读取更新json
  346. with open(config_file, "r", encoding="utf-8") as f:
  347. send_list_new = json.load(f)
  348. logger.info("json内容如下:")
  349. print_data_list()
  350. @calculate_time
  351. async def auto_getcfg_call(client):
  352. global g_config
  353. global g_download_cfg
  354. # 自动发送任务
  355. try:
  356. if g_download_cfg["Number"]>0:
  357. new_one = []
  358. for i, v in enumerate(g_config["cmd_list"]):
  359. if v[2] == "GetConfigurationNumber":
  360. new_one = copy.deepcopy(v) #深拷贝
  361. break
  362. if new_one:
  363. Number = g_download_cfg["Number"]
  364. Total = g_download_cfg["Total"]
  365. new_one[3]["Number"] = Number
  366. await send_form_data_list(client, 0, new_one)
  367. g_download_cfg["Number"] = Number+1 if Number < Total else 0
  368. except Exception as e:
  369. logger.error('Error:{}'.format(e))
  370. async def main():
  371. global g_ble_client
  372. global g_config
  373. global g_ble_mtu
  374. logger.info("读取配置...")
  375. with open(config_file, "r", encoding="utf-8") as f:
  376. g_config = json.load(f)
  377. print_data_list()
  378. logger.info("快速显示列表, 可输入l")
  379. while True:
  380. logger.info("开始扫描...")
  381. #基于MAC地址查找设备
  382. device = await BleakScanner.find_device_by_address(
  383. g_config["def_cfg"]["ble_mac"], cb=dict(use_bdaddr=False) #use_bdaddr判断是否是MOC系统
  384. )
  385. if device is None:
  386. logger.error("无法找到设备({})".format(g_config["def_cfg"]["ble_mac"]))
  387. continue
  388. #事件定义
  389. disconnected_event = asyncio.Event()
  390. #断开连接事件回调
  391. def disconnected_callback(client):
  392. global g_ble_client
  393. logger.info("断开回调!")
  394. disconnected_event.set()
  395. g_ble_client = None
  396. logger.info("尝试连接设备({})...".format(g_config["def_cfg"]["ble_mac"]))
  397. async with BleakClient(device,disconnected_callback=disconnected_callback) as client:
  398. g_ble_client = client
  399. logger.info("已连接(mtu=%d)", client.mtu_size)
  400. g_ble_mtu = client.mtu_size-3
  401. try:
  402. await client.start_notify(g_config["def_cfg"]["notif_char"], notification_handler)
  403. except Exception as e:
  404. logger.error('Error:{}'.format(e))
  405. g_download_cfg["Number"] = 0
  406. read_config_time = 0
  407. heart_beart_time = 0
  408. input_wait_time = 0
  409. while client and client.is_connected:
  410. if time.time()-read_config_time >= 3:
  411. read_config_time = time.time()
  412. read_config_call()
  413. if g_config["def_cfg"]["auto_send_GetConfigurationNumber"] and g_download_cfg["Number"]>0:
  414. await auto_getcfg_call(client)
  415. elif g_config["def_cfg"]["heart_beat_interval"] >0 and time.time()-heart_beart_time>=g_config["def_cfg"]["heart_beat_interval"]:
  416. heart_beart_time = time.time()
  417. await heart_beat_call(client)
  418. elif g_config["def_cfg"]["input_interval"]>0 and time.time()-input_wait_time>=g_config["def_cfg"]["input_interval"]:
  419. input_wait_time = time.time()
  420. await input_call(client)
  421. time.sleep(0.1)
  422. def test_aes():
  423. key = EV_SOFT_BLE_SEND_CODE
  424. data = b'[2, "123456789012", "KeepAlive", {}]' #需要加密的内容
  425. AES_Encrypt(key, key, data)
  426. enctext = AES_Encrypt(key, key, data)
  427. print(enctext)
  428. text_decrypted = AES_Decrypt(key, key, enctext)
  429. print(text_decrypted)
  430. import array,struct
  431. def test_base64():
  432. in_data = b'1234567890\0_+,.//abcdefg'
  433. print(len(in_data), in_data)
  434. base64_bytes = base64.b64encode(in_data)
  435. print(len(base64_bytes),base64_bytes)
  436. out = base64.b64decode(base64_bytes)
  437. print(len(out),out)
  438. def test_recv():
  439. with open(config_file, "r", encoding="utf-8") as f:
  440. g_config = json.load(f)
  441. print_data_list()
  442. str_a = '45 56 3C 00 40 EB AE E3 1F 7B FA 9A 61 86 22 BE 36 0D 0D 09 FE 12 C6 60 27 A8 74 2A 32 08 0D D1 54 B4 4C E1 69 10 4B FC DD 00 23 58 14 74 C1 3F 64 48 9C 88 12 41 17 59 6B 82 F7 90 E7 89 A1 71 80 5A 07 FF 4D 4D 99'
  443. recv_handler(bytes.fromhex(str_a), True)
  444. str_b = '45 56 3C 00 40 B2 E6 73 BD E7 EE B8 1E D4 DE B7 BF 64 19 80 95 24 94 D2 F3 4E 45 97 99 A3 B5 F7 29 E0 86 1A 60 07 96 77 C4 73 DC C0 4B 2B FB 7E 73 D2 A1 CE 17 26 CF 12 FF 06 DC 83 B7 5E 65 C9 18 0B 87 16 EB 75 0F'
  445. recv_handler(bytes.fromhex(str_b), True)
  446. def run():
  447. tasks = []
  448. tasks.append(main())
  449. # tasks.append(task())
  450. loop = asyncio.get_event_loop()
  451. loop.run_until_complete(asyncio.wait(tasks))
  452. loop.close()
  453. if __name__ == "__main__":
  454. signal.signal(signal.SIGTERM, sig_handler) # kill pid
  455. signal.signal(signal.SIGINT, sig_handler) # ctrl -c
  456. # test_aes()
  457. # test_base64()
  458. # test_recv()
  459. run()
  460. # asyncio.run(main())
  461. sys.exit(0)