diff --git a/huey_monitor/tqdm.py b/huey_monitor/tqdm.py index ea3fd65..704cf66 100644 --- a/huey_monitor/tqdm.py +++ b/huey_monitor/tqdm.py @@ -2,6 +2,8 @@ from django.core.exceptions import ValidationError from django.utils import timezone +from django.db.models import F + from huey.api import Task from huey_monitor.constants import TASK_MODEL_DESC_MAX_LENGTH @@ -87,3 +89,41 @@ def __str__(self): f'{self.task.name} - {self.desc} {self.total_progress}/{self.total}{self.unit}' f' (divisor: {self.unit_divisor})' ) + + def update_parent_progress(self, n=1): + """ + Increment the parent TaskModel progress information + as well as update_dt + """ + assert self.parent_task_id + update_task_progress(self.parent_task_id, n) + + def make_complete(self): + """ + Update TaskModel.total based on TaskModel.progress_count + to mark task with unkown number of steps complete + """ + + TaskModel.objects.filter(task_id=self.task.id).update( + total=self.total_progress + ) + self.total = self.total_progress + + +def make_task_complete(task_id): + """ + Update the TaskModel.total corresponding to a given Huey task to match its current progress_count + Used to mark task with unkown number of steps complete + """ + TaskModel.objects.filter(task_id=task_id).update( + update_dt=timezone.now(), total=F('progress_count') + ) + +def update_task_progress(task_id, n=1): + """ + Increment the TaskModel.progress_count corresponding to a given Huey task + and update update_dt + """ + TaskModel.objects.filter(task_id=task_id).update( + update_dt=timezone.now(), progress_count=F('progress_count') + n + ) diff --git a/huey_monitor_tests/test_app/tasks.py b/huey_monitor_tests/test_app/tasks.py index 9c25b37..791efff 100644 --- a/huey_monitor_tests/test_app/tasks.py +++ b/huey_monitor_tests/test_app/tasks.py @@ -2,13 +2,14 @@ import math import sys import time +import random from bx_py_utils.iteration import chunk_iterable from huey import crontab from huey.contrib.djhuey import lock_task, periodic_task, task from huey_monitor.models import TaskModel -from huey_monitor.tqdm import ProcessInfo +from huey_monitor.tqdm import ProcessInfo, make_task_complete logger = logging.getLogger(__name__) @@ -129,3 +130,66 @@ def parallel_task(task, total=2000, task_num=3, **info_kwargs): logger.info('Start sub task no. %i', no) time.sleep(5) parallel_sub_task(parent_task_id=task.id, item_chunk=chunk, **info_kwargs) + +@task(context=True, retries=1) +def sub_task_recursive(task, parent_task_id): + """ + Example of implementation for recursive tasks where final number of sub-tasks is unknown. + Each recursive sub-task will refer to the same task as their parent task + """ + logger.info('Recursive sub task started from main task: %s', parent_task_id) + TaskModel.objects.set_parent_task( + main_task_id=parent_task_id, + sub_task_id=task.id, + ) + # let's consider we don't know yet the number of steps + process_info = ProcessInfo(task, desc='Recursive task execution', total=999) + + continue_with_next_step = True + + while continue_with_next_step: + # we execute the step: + continue_with_next_step = random.randrange(100)<80 + + # progress_count is incremented (default incrementation step is 1): + process_info.update() + # process_info.update(n=10) for incrementing by 10 + + # Update TaskModel.total based on TaskModel.progress_count : + # (was 999 before because final number of steps was unknown) + process_info.make_complete() + + # progress_count of the parent task is incremented (default incrementation step is 1): + process_info.update_parent_progress() + # process_info.update_parent_progress(n=5) for incrementing by 5 + + # for convenience purpose, the function 'huey_monitor.tqdm.update_task_progress(task_id, n=1)' + # can also be called if 'process_info' is not declared in current code + + # we now test if conditions are met to exit the recursive loop: + + condition_for_recursive_loop_exit = random.randrange(10)>7 + if condition_for_recursive_loop_exit: + logger.info('This was the last of the recursive sub tasks') + + # Update TaskModel.total based on TaskModel.progress_count for the parent: + # (was 999 before because final number of sub-tasks was unknown) + make_task_complete(parent_task_id) + else: + # next recursive task is launched: + sub_task_recursive(parent_task_id=parent_task_id) + + +@task(context=True) +def main_task_recursive(task): + """ + Example of implementation for recursive tasks where final number of sub-tasks is unknown. + This is the parent task which launches the recursive search + and will act as parent task for all sub-tasks + """ + logger.info('Main task %s starts recursive sub tasks', task.id) + process_info = ProcessInfo(task, desc='Launching recursive tasks', total=999) + # we don't know yet the number of sub-tasks + + sub_task_recursive(parent_task_id=task.id) +