-
Notifications
You must be signed in to change notification settings - Fork 118
Expand file tree
/
Copy pathhelpers.py
More file actions
168 lines (149 loc) · 5.81 KB
/
helpers.py
File metadata and controls
168 lines (149 loc) · 5.81 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
# Copyright 2024-2025 New Vector Ltd
#
# SPDX-License-Identifier: AGPL-3.0-only
import asyncio
import time
import pyhelm3
from lightkube import AsyncClient
from lightkube.models.core_v1 import (
Capabilities,
Container,
PodSpec,
SeccompProfile,
SecurityContext,
)
from lightkube.models.meta_v1 import ObjectMeta
from lightkube.resources.core_v1 import ConfigMap, Endpoints, Namespace, Pod, Secret
from ..artifacts import CertKey
from .utils import merge
def namespace(name: str) -> Namespace:
    """Build a Namespace manifest with the given name."""
    metadata = ObjectMeta(name=name)
    return Namespace(metadata=metadata)
def kubernetes_docker_secret(name: str, namespace: str, docker_config_json: str) -> Secret:
    """Build a dockerconfigjson-type Secret carrying the given registry credentials.

    The Secret is labelled as pytest-managed so test tooling can identify it.
    """
    return Secret(
        type="kubernetes.io/dockerconfigjson",
        metadata=ObjectMeta(
            name=name,
            namespace=namespace,
            labels={"app.kubernetes.io/managed-by": "pytest"},
        ),
        stringData={".dockerconfigjson": docker_config_json},
    )
def kubernetes_tls_secret(name: str, namespace: str, certificate: CertKey) -> Secret:
    """Build a kubernetes.io/tls Secret from a CertKey.

    Populates tls.crt with the PEM certificate bundle, tls.key with the PEM
    private key, and ca.crt with the root CA certificate.
    """
    tls_data = {
        "tls.crt": certificate.cert_bundle_as_pem(),
        "tls.key": certificate.key_as_pem(),
        "ca.crt": certificate.get_root_ca().cert_as_pem(),
    }
    return Secret(
        type="kubernetes.io/tls",
        metadata=ObjectMeta(
            name=name,
            namespace=namespace,
            labels={"app.kubernetes.io/managed-by": "pytest"},
        ),
        stringData=tls_data,
    )
async def wait_for_endpoint_ready(name, namespace, cluster, kube_client):
    """Wait until the named Endpoints resource reports only ready addresses.

    First blocks on the cluster's `wait` helper until some address appears on
    the Endpoints object, then polls it for up to 30 seconds until every
    subset has addresses and ports and no notReadyAddresses.

    Returns the last Endpoints object fetched — even when the 30 second
    budget is exhausted, matching the original best-effort behaviour
    (callers must inspect it; no exception is raised on timeout).
    """
    await asyncio.to_thread(
        cluster.wait,
        name=f"endpoints/{name}",
        namespace=namespace,
        waitfor="jsonpath='{.subsets[].addresses}'",
    )
    # We wait a maximum of 30 seconds for the endpoints to be ready
    start_time = time.time()
    while time.time() - start_time < 30:
        endpoint = await kube_client.get(Endpoints, name=name, namespace=namespace)
        # BUG FIX: subsets can be None on a freshly-created Endpoints object;
        # iterating it directly raised TypeError. Treat None like an empty list.
        subsets = endpoint.subsets or []
        if all(
            subset and subset.addresses and subset.ports and not subset.notReadyAddresses
            for subset in subsets
        ):
            break
        await asyncio.sleep(0.1)
    return endpoint
