-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathmovies.py
More file actions
1504 lines (1300 loc) · 54 KB
/
movies.py
File metadata and controls
1504 lines (1300 loc) · 54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
from fastapi import APIRouter, Query, Path, Body, HTTPException
from fastapi.responses import JSONResponse
from src.database.mongo_client import get_collection, voyage_ai_available
from src.models.models import VectorSearchResult, CreateMovieRequest, Movie, SuccessResponse, UpdateMovieRequest, SearchMoviesResponse
from typing import Any, List, Optional
from src.utils.successResponse import create_success_response
from src.utils.errorResponse import create_error_response
from src.utils.response_docs import (
VECTOR_SEARCH_RESPONSES,
OBJECTID_VALIDATION_RESPONSES,
SEARCH_ENDPOINT_RESPONSES,
FASTAPI_400_MISSING_SEARCH_PARAMS,
DATABASE_OPERATION_RESPONSES,
CRUD_OPERATION_RESPONSES,
CRUD_WITH_OBJECTID_RESPONSES
)
from src.utils.exceptions import VoyageAuthError, VoyageAPIError
from bson import ObjectId, errors
import re
from bson.errors import InvalidId
import voyageai
import voyageai.error as voyage_error
import os
'''
This file contains all the business logic for movie operations.
Each method demonstrates different MongoDB operations using the PyMongo driver.
The /search and /vector-search endpoints are at the top of the file because they must be
before the /{id} endpoint to avoid route conflicts where the /search and /vector-search
endpoints match the /{id} pattern rather than the intended paths.
Implemented Endpoints:
- GET /api/movies/search :
Search movies using MongoDB Search across the plot, fullplot, directors, writers, and cast fields.
Supports compound search operators and fuzzy matching.
- GET /api/movies/vector-search :
Search movies using MongoDB Vector Search to enable semantic search capabilities over
the plot field.
- GET /api/movies/genres :
Retrieve all distinct genre values from the movies collection.
Demonstrates the distinct() operation.
- GET /api/movies/{id} :
Retrieve a single movie by its ID.
- GET /api/movies/ :
Retrieve a list of movies with optional filtering, sorting, and pagination.
Supports text search, genre, year, rating filters, and customizable sorting.
- POST /api/movies/ :
Create a new movie.
- POST /api/movies/batch :
Create multiple movies in a single request.
- PATCH /api/movies/{movie_id} :
Update a single movie by its ID.
- PATCH /api/movies/ :
Batch update movies matching the given filter.
- DELETE /api/movies/{id} :
Delete a single movie by its ID.
- DELETE /api/movies/ :
Delete multiple movies matching the given filter.
- DELETE /api/movies/{id}/find-and-delete :
Find and delete a movie in a single atomic operation.
- GET /api/movies/aggregations/reportingByComments :
Aggregate movies with their most recent comments using MongoDB $lookup aggregation.
- GET /api/movies/aggregations/reportingByYear :
Aggregate movies by year with average rating and movie count.
- GET /api/movies/aggregations/reportingByDirectors :
Aggregate directors with the most movies and their statistics.
Helper Functions:
- execute_aggregation(pipeline): Executes a MongoDB aggregation pipeline and returns the
results.
- execute_aggregation_on_collection(collection, pipeline): Executes a MongoDB aggregation pipeline on a specific collection and returns the results.
- get_embedding(data, input_type): Creates the vector embedding for a given input using the specified input type.
'''
router = APIRouter()
#----------------------------------------------------------------------------------------------------------
# MongoDB Search
#
# MongoDB Search based on searching the plot, fullplot, directors, writers, and cast fields.
# This fuzzy operator is being used to allow for some misspellings in the search terms
# but that allows for very generous matching. This can be adjusted as needed.
#----------------------------------------------------------------------------------------------------------
"""
GET /api/movies/search
Search movies using MongoDB Search across the plot, fullplot, directors, writers, and cast fields.
You can combine multiple fields in a single query, and control how they are combined using the `search_operator` parameter.
Query Parameters:
plot (str, optional): Text to search against the plot field.
fullplot (str, optional): Text to search against the fullplot field.
directors (str, optional): Text to search against the directors field.
writers (str, optional): Text to search against the writers field.
cast (str, optional): Text to search against the cast field.
limit (int, optional): Number of results to return (default: 20)
skip (int, optional): Number of results to skip for pagination (default: 0)
search_operator (str, optional): How to combine multiple search fields.
Must be one of "must", "should", "mustNot", or "filter". Default is "must".
Returns:
SuccessResponse[SearchMoviesResponse]: A response object containing the list of matching movies and total count.
"""
@router.get(
"/search",
response_model=SuccessResponse[SearchMoviesResponse],
status_code = 200,
summary="Search movies using MongoDB Search.",
responses={
**SEARCH_ENDPOINT_RESPONSES,
400: FASTAPI_400_MISSING_SEARCH_PARAMS
}
)
async def search_movies(
plot: Optional[str] = None,
fullplot: Optional[str] = None,
directors: Optional[str] = None,
writers: Optional[str] = None,
cast: Optional[str] = None,
limit:int = Query(default=20, ge=1, le=100),
skip:int = Query(default=0, ge=0),
search_operator: str = Query(default="must", alias="searchOperator")
) -> SuccessResponse[SearchMoviesResponse]:
search_phrases = []
# Validate the search_operator parameter to ensure it's a valid compound operator
valid_operators = {"must", "should", "mustNot", "filter"}
if search_operator not in valid_operators:
raise HTTPException(
status_code = 400,
detail=f"Invalid search operator '{search_operator}'. The search operator must be one of {valid_operators}."
)
# Build the search_phrases list based on which fields were provided by the user.
# Each phrase becomes a separate clause in the MongoDB Search compound query.
if plot is not None:
search_phrases.append({
# The phrase operator performs an exact phrase match on the specified field. This is useful for searching for specific phrases within text fields.
# The text operator is more flexible and allows for fuzzy matching, making it suitable for fields like names where typos may occur.
"phrase": {
"query": plot,
"path": "plot",
}
})
if fullplot is not None:
search_phrases.append({
"phrase": {
"query": fullplot,
"path": "fullplot",
}
})
if directors is not None:
# Use compound operator with "should" clauses to create a scoring hierarchy:
# 1. phrase match (highest score) - exact phrase in same array element
# 2. text match without fuzzy (high score) - all terms present, exact spelling
# 3. text match with fuzzy (lower score) - typo-tolerant fallback; update fuzzy settings as needed
# For more details, see: https://www.mongodb.com/docs/atlas/atlas-search/operators-collectors/text/
search_phrases.append({
"compound": {
"should": [
# Highest score: exact phrase match
{"phrase": {"query": directors, "path": "directors"}},
# High score: exact text match (all terms, no fuzzy)
{"text": {"query": directors, "path": "directors", "matchCriteria": "all"}},
# Lower score: fuzzy match (typo tolerance)
{"text": {"query": directors, "path": "directors", "matchCriteria": "all",
"fuzzy": {"maxEdits": 1, "prefixLength": 2}}} # Allow up to 1 edit, require first 2 characters to match
],
"minimumShouldMatch": 1
}
})
if writers is not None:
# See comments above regarding compound scoring hierarchy.
search_phrases.append({
"compound": {
"should": [
{"phrase": {"query": writers, "path": "writers"}},
{"text": {"query": writers, "path": "writers", "matchCriteria": "all"}},
{"text": {"query": writers, "path": "writers", "matchCriteria": "all",
"fuzzy": {"maxEdits": 1, "prefixLength": 2}}}
],
"minimumShouldMatch": 1
}
})
if cast is not None:
# See comments above regarding compound scoring hierarchy.
search_phrases.append({
"compound": {
"should": [
{"phrase": {"query": cast, "path": "cast"}},
{"text": {"query": cast, "path": "cast", "matchCriteria": "all"}},
{"text": {"query": cast, "path": "cast", "matchCriteria": "all",
"fuzzy": {"maxEdits": 1, "prefixLength": 2}}}
],
"minimumShouldMatch": 1
}
})
if not search_phrases:
raise HTTPException(
status_code = 400,
detail="At least one search parameter must be provided."
)
# Build the aggregation pipeline for MongoDB Search.
# The $search stage uses the specified compound operator (must, should, etc.)
aggregation_pipeline = [
{
"$search": {
"index": "movieSearchIndex",
"compound": {
search_operator: search_phrases
}
}
},
{
"$facet": {
"totalCount": [
{"$count": "count"}
],
"results": [
{"$skip": skip},
{"$limit": limit},
# Project only the fields needed in the response
{
"$project": {
"_id": 1,
"title": 1,
"year": 1,
"plot": 1,
"fullplot": 1,
"released":1,
"runtime": 1,
"poster": 1,
"genres": 1,
"directors": 1,
"writers": 1,
"cast": 1,
"countries": 1,
"languages": 1,
"rated": 1,
"awards": 1,
"imdb": 1,
}
}
]
}
}
]
# Execute the aggregation pipeline using the helper function
try:
results = await execute_aggregation(aggregation_pipeline)
except Exception as e:
raise HTTPException(
status_code = 500,
detail=f"An error occurred while performing the search: {str(e)}"
)
# Extract total count and movies from facet results with proper bounds checking
if not results or len(results) == 0:
return create_success_response(
SearchMoviesResponse(movies=[], totalCount=0),
"No movies found matching the search criteria."
)
facet_result = results[0]
# Safely extract total count
total_count_array = facet_result.get("totalCount", [])
total_count = total_count_array[0].get("count", 0) if total_count_array else 0
# Safely extract movies data
movies_data = facet_result.get("results", [])
# Convert ObjectId to string for each movie in the results
movies = []
for movie in movies_data:
movie["_id"] = str(movie["_id"])
movies.append(movie)
return create_success_response(
SearchMoviesResponse(movies=movies, totalCount=total_count),
f"Found {total_count} movies matching the search criteria."
)
#----------------------------------------------------------------------------------------------------------
# MongoDB Vector Search
#
# MongoDB Vector Search based on searching the plot_embedding_voyage_3_large field.
#----------------------------------------------------------------------------------------------------------
"""
GET /api/movies/vector-search
Search movies using MongoDB Vector Search to find movies with similar plots.
Uses embeddings generated by the Voyage AI model to perform semantic similarity search.
Query Parameters:
q (str, required): Search query text to find movies with similar plots.
limit (int, optional): Number of results to return (default: 10, max: 50).
Returns:
SuccessResponse[List[VectorSearchResult]]: A response object containing movies with similarity scores.
Each result includes:
- _id: Movie ObjectId
- title: Movie title
- plot: Movie plot text
- score: Vector search similarity score (0.0 to 1.0, higher = more similar)
"""
# Specify your Voyage AI embedding model
model = "voyage-3-large"
outputDimension = 2048 #Set to 2048 to match the dimensions of the collection's embeddings
# Vector Search Endpoint
@router.get(
"/vector-search",
response_model=SuccessResponse[List[VectorSearchResult]],
responses=VECTOR_SEARCH_RESPONSES
)
async def vector_search_movies(
q: str = Query(..., description="Search query to find similar movies by plot"),
limit: int = Query(default=10, ge=1, le=50, description="Number of results to return")
):
"""
Vector search endpoint for finding movies with similar plots.
Args:
q: The search query string
limit: Maximum number of results to return
Returns:
SuccessResponse containing a list of movies with similarity scores
"""
# Check if Voyage AI API key is configured
if not voyage_ai_available():
return JSONResponse(
status_code=400,
content=create_error_response(
message="Vector search unavailable: VOYAGE_API_KEY not configured. Please add your API key to the .env file",
code="SERVICE_UNAVAILABLE"
)
)
try:
# The vector search index was already created at startup time
# Generate embedding for the search query (client is created inside get_embedding)
query_embedding = get_embedding(q, input_type="query")
# Get the embedded movies collection
embedded_movies_collection = get_collection("embedded_movies")
# Define vector search pipeline
pipeline = [
{
"$vectorSearch": {
"index": "vector_index",
"path": "plot_embedding_voyage_3_large",
"queryVector": query_embedding, #2048
"numCandidates": limit * 20, # We recommend searching 20 times higher than the limit to improve result relevance
"limit": limit
}
},
{
"$project": {
"_id": 1,
"title": 1,
"plot": 1,
"poster": 1,
"year": {
"$cond": {
"if": {
"$and": [
{"$ne": ["$year", None]},
{"$eq": [{"$type": "$year"}, "int"]}
]
},
"then": "$year",
"else": None
}
},
"genres": 1,
"directors": 1,
"cast": 1,
"score": {
"$meta": "vectorSearchScore"
}
}
}
]
raw_results = await execute_aggregation_on_collection(embedded_movies_collection, pipeline)
# Convert ObjectId to string and create VectorSearchResult objects
for result in raw_results:
if "_id" in result and result["_id"]:
try:
result["_id"] = str(result["_id"])
except (InvalidId, TypeError):
# Handle invalid ObjectId conversion
result["_id"] = str(result["_id"]) if result["_id"] else None
# This code converts the raw results into VectorSearchResult objects
results = [VectorSearchResult(**doc) for doc in raw_results]
return create_success_response(
results,
f"Found {len(results)} similar movies for query: '{q}'"
)
except VoyageAuthError:
# Re-raise custom exceptions to be handled by the exception handlers
raise
except VoyageAPIError:
# Re-raise custom exceptions to be handled by the exception handlers
raise
except Exception as e:
# Log the error for debugging
print(f"Vector search error: {str(e)}")
# Handle generic errors
raise HTTPException(
status_code=500,
detail=f"Error performing vector search: {str(e)}"
)
"""
GET /api/movies/genres
Retrieve all distinct genre values from the movies collection.
Demonstrates the distinct() operation.
Returns:
SuccessResponse[List[str]]: A response object containing the list of unique genres, sorted alphabetically.
"""
@router.get(
"/genres",
response_model=SuccessResponse[List[str]],
status_code=200,
summary="Retrieve all distinct genres from the movies collection.",
responses=DATABASE_OPERATION_RESPONSES
)
async def get_distinct_genres():
movies_collection = get_collection("movies")
try:
# Use distinct() to get all unique values from the genres array field
# MongoDB automatically flattens array fields when using distinct()
genres = await movies_collection.distinct("genres")
except Exception as e:
raise HTTPException(
status_code=500,
detail=f"Database error occurred: {str(e)}"
)
# Filter out null/empty values and sort alphabetically
valid_genres = sorted([
genre for genre in genres
if isinstance(genre, str) and len(genre) > 0
])
return create_success_response(valid_genres, f"Found {len(valid_genres)} distinct genres")
"""
GET /api/movies/{id}
Retrieve a single movie by its ID.
Path Parameters:
id (str): The ObjectId of the movie to retrieve.
Returns:
SuccessResponse[Movie]: A response object containing the movie data.
"""
@router.get(
"/{id}",
response_model=SuccessResponse[Movie],
status_code = 200,
summary="Retrieve a single movie by its ID.",
responses=OBJECTID_VALIDATION_RESPONSES
)
async def get_movie_by_id(id: str):
# Validate ObjectId format
try:
object_id = ObjectId(id)
except errors.InvalidId:
raise HTTPException(
status_code = 400,
detail=f"The provided ID '{id}' is not a valid ObjectId"
)
movies_collection = get_collection("movies")
try:
movie = await movies_collection.find_one({"_id": object_id})
except Exception as e:
raise HTTPException(
status_code = 500,
detail=f"Database error occurred: {str(e)}"
)
if movie is None:
raise HTTPException(
status_code = 404,
detail=f"No movie found with ID: {id}"
)
movie["_id"] = str(movie["_id"]) # Convert ObjectId to string
return create_success_response(movie, "Movie retrieved successfully")
"""
GET /api/movies/
Retrieve a list of movies with optional filtering, sorting, and pagination.
Query Parameters:
q (str, optional): Text search query (searches title, plot, fullplot).
genre (str, optional): Filter by genre.
year (int, optional): Filter by year.
min_rating (float, optional): Minimum IMDB rating.
max_rating (float, optional): Maximum IMDB rating.
limitNum (int, optional): Number of results to return (default: 20, max: 100).
skipNum (int, optional): Number of documents to skip for pagination (default: 0).
sortBy (str, optional): Field to sort by (default: "title").
sort_order (str, optional): Sort direction, "asc" or "desc" (default: "asc").
Returns:
SuccessResponse[List[Movie]]: A response object containing the list of movies and metadata.
"""
@router.get(
"/",
response_model=SuccessResponse[List[Movie]],
status_code = 200,
summary="Retrieve a list of movies with optional filtering, sorting, and pagination.",
responses=DATABASE_OPERATION_RESPONSES
)
# Validate the query parameters using FastAPI's Query functionality.
async def get_all_movies(
q:str = Query(default=None),
title: str = Query(default=None),
genre:str = Query(default=None),
year:int = Query(default=None),
min_rating:float = Query(default=None, alias="minRating"),
max_rating:float = Query(default=None, alias="maxRating"),
limit:int = Query(default=20, ge=1, le=100),
skip:int = Query(default=0, ge=0),
sort_by:str = Query(default="title", alias="sortBy"),
sort_order:str = Query(default="asc", alias="sortOrder")
):
movies_collection = get_collection("movies")
filter_dict = {}
if q:
filter_dict["$text"] = {"$search": q}
if title:
filter_dict["title"] = {"$regex": title, "$options": "i"}
if genre:
filter_dict["genres"] = {"$regex": genre, "$options": "i"}
if isinstance(year, int):
filter_dict["year"] = year
if min_rating is not None or max_rating is not None:
rating_filter = {}
if min_rating is not None:
rating_filter["$gte"] = min_rating
if max_rating is not None:
rating_filter["$lte"] = max_rating
filter_dict["imdb.rating"] = rating_filter
# Building the sort object based on user input
sort_order = -1 if sort_order == "desc" else 1
sort = [(sort_by, sort_order)]
# Query the database with the constructed filter, sort, skip, and limit.
try:
result = movies_collection.find(filter_dict).sort(sort).skip(skip).limit(limit)
except Exception as e:
raise HTTPException(
status_code = 500,
detail=f"An error occurred while fetching movies. {str(e)}"
)
movies = []
async for movie in result:
if "title" in movie:
movie["_id"] = str(movie["_id"]) # Convert ObjectId to string
# Ensure that the year field contains int value.
if "year" in movie and not isinstance(movie["year"], int):
cleaned_year = re.sub(r"\D", "", str(movie["year"]))
try:
movie["year"] = int(cleaned_year) if cleaned_year else None
except ValueError:
movie["year"] = None
movies.append(movie)
# Return the results wrapped in a SuccessResponse
message = f"Found {len(movies)} movies."
return create_success_response(movies, message)
"""
POST /api/movies/
Create a new movie.
Request Body:
movie (CreateMovieRequest): A movie object containing the movie data.
See CreateMovieRequest model for available fields.
Returns:
SuccessResponse[Movie]: A response object containing the created movie data.
"""
@router.post(
"/",
response_model=SuccessResponse[Movie],
status_code = 201,
summary="Creates a new movie in the database.",
responses=CRUD_OPERATION_RESPONSES
)
async def create_movie(movie: CreateMovieRequest):
# Pydantic automatically validates the structure
movie_data = movie.model_dump(by_alias=True, exclude_none=True)
movies_collection = get_collection("movies")
try:
result = await movies_collection.insert_one(movie_data)
except Exception as e:
raise HTTPException(
status_code = 500,
detail=f"Database error occurred: {str(e)}"
)
# Verify that the document was created before querying it
if not result.acknowledged:
raise HTTPException(
status_code = 500,
detail="Failed to create movie: The database did not acknowledge the insert operation"
)
try:
# Retrieve the created document to return complete data
created_movie = await movies_collection.find_one({"_id": result.inserted_id})
except Exception as e:
raise HTTPException(
status_code = 500,
detail=f"Database error occurred: {str(e)}"
)
if created_movie is None:
raise HTTPException(
status_code = 500,
detail="Movie was created but could not be retrieved for verification"
)
created_movie["_id"] = str(created_movie["_id"]) # Convert ObjectId to string
return create_success_response(created_movie, f"Movie '{movie_data['title']}' created successfully")
"""
POST /api/movies/batch
Create multiple movies in a single request.
Request Body:
movies (List[CreateMovieRequest]): A list of movie objects to insert. Each object should include:
- title (str): The movie title.
- year (int, optional): The release year.
- plot (str, optional): Short plot summary.
- fullplot (str, optional): Full plot summary.
- genres (List[str], optional): List of genres.
- directors (List[str], optional): List of directors.
- writers (List[str], optional): List of writers.
- cast (List[str], optional): List of cast members.
- countries (List[str], optional): List of countries.
- languages (List[str], optional): List of languages.
- rated (str, optional): Movie rating.
- runtime (int, optional): Runtime in minutes.
- poster (str, optional): Poster URL.
Returns:
SuccessResponse: A response object containing the number of inserted movies and their IDs.
"""
@router.post(
"/batch",
response_model=SuccessResponse[dict],
status_code = 201,
summary = "Create multiple movies in a single request.",
responses=CRUD_OPERATION_RESPONSES
)
async def create_movies_batch(movies: List[CreateMovieRequest]) ->SuccessResponse[dict]:
movies_collection = get_collection("movies")
#Verify that the movies list is not empty
if not movies:
raise HTTPException(
status_code = 400,
detail="Request body must be a non-empty list of movies."
)
movies_dicts = []
for movie in movies:
movie_dict = movie.model_dump(exclude_unset=True, exclude_none=True)
# Remove _id if it exists to let MongoDB generate it automatically
movie_dict.pop('_id', None)
movies_dicts.append(movie_dict)
try:
result = await movies_collection.insert_many(movies_dicts)
return create_success_response({
"insertedCount": len(result.inserted_ids),
"insertedIds": [str(_id) for _id in result.inserted_ids]
},
f"Successfully created {len(result.inserted_ids)} movies."
)
except Exception as e:
raise HTTPException(
status_code = 500,
detail=f"Database error occurred: {str(e)}"
)
"""
PATCH /api/movies/{id}
Update a single movie by its ID.
Path Parameters:
id (str): The ObjectId of the movie to update
Request Body:
move_data (UpdateMovieRequest): Fields and values to update. Only provided fields will be updated.
Returns:
SuccessResponse: The updated movie document, the number of fields modified and a success message.
"""
@router.patch(
"/{id}",
response_model=SuccessResponse[Movie],
status_code = 200,
summary="Update a single movie by its ID.",
responses=CRUD_WITH_OBJECTID_RESPONSES
)
async def update_movie(
movie_data: UpdateMovieRequest,
movie_id: str = Path(..., alias="id")
) -> SuccessResponse[Movie]:
movies_collection = get_collection("movies")
# Validate the ObjectId
try:
movie_id = ObjectId(movie_id)
except Exception :
raise HTTPException(
status_code = 400,
detail=f"Invalid movie_id format: {movie_id}"
)
update_dict = movie_data.model_dump(exclude_unset=True, exclude_none=True)
# Validate that the dict is not empty
if not update_dict:
raise HTTPException(
status_code = 400,
detail="No valid fields provided for update."
)
try:
result = await movies_collection.update_one(
{"_id": movie_id},
{"$set":update_dict}
)
except Exception as e:
raise HTTPException(
status_code = 500,
detail=f"An error occurred while updating the movie: {str(e)}"
)
if result.matched_count == 0:
raise HTTPException(
status_code = 404,
detail=f"No movie with that _id was found: {movie_id}"
)
updatedMovie = await movies_collection.find_one({"_id": movie_id})
updatedMovie["_id"] = str(updatedMovie["_id"])
return create_success_response(updatedMovie, f"Movie updated successfully. Modified {len(update_dict)} fields.")
"""
PATCH /api/movies
Batch update movies matching the given filter
Request Body:
filter (MoviesUpdateFilter): Criteria to select which movies to update. Only movies matching this filter will be updated.
update (UpdateMovieRequest): Fields and values to update for the matched movies. Only provided fields will be updated.
Returns:
SuccessResponse: A response object containing the number of matched and modified movies and a success message.
"""
@router.patch(
"/",
response_model=SuccessResponse[dict],
status_code = 200,
summary="Batch update movies matching the given filter.",
responses=CRUD_OPERATION_RESPONSES
)
async def update_movies_batch(
request_body: dict = Body(...)
) -> SuccessResponse[dict]:
movies_collection = get_collection("movies")
# Extract filter and update from the request body
filter_data = request_body.get("filter", {})
update_data = request_body.get("update", {})
if not filter_data or not update_data:
raise HTTPException(
status_code = 400,
detail="Both filter and update objects are required"
)
# Convert string IDs to ObjectIds if _id filter is present
if "_id" in filter_data and isinstance(filter_data["_id"], dict):
if "$in" in filter_data["_id"]:
# Convert list of string IDs to ObjectIds
try:
filter_data["_id"]["$in"] = [ObjectId(id_str) for id_str in filter_data["_id"]["$in"]]
except Exception:
raise HTTPException(
status_code = 400,
detail="Invalid ObjectId format in filter",
)
try:
result = await movies_collection.update_many(filter_data, {"$set": update_data})
except Exception as e:
raise HTTPException(
status_code = 500,
detail=f"An error occurred while updating movies: {str(e)}"
)
return create_success_response({
"matchedCount": result.matched_count,
"modifiedCount": result.modified_count
},
f"Update operation completed. Matched {result.matched_count} movie(s), modified {result.modified_count} movie(s)."
)
"""
DELETE /api/movies/{id}
Delete a single movie by its ID.
Path Parameters:
id (str): The ObjectId of the movie to delete.
Returns:
SuccessResponse[dict]: A response object containing deletion details.
"""
@router.delete(
"/{id}",
response_model=SuccessResponse[dict],
status_code = 200,
summary="Delete a single movie by its ID.",
responses=OBJECTID_VALIDATION_RESPONSES
)
async def delete_movie_by_id(id: str):
try:
object_id = ObjectId(id)
except errors.InvalidId:
raise HTTPException(
status_code = 400,
detail=f"Invalid movie ID format: The provided ID '{id}' is not a valid ObjectId"
)
movies_collection = get_collection("movies")
try:
# Use deleteOne() to remove a single document
result = await movies_collection.delete_one({"_id": object_id})
except Exception as e:
raise HTTPException(
status_code = 500,
detail=f"Database error occurred: {str(e)}"
)
if result.deleted_count == 0:
raise HTTPException(
status_code = 404,
detail=f"No movie found with ID: {id}"
)
return create_success_response(
{"deletedCount": result.deleted_count},
"Movie deleted successfully"
)
"""
DELETE /api/movies/
Delete multiple movies matching the given filter.
Request Body:
movie_filter (MovieFilter): Criteria to select which movies to delete. Only movies matching this filter will be removed.
Returns:
SuccessResponse: An object containing the number of deleted movies and a success message.
"""
@router.delete(
"/",
response_model=SuccessResponse[dict],
status_code = 200,
summary="Delete multiple movies matching the given filter.",
responses=CRUD_OPERATION_RESPONSES
)
async def delete_movies_batch(request_body: dict = Body(...)) -> SuccessResponse[dict]:
movies_collection = get_collection("movies")
# Extract filter from the request body
filter_data = request_body.get("filter", {})
if not filter_data:
raise HTTPException(
status_code = 400,
detail="Filter object is required and cannot be empty."
)
# Convert string IDs to ObjectIds if _id filter is present
if "_id" in filter_data and isinstance(filter_data["_id"], dict):
if "$in" in filter_data["_id"]:
# Convert list of string IDs to ObjectIds
try:
filter_data["_id"]["$in"] = [ObjectId(id_str) for id_str in filter_data["_id"]["$in"]]
except Exception:
raise HTTPException(
status_code = 400,
detail="Invalid ObjectId format in filter."
)
try:
result = await movies_collection.delete_many(filter_data)
except Exception as e:
raise HTTPException(
status_code = 500,
detail=f"An error occurred while deleting movies: {str(e)}"
)
return create_success_response(
{"deletedCount":result.deleted_count},
f'Delete operation completed. Removed {result.deleted_count} movies.'
)
"""
DELETE /api/movies/{id}/find-and-delete
Finds and deletes a movie in a single atomic operation.
Demonstrates the findOneAndDelete() operation.
Path Parameters:
id (str): The ObjectId of the movie to find and delete.
Returns:
SuccessResponse[Movie]: A response object containing the deleted movie data.
"""
@router.delete(
"/{id}/find-and-delete",
response_model=SuccessResponse[Movie],
status_code = 200,
summary="Find and delete a movie in a single operation.",
responses=OBJECTID_VALIDATION_RESPONSES
)
async def find_and_delete_movie(id: str):
try:
object_id = ObjectId(id)
except errors.InvalidId: