forked from pulp/pulpcore
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy path__init__.py
More file actions
105 lines (83 loc) · 3.67 KB
/
__init__.py
File metadata and controls
105 lines (83 loc) · 3.67 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
import asyncio
from contextlib import suppress
from importlib import import_module
import logging
import os
from asgiref.sync import sync_to_async
from aiohttp import web
import django
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "pulpcore.app.settings")
django.setup()
from django.conf import settings # noqa: E402: module level not at top of file
from django.db.utils import ( # noqa: E402: module level not at top of file
InterfaceError,
DatabaseError,
)
from pulpcore.app.apps import pulp_plugin_configs # noqa: E402: module level not at top of file
from pulpcore.app.models import ContentAppStatus # noqa: E402: module level not at top of file
from pulpcore.app.util import get_worker_name # noqa: E402: module level not at top of file
from .handler import Handler # noqa: E402: module level not at top of file
from .instrumentation import instrumentation # noqa: E402: module level not at top of file
from .authentication import authenticate, guid # noqa: E402: module level not at top of file
log = logging.getLogger(__name__)
if settings.OTEL_ENABLED:
app = web.Application(middlewares=[guid, authenticate, instrumentation()])
else:
app = web.Application(middlewares=[guid, authenticate])
CONTENT_MODULE_NAME = "content"
async def _heartbeat():
content_app_status = None
name = get_worker_name()
heartbeat_interval = settings.CONTENT_APP_TTL // 4
msg = "Content App '{name}' heartbeat written, sleeping for '{interarrival}' seconds".format(
name=name, interarrival=heartbeat_interval
)
fail_msg = (
"Content App '{name}' failed to write a heartbeat to the database, sleeping for "
"'{interarrival}' seconds."
).format(name=name, interarrival=heartbeat_interval)
versions = {app.label: app.version for app in pulp_plugin_configs()}
try:
while True:
try:
content_app_status, created = await ContentAppStatus.objects.aget_or_create(
name=name, defaults={"versions": versions}
)
if not created:
await sync_to_async(content_app_status.save_heartbeat)()
if content_app_status.versions != versions:
content_app_status.versions = versions
await content_app_status.asave(update_fields=["versions"])
log.debug(msg)
except (InterfaceError, DatabaseError):
await sync_to_async(Handler._reset_db_connection)()
log.info(fail_msg)
await asyncio.sleep(heartbeat_interval)
finally:
if content_app_status:
await content_app_status.adelete()
async def _heartbeat_ctx(app):
heartbeat_task = asyncio.create_task(_heartbeat())
yield
heartbeat_task.cancel()
try:
await heartbeat_task
except asyncio.CancelledError:
pass
async def server(*args, **kwargs):
os.chdir(settings.WORKING_DIRECTORY)
for pulp_plugin in pulp_plugin_configs():
if pulp_plugin.name != "pulpcore.app":
content_module_name = "{name}.{module}".format(
name=pulp_plugin.name, module=CONTENT_MODULE_NAME
)
with suppress(ModuleNotFoundError):
import_module(content_module_name)
path_prefix = settings.CONTENT_PATH_PREFIX
if settings.DOMAIN_ENABLED:
path_prefix = path_prefix + "{pulp_domain}/"
app.add_routes([web.get(path_prefix[:-1], Handler().list_distributions)])
app.add_routes([web.get(path_prefix, Handler().list_distributions)])
app.add_routes([web.get(path_prefix + "{path:.+}", Handler().stream_content)])
app.cleanup_ctx.append(_heartbeat_ctx)
return app