Source code for bleak.backends.winrt.client

# -*- coding: utf-8 -*-
"""
BLE Client for Windows 10 systems, implemented with WinRT.

Created on 2020-08-19 by hbldh <henrik.blidh@nedomkull.com>
"""

import inspect
import logging
import asyncio
import uuid
from functools import wraps
from typing import Callable, Any, List, Optional, Sequence, Union

from bleak_winrt.windows.devices.bluetooth import (
    BluetoothError,
    BluetoothLEDevice,
    BluetoothCacheMode,
    BluetoothAddressType,
)
from bleak_winrt.windows.devices.bluetooth.genericattributeprofile import (
    GattCharacteristic,
    GattCommunicationStatus,
    GattDescriptor,
    GattDeviceService,
    GattSessionStatus,
    GattSessionStatusChangedEventArgs,
    GattWriteOption,
    GattCharacteristicProperties,
    GattClientCharacteristicConfigurationDescriptorValue,
    GattSession,
)
from bleak_winrt.windows.devices.enumeration import (
    DeviceInformation,
    DevicePairingKinds,
    DevicePairingResultStatus,
    DeviceUnpairingResultStatus,
)
from bleak_winrt.windows.foundation import EventRegistrationToken
from bleak_winrt.windows.storage.streams import Buffer

from bleak.backends.device import BLEDevice
from bleak.backends.winrt.scanner import BleakScannerWinRT
from bleak.exc import BleakError, PROTOCOL_ERROR_CODES
from bleak.backends.client import BaseBleakClient

from bleak.backends.characteristic import BleakGATTCharacteristic
from bleak.backends.service import BleakGATTServiceCollection
from bleak.backends.winrt.service import BleakGATTServiceWinRT
from bleak.backends.winrt.characteristic import BleakGATTCharacteristicWinRT
from bleak.backends.winrt.descriptor import BleakGATTDescriptorWinRT


logger = logging.getLogger(__name__)

_ACCESS_DENIED_SERVICES = list(
    uuid.UUID(u)
    for u in ("00001812-0000-1000-8000-00805f9b34fb",)  # Human Interface Device Service
)

_pairing_statuses = {
    getattr(DevicePairingResultStatus, v): v
    for v in dir(DevicePairingResultStatus)
    if "_" not in v and isinstance(getattr(DevicePairingResultStatus, v), int)
}


_unpairing_statuses = {
    getattr(DeviceUnpairingResultStatus, v): v
    for v in dir(DeviceUnpairingResultStatus)
    if "_" not in v and isinstance(getattr(DeviceUnpairingResultStatus, v), int)
}

# TODO: we can use this when minimum Python is 3.8
# class _Result(typing.Protocol):
#     status: GattCommunicationStatus
#     protocol_error: typing.Optional[int]


def _ensure_success(result: Any, attr: Optional[str], fail_msg: str) -> Any:
    """
    Ensures that *status* is ``GattCommunicationStatus.SUCCESS``, otherwise
    raises ``BleakError``.

    Args:
        result: The result returned by a WinRT API method.
        attr: The name of the attribute containing the result.
        fail_msg: A message to include in the exception.
    """
    status = result.status if hasattr(result, "status") else result

    if status == GattCommunicationStatus.SUCCESS:
        return None if attr is None else getattr(result, attr)

    if status == GattCommunicationStatus.PROTOCOL_ERROR:
        err = PROTOCOL_ERROR_CODES.get(result.protocol_error, "Unknown")
        raise BleakError(
            f"{fail_msg}: Protocol Error 0x{result.protocol_error:02X}: {err}"
        )

    if status == GattCommunicationStatus.ACCESS_DENIED:
        raise BleakError(f"{fail_msg}: Access Denied")

    if status == GattCommunicationStatus.UNREACHABLE:
        raise BleakError(f"{fail_msg}: Unreachable")

    raise BleakError(f"{fail_msg}: Unexpected status code 0x{result.status:02X}")


