-
-
Notifications
You must be signed in to change notification settings - Fork 1.3k
Expand file tree
/
Copy pathprocess.py
More file actions
1503 lines (1316 loc) · 78.2 KB
/
Copy pathprocess.py
File metadata and controls
1503 lines (1316 loc) · 78.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import logging
import asyncio
import json
from typing import Dict, Optional, List, Any, AsyncGenerator
from pydantic import BaseModel, ConfigDict
from ..agent.agent import Agent
from ..task.task import Task
from ..main import display_error
from ..llm import LLM
import csv
import os
class LoopItems(BaseModel):
    """Pydantic model holding a list of loop items of any type.

    The model configuration permits arbitrary (non-pydantic) classes in field
    annotations; ``items`` itself is typed ``List[Any]`` and accepts any
    element.
    """

    items: List[Any]

    model_config = ConfigDict(arbitrary_types_allowed=True)
class Process:
DEFAULT_RETRY_LIMIT = 3 # Predefined retry limit in a common place
VALIDATION_FAILURE_DECISIONS = ["invalid", "retry", "failed", "error", "unsuccessful", "fail", "errors", "reject", "rejected", "incomplete"] # Decision strings that trigger validation feedback
def __init__(
    self,
    tasks: Dict[str, Task],
    agents: List[Agent],
    manager_llm: Optional[str] = None,
    verbose: bool = False,
    max_iter: int = 10,
    output: Optional[str] = None,
):
    """Store the tasks/agents to orchestrate and resolve verbosity.

    Args:
        tasks: Mapping of task id to ``Task``.
        agents: Agents available to run the tasks.
        manager_llm: Model name used for the manager LLM
            (see ``_create_llm_instance``).
        verbose: Legacy verbosity switch; ignored whenever ``output`` is given.
        max_iter: Upper bound on workflow-loop iterations.
        output: Name of an output preset. When set, the preset's
            ``"verbose"`` entry decides verbosity; unknown preset names and
            presets without that entry resolve to ``False``.
    """
    logging.debug("=== Initializing Process ===")
    logging.debug(f"Number of tasks: {len(tasks)}")
    logging.debug(f"Number of agents: {len(agents)}")
    logging.debug(f"Manager LLM: {manager_llm}")
    logging.debug(f"Max iterations: {max_iter}")

    self.tasks = tasks
    self.agents = agents
    self.manager_llm = manager_llm
    self.max_iter = max_iter
    self.task_retry_counter: Dict[str, int] = {}  # fallback retries, keyed by task id
    self.workflow_finished = False  # flipped to True when the workflow should stop

    if output is None:
        # Legacy path: the verbose= argument is used as-is
        resolved_verbose = verbose
    else:
        # output= takes precedence over verbose=
        from ..config.presets import OUTPUT_PRESETS
        resolved_verbose = OUTPUT_PRESETS.get(output, {}).get("verbose", False)

    self._verbose = resolved_verbose
    # Public alias kept for backward compatibility
    self.verbose = self._verbose
    logging.debug(f"Verbose mode: {self._verbose}")
def _create_llm_instance(self):
    """Build a fresh ``LLM`` client for manager-task calls.

    The client uses ``self.manager_llm`` as its model and a fixed
    temperature of 0.7; a new instance is returned on every call.
    """
    llm_settings = {"model": self.manager_llm, "temperature": 0.7}
    return LLM(**llm_settings)
def _parse_manager_instructions(self, response, ManagerInstructions):
"""Parse LLM response and return ManagerInstructions instance.
Args:
response: String response from LLM
ManagerInstructions: Pydantic model class for validation
Returns:
ManagerInstructions instance
Raises:
Exception: If parsing fails
"""
try:
parsed_json = json.loads(response)
return ManagerInstructions(**parsed_json)
except (json.JSONDecodeError, ValueError, TypeError) as e:
raise Exception(f"Failed to parse response: {response}") from e
def _create_loop_subtasks(self, loop_task: Task):
    """Create one chained subtask per item listed in ``loop_task.input_file``.

    Input file formats:
        - ``.csv`` (extension compared case-insensitively): one item per
          non-empty row. A single-column row is used as-is. A multi-column
          row becomes a ``Question: ...`` line (column 0) followed by an
          ``Answer: ...`` line (remaining columns re-joined with commas).
        - Any other extension: plain text, one item per non-blank line.

    Each created subtask:
        - gets the loop task's description (if any) followed by the item text;
        - inherits ``agent``, ``expected_output`` and ``callback`` (passed as
          ``on_task_complete``) from the loop task;
        - is named ``"<loop name>_<n>"``, or uses the item text when the loop
          task has no name. ``n`` counts created rows for CSV files and is
          the 1-based line number for text files;
        - is stored in ``self.tasks`` under its id and linked to the
          following subtask through ``next_tasks``.

    The first created subtask is flagged ``is_start``. The last one inherits
    the loop task's ``next_tasks``.

    Failures are logged with their traceback and not re-raised. Subtasks
    created before the failure remain registered.

    Args:
        loop_task: Task whose ``input_file`` drives subtask creation.
    """
    if not loop_task.input_file:
        logging.warning(f"_create_loop_subtasks called for {loop_task.name} but no input_file specified")
        return

    try:
        file_ext = os.path.splitext(loop_task.input_file)[1].lower()
        if file_ext == ".csv":
            items = self._read_csv_loop_items(loop_task.input_file)
            source_label = "CSV file"
        else:
            items = self._read_text_loop_items(loop_task.input_file)
            source_label = "text file"

        new_tasks = []
        previous_task = None
        for suffix, item_text in items:
            row_task = Task(
                description=f"{loop_task.description}\n{item_text}" if loop_task.description else item_text,
                agent=loop_task.agent,
                name=f"{loop_task.name}_{suffix}" if loop_task.name else item_text,
                expected_output=getattr(loop_task, 'expected_output', None),
                on_task_complete=loop_task.callback,  # Inherit callback from parent loop task
                # The first *created* subtask starts the chain, even when the
                # input begins with blank lines/rows.
                is_start=not new_tasks,
                task_type="task"
            )
            self.tasks[row_task.id] = row_task
            new_tasks.append(row_task)
            if previous_task:
                previous_task.next_tasks = [row_task.name]
            previous_task = row_task

        logging.info(f"Created {len(new_tasks)} subtasks from {source_label} for {loop_task.name}")

        if new_tasks and loop_task.next_tasks:
            # Connect last subtask to loop task's next tasks
            new_tasks[-1].next_tasks = loop_task.next_tasks
    except Exception as e:
        # Best-effort: report the failure (with traceback) through logging
        # instead of propagating it to the workflow loop.
        logging.error(f"Failed to create subtasks for loop task {loop_task.name}: {e}", exc_info=True)

