Skip to content

Commit 92eeb8e

Browse files
committed
Merge branch 'main' into add-native-progress
2 parents b79c583 + 1b1397d commit 92eeb8e

14 files changed

Lines changed: 520 additions & 60 deletions

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,8 @@ DVUploader provides several environment variables that allow you to control retr
127127
- `DVUPLOADER_MIN_RETRY_TIME`: Minimum wait time between retries in seconds (default: 1)
128128
- `DVUPLOADER_RETRY_MULTIPLIER`: Multiplier for exponential backoff (default: 0.1)
129129
- `DVUPLOADER_MAX_PKG_SIZE`: Maximum package size in bytes (default: 2GB)
130+
- `DVUPLOADER_LOCK_WAIT_TIME`: Time to wait between checks for dataset lock (default: 10 seconds)
131+
- `DVUPLOADER_LOCK_TIMEOUT`: Timeout for the dataset lock check (default: 300 seconds)
130132

131133
**Setting via environment:**
132134
```bash
@@ -135,6 +137,8 @@ export DVUPLOADER_MAX_RETRY_TIME=300
135137
export DVUPLOADER_MIN_RETRY_TIME=2
136138
export DVUPLOADER_RETRY_MULTIPLIER=0.2
137139
export DVUPLOADER_MAX_PKG_SIZE=3221225472 # 3GB
140+
export DVUPLOADER_LOCK_WAIT_TIME=5
141+
export DVUPLOADER_LOCK_TIMEOUT=300
138142
```
139143

140144
**Setting programmatically:**
@@ -148,6 +152,8 @@ dv.config(
148152
min_retry_time=2,
149153
retry_multiplier=0.2,
150154
max_package_size=3 * 1024**3,  # 3GB
155+
lock_wait_time=5,
156+
lock_timeout=300,
151157
)
152158
153159
# Continue with your upload as normal

dvuploader/cli.py

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
1-
import yaml
2-
import typer
3-
41
from pathlib import Path
5-
from pydantic import BaseModel
62
from typing import List, Optional
3+
4+
import typer
5+
import yaml
6+
from pydantic import BaseModel
7+
78
from dvuploader import DVUploader, File
89
from dvuploader.utils import add_directory
910

@@ -29,6 +30,7 @@ class CliInput(BaseModel):
2930

3031
app = typer.Typer()
3132

