This document is for Kombu's development version, which can be significantly different from previous releases. Get the stable docs here: 5.0.

Source code for kombu.transport.filesystem

"""File-system Transport module for kombu.

Transport using the file-system as the message store. Messages written to the
queue are stored in `data_folder_in` directory and
messages read from the queue are read from `data_folder_out` directory. Both
directories must be created manually. Simple example:

* Producer:

.. code-block:: python

    import kombu

    conn = kombu.Connection(
        'filesystem://', transport_options={
            'data_folder_in': 'data_in', 'data_folder_out': 'data_out'
        }
    )
    conn.connect()

    test_queue = kombu.Queue('test', routing_key='test')

    with conn as conn:
        with conn.default_channel as channel:
            producer = kombu.Producer(channel)
            producer.publish(
                        {'hello': 'world'},
                        retry=True,
                        exchange=test_queue.exchange,
                        routing_key=test_queue.routing_key,
                        declare=[test_queue],
                        serializer='pickle'
            )

* Consumer:

.. code-block:: python

    import kombu

    conn = kombu.Connection(
        'filesystem://', transport_options={
            'data_folder_in': 'data_out', 'data_folder_out': 'data_in'
        }
    )
    conn.connect()

    def callback(body, message):
        print(body, message)
        message.ack()

    test_queue = kombu.Queue('test', routing_key='test')

    with conn as conn:
        with conn.default_channel as channel:
            consumer = kombu.Consumer(
                conn, [test_queue], accept=['pickle']
            )
            consumer.register_callback(callback)
            with consumer:
                conn.drain_events(timeout=1)

Features
========
* Type: Virtual
* Supports Direct: Yes
* Supports Topic: Yes
* Supports Fanout: No
* Supports Priority: No
* Supports TTL: No

Connection String
=================
Connection string is in the following format:

.. code-block::

    filesystem://

Transport Options
=================
* ``data_folder_in`` - directory where are messages stored when written
  to queue.
* ``data_folder_out`` - directory from which are messages read when read from
  queue.
* ``store_processed`` - if set to True, all processed messages are backed up to
  ``processed_folder``.
* ``processed_folder`` - directory where are backed up processed files.
"""

import os
import shutil
import uuid
from queue import Empty
import tempfile
from time import monotonic

from . import virtual
from kombu.exceptions import ChannelError
from kombu.utils.encoding import bytes_to_str, str_to_bytes
from kombu.utils.json import loads, dumps
from kombu.utils.objects import cached_property


VERSION = (1, 0, 0)
__version__ = '.'.join(map(str, VERSION))

# needs win32all to work on Windows
if os.name == 'nt':

    import win32con
    import win32file
    import pywintypes

    LOCK_EX = win32con.LOCKFILE_EXCLUSIVE_LOCK
    # 0 is the default
    LOCK_SH = 0                                     # noqa
    LOCK_NB = win32con.LOCKFILE_FAIL_IMMEDIATELY    # noqa
    __overlapped = pywintypes.OVERLAPPED()

    def lock(file, flags):
        """Create file lock."""
        hfile = win32file._get_osfhandle(file.fileno())
        win32file.LockFileEx(hfile, flags, 0, 0xffff0000, __overlapped)

    def unlock(file):
        """Remove file lock."""
        hfile = win32file._get_osfhandle(file.fileno())
        win32file.UnlockFileEx(hfile, 0, 0xffff0000, __overlapped)

elif os.name == 'posix':

    import fcntl
    from fcntl import LOCK_EX, LOCK_SH, LOCK_NB     # noqa

    def lock(file, flags):  # noqa
        """Create file lock."""
        fcntl.flock(file.fileno(), flags)

    def unlock(file):       # noqa
        """Remove file lock."""
        fcntl.flock(file.fileno(), fcntl.LOCK_UN)
else:
    raise RuntimeError(
        'Filesystem plugin only defined for NT and POSIX platforms')


[docs]class Channel(virtual.Channel): """Filesystem Channel.""" def _put(self, queue, payload, **kwargs): """Put `message` onto `queue`.""" filename = '{}_{}.{}.msg'.format(int(round(monotonic() * 1000)), uuid.uuid4(), queue) filename = os.path.join(self.data_folder_out, filename) try: f = open(filename, 'wb') lock(f, LOCK_EX) f.write(str_to_bytes(dumps(payload))) except OSError: raise ChannelError( f'Cannot add file {filename!r} to directory') finally: unlock(f) f.close() def _get(self, queue): """Get next message from `queue`.""" queue_find = '.' + queue + '.msg' folder = os.listdir(self.data_folder_in) folder = sorted(folder) while len(folder) > 0: filename = folder.pop(0) # only handle message for the requested queue if filename.find(queue_find) < 0: continue if self.store_processed: processed_folder = self.processed_folder else: processed_folder = tempfile.gettempdir() try: # move the file to the tmp/processed folder shutil.move(os.path.join(self.data_folder_in, filename), processed_folder) except OSError: pass # file could be locked, or removed in meantime so ignore filename = os.path.join(processed_folder, filename) try: f = open(filename, 'rb') payload = f.read() f.close() if not self.store_processed: os.remove(filename) except OSError: raise ChannelError( f'Cannot read file {filename!r} from queue.') return loads(bytes_to_str(payload)) raise Empty() def _purge(self, queue): """Remove all messages from `queue`.""" count = 0 queue_find = '.' + queue + '.msg' folder = os.listdir(self.data_folder_in) while len(folder) > 0: filename = folder.pop() try: # only purge messages for the requested queue if filename.find(queue_find) < 0: continue filename = os.path.join(self.data_folder_in, filename) os.remove(filename) count += 1 except OSError: # we simply ignore its existence, as it was probably # processed by another worker pass return count def _size(self, queue): """Return the number of messages in `queue` as an :class:`int`.""" count = 0 queue_find = f'.{queue}.msg' folder = os.listdir(self.data_folder_in) while len(folder) > 0: filename = folder.pop() # only handle message for the requested queue if filename.find(queue_find) < 0: continue count += 1 return count @property def transport_options(self): return self.connection.client.transport_options @cached_property def data_folder_in(self): return self.transport_options.get('data_folder_in', 'data_in') @cached_property def data_folder_out(self): return self.transport_options.get('data_folder_out', 'data_out') @cached_property def store_processed(self): return self.transport_options.get('store_processed', False) @cached_property def processed_folder(self): return self.transport_options.get('processed_folder', 'processed')
[docs]class Transport(virtual.Transport): """Filesystem Transport.""" Channel = Channel default_port = 0 driver_type = 'filesystem' driver_name = 'filesystem'
[docs] def driver_version(self): return 'N/A'