diff --git a/.changeset/shiny-rockets-bathe.md b/.changeset/shiny-rockets-bathe.md new file mode 100644 index 000000000..b580c9bed --- /dev/null +++ b/.changeset/shiny-rockets-bathe.md @@ -0,0 +1,5 @@ +--- +"trackio": minor +--- + +feat:Use HF buckets as backend diff --git a/tests/e2e-spaces/conftest.py b/tests/e2e-spaces/conftest.py index 13aa71b51..d21b5b1e6 100644 --- a/tests/e2e-spaces/conftest.py +++ b/tests/e2e-spaces/conftest.py @@ -17,8 +17,10 @@ def test_space_id(): @pytest.fixture(scope="session", autouse=True) def _ensure_space_ready(test_space_id): - space_id, dataset_id = utils.preprocess_space_and_dataset_ids(test_space_id, None) - deploy.create_space_if_not_exists(space_id, None, dataset_id, None) + space_id, dataset_id, bucket_id = utils.preprocess_space_and_dataset_ids( + test_space_id, None + ) + deploy.create_space_if_not_exists(space_id, None, dataset_id, bucket_id, None) deadline = time.time() + 300 while time.time() < deadline: diff --git a/tests/e2e-spaces/test_metrics_on_spaces.py b/tests/e2e-spaces/test_metrics_on_spaces.py index 2bd2e1226..d7aad95b6 100644 --- a/tests/e2e-spaces/test_metrics_on_spaces.py +++ b/tests/e2e-spaces/test_metrics_on_spaces.py @@ -2,9 +2,11 @@ import time import huggingface_hub +import pytest from gradio_client import Client import trackio +from trackio import utils def test_basic_logging(test_space_id): @@ -92,3 +94,60 @@ def test_runs_data_persisted_after_restart(test_space_id): lr = cfg.get("learning_rate") assert lr is not None and abs(float(lr) - 0.001) < 1e-6 assert cfg.get("epochs") == 10 + + +def test_bucket_space_preserves_logged_metrics_after_restart(test_space_id): + _, dataset_id, bucket_id = utils.preprocess_space_and_dataset_ids( + test_space_id, None, None + ) + if dataset_id is not None or bucket_id is None: + pytest.skip("Requires a Space deployed with bucket backend (no dataset_id).") + + project_name = f"test_bucket_persist_{secrets.token_urlsafe(8)}" + run_name = "metrics_run" + + trackio.init(project=project_name, name=run_name, space_id=test_space_id) + trackio.log(metrics={"loss": 0.42, "acc": 0.88}) + trackio.finish() + + client = Client(test_space_id) + client.predict(api_name="/force_sync") + + huggingface_hub.add_space_variable( + test_space_id, "TRACKIO_TEST_RESTART", secrets.token_urlsafe(8) + ) + + time.sleep(10) + deadline = time.time() + 300 + client = None + while time.time() < deadline: + try: + client = Client(test_space_id, verbose=False) + break + except Exception: + time.sleep(10) + assert client is not None, "Space did not come back up after restart" + + summary = client.predict( + project=project_name, run=run_name, api_name="/get_run_summary" + ) + assert summary["num_logs"] == 1 + assert "loss" in summary["metrics"] and "acc" in summary["metrics"] + + loss_values = client.predict( + project=project_name, + run=run_name, + metric_name="loss", + api_name="/get_metric_values", + ) + assert len(loss_values) == 1 + assert abs(float(loss_values[0]["value"]) - 0.42) < 1e-6 + + acc_values = client.predict( + project=project_name, + run=run_name, + metric_name="acc", + api_name="/get_metric_values", + ) + assert len(acc_values) == 1 + assert abs(float(acc_values[0]["value"]) - 0.88) < 1e-6 diff --git a/trackio/README.md b/trackio/README.md index 65d5a3105..a344591b2 100644 --- a/trackio/README.md +++ b/trackio/README.md @@ -4,7 +4,7 @@ sdk_version: {GRADIO_VERSION} app_file: {APP_FILE} tags: - trackio -hf_oauth: true +{LINKED_HUB_METADATA}hf_oauth: true hf_oauth_scopes: - write-repos --- \ No newline at end of file diff --git a/trackio/__init__.py b/trackio/__init__.py index c3e08bb81..38ab74c2a 100644 --- a/trackio/__init__.py +++ b/trackio/__init__.py @@ -85,6 +85,7 @@ config = {} _atexit_registered = False +_projects_notified_auto_log_hw: set[str] = set() def _cleanup_current_run(): @@ -103,6 +104,7 @@ def init( space_id: str | None = None, space_storage: SpaceStorage | None = None, dataset_id: str | None = None, + bucket_id: str | None = None, config: dict | None = None, resume: str = "never", settings: Any = None, @@ -137,13 +139,18 @@ def init( space_storage ([`~huggingface_hub.SpaceStorage`], *optional*): Choice of persistent storage tier. dataset_id (`str`, *optional*): - If a `space_id` is provided, a persistent Hugging Face Dataset will be - created and the metrics will be synced to it every 5 minutes. Specify a - Dataset with name like `"username/datasetname"` or `"orgname/datasetname"`, - or `"datasetname"` (uses currently-logged-in Hugging Face user's namespace), - or `None` (uses the same name as the Space but with the `"_dataset"` - suffix). If the Dataset does not exist, it will be created. If the Dataset - already exists, the project will be appended to it. + If provided, uses the legacy Hugging Face Dataset backend for metric + persistence (metrics are exported to Parquet and committed every 5 minutes). + Specify a Dataset with name like `"username/datasetname"` or + `"orgname/datasetname"`, or `"datasetname"` (uses currently-logged-in + Hugging Face user's namespace). Cannot be used together with `bucket_id`. + bucket_id (`str`, *optional*): + The ID of the Hugging Face Bucket to use for metric persistence. By default, + when a `space_id` is provided and neither `dataset_id` nor `bucket_id` is + explicitly set, a bucket is auto-generated from the space_id. Buckets provide + S3-like storage without git overhead - the SQLite database is stored directly + via `hf-mount` in the Space. Specify a Bucket with name like + `"username/bucketname"` or just `"bucketname"`. config (`dict`, *optional*): A dictionary of configuration options. Provided for compatibility with `wandb.init()`. @@ -194,11 +201,14 @@ def init( ) space_id = space_id or os.environ.get("TRACKIO_SPACE_ID") + bucket_id = bucket_id or os.environ.get("TRACKIO_BUCKET_ID") if space_id is None and dataset_id is not None: raise ValueError("Must provide a `space_id` when `dataset_id` is provided.") + if dataset_id is not None and bucket_id is not None: + raise ValueError("Cannot provide both `dataset_id` and `bucket_id`.") try: - space_id, dataset_id = utils.preprocess_space_and_dataset_ids( - space_id, dataset_id + space_id, dataset_id, bucket_id = utils.preprocess_space_and_dataset_ids( + space_id, dataset_id, bucket_id ) except LocalTokenNotFoundError as e: raise LocalTokenNotFoundError( @@ -221,7 +231,13 @@ def init( ): print(f"* Trackio project initialized: {project}") - if dataset_id is not None: + if bucket_id is not None: + os.environ["TRACKIO_BUCKET_ID"] = bucket_id + bucket_url = f"https://huggingface.co/buckets/{bucket_id}" + print( + f"* Trackio metrics will be synced to Hugging Face Bucket: {bucket_url}" + ) + elif dataset_id is not None: os.environ["TRACKIO_DATASET_ID"] = dataset_id print( f"* Trackio metrics will be synced to Hugging Face Dataset: {dataset_id}" @@ -233,13 +249,19 @@ def init( utils.print_dashboard_instructions(project) else: deploy.create_space_if_not_exists( - space_id, space_storage, dataset_id, private + space_id, + space_storage, + dataset_id, + bucket_id, + private, ) user_name, space_name = space_id.split("/") space_url = deploy.SPACE_HOST_URL.format( user_name=user_name, space_name=space_name ) - print(f"* View dashboard by going to: {space_url}") + print( + f"* View dashboard by going to: {deploy._BOLD_ORANGE}{space_url}{deploy._RESET}" + ) if utils.is_in_notebook() and embed: utils.embed_url_in_notebook(space_url) context_vars.current_project.set(project) @@ -268,10 +290,15 @@ def init( nvidia_available = gpu_available() apple_available = apple_gpu_available() auto_log_gpu = nvidia_available or apple_available - if nvidia_available: - print("* NVIDIA GPU detected, enabling automatic GPU metrics logging") - elif apple_available: - print("* Apple Silicon detected, enabling automatic system metrics logging") + if project not in _projects_notified_auto_log_hw: + if nvidia_available: + print("* NVIDIA GPU detected, enabling automatic GPU metrics logging") + elif apple_available: + print( + "* Apple Silicon detected, enabling automatic system metrics logging" + ) + if nvidia_available or apple_available: + _projects_notified_auto_log_hw.add(project) run = Run( url=url, diff --git a/trackio/bucket_storage.py b/trackio/bucket_storage.py new file mode 100644 index 000000000..874b8322f --- /dev/null +++ b/trackio/bucket_storage.py @@ -0,0 +1,40 @@ +import sqlite3 + +import huggingface_hub +from huggingface_hub import sync_bucket + +from trackio.sqlite_storage import SQLiteStorage +from trackio.utils import MEDIA_DIR, TRACKIO_DIR + + +def create_bucket_if_not_exists(bucket_id: str, private: bool | None = None) -> None: + huggingface_hub.create_bucket(bucket_id, private=private or False, exist_ok=True) + + +def download_bucket_to_trackio_dir(bucket_id: str) -> None: + TRACKIO_DIR.mkdir(parents=True, exist_ok=True) + sync_bucket( + source=f"hf://buckets/{bucket_id}", + dest=str(TRACKIO_DIR), + quiet=True, + ) + + +def upload_project_to_bucket(project: str, bucket_id: str) -> None: + db_path = SQLiteStorage.get_project_db_path(project) + if not db_path.exists(): + raise FileNotFoundError(f"No database found for project '{project}'") + + with sqlite3.connect(str(db_path), timeout=30.0) as conn: + conn.execute("PRAGMA wal_checkpoint(TRUNCATE)") + + files_to_add = [(str(db_path), db_path.name)] + + media_dir = MEDIA_DIR / project + if media_dir.exists(): + for media_file in media_dir.rglob("*"): + if media_file.is_file(): + rel = media_file.relative_to(TRACKIO_DIR) + files_to_add.append((str(media_file), str(rel))) + + huggingface_hub.batch_bucket_files(bucket_id, add=files_to_add) diff --git a/trackio/deploy.py b/trackio/deploy.py index b12ead64b..7f8ff5f44 100644 --- a/trackio/deploy.py +++ b/trackio/deploy.py @@ -16,12 +16,17 @@ import tomli as tomllib import gradio +import httpx import huggingface_hub from gradio_client import Client, handle_file from httpx import ReadTimeout from huggingface_hub.errors import HfHubHTTPError, RepositoryNotFoundError import trackio +from trackio.bucket_storage import create_bucket_if_not_exists, upload_project_to_bucket +from trackio.space_volumes import ( + attach_bucket_volume, +) from trackio.sqlite_storage import SQLiteStorage from trackio.utils import ( MEDIA_DIR, @@ -31,6 +36,17 @@ SPACE_HOST_URL = "https://{user_name}-{space_name}.hf.space/" SPACE_URL = "https://huggingface.co/spaces/{space_id}" +_BOLD_ORANGE = "\033[1m\033[38;5;208m" +_RESET = "\033[0m" + + +def _readme_linked_hub_yaml(dataset_id: str | None) -> str: + if dataset_id is not None: + return f"datasets:\n - {dataset_id}\n" + return "" + + +_SPACE_APP_PY = "import trackio\ntrackio.show()\n" def _retry_hf_write(op_name: str, fn, retries: int = 4, initial_delay: float = 1.5): @@ -99,6 +115,7 @@ def deploy_as_space( space_id: str, space_storage: huggingface_hub.SpaceStorage | None = None, dataset_id: str | None = None, + bucket_id: str | None = None, private: bool | None = None, ): if ( @@ -106,6 +123,11 @@ def deploy_as_space( ): # in case a repo with this function is uploaded to spaces return + if dataset_id is not None and bucket_id is not None: + raise ValueError( + "Cannot use bucket volume options together with dataset_id; use one persistence mode." + ) + trackio_path = files("trackio") hf_api = huggingface_hub.HfApi() @@ -138,10 +160,16 @@ def deploy_as_space( # Make sure necessary dependencies are installed by creating a requirements.txt. is_source_install = _is_trackio_installed_from_source() + if bucket_id is not None: + create_bucket_if_not_exists(bucket_id, private=private) + with open(Path(trackio_path, "README.md"), "r") as f: readme_content = f.read() readme_content = readme_content.replace("{GRADIO_VERSION}", gradio.__version__) readme_content = readme_content.replace("{APP_FILE}", "app.py") + readme_content = readme_content.replace( + "{LINKED_HUB_METADATA}", _readme_linked_hub_yaml(dataset_id) + ) readme_buffer = io.BytesIO(readme_content.encode("utf-8")) hf_api.upload_file( path_or_fileobj=readme_buffer, @@ -193,8 +221,7 @@ def deploy_as_space( ], ) - app_file_content = """import trackio -trackio.show()""" + app_file_content = _SPACE_APP_PY app_file_buffer = io.BytesIO(app_file_content.encode("utf-8")) hf_api.upload_file( path_or_fileobj=app_file_buffer, @@ -205,7 +232,16 @@ def deploy_as_space( if hf_token := huggingface_hub.utils.get_token(): huggingface_hub.add_space_secret(space_id, "HF_TOKEN", hf_token) - if dataset_id is not None: + if bucket_id is not None: + changed = attach_bucket_volume( + space_id, + bucket_id, + mount_path="/data", + ) + huggingface_hub.add_space_variable(space_id, "TRACKIO_DIR", "/data/trackio") + if changed: + print(f"* Attached bucket {bucket_id} at '/data'") + elif dataset_id is not None: huggingface_hub.add_space_variable(space_id, "TRACKIO_DATASET_ID", dataset_id) if logo_light_url := os.environ.get("TRACKIO_LOGO_LIGHT_URL"): huggingface_hub.add_space_variable( @@ -226,6 +262,7 @@ def create_space_if_not_exists( space_id: str, space_storage: huggingface_hub.SpaceStorage | None = None, dataset_id: str | None = None, + bucket_id: str | None = None, private: bool | None = None, ) -> None: """ @@ -238,6 +275,9 @@ def create_space_if_not_exists( Choice of persistent storage tier for the Space. dataset_id (`str`, *optional*): The ID of the Dataset to add to the Space as a space variable. + bucket_id (`str`, *optional*): + Full Hub bucket id (`namespace/name`) to attach via the Hub volumes API (platform mount). + Sets `TRACKIO_DIR` to the mount path; do not combine with `dataset_id`. private (`bool`, *optional*): Whether to make the Space private. If `None` (default), the repo will be public unless the organization's default is private. This value is ignored @@ -251,9 +291,15 @@ def create_space_if_not_exists( raise ValueError( f"Invalid dataset ID: {dataset_id}. Must be in the format: username/datasetname or orgname/datasetname." ) + if bucket_id is not None and "/" not in bucket_id: + raise ValueError( + f"Invalid bucket ID: {bucket_id}. Must be in the format: username/bucketname or orgname/bucketname." + ) try: huggingface_hub.repo_info(space_id, repo_type="space") - print(f"* Found existing space: {SPACE_URL.format(space_id=space_id)}") + print( + f"* Found existing space: {_BOLD_ORANGE}{SPACE_URL.format(space_id=space_id)}{_RESET}" + ) return except RepositoryNotFoundError: pass @@ -264,8 +310,16 @@ def create_space_if_not_exists( else: raise ValueError(f"Failed to create Space: {e}") - print(f"* Creating new space: {SPACE_URL.format(space_id=space_id)}") - deploy_as_space(space_id, space_storage, dataset_id, private) + print( + f"* Creating new space: {_BOLD_ORANGE}{SPACE_URL.format(space_id=space_id)}{_RESET}" + ) + deploy_as_space( + space_id, + space_storage, + dataset_id, + bucket_id, + private, + ) print("* Waiting for Space to be ready...") _wait_until_space_running(space_id) @@ -274,15 +328,32 @@ def _wait_until_space_running(space_id: str, timeout: int = 300) -> None: hf_api = huggingface_hub.HfApi() start = time.time() delay = 2 + request_timeout = 45.0 + failure_stages = frozenset( + ("NO_APP_FILE", "CONFIG_ERROR", "BUILD_ERROR", "RUNTIME_ERROR") + ) while time.time() - start < timeout: try: - info = hf_api.space_info(space_id) - if info.runtime and info.runtime.stage == "RUNNING": - return - except (huggingface_hub.utils.HfHubHTTPError, ReadTimeout): + info = hf_api.space_info(space_id, timeout=request_timeout) + if info.runtime: + stage = str(info.runtime.stage) + if stage in failure_stages: + raise RuntimeError( + f"Space {space_id} entered terminal stage {stage}. " + "Fix README.md or app files; see build logs on the Hub." + ) + if stage == "RUNNING": + return + except RuntimeError: + raise + except (huggingface_hub.utils.HfHubHTTPError, httpx.RequestError): pass time.sleep(delay) delay = min(delay * 1.5, 15) + raise TimeoutError( + f"Space {space_id} did not reach RUNNING within {timeout}s. " + "Check status and build logs on the Hub." + ) def wait_until_space_exists( @@ -304,7 +375,7 @@ def wait_until_space_exists( try: hf_api.space_info(space_id) return - except (huggingface_hub.utils.HfHubHTTPError, ReadTimeout): + except (huggingface_hub.utils.HfHubHTTPError, httpx.RequestError): time.sleep(delay) delay = min(delay * 2, 60) raise TimeoutError("Waiting for space to exist took longer than expected") @@ -450,7 +521,9 @@ def sync_incremental( ) SQLiteStorage.set_project_metadata(project, "space_id", space_id) - print(f"* Synced successfully to space: {SPACE_URL.format(space_id=space_id)}") + print( + f"* Synced successfully to space: {_BOLD_ORANGE}{SPACE_URL.format(space_id=space_id)}{_RESET}" + ) def upload_dataset_for_static( @@ -503,8 +576,9 @@ def upload_dataset_for_static( def deploy_as_static_space( space_id: str, - dataset_id: str, + dataset_id: str | None, project: str, + bucket_id: str | None = None, private: bool | None = None, hf_token: str | None = None, ) -> None: @@ -535,7 +609,10 @@ def deploy_as_static_space( else: raise ValueError(f"Failed to create Space: {e}") - readme_content = "---\nsdk: static\npinned: false\ntags:\n - trackio\n---\n" + linked = _readme_linked_hub_yaml(dataset_id) + readme_content = ( + f"---\nsdk: static\npinned: false\ntags:\n - trackio\n{linked}---\n" + ) _retry_hf_write( "Static Space README upload", lambda: hf_api.upload_file( @@ -567,10 +644,13 @@ def deploy_as_static_space( config = { "mode": "static", - "dataset_id": dataset_id, "project": project, "private": bool(private), } + if bucket_id is not None: + config["bucket_id"] = bucket_id + if dataset_id is not None: + config["dataset_id"] = dataset_id if hf_token and private: config["hf_token"] = hf_token @@ -596,7 +676,9 @@ def deploy_as_static_space( ), ) - print(f"* Static Space deployed: {SPACE_URL.format(space_id=space_id)}") + print( + f"* Static Space deployed: {_BOLD_ORANGE}{SPACE_URL.format(space_id=space_id)}{_RESET}" + ) def sync( @@ -607,6 +689,7 @@ def sync( run_in_background: bool = False, sdk: str = "gradio", dataset_id: str | None = None, + bucket_id: str | None = None, ) -> str: """ Syncs a local Trackio project's database to a Hugging Face Space. @@ -631,7 +714,11 @@ def sync( server. `"static"` deploys a static Space that reads from an HF Dataset (no server needed). dataset_id (`str`, *optional*): - The ID of the HF Dataset for static mode. Auto-generated from space_id if not provided. + The ID of the HF Dataset to sync to. When provided, uses the legacy + Dataset backend instead of Buckets. + bucket_id (`str`, *optional*): + The ID of the HF Bucket to sync to. By default, a bucket is auto-generated + from the space_id. Set `dataset_id` to use the legacy Dataset backend instead. Returns: `str`: The Space ID of the synced project. """ @@ -641,21 +728,48 @@ def sync( space_id = SQLiteStorage.get_space_id(project) if space_id is None: space_id = f"{project}-{get_or_create_project_hash(project)}" - space_id, dataset_id = preprocess_space_and_dataset_ids(space_id, dataset_id) + space_id, dataset_id, bucket_id = preprocess_space_and_dataset_ids( + space_id, dataset_id, bucket_id + ) def _do_sync(): if sdk == "static": - upload_dataset_for_static(project, dataset_id, private=private) - hf_token = huggingface_hub.utils.get_token() if private else None - deploy_as_static_space( - space_id, - dataset_id, - project, - private=private, - hf_token=hf_token, - ) + if dataset_id is not None: + upload_dataset_for_static(project, dataset_id, private=private) + hf_token = huggingface_hub.utils.get_token() if private else None + deploy_as_static_space( + space_id, + dataset_id, + project, + private=private, + hf_token=hf_token, + ) + elif bucket_id is not None: + create_bucket_if_not_exists(bucket_id, private=private) + upload_project_to_bucket(project, bucket_id) + print( + f"* Project data uploaded to bucket: https://huggingface.co/buckets/{bucket_id}" + ) + deploy_as_static_space( + space_id, + None, + project, + bucket_id=bucket_id, + private=private, + hf_token=huggingface_hub.utils.get_token() if private else None, + ) else: - sync_incremental(project, space_id, private=private, pending_only=False) + if bucket_id is not None: + create_bucket_if_not_exists(bucket_id, private=private) + upload_project_to_bucket(project, bucket_id) + print( + f"* Project data uploaded to bucket: https://huggingface.co/buckets/{bucket_id}" + ) + create_space_if_not_exists( + space_id, bucket_id=bucket_id, private=private + ) + else: + sync_incremental(project, space_id, private=private, pending_only=False) SQLiteStorage.set_project_metadata(project, "space_id", space_id) if run_in_background: diff --git a/trackio/dummy_commit_scheduler.py b/trackio/dummy_commit_scheduler.py index 0f5015e14..6068fe8a3 100644 --- a/trackio/dummy_commit_scheduler.py +++ b/trackio/dummy_commit_scheduler.py @@ -1,4 +1,6 @@ -# A dummy object to fit the interface of huggingface_hub's CommitScheduler +from concurrent.futures import Future + + class DummyCommitSchedulerLock: def __enter__(self): return None @@ -10,3 +12,8 @@ def __exit__(self, exception_type, exception_value, exception_traceback): class DummyCommitScheduler: def __init__(self): self.lock = DummyCommitSchedulerLock() + + def trigger(self) -> Future: + fut: Future = Future() + fut.set_result(None) + return fut diff --git a/trackio/imports.py b/trackio/imports.py index 993d32b21..16ecfb9b4 100644 --- a/trackio/imports.py +++ b/trackio/imports.py @@ -132,7 +132,9 @@ def import_csv( ) print(f"* Metrics found: {', '.join(metrics_list[0].keys())}") - space_id, dataset_id = utils.preprocess_space_and_dataset_ids(space_id, dataset_id) + space_id, dataset_id, _ = utils.preprocess_space_and_dataset_ids( + space_id, dataset_id + ) if dataset_id is not None: os.environ["TRACKIO_DATASET_ID"] = dataset_id print(f"* Trackio metrics will be synced to Hugging Face Dataset: {dataset_id}") @@ -286,7 +288,9 @@ def import_tf_events( print(f"* Total imported events: {total_imported}") print(f"* Created runs: {', '.join(imported_runs)}") - space_id, dataset_id = utils.preprocess_space_and_dataset_ids(space_id, dataset_id) + space_id, dataset_id, _ = utils.preprocess_space_and_dataset_ids( + space_id, dataset_id + ) if dataset_id is not None: os.environ["TRACKIO_DATASET_ID"] = dataset_id print(f"* Trackio metrics will be synced to Hugging Face Dataset: {dataset_id}") diff --git a/trackio/server.py b/trackio/server.py index e7a87c87e..a0494cfed 100644 --- a/trackio/server.py +++ b/trackio/server.py @@ -1,11 +1,15 @@ """The main API layer for the Trackio UI.""" import base64 +import logging import os import re import secrets import shutil +import sqlite3 +import threading import time +from collections import deque from functools import lru_cache from typing import Any from urllib.parse import urlencode @@ -23,6 +27,66 @@ HfApi = hf.HfApi() +logger = logging.getLogger("trackio") + +_write_queue: deque[tuple[str, Any]] = deque() +_flush_thread: threading.Thread | None = None +_flush_lock = threading.Lock() +_FLUSH_INTERVAL = 2.0 +_MAX_RETRIES = 30 + + +def _enqueue_write(kind: str, payload: Any) -> None: + _write_queue.append((kind, payload)) + _ensure_flush_thread() + + +def _ensure_flush_thread() -> None: + global _flush_thread + with _flush_lock: + if _flush_thread is not None and _flush_thread.is_alive(): + return + _flush_thread = threading.Thread(target=_flush_loop, daemon=True) + _flush_thread.start() + + +def _flush_loop() -> None: + retries = 0 + while _write_queue and retries < _MAX_RETRIES: + kind, payload = _write_queue[0] + try: + if kind == "bulk_log": + SQLiteStorage.bulk_log(**payload) + elif kind == "bulk_log_system": + SQLiteStorage.bulk_log_system(**payload) + elif kind == "bulk_alert": + SQLiteStorage.bulk_alert(**payload) + _write_queue.popleft() + retries = 0 + except sqlite3.OperationalError as e: + msg = str(e).lower() + if "disk i/o error" in msg or "readonly" in msg: + retries += 1 + logger.warning( + "write queue: flush failed (%s), retry %d/%d", + e, + retries, + _MAX_RETRIES, + ) + time.sleep(min(_FLUSH_INTERVAL * retries, 15.0)) + else: + logger.error("write queue: non-retryable error (%s), dropping entry", e) + _write_queue.popleft() + retries = 0 + if _write_queue: + logger.error( + "write queue: giving up after %d retries, %d entries dropped", + _MAX_RETRIES, + len(_write_queue), + ) + _write_queue.clear() + + write_token = secrets.token_urlsafe(32) OAUTH_CALLBACK_PATH = "/login/callback" @@ -345,7 +409,7 @@ def bulk_log( for (project, run), data in logs_by_run.items(): has_log_ids = any(lid is not None for lid in data["log_ids"]) - SQLiteStorage.bulk_log( + payload = dict( project=project, run=run, metrics_list=data["metrics"], @@ -353,6 +417,10 @@ def bulk_log( config=data["config"], log_ids=data["log_ids"] if has_log_ids else None, ) + try: + SQLiteStorage.bulk_log(**payload) + except sqlite3.OperationalError: + _enqueue_write("bulk_log", payload) def bulk_log_system( @@ -372,13 +440,17 @@ def bulk_log_system( for (project, run), data in logs_by_run.items(): has_log_ids = any(lid is not None for lid in data["log_ids"]) - SQLiteStorage.bulk_log_system( + payload = dict( project=project, run=run, metrics_list=data["metrics"], timestamps=data["timestamps"], log_ids=data["log_ids"] if has_log_ids else None, ) + try: + SQLiteStorage.bulk_log_system(**payload) + except sqlite3.OperationalError: + _enqueue_write("bulk_log_system", payload) def bulk_alert( @@ -408,7 +480,7 @@ def bulk_alert( for (project, run), data in alerts_by_run.items(): has_alert_ids = any(aid is not None for aid in data["alert_ids"]) - SQLiteStorage.bulk_alert( + payload = dict( project=project, run=run, titles=data["titles"], @@ -418,6 +490,10 @@ def bulk_alert( timestamps=data["timestamps"], alert_ids=data["alert_ids"] if has_alert_ids else None, ) + try: + SQLiteStorage.bulk_alert(**payload) + except sqlite3.OperationalError: + _enqueue_write("bulk_alert", payload) def get_alerts( @@ -590,6 +666,8 @@ def rename_run( def force_sync() -> bool: + if os.environ.get("TRACKIO_BUCKET_ID"): + return True SQLiteStorage._dataset_import_attempted = True SQLiteStorage.export_to_parquet() scheduler = SQLiteStorage.get_scheduler() diff --git a/trackio/space_volumes.py b/trackio/space_volumes.py new file mode 100644 index 000000000..b496b2aef --- /dev/null +++ b/trackio/space_volumes.py @@ -0,0 +1,102 @@ +from __future__ import annotations + +from typing import Any + +import huggingface_hub +from huggingface_hub.constants import ENDPOINT +from huggingface_hub.utils import get_session, hf_raise_for_status + + +class SpaceBucketConflictError(RuntimeError): + pass + + +def _token(token: str | None) -> str: + t = token if token is not None else huggingface_hub.utils.get_token() + if not t: + raise ValueError( + "A Hugging Face token is required. Set HF_TOKEN or run huggingface_hub.login()." + ) + return t + + +def _space_repo_api_url(space_id: str) -> str: + if "/" not in space_id: + raise ValueError(f"Invalid space_id {space_id!r}; expected 'namespace/repo'.") + namespace, repo = space_id.split("/", 1) + base = ENDPOINT.rstrip("/") + return f"{base}/api/spaces/{namespace}/{repo}" + + +def get_space_volumes( + space_id: str, *, token: str | None = None +) -> list[dict[str, Any]]: + t = _token(token) + hf_api = huggingface_hub.HfApi(token=t) + info = hf_api.space_info(space_id) + if not info.runtime: + return [] + raw = info.runtime.raw + vols = raw.get("volumes") + return list(vols) if vols else [] + + +def set_space_volumes( + space_id: str, + volumes: list[dict[str, Any]], + *, + token: str | None = None, +) -> None: + t = _token(token) + url = f"{_space_repo_api_url(space_id)}/volumes" + headers = {"Authorization": f"Bearer {t}"} + r = get_session().put( + url, + headers=headers, + json={"volumes": volumes}, + timeout=120.0, + ) + hf_raise_for_status(r) + + +def attach_bucket_volume( + space_id: str, + bucket_id: str, + *, + mount_path: str = "/data", + read_only: bool = False, + token: str | None = None, +) -> bool: + if not mount_path.startswith("/"): + raise ValueError("mount_path must be an absolute path (e.g. '/data').") + existing = get_space_volumes(space_id, token=token) + bucket_sources = [ + v["source"] for v in existing if v.get("type") == "bucket" and "source" in v + ] + if bucket_sources: + if bucket_id not in bucket_sources: + raise SpaceBucketConflictError( + f"Space {space_id!r} already mounts bucket volume(s) {bucket_sources!r}; " + f"cannot attach {bucket_id!r}. Remove the existing mount in Space settings or use that bucket." + ) + for v in existing: + if v.get("type") != "bucket" or v.get("source") != bucket_id: + continue + same_mount = v.get("mountPath") == mount_path + existing_ro = bool(v.get("readOnly")) + same_ro = existing_ro == read_only + if same_mount and same_ro: + return False + raise SpaceBucketConflictError( + f"Bucket {bucket_id!r} is already mounted with different mountPath/readOnly; " + f"update in Space settings instead." + ) + new_vol: dict[str, Any] = { + "type": "bucket", + "source": bucket_id, + "mountPath": mount_path, + } + if read_only: + new_vol["readOnly"] = True + set_space_volumes(space_id, existing + [new_vol], token=token) + return True diff --git a/trackio/sqlite_storage.py b/trackio/sqlite_storage.py index 7de4bfd97..3b84a8a09 100644 --- a/trackio/sqlite_storage.py +++ b/trackio/sqlite_storage.py @@ -33,6 +33,24 @@ DB_EXT = ".db" +_JOURNAL_MODE_WHITELIST = frozenset( + {"wal", "delete", "truncate", "persist", "memory", "off"} +) + + +def _configure_sqlite_pragmas(conn: sqlite3.Connection) -> None: + override = os.environ.get("TRACKIO_SQLITE_JOURNAL_MODE", "").strip().lower() + if override in _JOURNAL_MODE_WHITELIST: + journal = override.upper() + elif os.environ.get("SYSTEM") == "spaces": + journal = "DELETE" + else: + journal = "WAL" + conn.execute(f"PRAGMA journal_mode = {journal}") + conn.execute("PRAGMA synchronous = NORMAL") + conn.execute("PRAGMA temp_store = MEMORY") + conn.execute("PRAGMA cache_size = -20000") + class ProcessLock: """A file-based lock that works across processes using fcntl (Unix) or msvcrt (Windows).""" @@ -81,16 +99,7 @@ class SQLiteStorage: @staticmethod def _get_connection(db_path: Path) -> sqlite3.Connection: conn = sqlite3.connect(str(db_path), timeout=30.0) - # Keep WAL for concurrency + performance on many small writes - conn.execute("PRAGMA journal_mode = WAL") - # ---- Minimal perf tweaks for many tiny transactions ---- - # NORMAL = fsync at critical points only (safer than OFF, much faster than FULL) - conn.execute("PRAGMA synchronous = NORMAL") - # Keep temp data in memory to avoid disk hits during small writes - conn.execute("PRAGMA temp_store = MEMORY") - # Give SQLite a bit more room for cache (negative = KB, engine-managed) - conn.execute("PRAGMA cache_size = -20000") - # -------------------------------------------------------- + _configure_sqlite_pragmas(conn) conn.row_factory = sqlite3.Row return conn @@ -121,14 +130,12 @@ def init_db(project: str) -> Path: Initialize the SQLite database with required tables. Returns the database path. """ + SQLiteStorage._ensure_hub_loaded() db_path = SQLiteStorage.get_project_db_path(project) db_path.parent.mkdir(parents=True, exist_ok=True) with SQLiteStorage._get_process_lock(project): with sqlite3.connect(str(db_path), timeout=30.0) as conn: - conn.execute("PRAGMA journal_mode = WAL") - conn.execute("PRAGMA synchronous = NORMAL") - conn.execute("PRAGMA temp_store = MEMORY") - conn.execute("PRAGMA cache_size = -20000") + _configure_sqlite_pragmas(conn) cursor = conn.cursor() cursor.execute( """ @@ -510,9 +517,7 @@ def get_scheduler(): hf_token = os.environ.get("HF_TOKEN") dataset_id = os.environ.get("TRACKIO_DATASET_ID") space_repo_name = os.environ.get("SPACE_REPO_NAME") - if dataset_id is None or space_repo_name is None: - scheduler = DummyCommitScheduler() - else: + if dataset_id is not None and space_repo_name is not None: scheduler = CommitScheduler( repo_id=dataset_id, repo_type="dataset", @@ -528,6 +533,8 @@ def get_scheduler(): token=hf_token, on_before_commit=SQLiteStorage.export_to_parquet, ) + else: + scheduler = DummyCommitScheduler() SQLiteStorage._current_scheduler = scheduler return scheduler @@ -884,6 +891,7 @@ def has_system_metrics(project: str) -> bool: @staticmethod def get_log_count(project: str, run: str) -> int: + SQLiteStorage._ensure_hub_loaded() db_path = SQLiteStorage.get_project_db_path(project) if not db_path.exists(): return 0 @@ -961,6 +969,17 @@ def get_logs(project: str, run: str, max_points: int | None = None) -> list[dict @staticmethod def load_from_dataset(): + bucket_id = os.environ.get("TRACKIO_BUCKET_ID") + if bucket_id is not None: + if not SQLiteStorage._dataset_import_attempted: + from trackio.bucket_storage import download_bucket_to_trackio_dir + + try: + download_bucket_to_trackio_dir(bucket_id) + except Exception: + pass + SQLiteStorage._dataset_import_attempted = True + return dataset_id = os.environ.get("TRACKIO_DATASET_ID") space_repo_name = os.environ.get("SPACE_REPO_NAME") if dataset_id is not None and space_repo_name is not None: @@ -989,13 +1008,17 @@ def load_from_dataset(): SQLiteStorage.import_from_parquet() SQLiteStorage._dataset_import_attempted = True + @staticmethod + def _ensure_hub_loaded(): + if not SQLiteStorage._dataset_import_attempted: + SQLiteStorage.load_from_dataset() + @staticmethod def get_projects() -> list[str]: """ Get list of all projects by scanning the database files in the trackio directory. """ - if not SQLiteStorage._dataset_import_attempted: - SQLiteStorage.load_from_dataset() + SQLiteStorage._ensure_hub_loaded() projects: set[str] = set() if not TRACKIO_DIR.exists(): @@ -1009,6 +1032,7 @@ def get_projects() -> list[str]: @staticmethod def get_runs(project: str) -> list[str]: """Get list of all runs for a project, ordered by creation time.""" + SQLiteStorage._ensure_hub_loaded() db_path = SQLiteStorage.get_project_db_path(project) if not db_path.exists(): return [] diff --git a/trackio/utils.py b/trackio/utils.py index f291acaa6..15ac0fa56 100644 --- a/trackio/utils.py +++ b/trackio/utils.py @@ -132,7 +132,7 @@ def persistent_storage_enabled() -> bool: def _get_trackio_dir() -> Path: if persistent_storage_enabled(): return Path("/data/trackio") - elif os.environ.get("TRACKIO_DIR"): + if os.environ.get("TRACKIO_DIR"): return Path(os.environ.get("TRACKIO_DIR")) return Path(HF_HOME) / "trackio" @@ -428,10 +428,14 @@ def print_dashboard_instructions(project: str) -> None: def preprocess_space_and_dataset_ids( - space_id: str | None, dataset_id: str | None -) -> tuple[str | None, str | None]: + space_id: str | None, + dataset_id: str | None, + bucket_id: str | None = None, +) -> tuple[str | None, str | None, str | None]: """ - Preprocesses the Space and Dataset names to ensure they are valid "username/space_id" or "username/dataset_id" format. + Preprocesses the Space, Dataset, and Bucket names to ensure they are valid + "username/name" format. When space_id is provided and neither dataset_id nor + bucket_id is explicitly set, auto-generates a bucket_id (default backend). """ if space_id is not None and "/" not in space_id: username = _get_default_namespace() @@ -439,9 +443,12 @@ def preprocess_space_and_dataset_ids( if dataset_id is not None and "/" not in dataset_id: username = _get_default_namespace() dataset_id = f"{username}/{dataset_id}" - if space_id is not None and dataset_id is None: - dataset_id = f"{space_id}-dataset" - return space_id, dataset_id + if bucket_id is not None and "/" not in bucket_id: + username = _get_default_namespace() + bucket_id = f"{username}/{bucket_id}" + if space_id is not None and dataset_id is None and bucket_id is None: + bucket_id = f"{space_id}-bucket" + return space_id, dataset_id, bucket_id def fibo():