Source code for fsspec.asyn

import asyncio
import functools
import inspect
import re
import os
import sys
import threading

from .utils import other_paths, is_exception
from .spec import AbstractFileSystem

# this global variable holds whether this thread is running async or not
thread_state = threading.local()
private = re.compile("_[^_]")


def _run_until_done(coro):
    """execute coroutine, when already in the event loop"""
    if sys.version_info < (3, 7):  # pragma: no cover
        raise RuntimeError(
            "async file systems do not work completely on py<37. "
            "The nested call currently underway cannot be processed. "
            "Please downgrade your fsspec or upgrade python."
        )
    loop = asyncio.get_event_loop()
    task = asyncio.current_task()
    asyncio.tasks._unregister_task(task)
    del asyncio.tasks._current_tasks[loop]
    runner = loop.create_task(coro)
    while not runner.done():
        loop._run_once()
    asyncio.tasks._current_tasks[loop] = task
    return runner.result()


def sync(loop, func, *args, callback_timeout=None, **kwargs):
    """
    Run coroutine in loop running in separate thread.
    """

    e = threading.Event()
    main_tid = threading.get_ident()
    result = [None]
    error = [False]

    async def f():
        try:
            if main_tid == threading.get_ident():
                raise RuntimeError("sync() called from thread of running loop")
            await asyncio.sleep(0)
            thread_state.asynchronous = True
            future = func(*args, **kwargs)
            if callback_timeout is not None:
                future = asyncio.wait_for(future, callback_timeout)
            result[0] = await future
        except Exception:
            error[0] = sys.exc_info()
        finally:
            thread_state.asynchronous = False
            e.set()

    asyncio.run_coroutine_threadsafe(f(), loop=loop)
    if callback_timeout is not None:
        if not e.wait(callback_timeout):
            raise TimeoutError("timed out after %s s." % (callback_timeout,))
    else:
        while not e.is_set():
            e.wait(10)
    if error[0]:
        typ, exc, tb = error[0]
        raise exc.with_traceback(tb)
    else:
        return result[0]


def maybe_sync(func, self, *args, **kwargs):
    """Make function call into coroutine or maybe run

    If we are running async, run coroutine on current loop until done;
    otherwise runs it on the loop (if is a coroutine already) or directly. Will guess
    we are running async if either "self" has an attribute asynchronous which is True,
    or thread_state does (this gets set in ``sync()`` itself, to avoid nesting loops).
    """
    loop = self.loop
    # second condition below triggers if this is running in the thread of the
    # event loop *during* the call to sync(), i.e., while running
    # asynchronously
    if getattr(self, "asynchronous", False) or getattr(
        thread_state, "asynchronous", False
    ):
        if inspect.iscoroutinefunction(func):
            # run coroutine while pausing this one (because we are within async)
            return _run_until_done(func(*args, **kwargs))
        else:
            # make awaitable which then calls the blocking function
            return _run_as_coroutine(func, *args, **kwargs)
    else:
        if inspect.iscoroutinefunction(func):
            # run the awaitable on the loop
            return sync(loop, func, *args, **kwargs)
        else:
            # just call the blocking function
            return func(*args, **kwargs)


async def _run_as_coroutine(func, *args, **kwargs):
    # This is not currently used
    return func(*args, **kwargs)