@staticmethod
def _read_csv_loop_items(path):
    """Return ``(n, text)`` pairs for each usable CSV row; ``n`` counts usable rows from 1."""
    items = []
    # newline="" is required by the csv module so quoted fields that contain
    # line breaks are parsed correctly; utf-8-sig drops a leading BOM.
    with open(path, "r", encoding="utf-8-sig", newline="") as f:
        reader = csv.reader(f, quotechar='"', escapechar='\\')
        for i, row in enumerate(reader):
            if not row:  # Skip empty rows
                continue
            if len(row) > 1:
                # Q&A pair: the answer may itself contain commas, so re-join
                # every column after the first.
                question = row[0].strip()
                answer = ",".join(field.strip() for field in row[1:])
                task_desc = f"Question: {question}\nAnswer: {answer}"
            else:
                task_desc = row[0].strip()
            if not task_desc:  # Skip rows with empty content
                continue
            logging.debug(f"Creating subtask from CSV row {i+1}: {task_desc}")
            items.append((len(items) + 1, task_desc))
    return items

@staticmethod
def _read_text_loop_items(path):
    """Return ``(line_number, text)`` pairs for each non-blank line of a text file."""
    # utf-8-sig behaves like utf-8 but also skips a leading BOM
    with open(path, "r", encoding="utf-8-sig") as f:
        lines = f.read().splitlines()
    return [(i + 1, line.strip()) for i, line in enumerate(lines) if line.strip()]
def _build_task_context(self, current_task: Task) -> str:
    """Return the text to append to ``current_task``'s description.

    The text may contain two parts.

    1. Validation feedback stored in ``current_task.validation_feedback``.
       It is consumed here: the attribute is reset to ``None`` so the
       feedback is injected only once.
    2. Results of upstream tasks, under an "Input data from previous tasks:"
       header. Upstream tasks are the names in ``previous_tasks`` and the
       task objects in ``context``.
       - With ``retain_full_context``, every upstream task that has a result
         is included.
       - Otherwise only two are included: the last name in
         ``previous_tasks``, and the most recent ``context`` task that has a
         result.
       Context entries named like ``current_task`` itself are skipped.

    Returns:
        The assembled string. It is ``""`` when there is neither feedback nor
        any upstream task, and only the feedback when there is no upstream
        task.
    """
    def find_task(task_name):
        # First registered task with this name, or None
        return next((t for t in self.tasks.values() if t.name == task_name), None)

    has_upstream = current_task.previous_tasks or current_task.context
    pieces = []

    feedback = current_task.validation_feedback
    if feedback:
        pieces.append(f"\nPrevious attempt failed validation with reason: {feedback['validation_response']}")
        if feedback.get('validated_task'):
            pieces.append(f"\nValidated task: {feedback['validated_task']}")
        if feedback.get('validation_details'):
            pieces.append(f"\nValidation feedback: {feedback['validation_details']}")
        if feedback.get('rejected_output'):
            pieces.append(f"\nRejected output: {feedback['rejected_output']}")
        pieces.append("\nPlease try again with a different approach based on this feedback.\n")
        # Feedback is single-use; clearing it stops it from persisting
        current_task.validation_feedback = None
        if not has_upstream:
            return "".join(pieces)
    elif not has_upstream:
        return ""

    pieces.append("\nInput data from previous tasks:")

    if current_task.retain_full_context:
        # Every upstream result, in list order
        for prev_name in current_task.previous_tasks:
            prev_task = find_task(prev_name)
            if prev_task and prev_task.result:
                pieces.append(f"\n{prev_name}: {prev_task.result.raw}")
        for ctx_task in current_task.context or ():
            if ctx_task.result and ctx_task.name != current_task.name:
                pieces.append(f"\n{ctx_task.name}: {ctx_task.result.raw}")
    else:
        # Only the latest previous task ...
        if current_task.previous_tasks:
            last_name = current_task.previous_tasks[-1]
            prev_task = find_task(last_name)
            if prev_task and prev_task.result:
                pieces.append(f"\n{last_name}: {prev_task.result.raw}")
        # ... and the latest context task that already has a result
        latest_ctx = next(
            (c for c in reversed(current_task.context or ())
             if c.result and c.name != current_task.name),
            None,
        )
        if latest_ctx is not None:
            pieces.append(f"\n{latest_ctx.name}: {latest_ctx.result.raw}")

    return "".join(pieces)
def _find_next_not_started_task(self) -> Optional[Task]:
    """Fallback used when normal routing produced no next task.

    Before searching, every task description containing
    "Input data from previous tasks:" is cut at that marker. The text before
    the marker is kept and surrounding whitespace is stripped.

    A candidate must satisfy two conditions:
        - its status is ``"not started"``;
        - it is reachable: its ``condition`` has at least one non-empty
          list/str value, or it has ``next_tasks``.

    For each candidate, its retry count in ``self.task_retry_counter`` is
    checked against ``Process.DEFAULT_RETRY_LIMIT``:
        - below the limit: the count is incremented and the task is returned;
        - at or above the limit: the task is marked ``"failed"`` and the
          search moves on.

    The scan repeats up to ``DEFAULT_RETRY_LIMIT`` times.

    Returns:
        The first eligible task, or ``None``.
    """
    context_marker = 'Input data from previous tasks:'
    for task in self.tasks.values():
        if hasattr(task, 'description') and context_marker in task.description:
            task.description = task.description.split(context_marker)[0].strip()

    retry_limit = Process.DEFAULT_RETRY_LIMIT
    for attempt in range(1, retry_limit + 1):
        logging.debug(f"Fallback attempt {attempt}: Trying to find next 'not started' task.")
        for candidate in self.tasks.values():
            if candidate.status != "not started":
                continue
            conditions = candidate.condition or {}
            reachable_by_condition = any(
                target for target in conditions.values()
                if isinstance(target, (list, str)) and target
            )
            if not (reachable_by_condition or candidate.next_tasks):
                continue  # Skip if no valid path exists
            retries_so_far = self.task_retry_counter.get(candidate.id, 0)
            if retries_so_far >= retry_limit:
                logging.debug(f"Max retries reached for task {candidate.name} in fallback mode, marking as failed.")
                candidate.status = "failed"
                continue
            self.task_retry_counter[candidate.id] = retries_so_far + 1
            logging.debug(f"Fallback attempt {attempt}: Found 'not started' task: {candidate.name}, retry count: {self.task_retry_counter[candidate.id]}")
            return candidate
        logging.debug(f"Fallback attempt {attempt}: No 'not started' task found within retry limit.")
    return None