33+
3234
def _enumerate_filepaths(filepaths: List[str], recurse: bool) -> List[File]:
3335
"""
3436
Take a list of filepaths and transform it into a list of File objects, optionally recursing into each of them.
@@ -39,7 +41,7 @@ def _enumerate_filepaths(filepaths: List[str], recurse: bool) -> List[File]:
3941
4042
Returns:
4143
List[File]: A list of File objects representing the files extracted from all filepaths.
42-
44+
4345
Raises:
4446
FileNotFoundError: If a filepath does not exist.
4547
IsADirectoryError: If recurse is False and a filepath points to a directory instead of a file.
@@ -183,6 +185,9 @@ def main(
183185
if filepaths is None:
184186
filepaths = []
185187

188+
if recurse is None:
189+
recurse = False
190+
186191
_validate_inputs(
187192
filepaths=filepaths,
188193
pid=pid,
@@ -200,7 +205,10 @@ def main(
200205
api_token=api_token,
201206
dataverse_url=dataverse_url,
202207
persistent_id=pid,
203-
files=_enumerate_filepaths(filepaths=filepaths, recurse=recurse),
208+
files=_enumerate_filepaths(
209+
filepaths=filepaths,
210+
recurse=recurse,
211+
),
204212
)
205213

206214
uploader = DVUploader(files=cli_input.files)

dvuploader/config.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,8 @@
22

33

44
def config(
5+
lock_wait_time: int = 10,
6+
lock_timeout: int = 300,
57
max_retries: int = 15,
68
max_retry_time: int = 240,
79
min_retry_time: int = 1,
@@ -54,3 +56,5 @@ def config(
5456
os.environ["DVUPLOADER_MIN_RETRY_TIME"] = str(min_retry_time)
5557
os.environ["DVUPLOADER_RETRY_MULTIPLIER"] = str(retry_multiplier)
5658
os.environ["DVUPLOADER_MAX_PKG_SIZE"] = str(max_package_size)
59+
os.environ["DVUPLOADER_LOCK_WAIT_TIME"] = str(lock_wait_time)
60+
os.environ["DVUPLOADER_LOCK_TIMEOUT"] = str(lock_timeout)

dvuploader/directupload.py

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,27 @@
11
import asyncio
2-
import httpx
3-
from io import BytesIO
42
import json
53
import os
6-
from typing import Dict, List, Optional, Tuple
4+
from io import BytesIO
5+
from typing import AsyncGenerator, Dict, List, Optional, Tuple
76
from urllib.parse import urljoin
7+
88
import aiofiles
9-
from typing import AsyncGenerator
9+
import httpx
1010
from rich.progress import Progress, TaskID
1111

1212
from dvuploader.file import File
13-
from dvuploader.utils import build_url
13+
from dvuploader.utils import build_url, init_logging, wait_for_dataset_unlock
1414

1515
TESTING = bool(os.environ.get("DVUPLOADER_TESTING", False))
1616
MAX_FILE_DISPLAY = int(os.environ.get("DVUPLOADER_MAX_FILE_DISPLAY", 50))
1717
MAX_RETRIES = int(os.environ.get("DVUPLOADER_MAX_RETRIES", 10))
1818

19+
LOCK_WAIT_TIME = int(os.environ.get("DVUPLOADER_LOCK_WAIT_TIME", 1.5))
20+
LOCK_TIMEOUT = int(os.environ.get("DVUPLOADER_LOCK_TIMEOUT", 300))
21+
22+
assert isinstance(LOCK_WAIT_TIME, int), "DVUPLOADER_LOCK_WAIT_TIME must be an integer"
23+
assert isinstance(LOCK_TIMEOUT, int), "DVUPLOADER_LOCK_TIMEOUT must be an integer"
24+
1925
assert isinstance(MAX_FILE_DISPLAY, int), (
2026
"DVUPLOADER_MAX_FILE_DISPLAY must be an integer"
2127
)
@@ -27,6 +33,9 @@
2733
UPLOAD_ENDPOINT = "/api/datasets/:persistentId/addFiles?persistentId="
2834
REPLACE_ENDPOINT = "/api/datasets/:persistentId/replaceFiles?persistentId="
2935

36+
# Initialize logging
37+
init_logging()
38+
3039

3140
async def direct_upload(
3241
files: List[File],
@@ -250,7 +259,7 @@ async def _upload_singlepart(
250259
"headers": headers,
251260
"url": ticket["url"],
252261
"content": upload_bytes(
253-
file=file.handler, # type: ignore
262+
file=file.get_handler(), # type: ignore
254263
progress=progress,
255264
pbar=pbar,
256265
hash_func=file.checksum._hash_fun,
@@ -549,6 +558,13 @@ async def _add_files_to_ds(
549558
pbar: Progress bar for registration.
550559
"""
551560

561+
await wait_for_dataset_unlock(
562+
session=session,
563+
persistent_id=pid,
564+
sleep_time=LOCK_WAIT_TIME,
565+
timeout=LOCK_TIMEOUT,
566+
)
567+
552568
novel_url = urljoin(dataverse_url, UPLOAD_ENDPOINT + pid)
553569
replace_url = urljoin(dataverse_url, REPLACE_ENDPOINT + pid)
554570

dvuploader/dvuploader.py

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -244,8 +244,14 @@ def _check_duplicates(
244244
to_skip.append(file.file_id)
245245

246246
if replace_existing:
247-
ds_file = self._get_dsfile_by_id(file.file_id, ds_files) # type: ignore
248-
if not self._check_size(file, ds_file): # type: ignore
247+
assert file.file_id is not None, "File ID is required"
248+
assert isinstance(file.file_id, int), "File ID must be an integer"
249+
250+
ds_file = self._get_dsfile_by_id(file.file_id, ds_files)
251+
252+
assert ds_file is not None, "Dataset file not found"
253+
254+
if not self._check_size(file, ds_file):
249255
file._unchanged_data = False
250256
else:
251257
# calculate checksum
@@ -364,10 +370,12 @@ def _check_hashes(file: File, dsFile: Dict):
364370
dsFile.get("directoryLabel", ""), dsFile["dataFile"]["filename"]
365371
)
366372

