Source code for bleak.backends.bluezdbus.manager

"""
BlueZ D-Bus manager module
--------------------------

This module contains code for the global BlueZ D-Bus object manager that is
used internally by Bleak.
"""

import asyncio
import logging
import os
from typing import (
    Any,
    Callable,
    Coroutine,
    Dict,
    Iterable,
    List,
    NamedTuple,
    Optional,
    Set,
    Tuple,
    cast,
)

from dbus_next import BusType, Message, MessageType, Variant
from dbus_next.aio.message_bus import MessageBus
from typing_extensions import Literal, TypedDict

from ...exc import BleakError
from ..service import BleakGATTServiceCollection
from . import defs
from .advertisement_monitor import AdvertisementMonitor, OrPatternLike
from .characteristic import BleakGATTCharacteristicBlueZDBus
from .descriptor import BleakGATTDescriptorBlueZDBus
from .service import BleakGATTServiceBlueZDBus
from .signals import MatchRules, add_match
from .utils import assert_reply, unpack_variants

logger = logging.getLogger(__name__)


# D-Bus properties for interfaces
# https://github.com/bluez/bluez/blob/master/doc/adapter-api.txt


[docs]class Adapter1(TypedDict): Address: str Name: str Alias: str Class: int Powered: bool Discoverable: bool Pairable: bool PairableTimeout: int DiscoverableTimeout: int Discovering: int UUIDs: List[str] Modalias: str Roles: List[str] ExperimentalFeatures: List[str]
# https://github.com/bluez/bluez/blob/master/doc/advertisement-monitor-api.txt
[docs]class AdvertisementMonitor1(TypedDict): Type: str RSSILowThreshold: int RSSIHighThreshold: int RSSILowTimeout: int RSSIHighTimeout: int RSSISamplingPeriod: int Patterns: List[Tuple[int, int, bytes]]
[docs]class AdvertisementMonitorManager1(TypedDict): SupportedMonitorTypes: List[str] SupportedFeatures: List[str]
# https://github.com/bluez/bluez/blob/master/doc/battery-api.txt
[docs]class Battery1(TypedDict): SupportedMonitorTypes: List[str] SupportedFeatures: List[str]
# https://github.com/bluez/bluez/blob/master/doc/device-api.txt
[docs]class Device1(TypedDict): Address: str AddressType: str Name: str Icon: str Class: int Appearance: int UUIDs: List[str] Paired: bool Bonded: bool Connected: bool Trusted: bool Blocked: bool WakeAllowed: bool Alias: str Adapter: str LegacyPairing: bool Modalias: str RSSI: int TxPower: int ManufacturerData: Dict[int, bytes] ServiceData: Dict[str, bytes] ServicesResolved: bool AdvertisingFlags: bytes AdvertisingData: Dict[int, bytes]
# https://github.com/bluez/bluez/blob/master/doc/gatt-api.txt
[docs]class GattService1(TypedDict): UUID: str Primary: bool Device: str Includes: List[str]
# Handle is server-only and not available in Bleak
[docs]class GattCharacteristic1(TypedDict): UUID: str Service: str Value: bytes WriteAcquired: bool NotifyAcquired: bool Notifying: bool Flags: List[ Literal[ "broadcast", "read", "write-without-response", "write", "notify", "indicate", "authenticated-signed-writes", "extended-properties", "reliable-write", "writable-auxiliaries", "encrypt-read", "encrypt-write", # "encrypt-notify" and "encrypt-indicate" are server-only "encrypt-authenticated-read", "encrypt-authenticated-write", # "encrypt-authenticated-notify", "encrypt-authenticated-indicate", # "secure-read", "secure-write", "secure-notify", "secure-indicate" # are server-only "authorize", ] ] MTU: int
# Handle is server-only and not available in Bleak
[docs]class GattDescriptor1(TypedDict): UUID: str Characteristic: str Value: bytes Flags: List[ Literal[ "read", "write", "encrypt-read", "encrypt-write", "encrypt-authenticated-read", "encrypt-authenticated-write", # "secure-read" and "secure-write" are server-only and not available in Bleak "authorize", ] ]
# Handle is server-only and not available in Bleak AdvertisementCallback = Callable[[str, Device1], None] """ A callback that is called when advertisement data is received. Args: arg0: The D-Bus object path of the device. arg1: The D-Bus properties of the device object. """
[docs]class CallbackAndState(NamedTuple): """ Encapsulates an :data:`AdvertisementCallback` and some state. """ callback: AdvertisementCallback """ The callback. """ adapter_path: str """ The D-Bus object path of the adapter associated with the callback. """ seen_devices: Set[str] """ Set of device D-Bus object paths that have been "seen" already by this callback. """
DeviceConnectedChangedCallback = Callable[[bool], None] """ A callback that is called when a device's "Connected" property changes. Args: arg0: The current value of the "Connected" property. """ CharacteristicValueChangedCallback = Callable[[str, bytes], None] """ A callback that is called when a characteristics's "Value" property changes. Args: arg0: The D-Bus object path of the characteristic. arg1: The current value of the "Value" property. """
[docs]class DeviceWatcher(NamedTuple): device_path: str """ The D-Bus object path of the device. """ on_connected_changed: DeviceConnectedChangedCallback """ A callback that is called when a device's "Connected" property changes. """ on_characteristic_value_changed: CharacteristicValueChangedCallback """ A callback that is called when a characteristics's "Value" property changes. """
# set of org.bluez.Device1 property names that come from advertising data _ADVERTISING_DATA_PROPERTIES = { "AdvertisingData", "AdvertisingFlags", "ManufacturerData", "Name", "ServiceData", "UUIDs", }
[docs]class BlueZManager: """ BlueZ D-Bus object manager. Use :func:`bleak.backends.bluezdbus.get_global_bluez_manager` to get the global instance. """ def __init__(self): self._bus: Optional[MessageBus] = None self._bus_lock = asyncio.Lock() # dict of object path: dict of interface name: dict of property name: property value self._properties: Dict[str, Dict[str, Dict[str, Any]]] = {} self._advertisement_callbacks: List[CallbackAndState] = [] self._device_watchers: Set[DeviceWatcher] = set() self._condition_callbacks: Set[Callable] = set()
[docs] async def async_init(self): """ Connects to the D-Bus message bus and begins monitoring signals. It is safe to call this method multiple times. If the bus is already connected, no action is performed. """ async with self._bus_lock: if self._bus and self._bus.connected: return # We need to create a new MessageBus each time as # dbus-next will destory the underlying file descriptors # when the previous one is closed in its finalizer. bus = MessageBus(bus_type=BusType.SYSTEM) await bus.connect() try: # Add signal listeners bus.add_message_handler(self._parse_msg) rules = MatchRules( interface=defs.OBJECT_MANAGER_INTERFACE, member="InterfacesAdded", arg0path="/org/bluez/", ) reply = await add_match(bus, rules) assert_reply(reply) rules = MatchRules( interface=defs.OBJECT_MANAGER_INTERFACE, member="InterfacesRemoved", arg0path="/org/bluez/", ) reply = await add_match(bus, rules) assert_reply(reply) rules = MatchRules( interface=defs.PROPERTIES_INTERFACE, member="PropertiesChanged", path_namespace="/org/bluez", ) reply = await add_match(bus, rules) assert_reply(reply) # get existing objects after adding signal handlers to avoid # race condition reply = await bus.call( Message( destination=defs.BLUEZ_SERVICE, path="/", member="GetManagedObjects", interface=defs.OBJECT_MANAGER_INTERFACE, ) ) assert_reply(reply) # dictionary is replaced in case AddInterfaces was received first self._properties = { path: unpack_variants(interfaces) for path, interfaces in reply.body[0].items() } logger.debug(f"initial properties: {self._properties}") except BaseException: # if setup failed, disconnect bus.disconnect() raise # Everything is setup, so save the bus self._bus = bus
[docs] async def active_scan( self, adapter_path: str, filters: Dict[str, Variant], callback: AdvertisementCallback, ) -> Callable[[], Coroutine]: """ Configures the advertisement data filters and starts scanning. Args: adapter_path: The D-Bus object path of the adapter to use for scanning. filters: A dictionary of filters to pass to ``SetDiscoveryFilter``. callback: A callable that will be called when new advertisement data is received. Returns: An async function that is used to stop scanning and remove the filters. """ async with self._bus_lock: # If the adapter doesn't exist, then the message calls below would # fail with "method not found". This provides a more informative # error message. if adapter_path not in self._properties: raise BleakError(f"adapter '{adapter_path.split('/')[-1]}' not found") callback_and_state = CallbackAndState(callback, adapter_path, set()) self._advertisement_callbacks.append(callback_and_state) try: # Apply the filters reply = await self._bus.call( Message( destination=defs.BLUEZ_SERVICE, path=adapter_path, interface=defs.ADAPTER_INTERFACE, member="SetDiscoveryFilter", signature="a{sv}", body=[filters], ) ) assert_reply(reply) # Start scanning reply = await self._bus.call( Message( destination=defs.BLUEZ_SERVICE, path=adapter_path, interface=defs.ADAPTER_INTERFACE, member="StartDiscovery", ) ) assert_reply(reply) async def stop() -> None: async with self._bus_lock: reply = await self._bus.call( Message( destination=defs.BLUEZ_SERVICE, path=adapter_path, interface=defs.ADAPTER_INTERFACE, member="StopDiscovery", ) ) assert_reply(reply) # remove the filters reply = await self._bus.call( Message( destination=defs.BLUEZ_SERVICE, path=adapter_path, interface=defs.ADAPTER_INTERFACE, member="SetDiscoveryFilter", signature="a{sv}", body=[{}], ) ) assert_reply(reply) self._advertisement_callbacks.remove(callback_and_state) return stop except BaseException: # if starting scanning failed, don't leak the callback self._advertisement_callbacks.remove(callback_and_state) raise
[docs] async def passive_scan( self, adapter_path: str, filters: List[OrPatternLike], callback: AdvertisementCallback, ) -> Callable[[], Coroutine]: """ Configures the advertisement data filters and starts scanning. Args: adapter_path: The D-Bus object path of the adapter to use for scanning. filters: A list of "or patterns" to pass to ``org.bluez.AdvertisementMonitor1``. callback: A callable that will be called when new advertisement data is received. Returns: An async function that is used to stop scanning and remove the filters. """ async with self._bus_lock: # If the adapter doesn't exist, then the message calls below would # fail with "method not found". This provides a more informative # error message. if adapter_path not in self._properties: raise BleakError(f"adapter '{adapter_path.split('/')[-1]}' not found") callback_and_state = CallbackAndState(callback, adapter_path, set()) self._advertisement_callbacks.append(callback_and_state) try: monitor = AdvertisementMonitor(filters) # this should be a unique path to allow multiple python interpreters # running bleak and multiple scanners within a single interpreter monitor_path = f"/org/bleak/{os.getpid()}/{id(monitor)}" reply = await self._bus.call( Message( destination=defs.BLUEZ_SERVICE, path=adapter_path, interface=defs.ADVERTISEMENT_MONITOR_MANAGER_INTERFACE, member="RegisterMonitor", signature="o", body=[monitor_path], ) ) if ( reply.message_type == MessageType.ERROR and reply.error_name == "org.freedesktop.DBus.Error.UnknownMethod" ): raise BleakError( "passive scanning on Linux requires BlueZ >= 5.55 with --experimental enabled and Linux kernel >= 5.10" ) assert_reply(reply) # It is important to export after registering, otherwise BlueZ # won't use the monitor self._bus.export(monitor_path, monitor) async def stop(): async with self._bus_lock: self._bus.unexport(monitor_path, monitor) reply = await self._bus.call( Message( destination=defs.BLUEZ_SERVICE, path=adapter_path, interface=defs.ADVERTISEMENT_MONITOR_MANAGER_INTERFACE, member="UnregisterMonitor", signature="o", body=[monitor_path], ) ) assert_reply(reply) self._advertisement_callbacks.remove(callback_and_state) return stop except BaseException: # if starting scanning failed, don't leak the callback self._advertisement_callbacks.remove(callback_and_state) raise
[docs] def add_device_watcher( self, device_path: str, on_connected_changed: DeviceConnectedChangedCallback, on_characteristic_value_changed: CharacteristicValueChangedCallback, ) -> DeviceWatcher: """ Registers a device watcher to receive callbacks when device state changes or events are received. Args: device_path: The D-Bus object path of the device. on_connected_changed: A callback that is called when the device's "Connected" state changes. on_characteristic_value_changed: A callback that is called whenever a characteristic receives a notification/indication. Returns: A device watcher object that acts a token to unregister the watcher. """ watcher = DeviceWatcher( device_path, on_connected_changed, on_characteristic_value_changed ) self._device_watchers.add(watcher) return watcher
[docs] def remove_device_watcher(self, watcher: DeviceWatcher) -> None: """ Unregisters a device watcher. Args: The device watcher token that was returned by :meth:`add_device_watcher`. """ self._device_watchers.remove(watcher)
[docs] async def get_services(self, device_path: str) -> BleakGATTServiceCollection: """ Builds a new :class:`BleakGATTServiceCollection` from the current state. Args: device_path: The D-Bus object path of the Bluetooth device. Returns: A new :class:`BleakGATTServiceCollection`. """ await self._wait_condition(device_path, "ServicesResolved", True) services = BleakGATTServiceCollection() for service_path, service_ifaces in self._properties.items(): if ( not service_path.startswith(device_path) or defs.GATT_SERVICE_INTERFACE not in service_ifaces ): continue service = BleakGATTServiceBlueZDBus( service_ifaces[defs.GATT_SERVICE_INTERFACE], service_path ) services.add_service(service) for char_path, char_ifaces in self._properties.items(): if ( not char_path.startswith(service_path) or defs.GATT_CHARACTERISTIC_INTERFACE not in char_ifaces ): continue char = BleakGATTCharacteristicBlueZDBus( char_ifaces[defs.GATT_CHARACTERISTIC_INTERFACE], char_path, service.uuid, service.handle, ) services.add_characteristic(char) for desc_path, desc_ifaces in self._properties.items(): if ( not desc_path.startswith(char_path) or defs.GATT_DESCRIPTOR_INTERFACE not in desc_ifaces ): continue desc = BleakGATTDescriptorBlueZDBus( desc_ifaces[defs.GATT_DESCRIPTOR_INTERFACE], desc_path, char.uuid, char.handle, ) services.add_descriptor(desc) return services
[docs] def get_device_name(self, device_path: str) -> str: """ Gets the value of the "Name" property for a device. Args: device_path: The D-Bus object path of the device. Returns: The current property value. """ return self._properties[device_path][defs.DEVICE_INTERFACE]["Name"]
async def _wait_condition( self, device_path: str, property_name: str, property_value: Any ) -> None: """ Waits for a condition to become true. Args: device_path: The D-Bus object path of a Bluetooth device. property_name: The name of the property to test. property_value: A value to compare the current property value to. """ if ( self._properties[device_path][defs.DEVICE_INTERFACE][property_name] == property_value ): return event = asyncio.Event() def callback(): if ( self._properties[device_path][defs.DEVICE_INTERFACE][property_name] == property_value ): event.set() self._condition_callbacks.add(callback) try: # can be canceled await event.wait() finally: self._condition_callbacks.remove(callback) def _parse_msg(self, message: Message): """ Handles callbacks from dbus_next. """ if message.message_type != MessageType.SIGNAL: return logger.debug( f"received D-Bus signal: {message.interface}.{message.member} ({message.path}): {message.body}" ) # type hints obj_path: str interfaces_and_props: Dict[str, Dict[str, Variant]] interfaces: List[str] interface: str changed: Dict[str, Variant] invalidated: List[str] if message.member == "InterfacesAdded": obj_path, interfaces_and_props = message.body for interface, props in interfaces_and_props.items(): unpacked_props = unpack_variants(props) self._properties.setdefault(obj_path, {})[interface] = unpacked_props # If this is a device and it has advertising data properties, # then it should mean that this device just started advertising. # Previously, we just relied on RSSI updates to determine if # a device was actually advertising, but we were missing "slow" # devices that only advertise once and then go to sleep for a while. if interface == defs.DEVICE_INTERFACE: self._run_advertisement_callbacks( obj_path, cast(Device1, unpacked_props), unpacked_props.keys() ) elif message.member == "InterfacesRemoved": obj_path, interfaces = message.body for interface in interfaces: del self._properties[obj_path][interface] elif message.member == "PropertiesChanged": assert message.path is not None interface, changed, invalidated = message.body try: self_interface = self._properties[message.path][interface] except KeyError: # This can happen during initialization. The "PropertyChanged" # handler is attached before "GetManagedObjects" is called # and so self._properties may not yet be populated. # This is not a problem. We just discard the property value # since "GetManagedObjects" will return a newer value. pass else: # update self._properties first self_interface.update(unpack_variants(changed)) for name in invalidated: del self_interface[name] # then call any callbacks so they will be called with the # updated state if interface == defs.DEVICE_INTERFACE: # handle advertisement watchers self._run_advertisement_callbacks( message.path, cast(Device1, self_interface), changed.keys() ) # handle device condition watchers for condition_callback in self._condition_callbacks: condition_callback() # handle device connection change watchers if "Connected" in changed: for ( device_path, on_connected_changed, _, ) in self._device_watchers.copy(): # callbacks may remove the watcher, hence the copy() above if message.path == device_path: on_connected_changed(self_interface["Connected"]) elif interface == defs.GATT_CHARACTERISTIC_INTERFACE: # handle characteristic value change watchers if "Value" in changed: for device_path, _, on_value_changed in self._device_watchers: if message.path.startswith(device_path): on_value_changed(message.path, self_interface["Value"]) def _run_advertisement_callbacks( self, device_path: str, device: Device1, changed: Iterable[str] ) -> None: """ Runs any registered advertisement callbacks. Args: device_path: The D-Bus object path of the remote device. device: The current D-Bus properties of the device. changed: A list of properties that have changed since the last call. """ for ( callback, adapter_path, seen_devices, ) in self._advertisement_callbacks: # filter messages from other adapters if not device_path.startswith(adapter_path): continue first_time_seen = False if device_path not in seen_devices: first_time_seen = True seen_devices.add(device_path) # Only do advertising data callback if this is the first time the # device has been seen or if an advertising data property changed. # Otherwise we get a flood of callbacks from RSSI changing. if first_time_seen or not _ADVERTISING_DATA_PROPERTIES.isdisjoint(changed): # TODO: this should be deep copy, not shallow callback(device_path, cast(Device1, device.copy()))
[docs]async def get_global_bluez_manager() -> BlueZManager: """ Gets the initialized global BlueZ manager instance. """ if not hasattr(get_global_bluez_manager, "instance"): setattr(get_global_bluez_manager, "instance", BlueZManager()) instance: BlueZManager = getattr(get_global_bluez_manager, "instance") await instance.async_init() return instance