async def _get_manager_instructions_with_fallback_async(self, manager_task, manager_prompt, ManagerInstructions):
"""Async version of getting manager instructions with fallback"""
try:
# First try structured output (OpenAI compatible)
logging.info("Attempting structured output...")
return await self._get_structured_response_async(manager_task, manager_prompt, ManagerInstructions)
except Exception as e:
logging.info(f"Structured output failed: {e}, falling back to JSON mode...")
# Fallback to regular JSON mode
try:
# Generate JSON structure description from Pydantic model
try:
schema = ManagerInstructions.model_json_schema()
props_desc = ", ".join([f'"{k}": <{v.get("type", "any")}>' for k, v in schema.get('properties', {}).items()])
required_props = schema.get('required', [])
required_props_str = ', '.join(f'"{p}"' for p in required_props)
required_desc = f" (required: {required_props_str})" if required_props else ""
json_structure_desc = "{" + props_desc + "}"
enhanced_prompt = manager_prompt + f"\n\nIMPORTANT: Respond with valid JSON only, using this exact structure: {json_structure_desc}{required_desc}"
except Exception as schema_error:
logging.warning(f"Could not generate schema for ManagerInstructions: {schema_error}. Using hardcoded prompt.")
# Fallback to hardcoded prompt if schema generation fails
enhanced_prompt = manager_prompt + "\n\nIMPORTANT: Respond with valid JSON only, using this exact structure: {\"task_id\": <int>, \"agent_name\": \"<string>\", \"action\": \"<execute or stop>\"}"
return await self._get_json_response_async(manager_task, enhanced_prompt, ManagerInstructions)
except Exception as fallback_error:
error_msg = f"Both structured output and JSON fallback failed: {fallback_error}"
logging.error(error_msg, exc_info=True)
raise Exception(error_msg) from fallback_error
def _get_manager_instructions_with_fallback(self, manager_task, manager_prompt, ManagerInstructions):
"""Sync version of getting manager instructions with fallback"""
# Create LLM instance with the manager_llm
llm = self._create_llm_instance()
try:
# Use LLM with output_pydantic for structured output
logging.info("Attempting structured output...")
response = llm.get_response(
prompt=manager_prompt,
system_prompt=manager_task.description,
output_pydantic=ManagerInstructions
)
# Parse the response and validate with Pydantic
return self._parse_manager_instructions(response, ManagerInstructions)
except Exception as e:
logging.info(f"Structured output failed: {e}, falling back to JSON mode...")
# Fallback to regular JSON mode with explicit JSON instructions
try:
# Generate JSON structure description from Pydantic model
try:
schema = ManagerInstructions.model_json_schema()
props_desc = ", ".join([f'"{k}": <{v.get("type", "any")}>' for k, v in schema.get('properties', {}).items()])
required_props = schema.get('required', [])
required_props_str = ', '.join(f'"{p}"' for p in required_props)
required_desc = f" (required: {required_props_str})" if required_props else ""
json_structure_desc = "{" + props_desc + "}"
enhanced_prompt = manager_prompt + f"\n\nIMPORTANT: Respond with valid JSON only, using this exact structure: {json_structure_desc}{required_desc}"
except Exception as schema_error:
logging.warning(f"Could not generate schema for ManagerInstructions: {schema_error}. Using hardcoded prompt.")
# Fallback to hardcoded prompt if schema generation fails
enhanced_prompt = manager_prompt + "\n\nIMPORTANT: Respond with valid JSON only, using this exact structure: {\"task_id\": <int>, \"agent_name\": \"<string>\", \"action\": \"<execute or stop>\"}"
response = llm.get_response(
prompt=enhanced_prompt,
system_prompt=manager_task.description,
output_json=True
)
# Parse JSON and validate with Pydantic
return self._parse_manager_instructions(response, ManagerInstructions)
except Exception as fallback_error:
error_msg = f"Both structured output and JSON fallback failed: {fallback_error}"
logging.error(error_msg, exc_info=True)
raise Exception(error_msg) from fallback_error
async def _get_structured_response_async(self, manager_task, manager_prompt, ManagerInstructions):
"""Async version of structured response"""
# Create LLM instance with the manager_llm
llm = self._create_llm_instance()
# Use async get_response with output_pydantic
response = await llm.get_response_async(
prompt=manager_prompt,
system_prompt=manager_task.description,
output_pydantic=ManagerInstructions
)
# Parse the response and validate with Pydantic
return self._parse_manager_instructions(response, ManagerInstructions)
async def _get_json_response_async(self, manager_task, enhanced_prompt, ManagerInstructions):
"""Async version of JSON fallback response"""
# Create LLM instance with the manager_llm
llm = self._create_llm_instance()
response = await llm.get_response_async(
prompt=enhanced_prompt,
system_prompt=manager_task.description,
output_json=True
)
# Parse JSON and validate with Pydantic
return self._parse_manager_instructions(response, ManagerInstructions)
def _check_all_tasks_completed(self) -> bool:
"""Check if all tasks are completed and handle workflow completion.
Returns:
bool: True if all tasks are completed and workflow should exit, False otherwise.
"""
if all(task.status == "completed" for task in self.tasks.values()):
logging.info("All tasks are completed.")
self.workflow_finished = True
return True
return False
async def aworkflow(self) -> AsyncGenerator[str, None]:
    """Async version of workflow method.

    Walks the task graph and yields ids (keys of ``self.tasks``) of tasks to
    execute. It starts at the first task flagged ``is_start``, or at the
    first task in ``self.tasks`` when none is flagged. Routing after each
    yield reads the task's ``status``/``result``, so the consumer is
    presumably expected to run the task before resuming the generator.

    Each iteration (at most ``self.max_iter``) does the following:
        - Appends context from upstream tasks and validation feedback to
          the current task's description (``_build_task_context``).
        - For ``loop`` tasks:
            * on the first visit, creates subtasks from ``input_file``;
            * once every subtask is ``completed``, marks the loop completed
              and routes through its ``condition["done"]`` entry.
        - For other tasks: yields the task id, then chooses the next task:
            * from ``condition`` (``decision``/``loop`` tasks that have a
              result);
            * otherwise from the first ``next_tasks`` entry;
            * otherwise via ``_find_next_not_started_task``.

    Yields:
        Task ids to execute.
    """
    logging.debug("=== Starting Async Workflow ===")
    current_iter = 0  # Track how many times we've looped

    # Build workflow relationships first
    logging.debug("Building workflow relationships...")
    for task in self.tasks.values():
        if task.next_tasks:
            for next_task_name in task.next_tasks:
                next_task = next((t for t in self.tasks.values() if t.name == next_task_name), None)
                # Skip names already recorded, so that running aworkflow() again
                # on the same Process does not duplicate previous_tasks entries
                # (which would repeat upstream results in the task context).
                if next_task and task.name not in next_task.previous_tasks:
                    next_task.previous_tasks.append(task.name)
                    logging.debug(f"Added {task.name} as previous task for {next_task_name}")

    # Find start task
    logging.debug("Finding start task...")
    start_task = None
    for task_id, task in self.tasks.items():
        if task.is_start:
            start_task = task
            logging.debug(f"Found marked start task: {task.name} (id: {task_id})")
            break

    if not start_task:
        if not self.tasks:
            # Nothing to run; avoid IndexError on the empty task list below
            logging.info("No tasks to execute, ending workflow.")
            return
        start_task = list(self.tasks.values())[0]
        logging.debug(f"No start task marked, using first task: {start_task.name}")

    current_task = start_task
    visited_tasks = set()
    loop_data = {}  # Store loop-specific data
    # NOTE(review): nothing in this method writes to loop_data, so the
    # "Handle loop progression" block below never runs as written.

    # TODO: start task with loop feature is not available in aworkflow method

    while current_task:
        current_iter += 1
        if current_iter > self.max_iter:
            logging.info(f"Max iteration limit {self.max_iter} reached, ending workflow.")
            break

        # ADDED: Check workflow finished flag at the start of each cycle
        if self.workflow_finished:
            logging.info("Workflow finished early as all tasks are completed.")
            break

        # Add task summary at start of each cycle
        logging.debug(f"""
=== Workflow Cycle {current_iter} Summary ===
Total tasks: {len(self.tasks)}
Outstanding tasks: {sum(1 for t in self.tasks.values() if t.status != "completed")}
Completed tasks: {sum(1 for t in self.tasks.values() if t.status == "completed")}
Tasks by status:
- Not started: {sum(1 for t in self.tasks.values() if t.status == "not started")}
- In progress: {sum(1 for t in self.tasks.values() if t.status == "in_progress")}
- Completed: {sum(1 for t in self.tasks.values() if t.status == "completed")}
Tasks by type:
- Loop tasks: {sum(1 for t in self.tasks.values() if t.task_type == "loop")}
- Decision tasks: {sum(1 for t in self.tasks.values() if t.task_type == "decision")}
- Regular tasks: {sum(1 for t in self.tasks.values() if t.task_type not in ["loop", "decision"])}
""")

        # ADDED: Check if all tasks are completed and set workflow_finished flag
        if self._check_all_tasks_completed():
            break  # Exit immediately to prevent task reset

        task_id = current_task.id
        logging.debug(f"""
=== Task Execution Details ===
Current task: {current_task.name}
Type: {current_task.task_type}
Status: {current_task.status}
Previous tasks: {current_task.previous_tasks}
Next tasks: {current_task.next_tasks}
Context tasks: {[t.name for t in current_task.context] if current_task.context else []}
Description length: {len(current_task.description)}
""")

        # Add context from previous tasks to description
        context = self._build_task_context(current_task)
        if context:
            # Update task description with context
            current_task.description = current_task.description + context

        # Skip execution for loop tasks, only process their subtasks
        if current_task.task_type == "loop":
            logging.debug(f"""
=== Loop Task Details ===
Name: {current_task.name}
ID: {current_task.id}
Status: {current_task.status}
Next tasks: {current_task.next_tasks}
Condition: {current_task.condition}
Subtasks created: {getattr(current_task, '_subtasks_created', False)}
Input file: {getattr(current_task, 'input_file', None)}
""")

            # Check if subtasks are created and completed
            if getattr(current_task, "_subtasks_created", False):
                # Subtasks are recognised by the "<loop name>_" name prefix
                subtasks = [
                    t for t in self.tasks.values()
                    if t.name.startswith(current_task.name + "_")
                ]
                logging.debug(f"""
=== Subtask Status Check ===
Total subtasks: {len(subtasks)}
Completed: {sum(1 for st in subtasks if st.status == "completed")}
Pending: {sum(1 for st in subtasks if st.status != "completed")}
""")

                # Log detailed subtask info
                for st in subtasks:
                    logging.debug(f"""
Subtask: {st.name}
- Status: {st.status}
- Next tasks: {st.next_tasks}
- Condition: {st.condition}
""")

                if subtasks and all(st.status == "completed" for st in subtasks):
                    logging.debug(f"=== All {len(subtasks)} subtasks completed for {current_task.name} ===")

                    # Mark loop task completed and move to next task
                    current_task.status = "completed"
                    logging.debug(f"Loop {current_task.name} marked as completed")

                    # Set result for loop task when all subtasks complete
                    if not current_task.result:
                        # Get result from last completed subtask
                        last_subtask = next((t for t in reversed(subtasks) if t.status == "completed"), None)
                        if last_subtask and last_subtask.result:
                            current_task.result = last_subtask.result

                    # Route to next task based on condition
                    if current_task.condition:
                        # Get decision from result if available
                        decision_str = None
                        if current_task.result:
                            if current_task.result.pydantic and hasattr(current_task.result.pydantic, "decision"):
                                decision_str = current_task.result.pydantic.decision.lower()
                            elif current_task.result.raw:
                                decision_str = current_task.result.raw.lower()

                        # For loop tasks, use "done" to follow condition path
                        if current_task.task_type == "loop" and all(t.status == "completed" for t in subtasks):
                            decision_str = "done"

                        target_tasks = current_task.condition.get(decision_str, []) if decision_str else []
                        # An empty list means "no target for this decision";
                        # indexing it used to raise IndexError.
                        if isinstance(target_tasks, list):
                            task_value = target_tasks[0] if target_tasks else None
                        else:
                            task_value = target_tasks
                        next_task = (
                            next((t for t in self.tasks.values() if t.name == task_value), None)
                            if task_value is not None else None
                        )
                        if next_task:
                            next_task.status = "not started"  # Reset status to allow execution
                            logging.debug(f"Routing to {next_task.name} based on decision: {decision_str}")
                            self.workflow_finished = False
                            current_task = next_task
                            # Ensure the task is yielded for execution
                            if current_task.id not in visited_tasks:
                                yield current_task.id
                                visited_tasks.add(current_task.id)
                        else:
                            # End workflow if no valid next task found
                            logging.info(f"No valid next task found for decision: {decision_str}")
                            self.workflow_finished = True
                            current_task = None
                            break
            else:
                logging.debug(f"No subtasks created yet for {current_task.name}")
                # Create subtasks if needed
                if current_task.input_file:
                    self._create_loop_subtasks(current_task)
                    current_task._subtasks_created = True
                    logging.debug(f"Created subtasks from {current_task.input_file}")
                else:
                    # No input file, mark as done
                    current_task.status = "completed"
                    logging.debug(f"No input file, marking {current_task.name} as completed")
                    if current_task.next_tasks:
                        next_task_name = current_task.next_tasks[0]
                        next_task = next((t for t in self.tasks.values() if t.name == next_task_name), None)
                        current_task = next_task  # may be None if the name is unknown
                    else:
                        current_task = None
        else:
            # Execute non-loop task
            logging.debug(f"=== Executing non-loop task: {current_task.name} (id: {task_id}) ===")
            logging.debug(f"Task status: {current_task.status}")
            logging.debug(f"Task next_tasks: {current_task.next_tasks}")
            yield task_id
            visited_tasks.add(task_id)

            # Only end workflow if no next_tasks AND no conditions
            if not current_task.next_tasks and not current_task.condition and not any(
                t.task_type == "loop" and current_task.name.startswith(t.name + "_")
                for t in self.tasks.values()
            ):
                logging.info(f"Task {current_task.name} has no next tasks, ending workflow")
                self.workflow_finished = True
                current_task = None
                break

        # Reset completed task to "not started" so it can run again
        if self.tasks[task_id].status == "completed":
            # Reset only rerunnable tasks that are not loops, loop subtasks or
            # async (parallel) tasks; decision tasks are reset too.
            subtask_name = self.tasks[task_id].name
            task_to_check = self.tasks[task_id]
            logging.debug(f"=== Checking reset for completed task: {subtask_name} ===")
            logging.debug(f"Task type: {task_to_check.task_type}")
            logging.debug(f"Task status before reset check: {task_to_check.status}")
            logging.debug(f"Task rerun: {getattr(task_to_check, 'rerun', True)}")  # default to True if not set
            logging.debug(f"Task async_execution: {task_to_check.async_execution}")

            if (getattr(task_to_check, 'rerun', True) and  # reset only if rerun is True (or unset)
                    task_to_check.task_type != "loop" and
                    not any(t.task_type == "loop" and subtask_name.startswith(t.name + "_")
                            for t in self.tasks.values()) and
                    not task_to_check.async_execution):  # Don't reset async parallel tasks
                logging.debug(f"=== Resetting non-loop, non-decision, non-parallel task {subtask_name} to 'not started' ===")
                self.tasks[task_id].status = "not started"
                logging.debug(f"Task status after reset: {self.tasks[task_id].status}")
            else:
                logging.debug(f"=== Skipping reset for loop/decision/subtask/parallel or rerun=False: {subtask_name} ===")
                logging.debug(f"Keeping status as: {self.tasks[task_id].status}")

        # Handle loop progression. current_task may be None here after a loop
        # task without input_file/next target; it used to raise AttributeError.
        if current_task and current_task.task_type == "loop":
            loop_key = f"loop_{current_task.name}"
            if loop_key in loop_data:
                loop_info = loop_data[loop_key]
                loop_info["index"] += 1
                has_more = loop_info["remaining"] > 0

                # Update result to trigger correct condition
                if current_task.result:
                    result = current_task.result.raw
                    if has_more:
                        result += "\nmore"
                    else:
                        result += "\ndone"
                    current_task.result.raw = result

        # Determine next task based on result
        next_task = None
        if current_task and current_task.result:
            if current_task.task_type in ["decision", "loop"]:
                # Prefer the structured decision; fall back to the raw text,
                # tolerating a missing raw string.
                if current_task.result.pydantic and hasattr(current_task.result.pydantic, "decision"):
                    decision_str = current_task.result.pydantic.decision.lower()
                else:
                    decision_str = (current_task.result.raw or "").lower()

                # Check if task has conditions and next_tasks
                if current_task.condition:
                    # Get target task based on decision
                    target_tasks = current_task.condition.get(decision_str, [])
                    # Handle all forms of exit conditions (an unmatched decision also exits)
                    if not target_tasks or target_tasks == "exit" or (isinstance(target_tasks, list) and (not target_tasks or target_tasks[0] == "exit")):
                        logging.info(f"Workflow exit condition met on decision: {decision_str}")
                        self.workflow_finished = True
                        current_task = None
                        break
                    else:
                        # Find the target task by name
                        task_value = target_tasks[0] if isinstance(target_tasks, list) else target_tasks
                        next_task = next((t for t in self.tasks.values() if t.name == task_value), None)
                        if next_task:
                            next_task.status = "not started"  # Reset status to allow execution

                            # Capture validation feedback for retry scenarios
                            if decision_str in Process.VALIDATION_FAILURE_DECISIONS:
                                if current_task and current_task.result:
                                    # Get the rejected output from the task that was validated
                                    validated_task = None
                                    # Find the task that produced the output being validated
                                    if current_task.previous_tasks:
                                        # For validation tasks, typically validate the most recent previous task
                                        prev_task_name = current_task.previous_tasks[-1]
                                        validated_task = next((t for t in self.tasks.values() if t.name == prev_task_name), None)
                                    elif current_task.context:
                                        # If no previous_tasks, use the most recent context task with a result
                                        for ctx_task in reversed(current_task.context):
                                            if ctx_task.result and ctx_task.name != current_task.name:
                                                validated_task = ctx_task
                                                break

                                    feedback = {
                                        'validation_response': decision_str,
                                        'validation_details': current_task.result.raw,
                                        'rejected_output': validated_task.result.raw if validated_task and validated_task.result else None,
                                        'validator_task': current_task.name,
                                        'validated_task': validated_task.name if validated_task else None
                                    }
                                    next_task.validation_feedback = feedback
                                    logging.debug(f"Added validation feedback to {next_task.name}: {feedback['validation_response']} (validated task: {feedback.get('validated_task', 'None')})")

                            logging.debug(f"Routing to {next_task.name} based on decision: {decision_str}")
                            # Don't mark workflow as finished when following condition path
                            self.workflow_finished = False

        # If no condition-based routing, use next_tasks
        if not next_task and current_task and current_task.next_tasks:
            next_task_name = current_task.next_tasks[0]
            next_task = next((t for t in self.tasks.values() if t.name == next_task_name), None)
            if next_task:
                # Reset the next task to allow re-execution
                next_task.status = "not started"
                # Don't mark workflow as finished if we're in a task loop.
                # NOTE(review): self.tasks is keyed by task id while next_tasks
                # holds task names, so this membership test presumably only
                # succeeds when ids equal names — confirm intent.
                if (next_task.previous_tasks and current_task.name in next_task.previous_tasks and
                        next_task.next_tasks and
                        next_task.next_tasks[0] in self.tasks and
                        next_task.name in self.tasks[next_task.next_tasks[0]].previous_tasks):
                    self.workflow_finished = False
                logging.debug(f"Following next_tasks to {next_task.name}")

        current_task = next_task
        if not current_task:
            current_task = self._find_next_not_started_task()  # General fallback if no next task in workflow

        if not current_task:
            # Add final workflow summary
            logging.debug(f"""
=== Final Workflow Summary ===
Total tasks processed: {len(self.tasks)}
Final status:
- Completed tasks: {sum(1 for t in self.tasks.values() if t.status == "completed")}
- Outstanding tasks: {sum(1 for t in self.tasks.values() if t.status != "completed")}
Tasks by status:
- Not started: {sum(1 for t in self.tasks.values() if t.status == "not started")}
- In progress: {sum(1 for t in self.tasks.values() if t.status == "in_progress")}
- Completed: {sum(1 for t in self.tasks.values() if t.status == "completed")}
- Failed: {sum(1 for t in self.tasks.values() if t.status == "failed")}
Tasks by type:
- Loop tasks: {sum(1 for t in self.tasks.values() if t.task_type == "loop")}
- Decision tasks: {sum(1 for t in self.tasks.values() if t.task_type == "decision")}
- Regular tasks: {sum(1 for t in self.tasks.values() if t.task_type not in ["loop", "decision"])}
Total iterations: {current_iter}
Workflow Finished: {self.workflow_finished} # ADDED: Workflow Finished Status
""")
            logging.info("Workflow execution completed")
            break

        # Add completion logging
        logging.debug(f"""
=== Task Completion ===
Task: {current_task.name}
Final status: {current_task.status}
Next task: {next_task.name if next_task else None}
Iteration: {current_iter}/{self.max_iter}
Workflow Finished: {self.workflow_finished} # ADDED: Workflow Finished Status
""")
async def asequential(self) -> AsyncGenerator[str, None]:
    """Async version of sequential method.

    Walks ``self.tasks`` in dict iteration order and yields the key of each
    task whose ``status`` is anything other than ``"completed"``.

    Each task's status is checked only when the generator reaches it. A task
    that the consumer marks completed before the generator gets there is
    therefore skipped.
    """
    for key, task in self.tasks.items():
        if task.status == "completed":
            continue
        yield key
