Skip to content

Commit 463afb1

Browse files
committed
✨(summary) taskV2 closer to target API gateway contract
Updated the taskV2 API contract to be closer to the target gateway contract. GET operations now return the same payloads as the webhooks. The summary is also stored on S3, consistent with how the transcript is handled.
1 parent 394a0ea commit 463afb1

7 files changed

Lines changed: 191 additions & 48 deletions

File tree

src/helm/env.d/dev-keycloak/values.meet.yaml.gotmpl

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -6,7 +6,7 @@ _summaryEnvVars: &summaryEnvVars
66
AWS_S3_ACCESS_KEY_ID: meet
77
AWS_S3_SECRET_ACCESS_KEY: password
88
AWS_S3_SECURE_ACCESS: False
9-
AUTHORIZED_TENANTS: '[{"id": "dictaphone", "api_key": "dictaphone_token", "webhook_url": "http://dictaphone-backend.dictaphone.svc.cluster.local/api/v1.0/files/ai-webhook/", "webhook_api_key": "token_summary"}]'
9+
AUTHORIZED_TENANTS: '[{"id": "dictaphone", "api_key": "dictaphone_token", "webhook_url": "http://dictaphone-backend.dictaphone.svc.cluster.local/api/v1.0/ai-jobs/webhook/", "webhook_api_key": "token_summary"}]'
1010
WHISPERX_API_KEY:
1111
secretKeyRef:
1212
name: secret-dev
Lines changed: 65 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -1,15 +1,24 @@
11
"""API routes related to application tasks (V2 / tenant friendly)."""
22

33
from celery.result import AsyncResult
4-
from fastapi import APIRouter, Depends, HTTPException
4+
from fastapi import APIRouter, Depends, HTTPException, Request
55

66
from summary.core.celery_worker import (
7+
celery,
78
process_audio_transcribe_v2_task,
89
summarize_v2_task,
910
)
1011
from summary.core.config import AuthorizedTenant
1112
from summary.core.models import SummarizeTaskV2Request, TranscribeTaskV2Request
1213
from summary.core.security import verify_tenant_api_key_v2
14+
from summary.core.shared_models import (
15+
SummarizeWebhookFailurePayload,
16+
SummarizeWebhookPendingPayload,
17+
SummarizeWebhookSuccessPayload,
18+
TranscribeWebhookFailurePayload,
19+
TranscribeWebhookPendingPayload,
20+
TranscribeWebhookSuccessPayload,
21+
)
1322

1423
router_tasks_v2 = APIRouter()
1524

@@ -24,7 +33,7 @@ async def create_transcribe_task_v2(
2433
args=[{**request.model_dump(), "tenant_id": request_tenant.id}]
2534
)
2635

27-
return {"job_id": task.id, "message": "Transcribe job created"}
36+
return TranscribeWebhookPendingPayload(job_id=task.id).model_dump()
2837

2938

3039
@router_tasks_v2.post("/async-jobs/summarize")
@@ -37,25 +46,64 @@ async def create_summarize_task_v2(
3746
args=[{**request.model_dump(), "tenant_id": request_tenant.id}]
3847
)
3948

40-
return {"job_id": task.id, "message": "Summarize job created"}
49+
return SummarizeWebhookPendingPayload(job_id=task.id).model_dump()
4150

4251