async def deploy_with_values_patch(
    generated_data, helm_client: pyhelm3.Client, values_patch: dict, timeout="600s"
) -> tuple[pyhelm3.ReleaseRevision, pyhelm3.Error | None]:
    """Upgrade the release with its currently-deployed values merged with values_patch.

    Returns a (revision, error) pair: on success the new revision and None;
    on a helm failure the release's current revision together with the error.
    """
    release = generated_data.release_name
    release_namespace = generated_data.ess_namespace

    # Start from the values of the currently deployed revision and overlay the patch.
    current = await helm_client.get_current_revision(release, namespace=release_namespace)
    merged_values = merge(await current.values(), values_patch)

    chart = await helm_client.get_chart("charts/matrix-stack")

    # Install or upgrade a release
    try:
        revision = await helm_client.install_or_upgrade_release(
            release,
            chart,
            merged_values,
            namespace=release_namespace,
            timeout=timeout,
            atomic=False,
            wait=True,
        )
    except pyhelm3.errors.Error as e:
        fallback = await helm_client.get_current_revision(release, namespace=release_namespace)
        return fallback, e
    return revision, None
async def get_deployment_marker(kube_client, generated_data, marker: str):
    """Read one marker value from the release's `<release>-markers` ConfigMap.

    Returns the value for `marker`, or None when the key is absent
    (or the ConfigMap has no data at all).
    """
    configmap = await kube_client.get(
        ConfigMap,
        namespace=generated_data.ess_namespace,
        name=f"{generated_data.release_name}-markers",
    )
    # BUG FIX: data is None on an empty ConfigMap, which made .get() raise
    # AttributeError; guard before looking the marker up.
    return (configmap.data or {}).get(marker)
async def run_pod_with_args(kube_client: AsyncClient, namespace, image_name, pod_name, args):
    """Run a one-shot Pod with the given image and args and return its logs.

    Creates a uniquely-named Pod (timestamp suffix) with a restricted security
    context (non-root, read-only rootfs, no capabilities), waits up to 30
    seconds for its container to terminate with reason "Completed", returns the
    container's logs as a single string, and always deletes the Pod afterwards.

    Raises:
        RuntimeError: if the container has not completed within 30 seconds.
    """
    pod = Pod(
        metadata=ObjectMeta(name=pod_name + "-" + str(int(time.time() * 1000)), namespace=namespace),
        spec=PodSpec(
            restartPolicy="Never",
            containers=[
                Container(
                    name="cmd",
                    image=image_name,
                    args=args,
                    securityContext=SecurityContext(
                        seccompProfile=SeccompProfile(type="RuntimeDefault"),
                        capabilities=Capabilities(drop=["ALL"]),
                        readOnlyRootFilesystem=True,
                        allowPrivilegeEscalation=False,
                        runAsNonRoot=True,
                        runAsUser=3000,
                        runAsGroup=3000,
                    ),
                )
            ],
        ),
    )
    assert pod.metadata
    assert pod.metadata.name
    assert pod.metadata.namespace
    try:
        await kube_client.create(pod)
        deadline = time.time() + 30
        completed = False
        found_pod = None
        while time.time() < deadline and not completed:
            found_pod = await kube_client.get(Pod, name=pod.metadata.name, namespace=pod.metadata.namespace)
            statuses = found_pod.status.containerStatuses if found_pod.status else None
            if statuses:
                # With restartPolicy=Never the terminal state is reported in
                # `state.terminated`; `lastState` is also checked to preserve
                # the original code's behaviour for restarted containers.
                for container_state in (statuses[0].state, statuses[0].lastState):
                    if (
                        container_state
                        and container_state.terminated
                        and container_state.terminated.reason == "Completed"
                    ):
                        completed = True
                        break
            if not completed:
                await asyncio.sleep(1)
        # BUG FIX: the previous post-loop check was inverted — it raised when
        # the pod completed within the budget and fell through to log
        # collection on timeout. Raise only when the pod never completed.
        if not completed:
            raise RuntimeError(
                f"Pod {pod.metadata.name} did not complete in time "
                f"(gave up after 30 seconds), "
                f"pod status: {found_pod.status if found_pod else None}"
            )
        log_lines = ""
        async for log_line in kube_client.log(pod.metadata.name, namespace=pod.metadata.namespace, container="cmd"):
            log_lines += log_line
        return log_lines
    finally:
        await kube_client.delete(Pod, name=pod.metadata.name, namespace=namespace)