def sync_wrapper(func, obj=None):
    """Given a function, make so can be called in async or blocking contexts

    Leave obj=None if defining within a class. Pass the instance if attaching
    as an attribute of the instance.
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        self = obj or args[0]
        return maybe_sync(func, self, *args, **kwargs)

    return wrapper


def async_wrapper(func):
    """Run a sync function on the event loop"""

    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        return func(*args, **kwargs)

    return wrapper


def get_loop():
    """Create a running loop in another thread"""
    loop = asyncio.new_event_loop()
    t = threading.Thread(target=loop.run_forever)
    t.daemon = True
    t.start()
    return loop


# these methods should be implemented as async by any async-able backend
async_methods = [
    "_ls",
    "_cat_file",
    "_get_file",
    "_put_file",
    "_rm_file",
    "_cp_file",
    "_pipe_file",
]
# these methods could be overridden, but have default sync versions which rely on _ls
# the sync methods below all call expand_path, which in turn may call walk or glob
# (if passed paths with glob characters, or for recursive=True, respectively)
default_async_methods = [
    "_expand_path",
    "_info",
    "_isfile",
    "_isdir",
    "_exists",
    "_walk",
    "_glob",
    "_find",
    "_du",
]


[docs]class AsyncFileSystem(AbstractFileSystem): """Async file operations, default implementations Passes bulk operations to asyncio.gather for concurrent operation. Implementations that have concurrent batch operations and/or async methods should inherit from this class instead of AbstractFileSystem. Docstrings are copied from the un-underscored method in AbstractFileSystem, if not given. """ # note that methods do not have docstring here; they will be copied # for _* methods and inferred for overridden methods. async_impl = True def __init__(self, *args, asynchronous=False, loop=None, **kwargs): self.asynchronous = asynchronous self.loop = loop or get_loop() super().__init__(*args, **kwargs) async def _rm(self, path, recursive=False, **kwargs): await asyncio.gather(*[self._rm_file(p, **kwargs) for p in path]) def rm(self, path, recursive=False, **kwargs): path = self.expand_path(path, recursive=recursive) maybe_sync(self._rm, self, path, **kwargs) async def _copy(self, paths, path2, **kwargs): await asyncio.gather( *[self._cp_file(p1, p2, **kwargs) for p1, p2 in zip(paths, path2)] ) def copy(self, path1, path2, recursive=False, **kwargs): paths = self.expand_path(path1, recursive=recursive) path2 = other_paths(paths, path2) maybe_sync(self._copy, self, paths, path2) async def _pipe(self, path, value=None, **kwargs): if isinstance(path, str): path = {path: value} await asyncio.gather( *[self._pipe_file(k, v, **kwargs) for k, v in path.items()] ) async def _cat(self, paths, **kwargs): return await asyncio.gather( *[ asyncio.ensure_future(self._cat_file(path, **kwargs), loop=self.loop) for path in paths ], return_exceptions=True ) def cat(self, path, recursive=False, on_error="raise", **kwargs): paths = self.expand_path(path, recursive=recursive) out = maybe_sync(self._cat, self, paths, **kwargs) if on_error == "raise": ex = next(filter(is_exception, out), False) if ex: raise ex if ( len(paths) > 1 or isinstance(path, list) or paths[0] != self._strip_protocol(path) ): return { k: v for k, v in zip(paths, out) if on_error != "omit" or not is_exception(v) } else: return out[0] async def _put(self, lpaths, rpaths, **kwargs): return await asyncio.gather( *[ self._put_file(lpath, rpath, **kwargs) for lpath, rpath in zip(lpaths, rpaths) ] ) def put(self, lpath, rpath, recursive=False, **kwargs): from .implementations.local import make_path_posix, LocalFileSystem rpath = self._strip_protocol(rpath) if isinstance(lpath, str): lpath = make_path_posix(lpath) fs = LocalFileSystem() lpaths = fs.expand_path(lpath, recursive=recursive) rpaths = other_paths(lpaths, rpath) maybe_sync(self._put, self, lpaths, rpaths, **kwargs) async def _get(self, rpaths, lpaths, **kwargs): dirs = [os.path.dirname(lp) for lp in lpaths] [os.makedirs(d, exist_ok=True) for d in dirs] return await asyncio.gather( *[ self._get_file(rpath, lpath, **kwargs) for lpath, rpath in zip(lpaths, rpaths) ] ) def get(self, rpath, lpath, recursive=False, **kwargs): from fsspec.implementations.local import make_path_posix rpath = self._strip_protocol(rpath) lpath = make_path_posix(lpath) rpaths = self.expand_path(rpath, recursive=recursive) lpaths = other_paths(rpaths, lpath) [os.makedirs(os.path.dirname(lp), exist_ok=True) for lp in lpaths] return sync(self.loop, self._get, rpaths, lpaths)
def mirror_sync_methods(obj): """Populate sync and async methods for obj For each method will create a sync version if the name refers to an async method (coroutine) and there is no override in the child class; will create an async method for the corresponding sync method if there is no implementation. Uses the methods specified in - async_methods: the set that an implementation is expected to provide - default_async_methods: that can be derived from their sync version in AbstractFileSystem - AsyncFileSystem: async-specific default coroutines """ from fsspec import AbstractFileSystem for method in async_methods + default_async_methods + dir(AsyncFileSystem): if not method.startswith("_"): continue smethod = method[1:] if private.match(method): isco = inspect.iscoroutinefunction(getattr(obj, method, None)) unsync = getattr(getattr(obj, smethod, False), "__func__", None) is_default = unsync is getattr(AbstractFileSystem, smethod, "") if isco and is_default: mth = sync_wrapper(getattr(obj, method), obj=obj) setattr(obj, smethod, mth) if not mth.__doc__: mth.__doc__ = getattr( getattr(AbstractFileSystem, smethod, None), "__doc__", "" ) elif ( hasattr(obj, smethod) and inspect.ismethod(getattr(obj, smethod)) and not hasattr(obj, method) ): setattr(obj, method, async_wrapper(getattr(obj, smethod)))