Source code for fsspec.implementations.memory

from __future__ import print_function, division, absolute_import

from io import BytesIO
from datetime import datetime
from errno import ENOTEMPTY
from fsspec import AbstractFileSystem
import logging

logger = logging.Logger("fsspec.memoryfs")


[docs]class MemoryFileSystem(AbstractFileSystem): """A filesystem based on a dict of BytesIO objects This is a global filesystem so instances of this class all point to the same in memory filesystem. """ store = {} # global pseudo_dirs = [] protocol = "memory" root_marker = "" def ls(self, path, detail=False, **kwargs): if path in self.store: # there is a key with this exact name, but could also be directory out = [ { "name": path, "size": self.store[path].getbuffer().nbytes, "type": "file", "created": self.store[path].created, } ] else: out = [] path = path.strip("/").lstrip("/") paths = set() for p2 in self.store: has_slash = "/" if p2.startswith("/") else "" p = p2.lstrip("/") if "/" in p: root = p.rsplit("/", 1)[0] else: root = "" if root == path: out.append( { "name": has_slash + p, "size": self.store[p2].getbuffer().nbytes, "type": "file", "created": self.store[p2].created, } ) elif ( path and len(path) < len(p.strip("/")) and all( (a == b) for a, b in zip(path.split("/"), p.strip("/").split("/")) ) ): # implicit directory ppath = "/".join(p.split("/")[: len(path.split("/")) + 1]) if ppath not in paths: out.append( { "name": has_slash + ppath + "/", "size": 0, "type": "directory", } ) paths.add(ppath) elif all( (a == b) for a, b in zip(path.split("/"), [""] + p.strip("/").split("/")) ): # root directory entry ppath = p.rstrip("/").split("/", 1)[0] if ppath not in paths: out.append( { "name": has_slash + ppath + "/", "size": 0, "type": "directory", } ) paths.add(ppath) for p2 in self.pseudo_dirs: if self._parent(p2).strip("/") == path and p2.strip("/") not in paths: out.append({"name": p2 + "/", "size": 0, "type": "directory"}) if detail: return out return sorted([f["name"] for f in out]) def mkdir(self, path, create_parents=True, **kwargs): path = path.rstrip("/") if create_parents and self._parent(path): self.mkdir(self._parent(path), create_parents, **kwargs) if self._parent(path) and not self.isdir(self._parent(path)): raise NotADirectoryError(self._parent(path)) if path and path not in self.pseudo_dirs: self.pseudo_dirs.append(path) def rmdir(self, path): path = path.rstrip("/") if path in self.pseudo_dirs: if not self.ls(path): self.pseudo_dirs.remove(path) else: raise OSError(ENOTEMPTY, "Directory not empty", path) else: raise FileNotFoundError(path) def exists(self, path): return path in self.store or path in self.pseudo_dirs def _open( self, path, mode="rb", block_size=None, autocommit=True, cache_options=None, **kwargs ): if mode in ["rb", "ab", "rb+"]: if path in self.store: f = self.store[path] if mode == "ab": # position at the end of file f.seek(0, 2) else: # position at the beginning of file f.seek(0) return f else: raise FileNotFoundError(path) if mode == "wb": m = MemoryFile(self, path) if not self._intrans: m.commit() return m def cp_file(self, path1, path2, **kwargs): if self.isfile(path1): self.store[path2] = MemoryFile(self, path2, self.store[path1].getbuffer()) elif self.isdir(path1): if path2 not in self.pseudo_dirs: self.pseudo_dirs.append(path2) else: raise FileNotFoundError def cat_file(self, path): try: return self.store[path].getvalue() except KeyError: raise FileNotFoundError(path) def _rm(self, path): if self.isfile(path): del self.store[path] elif self.isdir(path): self.rmdir(path) else: raise FileNotFoundError def size(self, path): """Size in bytes of the file at path""" if path not in self.store: raise FileNotFoundError(path) return self.store[path].getbuffer().nbytes
class MemoryFile(BytesIO): """A BytesIO which can't close and works as a context manager Can initialise with data. Each path should only be active once at any moment. No need to provide fs, path if auto-committing (default) """ def __init__(self, fs=None, path=None, data=None): self.fs = fs self.path = path self.created = datetime.utcnow().timestamp() if data: self.write(data) self.size = len(data) self.seek(0) def __enter__(self): return self def close(self): position = self.tell() self.size = self.seek(0, 2) self.seek(position) def discard(self): pass def commit(self): self.fs.store[self.path] = self