Skip to content

Commit 4fab7fc

Browse files
authored
Add thread safety tests (#275)
1 parent 3ad5368 commit 4fab7fc

1 file changed

Lines changed: 189 additions & 0 deletions

File tree

Lines changed: 189 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,189 @@
1+
# frozen_string_literal: true
2+
3+
require "test_helper"
4+
5+
# Tests to verify thread safety of the ActiveAgent framework.
6+
#
7+
# These tests address the concerns raised in GitHub issue #273 where
8+
# the old code had a race condition with shared provider instances.
9+
#
10+
# The refactored code should be thread-safe because:
11+
# 1. Each generation creates a new provider instance (not shared)
12+
# 2. No class-level mutable state that could be corrupted
13+
# 3. Each provider instance has its own context and message stack
14+
#
15+
# Reference: https://github.com/activeagents/activeagent/issues/273
16+
class ThreadSafetyTest < ActiveSupport::TestCase
17+
# Test agent using Mock provider for fast, deterministic tests
18+
class TestAgent < ActiveAgent::Base
19+
generate_with :mock
20+
21+
def ask
22+
prompt(message: params[:message])
23+
end
24+
end
25+
26+
# Agent with streaming support
27+
class StreamingAgent < ActiveAgent::Base
28+
generate_with :mock
29+
30+
attr_reader :stream_chunks
31+
32+
def initialize
33+
super
34+
@stream_chunks = []
35+
end
36+
37+
on_stream do |chunk|
38+
@stream_chunks << chunk.delta
39+
end
40+
41+
def ask
42+
prompt(message: params[:message], stream: true)
43+
end
44+
end
45+
46+
# Agent with tool support
47+
class ToolAgent < ActiveAgent::Base
48+
generate_with :mock
49+
50+
def ask
51+
prompt(
52+
message: params[:message],
53+
tools: [ {
54+
type: "function",
55+
name: "get_data",
56+
description: "Gets data for a location",
57+
parameters: {
58+
type: "object",
59+
properties: {
60+
location: { type: "string", description: "The location" }
61+
},
62+
required: [ "location" ]
63+
}
64+
} ]
65+
)
66+
end
67+
68+
private
69+
70+
def get_data(location:)
71+
"Data for #{location}"
72+
end
73+
end
74+
75+
setup do
76+
Thread.abort_on_exception = true
77+
end
78+
79+
teardown do
80+
Thread.abort_on_exception = false
81+
end
82+
83+
private
84+
85+
# Helper to run concurrent operations and verify uniqueness
86+
def run_concurrent(num_threads:, &block)
87+
results = Concurrent::Array.new
88+
threads = num_threads.times.map { |i| Thread.new { results << block.call(i) } }
89+
threads.each(&:join)
90+
results
91+
end
92+
93+
def assert_unique_responses(results, message = "All responses should be unique")
94+
responses = results.map { |r| r[:response] }
95+
assert responses.all?(&:present?), "All threads should receive responses"
96+
assert_equal results.size, responses.uniq.size, message
97+
end
98+
99+
public
100+
101+
test "concurrent generations with different messages do not interfere" do
102+
# Core issue from #273: multiple threads with different prompts
103+
# should each get their own correct response back
104+
results = run_concurrent(num_threads: 20) do |i|
105+
message = "THREAD_#{i}: What is #{i} plus #{i}?"
106+
response = TestAgent.with(message: message).ask.generate_now
107+
{ thread_id: i, response: response.message.content }
108+
end
109+
110+
assert_unique_responses(results, "Duplicates indicate a race condition")
111+
end
112+
113+
test "concurrent parameterized invocations with shared agent class" do
114+
# Tests the common pattern of using .with() from multiple threads
115+
results = run_concurrent(num_threads: 15) do |i|
116+
message = "Request_#{i}_#{SecureRandom.hex(4)}"
117+
response = TestAgent.with(message: message).ask.generate_now
118+
{ thread_id: i, response: response.message.content }
119+
end
120+
121+
assert_unique_responses(results)
122+
end
123+
124+
test "concurrent tool calls do not mix up parameters" do
125+
# Tests tool calling scenario from issue #273
126+
results = run_concurrent(num_threads: 10) do |i|
127+
message = "Get data for Location_#{i}"
128+
response = ToolAgent.with(message: message).ask.generate_now
129+
{ thread_id: i, response: response.message.content }
130+
end
131+
132+
assert_unique_responses(results, "Tool responses should not mix between threads")
133+
end
134+
135+
test "concurrent streaming generations maintain isolation" do
136+
results = run_concurrent(num_threads: 5) do |i|
137+
message = "Stream_#{i}_#{SecureRandom.hex(4)}"
138+
response = StreamingAgent.with(message: message).ask.generate_now
139+
{ thread_id: i, response: response.message.content }
140+
end
141+
142+
assert_unique_responses(results, "Streaming responses should not mix between threads")
143+
end
144+
145+
test "high concurrency stress test" do
146+
errors = Concurrent::Array.new
147+
results = run_concurrent(num_threads: 50) do |i|
148+
sleep(rand * 0.01) # Add timing variability
149+
message = "Concurrent_#{i}_#{SecureRandom.uuid}"
150+
response = TestAgent.with(message: message).ask.generate_now
151+
{ thread_id: i, response: response.message.content }
152+
rescue => e
153+
errors << { thread_id: i, error: e }
154+
nil
155+
end.compact
156+
157+
assert errors.empty?, "No threads should error: #{errors.inspect}"
158+
assert_unique_responses(results, "Responses should be unique under high concurrency")
159+
end
160+
161+
test "exception in one thread does not affect others" do
162+
Thread.abort_on_exception = false
163+
164+
failing_agent_class = Class.new(TestAgent) do
165+
def ask
166+
raise "Intentional error" if params[:message].include?("FAIL")
167+
super
168+
end
169+
end
170+
171+
results = run_concurrent(num_threads: 10) do |i|
172+
message = i == 5 ? "FAIL_#{i}" : "Success_#{i}"
173+
response = failing_agent_class.with(message: message).ask.generate_now
174+
{ thread_id: i, success: true, response: response.message.content }
175+
rescue => e
176+
{ thread_id: i, success: false, error: e.message }
177+
end
178+
179+
successful = results.select { |r| r[:success] }
180+
failed = results.reject { |r| r[:success] }
181+
182+
assert_equal 9, successful.size, "9 threads should succeed"
183+
assert_equal 1, failed.size, "1 thread should fail"
184+
assert_equal 5, failed.first[:thread_id]
185+
assert_unique_responses(successful)
186+
ensure
187+
Thread.abort_on_exception = true
188+
end
189+
end

0 commit comments

Comments
 (0)