-
Notifications
You must be signed in to change notification settings - Fork 1.1k
Expand file tree
/
Copy pathbuild_coastline.py
More file actions
76 lines (63 loc) · 2.28 KB
/
Copy pathbuild_coastline.py
File metadata and controls
76 lines (63 loc) · 2.28 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
import logging
import os
import shutil
from datetime import timedelta
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.utils.dates import days_ago
from airmaps.instruments import settings
from airmaps.instruments import storage
from airmaps.instruments.utils import get_latest_filename
from airmaps.instruments.utils import put_current_date_in_filename
from airmaps.instruments.utils import rm_build
from maps_generator.generator import stages_declaration as sd
from maps_generator.generator.env import Env
from maps_generator.generator.env import WORLD_COASTS_NAME
from maps_generator.maps_generator import run_generation
logger = logging.getLogger("airmaps")
DAG = DAG(
"Build_coastline",
schedule_interval=timedelta(days=1),
default_args={
"owner": "MAPS.ME",
"depends_on_past": True,
"start_date": days_ago(0),
"email": settings.EMAILS,
"email_on_failure": True,
"email_on_retry": False,
"retries": 0,
"retry_delay": timedelta(minutes=5),
"priority_weight": 1,
},
)
COASTLINE_STORAGE_PATH = f"{settings.STORAGE_PREFIX}/coasts"
def publish_coastline(env):
for name in (f"{WORLD_COASTS_NAME}.geom", f"{WORLD_COASTS_NAME}.rawgeom"):
coastline = put_current_date_in_filename(name)
latest = get_latest_filename(name)
coastline_full = os.path.join(env.paths.coastline_path, coastline)
latest_full = os.path.join(env.paths.coastline_path, latest)
shutil.move(os.path.join(env.paths.coastline_path, name), coastline_full)
os.symlink(coastline, latest_full)
storage.wd_publish(coastline_full, f"{COASTLINE_STORAGE_PATH}/{coastline}")
storage.wd_publish(latest_full, f"{COASTLINE_STORAGE_PATH}/{latest}")
def build_coastline(**kwargs):
env = Env()
kwargs["ti"].xcom_push(key="build_name", value=env.build_name)
run_generation(
env,
(
sd.StageDownloadAndConvertPlanet(),
sd.StageCoastline(use_old_if_fail=False),
sd.StageCleanup(),
),
)
env.finish()
publish_coastline(env)
rm_build(env)
BUILD_COASTLINE_TASK = PythonOperator(
task_id="Build_coastline_task",
provide_context=True,
python_callable=build_coastline,
dag=DAG,
)