77import tempfile
88import threading
99import time
10+ from collections import Counter
1011from importlib .resources import files
1112from pathlib import Path
1213
@@ -498,11 +499,13 @@ def sync_incremental(
498499
499500 client = Client (space_id , verbose = False , httpx_kwargs = {"timeout" : 90 })
500501 hf_token = huggingface_hub .utils .get_token ()
502+ expected_run_counts : Counter [str ] = Counter ()
501503
502504 if pending_only :
503505 pending_logs = SQLiteStorage .get_pending_logs (project )
504506 if pending_logs :
505507 logs = pending_logs ["logs" ]
508+ expected_run_counts .update (log ["run" ] for log in logs )
506509 for i in range (0 , len (logs ), SYNC_BATCH_SIZE ):
507510 batch = logs [i : i + SYNC_BATCH_SIZE ]
508511 print (
@@ -550,6 +553,7 @@ def sync_incremental(
550553 else :
551554 all_logs = SQLiteStorage .get_all_logs_for_sync (project )
552555 if all_logs :
556+ expected_run_counts .update (log ["run" ] for log in all_logs )
553557 for i in range (0 , len (all_logs ), SYNC_BATCH_SIZE ):
554558 batch = all_logs [i : i + SYNC_BATCH_SIZE ]
555559 print (
@@ -568,12 +572,52 @@ def sync_incremental(
568572 api_name = "/bulk_log_system" , logs = batch , hf_token = hf_token
569573 )
570574
575+ _wait_for_remote_sync (client , project , expected_run_counts )
571576 SQLiteStorage .set_project_metadata (project , "space_id" , space_id )
572577 print (
573578 f"* Synced successfully to space: { _BOLD_ORANGE } { SPACE_URL .format (space_id = space_id )} { _RESET } "
574579 )
575580
576581
582+ def _wait_for_remote_sync (
583+ client : Client ,
584+ project : str ,
585+ expected_run_counts : Counter [str ],
586+ timeout : int = 180 ,
587+ ) -> None :
588+ if not expected_run_counts :
589+ return
590+
591+ deadline = time .time () + timeout
592+ delay = 2
593+ last_error : Exception | None = None
594+ pending = dict (expected_run_counts )
595+
596+ while time .time () < deadline and pending :
597+ completed = []
598+ for run_name , expected_num_logs in pending .items ():
599+ try :
600+ summary = client .predict (
601+ project = project , run = run_name , api_name = "/get_run_summary"
602+ )
603+ if summary .get ("num_logs" ) == expected_num_logs :
604+ completed .append (run_name )
605+ except Exception as e :
606+ last_error = e
607+ for run_name in completed :
608+ pending .pop (run_name , None )
609+ if pending :
610+ time .sleep (delay )
611+ delay = min (delay * 1.5 , 15 )
612+
613+ if pending :
614+ raise TimeoutError (
615+ f"Remote sync for project '{ project } ' did not become visible for runs "
616+ f"{ sorted (pending .items ())} within { timeout } s. "
617+ f"Last error: { last_error !r} "
618+ )
619+
620+
577621def upload_dataset_for_static (
578622 project : str ,
579623 dataset_id : str ,
@@ -831,6 +875,14 @@ def _do_sync():
831875 create_space_if_not_exists (
832876 space_id , bucket_id = bucket_id , private = private
833877 )
878+ _wait_for_remote_sync (
879+ Client (space_id , verbose = False , httpx_kwargs = {"timeout" : 90 }),
880+ project ,
881+ Counter (
882+ log ["run" ]
883+ for log in SQLiteStorage .get_all_logs_for_sync (project )
884+ ),
885+ )
834886 else :
835887 sync_incremental (project , space_id , private = private , pending_only = False )
836888 SQLiteStorage .set_project_metadata (project , "space_id" , space_id )
0 commit comments