4352
@router_tasks_v2.get("/async-jobs/transcribe/{job_id}")
53+
async def get_transcribe_job_status(
54+
job_id: str,
55+
request: Request,
56+
request_tenant: AuthorizedTenant = Depends(verify_tenant_api_key_v2),
57+
):
58+
"""Check transcription task status by ID."""
59+
# We have to look directly in Redis to check if the task exists
60+
redis_client = celery.backend.client
61+
key = f"celery-task-meta-{job_id}"
62+
if not redis_client.exists(key):
63+
raise HTTPException(status_code=404, detail="Not found")
64+
65+
task = AsyncResult(job_id, app=celery)
66+
task_tenant_id = task.args[0]["tenant_id"]
67+
if task_tenant_id != request_tenant.id:
68+
raise HTTPException(status_code=403, detail="Forbidden")
69+
70+
if task.status == "SUCCESS":
71+
result = task.result
72+
return TranscribeWebhookSuccessPayload.model_validate(result).model_dump()
73+
74+
if task.status == "FAILURE":
75+
return TranscribeWebhookFailurePayload(
76+
job_id=job_id, error_code="unknown_error"
77+
).model_dump()
78+
79+
return TranscribeWebhookPendingPayload(job_id=job_id).model_dump()
80+
81+
4482
@router_tasks_v2.get("/async-jobs/summarize/{job_id}")
45-
async def get_task_status(
83+
async def get_summarize_job_status(
4684
job_id: str,
85+
request: Request,
4786
request_tenant: AuthorizedTenant = Depends(verify_tenant_api_key_v2),
4887
):
49-
"""Check task status by ID."""
50-
task = AsyncResult(job_id)
51-
try:
52-
if (
53-
isinstance(task.args, (list, tuple))
54-
and len(task.args) > 0
55-
and isinstance(task.args[0], dict)
56-
and task.args[0].get("tenant_id") == request_tenant.id
57-
):
58-
return {"job_id": job_id, "status": task.status}
59-
except (TypeError, KeyError):
60-
pass
61-
raise HTTPException(status_code=404, detail="Not found")
88+
"""Check summarize task status by ID."""
89+
# We have to look directly in Redis to check if the task exists
90+
redis_client = celery.backend.client
91+
key = f"celery-task-meta-{job_id}"
92+
if not redis_client.exists(key):
93+
raise HTTPException(status_code=404, detail="Not found")
94+
95+
task = AsyncResult(job_id, app=celery)
96+
task_tenant_id = task.args[0]["tenant_id"]
97+
if task_tenant_id != request_tenant.id:
98+
raise HTTPException(status_code=403, detail="Forbidden")
99+
100+
if task.status == "SUCCESS":
101+
result = task.result
102+
return SummarizeWebhookSuccessPayload.model_validate(result).model_dump()
103+
104+
if task.status == "FAILURE":
105+
return SummarizeWebhookFailurePayload(
106+
job_id=job_id, error_code="unknown_error"
107+
).model_dump()
108+
109+
return SummarizeWebhookPendingPayload(job_id=job_id).model_dump()

src/summary/summary/core/celery_worker.py

Lines changed: 18 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -58,6 +58,9 @@
5858
broker=settings.celery_broker_url,
5959
backend=settings.celery_result_backend,
6060
broker_connection_retry_on_startup=True,
61+
# To store the tasks args too in results and make the
62+
# V2 API work
63+
result_extended=True,
6164
)
6265

6366
celery.config_from_object("summary.core.celery_config")
@@ -465,15 +468,14 @@ def process_audio_transcribe_v2_task(
465468
job_id=job_id,
466469
)
467470

471+
success_payload = TranscribeWebhookSuccessPayload(
472+
job_id=job_id,
473+
transcription_data_url=file_service.get_transcript_signed_url(job_id),
474+
)
468475
call_webhook_v2_task.apply_async(
469-
args=[
470-
TranscribeWebhookSuccessPayload(
471-
job_id=job_id,
472-
transcription_data_url=file_service.get_transcript_signed_url(job_id),
473-
).model_dump(),
474-
payload.tenant_id,
475-
]
476+
args=[success_payload.model_dump(), payload.tenant_id]
476477
)
478+
return success_payload.model_dump()
477479

478480

479481
@signals.task_failure.connect(sender=process_audio_transcribe_v2_task)
@@ -531,14 +533,17 @@ def summarize_v2_task(
531533
transcript=payload.content,
532534
session_id=self.request.id,
533535
)
536+
job_id = self.request.id
537+
file_service.store_summary(summary=summary, job_id=job_id)
538+
539+
success_payload = SummarizeWebhookSuccessPayload(
540+
job_id=job_id,
541+
summary_data_url=file_service.get_summary_signed_url(job_id),
542+
)
534543
call_webhook_v2_task.apply_async(
535-
args=[
536-
SummarizeWebhookSuccessPayload(
537-
job_id=self.request.id, summary=summary
538-
).model_dump(),
539-
payload.tenant_id,
540-
]
544+
args=[success_payload.model_dump(), payload.tenant_id]
541545
)
546+
return success_payload.model_dump()
542547

543548

544549
@signals.task_failure.connect(sender=summarize_v2_task)

src/summary/summary/core/config.py

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -78,6 +78,7 @@ class Settings(BaseSettings):
7878
aws_s3_secret_access_key: SecretStr
7979
aws_s3_secure_access: bool = True
8080
aws_transcript_path: str = "transcripts"
81+
aws_summary_path: str = "summaries"
8182

8283
# AI-related settings
8384
whisperx_api_key: SecretStr

src/summary/summary/core/file_service.py

