-
Notifications
You must be signed in to change notification settings - Fork 4
Expand file tree
/
Copy pathimport2pg.py
More file actions
100 lines (83 loc) · 4.37 KB
/
Copy pathimport2pg.py
File metadata and controls
100 lines (83 loc) · 4.37 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from subprocess import run
# Human-readable, one-sentence statement of what this check asserts about a delivery.
DESCRIPTION = "The layers can be imported into PostGIS database."
# NOTE(review): presumably flags this check as a "system" check for the surrounding
# framework; the code consuming this flag is not visible in this file -- confirm.
IS_SYSTEM = True
def _ogr2ogr_args(layer_def, pg_layer_name, dsn, schema, epsg=None):
    """Build the ogr2ogr command line that loads one source layer into PostGIS.

    :param layer_def: layer definition providing "src_filepath" and "src_layer_name".
    :param pg_layer_name: name of the destination layer (passed as -nln).
    :param dsn: PostgreSQL connection string, without the "PG:" prefix.
    :param schema: destination schema (passed as the SCHEMA layer creation option).
    :param epsg: integer EPSG code assigned with -a_srs; None emits no -a_srs option.
    :return: list of command line arguments, starting with "ogr2ogr".
    """
    src_filepath = str(layer_def["src_filepath"])
    args = ["ogr2ogr",
            "-overwrite",
            "-f", "PostgreSQL",
            "-lco", "GEOMETRY_NAME=geom",
            "-lco", "SCHEMA={:s}".format(schema),
            "-lco", "PRECISION=NO",
            "-nlt", "MULTIPOLYGON",
            "-nln", pg_layer_name]
    if epsg is not None:
        args += ["-a_srs", "EPSG:{:d}".format(epsg)]
    args += ["PG:{:s}".format(dsn),
             src_filepath,
             layer_def["src_layer_name"]]
    # Special import options for CSV layers.
    if src_filepath.lower().endswith(".csv"):
        args += ["-oo", "SEPARATOR=SEMICOLON",
                 "-oo", "AUTODETECT_TYPE=YES",
                 "-oo", "EMPTY_STRING_AS_NULL=YES"]
    return args


def run_check(params, status):
    """Import every layer in params["layer_defs"] into PostGIS and verify the import.

    Each layer is loaded with the ogr2ogr command line tool and then reopened from
    both the source file and the database to confirm that the layer exists in
    PostGIS and that the feature counts match.

    On success the layer definition is extended in place with the keys
    "pg_layer_name", "pg_fid_name" and "fid_display_name".

    :param params: dictionary of check parameters. Keys read here:
        "skip_vector_checks" (optional), "connection_manager", "layer_defs"
        and "detected_epsg" (optional).
    :param status: status object; its info() and aborted() methods are called
        with a message describing the outcome.
    :return: None. Problems are reported through status.aborted().
    """
    # Check if the current delivery is excluded from vector checks.
    if params.get("skip_vector_checks"):
        status.info("The delivery has been excluded from vector.import2pg check because the vector data source does not contain a single object of interest.")
        return

    # GDAL is imported only after the skip guard, so a skipped delivery does not load it.
    from osgeo import ogr
    from osgeo.gdalconst import OF_READONLY

    dsn, schema = params["connection_manager"].get_dsn_schema()
    epsg = params.get("detected_epsg")

    # Import all layers found in layer_defs.
    for layer_def in params["layer_defs"].values():
        src_layer_name = layer_def["src_layer_name"]
        pg_layer_name = layer_def["layer_alias"]

        # Run ogr2ogr command for this layer.
        pc = run(_ogr2ogr_args(layer_def, pg_layer_name, dsn, schema, epsg))
        if pc.returncode != 0:
            status.aborted("Failed to import layer {:s} into PostGIS.".format(src_layer_name))
            continue

        # ogr2ogr not always returns non-zero exit code in case of error.
        # Therefore we try some checking whether the layer has been imported correctly.

        ## Open datasource from filesystem.
        ## ogr.Open() returns None on failure, so guard before dereferencing.
        src_datasource = ogr.Open(str(layer_def["src_filepath"]), OF_READONLY)
        src_layer = None
        if src_datasource is not None:
            src_layer = src_datasource.GetLayerByName(src_layer_name)
        if src_layer is None:
            status.aborted("Source layer {:s} can not be reopened for verification.".format(src_layer_name))
            continue

        ## Open datasource from postgis.
        conn_string = "PG:{:s} active_schema={:s}".format(dsn, schema)
        dst_datasource = ogr.Open(conn_string, OF_READONLY)
        dst_layer = None
        if dst_datasource is not None:
            # NOTE: GetLayerByName() works case insensitive in this case.
            dst_layer = dst_datasource.GetLayerByName(pg_layer_name)
        if dst_layer is None:
            status.aborted("Just imported layer {:s} can not be found in postgis.".format(src_layer_name))
            continue

        ## Set pg info back to layer_defs.
        ##
        ## FIXME: such construct is not really clear while it exploits mutable dictionaries
        ##        and bypasses currently standard use of status.add_params().
        pg_fid_name = dst_layer.GetFIDColumn().lower()
        layer_def["pg_layer_name"] = pg_layer_name
        layer_def["pg_fid_name"] = pg_fid_name
        layer_def["fid_display_name"] = "objectid" if pg_fid_name == "objectid" else "row number"

        ## Ensure all features has been imported.
        src_count = src_layer.GetFeatureCount()
        dst_count = dst_layer.GetFeatureCount()
        if src_count != dst_count:
            status.aborted("Imported layer {:s} has only {:d} out of {:d} features loaded."
                           .format(src_layer_name, dst_count, src_count))