diff --git a/.gitignore b/.gitignore index b032e02..fca1811 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ dist python_persistent_queue.egg-info *.queue *.coverage +*.pyc diff --git a/.travis.yml b/.travis.yml index 6f3c6fb..1a3ceba 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,10 +1,11 @@ sudo: false +dist: xenial language: python cache: pip matrix: fast_finish: true include: - - python: 3.5 + - python: 3.6 env: TOXENV=lint - python: 2.7 env: TOXENV=py27 @@ -12,10 +13,12 @@ matrix: env: TOXENV=py34 - python: 3.5 env: TOXENV=py35 - - python: "3.6-dev" + - python: 3.6 env: TOXENV=py36 - - python: nightly + - python: 3.7 env: TOXENV=py37 + - python: nightly + env: TOXENV=py38 allow_failures: - python: nightly diff --git a/README.md b/README.md index 85eb55f..a9d9640 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,52 @@ # Description -Implementation of a persistent queue in Python. I looked around and couldn't find anything that fit my needs, so I made my own. Example usage: +This is an implementation of a persistent queue in Python. By persistent, I mean that every item that is added to the queue is saved to disk. I had a specific usage pattern in mind, and I couldn't find any existing libraries that fit my needs, so I made my own. I tried to make it a drop in replacement for Python's [queue.Queue](https://docs.python.org/3/library/queue.html) object. However, I've added a few extra methods that Python's queue.Queue class does not: `peek`, `flush`, `clear`, and `close`. + +I created Python persistent queue with the following workflow in mind: + +```python + +data = queue.peek() + +success = upload_data_somewhere(data) + +if success: + queue.delete() + +``` + +In this use case, the data is only deleted from the queue after it has been successfully processed (in this example, uploaded). + +Typically, I have two threads, one that is writing data to the queue and another thread that is reading data from the queue and processing it. Something like this: + +```python + +queue = PersistentQueue('queue') + +def consumer(): + while True: + data = queue.peek() # Blocking call + success = upload_data_somewhere(data) + if success: + queue.delete() + + +def producer(): + while True: + data = read_from_sensor() + queue.put(data) + time.sleep(1) +``` + +Objects that are added to the queue must be serializeable (and with default parameters, pickle-able). A file is saved to the file system based on the name given to the queue. + +By default, `pickle` is used to serialize objects. This can be changed depending on your needs by setting the `dumps` and `loads` options (see Parameters). [dill](http://trac.mystic.cacr.caltech.edu/project/pathos/wiki/dill.html) and [msgpack](https://github.com/msgpack/msgpack-python) have been tested (see tests as an example). + +When `get()` or `delete()` are called, the data isn't actually deleted. Instead a pointer is moved to the place in the file with valid data. This is to help reduce I/O costs. As a result, the file will continue to grow even if items are removed. `flush()` reclaims this space. **You must call `flush` as you see fit!** + + +# Extended Example ```python from persistent_queue import PersistentQueue @@ -16,38 +61,24 @@ queue.push(3) queue.push(['a', 'b', 'c']) data = queue.peek() # 1 -data = queue.peek(4) # [1, 2, 3, 'a'] size = len(queue) # 6 queue.push('foobar') -data = queue.pop() # 1 +data = queue.get() # Returns 1 -queue.delete(2) -data = queue.pop() # 3 +queue.delete() # Deletes 2 +data = queue.get() # Returns 3 +data = queue.get() # Returns ['a', 'b', 'c'] -queue.clear() +queue.clear() # Deletes everything from queue ``` -Objects that are added to the queue must be pickle-able. A file is saved to the file system based on the name given to the queue. The same name must be given if you want the data to persist. - -I created this with the following workflow in mind: - -```python - -data = queue.peek(5) - -success = upload_data_somewhere(data) - -if success: - queue.delete(5) - queue.flush() # Remove extra space +# Install ``` - -By default, `pickle` is used to serialize objects. This can be changed depending on your needs by setting the `dumps` and `loads` options (see Parameters). [dill](http://trac.mystic.cacr.caltech.edu/project/pathos/wiki/dill.html) and [msgpack](https://github.com/msgpack/msgpack-python) have been tested (see tests as an example). - -When items are popped or deleted, the data isn't actually deleted. Instead a pointer is moved to the place in the file with valid data. As a result, the file will continue to grow even if items are removed. `persistent_queue.flush()` reclaims this space. **You must call `flush` as you see fit!** +pip install python-persistent-queue +``` # Parameters @@ -59,8 +90,4 @@ A persistent queue takes the following parameters: - `loads` (*optional*, default=`pickle.loads`): The method used to convert bytes into a Python object. - `flush_limit` (*optional*, default=1048576): When the amount of empty space in the file is greater than `flush_limit`, the file will be flushed. This balances file I/O and storage space. -# Install - -``` -pip install python-persistent-queue -``` +# Methods diff --git a/persistent_queue/persistent_queue.py b/persistent_queue/persistent_queue.py index 6634530..0c25982 100644 --- a/persistent_queue/persistent_queue.py +++ b/persistent_queue/persistent_queue.py @@ -9,7 +9,6 @@ import shutil import struct import threading -import time import uuid try: @@ -98,57 +97,32 @@ def _set_queue_top(self, top): self._file.seek(current_pos, 0) - def _peek(self, block, timeout, items, partial=False): - """ - Returns a certain amount of items from the queue. If items is greater - than one, a list is returned. - """ - def read_data(): - length = struct.unpack(LENGTH_STRUCT, self._file.read(4))[0] - data = self._file.read(length) - return self.loads(data) - - _LOGGER.debug("Peeking %s items", items) + def _peek(self, block=False, timeout=None): + with self._get_lock: + _LOGGER.debug("Peeking item") + self._put_event.clear() - # Ignore requests for zero items - if items == 0: - _LOGGER.debug("Returning empty list") - return [], self._file.tell() + if self._length < 1: + if not block: + raise queue.Empty - if block: - if timeout is not None: - target = time.time() + timeout - while self._length < items: + # Wait for something to be added if self._put_event.wait(timeout) is False: # Nothing was added to the queue and timeout expired # This will never happen if timeout is None raise queue.Empty + self._put_event.clear() - # Something was added to the queue - # Update timeout, if necessary - if timeout is not None: # pragma: no cover - timeout = target - time.time() + with self._file_lock: + self._file.seek(self._get_queue_top(), 0) # Beginning of data + length = struct.unpack(LENGTH_STRUCT, self._file.read(4))[0] + data = self._file.read(length) + item = self.loads(data) - elif not partial and self._length < items: - raise queue.Empty + queue_top = self._file.tell() - with self._file_lock: - self._file.seek(self._get_queue_top(), 0) # Beginning of data - total_items = self._length if items > self._length else items - data = [read_data() for i in range(total_items)] - queue_top = self._file.tell() - - if items == 1: - if len(data) == 0: - _LOGGER.debug("No items to peek at so returning None") - return None, queue_top - else: - _LOGGER.debug("Returning data from peek") - return data[0], queue_top - else: - _LOGGER.debug("Returning data from peek") - return data, queue_top + return item, queue_top def qsize(self): """ @@ -182,14 +156,12 @@ def full(self): """ return self.maxsize > 0 and self._length >= self.maxsize - def put(self, items, block=True, timeout=None): + def put(self, item, block=True, timeout=None): """ Provides compatibility with stdlib Queue objects. - When this function returns, all items are guaranteed to be persisted + When this function returns, item is guaranteed to be persisted into the file and the underlying storage. - items: single object, or a list of objects - Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and @@ -198,68 +170,48 @@ def put(self, items, block=True, timeout=None): slot is immediately available, else raise the Full exception (timeout is ignored in that case). """ - def write_data(item): - data = self.dumps(item) - self._file.write(struct.pack(LENGTH_STRUCT, len(data))) - self._file.write(data) - self._file.flush() # Probably not necessary since buffering=0 - os.fsync(self._file.fileno()) - if not isinstance(items, list): - items = [items] + _LOGGER.debug("Putting item") - _LOGGER.debug("Putting %s items", len(items)) + with self._put_lock: + self._get_event.clear() + if self.maxsize > 0 and self._length + 1 > self.maxsize: + if not block: + raise queue.Full - # Ignore requests for adding zero items - if len(items) == 0: - _LOGGER.debug("Putting zero items, ignoring request") - return + if self._get_event.wait(timeout) is False: + # Nothing was removed from the queue and timeout expired + # This will never happen if timeout is None + raise queue.Full + self._get_event.clear() - with self._put_lock: - if self.maxsize > 0: - if block: - if timeout is not None: - target = time.time() + timeout - while self._length + len(items) > self.maxsize: - if self._get_event.wait(timeout) is False: - # Nothing was removed from the queue and timeout expired - # This will never happen if timeout is None - raise queue.Full - self._get_event.clear() - - # Something was removed from the queue - # Update timeout, if necessary - if timeout is not None: # pragma: no cover - timeout = target - time.time() - else: - if self._length + len(items) > self.maxsize: - raise queue.Full + # Convert the object outside of the file lock + data = self.dumps(item) with self._file_lock: self._file.seek(0, 2) # Go to end of file + self._file.write(struct.pack(LENGTH_STRUCT, len(data))) + self._file.write(data) + self._file.flush() # Probably not necessary since buffering=0 + os.fsync(self._file.fileno()) - for i in items: - write_data(i) - - self._update_length(self._length + len(items)) - self._unfinished_tasks += len(items) + self._update_length(self._length + 1) + self._unfinished_tasks += 1 self._put_event.set() _LOGGER.debug("Done putting data") - def put_nowait(self, items): + def put_nowait(self, item): """ Provides compatibility with stdlib Queue objects. - Equivalent to put(items, False). + Equivalent to put(item, False). """ - self.put(items, block=False) + self.put(item, block=False) - def get(self, block=True, timeout=None, items=1): + def get(self, block=True, timeout=None): """ Provides compatibility with stdlib Queue objects. - items: number of how many items are returned. If items is greater than - one, a list is returned. Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an @@ -269,25 +221,14 @@ def get(self, block=True, timeout=None, items=1): immediately available, else raise the Empty exception (timeout is ignored in that case). """ - _LOGGER.debug("Getting %s items", items) - - # Ignore requests for zero items - if items == 0: - _LOGGER.debug("Returning empty list") - return [] + _LOGGER.debug("Getting item") with self._get_lock: - data, queue_top = self._peek(block, timeout, items) + data, queue_top = self._peek(block, timeout) with self._file_lock: self._set_queue_top(queue_top) - - if isinstance(data, list): - if len(data) > 0: - self._update_length(self._length - len(data)) - elif data is not None: - self._update_length(self._length - 1) - + self._update_length(self._length - 1) self._get_event.set() _LOGGER.debug("Returning data from get") return data @@ -303,7 +244,7 @@ def get_nowait(self): """ return self.get(block=False) - def task_done(self, items=1): + def task_done(self): """ Provides compatibility with stdlib Queue objects. Taken from the Python 3.5 stdlib: lib/python3.5/queue.py @@ -320,7 +261,7 @@ def task_done(self, items=1): Raises a ValueError if called more times than there were items placed in the queue. """ with self._all_tasks_done: - unfinished = self._unfinished_tasks - items + unfinished = self._unfinished_tasks - 1 if unfinished < 0: raise ValueError('task_done() called too many times') if unfinished == 0: @@ -343,12 +284,12 @@ def join(self): while self._unfinished_tasks: self._all_tasks_done.wait() - def peek(self, block=False, timeout=None, items=1): + def peek(self, block=False, timeout=None): """ - Peeks into the queue and returns items without removing them. + Peeks into the queue and returns an item without removing it. """ - with self._get_lock: - return self._peek(block, timeout, items, partial=True)[0] + item, _ = self._peek(block, timeout) + return item def clear(self): """ @@ -434,37 +375,48 @@ def flush(self): _LOGGER.debug("Finished flushing the queue") - def delete(self, items=1): + def delete(self): """ - Removes items from queue. - - items: number of how many items will be deleted + Remove item from queue without reading object from file. """ - def read_length(): - length = struct.unpack(LENGTH_STRUCT, self._file.read(4))[0] - self._file.seek(length, 1) - - _LOGGER.debug("Deleting %s items", items) + _LOGGER.debug("Deleting item") - # Ignore requests for zero items - if items == 0: - _LOGGER.debug("Ignoring request to delete") + # If there is nothing in the queue, do nothing + if self._length < 1: return with self._file_lock, self._get_lock: self._file.seek(self._get_queue_top(), 0) # Beginning of data - total_items = self._length if items > self._length else items - - for _ in range(total_items): - read_length() - + length = struct.unpack(LENGTH_STRUCT, self._file.read(4))[0] + self._file.seek(length, 1) self._set_queue_top(self._file.tell()) - self._update_length(self._length - total_items) + self._update_length(self._length - 1) _LOGGER.debug("Done deleting data") + def close(self): + self._file.close() + self._put_event.set() # Alert threads waiting for a put + self._get_event.set() # Alert threads waiting for a get + def __len__(self): """ Get size of queue. """ return self._length + + def __repr__(self): + """ + Return a representation of a persistent queue. + """ + string = "PersistentQueue(filename={filename!r}, " + \ + "maxsize={maxsize!r}, " + \ + "dumps={dumps}, " + \ + "loads={loads}, " + \ + "flush_limit={flush_limit!r})" + + return string.format(filename=self.filename, + maxsize=self.maxsize, + dumps=self.dumps, + loads=self.loads, + flush_limit=self.flush_limit) diff --git a/tests/test_persistent_queue.py b/tests/test_persistent_queue.py index 4e7c41d..6f66307 100644 --- a/tests/test_persistent_queue.py +++ b/tests/test_persistent_queue.py @@ -5,6 +5,11 @@ import uuid import pytest +try: + from unittest.mock import MagicMock +except ImportError: + from mock import MagicMock + try: import queue except ImportError: @@ -44,7 +49,6 @@ def test_simple(self): os.remove(filename) def test_qsize(self): - assert len(self.queue) == 0 assert self.queue.qsize() == 0 @@ -70,24 +74,28 @@ def test_qsize(self): def test_put(self): self.queue.put(5) - assert self.queue.peek(items=1) == 5 + assert self.queue.peek() == 5 - self.queue.put_nowait(5) + self.queue.put_nowait(10) assert self.queue.get() == 5 + assert self.queue.get() == 10 - self.queue.put([10, 15, 20]) - assert self.queue.peek(items=4) == [5, 10, 15, 20] + self.queue.put(None) + assert self.queue.get() is None data = {b'a': 1, b'b': 2, b'c': [1, 2, 3]} self.queue.put(data) - assert self.queue.peek(items=5) == [5, 10, 15, 20, data] + assert self.queue.get() == data self.queue.put([]) - assert self.queue.peek(items=5) == [5, 10, 15, 20, data] + assert self.queue.get() == [] + + self.queue.maxsize = 1 + self.queue.put(0) - self.queue.maxsize = 4 + # Test that Full exception gets raised after something has been put in queue with pytest.raises(queue.Full): - self.queue.put(b'full', timeout=1) + self.queue.put(b'full', block=True, timeout=1) with pytest.raises(queue.Full): self.queue.put(b'full', block=False) @@ -100,7 +108,7 @@ def test_get(self): assert self.queue.get() == b'a' assert len(self.queue) == 1 - assert self.queue.get(items=1) == b'b' + assert self.queue.get() == b'b' assert len(self.queue) == 0 self.queue.put(b'a') @@ -109,16 +117,19 @@ def test_get(self): self.queue.put(b'd') assert len(self.queue) == 4 - assert self.queue.get(items=3) == [b'a', b'b', b'c'] + assert [self.queue.get(), self.queue.get(), self.queue.get()] == [b'a', b'b', b'c'] assert len(self.queue) == 1 + assert self.queue.get() == b'd' + + # Test that Empty exception gets raised after something has been removed in queue with pytest.raises(queue.Empty): - assert self.queue.get(block=False, items=100) == [b'd'] - assert len(self.queue) == 1 + self.queue.get(block=True, timeout=1) - self.queue.put(b'd') - assert self.queue.get(items=0) == [] - assert len(self.queue) == 2 + with pytest.raises(queue.Empty): + self.queue.get(block=False) + + assert len(self.queue) == 0 def test_get_blocking(self): done = [False] @@ -141,9 +152,6 @@ def func(): self.queue.get(timeout=1) def test_get_non_blocking_no_values(self): - with pytest.raises(queue.Empty): - assert self.queue.get(block=False, items=5) == [] - with pytest.raises(queue.Empty): self.queue.get(block=False) @@ -156,21 +164,19 @@ def test_peek(self): self.queue.put(b'test') assert self.queue.peek() == 1 - assert self.queue.peek(items=1) == 1 - assert self.queue.peek(items=2) == [1, 2] - assert self.queue.peek(items=3) == [1, 2, b'test'] + assert self.queue.get() == 1 + + assert self.queue.peek() == 2 + assert self.queue.get() == 2 - assert self.queue.peek(items=100) == [1, 2, b'test'] + assert self.queue.peek() == b'test' + assert self.queue.get() == b'test' self.queue.clear() self.queue.put(1) assert len(self.queue) == 1 assert self.queue.peek() == 1 - assert self.queue.peek(items=1) == 1 - assert self.queue.peek(items=2) == [1] - - assert self.queue.peek(items=0) == [] def test_peek_blocking(self): done = [False] @@ -189,7 +195,7 @@ def func(): assert data == 5 assert len(self.queue) == 1 - def test_peek_blocking_list(self): + def test_get_blocking_list(self): done_pushing = [False] done_peeking = [False] @@ -203,28 +209,36 @@ def func(): t = threading.Thread(target=func) t.start() - data = self.queue.peek(items=5, block=True) + data = [self.queue.get(block=True) for i in range(5)] done_peeking[0] = True assert done_pushing[0] is True assert data == [0, 1, 2, 3, 4] - assert len(self.queue) == 5 + assert len(self.queue) == 0 def test_peek_no_values(self): - assert self.queue.peek(items=5) == [] - assert self.queue.peek() is None + with pytest.raises(queue.Empty): + assert self.queue.peek() def test_clear(self): self.queue.put(5) self.queue.put(50) - assert self.queue.peek(items=2) == [5, 50] assert len(self.queue) == 2 + assert self.queue.peek() == 5 + self.queue.clear() + assert len(self.queue) == 0 + with pytest.raises(queue.Empty): + assert self.queue.peek() def test_copy(self): new_queue_name = 'another_queue' - self.queue.put([5, 4, 3, 2, 1]) + self.queue.put(5) + self.queue.put(4) + self.queue.put(3) + self.queue.put(2) + self.queue.put(1) assert len(self.queue) == 5 assert self.queue.get() == 5 @@ -245,22 +259,32 @@ def test_delete(self): self.queue.put(11) assert len(self.queue) == 4 - self.queue.delete(2) + self.queue.delete() + assert len(self.queue) == 3 + + self.queue.delete() assert len(self.queue) == 2 - assert self.queue.peek(items=2) == [7, 11] - assert self.queue.get(items=2) == [7, 11] + + assert self.queue.peek() == 7 + assert self.queue.get() == 7 + assert self.queue.get() == 11 self.queue.put(2) - self.queue.delete(1000) + self.queue.delete() + self.queue.delete() + self.queue.delete() + self.queue.delete() assert len(self.queue) == 0 self.queue.put(2) - self.queue.delete(0) + self.queue.put(1) + self.queue.delete() assert len(self.queue) == 1 def test_delete_no_values(self): + assert len(self.queue) == 0 self.queue.delete() - self.queue.delete(100) + assert len(self.queue) == 0 def test_big_file_1(self): data = {b'a': list(range(500))} @@ -282,7 +306,8 @@ def test_big_file_2(self): for i in range(1000): self.queue.put(data) - assert self.queue.get(items=995) == [data for i in range(995)] + for i in range(995): + assert self.queue.get() == data self.queue.flush() assert len(self.queue) == 5 @@ -296,14 +321,15 @@ def test_usage(self): self.queue.put([b'a', b'b', b'c']) assert self.queue.peek() == 1 - assert self.queue.peek(items=4) == [1, 2, 3, b'a'] - assert len(self.queue) == 6 + assert len(self.queue) == 4 self.queue.put(b'foobar') assert self.queue.get() == 1 - assert len(self.queue) == 6 - assert self.queue.get(items=6) == [2, 3, b'a', b'b', b'c', b'foobar'] + assert len(self.queue) == 4 + + for item in [2, 3, [b'a', b'b', b'c'], b'foobar']: + assert self.queue.get() == item def test_threads(self): def random_stuff(): @@ -312,12 +338,13 @@ def random_stuff(): if random_number % 3 == 0: try: - self.queue.peek(block=False, items=(random_number % 5)) + self.queue.peek(block=False) except queue.Empty: pass elif random_number % 2 == 0: try: - self.queue.get(block=False, items=(random_number % 5)) + for _ in range(random_number % 5): + self.queue.get(block=False) except queue.Empty: pass else: @@ -356,6 +383,118 @@ def worker(): self.queue.join() assert self.queue.empty() is True + def test_close(self): + self.queue.put(1) + self.queue.close() + + with pytest.raises(ValueError): + self.queue.put(2) + + new_queue = PersistentQueue(self.queue.filename, + dumps=self.queue.dumps, + loads=self.queue.loads) + assert new_queue.get() == 1 + assert len(new_queue) == 0 + + def test_close_threaded_getting(self): + def worker(): + while True: + try: + assert self.queue.get() in [1, 2, 3] + except ValueError: + return + + t = threading.Thread(target=worker) + t.start() + + self.queue.put(1) + time.sleep(.2) + self.queue.put(2) + time.sleep(.2) + self.queue.put(3) + time.sleep(1) + + self.queue.close() + + def test_close_threaded_putting(self): + self.queue.maxsize = 1 + + def worker(): + self.queue.put("test_1") + + while True: + try: + self.queue.put("test_2") + assert False + except ValueError: + return + + t = threading.Thread(target=worker) + t.start() + + time.sleep(1) + + assert len(self.queue) == 1 + self.queue.close() + + def test_repr(self): + queue_repr = repr(self.queue) + assert 'filename' in queue_repr + assert 'maxsize' in queue_repr + assert 'flush_limit' in queue_repr + + def test_half_write_os_fsync(self): + """ + Test if there is a failure in the middle of putting something into the + queue. + """ + + self.queue.put(1) + old_os_fsync = os.fsync + + # Set up failure when calling os.fsync + os.fsync = MagicMock(side_effect=Exception('Power shutoff')) + with pytest.raises(Exception): + self.queue.put(2) + self.queue.close() + + os.fsync = old_os_fsync + new_queue = PersistentQueue(self.queue.filename, + dumps=self.queue.dumps, + loads=self.queue.loads) + + # Failure occurred before the second item was fully added + assert len(new_queue) == 1 + assert new_queue.get() == 1 + with pytest.raises(queue.Empty): + new_queue.get(block=False) + + def test_half_write_update_length(self): + """ + Test if there is a failure in the middle of putting something into the + queue. + """ + + self.queue.put(1) + old_update_length = self.queue._update_length + + # Set up failure when calling os.fsync + self.queue._update_length = MagicMock(side_effect=Exception('Power shutoff')) + with pytest.raises(Exception): + self.queue.put(2) + self.queue.close() + + self.queue._update_length = old_update_length + new_queue = PersistentQueue(self.queue.filename, + dumps=self.queue.dumps, + loads=self.queue.loads) + + # Failure occurred before the second item was fully added + assert len(new_queue) == 1 + assert new_queue.get() == 1 + with pytest.raises(queue.Empty): + new_queue.get(block=False) + class TestPersistentQueueWithDill(TestPersistentQueue): def setup_method(self): diff --git a/tox.ini b/tox.ini index 5f01721..a22d037 100644 --- a/tox.ini +++ b/tox.ini @@ -11,6 +11,7 @@ deps = pytest>=3.0.4,<4 pytest-cov>=2.4.0,<3 pytest-xdist>=1.5.0,<2 + mock passenv = CODECOV_TOKEN CI CI_* TRAVIS TRAVIS_* setenv = HOME = {envtmpdir} commands = py.test -v --cov persistent_queue {posargs}