Lines changed: 23 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -293,5 +293,27 @@ def get_transcript_signed_url(self, job_id: str) -> str:
293293
transcript_path = f"{settings.aws_transcript_path}/{job_id}.json"
294294
logger.debug("Transcript path: %s", transcript_path)
295295
return self._minio_client.presigned_get_object(
296-
self._bucket_name, transcript_path, expires=timedelta(hours=1)
296+
self._bucket_name, transcript_path, expires=timedelta(hours=24)
297+
)
298+
299+
def store_summary(self, *, summary: str, job_id: str) -> None:
300+
"""Store summary in MinIO."""
301+
logger.info("Storing summary for job id %s", job_id)
302+
summary_path = f"{settings.aws_summary_path}/{job_id}.txt"
303+
logger.debug("Summary path: %s", summary_path)
304+
data = summary.encode()
305+
self._minio_client.put_object(
306+
self._bucket_name,
307+
summary_path,
308+
io.BytesIO(data),
309+
length=len(data),
310+
)
311+
logger.info("Summary stored successfully for job id %s", job_id)
312+
313+
def get_summary_signed_url(self, job_id: str) -> str:
314+
"""Get signed URL for summary file."""
315+
summary_path = f"{settings.aws_summary_path}/{job_id}.txt"
316+
logger.debug("Summary path: %s", summary_path)
317+
return self._minio_client.presigned_get_object(
318+
self._bucket_name, summary_path, expires=timedelta(hours=24)
297319
)

src/summary/summary/core/shared_models.py

Lines changed: 29 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -65,6 +65,13 @@ class TranscribeWebhookSuccessPayload(BaseWebhook):
6565
)
6666

6767

68+
class TranscribeWebhookPendingPayload(BaseWebhook):
69+
"""Payload for a pending transcription webhook-like response."""
70+
71+
type: Literal["transcript"] = Field(default="transcript")
72+
status: Literal["pending"] = Field(default="pending")
73+
74+
6875
class TranscribeWebhookFailurePayload(BaseWebhook):
6976
"""Payload for a failed transcription webhook."""
7077

@@ -76,7 +83,11 @@ class TranscribeWebhookFailurePayload(BaseWebhook):
7683

7784

7885
TranscribeWebhookPayloads = Annotated[
79-
Union[TranscribeWebhookSuccessPayload, TranscribeWebhookFailurePayload],
86+
Union[
87+
TranscribeWebhookSuccessPayload,
88+
TranscribeWebhookPendingPayload,
89+
TranscribeWebhookFailurePayload,
90+
],
8091
Field(discriminator="status"),
8192
]
8293

@@ -86,7 +97,16 @@ class SummarizeWebhookSuccessPayload(BaseWebhook):
8697

8798
type: Literal["summary"] = Field(default="summary")
8899
status: Literal["success"] = Field(default="success")
89-
summary: str = Field(title="Summary", description="The summary of the text.")
100+
summary_data_url: str = Field(
101+
title="Summary", description="URL to the raw summary data."
102+
)
103+
104+
105+
class SummarizeWebhookPendingPayload(BaseWebhook):
106+
"""Payload for a pending summarization webhook-like response."""
107+
108+
type: Literal["summary"] = Field(default="summary")
109+
status: Literal["pending"] = Field(default="pending")
90110

91111

92112
class SummarizeWebhookFailurePayload(BaseWebhook):
@@ -100,7 +120,11 @@ class SummarizeWebhookFailurePayload(BaseWebhook):
100120

101121

102122
SummarizeWebhookPayloads = Annotated[
103-
Union[SummarizeWebhookSuccessPayload, SummarizeWebhookFailurePayload],
123+
Union[
124+
SummarizeWebhookSuccessPayload,
125+
SummarizeWebhookPendingPayload,
126+
SummarizeWebhookFailurePayload,
127+
],
104128
Field(discriminator="status"),
105129
]
106130

@@ -114,8 +138,10 @@ class SummarizeWebhookFailurePayload(BaseWebhook):
114138

115139
__all__ = [
116140
"TranscribeWebhookSuccessPayload",
141+
"TranscribeWebhookPendingPayload",
117142
"TranscribeWebhookFailurePayload",
118143
"SummarizeWebhookSuccessPayload",
144+
"SummarizeWebhookPendingPayload",
119145
"SummarizeWebhookFailurePayload",
120146
"TranscribeWebhookPayloads",
121147
"SummarizeWebhookPayloads",

0 commit comments

Comments
 (0)