forked from mongodb/docs-sample-apps
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy patherrorHandler.py
More file actions
109 lines (91 loc) · 3.37 KB
/
errorHandler.py
File metadata and controls
109 lines (91 loc) · 3.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from fastapi import Request
from fastapi.responses import JSONResponse
from pymongo.errors import PyMongoError, DuplicateKeyError, WriteError
from datetime import datetime, timezone
from typing import Any, Optional
from src.models.models import ErrorDetails, ErrorResponse, SuccessResponse, T
def create_success_response(data: T, message: Optional[str] = None) -> SuccessResponse[T]:
    """Create a standardized success response.

    Args:
        data: The data to include in the response.
        message: An optional message to include. When omitted (or empty),
            "Operation completed successfully." is used instead.

    Returns:
        A standardized success response object wrapping ``data`` and stamped
        with the current UTC time.
    """
    # isoformat() on a timezone-aware UTC datetime already ends in "+00:00".
    # Appending "Z" on top of that produced "...+00:00Z", which is not a valid
    # ISO 8601 timestamp, so replace the numeric offset with the "Z" designator.
    timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    return SuccessResponse(
        message=message or "Operation completed successfully.",
        data=data,
        timestamp=timestamp,
    )
def create_error_response(
    message: str,
    code: Optional[str] = None,
    details: Optional[Any] = None,
) -> ErrorResponse:
    """Create a standardized error response.

    Args:
        message: The error message. It is used both as the top-level message
            and as the message of the nested error details.
        code: An optional error code.
        details: Additional error details.

    Returns:
        A standardized error response object stamped with the current UTC time.
    """
    # isoformat() on a timezone-aware UTC datetime already ends in "+00:00".
    # Appending "Z" on top of that produced "...+00:00Z", which is not a valid
    # ISO 8601 timestamp, so replace the numeric offset with the "Z" designator.
    timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    return ErrorResponse(
        message=message,
        error=ErrorDetails(
            message=message,
            code=code,
            details=details,
        ),
        timestamp=timestamp,
    )
def parse_mongo_exception(exc: Exception) -> dict:
    """Translate an exception into the fields of an error response.

    Args:
        exc: The exception to classify.

    Returns:
        A dict with the keys "message", "code", "details" and "statusCode".
        Exceptions that match none of the known PyMongo types are reported
        as "UNKNOWN_ERROR" with status 500.
    """
    # (exception type, message, code, fixed details or None, HTTP status).
    # The first matching row wins, so the rows keep the order of the checks
    # they replace: most specific type first, PyMongoError last.
    # A WriteError is reported as a validation failure, i.e. the data being
    # written has the wrong shape for the schema defined in MongoDB.
    known_errors = (
        (
            DuplicateKeyError,
            "Duplicate key error occurred.",
            "DUPLICATE_KEY_ERROR",
            "A document with the same key already exists.",
            409,
        ),
        (WriteError, "Document validation failed.", "WRITE_ERROR", None, 400),
        (PyMongoError, "A database error occurred.", "DATABASE_ERROR", None, 500),
    )

    for exc_type, message, code, fixed_details, status in known_errors:
        if isinstance(exc, exc_type):
            break
    else:
        # Not a PyMongo error at all.
        message, code, fixed_details, status = (
            "An unknown error occurred.",
            "UNKNOWN_ERROR",
            None,
            500,
        )

    return {
        "message": message,
        "code": code,
        # Rows without fixed details expose the exception's own text.
        "details": str(exc) if fixed_details is None else fixed_details,
        "statusCode": status,
    }
def register_error_handlers(app):
    """Register the exception handlers on ``app``.

    Two handlers are installed through ``app.exception_handler``:

    * ``PyMongoError``: classified by ``parse_mongo_exception``; the response
      uses the status code, message, code and details it returns.
    * ``Exception``: answered with status 500 and the code
      "INTERNAL_SERVER_ERROR".

    Args:
        app: The application object exposing ``exception_handler``.
    """

    def _error_json(status, message, code, details):
        # Wrap the standard error payload in a JSON response.
        payload = create_error_response(message=message, code=code, details=details)
        return JSONResponse(status_code=status, content=payload.model_dump())

    @app.exception_handler(PyMongoError)
    async def mongo_exception_handler(request: Request, exc: PyMongoError):
        parsed = parse_mongo_exception(exc)
        return _error_json(
            parsed["statusCode"],
            parsed["message"],
            parsed["code"],
            parsed["details"],
        )

    @app.exception_handler(Exception)
    async def generic_exception_handler(request: Request, exc: Exception):
        text = str(exc)
        # Prefer a `detail` attribute when the exception has a truthy one;
        # otherwise fall back to the exception's args.
        extra = getattr(exc, 'detail', None) or getattr(exc, 'args', None)
        return _error_json(500, text, "INTERNAL_SERVER_ERROR", extra)