From da3897e0ec5b27110107111cdb597f04a0ba190e Mon Sep 17 00:00:00 2001 From: Philip Lundrigan Date: Tue, 25 Dec 2018 19:53:32 -0700 Subject: [PATCH 01/16] WIP: Change queue signature to match Python's Previously, persistent queue allowed for multiple items to be pushed and pulled from the queue. Now only one item can be pulled or pushed. --- persistent_queue/persistent_queue.py | 123 +++++++++++++-------------- 1 file changed, 58 insertions(+), 65 deletions(-) diff --git a/persistent_queue/persistent_queue.py b/persistent_queue/persistent_queue.py index 6634530..244b0dc 100644 --- a/persistent_queue/persistent_queue.py +++ b/persistent_queue/persistent_queue.py @@ -98,58 +98,6 @@ def _set_queue_top(self, top): self._file.seek(current_pos, 0) - def _peek(self, block, timeout, items, partial=False): - """ - Returns a certain amount of items from the queue. If items is greater - than one, a list is returned. - """ - def read_data(): - length = struct.unpack(LENGTH_STRUCT, self._file.read(4))[0] - data = self._file.read(length) - return self.loads(data) - - _LOGGER.debug("Peeking %s items", items) - - # Ignore requests for zero items - if items == 0: - _LOGGER.debug("Returning empty list") - return [], self._file.tell() - - if block: - if timeout is not None: - target = time.time() + timeout - while self._length < items: - if self._put_event.wait(timeout) is False: - # Nothing was added to the queue and timeout expired - # This will never happen if timeout is None - raise queue.Empty - self._put_event.clear() - - # Something was added to the queue - # Update timeout, if necessary - if timeout is not None: # pragma: no cover - timeout = target - time.time() - - elif not partial and self._length < items: - raise queue.Empty - - with self._file_lock: - self._file.seek(self._get_queue_top(), 0) # Beginning of data - total_items = self._length if items > self._length else items - data = [read_data() for i in range(total_items)] - queue_top = self._file.tell() - - if items 
== 1: - if len(data) == 0: - _LOGGER.debug("No items to peek at so returning None") - return None, queue_top - else: - _LOGGER.debug("Returning data from peek") - return data[0], queue_top - else: - _LOGGER.debug("Returning data from peek") - return data, queue_top - def qsize(self): """ Provides compatibility with stdlib Queue objects. @@ -182,13 +130,13 @@ def full(self): """ return self.maxsize > 0 and self._length >= self.maxsize - def put(self, items, block=True, timeout=None): + def put(self, item, block=True, timeout=None): """ Provides compatibility with stdlib Queue objects. - When this function returns, all items are guaranteed to be persisted + When this function returns, item is guaranteed to be persisted into the file and the underlying storage. - items: single object, or a list of objects + item: single object Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. @@ -247,19 +195,17 @@ def write_data(item): self._put_event.set() _LOGGER.debug("Done putting data") - def put_nowait(self, items): + def put_nowait(self, item): """ Provides compatibility with stdlib Queue objects. - Equivalent to put(items, False). + Equivalent to put(item, False). """ - self.put(items, block=False) + self.put(item, block=False) - def get(self, block=True, timeout=None, items=1): + def get(self, block=True, timeout=None): """ Provides compatibility with stdlib Queue objects. - items: number of how many items are returned. If items is greater than - one, a list is returned. Remove and return an item from the queue. If optional args block is true and timeout is None (the default), block if necessary until an @@ -303,7 +249,7 @@ def get_nowait(self): """ return self.get(block=False) - def task_done(self, items=1): + def task_done(self): """ Provides compatibility with stdlib Queue objects. 
Taken from the Python 3.5 stdlib: lib/python3.5/queue.py @@ -343,12 +289,59 @@ def join(self): while self._unfinished_tasks: self._all_tasks_done.wait() - def peek(self, block=False, timeout=None, items=1): + def peek(self, block=False, timeout=None): """ - Peeks into the queue and returns items without removing them. + Peeks into the queue and returns an item without removing them. """ + + def read_data(): + length = struct.unpack(LENGTH_STRUCT, self._file.read(4))[0] + data = self._file.read(length) + return self.loads(data) + with self._get_lock: - return self._peek(block, timeout, items, partial=True)[0] + _LOGGER.debug("Peeking %s items", items) + + # Ignore requests for zero items + if items == 0: + _LOGGER.debug("Returning empty list") + return [], self._file.tell() + + if block: + if timeout is not None: + target = time.time() + timeout + while self._length < items: + if self._put_event.wait(timeout) is False: + # Nothing was added to the queue and timeout expired + # This will never happen if timeout is None + raise queue.Empty + self._put_event.clear() + + # Something was added to the queue + # Update timeout, if necessary + if timeout is not None: # pragma: no cover + timeout = target - time.time() + + elif not partial and self._length < items: + raise queue.Empty + + with self._file_lock: + self._file.seek(self._get_queue_top(), 0) # Beginning of data + total_items = self._length if items > self._length else items + data = [read_data() for i in range(total_items)] + queue_top = self._file.tell() + + if items == 1: + if len(data) == 0: + _LOGGER.debug("No items to peek at so returning None") + return None, queue_top + else: + _LOGGER.debug("Returning data from peek") + return data[0], queue_top + else: + _LOGGER.debug("Returning data from peek") + return data, queue_top + def clear(self): """ From b9eb7379edeb38686175202599ee3ce6c0964bf2 Mon Sep 17 00:00:00 2001 From: Philip Lundrigan Date: Wed, 26 Dec 2018 13:52:41 -0700 Subject: [PATCH 02/16] Remove 
ability to push / pop multiple items --- persistent_queue/persistent_queue.py | 158 +++++++++------------------ 1 file changed, 53 insertions(+), 105 deletions(-) diff --git a/persistent_queue/persistent_queue.py b/persistent_queue/persistent_queue.py index 244b0dc..5773985 100644 --- a/persistent_queue/persistent_queue.py +++ b/persistent_queue/persistent_queue.py @@ -97,6 +97,32 @@ def _set_queue_top(self, top): os.fsync(self._file.fileno()) self._file.seek(current_pos, 0) + + def _peek(self, block=False, timeout=None): + with self._get_lock: + _LOGGER.debug("Peeking item") + + if self._length < 1: + if not block: + raise queue.Empty + + # Wait for something to be added + if self._put_event.wait(timeout) is False: + # Nothing was added to the queue and timeout expired + # This will never happen if timeout is None + raise queue.Empty + + self._put_event.clear() + + with self._file_lock: + self._file.seek(self._get_queue_top(), 0) # Beginning of data + length = struct.unpack(LENGTH_STRUCT, self._file.read(4))[0] + data = self._file.read(length) + item = self.loads(data) + + queue_top = self._file.tell() + + return item, queue_top def qsize(self): """ @@ -136,8 +162,6 @@ def put(self, item, block=True, timeout=None): When this function returns, item is guaranteed to be persisted into the file and the underlying storage. - item: single object - Put item into the queue. If optional args block is true and timeout is None (the default), block if necessary until a free slot is available. If timeout is a positive number, it blocks at most timeout seconds and @@ -146,51 +170,33 @@ def put(self, item, block=True, timeout=None): slot is immediately available, else raise the Full exception (timeout is ignored in that case). 
""" - def write_data(item): - data = self.dumps(item) - self._file.write(struct.pack(LENGTH_STRUCT, len(data))) - self._file.write(data) - self._file.flush() # Probably not necessary since buffering=0 - os.fsync(self._file.fileno()) - - if not isinstance(items, list): - items = [items] - _LOGGER.debug("Putting %s items", len(items)) - - # Ignore requests for adding zero items - if len(items) == 0: - _LOGGER.debug("Putting zero items, ignoring request") - return + _LOGGER.debug("Putting item") with self._put_lock: - if self.maxsize > 0: - if block: - if timeout is not None: - target = time.time() + timeout - while self._length + len(items) > self.maxsize: - if self._get_event.wait(timeout) is False: - # Nothing was removed from the queue and timeout expired - # This will never happen if timeout is None - raise queue.Full - self._get_event.clear() - - # Something was removed from the queue - # Update timeout, if necessary - if timeout is not None: # pragma: no cover - timeout = target - time.time() - else: - if self._length + len(items) > self.maxsize: - raise queue.Full + if self.maxsize > 0 and self._length + 1 > self.maxsize: + if not block: + raise queue.Full + + if self._get_event.wait(timeout) is False: + # Nothing was removed from the queue and timeout expired + # This will never happen if timeout is None + raise queue.Full + + self._get_event.clear() + + # Convert the object outside of the file lock + data = self.dumps(item) with self._file_lock: self._file.seek(0, 2) # Go to end of file + self._file.write(struct.pack(LENGTH_STRUCT, len(data))) + self._file.write(data) + self._file.flush() # Probably not necessary since buffering=0 + os.fsync(self._file.fileno()) - for i in items: - write_data(i) - - self._update_length(self._length + len(items)) - self._unfinished_tasks += len(items) + self._update_length(self._length + 1) + self._unfinished_tasks += 1 self._put_event.set() _LOGGER.debug("Done putting data") @@ -215,25 +221,14 @@ def get(self, block=True, 
timeout=None): immediately available, else raise the Empty exception (timeout is ignored in that case). """ - _LOGGER.debug("Getting %s items", items) - - # Ignore requests for zero items - if items == 0: - _LOGGER.debug("Returning empty list") - return [] + _LOGGER.debug("Getting item") with self._get_lock: - data, queue_top = self._peek(block, timeout, items) + data, queue_top = self._peek(block, timeout) with self._file_lock: self._set_queue_top(queue_top) - - if isinstance(data, list): - if len(data) > 0: - self._update_length(self._length - len(data)) - elif data is not None: - self._update_length(self._length - 1) - + self._update_length(self._length - 1) self._get_event.set() _LOGGER.debug("Returning data from get") return data @@ -288,60 +283,13 @@ def join(self): with self._all_tasks_done: while self._unfinished_tasks: self._all_tasks_done.wait() - + def peek(self, block=False, timeout=None): """ - Peeks into the queue and returns an item without removing them. + Peeks into the queue and returns an item without removing it. 
""" - - def read_data(): - length = struct.unpack(LENGTH_STRUCT, self._file.read(4))[0] - data = self._file.read(length) - return self.loads(data) - - with self._get_lock: - _LOGGER.debug("Peeking %s items", items) - - # Ignore requests for zero items - if items == 0: - _LOGGER.debug("Returning empty list") - return [], self._file.tell() - - if block: - if timeout is not None: - target = time.time() + timeout - while self._length < items: - if self._put_event.wait(timeout) is False: - # Nothing was added to the queue and timeout expired - # This will never happen if timeout is None - raise queue.Empty - self._put_event.clear() - - # Something was added to the queue - # Update timeout, if necessary - if timeout is not None: # pragma: no cover - timeout = target - time.time() - - elif not partial and self._length < items: - raise queue.Empty - - with self._file_lock: - self._file.seek(self._get_queue_top(), 0) # Beginning of data - total_items = self._length if items > self._length else items - data = [read_data() for i in range(total_items)] - queue_top = self._file.tell() - - if items == 1: - if len(data) == 0: - _LOGGER.debug("No items to peek at so returning None") - return None, queue_top - else: - _LOGGER.debug("Returning data from peek") - return data[0], queue_top - else: - _LOGGER.debug("Returning data from peek") - return data, queue_top - + item, _ = self._peek(block, timeout) + return item def clear(self): """ From c5b640fccb215c104d49cc4a80eb0409bd291e3e Mon Sep 17 00:00:00 2001 From: Philip Lundrigan Date: Wed, 26 Dec 2018 14:15:28 -0700 Subject: [PATCH 03/16] Add new version of Python --- .travis.yml | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 6f3c6fb..53e1e9e 100644 --- a/.travis.yml +++ b/.travis.yml @@ -4,7 +4,7 @@ cache: pip matrix: fast_finish: true include: - - python: 3.5 + - python: 3.6 env: TOXENV=lint - python: 2.7 env: TOXENV=py27 @@ -12,10 +12,12 @@ matrix: env: TOXENV=py34 - 
python: 3.5 env: TOXENV=py35 - - python: "3.6-dev" + - python: 3.6 env: TOXENV=py36 - - python: nightly + - python: 3.7 env: TOXENV=py37 + - python: nightly + env: TOXENV=py38 allow_failures: - python: nightly From 425a393a16644bd20cf602bede40a77ff1210d2e Mon Sep 17 00:00:00 2001 From: Philip Lundrigan Date: Wed, 26 Dec 2018 15:05:55 -0700 Subject: [PATCH 04/16] Fix delete method --- persistent_queue/persistent_queue.py | 31 +++++++++------------------- 1 file changed, 10 insertions(+), 21 deletions(-) diff --git a/persistent_queue/persistent_queue.py b/persistent_queue/persistent_queue.py index 5773985..0643f11 100644 --- a/persistent_queue/persistent_queue.py +++ b/persistent_queue/persistent_queue.py @@ -9,7 +9,6 @@ import shutil import struct import threading -import time import uuid try: @@ -98,7 +97,7 @@ def _set_queue_top(self, top): self._file.seek(current_pos, 0) - def _peek(self, block=False, timeout=None): + def _peek(self, block=False, timeout=None): with self._get_lock: _LOGGER.debug("Peeking item") @@ -375,32 +374,22 @@ def flush(self): _LOGGER.debug("Finished flushing the queue") - def delete(self, items=1): + def delete(self): """ - Removes items from queue. - - items: number of how many items will be deleted + Remove item from queue without reading object from file. 
""" - def read_length(): - length = struct.unpack(LENGTH_STRUCT, self._file.read(4))[0] - self._file.seek(length, 1) - - _LOGGER.debug("Deleting %s items", items) - - # Ignore requests for zero items - if items == 0: - _LOGGER.debug("Ignoring request to delete") + _LOGGER.debug("Deleting item") + + # If there is nothing in the queue, do nothing + if self._length < 1: return with self._file_lock, self._get_lock: self._file.seek(self._get_queue_top(), 0) # Beginning of data - total_items = self._length if items > self._length else items - - for _ in range(total_items): - read_length() - + length = struct.unpack(LENGTH_STRUCT, self._file.read(4))[0] + self._file.seek(length, 1) self._set_queue_top(self._file.tell()) - self._update_length(self._length - total_items) + self._update_length(self._length - 1) _LOGGER.debug("Done deleting data") From 7c238e4b08b0500ccaa9a9708e6c5b1d263df83a Mon Sep 17 00:00:00 2001 From: Philip Lundrigan Date: Wed, 26 Dec 2018 15:06:29 -0700 Subject: [PATCH 05/16] Fix tests with new class signature --- tests/test_persistent_queue.py | 72 +++++++++++++++++++--------------- 1 file changed, 40 insertions(+), 32 deletions(-) diff --git a/tests/test_persistent_queue.py b/tests/test_persistent_queue.py index 4e7c41d..afcf886 100644 --- a/tests/test_persistent_queue.py +++ b/tests/test_persistent_queue.py @@ -70,7 +70,7 @@ def test_qsize(self): def test_put(self): self.queue.put(5) - assert self.queue.peek(items=1) == 5 + assert self.queue.peek() == 5 self.queue.put_nowait(5) assert self.queue.get() == 5 @@ -100,25 +100,23 @@ def test_get(self): assert self.queue.get() == b'a' assert len(self.queue) == 1 - assert self.queue.get(items=1) == b'b' + assert self.queue.get() == b'b' assert len(self.queue) == 0 self.queue.put(b'a') self.queue.put(b'b') self.queue.put(b'c') self.queue.put(b'd') - assert len(self.queue) == 4 + assert len(self.queue) == 3 - assert self.queue.get(items=3) == [b'a', b'b', b'c'] + assert [self.queue.get(), self.queue.get(), 
self.queue.get()] == [b'a', b'b', b'c'] assert len(self.queue) == 1 + + assert self.queue.get() == b'd' with pytest.raises(queue.Empty): - assert self.queue.get(block=False, items=100) == [b'd'] - assert len(self.queue) == 1 - - self.queue.put(b'd') - assert self.queue.get(items=0) == [] - assert len(self.queue) == 2 + self.queue.get(block=False) + assert len(self.queue) == 0 def test_get_blocking(self): done = [False] @@ -141,9 +139,6 @@ def func(): self.queue.get(timeout=1) def test_get_non_blocking_no_values(self): - with pytest.raises(queue.Empty): - assert self.queue.get(block=False, items=5) == [] - with pytest.raises(queue.Empty): self.queue.get(block=False) @@ -156,21 +151,19 @@ def test_peek(self): self.queue.put(b'test') assert self.queue.peek() == 1 - assert self.queue.peek(items=1) == 1 - assert self.queue.peek(items=2) == [1, 2] - assert self.queue.peek(items=3) == [1, 2, b'test'] + assert self.queue.get() == 1 + + assert self.queue.peek() == 2 + assert self.queue.get() == 2 - assert self.queue.peek(items=100) == [1, 2, b'test'] + assert self.queue.peek() == b'test' + assert self.queue.get() == b'test' self.queue.clear() self.queue.put(1) assert len(self.queue) == 1 assert self.queue.peek() == 1 - assert self.queue.peek(items=1) == 1 - assert self.queue.peek(items=2) == [1] - - assert self.queue.peek(items=0) == [] def test_peek_blocking(self): done = [False] @@ -210,17 +203,21 @@ def func(): assert len(self.queue) == 5 def test_peek_no_values(self): - assert self.queue.peek(items=5) == [] - assert self.queue.peek() is None + with pytest.raises(queue.Empty): + assert self.queue.peek() def test_clear(self): self.queue.put(5) self.queue.put(50) - - assert self.queue.peek(items=2) == [5, 50] + assert len(self.queue) == 2 + assert self.queue.peek() == 5 + self.queue.clear() + assert len(self.queue) == 0 + with pytest.raises(queue.Empty): + assert self.queue.peek() def test_copy(self): new_queue_name = 'another_queue' @@ -245,22 +242,32 @@ def 
test_delete(self): self.queue.put(11) assert len(self.queue) == 4 - self.queue.delete(2) + self.queue.delete() + assert len(self.queue) == 3 + + self.queue.delete() assert len(self.queue) == 2 - assert self.queue.peek(items=2) == [7, 11] - assert self.queue.get(items=2) == [7, 11] + + assert self.queue.peek() == 7 + assert self.queue.get() == 7 + assert self.queue.get() == 11 self.queue.put(2) - self.queue.delete(1000) + self.queue.delete() + self.queue.delete() + self.queue.delete() + self.queue.delete() assert len(self.queue) == 0 self.queue.put(2) - self.queue.delete(0) + self.queue.put(1) + self.queue.delete() assert len(self.queue) == 1 def test_delete_no_values(self): + assert len(self.queue) == 0 self.queue.delete() - self.queue.delete(100) + assert len(self.queue) == 0 def test_big_file_1(self): data = {b'a': list(range(500))} @@ -282,7 +289,8 @@ def test_big_file_2(self): for i in range(1000): self.queue.put(data) - assert self.queue.get(items=995) == [data for i in range(995)] + for i in range(995): + assert self.queue.get() == i self.queue.flush() assert len(self.queue) == 5 From 986fbc08603cfdead4d7f98ca7c8dd45d4ba1c49 Mon Sep 17 00:00:00 2001 From: Philip Lundrigan Date: Wed, 26 Dec 2018 20:00:18 -0700 Subject: [PATCH 06/16] Remove extra white space --- persistent_queue/persistent_queue.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/persistent_queue/persistent_queue.py b/persistent_queue/persistent_queue.py index 0643f11..bb2a60a 100644 --- a/persistent_queue/persistent_queue.py +++ b/persistent_queue/persistent_queue.py @@ -96,31 +96,31 @@ def _set_queue_top(self, top): os.fsync(self._file.fileno()) self._file.seek(current_pos, 0) - + def _peek(self, block=False, timeout=None): with self._get_lock: _LOGGER.debug("Peeking item") - + if self._length < 1: if not block: raise queue.Empty - + # Wait for something to be added if self._put_event.wait(timeout) is False: # Nothing was added to the queue and 
timeout expired # This will never happen if timeout is None raise queue.Empty - + self._put_event.clear() - + with self._file_lock: self._file.seek(self._get_queue_top(), 0) # Beginning of data length = struct.unpack(LENGTH_STRUCT, self._file.read(4))[0] data = self._file.read(length) item = self.loads(data) - + queue_top = self._file.tell() - + return item, queue_top def qsize(self): @@ -176,12 +176,12 @@ def put(self, item, block=True, timeout=None): if self.maxsize > 0 and self._length + 1 > self.maxsize: if not block: raise queue.Full - + if self._get_event.wait(timeout) is False: # Nothing was removed from the queue and timeout expired # This will never happen if timeout is None raise queue.Full - + self._get_event.clear() # Convert the object outside of the file lock @@ -282,7 +282,7 @@ def join(self): with self._all_tasks_done: while self._unfinished_tasks: self._all_tasks_done.wait() - + def peek(self, block=False, timeout=None): """ Peeks into the queue and returns an item without removing it. @@ -379,7 +379,7 @@ def delete(self): Remove item from queue without reading object from file. 
""" _LOGGER.debug("Deleting item") - + # If there is nothing in the queue, do nothing if self._length < 1: return From 09f60d83d1b66151c0cbc442871f6d8a4f16cd7c Mon Sep 17 00:00:00 2001 From: Philip Lundrigan Date: Wed, 26 Dec 2018 20:55:18 -0700 Subject: [PATCH 07/16] Ignore .pyc files --- .gitignore | 1 + 1 file changed, 1 insertion(+) diff --git a/.gitignore b/.gitignore index b032e02..fca1811 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,4 @@ dist python_persistent_queue.egg-info *.queue *.coverage +*.pyc From f0694a6d640e27c1c3c942eba8cc1acb67dcd66b Mon Sep 17 00:00:00 2001 From: Philip Lundrigan Date: Wed, 26 Dec 2018 20:28:58 -0700 Subject: [PATCH 08/16] Remove bug when calling task_done --- persistent_queue/persistent_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/persistent_queue/persistent_queue.py b/persistent_queue/persistent_queue.py index bb2a60a..5f21e51 100644 --- a/persistent_queue/persistent_queue.py +++ b/persistent_queue/persistent_queue.py @@ -260,7 +260,7 @@ def task_done(self): Raises a ValueError if called more times than there were items placed in the queue. 
""" with self._all_tasks_done: - unfinished = self._unfinished_tasks - items + unfinished = self._unfinished_tasks - 1 if unfinished < 0: raise ValueError('task_done() called too many times') if unfinished == 0: From bef948ba3f5c97b1c261b827334a61c5ab717e43 Mon Sep 17 00:00:00 2001 From: Philip Lundrigan Date: Wed, 26 Dec 2018 20:54:22 -0700 Subject: [PATCH 09/16] Update tests with changes to queue --- tests/test_persistent_queue.py | 63 +++++++++++++++++++--------------- 1 file changed, 36 insertions(+), 27 deletions(-) diff --git a/tests/test_persistent_queue.py b/tests/test_persistent_queue.py index afcf886..979579c 100644 --- a/tests/test_persistent_queue.py +++ b/tests/test_persistent_queue.py @@ -44,7 +44,6 @@ def test_simple(self): os.remove(filename) def test_qsize(self): - assert len(self.queue) == 0 assert self.queue.qsize() == 0 @@ -72,20 +71,23 @@ def test_put(self): self.queue.put(5) assert self.queue.peek() == 5 - self.queue.put_nowait(5) + self.queue.put_nowait(10) assert self.queue.get() == 5 + assert self.queue.get() == 10 - self.queue.put([10, 15, 20]) - assert self.queue.peek(items=4) == [5, 10, 15, 20] + self.queue.put(None) + assert self.queue.get() == None data = {b'a': 1, b'b': 2, b'c': [1, 2, 3]} self.queue.put(data) - assert self.queue.peek(items=5) == [5, 10, 15, 20, data] + assert self.queue.get() == data self.queue.put([]) - assert self.queue.peek(items=5) == [5, 10, 15, 20, data] + assert self.queue.get() == [] + + self.queue.maxsize = 1 + self.queue.put(0) - self.queue.maxsize = 4 with pytest.raises(queue.Full): self.queue.put(b'full', timeout=1) @@ -107,15 +109,16 @@ def test_get(self): self.queue.put(b'b') self.queue.put(b'c') self.queue.put(b'd') - assert len(self.queue) == 3 + assert len(self.queue) == 4 assert [self.queue.get(), self.queue.get(), self.queue.get()] == [b'a', b'b', b'c'] assert len(self.queue) == 1 - + assert self.queue.get() == b'd' with pytest.raises(queue.Empty): self.queue.get(block=False) + assert 
len(self.queue) == 0 def test_get_blocking(self): @@ -152,12 +155,12 @@ def test_peek(self): assert self.queue.peek() == 1 assert self.queue.get() == 1 - + assert self.queue.peek() == 2 assert self.queue.get() == 2 assert self.queue.peek() == b'test' - assert self.queue.get() == b'test' + assert self.queue.get() == b'test' self.queue.clear() @@ -182,7 +185,7 @@ def func(): assert data == 5 assert len(self.queue) == 1 - def test_peek_blocking_list(self): + def test_get_blocking_list(self): done_pushing = [False] done_peeking = [False] @@ -196,11 +199,11 @@ def func(): t = threading.Thread(target=func) t.start() - data = self.queue.peek(items=5, block=True) + data = [self.queue.get(block=True) for i in range(5)] done_peeking[0] = True assert done_pushing[0] is True assert data == [0, 1, 2, 3, 4] - assert len(self.queue) == 5 + assert len(self.queue) == 0 def test_peek_no_values(self): with pytest.raises(queue.Empty): @@ -209,19 +212,23 @@ def test_peek_no_values(self): def test_clear(self): self.queue.put(5) self.queue.put(50) - + assert len(self.queue) == 2 assert self.queue.peek() == 5 - + self.queue.clear() - + assert len(self.queue) == 0 with pytest.raises(queue.Empty): assert self.queue.peek() def test_copy(self): new_queue_name = 'another_queue' - self.queue.put([5, 4, 3, 2, 1]) + self.queue.put(5) + self.queue.put(4) + self.queue.put(3) + self.queue.put(2) + self.queue.put(1) assert len(self.queue) == 5 assert self.queue.get() == 5 @@ -244,10 +251,10 @@ def test_delete(self): self.queue.delete() assert len(self.queue) == 3 - + self.queue.delete() assert len(self.queue) == 2 - + assert self.queue.peek() == 7 assert self.queue.get() == 7 assert self.queue.get() == 11 @@ -290,7 +297,7 @@ def test_big_file_2(self): self.queue.put(data) for i in range(995): - assert self.queue.get() == i + assert self.queue.get() == data self.queue.flush() assert len(self.queue) == 5 @@ -304,14 +311,15 @@ def test_usage(self): self.queue.put([b'a', b'b', b'c']) assert 
self.queue.peek() == 1 - assert self.queue.peek(items=4) == [1, 2, 3, b'a'] - assert len(self.queue) == 6 + assert len(self.queue) == 4 self.queue.put(b'foobar') assert self.queue.get() == 1 - assert len(self.queue) == 6 - assert self.queue.get(items=6) == [2, 3, b'a', b'b', b'c', b'foobar'] + assert len(self.queue) == 4 + + for item in [2, 3, [b'a', b'b', b'c'], b'foobar']: + assert self.queue.get() == item def test_threads(self): def random_stuff(): @@ -320,12 +328,13 @@ def random_stuff(): if random_number % 3 == 0: try: - self.queue.peek(block=False, items=(random_number % 5)) + self.queue.peek(block=False) except queue.Empty: pass elif random_number % 2 == 0: try: - self.queue.get(block=False, items=(random_number % 5)) + for _ in range(random_number % 5): + self.queue.get(block=False) except queue.Empty: pass else: From 4c5f9304fc9ab119c94c8e3447eaf22e887373d8 Mon Sep 17 00:00:00 2001 From: Philip Lundrigan Date: Wed, 26 Dec 2018 20:55:04 -0700 Subject: [PATCH 10/16] Fix bug with clearing get/put events and add tests --- persistent_queue/persistent_queue.py | 3 ++- tests/test_persistent_queue.py | 7 ++++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/persistent_queue/persistent_queue.py b/persistent_queue/persistent_queue.py index 5f21e51..ad47913 100644 --- a/persistent_queue/persistent_queue.py +++ b/persistent_queue/persistent_queue.py @@ -100,6 +100,7 @@ def _set_queue_top(self, top): def _peek(self, block=False, timeout=None): with self._get_lock: _LOGGER.debug("Peeking item") + self._put_event.clear() if self._length < 1: if not block: @@ -173,6 +174,7 @@ def put(self, item, block=True, timeout=None): _LOGGER.debug("Putting item") with self._put_lock: + self._get_event.clear() if self.maxsize > 0 and self._length + 1 > self.maxsize: if not block: raise queue.Full @@ -181,7 +183,6 @@ def put(self, item, block=True, timeout=None): # Nothing was removed from the queue and timeout expired # This will never happen if timeout is None raise 
queue.Full - self._get_event.clear() # Convert the object outside of the file lock diff --git a/tests/test_persistent_queue.py b/tests/test_persistent_queue.py index 979579c..0986fd1 100644 --- a/tests/test_persistent_queue.py +++ b/tests/test_persistent_queue.py @@ -88,8 +88,9 @@ def test_put(self): self.queue.maxsize = 1 self.queue.put(0) + # Test that Full exception gets raised after something has been put in queue with pytest.raises(queue.Full): - self.queue.put(b'full', timeout=1) + self.queue.put(b'full', block=True, timeout=1) with pytest.raises(queue.Full): self.queue.put(b'full', block=False) @@ -116,6 +117,10 @@ def test_get(self): assert self.queue.get() == b'd' + # Test that Empty exception gets raised after something has been removed in queue + with pytest.raises(queue.Empty): + self.queue.get(block=True, timeout=1) + with pytest.raises(queue.Empty): self.queue.get(block=False) From af1ff96033f9fa413830451785eb1fe3de623ca2 Mon Sep 17 00:00:00 2001 From: Philip Lundrigan Date: Wed, 26 Dec 2018 20:59:22 -0700 Subject: [PATCH 11/16] Fix lint issue with test --- tests/test_persistent_queue.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_persistent_queue.py b/tests/test_persistent_queue.py index 0986fd1..54e3980 100644 --- a/tests/test_persistent_queue.py +++ b/tests/test_persistent_queue.py @@ -76,7 +76,7 @@ def test_put(self): assert self.queue.get() == 10 self.queue.put(None) - assert self.queue.get() == None + assert self.queue.get() is None data = {b'a': 1, b'b': 2, b'c': [1, 2, 3]} self.queue.put(data) From efbec545c163a2461bb3666e0a15fe5cda1c502b Mon Sep 17 00:00:00 2001 From: Philip Lundrigan Date: Wed, 26 Dec 2018 21:31:53 -0700 Subject: [PATCH 12/16] Add support for Python 3.7 --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index 53e1e9e..1a3ceba 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,4 +1,5 @@ sudo: false +dist: xenial language: python cache: pip matrix: From 
7e76aa3f2f221ffc0d9e004f7a37de0c45e2f60a Mon Sep 17 00:00:00 2001 From: Philip Lundrigan Date: Thu, 27 Dec 2018 10:43:34 -0700 Subject: [PATCH 13/16] Update README.md --- README.md | 45 ++++++++++++++++++++++++--------------------- 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/README.md b/README.md index 85eb55f..384ae2c 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,24 @@ # Description -Implementation of a persistent queue in Python. I looked around and couldn't find anything that fit my needs, so I made my own. Example usage: +This is an implementation of a persistent queue in Python. By persistent, I mean that every item that is added to the queue is saved to disk. I had a specific usage pattern in mind, and I couldn't find any existing libraries that fit my needs, so I made my own. I tried to make it a drop-in replacement for Python's Queue object. My queue object has one extra method that Python's Queue class does not, `peek`. + +I created this with the following workflow in mind: + +```python + +data = queue.peek() + +success = upload_data_somewhere(data) + +if success: + queue.delete() + +``` + +In this use case, the data is only deleted from the queue after it has been successfully processed (in this example, uploaded). + +Here is an extended example: ```python from persistent_queue import PersistentQueue @@ -16,38 +33,24 @@ queue.push(3) queue.push(['a', 'b', 'c']) data = queue.peek() # 1 -data = queue.peek(4) # [1, 2, 3, 'a'] size = len(queue) # 6 queue.push('foobar') -data = queue.pop() # 1 +data = queue.get() # Returns 1 -queue.delete(2) -data = queue.pop() # 3 +queue.delete() # Deletes 2 +data = queue.get() # Returns 3 -queue.clear() +queue.clear() # Deletes everything from queue ``` -Objects that are added to the queue must be pickle-able. A file is saved to the file system based on the name given to the queue. The same name must be given if you want the data to persist. 
- -I created this with the following workflow in mind: - -```python - -data = queue.peek(5) +Objects that are added to the queue must be serializeable (and with default parameters, pickle-able). A file is saved to the file system based on the name given to the queue. The same name must be given if you want the data to persist. -success = upload_data_somewhere(data) - -if success: - queue.delete(5) - queue.flush() # Remove extra space - -``` By default, `pickle` is used to serialize objects. This can be changed depending on your needs by setting the `dumps` and `loads` options (see Parameters). [dill](http://trac.mystic.cacr.caltech.edu/project/pathos/wiki/dill.html) and [msgpack](https://github.com/msgpack/msgpack-python) have been tested (see tests as an example). -When items are popped or deleted, the data isn't actually deleted. Instead a pointer is moved to the place in the file with valid data. As a result, the file will continue to grow even if items are removed. `persistent_queue.flush()` reclaims this space. **You must call `flush` as you see fit!** +When `get` or `delete` are called, the data isn't actually deleted. Instead a pointer is moved to the place in the file with valid data. This is to help reduce IO costs. As a result, the file will continue to grow even if items are removed. `persistent_queue.flush()` reclaims this space. **You must call `flush` as you see fit!** # Parameters From 15ff938ee62846c4d39c893a6cdbf919e97985de Mon Sep 17 00:00:00 2001 From: Philip Lundrigan Date: Thu, 27 Dec 2018 13:14:07 -0700 Subject: [PATCH 14/16] Add repr and tests --- persistent_queue/persistent_queue.py | 16 ++++++++++++++++ tests/test_persistent_queue.py | 6 ++++++ 2 files changed, 22 insertions(+) diff --git a/persistent_queue/persistent_queue.py b/persistent_queue/persistent_queue.py index ad47913..114180e 100644 --- a/persistent_queue/persistent_queue.py +++ b/persistent_queue/persistent_queue.py @@ -399,3 +399,19 @@ def __len__(self): Get size of queue. 
""" return self._length + + def __repr__(self): + """ + Return a representation of a persistent queue. + """ + string = "PersistentQueue(filename={filename!r}, " + \ + "maxsize={maxsize!r}, " + \ + "dumps={dumps}, " + \ + "loads={loads}, " + \ + "flush_limit={flush_limit!r})" + + return string.format(filename=self.filename, + maxsize=self.maxsize, + dumps=self.dumps, + loads=self.loads, + flush_limit=self.flush_limit) diff --git a/tests/test_persistent_queue.py b/tests/test_persistent_queue.py index 54e3980..b9f6adf 100644 --- a/tests/test_persistent_queue.py +++ b/tests/test_persistent_queue.py @@ -378,6 +378,12 @@ def worker(): self.queue.join() assert self.queue.empty() is True + def test_repr(self): + queue_repr = repr(self.queue) + assert 'filename' in queue_repr + assert 'maxsize' in queue_repr + assert 'flush_limit' in queue_repr + class TestPersistentQueueWithDill(TestPersistentQueue): def setup_method(self): From 47d5eccc711843f8b4a7f2c42653eef5cb03673e Mon Sep 17 00:00:00 2001 From: Philip Lundrigan Date: Fri, 28 Dec 2018 01:32:07 -0700 Subject: [PATCH 15/16] Add close method and tests --- persistent_queue/persistent_queue.py | 5 ++ tests/test_persistent_queue.py | 111 +++++++++++++++++++++++++++ tox.ini | 1 + 3 files changed, 117 insertions(+) diff --git a/persistent_queue/persistent_queue.py b/persistent_queue/persistent_queue.py index 114180e..0c25982 100644 --- a/persistent_queue/persistent_queue.py +++ b/persistent_queue/persistent_queue.py @@ -394,6 +394,11 @@ def delete(self): _LOGGER.debug("Done deleting data") + def close(self): + self._file.close() + self._put_event.set() # Alert threads waiting for a put + self._get_event.set() # Alert threads waiting for a get + def __len__(self): """ Get size of queue. 
diff --git a/tests/test_persistent_queue.py b/tests/test_persistent_queue.py index b9f6adf..6f66307 100644 --- a/tests/test_persistent_queue.py +++ b/tests/test_persistent_queue.py @@ -5,6 +5,11 @@ import uuid import pytest +try: + from unittest.mock import MagicMock +except ImportError: + from mock import MagicMock + try: import queue except ImportError: @@ -378,12 +383,118 @@ def worker(): self.queue.join() assert self.queue.empty() is True + def test_close(self): + self.queue.put(1) + self.queue.close() + + with pytest.raises(ValueError): + self.queue.put(2) + + new_queue = PersistentQueue(self.queue.filename, + dumps=self.queue.dumps, + loads=self.queue.loads) + assert new_queue.get() == 1 + assert len(new_queue) == 0 + + def test_close_threaded_getting(self): + def worker(): + while True: + try: + assert self.queue.get() in [1, 2, 3] + except ValueError: + return + + t = threading.Thread(target=worker) + t.start() + + self.queue.put(1) + time.sleep(.2) + self.queue.put(2) + time.sleep(.2) + self.queue.put(3) + time.sleep(1) + + self.queue.close() + + def test_close_threaded_putting(self): + self.queue.maxsize = 1 + + def worker(): + self.queue.put("test_1") + + while True: + try: + self.queue.put("test_2") + assert False + except ValueError: + return + + t = threading.Thread(target=worker) + t.start() + + time.sleep(1) + + assert len(self.queue) == 1 + self.queue.close() + def test_repr(self): queue_repr = repr(self.queue) assert 'filename' in queue_repr assert 'maxsize' in queue_repr assert 'flush_limit' in queue_repr + def test_half_write_os_fsync(self): + """ + Test if there is a failure in the middle of putting something into the + queue. 
+ """ + + self.queue.put(1) + old_os_fsync = os.fsync + + # Set up failure when calling os.fsync + os.fsync = MagicMock(side_effect=Exception('Power shutoff')) + with pytest.raises(Exception): + self.queue.put(2) + self.queue.close() + + os.fsync = old_os_fsync + new_queue = PersistentQueue(self.queue.filename, + dumps=self.queue.dumps, + loads=self.queue.loads) + + # Failure occurred before the second item was fully added + assert len(new_queue) == 1 + assert new_queue.get() == 1 + with pytest.raises(queue.Empty): + new_queue.get(block=False) + + def test_half_write_update_length(self): + """ + Test if there is a failure in the middle of putting something into the + queue. + """ + + self.queue.put(1) + old_update_length = self.queue._update_length + + # Set up failure when calling _update_length + self.queue._update_length = MagicMock(side_effect=Exception('Power shutoff')) + with pytest.raises(Exception): + self.queue.put(2) + self.queue.close() + + self.queue._update_length = old_update_length + new_queue = PersistentQueue(self.queue.filename, + dumps=self.queue.dumps, + loads=self.queue.loads) + + # Failure occurred before the second item was fully added + assert len(new_queue) == 1 + assert new_queue.get() == 1 + with pytest.raises(queue.Empty): + new_queue.get(block=False) + class TestPersistentQueueWithDill(TestPersistentQueue): def setup_method(self): diff --git a/tox.ini b/tox.ini index 5f01721..a22d037 100644 --- a/tox.ini +++ b/tox.ini @@ -11,6 +11,7 @@ deps = pytest>=3.0.4,<4 pytest-cov>=2.4.0,<3 pytest-xdist>=1.5.0,<2 + mock passenv = CODECOV_TOKEN CI CI_* TRAVIS TRAVIS_* setenv = HOME = {envtmpdir} commands = py.test -v --cov persistent_queue {posargs} From 45b3f3a8bd89a17114c89185768f3a1737681eea Mon Sep 17 00:00:00 2001 From: Philip Lundrigan Date: Fri, 28 Dec 2018 01:44:35 -0700 Subject: [PATCH 16/16] Update README --- README.md | 52 ++++++++++++++++++++++++++++++++++++++-------------- 1 file changed, 38 insertions(+), 14 deletions(-) diff --git
a/README.md b/README.md index 384ae2c..a9d9640 100644 --- a/README.md +++ b/README.md @@ -2,9 +2,9 @@ # Description -This is an implementation of a persistent queue in Python. By persistent, I mean that every item that is added to the queue is saved to disk. I had a specific usage pattern in mind, and I couldn't find any exisiting libraries that fit my needs, so I made my own. I tried to make it a drop in replacement for Python's Queue object. My queue object has one extra methods that Python's Queue class does not, `peek`. +This is an implementation of a persistent queue in Python. By persistent, I mean that every item that is added to the queue is saved to disk. I had a specific usage pattern in mind, and I couldn't find any existing libraries that fit my needs, so I made my own. I tried to make it a drop in replacement for Python's [queue.Queue](https://docs.python.org/3/library/queue.html) object. However, I've added a few extra methods that Python's queue.Queue class does not: `peek`, `flush`, `clear`, and `close`. -I created this with the following workflow in mind: +I created Python persistent queue with the following workflow in mind: ```python @@ -17,9 +17,37 @@ if success: ``` -In this use case, the data is only deleted from the queue after it has been successfully processed (in this example, uploaded). +In this use case, the data is only deleted from the queue after it has been successfully processed (in this example, uploaded). -Here is an extended example: +Typically, I have two threads, one that is writing data to the queue and another thread that is reading data from the queue and processing it. 
Something like this: + +```python + +queue = PersistentQueue('queue') + +def consumer(): + while True: + data = queue.peek() # Blocking call + success = upload_data_somewhere(data) + if success: + queue.delete() + + +def producer(): + while True: + data = read_from_sensor() + queue.put(data) + time.sleep(1) +``` + +Objects that are added to the queue must be serializable (and with default parameters, pickle-able). A file is saved to the file system based on the name given to the queue. + +By default, `pickle` is used to serialize objects. This can be changed depending on your needs by setting the `dumps` and `loads` options (see Parameters). [dill](http://trac.mystic.cacr.caltech.edu/project/pathos/wiki/dill.html) and [msgpack](https://github.com/msgpack/msgpack-python) have been tested (see tests as an example). + +When `get()` or `delete()` are called, the data isn't actually deleted. Instead a pointer is moved to the place in the file with valid data. This is to help reduce I/O costs. As a result, the file will continue to grow even if items are removed. `flush()` reclaims this space. **You must call `flush` as you see fit!** + + +# Extended Example ```python from persistent_queue import PersistentQueue @@ -41,16 +69,16 @@ data = queue.get() # Returns 1 queue.delete() # Deletes 2 data = queue.get() # Returns 3 +data = queue.get() # Returns ['a', 'b', 'c'] queue.clear() # Deletes everything from queue ``` -Objects that are added to the queue must be serializeable (and with default parameters, pickle-able). A file is saved to the file system based on the name given to the queue. The same name must be given if you want the data to persist. - - -By default, `pickle` is used to serialize objects. This can be changed depending on your needs by setting the `dumps` and `loads` options (see Parameters).
[dill](http://trac.mystic.cacr.caltech.edu/project/pathos/wiki/dill.html) and [msgpack](https://github.com/msgpack/msgpack-python) have been tested (see tests as an example). +# Install -When `get` or `delete` are called, the data isn't actually deleted. Instead a pointer is moved to the place in the file with valid data. This is to help reduce IO costs. As a result, the file will continue to grow even if items are removed. `persistent_queue.flush()` reclaims this space. **You must call `flush` as you see fit!** +``` +pip install python-persistent-queue +``` # Parameters @@ -62,8 +90,4 @@ A persistent queue takes the following parameters: - `loads` (*optional*, default=`pickle.loads`): The method used to convert bytes into a Python object. - `flush_limit` (*optional*, default=1048576): When the amount of empty space in the file is greater than `flush_limit`, the file will be flushed. This balances file I/O and storage space. -# Install - -``` -pip install python-persistent-queue -``` +# Methods