Question about custom event handling #253

@jrobbins-LiveData

The Mangum doc shows this example of how one might handle a custom event:

def handler(event, context):
    if event.get("some-key"):
        # Do something or return, etc.
        return

    asgi_handler = Mangum(app)
    response = asgi_handler(event, context) # Call the instance with the event arguments

    return response

I need to handle an incoming AWS EventBridge event. I want to invoke the same method that my HTTP API handler invokes, and it is an async method.

Here's what I am using:

def handler(event: LambdaEvent, context: LambdaContext) -> dict:
    if "requestContext" in event:
        logger.debug("HTTP API")
        response = fastapi_handler(event, context)
    else:
        logger.debug("Not HTTP API")
        loop = asyncio.get_event_loop()
        response = loop.run_until_complete(app_logic())
    return response

My question: is this the correct pattern to use? I see code like this in the Mangum source, so I am trying to follow the same approach. My code seems to work, but the Python docs say this about get_event_loop():

Deprecated since version 3.10: Deprecation warning is emitted if there is no running event loop. In future Python releases, this function will be an alias of get_running_loop().

That made me wonder whether I am doing this correctly.
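
For comparison, here is a minimal sketch of one way to avoid the deprecated call, using asyncio.run() for the non-HTTP branch. It assumes no event loop is already running when the handler is invoked, and that app_logic() does not depend on objects bound to a loop created at import time. The bodies of fastapi_handler and app_logic below are hypothetical stand-ins so the snippet runs on its own; in the real handler they would be the Mangum instance and the shared async method.

```python
import asyncio
from typing import Any


async def app_logic() -> dict:
    # Hypothetical stand-in for the shared async method.
    await asyncio.sleep(0)
    return {"status": "ok"}


def fastapi_handler(event: dict, context: Any) -> dict:
    # Hypothetical stand-in for the Mangum(app) instance.
    return {"statusCode": 200, "body": "handled by ASGI app"}


def handler(event: dict, context: Any) -> dict:
    if "requestContext" in event:
        # HTTP API event: delegate to the ASGI adapter.
        return fastapi_handler(event, context)
    # Not an HTTP API event (e.g. EventBridge).
    # asyncio.run() creates a new event loop, runs the coroutine to
    # completion, and closes the loop, so get_event_loop() is not needed.
    return asyncio.run(app_logic())
```

asyncio.run() raises RuntimeError if it is called while another event loop is running in the same thread, so this sketch only fits a plain synchronous entry point. Because it creates and closes a loop on every call, it may also be unsuitable if resources are meant to stay attached to one loop across invocations.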