373+
directory_label = file.directory_label if file.directory_label else ""
374+
367375
return (
368376
file.checksum.value == hash_value
369377
and file.checksum.type == hash_algo
370-
and path == os.path.join(file.directory_label, file.file_name) # type: ignore
378+
and path == os.path.join(directory_label, file.file_name) # type: ignore
371379
)
372380

373381
@staticmethod

dvuploader/file.py

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,6 @@ def extract_file_name(self):
130130

131131
if self.handler is None:
132132
self._validate_filepath(self.filepath)
133-
self.handler = open(self.filepath, "rb")
134133
self._size = os.path.getsize(self.filepath)
135134
else:
136135
self._size = len(self.handler.read())
@@ -147,6 +146,15 @@ def extract_file_name(self):
147146

148147
return self
149148

149+
def get_handler(self) -> IO:
150+
"""
151+
Opens the file and initializes the file handler.
152+
"""
153+
if self.handler is not None:
154+
return self.handler
155+
156+
return open(self.filepath, "rb")
157+
150158
@staticmethod
151159
def _validate_filepath(path):
152160
"""
@@ -190,21 +198,27 @@ def update_checksum_chunked(self, blocksize=2**20):
190198
Note:
191199
This method resets the file position to the start after reading.
192200
"""
193-
assert self.handler is not None, "File handler is not initialized."
194201
assert self.checksum is not None, "Checksum is not initialized."
195202
assert self.checksum._hash_fun is not None, "Checksum hash function is not set."
196203

204+
handler = self.get_handler()
205+
197206
while True:
198-
buf = self.handler.read(blocksize)
207+
buf = handler.read(blocksize)
199208

200209
if not isinstance(buf, bytes):
201210
buf = buf.encode()
202211

203212
if not buf:
204213
break
205214
self.checksum._hash_fun.update(buf)
206-
207-
self.handler.seek(0)
215+
216+
if self.handler is not None: # type: ignore
217+
# In case of passed handler, we need to seek the handler to the start after reading.
218+
self.handler.seek(0)
219+
else:
220+
# Path-based handlers will be opened just-in-time, so we can close it.
221+
handler.close()
208222

209223
def __del__(self):
210224
if self.handler is not None:

dvuploader/nativeupload.py

Lines changed: 50 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,12 @@
1313

1414
from dvuploader.file import File
1515
from dvuploader.packaging import distribute_files, zip_files
16-
from dvuploader.utils import build_url, retrieve_dataset_files
16+
from dvuploader.utils import (
17+
build_url,
18+
init_logging,
19+
retrieve_dataset_files,
20+
wait_for_dataset_unlock,
21+
)
1722

1823
##### CONFIGURATION #####
1924

@@ -22,6 +27,8 @@
2227
#
2328
# This will exponentially increase the wait time between retries.
2429
# The max wait time is 240 seconds per retry though.
30+
LOCK_WAIT_TIME = int(os.environ.get("DVUPLOADER_LOCK_WAIT_TIME", 1.5))
31+
LOCK_TIMEOUT = int(os.environ.get("DVUPLOADER_LOCK_TIMEOUT", 300))
2532
MAX_RETRIES = int(os.environ.get("DVUPLOADER_MAX_RETRIES", 15))
2633
MAX_RETRY_TIME = int(os.environ.get("DVUPLOADER_MAX_RETRY_TIME", 60))
2734
MIN_RETRY_TIME = int(os.environ.get("DVUPLOADER_MIN_RETRY_TIME", 1))
@@ -32,6 +39,9 @@
3239
max=MAX_RETRY_TIME,
3340
)
3441