[docs]class BleakClientWinRT(BaseBleakClient): """Native Windows Bleak Client. Implemented using `winrt <https://github.com/Microsoft/xlang/tree/master/src/package/pywinrt/projection>`_, a package that enables Python developers to access Windows Runtime APIs directly from Python. Args: address_or_ble_device (`BLEDevice` or str): The Bluetooth address of the BLE peripheral to connect to or the `BLEDevice` object representing it. Keyword Args: timeout (float): Timeout for required ``BleakScanner.find_device_by_address`` call. Defaults to 10.0. """ def __init__(self, address_or_ble_device: Union[BLEDevice, str], **kwargs): super(BleakClientWinRT, self).__init__(address_or_ble_device, **kwargs) # Backend specific. WinRT objects. if isinstance(address_or_ble_device, BLEDevice): self._device_info = address_or_ble_device.details.adv.bluetooth_address else: self._device_info = None self._requester = None self._session_active_events: List[asyncio.Event] = [] self._session_closed_events: List[asyncio.Event] = [] self._session: GattSession = None self._address_type = ( kwargs["address_type"] if "address_type" in kwargs and kwargs["address_type"] in ("public", "random") else None ) self._session_status_changed_token: Optional[EventRegistrationToken] = None def __str__(self): return "BleakClientWinRT ({0})".format(self.address) # Connectivity methods
[docs] async def connect(self, **kwargs) -> bool: """Connect to the specified GATT server. Keyword Args: timeout (float): Timeout for required ``BleakScanner.find_device_by_address`` call. Defaults to 10.0. Returns: Boolean representing connection status. """ # Try to find the desired device. timeout = kwargs.get("timeout", self._timeout) if self._device_info is None: device = await BleakScannerWinRT.find_device_by_address( self.address, timeout=timeout ) if device: self._device_info = device.details.adv.bluetooth_address else: raise BleakError( "Device with address {0} was not found.".format(self.address) ) logger.debug("Connecting to BLE device @ {0}".format(self.address)) args = [ self._device_info, ] if self._address_type is not None: args.append( BluetoothAddressType.PUBLIC if self._address_type == "public" else BluetoothAddressType.RANDOM ) self._requester = await BluetoothLEDevice.from_bluetooth_address_async(*args) if self._requester is None: # https://github.com/microsoft/Windows-universal-samples/issues/1089#issuecomment-487586755 raise BleakError( f"Failed to connect to {self._device_info}. If the device requires pairing, then pair first. If the device uses a random address, it may have changed." ) # Called on disconnect event or on failure to connect. def handle_disconnect(): if self._session_status_changed_token: self._session.remove_session_status_changed( self._session_status_changed_token ) self._session_status_changed_token = None if self._requester: self._requester.close() self._requester = None if self._session: self._session.close() self._session = None def handle_session_status_changed( args: GattSessionStatusChangedEventArgs, ): if args.error != BluetoothError.SUCCESS: logger.error(f"Unhandled GATT error {args.error}") if args.status == GattSessionStatus.ACTIVE: for e in self._session_active_events: e.set() elif args.status == GattSessionStatus.CLOSED: if self._disconnected_callback: self._disconnected_callback(self) for e in self._session_closed_events: e.set() handle_disconnect() loop = asyncio.get_running_loop() # this is the WinRT event handler will be called on another thread def session_status_changed_event_handler( sender: GattSession, args: GattSessionStatusChangedEventArgs ): logger.debug( "session_status_changed_event_handler: id: %s, error: %s, status: %s", sender.device_id, args.error, args.status, ) loop.call_soon_threadsafe(handle_session_status_changed, args) # Start a GATT Session to connect event = asyncio.Event() self._session_active_events.append(event) try: self._session = await GattSession.from_device_id_async( self._requester.bluetooth_device_id ) if not self._session.can_maintain_connection: raise BleakError("device does not support GATT sessions") self._session_status_changed_token = ( self._session.add_session_status_changed( session_status_changed_event_handler ) ) # Windows does not support explicitly connecting to a device. # Instead it has the concept of a GATT session that is owned # by the calling program. self._session.maintain_connection = True # This keeps the device connected until we set maintain_connection = False. # wait for the session to become active await asyncio.wait_for(event.wait(), timeout=timeout) except BaseException: handle_disconnect() raise finally: self._session_active_events.remove(event) # Obtain services, which also leads to connection being established. await self.get_services() return True
[docs] async def disconnect(self) -> bool: """Disconnect from the specified GATT server. Returns: Boolean representing if device is disconnected. """ logger.debug("Disconnecting from BLE device...") # Remove notifications. for handle, event_handler_token in list(self._notification_callbacks.items()): char = self.services.get_characteristic(handle) char.obj.remove_value_changed(event_handler_token) self._notification_callbacks.clear() # Dispose all service components that we have requested and created. for service in self.services: service.obj.close() self.services = BleakGATTServiceCollection() self._services_resolved = False # Without this, disposing the BluetoothLEDevice won't disconnect it if self._session: self._session.maintain_connection = False # calling self._session.close() here prevents any further GATT # session status events, so we defer that until after the session # is no longer active # Dispose of the BluetoothLEDevice and see that the session # status is now closed. if self._requester: event = asyncio.Event() self._session_closed_events.append(event) try: self._requester.close() await asyncio.wait_for(event.wait(), timeout=10) finally: self._session_closed_events.remove(event) return True
@property def is_connected(self) -> bool: """Check connection status between this client and the server. Returns: Boolean representing connection status. """ return self._DeprecatedIsConnectedReturn( False if self._session is None else self._session.session_status == GattSessionStatus.ACTIVE ) @property def mtu_size(self) -> int: """Get ATT MTU size for active connection""" return self._session.max_pdu_size
[docs] async def pair(self, protection_level: int = None, **kwargs) -> bool: """Attempts to pair with the device. Keyword Args: protection_level: ``Windows.Devices.Enumeration.DevicePairingProtectionLevel`` 1: None - Pair the device using no levels of protection. 2: Encryption - Pair the device using encryption. 3: EncryptionAndAuthentication - Pair the device using encryption and authentication. (This will not work in Bleak...) Returns: Boolean regarding success of pairing. """ # New local device information object created since the object from the requester isn't updated device_information = await DeviceInformation.create_from_id_async( self._requester.device_information.id ) if ( device_information.pairing.can_pair and not device_information.pairing.is_paired ): # Currently only supporting Just Works solutions... ceremony = DevicePairingKinds.CONFIRM_ONLY custom_pairing = device_information.pairing.custom def handler(sender, args): args.accept() pairing_requested_token = custom_pairing.add_pairing_requested(handler) try: if protection_level: pairing_result = await custom_pairing.pair_async( ceremony, protection_level ) else: pairing_result = await custom_pairing.pair_async(ceremony) except Exception as e: raise BleakError("Failure trying to pair with device!") from e finally: custom_pairing.remove_pairing_requested(pairing_requested_token) if pairing_result.status not in ( DevicePairingResultStatus.PAIRED, DevicePairingResultStatus.ALREADY_PAIRED, ): raise BleakError( "Could not pair with device: {0}: {1}".format( pairing_result.status, _pairing_statuses.get(pairing_result.status), ) ) else: logger.info( "Paired to device with protection level {0}.".format( pairing_result.protection_level_used ) ) return True else: return device_information.pairing.is_paired
[docs] async def unpair(self) -> bool: """Attempts to unpair from the device. N.B. unpairing also leads to disconnection in the Windows backend. Returns: Boolean on whether the unparing was successful. """ # New local device information object created since the object from the requester isn't updated device_information = await DeviceInformation.create_from_id_async( self._requester.device_information.id ) if device_information.pairing.is_paired: unpairing_result = await device_information.pairing.unpair_async() if unpairing_result.status not in ( DevicePairingResultStatus.PAIRED, DevicePairingResultStatus.ALREADY_PAIRED, ): raise BleakError( "Could not unpair with device: {0}: {1}".format( unpairing_result.status, _unpairing_statuses.get(unpairing_result.status), ) ) else: logger.info("Unpaired with device.") return True return not device_information.pairing.is_paired
# GATT services methods
[docs] async def get_services(self, **kwargs) -> BleakGATTServiceCollection: """Get all services registered for this GATT server. Returns: A :py:class:`bleak.backends.service.BleakGATTServiceCollection` with this device's services tree. """ # Return the Service Collection. if self._services_resolved: return self.services else: logger.debug("Get Services...") services: Sequence[GattDeviceService] = _ensure_success( await self._requester.get_gatt_services_async( BluetoothCacheMode.UNCACHED ), "services", "Could not get GATT services", ) for service in services: # Windows returns an ACCESS_DENIED error when trying to enumerate # characterstics of services used by the OS, like the HID service # so we have to exclude those services. if service.uuid in _ACCESS_DENIED_SERVICES: continue self.services.add_service(BleakGATTServiceWinRT(service)) characteristics: Sequence[GattCharacteristic] = _ensure_success( await service.get_characteristics_async( BluetoothCacheMode.UNCACHED ), "characteristics", f"Could not get GATT characteristics for {service}", ) for characteristic in characteristics: self.services.add_characteristic( BleakGATTCharacteristicWinRT(characteristic) ) descriptors: Sequence[GattDescriptor] = _ensure_success( await characteristic.get_descriptors_async( BluetoothCacheMode.UNCACHED ), "descriptors", f"Could not get GATT descriptors for {service}", ) for descriptor in descriptors: self.services.add_descriptor( BleakGATTDescriptorWinRT( descriptor, str(characteristic.uuid), characteristic.attribute_handle, ) ) logger.info("Services resolved for %s", str(self)) self._services_resolved = True return self.services
# I/O methods
[docs] async def read_gatt_char( self, char_specifier: Union[BleakGATTCharacteristic, int, str, uuid.UUID], **kwargs, ) -> bytearray: """Perform read operation on the specified GATT characteristic. Args: char_specifier (BleakGATTCharacteristic, int, str or UUID): The characteristic to read from, specified by either integer handle, UUID or directly by the BleakGATTCharacteristic object representing it. Keyword Args: use_cached (bool): ``False`` forces Windows to read the value from the device again and not use its own cached value. Defaults to ``False``. Returns: (bytearray) The read data. """ use_cached = kwargs.get("use_cached", False) if not isinstance(char_specifier, BleakGATTCharacteristic): characteristic = self.services.get_characteristic(char_specifier) else: characteristic = char_specifier if not characteristic: raise BleakError("Characteristic {0} was not found!".format(char_specifier)) value = bytearray( _ensure_success( await characteristic.obj.read_value_async( BluetoothCacheMode.CACHED if use_cached else BluetoothCacheMode.UNCACHED ), "value", f"Could not read characteristic handle {characteristic.handle}", ) ) logger.debug(f"Read Characteristic {characteristic.handle:04X} : {value}") return value
[docs] async def read_gatt_descriptor(self, handle: int, **kwargs) -> bytearray: """Perform read operation on the specified GATT descriptor. Args: handle (int): The handle of the descriptor to read from. Keyword Args: use_cached (bool): `False` forces Windows to read the value from the device again and not use its own cached value. Defaults to `False`. Returns: (bytearray) The read data. """ use_cached = kwargs.get("use_cached", False) descriptor = self.services.get_descriptor(handle) if not descriptor: raise BleakError("Descriptor with handle {0} was not found!".format(handle)) value = bytearray( _ensure_success( await descriptor.obj.read_value_async( BluetoothCacheMode.CACHED if use_cached else BluetoothCacheMode.UNCACHED ), "value", f"Could not read Descriptor value for {handle:04X}", ) ) logger.debug(f"Read Descriptor {handle:04X} : {value}") return value
[docs] async def write_gatt_char( self, char_specifier: Union[BleakGATTCharacteristic, int, str, uuid.UUID], data: Union[bytes, bytearray, memoryview], response: bool = False, ) -> None: """Perform a write operation of the specified GATT characteristic. Args: char_specifier (BleakGATTCharacteristic, int, str or UUID): The characteristic to write to, specified by either integer handle, UUID or directly by the BleakGATTCharacteristic object representing it. data (bytes or bytearray): The data to send. response (bool): If write-with-response operation should be done. Defaults to `False`. """ if not isinstance(char_specifier, BleakGATTCharacteristic): characteristic = self.services.get_characteristic(char_specifier) else: characteristic = char_specifier if not characteristic: raise BleakError("Characteristic {} was not found!".format(char_specifier)) response = ( GattWriteOption.WRITE_WITH_RESPONSE if response else GattWriteOption.WRITE_WITHOUT_RESPONSE ) buf = Buffer(len(data)) buf.length = buf.capacity with memoryview(buf) as mv: mv[:] = data _ensure_success( await characteristic.obj.write_value_with_result_async(buf, response), None, f"Could not write value {data} to characteristic {characteristic.handle:04X}", )
[docs] async def write_gatt_descriptor( self, handle: int, data: Union[bytes, bytearray, memoryview] ) -> None: """Perform a write operation on the specified GATT descriptor. Args: handle (int): The handle of the descriptor to read from. data (bytes or bytearray): The data to send. """ descriptor = self.services.get_descriptor(handle) if not descriptor: raise BleakError("Descriptor with handle {0} was not found!".format(handle)) buf = Buffer(len(data)) buf.length = buf.capacity with memoryview(buf) as mv: mv[:] = data _ensure_success( await descriptor.obj.write_value_with_result_async(buf), None, f"Could not write value {data} to descriptor {handle:04X}", ) logger.debug(f"Write Descriptor {handle:04X} : {data}")
[docs] async def start_notify( self, char_specifier: Union[BleakGATTCharacteristic, int, str, uuid.UUID], callback: Callable[[int, bytearray], None], **kwargs, ) -> None: """Activate notifications/indications on a characteristic. Callbacks must accept two inputs. The first will be a uuid string object and the second will be a bytearray. .. code-block:: python def callback(sender, data): print(f"{sender}: {data}") client.start_notify(char_uuid, callback) Args: char_specifier (BleakGATTCharacteristic, int, str or UUID): The characteristic to activate notifications/indications on a characteristic, specified by either integer handle, UUID or directly by the BleakGATTCharacteristic object representing it. callback (function): The function to be called on notification. Keyword Args: force_indicate (bool): If this is set to True, then Bleak will set up a indication request instead of a notification request, given that the characteristic supports notifications as well as indications. """ if inspect.iscoroutinefunction(callback): def bleak_callback(s, d): asyncio.ensure_future(callback(s, d)) else: bleak_callback = callback if not isinstance(char_specifier, BleakGATTCharacteristic): characteristic = self.services.get_characteristic(char_specifier) else: characteristic = char_specifier if not characteristic: raise BleakError("Characteristic {0} not found!".format(char_specifier)) if self._notification_callbacks.get(characteristic.handle): await self.stop_notify(characteristic) characteristic_obj = characteristic.obj # If we want to force indicate even when notify is available, also check if the device # actually supports indicate as well. if not kwargs.get("force_indicate", False) and ( characteristic_obj.characteristic_properties & GattCharacteristicProperties.NOTIFY ): cccd = GattClientCharacteristicConfigurationDescriptorValue.NOTIFY elif ( characteristic_obj.characteristic_properties & GattCharacteristicProperties.INDICATE ): cccd = GattClientCharacteristicConfigurationDescriptorValue.INDICATE else: raise BleakError( "characteristic does not support notifications or indications" ) fcn = _notification_wrapper(bleak_callback, asyncio.get_running_loop()) event_handler_token = characteristic_obj.add_value_changed(fcn) self._notification_callbacks[characteristic.handle] = event_handler_token try: _ensure_success( await characteristic_obj.write_client_characteristic_configuration_descriptor_async( cccd ), None, f"Could not start notify on {characteristic.handle:04X}", ) except BaseException: # This usually happens when a device reports that it supports indicate, # but it actually doesn't. if characteristic.handle in self._notification_callbacks: event_handler_token = self._notification_callbacks.pop( characteristic.handle ) characteristic_obj.remove_value_changed(event_handler_token) raise
[docs] async def stop_notify( self, char_specifier: Union[BleakGATTCharacteristic, int, str, uuid.UUID] ) -> None: """Deactivate notification/indication on a specified characteristic. Args: char_specifier (BleakGATTCharacteristic, int, str or UUID): The characteristic to deactivate notification/indication on, specified by either integer handle, UUID or directly by the BleakGATTCharacteristic object representing it. """ if not isinstance(char_specifier, BleakGATTCharacteristic): characteristic = self.services.get_characteristic(char_specifier) else: characteristic = char_specifier if not characteristic: raise BleakError("Characteristic {} not found!".format(char_specifier)) _ensure_success( await characteristic.obj.write_client_characteristic_configuration_descriptor_async( GattClientCharacteristicConfigurationDescriptorValue.NONE ), None, f"Could not stop notify on {characteristic.handle:04X}", ) event_handler_token = self._notification_callbacks.pop(characteristic.handle) characteristic.obj.remove_value_changed(event_handler_token)
def _notification_wrapper(func: Callable, loop: asyncio.AbstractEventLoop): @wraps(func) def notification_parser(sender: Any, args: Any): # Return only the UUID string representation as sender. # Also do a conversion from System.Bytes[] to bytearray. value = bytearray(args.characteristic_value) return loop.call_soon_threadsafe(func, sender.attribute_handle, value) return notification_parser