-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathmain.py
More file actions
110 lines (96 loc) · 4.25 KB
/
main.py
File metadata and controls
110 lines (96 loc) · 4.25 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from src.routers import movies
from src.utils.errorHandler import register_error_handlers, create_error_response
from src.database.mongo_client import db, get_collection
import traceback
import os
from dotenv import load_dotenv
# Load environment variables from .env file
load_dotenv()
app = FastAPI()
# Add CORS middleware
cors_origins = os.getenv("CORS_ORIGINS", "http://localhost:3000,http://localhost:3001").split(",")
app.add_middleware(
CORSMiddleware,
allow_origins=[origin.strip() for origin in cors_origins], # Load from environment variable
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
register_error_handlers(app)
app.include_router(movies.router, prefix="/api/movies", tags=["movies"])
@app.on_event("startup")
async def ensure_search_index():
try:
movies_collection = db.get_collection("movies")
result = await movies_collection.list_search_indexes()
indexes = [idx async for idx in result]
index_names = [index["name"] for index in indexes]
if "movieSearchIndex" in index_names:
return
# Create a mapping if the movieSearchIndex does not exist
index_definition = {
"mappings": {
"dynamic": False,
"fields": {
"plot": {"type": "string", "analyzer": "lucene.standard"},
"fullplot": {"type": "string", "analyzer": "lucene.standard"},
"directors": {"type": "string", "analyzer": "lucene.standard"},
"writers": {"type": "string", "analyzer": "lucene.standard"},
"cast": {"type": "string", "analyzer": "lucene.standard"}
}
}
}
# Creates movieSearchIndex on the movies collection
await db.command({
"createSearchIndexes": "movies",
"indexes": [{
"name": "movieSearchIndex",
"definition": index_definition
}]
})
except Exception as e:
raise RuntimeError(
f"Failed to create search index 'movieSearchIndex': {str(e)}. "
f"Search functionality may not work properly. "
f"Please check your MongoDB Atlas configuration and ensure the cluster supports search indexes."
)
@app.on_event("startup")
async def vector_search_index():
"""
Creates vector search index on application startup if it doesn't already exist.
This ensures the index is ready before any vector search requests are made.
"""
try:
embedded_movies_collection = get_collection("embedded_movies")
# Get list of existing indexes - convert AsyncCommandCursor to list
existing_indexes_cursor = await embedded_movies_collection.list_search_indexes()
existing_indexes = await existing_indexes_cursor.to_list(length=None)
index_names = [index.get("name") for index in existing_indexes]
# Check if our vector_index already exists
if "vector_index" not in index_names:
# Define the vector search index specification
index_definition = {
"name": "vector_index",
"type": "vectorSearch",
"definition": {
"fields": [
{
"type": "vector",
"path": "plot_embedding_voyage_3_large",
"numDimensions": 2048, #Set this to 2048 to match the embedding dimensions on the path
"similarity": "cosine"
}
]
}
}
# Create the index
result = await embedded_movies_collection.create_search_index(index_definition)
except Exception as e:
raise RuntimeError(
f"Failed to create vector search index 'vector_index': {str(e)}. "
f"Vector search functionality will not be available. "
f"Please check your MongoDB Atlas configuration, ensure the cluster supports vector search, "
f"and verify the 'embedded_movies' collection exists with the required embedding field."
)