42+
43+
assert isinstance(LOCK_WAIT_TIME, int), "DVUPLOADER_LOCK_WAIT_TIME must be an integer"
44+
assert isinstance(LOCK_TIMEOUT, int), "DVUPLOADER_LOCK_TIMEOUT must be an integer"
3545
assert isinstance(MAX_RETRIES, int), "DVUPLOADER_MAX_RETRIES must be an integer"
3646
assert isinstance(MAX_RETRY_TIME, int), "DVUPLOADER_MAX_RETRY_TIME must be an integer"
3747
assert isinstance(MIN_RETRY_TIME, int), "DVUPLOADER_MIN_RETRY_TIME must be an integer"
@@ -85,6 +95,9 @@ def __getattr__(self, name):
8595
return getattr(self._file, name)
8696

8797

98+
init_logging()
99+
100+
88101
async def native_upload(
89102
files: List[File],
90103
dataverse_url: str,
@@ -116,7 +129,12 @@ async def native_upload(
116129
session_params = {
117130
"base_url": dataverse_url,
118131
"headers": {"X-Dataverse-key": api_token},
119-
"timeout": None,
132+
"timeout": httpx.Timeout(
133+
None,
134+
read=None,
135+
write=None,
136+
connect=None,
137+
),
120138
"limits": httpx.Limits(max_connections=n_parallel_uploads),
121139
"proxy": proxy,
122140
}
@@ -295,6 +313,14 @@ async def _single_native_upload(
295313
- dict: JSON response from the upload request
296314
"""
297315

316+
# Check if the dataset is locked
317+
await wait_for_dataset_unlock(
318+
session=session,
319+
persistent_id=persistent_id,
320+
sleep_time=LOCK_WAIT_TIME,
321+
timeout=LOCK_TIMEOUT,
322+
)
323+
298324
if not file.to_replace:
299325
endpoint = build_url(
300326
endpoint=NATIVE_UPLOAD_ENDPOINT,
@@ -306,13 +332,14 @@ async def _single_native_upload(
306332
)
307333

308334
json_data = _get_json_data(file)
335+
handler = file.get_handler()
309336

310-
assert file.handler is not None, "File handler is required for native upload"
337+
assert handler is not None, "File handler is required for native upload"
311338

312339
files = {
313340
"file": (
314341
file.file_name,
315-
_ProgressFileWrapper(file.handler, progress, pbar), # type: ignore[arg-type]
342+
_ProgressFileWrapper(handler, progress, pbar), # type: ignore[arg-type]
316343
file.mimeType,
317344
),
318345
"jsonData": (
@@ -442,6 +469,7 @@ async def _update_metadata(
442469
session=session,
443470
url=NATIVE_METADATA_ENDPOINT.format(FILE_ID=file_id),
444471
file=file,
472+
persistent_id=persistent_id,
445473
)
446474

447475
tasks.append(task)
@@ -457,6 +485,7 @@ async def _update_single_metadata(
457485
session: httpx.AsyncClient,
458486
url: str,
459487
file: File,
488+
persistent_id: str,
460489
) -> None:
461490
"""
462491
Updates the metadata of a single file in a Dataverse repository.
@@ -470,6 +499,13 @@ async def _update_single_metadata(
470499
ValueError: If metadata update fails.
471500
"""
472501

502+
await wait_for_dataset_unlock(
503+
session=session,
504+
persistent_id=persistent_id,
505+
sleep_time=LOCK_WAIT_TIME,
506+
timeout=LOCK_TIMEOUT,
507+
)
508+
473509
json_data = _get_json_data(file)
474510

475511
# Send metadata as a readable byte stream
@@ -490,7 +526,16 @@ async def _update_single_metadata(
490526
else:
491527
await asyncio.sleep(1.0)
492528

493-
raise ValueError(f"Failed to update metadata for file {file.file_name}.")
529+
if "message" in response.json():
530+
# If the response is a JSON object, we can get the error message from the "message" key.
531+
error_message = response.json()["message"]
532+
else:
533+
# If the response is not a JSON object, we can get the error message from the response text.
534+
error_message = response.text
535+
536+
raise ValueError(
537+
f"Failed to update metadata for file {file.file_name}: {error_message}"
538+
)
494539

495540

496541
def _retrieve_file_ids(

0 commit comments

Comments
 (0)