async def ahierarchical(self) -> AsyncGenerator[str, None]:
    """Async version of hierarchical method.

    Builds a "Manager" agent plus a ``manager_task`` and yields that Task
    object first. The code below relies on ``self.tasks[manager_task.id]``
    existing afterwards, so the consumer is expected to register it. The
    consumer may ``asend`` back the id it was registered under; it is only
    logged here.

    The method then repeatedly asks the manager which task to run and with
    which agent, and yields the selected task id for the consumer to execute.

    The loop ends when:
      * every non-manager task has status ``"completed"``;
      * the manager answers with action ``"stop"``;
      * the manager reply cannot be obtained or parsed;
      * the manager names a task id that is not in ``self.tasks``, or names
        the manager task itself.

    Finally the manager task is marked ``"completed"``.

    Yields:
        First the manager ``Task`` object, then keys of ``self.tasks`` to
        execute.
    """
    logging.debug(f"Starting hierarchical task execution with {len(self.tasks)} tasks")
    manager_agent = Agent(
        name="Manager",
        role="Project manager",
        goal="Manage the entire flow of tasks and delegate them to the right agent",
        backstory="Expert project manager to coordinate tasks among agents",
        llm=self.manager_llm,
        output={"verbose": self.verbose, "markdown": True},
        reflection=False
    )

    class ManagerInstructions(BaseModel):
        # Schema the manager's JSON reply is parsed into.
        task_id: int
        agent_name: str
        action: str

    manager_task = Task(
        name="manager_task",
        description="Decide the order of tasks and which agent executes them",
        expected_output="All tasks completed successfully",
        agent=manager_agent
    )
    manager_task_id = yield manager_task
    logging.info(f"Created manager task with ID {manager_task_id}")

    def _completed_count():
        # Count completed tasks from their live statuses rather than counting
        # increments in the loop. Otherwise tasks that were already completed
        # at start would never count, and the loop could never reach
        # total_tasks on its own.
        return sum(
            1 for tk in self.tasks.values()
            if tk.name != "manager_task" and tk.status == "completed"
        )

    completed_count = _completed_count()
    total_tasks = len(self.tasks) - 1
    logging.info(f"Need to complete {total_tasks} tasks (excluding manager task)")

    while completed_count < total_tasks:
        tasks_summary = []
        for tid, tk in self.tasks.items():
            if tk.name == "manager_task":
                continue
            task_info = {
                "task_id": tid,
                "name": tk.name,
                "description": tk.description,
                "status": tk.status if tk.status else "not started",
                "agent": tk.agent.name if tk.agent else "No agent"
            }
            tasks_summary.append(task_info)
            logging.info(f"Task {tid} status: {task_info}")
        manager_prompt = f"""
Here is the current status of all tasks except yours (manager_task):
{tasks_summary}
Provide a JSON with the structure:
{{
"task_id": <int>,
"agent_name": "<string>",
"action": "<execute or stop>"
}}
"""
        try:
            logging.info("Requesting manager instructions...")
            if manager_task.async_execution:
                parsed_instructions = await self._get_manager_instructions_with_fallback_async(
                    manager_task, manager_prompt, ManagerInstructions
                )
            else:
                parsed_instructions = self._get_manager_instructions_with_fallback(
                    manager_task, manager_prompt, ManagerInstructions
                )
            logging.info(f"Manager instructions: {parsed_instructions}")
        except Exception as e:
            display_error(f"Manager parse error: {e}")
            logging.error(f"Manager parse error: {str(e)}", exc_info=True)
            break

        selected_task_id = parsed_instructions.task_id
        selected_agent_name = parsed_instructions.agent_name
        action = parsed_instructions.action
        logging.info(f"Manager selected task_id={selected_task_id}, agent={selected_agent_name}, action={action}")

        if action.lower() == "stop":
            logging.info("Manager decided to stop task execution")
            break

        # The manager must not schedule itself. Its task is excluded from the
        # summary above, and yielding its id would run the manager task as an
        # ordinary task.
        if (selected_task_id not in self.tasks
                or self.tasks[selected_task_id].name == "manager_task"):
            error_msg = f"Manager selected invalid task id {selected_task_id}"
            display_error(error_msg)
            logging.error(error_msg)
            break

        original_agent = self.tasks[selected_task_id].agent.name if self.tasks[selected_task_id].agent else "None"
        for a in self.agents:
            if a.name == selected_agent_name:
                self.tasks[selected_task_id].agent = a
                logging.info(f"Changed agent for task {selected_task_id} from {original_agent} to {selected_agent_name}")
                break
        else:
            # No agent with that name: keep the task's current agent and leave
            # a trace instead of ignoring the manager's choice silently.
            logging.warning(
                f"Manager selected unknown agent {selected_agent_name!r}; "
                f"keeping {original_agent} for task {selected_task_id}"
            )

        if self.tasks[selected_task_id].status != "completed":
            logging.info(f"Starting execution of task {selected_task_id}")
            yield selected_task_id
            logging.info(f"Finished execution of task {selected_task_id}, status: {self.tasks[selected_task_id].status}")
            if self.tasks[selected_task_id].status == "completed":
                completed_count = _completed_count()
                logging.info(f"Task {selected_task_id} completed. Total completed: {completed_count}/{total_tasks}")
        else:
            logging.warning(f"Manager selected task {selected_task_id}, which is already completed")

    self.tasks[manager_task.id].status = "completed"
    if self.verbose >= 1:
        logging.info("All tasks completed under manager supervision.")
    logging.info("Hierarchical task execution finished")
