Skip to content

Commit 8aac748

Browse files
authored
feat(server): process requests in batch from a file (#308)
1 parent bd7c6c5 commit 8aac748

6 files changed

Lines changed: 456 additions & 57 deletions

File tree

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
package org.eqasim.server;
2+
3+
import java.io.File;
4+
import java.io.IOException;
5+
import java.util.LinkedList;
6+
import java.util.List;
7+
import java.util.concurrent.Callable;
8+
import java.util.concurrent.ExecutionException;
9+
import java.util.concurrent.ExecutorService;
10+
import java.util.concurrent.Executors;
11+
import java.util.function.Function;
12+
13+
import org.eqasim.core.misc.ParallelProgress;
14+
import org.eqasim.server.ServiceBuilder.Services;
15+
import org.eqasim.server.services.isochrone.road.RoadIsochroneRequest;
16+
import org.eqasim.server.services.isochrone.road.RoadIsochroneResponse;
17+
import org.eqasim.server.services.isochrone.transit.TransitIsochroneRequest;
18+
import org.eqasim.server.services.isochrone.transit.TransitIsochroneResponse;
19+
import org.eqasim.server.services.router.road.FreespeedSettings;
20+
import org.eqasim.server.services.router.road.RoadRouterRequest;
21+
import org.eqasim.server.services.router.road.RoadRouterResponse;
22+
import org.eqasim.server.services.router.transit.TransitRouterRequest;
23+
import org.eqasim.server.services.router.transit.TransitRouterResponse;
24+
import org.eqasim.server.services.router.transit.TransitUtilities;
25+
import org.matsim.core.config.CommandLine;
26+
import org.matsim.core.config.CommandLine.ConfigurationException;
27+
28+
import com.fasterxml.jackson.annotation.JsonProperty;
29+
import com.fasterxml.jackson.core.JsonParseException;
30+
import com.fasterxml.jackson.databind.JsonMappingException;
31+
import com.fasterxml.jackson.databind.ObjectMapper;
32+
33+
public class RunProcessor {
34+
public static void main(String[] args)
35+
throws ConfigurationException, JsonParseException, JsonMappingException, IOException, InterruptedException,
36+
ExecutionException {
37+
CommandLine cmd = new CommandLine.Builder(args) //
38+
.requireOptions("config-path", "input-path", "output-path") //
39+
.allowOptions("threads", "configuration-path", "use-transit", "indent-response") //
40+
.build();
41+
42+
Services services = new ServiceBuilder().build(cmd);
43+
44+
int threads = cmd.getOption("threads").map(Integer::parseInt)
45+
.orElse(Runtime.getRuntime().availableProcessors());
46+
47+
ObjectMapper objectMapper = new ObjectMapper();
48+
if (cmd.getOption("indent-response").map(Boolean::parseBoolean).orElse(false)) {
49+
50+
}
51+
52+
ProcessorInput input = objectMapper.readValue(new File(cmd.getOptionStrict("input-path")),
53+
ProcessorInput.class);
54+
ProcessorOutput output = new ProcessorOutput();
55+
56+
ExecutorService executor = Executors.newFixedThreadPool(threads);
57+
58+
process(input.roadRouter, output.roadRouter,
59+
request -> services.roadRouterService().processRequest(request, input.freespeed), "road_router",
60+
executor);
61+
62+
process(input.roadIsochrone, output.roadIsochrone, services.roadIsochroneService()::processRequest,
63+
"road_isochrone", executor);
64+
65+
process(input.transitRouter, output.transitRouter,
66+
request -> services.transitRouterService().processRequest(request, input.transitUtilities),
67+
"transit_router", executor);
68+
69+
process(input.transitIsochrone, output.transitIsochrone, services.transitIsochroneService()::processRequest,
70+
"transit_isochrone", executor);
71+
72+
objectMapper.writerWithDefaultPrettyPrinter().writeValue(new File(cmd.getOptionStrict("output-path")), output);
73+
}
74+
75+
private static class ProcessorInput {
76+
@JsonProperty("freespeed")
77+
FreespeedSettings freespeed = null;
78+
79+
@JsonProperty("road_router")
80+
List<RoadRouterRequest> roadRouter = new LinkedList<>();
81+
82+
@JsonProperty("road_isochrone")
83+
List<RoadIsochroneRequest> roadIsochrone = new LinkedList<>();
84+
85+
@JsonProperty("transit_utilities")
86+
TransitUtilities transitUtilities = null;
87+
88+
@JsonProperty("transit_router")
89+
List<TransitRouterRequest> transitRouter = new LinkedList<>();
90+
91+
@JsonProperty("transit_isochrone")
92+
List<TransitIsochroneRequest> transitIsochrone = new LinkedList<>();
93+
}
94+
95+
private static class ProcessorOutput {
96+
@JsonProperty("road_router")
97+
List<RoadRouterResponse> roadRouter = new LinkedList<>();
98+
99+
@JsonProperty("road_isochrone")
100+
List<RoadIsochroneResponse> roadIsochrone = new LinkedList<>();
101+
102+
@JsonProperty("transit_router")
103+
List<TransitRouterResponse> transitRouter = new LinkedList<>();
104+
105+
@JsonProperty("transit_isochrone")
106+
List<TransitIsochroneResponse> transitIsochrone = new LinkedList<>();
107+
}
108+
109+
private static <Response, Request> void process(List<Request> requests, List<Response> responses,
110+
Function<Request, Response> service, String serivceName, ExecutorService executor)
111+
throws InterruptedException, ExecutionException {
112+
if (requests.size() > 0) {
113+
ParallelProgress progress = new ParallelProgress("Processing " + serivceName, requests.size());
114+
115+
List<Callable<Response>> tasks = new LinkedList<>();
116+
for (Request request : requests) {
117+
tasks.add(() -> {
118+
Response response = service.apply(request);
119+
progress.update();
120+
return response;
121+
});
122+
}
123+
124+
for (var task : executor.invokeAll(tasks)) {
125+
responses.add(task.get());
126+
}
127+
128+
progress.close();
129+
}
130+
}
131+
}

server/src/main/java/org/eqasim/server/RunServer.java

Lines changed: 10 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,38 +1,19 @@
11
package org.eqasim.server;
22

3-
import java.io.File;
43
import java.io.IOException;
5-
import java.util.Collections;
64
import java.util.concurrent.ExecutorService;
75
import java.util.concurrent.Executors;
86

9-
import org.eqasim.core.components.raptor.EqasimRaptorConfigGroup;
7+
import org.eqasim.server.ServiceBuilder.Services;
108
import org.eqasim.server.api.RoadIsochroneEndpoint;
119
import org.eqasim.server.api.RoadRouterEndpoint;
1210
import org.eqasim.server.api.TransitIsochroneEndpoint;
1311
import org.eqasim.server.api.TransitRouterEndpoint;
14-
import org.eqasim.server.services.ServiceConfiguration;
15-
import org.eqasim.server.services.isochrone.road.RoadIsochroneService;
16-
import org.eqasim.server.services.isochrone.transit.TransitIsochroneService;
17-
import org.eqasim.server.services.router.road.RoadRouterService;
18-
import org.eqasim.server.services.router.transit.TransitRouterService;
19-
import org.matsim.api.core.v01.Scenario;
20-
import org.matsim.api.core.v01.network.Network;
2112
import org.matsim.core.config.CommandLine;
2213
import org.matsim.core.config.CommandLine.ConfigurationException;
23-
import org.matsim.core.config.Config;
24-
import org.matsim.core.config.ConfigGroup;
25-
import org.matsim.core.config.ConfigUtils;
26-
import org.matsim.core.network.NetworkUtils;
27-
import org.matsim.core.network.algorithms.NetworkCleaner;
28-
import org.matsim.core.network.algorithms.TransportModeNetworkFilter;
29-
import org.matsim.core.network.io.MatsimNetworkReader;
30-
import org.matsim.core.scenario.ScenarioUtils;
31-
import org.matsim.pt.transitSchedule.api.TransitScheduleReader;
3214

3315
import com.fasterxml.jackson.core.JsonParseException;
3416
import com.fasterxml.jackson.databind.JsonMappingException;
35-
import com.fasterxml.jackson.databind.ObjectMapper;
3617

3718
import io.javalin.Javalin;
3819

@@ -44,17 +25,11 @@ public static void main(String[] args)
4425
.allowOptions("threads", "configuration-path", "use-transit") //
4526
.build();
4627

28+
Services services = new ServiceBuilder().build(cmd);
29+
4730
int threads = cmd.getOption("threads").map(Integer::parseInt)
4831
.orElse(Runtime.getRuntime().availableProcessors());
4932

50-
ServiceConfiguration configuration = new ServiceConfiguration();
51-
52-
if (cmd.hasOption("configuration-path")) {
53-
ObjectMapper objectMapper = new ObjectMapper();
54-
configuration = objectMapper.readValue(new File(cmd.getOptionStrict("configuration-path")),
55-
ServiceConfiguration.class);
56-
}
57-
5833
// Create Javalin application and enable CORS
5934
Javalin app = Javalin.create(config -> {
6035
config.plugins.enableCors(cors -> {
@@ -64,44 +39,22 @@ public static void main(String[] args)
6439
});
6540
});
6641

67-
Config config = ConfigUtils.loadConfig(cmd.getOptionStrict("config-path"), new EqasimRaptorConfigGroup());
68-
Scenario scenario = ScenarioUtils.createScenario(config);
69-
70-
new MatsimNetworkReader(scenario.getNetwork())
71-
.readURL(ConfigGroup.getInputFileURL(config.getContext(), config.network().getInputFile()));
72-
73-
boolean useTransit = cmd.getOption("use-transit").map(Boolean::parseBoolean).orElse(true);
74-
if (useTransit) {
75-
new TransitScheduleReader(scenario).readURL(
76-
ConfigGroup.getInputFileURL(config.getContext(), config.transit().getTransitScheduleFile()));
77-
}
78-
7942
ExecutorService executor = Executors.newFixedThreadPool(threads);
8043

81-
Network roadNetwork = NetworkUtils.createNetwork();
82-
new TransportModeNetworkFilter(scenario.getNetwork()).filter(roadNetwork, Collections.singleton("car"));
83-
new NetworkCleaner().run(roadNetwork);
84-
85-
RoadRouterService roadRouterService = RoadRouterService.create(config, roadNetwork, configuration.walk,
86-
threads);
87-
RoadRouterEndpoint roadRouterEndpoint = new RoadRouterEndpoint(executor, roadRouterService);
44+
RoadRouterEndpoint roadRouterEndpoint = new RoadRouterEndpoint(executor, services.roadRouterService());
8845
app.post("/router/road", roadRouterEndpoint::post);
8946

90-
RoadIsochroneService roadIsochroneService = RoadIsochroneService.create(config, roadNetwork,
91-
configuration.walk);
92-
RoadIsochroneEndpoint roadIsochroneEndpoint = new RoadIsochroneEndpoint(executor, roadIsochroneService);
47+
RoadIsochroneEndpoint roadIsochroneEndpoint = new RoadIsochroneEndpoint(executor,
48+
services.roadIsochroneService());
9349
app.post("/isochrone/road", roadIsochroneEndpoint::post);
9450

95-
if (useTransit) {
96-
TransitRouterService transitRouterService = TransitRouterService.create(config, scenario.getNetwork(),
97-
scenario.getTransitSchedule(), configuration.transit, configuration.walk);
98-
TransitRouterEndpoint transitRouterEndpoint = new TransitRouterEndpoint(executor, transitRouterService);
51+
if (services.transitRouterService() != null) {
52+
TransitRouterEndpoint transitRouterEndpoint = new TransitRouterEndpoint(executor,
53+
services.transitRouterService());
9954
app.post("/router/transit", transitRouterEndpoint::post);
10055

101-
TransitIsochroneService transitIsochroneService = TransitIsochroneService.create(config,
102-
scenario.getTransitSchedule(), configuration.transit, configuration.walk);
10356
TransitIsochroneEndpoint transitIsochroneEndpoint = new TransitIsochroneEndpoint(executor,
104-
transitIsochroneService);
57+
services.transitIsochroneService());
10558
app.post("/isochrone/transit", transitIsochroneEndpoint::post);
10659
}
10760

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package org.eqasim.server;
2+
3+
import java.io.File;
4+
import java.io.IOException;
5+
import java.util.Collections;
6+
7+
import org.eqasim.core.components.raptor.EqasimRaptorConfigGroup;
8+
import org.eqasim.server.services.ServiceConfiguration;
9+
import org.eqasim.server.services.isochrone.road.RoadIsochroneService;
10+
import org.eqasim.server.services.isochrone.transit.TransitIsochroneService;
11+
import org.eqasim.server.services.router.road.RoadRouterService;
12+
import org.eqasim.server.services.router.transit.TransitRouterService;
13+
import org.matsim.api.core.v01.Scenario;
14+
import org.matsim.api.core.v01.network.Network;
15+
import org.matsim.core.config.CommandLine;
16+
import org.matsim.core.config.CommandLine.ConfigurationException;
17+
import org.matsim.core.config.Config;
18+
import org.matsim.core.config.ConfigGroup;
19+
import org.matsim.core.config.ConfigUtils;
20+
import org.matsim.core.network.NetworkUtils;
21+
import org.matsim.core.network.algorithms.NetworkCleaner;
22+
import org.matsim.core.network.algorithms.TransportModeNetworkFilter;
23+
import org.matsim.core.network.io.MatsimNetworkReader;
24+
import org.matsim.core.scenario.ScenarioUtils;
25+
import org.matsim.pt.transitSchedule.api.TransitScheduleReader;
26+
27+
import com.fasterxml.jackson.core.JsonParseException;
28+
import com.fasterxml.jackson.databind.JsonMappingException;
29+
import com.fasterxml.jackson.databind.ObjectMapper;
30+
31+
public class ServiceBuilder {
32+
public record Services(
33+
RoadRouterService roadRouterService,
34+
RoadIsochroneService roadIsochroneService,
35+
TransitRouterService transitRouterService,
36+
TransitIsochroneService transitIsochroneService) {
37+
}
38+
39+
public Services build(CommandLine cmd)
40+
throws JsonParseException, JsonMappingException, IOException, ConfigurationException {
41+
int threads = cmd.getOption("threads").map(Integer::parseInt)
42+
.orElse(Runtime.getRuntime().availableProcessors());
43+
44+
ServiceConfiguration configuration = new ServiceConfiguration();
45+
46+
if (cmd.hasOption("configuration-path")) {
47+
ObjectMapper objectMapper = new ObjectMapper();
48+
configuration = objectMapper.readValue(new File(cmd.getOptionStrict("configuration-path")),
49+
ServiceConfiguration.class);
50+
}
51+
52+
Config config = ConfigUtils.loadConfig(cmd.getOptionStrict("config-path"),
53+
new EqasimRaptorConfigGroup());
54+
Scenario scenario = ScenarioUtils.createScenario(config);
55+
56+
new MatsimNetworkReader(scenario.getNetwork())
57+
.readURL(ConfigGroup.getInputFileURL(config.getContext(),
58+
config.network().getInputFile()));
59+
60+
boolean useTransit = cmd.getOption("use-transit").map(Boolean::parseBoolean).orElse(true);
61+
if (useTransit) {
62+
new TransitScheduleReader(scenario).readURL(
63+
ConfigGroup.getInputFileURL(config.getContext(),
64+
config.transit().getTransitScheduleFile()));
65+
}
66+
67+
Network roadNetwork = NetworkUtils.createNetwork();
68+
new TransportModeNetworkFilter(scenario.getNetwork()).filter(roadNetwork, Collections.singleton("car"));
69+
new NetworkCleaner().run(roadNetwork);
70+
71+
final RoadRouterService roadRouterService;
72+
final RoadIsochroneService roadIsochroneService;
73+
final TransitRouterService transitRouterService;
74+
final TransitIsochroneService transitIsochroneService;
75+
76+
roadRouterService = RoadRouterService.create(config, roadNetwork, configuration.walk,
77+
threads);
78+
79+
roadIsochroneService = RoadIsochroneService.create(config, roadNetwork,
80+
configuration.walk);
81+
82+
if (useTransit) {
83+
transitRouterService = TransitRouterService.create(config, scenario.getNetwork(),
84+
scenario.getTransitSchedule(), configuration.transit, configuration.walk);
85+
86+
transitIsochroneService = TransitIsochroneService.create(config,
87+
scenario.getTransitSchedule(), configuration.transit, configuration.walk);
88+
} else {
89+
transitRouterService = null;
90+
transitIsochroneService = null;
91+
}
92+
93+
return new Services(roadRouterService, roadIsochroneService, transitRouterService, transitIsochroneService);
94+
}
95+
}
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
package org.eqasim.server;
2+
3+
import static org.junit.Assert.assertTrue;
4+
5+
import java.io.File;
6+
import java.io.IOException;
7+
import java.net.URL;
8+
import java.util.concurrent.ExecutionException;
9+
10+
import org.apache.commons.io.FileUtils;
11+
import org.eqasim.core.simulation.EqasimConfigurator;
12+
import org.junit.After;
13+
import org.junit.Before;
14+
import org.junit.Test;
15+
import org.matsim.core.config.CommandLine.ConfigurationException;
16+
17+
import com.fasterxml.jackson.core.JsonParseException;
18+
import com.fasterxml.jackson.databind.JsonMappingException;
19+
20+
public class TestProcessor {
21+
@Before
22+
public void setUp() throws IOException {
23+
new File("processor_test").mkdirs();
24+
}
25+
26+
@After
27+
public void tearDown() throws IOException {
28+
FileUtils.deleteDirectory(new File("processor_test"));
29+
}
30+
31+
@Test
32+
public void testProcessor() throws JsonParseException, JsonMappingException, ConfigurationException, IOException,
33+
InterruptedException, ExecutionException {
34+
URL scenarioUrl = EqasimConfigurator.class.getClassLoader().getResource("melun");
35+
String configPath = new File(scenarioUrl.getPath(), "config.xml").getAbsolutePath();
36+
37+
URL resourcesUrl = getClass().getClassLoader().getResource("processor");
38+
String inputPath = new File(resourcesUrl.getPath(), "input.json").getAbsolutePath();
39+
String expectedOutputPath = new File(resourcesUrl.getPath(), "output.json").getAbsolutePath();
40+
String outputPath = new File("processor_test/output.json").getPath();
41+
42+
// uncomment to regenerate
43+
outputPath = expectedOutputPath;
44+
45+
RunProcessor.main(new String[] {
46+
"--config-path", configPath,
47+
"--input-path", inputPath,
48+
"--output-path", outputPath,
49+
"--threads", "4"
50+
});
51+
52+
assertTrue(FileUtils.contentEquals(new File(expectedOutputPath), new File(outputPath)));
53+
}
54+
}

0 commit comments

Comments
 (0)