Skip to content

Commit ce28e72

Browse files
committed
Make changes
1 parent 8200c88 commit ce28e72

2 files changed

Lines changed: 62 additions & 7 deletions

File tree

util/opentelemetry-util-genai/src/opentelemetry/util/genai/_upload/completion_hook.py

Lines changed: 27 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,11 @@
1919
import logging
2020
import posixpath
2121
import threading
22+
from collections import OrderedDict
2223
from concurrent.futures import Future, ThreadPoolExecutor
2324
from contextlib import ExitStack
2425
from dataclasses import asdict, dataclass
25-
from functools import lru_cache, partial
26+
from functools import partial
2627
from os import environ
2728
from time import time
2829
from typing import Any, Callable, Final, Literal
@@ -78,6 +79,12 @@ class CompletionRefs:
7879
UploadData = dict[str, Callable[[], JsonEncodeable]]
7980

8081

82+
def is_system_instructions_hashable(
    system_instruction: list[types.MessagePart],
) -> bool:
    """Report whether every part of ``system_instruction`` is a ``types.Text``.

    Args:
        system_instruction: The message parts making up the system
            instruction.

    Returns:
        ``True`` when all parts are ``types.Text`` instances (including the
        case of an empty list, where no part fails the check), otherwise
        ``False``.
    """
    for part in system_instruction:
        # A single non-text part is enough to reject the whole instruction.
        if not isinstance(part, types.Text):
            return False
    return True
86+
87+
8188
class UploadCompletionHook(CompletionHook):
8289
"""An completion hook using ``fsspec`` to upload to external storage
8390
@@ -98,10 +105,13 @@ def __init__(
98105
base_path: str,
99106
max_size: int = 20,
100107
upload_format: Format | None = None,
108+
lru_cache_max_size: int = 1024,
101109
) -> None:
102110
self._max_size = max_size
103111
self._fs, base_path = fsspec.url_to_fs(base_path)
104112
self._base_path = self._fs.unstrip_protocol(base_path)
113+
self.lru_dict = OrderedDict()
114+
self.lru_cache_max_size = lru_cache_max_size
105115

106116
if upload_format not in _FORMATS + (None,):
107117
raise ValueError(
@@ -159,7 +169,7 @@ def _calculate_ref_path(
159169
# TODO: experiment with using the trace_id and span_id, or fetching
160170
# gen_ai.response.id from the active span.
161171
system_instruction_hash = None
162-
if all(isinstance(x, types.Text) for x in system_instruction):
172+
if is_system_instructions_hashable(system_instruction):
163173
# Get a hash of the text.
164174
system_instruction_hash = hashlib.sha256(
165175
"\n".join(x.content for x in system_instruction).encode( # pyright: ignore[reportUnknownMemberType, reportAttributeAccessIssue, reportUnknownArgumentType]
@@ -181,10 +191,18 @@ def _calculate_ref_path(
181191
),
182192
)
183193

184-
@lru_cache(maxsize=512)
185194
def _file_exists(self, path: str) -> bool:
    """Return whether ``path`` exists, consulting the per-instance LRU cache first.

    Only positive results are cached: a path that does not exist is looked
    up on the filesystem again on the next call.

    Args:
        path: The path to check.

    Returns:
        ``True`` if the path is already in ``self.lru_dict`` or the
        filesystem reports that it exists, otherwise ``False``.
    """
    try:
        # EAFP: a single operation both tests membership and refreshes
        # recency. A separate ``in`` check followed by ``move_to_end`` would
        # raise KeyError if the entry were evicted in between the two calls.
        self.lru_dict.move_to_end(path)
    except KeyError:
        pass
    else:
        return True

    # https://filesystem-spec.readthedocs.io/en/latest/api.html#fsspec.spec.AbstractFileSystem.exists
    if not self._fs.exists(path):
        return False

    # Remember the hit, then evict the least recently used entry if the
    # cache has grown past its configured bound.
    self.lru_dict[path] = True
    if len(self.lru_dict) > self.lru_cache_max_size:
        self.lru_dict.popitem(last=False)
    return True
188206

189207
def _do_upload(
190208
self, path: str, json_encodeable: Callable[[], JsonEncodeable]
@@ -214,6 +232,11 @@ def _do_upload(
214232
gen_ai_json_dump(message, file)
215233
file.write("\n")
216234

235+
if "_system_instruction" in path:
236+
self.lru_dict[path] = True
237+
if len(self.lru_dict) > self.lru_cache_max_size:
238+
self.lru_dict.popitem(last=False)
239+
217240
def on_completion(
218241
self,
219242
*,

util/opentelemetry-util-genai/tests/test_upload.py

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,8 +125,7 @@ def setUp(self):
125125
self.mock_fs.exists.return_value = False
126126

127127
self.hook = UploadCompletionHook(
128-
base_path=BASE_PATH,
129-
max_size=MAXSIZE,
128+
base_path=BASE_PATH, max_size=MAXSIZE, lru_cache_max_size=5
130129
)
131130

132131
def tearDown(self) -> None:
@@ -184,7 +183,7 @@ def test_system_insruction_is_hashed_to_avoid_reupload(self):
184183
log_record=record,
185184
)
186185
# Wait a bit for the file upload to finish.
187-
time.sleep(2)
186+
time.sleep(0.5)
188187
self.mock_fs.exists.return_value = True
189188
self.hook.on_completion(
190189
inputs=[],
@@ -206,6 +205,39 @@ def test_system_insruction_is_hashed_to_avoid_reupload(self):
206205
f"{expected_hash}_system_instruction.json",
207206
)
208207

208+
def test_lru_cache_works(self):
    """An uploaded system instruction is cached and later evicted in LRU order."""
    record = LogRecord()
    self.hook.on_completion(
        inputs=[],
        outputs=[],
        system_instruction=FAKE_SYSTEM_INSTRUCTION,
        log_record=record,
    )
    # Wait a bit for the file upload to finish.
    time.sleep(0.5)
    assert record.attributes is not None
    ref = record.attributes["gen_ai.system_instructions_ref"]
    self.assertTrue(self.hook._file_exists(ref))
    # The LRU cache has a size of 5, so the original file should only be
    # removed from the cache AFTER 5 further uploads.
    for index in range(5):
        self.assertIn(ref, self.hook.lru_dict)
        self.hook.on_completion(
            inputs=[],
            outputs=[],
            system_instruction=[types.Text(content=str(index))],
        )
    # NOTE(review): this final check presumably relies on the uploads above
    # having already completed -- confirm it cannot race with them.
    self.assertNotIn(ref, self.hook.lru_dict)
240+
209241
def test_upload_when_inputs_outputs_empty(self):
210242
record = LogRecord()
211243
self.hook.on_completion(

0 commit comments

Comments
 (0)