def workflow(self):
"""Synchronous version of workflow method.
DEPRECATED: This method is deprecated. Use the Workflow class instead:
```python
from praisonaiagents import Workflow, StepResult
from praisonaiagents.workflows import route, parallel, loop, repeat
workflow = Workflow(steps=[step1, step2])
result = workflow.start("input")
```
The Workflow class provides a simpler API with more features:
- route() for decision-based branching
- parallel() for concurrent execution
- loop() for iteration over lists/CSV
- repeat() for evaluator-optimizer patterns
- Callbacks (on_step_start, on_step_complete, etc.)
- Guardrails with validation feedback
- Status tracking
"""
from ..utils.deprecation import warn_deprecated_param
warn_deprecated_param(
"process='workflow'",
since="1.0.0",
removal="2.0.0",
alternative="use the Workflow class instead: from praisonaiagents import Workflow; workflow = Workflow(steps=[...]); workflow.start()",
stacklevel=3
)
current_iter = 0 # Track how many times we've looped
# Build workflow relationships first
for task in self.tasks.values():
if task.next_tasks:
for next_task_name in task.next_tasks:
next_task = next((t for t in self.tasks.values() if t.name == next_task_name), None)
if next_task:
next_task.previous_tasks.append(task.name)
# Find start task
start_task = None
for task_id, task in self.tasks.items():
if task.is_start:
start_task = task
break
if not start_task:
start_task = list(self.tasks.values())[0]
logging.info("No start task marked, using first task")
# If loop type and no input_file, default to tasks.csv
if start_task and start_task.task_type == "loop" and not start_task.input_file:
start_task.input_file = "tasks.csv"
# --- If loop + input_file, read file & create tasks
if start_task and start_task.task_type == "loop" and getattr(start_task, "input_file", None):
try:
file_ext = os.path.splitext(start_task.input_file)[1].lower()
new_tasks = []
if file_ext == ".csv":
with open(start_task.input_file, "r", encoding="utf-8") as f:
reader = csv.reader(f, quotechar='"', escapechar='\\') # Handle quoted/escaped fields
previous_task = None
task_count = 0
for i, row in enumerate(reader):
if not row: # Skip truly empty rows
continue
# Properly handle Q&A pairs with potential commas
task_desc = row[0].strip() if row else ""
if len(row) > 1:
# Preserve all fields in case of multiple commas
question = row[0].strip()
answer = ",".join(field.strip() for field in row[1:])
task_desc = f"Question: {question}\nAnswer: {answer}"
if not task_desc: # Skip rows with empty content
continue
task_count += 1
logging.debug(f"Processing CSV row {i+1}: {task_desc}")
# Inherit next_tasks from parent loop task
inherited_next_tasks = start_task.next_tasks if start_task.next_tasks else []
row_task = Task(
description=f"{start_task.description}\n{task_desc}" if start_task.description else task_desc,
agent=start_task.agent,
name=f"{start_task.name}_{task_count}" if start_task.name else task_desc,
expected_output=getattr(start_task, 'expected_output', None),
on_task_complete=start_task.callback, # Inherit callback from parent loop task
is_start=(task_count == 1),
task_type="decision", # Change to decision type
next_tasks=inherited_next_tasks, # Inherit parent's next tasks
condition={
"done": inherited_next_tasks if inherited_next_tasks else ["next"], # Use full inherited_next_tasks
"retry": ["current"],
"exit": [] # Empty list for exit condition
}
)
self.tasks[row_task.id] = row_task
new_tasks.append(row_task)
if previous_task:
previous_task.next_tasks = [row_task.name]
previous_task.condition["done"] = [row_task.name] # Use "done" consistently
previous_task = row_task
# For the last task in the loop, ensure it points to parent's next tasks
if task_count > 0 and not row_task.next_tasks:
row_task.next_tasks = inherited_next_tasks
logging.info(f"Processed {task_count} rows from CSV file")
else:
# If not CSV, read lines
with open(start_task.input_file, "r", encoding="utf-8") as f:
lines = f.read().splitlines()
previous_task = None
for i, line in enumerate(lines):
row_task = Task(
description=f"{start_task.description}\n{line.strip()}" if start_task.description else line.strip(),
agent=start_task.agent,
name=f"{start_task.name}_{i+1}" if start_task.name else line.strip(),
expected_output=getattr(start_task, 'expected_output', None),
on_task_complete=start_task.callback, # Inherit callback from parent loop task
is_start=(i == 0),
task_type="task",
condition={
"complete": ["next"],
"retry": ["current"]
}
)
self.tasks[row_task.id] = row_task
new_tasks.append(row_task)
if previous_task:
previous_task.next_tasks = [row_task.name]
previous_task.condition["complete"] = [row_task.name]
previous_task = row_task
if new_tasks:
start_task = new_tasks[0]
logging.info(f"Created {len(new_tasks)} tasks from: {start_task.input_file}")
except Exception as e:
logging.error(f"Failed to read file tasks: {e}")
# end of start task handling
current_task = start_task
visited_tasks = set()
loop_data = {} # Store loop-specific data