diff --git a/README.md b/README.md index da0ce3c02..31727ad86 100644 --- a/README.md +++ b/README.md @@ -30,8 +30,8 @@ Since Release - - commit since last release + + commit since last release diff --git a/datajoint/autopopulate.py b/datajoint/autopopulate.py index 226e64dda..d696ce981 100644 --- a/datajoint/autopopulate.py +++ b/datajoint/autopopulate.py @@ -93,7 +93,7 @@ def _rename_attributes(table, props): self._key_source *= _rename_attributes(*q) return self._key_source - def make(self, key): + def make(self, key, **kwargs): """ This method must be implemented by derived classes to perform automated computation. The method must implement the following three steps: @@ -136,6 +136,8 @@ def make(self, key): DataJoint may programmatically enforce this separation in the future. :param key: The primary key value used to restrict the data fetching. + :param kwargs: Keyword arguments passed from populate(make_kwargs=...). + These are passed to make_fetch for the tripartite pattern. :raises NotImplementedError: If the derived class does not implement the required methods. """ @@ -153,7 +155,7 @@ def make(self, key): # User has implemented `_fetch`, `_compute`, and `_insert` methods instead # Step 1: Fetch data from parent tables - fetched_data = self.make_fetch(key) # fetched_data is a tuple + fetched_data = self.make_fetch(key, **kwargs) # fetched_data is a tuple computed_result = yield fetched_data # passed as input into make_compute # Step 2: If computed result is not passed in, compute the result diff --git a/datajoint/external.py b/datajoint/external.py index b3de2ff5d..0f210556d 100644 --- a/datajoint/external.py +++ b/datajoint/external.py @@ -276,30 +276,67 @@ def upload_filepath(self, local_filepath): uuid = uuid_from_buffer( init_string=relative_filepath ) # hash relative path, not contents - contents_hash = uuid_from_file(local_filepath) + + # Check if checksum should be skipped based on file size limit + file_size = Path(local_filepath).stat().st_size + size_limit = config.get("filepath_checksum_size_limit_insert") + skip_checksum = size_limit is not None and file_size > size_limit + + if skip_checksum: + contents_hash = None + logger.warning( + f"Skipping checksum for '{relative_filepath}' ({file_size} bytes > {size_limit} byte limit)" + ) + else: + contents_hash = uuid_from_file(local_filepath) # check if the remote file already exists and verify that it matches check_hash = (self & {"hash": uuid}).fetch("contents_hash") if check_hash.size: # the tracking entry exists, check that it's the same file as before - if contents_hash != check_hash[0]: + if not skip_checksum and contents_hash != check_hash[0]: raise DataJointError( f"A different version of '{relative_filepath}' has already been placed." ) else: # upload the file and create its tracking entry - self._upload_file( - local_filepath, - self._make_external_filepath(relative_filepath), - metadata={"contents_hash": str(contents_hash)}, - ) + external_path = self._make_external_filepath(relative_filepath) + already_uploaded = False + if self.spec["protocol"] == "s3": + stat = self.s3.stat(str(external_path)) + if stat is not None and stat.size == file_size: + # Verify contents_hash from S3 metadata when available + if skip_checksum: + already_uploaded = True + else: + remote_meta = { + k.lower().lstrip("x-amz-meta-"): v + for k, v in (stat.metadata or {}).items() + } + remote_hash = remote_meta.get("contents_hash", "") + if remote_hash == str(contents_hash): + already_uploaded = True + if already_uploaded: + logger.info( + f"File already exists on S3 with matching size" + f"{'' if skip_checksum else ' and checksum'}" + f", skipping upload: '{relative_filepath}'" + ) + if not already_uploaded: + self._upload_file( + local_filepath, + external_path, + metadata={ + "contents_hash": str(contents_hash) if contents_hash else "" + }, + ) self.connection.query( "INSERT INTO {tab} (hash, size, filepath, contents_hash) VALUES (%s, {size}, '{filepath}', %s)".format( tab=self.full_table_name, - size=Path(local_filepath).stat().st_size, + size=file_size, filepath=relative_filepath, ), - args=(uuid.bytes, contents_hash.bytes), + args=(uuid.bytes, contents_hash.bytes if contents_hash else None), ) return uuid diff --git a/datajoint/plugin.py b/datajoint/plugin.py index 8cb668092..94070342d 100644 --- a/datajoint/plugin.py +++ b/datajoint/plugin.py @@ -1,7 +1,7 @@ import logging from pathlib import Path -import pkg_resources +import pkg_resources # requires setuptools<82 from cryptography.exceptions import InvalidSignature from otumat import hash_pkg, verify diff --git a/datajoint/s3.py b/datajoint/s3.py index 98dc75708..9f58f52f0 100644 --- a/datajoint/s3.py +++ b/datajoint/s3.py @@ -95,16 +95,19 @@ def fget(self, name, local_filepath): if "contents_hash" in meta: return uuid.UUID(meta["contents_hash"]) - def exists(self, name): - logger.debug("exists: {}:{}".format(self.bucket, name)) + def stat(self, name): + """Return stat result for an object, or None if it does not exist.""" + logger.debug("stat: {}:{}".format(self.bucket, name)) try: - self.client.stat_object(self.bucket, str(name)) + return self.client.stat_object(self.bucket, str(name)) except minio.error.S3Error as e: if e.code == "NoSuchKey": - return False - else: - raise e - return True + return None + raise e + + def exists(self, name): + logger.debug("exists: {}:{}".format(self.bucket, name)) + return self.stat(name) is not None def get_size(self, name): logger.debug("get_size: {}:{}".format(self.bucket, name)) diff --git a/datajoint/settings.py b/datajoint/settings.py index 30b206f99..c8add6017 100644 --- a/datajoint/settings.py +++ b/datajoint/settings.py @@ -49,8 +49,11 @@ "database.use_tls": None, "enable_python_native_blobs": True, # python-native/dj0 encoding support "add_hidden_timestamp": False, - # file size limit for when to disable checksums + # file size limits for when to disable checksums (in bytes) + # filepath_checksum_size_limit: skip checksum verification on fetch for large files "filepath_checksum_size_limit": None, + # filepath_checksum_size_limit_insert: skip checksum computation on insert for large files + "filepath_checksum_size_limit_insert": None, } ) diff --git a/datajoint/version.py b/datajoint/version.py index b51d5935a..60f19e7c6 100644 --- a/datajoint/version.py +++ b/datajoint/version.py @@ -1,6 +1,6 @@ # version bump auto managed by Github Actions: # label_prs.yaml(prep), release.yaml(bump), post_release.yaml(edit) # manually set this version will be eventually overwritten by the above actions -__version__ = "0.14.5" +__version__ = "0.14.9" assert len(__version__) <= 10 # The log table limits version to the 10 characters diff --git a/pyproject.toml b/pyproject.toml index c787cc11d..9ec0660db 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,7 +22,7 @@ dependencies = [ "faker", "cryptography", "urllib3", - "setuptools", + "setuptools<82", # pkg_resources removed in 82.0.0 ] requires-python = ">=3.9,<4.0" authors = [