Skip to content

Commit d553be4

Browse files
authored
Merge pull request #34 from bnavigator/native-replace
Native upload: Handle file replacements and metadata updates.
2 parents 9ad752c + a3b7bea commit d553be4

3 files changed

Lines changed: 108 additions & 11 deletions

File tree

dvuploader/dvuploader.py

Lines changed: 49 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -239,9 +239,22 @@ def _check_duplicates(
239239
to_skip.append(file.file_id)
240240

241241
if replace_existing:
242-
table.add_row(
243-
file.file_name, "[bright_cyan]Exists", "[bright_black]Replace"
244-
)
242+
ds_file = self._get_dsfile_by_id(file.file_id, ds_files)
243+
if not self._check_size(file, ds_file):
244+
file._unchanged_data = False
245+
else:
246+
# calculate checksum
247+
file.update_checksum_chunked()
248+
file.apply_checksum()
249+
file._unchanged_data = self._check_hashes(file, ds_file)
250+
if file._unchanged_data:
251+
table.add_row(
252+
file.file_name, "[bright_cyan]Exists", "[bright_black]Replace Meta"
253+
)
254+
else:
255+
table.add_row(
256+
file.file_name, "[bright_cyan]Exists", "[bright_black]Replace"
257+
)
245258
else:
246259
table.add_row(
247260
file.file_name, "[bright_cyan]Exists", "[bright_black]Skipping"
@@ -294,6 +307,25 @@ def _get_file_id(
294307
if dspath == fpath:
295308
return ds_file["dataFile"]["id"]
296309

310+
@staticmethod
311+
def _get_dsfile_by_id(
312+
file_id: int,
313+
ds_files: List[Dict],
314+
) -> Optional[Dict]:
315+
"""
316+
Retrieves a dataset file dictionary by its ID.
317+
318+
Args:
319+
file_id (int): The ID of the file to retrieve.
320+
ds_files (List[Dict]): List of dictionary objects representing dataset files.
321+
322+
Returns:
323+
Optional[Dict]: The dataset file dictionary if found, None otherwise.
324+
"""
325+
for ds_file in ds_files:
326+
if ds_file["dataFile"]["id"] == file_id:
327+
return ds_file
328+
297329
@staticmethod
298330
def _check_hashes(file: File, dsFile: Dict):
299331
"""
@@ -321,6 +353,20 @@ def _check_hashes(file: File, dsFile: Dict):
321353
and path == os.path.join(file.directory_label, file.file_name) # type: ignore
322354
)
323355

356+
@staticmethod
357+
def _check_size(file: File, dsFile: Dict) -> bool:
358+
"""
359+
Checks if the file size matches the size of the file in the dataset.
360+
361+
Args:
362+
file (File): The file to check.
363+
dsFile (Dict): The file in the dataset to compare against.
364+
365+
Returns:
366+
bool: True if the sizes match, False otherwise.
367+
"""
368+
return dsFile["dataFile"]["filesize"] == file._size
369+
324370
@staticmethod
325371
def _has_direct_upload(
326372
dataverse_url: str,

dvuploader/file.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ class File(BaseModel):
2929
3030
Private Attributes:
3131
_size (int): Size of the file in bytes.
32+
_unchanged_data (bool): Indicates if the file data has not changed since last upload.
3233
3334
Methods:
3435
extract_file_name(): Extracts filename from filepath and initializes file handler.
@@ -57,6 +58,7 @@ class File(BaseModel):
5758
tab_ingest: bool = Field(default=True, alias="tabIngest")
5859

5960
_size: int = PrivateAttr(default=0)
61+
_unchanged_data: bool = PrivateAttr(default=False)
6062

6163
def extract_file_name(self):
6264
"""
@@ -110,6 +112,7 @@ def apply_checksum(self):
110112
"""
111113
Calculates and applies the checksum for the file.
112114
Must be called after extract_file_name() has initialized the checksum.
115+
It must also be called only after all data has been fed into the checksum hash function.
113116
114117
Raises:
115118
AssertionError: If checksum is not initialized or hash function is not set.
@@ -119,6 +122,36 @@ def apply_checksum(self):
119122

120123
self.checksum.apply_checksum()
121124

125+
126+
def update_checksum_chunked(self, blocksize=2**20):
127+
"""Updates the checksum with data read from a file-like object in chunks.
128+
129+
Args:
130+
blocksize (int, optional): Size of chunks to read. Defaults to 1 MB (2**20).
131+
132+
Raises:
133+
AssertionError: If the hash function has not been initialized
134+
135+
Note:
136+
This method resets the file position to the start after reading.
137+
"""
138+
assert self.handler is not None, "File handler is not initialized."
139+
assert self.checksum is not None, "Checksum is not initialized."
140+
assert self.checksum._hash_fun is not None, "Checksum hash function is not set."
141+
142+
while True:
143+
buf = self.handler.read(blocksize)
144+
145+
if not isinstance(buf, bytes):
146+
buf = buf.encode()
147+
148+
if not buf:
149+
break
150+
self.checksum._hash_fun.update(buf)
151+
152+
self.handler.seek(0)
153+
154+
122155
def __del__(self):
123156
if self.handler is not None:
124157
self.handler.close()

dvuploader/nativeupload.py

Lines changed: 26 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -91,15 +91,30 @@ async def native_upload(
9191
"proxy": proxy,
9292
}
9393

94+
files_new = [file for file in files if not file.to_replace]
95+
files_new_metadata = [file for file in files if file.to_replace and file._unchanged_data]
96+
files_replace = [file for file in files if file.to_replace and not file._unchanged_data]
97+
9498
async with httpx.AsyncClient(**session_params) as session:
9599
with tempfile.TemporaryDirectory() as tmp_dir:
96-
packages = distribute_files(files)
100+
packages = distribute_files(files_new)
97101
packaged_files = _zip_packages(
98102
packages=packages,
99103
tmp_dir=tmp_dir,
100104
progress=progress,
101105
)
102106

107+
replacable_files = [
108+
(
109+
progress.add_task(
110+
file.file_name, # type: ignore
111+
total=file._size,
112+
),
113+
file
114+
)
115+
for file in files_replace
116+
]
117+
103118
tasks = [
104119
_single_native_upload(
105120
session=session,
@@ -108,15 +123,15 @@ async def native_upload(
108123
pbar=pbar, # type: ignore
109124
progress=progress,
110125
)
111-
for pbar, file in packaged_files
126+
for pbar, file in (packaged_files + replacable_files)
112127
]
113128

114129
responses = await asyncio.gather(*tasks)
115130
_validate_upload_responses(responses, files)
116131

117132
await _update_metadata(
118133
session=session,
119-
files=files,
134+
files=files_new + files_new_metadata,
120135
persistent_id=persistent_id,
121136
dataverse_url=dataverse_url,
122137
api_token=api_token,
@@ -167,6 +182,10 @@ def _zip_packages(
167182
for index, package in packages:
168183
if len(package) == 1:
169184
file = package[0]
185+
pbar = progress.add_task(
186+
file.file_name, # type: ignore
187+
total=file._size,
188+
)
170189
else:
171190
path = zip_files(
172191
files=package,
@@ -178,10 +197,10 @@ def _zip_packages(
178197
file.extract_file_name()
179198
file.mimeType = "application/zip"
180199

181-
pbar = progress.add_task(
182-
file.file_name, # type: ignore
183-
total=file._size,
184-
)
200+
pbar = progress.add_task(
201+
f"Zip package of {len(package)} files", # type: ignore
202+
total=file._size,
203+
)
185204

186205
files.append((pbar, file))
187206

@@ -401,7 +420,6 @@ async def _update_single_metadata(
401420
json_data = _get_json_data(file)
402421

403422
del json_data["forceReplace"]
404-
del json_data["restrict"]
405423

406424
# Send metadata as a readable byte stream
407425
# This is a workaround since "data" and "json"

0 commit comments

Comments
 (0)