-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstream.go
More file actions
146 lines (124 loc) · 3.62 KB
/
stream.go
File metadata and controls
146 lines (124 loc) · 3.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
package ai
import (
"context"
"fmt"
"time"
)
// StreamCallback is a function called for each chunk of a streamed response.
// It receives each partial content chunk as it arrives; callers typically
// print or accumulate the chunk. NOTE(review): callbacks appear to run
// synchronously on the request path (see StreamResponse), so a slow callback
// may stall the stream — confirm against the provider implementations.
type StreamCallback func(chunk string)
// Stream sends a request and prints the response chunks to stdout in real-time.
// It is a convenience method for simple streaming to the console.
func (b *Builder) Stream(prompt string) (string, error) {
	printChunk := func(chunk string) {
		fmt.Print(chunk)
	}
	return b.User(prompt).StreamResponse(printChunk)
}
// StreamResponse sends a request and calls the provided callback for each chunk of the response.
// It handles rate limiting, error checking, and optional debug output.
// Returns the full concatenated response string upon completion.
func (b *Builder) StreamResponse(callback StreamCallback) (string, error) {
	msgs := b.buildMessages()
	start := time.Now()

	// Resolve the client: an explicit per-builder client wins, otherwise
	// fall back to the package default.
	client := b.client
	if client == nil {
		client = getDefaultClient()
	}

	// Build the provider request with streaming enabled.
	req := &ProviderRequest{
		Model:       string(b.model),
		Messages:    msgs,
		Temperature: b.temperature,
		Thinking:    b.thinking,
		Tools:       b.tools,
		JSONMode:    b.jsonMode,
		Stream:      true,
	}

	// Use the builder's context when set, otherwise a background context.
	ctx := b.ctx
	if ctx == nil {
		ctx = context.Background()
	}

	if Debug {
		printDebugRequest(b.model, msgs)
	}

	// Providers without streaming support get a regular request; the full
	// response is delivered to the callback as a single chunk.
	if !client.provider.Capabilities().Streaming {
		if Debug {
			fmt.Printf("%s Warning: %s does not support streaming, falling back to regular request\n",
				colorYellow("⚠"), client.provider.Name())
		}
		waitForRateLimit()
		resp, err := client.provider.Send(ctx, req)
		if err != nil {
			return "", err
		}
		callback(resp.Content)
		// Bug fix: record stats on the fallback path too — previously only
		// the streaming path below called trackRequest, so usage for
		// non-streaming providers was silently dropped.
		trackRequest(&ResponseMeta{
			Content:          resp.Content,
			Model:            b.model,
			Latency:          time.Since(start),
			Tokens:           resp.TotalTokens,
			PromptTokens:     resp.PromptTokens,
			CompletionTokens: resp.CompletionTokens,
		})
		return resp.Content, nil
	}

	if Pretty {
		fmt.Printf("\n%s %s\n", colorCyan("▸"), colorDim(string(b.model)))
		fmt.Println(colorDim("─────────────────────────────────────────────────────────────"))
	}

	waitForRateLimit()
	resp, err := client.provider.SendStream(ctx, req, callback)
	if err != nil {
		return "", err
	}

	if Pretty {
		fmt.Println()
		fmt.Println()
	}

	// Track stats
	trackRequest(&ResponseMeta{
		Content:          resp.Content,
		Model:            b.model,
		Latency:          time.Since(start),
		Tokens:           resp.TotalTokens,
		PromptTokens:     resp.PromptTokens,
		CompletionTokens: resp.CompletionTokens,
	})

	return resp.Content, nil
}
// StreamWithMeta sends a request, streams the response via callback, and returns full metadata.
// This is useful when you need token usage stats or latency information along with the streamed content.
func (b *Builder) StreamWithMeta(callback StreamCallback) (*ResponseMeta, error) {
	msgs := b.buildMessages()
	start := time.Now()

	// Resolve the client: explicit per-builder client, else the default.
	client := b.client
	if client == nil {
		client = getDefaultClient()
	}

	req := &ProviderRequest{
		Model:       string(b.model),
		Messages:    msgs,
		Temperature: b.temperature,
		Thinking:    b.thinking,
		Tools:       b.tools,
		JSONMode:    b.jsonMode,
		Stream:      true,
	}

	ctx := b.ctx
	if ctx == nil {
		ctx = context.Background()
	}

	if Debug {
		printDebugRequest(b.model, msgs)
	}

	// Consistency fix: like StreamResponse, fall back to a regular request
	// when the provider cannot stream, instead of calling SendStream on a
	// provider that does not support it. The full response is delivered to
	// the callback as a single chunk.
	if !client.provider.Capabilities().Streaming {
		if Debug {
			fmt.Printf("%s Warning: %s does not support streaming, falling back to regular request\n",
				colorYellow("⚠"), client.provider.Name())
		}
		waitForRateLimit()
		resp, err := client.provider.Send(ctx, req)
		if err != nil {
			// On error the meta still carries the error, model, and elapsed
			// time so callers can log partial diagnostics.
			return &ResponseMeta{Error: err, Model: b.model, Latency: time.Since(start)}, err
		}
		callback(resp.Content)
		meta := &ResponseMeta{
			Content:          resp.Content,
			Model:            b.model,
			Latency:          time.Since(start),
			Tokens:           resp.TotalTokens,
			PromptTokens:     resp.PromptTokens,
			CompletionTokens: resp.CompletionTokens,
		}
		trackRequest(meta)
		return meta, nil
	}

	waitForRateLimit()
	resp, err := client.provider.SendStream(ctx, req, callback)
	if err != nil {
		return &ResponseMeta{Error: err, Model: b.model, Latency: time.Since(start)}, err
	}

	meta := &ResponseMeta{
		Content:          resp.Content,
		Model:            b.model,
		Latency:          time.Since(start),
		Tokens:           resp.TotalTokens,
		PromptTokens:     resp.PromptTokens,
		CompletionTokens: resp.CompletionTokens,
	}
	trackRequest(meta)
	return meta, nil
}