Skip to content
This repository was archived by the owner on Apr 26, 2024. It is now read-only.

Commit 358896e

Browse files
author
Mathieu Velten
authored
Implements a task scheduler for resumable potentially long running tasks (#15891)
1 parent 79c349d commit 358896e

10 files changed

Lines changed: 831 additions & 1 deletion

File tree

changelog.d/15891.feature

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Implements a task scheduler for resumable potentially long running tasks.

synapse/app/generic_worker.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@
9191
from synapse.storage.databases.main.stats import StatsStore
9292
from synapse.storage.databases.main.stream import StreamWorkerStore
9393
from synapse.storage.databases.main.tags import TagsWorkerStore
94+
from synapse.storage.databases.main.task_scheduler import TaskSchedulerWorkerStore
9495
from synapse.storage.databases.main.transactions import TransactionWorkerStore
9596
from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore
9697
from synapse.storage.databases.main.user_directory import UserDirectoryStore
@@ -144,6 +145,7 @@ class GenericWorkerStore(
144145
TransactionWorkerStore,
145146
LockStore,
146147
SessionStore,
148+
TaskSchedulerWorkerStore,
147149
):
148150
# Properties that multiple storage classes define. Tell mypy what the
149151
# expected type is.

synapse/server.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,6 +142,7 @@
142142
from synapse.util.macaroons import MacaroonGenerator
143143
from synapse.util.ratelimitutils import FederationRateLimiter
144144
from synapse.util.stringutils import random_string
145+
from synapse.util.task_scheduler import TaskScheduler
145146

146147
logger = logging.getLogger(__name__)
147148

@@ -360,6 +361,7 @@ def setup_background_tasks(self) -> None:
360361
"""
361362
for i in self.REQUIRED_ON_BACKGROUND_TASK_STARTUP:
362363
getattr(self, "get_" + i + "_handler")()
364+
self.get_task_scheduler()
363365

364366
def get_reactor(self) -> ISynapseReactor:
365367
"""
@@ -912,6 +914,9 @@ def get_common_usage_metrics_manager(self) -> CommonUsageMetricsManager:
912914
"""Usage metrics shared between phone home stats and the prometheus exporter."""
913915
return CommonUsageMetricsManager(self)
914916

915-
@cache_in_self
916917
def get_worker_locks_handler(self) -> WorkerLocksHandler:
917918
return WorkerLocksHandler(self)
919+
920+
@cache_in_self
921+
def get_task_scheduler(self) -> TaskScheduler:
922+
return TaskScheduler(self)

synapse/storage/databases/main/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@
7070
from .stats import StatsStore
7171
from .stream import StreamWorkerStore
7272
from .tags import TagsStore
73+
from .task_scheduler import TaskSchedulerWorkerStore
7374
from .transactions import TransactionWorkerStore
7475
from .ui_auth import UIAuthStore
7576
from .user_directory import UserDirectoryStore
@@ -127,6 +128,7 @@ class DataStore(
127128
CacheInvalidationWorkerStore,
128129
LockStore,
129130
SessionStore,
131+
TaskSchedulerWorkerStore,
130132
):
131133
def __init__(
132134
self,
Lines changed: 202 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,202 @@
1+
# Copyright 2023 The Matrix.org Foundation C.I.C.
2+
#
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
#
7+
# http://www.apache.org/licenses/LICENSE-2.0
8+
#
9+
# Unless required by applicable law or agreed to in writing, software
10+
# distributed under the License is distributed on an "AS IS" BASIS,
11+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
# See the License for the specific language governing permissions and
13+
# limitations under the License.
14+
15+
from typing import TYPE_CHECKING, Any, Dict, List, Optional
16+
17+
from synapse.storage._base import SQLBaseStore, db_to_json
18+
from synapse.storage.database import (
19+
DatabasePool,
20+
LoggingDatabaseConnection,
21+
LoggingTransaction,
22+
make_in_list_sql_clause,
23+
)
24+
from synapse.types import JsonDict, JsonMapping, ScheduledTask, TaskStatus
25+
from synapse.util import json_encoder
26+
27+
if TYPE_CHECKING:
28+
from synapse.server import HomeServer
29+
30+
31+
class TaskSchedulerWorkerStore(SQLBaseStore):
32+
def __init__(
33+
self,
34+
database: DatabasePool,
35+
db_conn: LoggingDatabaseConnection,
36+
hs: "HomeServer",
37+
):
38+
super().__init__(database, db_conn, hs)
39+
40+
@staticmethod
41+
def _convert_row_to_task(row: Dict[str, Any]) -> ScheduledTask:
42+
row["status"] = TaskStatus(row["status"])
43+
if row["params"] is not None:
44+
row["params"] = db_to_json(row["params"])
45+
if row["result"] is not None:
46+
row["result"] = db_to_json(row["result"])
47+
return ScheduledTask(**row)
48+
49+
async def get_scheduled_tasks(
50+
self,
51+
*,
52+
actions: Optional[List[str]] = None,
53+
resource_id: Optional[str] = None,
54+
statuses: Optional[List[TaskStatus]] = None,
55+
max_timestamp: Optional[int] = None,
56+
) -> List[ScheduledTask]:
57+
"""Get a list of scheduled tasks from the DB.
58+
59+
Args:
60+
actions: Limit the returned tasks to those specific action names
61+
resource_id: Limit the returned tasks to the specific resource id, if specified
62+
statuses: Limit the returned tasks to the specific statuses
63+
max_timestamp: Limit the returned tasks to the ones that have
64+
a timestamp inferior to the specified one
65+
66+
Returns: a list of `ScheduledTask`, ordered by increasing timestamps
67+
"""
68+
69+
def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]:
70+
clauses: List[str] = []
71+
args: List[Any] = []
72+
if resource_id:
73+
clauses.append("resource_id = ?")
74+
args.append(resource_id)
75+
if actions is not None:
76+
clause, temp_args = make_in_list_sql_clause(
77+
txn.database_engine, "action", actions
78+
)
79+
clauses.append(clause)
80+
args.extend(temp_args)
81+
if statuses is not None:
82+
clause, temp_args = make_in_list_sql_clause(
83+
txn.database_engine, "status", statuses
84+
)
85+
clauses.append(clause)
86+
args.extend(temp_args)
87+
if max_timestamp is not None:
88+
clauses.append("timestamp <= ?")
89+
args.append(max_timestamp)
90+
91+
sql = "SELECT * FROM scheduled_tasks"
92+
if clauses:
93+
sql = sql + " WHERE " + " AND ".join(clauses)
94+
95+
sql = sql + "ORDER BY timestamp"
96+
97+
txn.execute(sql, args)
98+
return self.db_pool.cursor_to_dict(txn)
99+
100+
rows = await self.db_pool.runInteraction(
101+
"get_scheduled_tasks", get_scheduled_tasks_txn
102+
)
103+
return [TaskSchedulerWorkerStore._convert_row_to_task(row) for row in rows]
104+
105+
async def insert_scheduled_task(self, task: ScheduledTask) -> None:
106+
"""Insert a specified `ScheduledTask` in the DB.
107+
108+
Args:
109+
task: the `ScheduledTask` to insert
110+
"""
111+
await self.db_pool.simple_insert(
112+
"scheduled_tasks",
113+
{
114+
"id": task.id,
115+
"action": task.action,
116+
"status": task.status,
117+
"timestamp": task.timestamp,
118+
"resource_id": task.resource_id,
119+
"params": None
120+
if task.params is None
121+
else json_encoder.encode(task.params),
122+
"result": None
123+
if task.result is None
124+
else json_encoder.encode(task.result),
125+
"error": task.error,
126+
},
127+
desc="insert_scheduled_task",
128+
)
129+
130+
async def update_scheduled_task(
131+
self,
132+
id: str,
133+
timestamp: int,
134+
*,
135+
status: Optional[TaskStatus] = None,
136+
result: Optional[JsonMapping] = None,
137+
error: Optional[str] = None,
138+
) -> bool:
139+
"""Update a scheduled task in the DB with some new value(s).
140+
141+
Args:
142+
id: id of the `ScheduledTask` to update
143+
timestamp: new timestamp of the task
144+
status: new status of the task
145+
result: new result of the task
146+
error: new error of the task
147+
148+
Returns: `False` if no matching row was found, `True` otherwise
149+
"""
150+
updatevalues: JsonDict = {"timestamp": timestamp}
151+
if status is not None:
152+
updatevalues["status"] = status
153+
if result is not None:
154+
updatevalues["result"] = json_encoder.encode(result)
155+
if error is not None:
156+
updatevalues["error"] = error
157+
nb_rows = await self.db_pool.simple_update(
158+
"scheduled_tasks",
159+
{"id": id},
160+
updatevalues,
161+
desc="update_scheduled_task",
162+
)
163+
return nb_rows > 0
164+
165+
async def get_scheduled_task(self, id: str) -> Optional[ScheduledTask]:
166+
"""Get a specific `ScheduledTask` from its id.
167+
168+
Args:
169+
id: the id of the task to retrieve
170+
171+
Returns: the task if available, `None` otherwise
172+
"""
173+
row = await self.db_pool.simple_select_one(
174+
table="scheduled_tasks",
175+
keyvalues={"id": id},
176+
retcols=(
177+
"id",
178+
"action",
179+
"status",
180+
"timestamp",
181+
"resource_id",
182+
"params",
183+
"result",
184+
"error",
185+
),
186+
allow_none=True,
187+
desc="get_scheduled_task",
188+
)
189+
190+
return TaskSchedulerWorkerStore._convert_row_to_task(row) if row else None
191+
192+
async def delete_scheduled_task(self, id: str) -> None:
193+
"""Delete a specific task from its id.
194+
195+
Args:
196+
id: the id of the task to delete
197+
"""
198+
await self.db_pool.simple_delete(
199+
"scheduled_tasks",
200+
keyvalues={"id": id},
201+
desc="delete_scheduled_task",
202+
)

synapse/storage/schema/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,7 @@
113113
114114
Changes in SCHEMA_VERSION = 80
115115
- The event_txn_id_device_id is always written to for new events.
116+
- Add tables for the task scheduler.
116117
"""
117118

118119

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/* Copyright 2023 The Matrix.org Foundation C.I.C
2+
*
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software
10+
* distributed under the License is distributed on an "AS IS" BASIS,
11+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
* See the License for the specific language governing permissions and
13+
* limitations under the License.
14+
*/
15+
16+
-- cf ScheduledTask docstring for the meaning of the fields.
17+
CREATE TABLE IF NOT EXISTS scheduled_tasks(
18+
id TEXT PRIMARY KEY,
19+
action TEXT NOT NULL,
20+
status TEXT NOT NULL,
21+
timestamp BIGINT NOT NULL,
22+
resource_id TEXT,
23+
params TEXT,
24+
result TEXT,
25+
error TEXT
26+
);
27+
28+
CREATE INDEX IF NOT EXISTS scheduled_tasks_status ON scheduled_tasks(status);

synapse/types/__init__.py

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
import abc
1616
import re
1717
import string
18+
from enum import Enum
1819
from typing import (
1920
TYPE_CHECKING,
2021
AbstractSet,
@@ -969,3 +970,41 @@ class UserProfile(TypedDict):
969970
class RetentionPolicy:
970971
min_lifetime: Optional[int] = None
971972
max_lifetime: Optional[int] = None
973+
974+
975+
class TaskStatus(str, Enum):
976+
"""Status of a scheduled task"""
977+
978+
# Task is scheduled but not active
979+
SCHEDULED = "scheduled"
980+
# Task is active and probably running, and if not
981+
# will be run on next scheduler loop run
982+
ACTIVE = "active"
983+
# Task has completed successfully
984+
COMPLETE = "complete"
985+
# Task is over and either returned a failed status, or had an exception
986+
FAILED = "failed"
987+
988+
989+
@attr.s(auto_attribs=True, frozen=True, slots=True)
990+
class ScheduledTask:
991+
"""Description of a scheduled task"""
992+
993+
# Id used to identify the task
994+
id: str
995+
# Name of the action to be run by this task
996+
action: str
997+
# Current status of this task
998+
status: TaskStatus
999+
# If the status is SCHEDULED then this represents when it should be launched,
1000+
# otherwise it represents the last time this task got a change of state.
1001+
# In milliseconds since epoch in system time timezone, usually UTC.
1002+
timestamp: int
1003+
# Optionally bind a task to some resource id for easy retrieval
1004+
resource_id: Optional[str]
1005+
# Optional parameters that will be passed to the function ran by the task
1006+
params: Optional[JsonMapping]
1007+
# Optional result that can be updated by the running task
1008+
result: Optional[JsonMapping]
1009+
# Optional error that should be assigned a value when the status is FAILED
1010+
error: Optional[str]

0 commit comments

Comments
 (0)