๋ณธ๋ฌธ ๋ฐ”๋กœ๊ฐ€๊ธฐ
๐Ÿ | Python/ํŒŒ์ด์ฌ ์ดˆ๊ฐ„๋‹จ ํ”„๋กœ์ ํŠธ

[Python] BLE ํ†ต์‹  ํ•˜๊ธฐ with bleak (MacOS) - 2) ๋ฐ์ดํ„ฐ ๋ฐ›๊ธฐ

by KASSID 2023. 5. 24.

๋ชฉ์ฐจ

    728x90

    2023.05.24 - [๐Ÿ | Python/ํŒŒ์ด์ฌ ์ดˆ๊ฐ„๋‹จ ํ”„๋กœ์ ํŠธ] - [Python] BLE ํ†ต์‹  ํ•˜๊ธฐ with bleak (MacOS) - 1) ํƒ์ƒ‰, ์—ฐ๊ฒฐ

     

    ์ง€๋‚œ ํฌ์ŠคํŠธ์—์„œ๋Š” bleak ๋ชจ๋“ˆ์„ ์ด์šฉํ•ด ์ฃผ๋ณ€ ๊ธฐ๊ธฐ๋ฅผ ํƒ์ƒ‰ํ•˜๊ณ  ์—ฐ๊ฒฐํ•˜๋Š” ๊ฒƒ์„ ํ•ด๋ณด์•˜๋‹ค.

     

    ์ด๋ฒˆ์—๋Š” ํ˜„์žฌ Kit์˜ ์„œ๋น„์Šค ์ •๋ณด๋ฅผ ์•Œ์•„๋ณด๊ณ  ๊ทธ๋ฅผ ํ™œ์šฉํ•˜์—ฌ kit๋กœ๋ถ€ํ„ฐ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ›์•„๋ณด์ž!

     

    1. ์—ฐ๊ฒฐํ•œ ์žฅ์น˜์˜ ์„œ๋น„์Šค

    1) ์—ฐ๊ฒฐ๋œ ์„œ๋น„์Šค ํ™•์ธ

    import asyncio
    from bleak import BleakClient, BleakScanner
    
    async  def  main ():
        devices = await BleakScanner.discover()
        for idx, device in enumerate(devices):
            print(f'{idx}\t\t',device)
        select = int(input())
        address = devices[select].address
    
        async  with BleakClient(address) as client:
            try:
                print("Connecting...", end='')
                await client.connect()
                print('Connected!!')
                
                # ์„œ๋น„์Šค ๋ฐ›๊ธฐ
                services = await client.get_services()
                print("Services:", type(services))
                
                for service in services:
    	            print(service)
    
    
                
            except Exception as e:
                print('error: ', e, end='')
            finally:
                print('Start disconnect')
                await client.disconnect()
                print('disconnected')
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print('Done!')

     

    ์ˆ˜ํ–‰ ๊ฒฐ๊ณผ ์ค‘ ์ผ๋ถ€

     

    ์„ ํƒํ•œ ์žฅ์น˜์™€ ์ •์ƒ ์—ฐ๊ฒฐ ํ›„ get_services()๋ฅผ ํ†ตํ•ด ์„œ๋น„์Šค ์ •๋ณด๋ฅผ ๋ฐ˜ํ™˜๋ฐ›๋Š”๋‹ค.

     

     

    2) chracteristic ์ •๋ณด ์–ป๊ธฐ

    import asyncio
    from bleak import BleakClient, BleakScanner
    
    async  def  main ():
        devices = await BleakScanner.discover()
        for idx, device in enumerate(devices):
            print(f'{idx}\t\t',device)
        select = int(input())
        address = devices[select].address
    
        async  with BleakClient(address) as client:
            try:
                print("Connecting...", end='')
                await client.connect()
                print('Connected!!')
                
                # ์„œ๋น„์Šค ๋ฐ›๊ธฐ
                services = await client.get_services()
                print("Services:", type(services))
                
                for service in services:
                    print(service)
    
                    print(service.uuid)
                    print("Characteristic List:")
    
                    for characteristic in service.characteristics:
                        print("\n\t", characteristic)
                        print("\t", characteristic.uuid)
                        print("\t", characteristic.description)
                        print("\t", characteristic.properties)
    
                
            except Exception as e:
                print('error: ', e, end='')
            finally:
                print('Start disconnect')
                await client.disconnect()
                print('disconnected')
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print('Done!')

     

    ์ˆ˜ํ–‰ ๊ฒฐ๊ณผ ์ค‘ ์ผ๋ถ€

     

    serive ๋‚ด๋ถ€์— ๋“ค์–ด์žˆ๋Š” ์ •๋ณด๋“ค์— ๋Œ€ํ•ด์„œ ๋ฐ˜ํ™˜ํ•ด๋ณด๋ฉด ์œ„์™€ ๊ฐ™๋‹ค.

    uuid, ์„ค๋ช…, ์†์„ฑ ๋“ค์„ ์•Œ ์ˆ˜ ์žˆ๋‹ค.

     

    ์ด ์ค‘ chracteristic UUID์™€ ์†์„ฑ์ด ๋‹ค์Œ ๋‹จ๊ณ„๋ฅผ ์œ„ํ•ด ํ•„์š”ํ•œ ์ •๋ณด๋“ค์ด๋‹ค.

     

    ์œ„์˜ ์ •๋ณด๋ฅผ ํ†ตํ•ด ์•Œ ์ˆ˜ ์žˆ๋Š” ๊ฒƒ์€

    6e400003-b5a3-f393-e0a9-e50e24dcca9e ์ด๋ผ๋Š” UUID๋ฅผ ๊ฐ€์ง€๊ณ  ์žˆ๋Š” ์บ๋ฆญํ„ฐ๋ฆฌ์Šคํ‹ฑ์ด

    notify, write ๊ธฐ๋Šฅ์„ ์ˆ˜ํ–‰ํ•  ์ˆ˜ ์žˆ๋‹ค ๋ผ๋Š” ๊ฒƒ์ด๋‹ค.

     

    (( ์ฃผ์š” ์†์„ฑ ))
    - read : ์ฝ๊ธฐ (์žฅ์น˜์— ์žˆ๋Š” ๋ฐ์ดํ„ฐ๋ฅผ)
    - write : ์“ฐ๊ธฐ (์žฅ์น˜์—๊ฒŒ ๋ฐ์ดํ„ฐ๋ฅผ) & ์‘๋‹ต ๊ธฐ๋‹ค๋ฆฌ๊ธฐ
    - notify : ํด๋ผ์ด์–ธํŠธ ์š”์ฒญ์ด ์—†์–ด๋„ ์žฅ์น˜์—์„œ ๋ฐ์ดํ„ฐ๋ฅผ ๋ณด๋‚ด๊ธฐ & ์‘๋‹ต ๊ธฐ๋‹ค๋ฆฌ์ง€ ์•Š์Œ
    ... (indicate, write-without-response ๋“ฑ)

     

     

    3) ๋ฐ์ดํ„ฐ ๋ณด๋‚ด๊ธฐ (notify)

    ํ•„์ž์˜ ํ”„๋กœ์ ํŠธ์—์„œ ํ•„์š”ํ•œ ์†์„ฑ์ธ notify๋ฅผ ํ†ตํ•ด

    ์žฅ์น˜์—์„œ ํด๋ผ์ด์–ธํŠธ์—๊ฒŒ ๋ฐ์ดํ„ฐ๋ฅผ ๋ณด๋‚ด๋„๋ก ํ•ด๋ณด์ž!

    import asyncio
    from bleak import BleakClient, BleakScanner
    
    
    def notify_callback(sender: int, data: bytearray):
        print('sender: ', sender, 'data: ', data)
        print('\tint list: ', [b for b in data])
    
    async  def  main ():
        devices = await BleakScanner.discover()
        for idx, device in enumerate(devices):
            print(f'{idx}\t\t',device)
        select = int(input())
        address = devices[select].address
    
        async  with BleakClient(address) as client:
            try:
                print("Connecting...", end='')
                await client.connect()
                print('Connected!!')
                
                # ์„œ๋น„์Šค ๋ฐ›๊ธฐ
                services = await client.get_services()
                print("Services:", type(services))
                
                for service in services:
                    print(service)
                    for characteristic in service.characteristics:
                        if 'notify' in characteristic.properties:
                          notity_charcteristic_uuid = characteristic.uuid
                          print("\n\t", characteristic)
                          print('Activate notify...')
                          await client.start_notify(characteristic, notify_callback)
    
                
            except Exception as e:
                print('error: ', e, end='')
            
            if client.is_connected:
                await asyncio.sleep(2.0) #20์ดˆ ๋™์•ˆ ์ŠคํŠธ๋ฆผ
                print('try to deactivate notify.')
                await client.stop_notify(notity_charcteristic_uuid)
    
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
    print('Done!')

     

    ์ˆ˜ํ–‰ ๊ฒฐ๊ณผ ์ค‘ ์ผ๋ถ€

     

    ์œ„์™€ ๊ฐ™์ด ์žฅ์น˜๋กœ๋ถ€ํ„ฐ ๋ฐ์ดํ„ฐ๋ฅผ ๋ฐ›์•„์˜ฌ ์ˆ˜ ์žˆ๋‹ค.

     

     

     

     

    ์ถ”ํ›„์— ๋‹ค๋ฅธ ์†์„ฑ๋“ค์„ ๋‹ค๋ฅด๋Š” ๊ธ€๋„ ํฌ์ŠคํŒ…ํ•ด๋ณผ ์˜ˆ์ •์ด๋‹ค!

    ๋Œ“๊ธ€