|
#!/usr/bin/python |
|
# coding=utf-8 |
|
''' |
|
spatial ingestor for data.gov.au |
|
<alex.sadleir@linkdigital.com.au> |
|
1.0 28/11/2013 initial implementation |
|
''' |
|
import ckanapi #https://github.com/open-data/ckanapi |
|
import errno, os, shutil, sys, glob |
|
from pprint import pprint |
|
from email.mime.text import MIMEText |
|
from subprocess import Popen, PIPE |
|
import tempfile |
|
import smtplib |
|
from zipfile import ZipFile |
|
from datetime import datetime |
|
import urllib |
|
import fileinput |
|
import json |
|
import psycopg2 |
|
import requests |
|
|
|
geoserver_addr = "http://data.disclosurelo.gs:8080/geoserver/" |
|
geoserver_user = "admin" |
|
geoserver_passwd = "geoserver" |
|
email_addr = "maxious@lambdacomplex.org" |
|
shp2pgsql = "/usr/lib/postgresql/9.2/bin/shp2pgsql" |
|
omitted_orgs = [] |
|
|
|
def email(subject, body): |
|
msg = MIMEText(body) |
|
msg["From"] = "ckan@localhost" |
|
msg["To"] = email_addr |
|
msg["Subject"] = subject |
|
# Send the message via our own SMTP server, but don't include the |
|
# envelope header. |
|
s = smtplib.SMTP('localhost') |
|
s.sendmail(msg["From"], [msg["To"]], msg.as_string()) |
|
s.quit() |
|
|
|
def success(msg): |
|
print "Completed!" |
|
email("geodata success",msg) |
|
sys.exit(errno.EACCES) |
|
|
|
def failure(msg): |
|
print "ERROR -"+msg |
|
email("geodata error",str(sys.argv)+msg) |
|
sys.exit(errno.EACCES) |
|
|
|
def get_cursor(db_settings): |
|
|
|
# Connect to an existing database |
|
try: |
|
conn = psycopg2.connect(dbname=db_settings['dbname'], user=db_settings['user'], password=db_settings['password'], host=db_settings['host']) |
|
except: |
|
failure("I am unable to connect to the database.") |
|
# Open a cursor to perform database operations |
|
cur = conn.cursor() |
|
conn.set_isolation_level(0) |
|
# Execute a command: this creates a new table |
|
#cur.execute("create extension postgis") |
|
return (cur,conn) |
|
|
|
def psql_load(proc): |
|
(cur,conn) = get_cursor(db_settings) |
|
sql = "" |
|
for line in iter(proc.stdout.readline,''): |
|
sql += line |
|
if sql.endswith(';'): |
|
cur.execute(sql) |
|
sql = "" |
|
if sql != "": |
|
cur.execute(sql) |
|
cur.close() |
|
conn.close() |
|
|
|
if len(sys.argv) != 5: |
|
print "spatial ingester. command line: postgis_url api_url api_key dataset_id" |
|
sys.exit(errno.EACCES) |
|
else: |
|
(path, db_settings_json, api_url, api_key, dataset_id) = sys.argv |
|
db_settings = json.loads(db_settings_json) |
|
|
|
ckan = ckanapi.RemoteCKAN(address=api_url, apikey=api_key) |
|
dataset = ckan.action.package_show(id=dataset_id) |
|
print "loaded dataset"+dataset['name'] |
|
#pprint(dataset) |
|
if dataset['owner_org'] in omitted_orgs: |
|
print(dataset.owner_org + " in omitted_orgs") |
|
sys.exit(0); |
|
|
|
ows_resources = [] |
|
kml_resources = [] |
|
shp_resources = [] |
|
data_modified_date = None |
|
for resource in dataset['resources']: |
|
if "wms" in resource['format'] or "wfs" in resource['format']: |
|
if geoserver_addr not in resource['url'] : |
|
failure(dataset['id']+" already has geo api"); |
|
else: |
|
ows_resources += [resource] |
|
|
|
if "kml" in resource['format']: |
|
kml_resources += [resource] |
|
if "shp" in resource['format']: |
|
shp_resources += [resource] |
|
|
|
if len(shp_resources) == 0: |
|
print "No geodata format files detected" |
|
sys.exit(0); |
|
|
|
#if geoserver api link does not exist or api link is out of date with data, continue |
|
if len(ows_resources) > 0 and data_modified_date <= wms_resources[0]['last_modified']: |
|
print "Already up to date" |
|
sys.exit(0); |
|
|
|
email("geodata processing started for "+dataset['id'], str(sys.argv)) |
|
msg = "" |
|
#download resource to tmpfile |
|
|
|
#check filesize limit |
|
|
|
(cur,conn) = get_cursor(db_settings) |
|
table_name = dataset['id'].replace("-","_") |
|
cur.execute("DROP TABLE IF EXISTS "+table_name) |
|
cur.close() |
|
conn.close() |
|
|
|
tempdir = tempfile.mkdtemp(dataset['id']) |
|
os.chdir(tempdir) |
|
print tempdir+" created" |
|
#load esri shapefiles |
|
if len(shp_resources) > 0: |
|
print "using SHP file "+shp_resources[0]['url'] |
|
(filepath,headers) = urllib.urlretrieve(shp_resources[0]['url'], "input.zip" ) |
|
print "shp downlaoded" |
|
with ZipFile(filepath, 'r') as myzip: |
|
myzip.extractall() |
|
print "shp unziped" |
|
shpfiles = glob.glob("*.[sS][hH][pP]") |
|
prjfiles = glob.glob("*.[pP][rR][jJ]") |
|
if len(shpfiles) == 0: |
|
failure("no shp files found in zip "+shp_resources[0]['url']) |
|
print "converting to pgsql "+shpfiles[0] |
|
process = Popen([shp2pgsql,shpfiles[0], table_name], stdout=PIPE, stderr=PIPE) |
|
psql_load(process) |
|
if len(prjfiles) > 0: |
|
nativeCRS = open(prjfiles[0], 'r').read() |
|
#else: |
|
# print "using KML file "+kml_resources[0]['url'] |
|
# #if kml ogr2ogr http://gis.stackexchange.com/questions/33102/how-to-import-kml-file-with-custom-data-to-postgres-postgis-database |
|
# (filepath,headers) = urllib.urlretrieve(kml_resources[0]['url'], "input.kml") |
|
|
|
|
|
#load bounding boxes |
|
(cur,conn) = get_cursor(db_settings) |
|
cur.execute('SELECT ST_Extent(geom) as box,ST_AsGeoJSON(ST_Extent(geom)) as geojson from '+table_name) |
|
(bbox,bgjson) = cur.fetchone() |
|
cur.close() |
|
conn.close() |
|
print bbox |
|
|
|
|
|
#create geoserver dataset http://boundlessgeo.com/2012/10/adding-layers-to-geoserver-using-the-rest-api/ |
|
# name workspace after dataset |
|
workspace = dataset['name'] |
|
ws = requests.post(geoserver_addr+'rest/workspaces', data=json.dumps({'workspace': {'name': workspace} }), headers={'Content-type': 'application/json'}, auth=('admin', 'geoserver')) |
|
pprint(ws) |
|
#echo ws.status_code |
|
#echo ws.text |
|
|
|
datastore = dataset['name']+'ds' |
|
dsdata =json.dumps({'dataStore':{'name':datastore, |
|
'connectionParameters' : { |
|
'host':db_settings['host'], |
|
'port':5432, |
|
'database': db_settings['dbname'], |
|
'schema':'public', |
|
'user':db_settings['user'], |
|
'passwd':db_settings['password'], |
|
'dbtype':'postgis' |
|
|
|
}}}) |
|
#print dsdata |
|
r = requests.post(geoserver_addr+'rest/workspaces/'+workspace+'/datastores', data=dsdata, headers={'Content-type': 'application/json'}, auth=('admin', 'geoserver')) |
|
pprint(r) |
|
#echo r.status_code |
|
#echo r.text |
|
|
|
# name layer after resource title |
|
ftdata = {'featureType':{'name':table_name, 'title': resource['name']}} |
|
(minx,miny, maxx, maxy) = bbox.replace("BOX","").replace("(","").replace(")","").replace(","," ").split(" ") |
|
bbox_obj = { 'minx': minx,'maxx': maxx,'miny': miny,'maxy': maxy } |
|
|
|
if nativeCRS != None: |
|
ftdata['featureType']['nativeCRS'] = nativeCRS |
|
else: |
|
ftdata['featureType']['nativeBoundingBox'] = bbox_obj |
|
ftdata['featureType']['latLonBoundingBox'] = bbox_obj |
|
ftdata['featureType']['srs'] = "EPSG:4326" |
|
ftdata = json.dumps(ftdata) |
|
r = requests.post(geoserver_addr+'rest/workspaces/'+workspace+'/datastores/'+datastore+"/featuretypes", data= ftdata, headers={'Content-Type': 'application/json'}, auth=('admin', 'geoserver')) |
|
pprint(r) |
|
|
|
#generate wms/wfs api links, kml, png resources and add to package |
|
print bgjson |
|
dataset['spatial'] = bgjson |
|
|
|
ws_addr = geoserver_addr+dataset['name']+"/" |
|
for format in ['image/png','kml']: |
|
url = ws_addr+"wms?request=GetMap&layers="+table_name+"&bbox="+bbox_obj['minx']+","+bbox_obj['miny']+","+bbox_obj['maxx']+","+bbox_obj['maxy']+"&width=512&height=512&format="+urllib.quote(format) |
|
if format == "image/png": |
|
dataset['resources'].append({"name":dataset['title'] + " Preview Image","description":"View overview image of this dataset" ,"format":format,"url":url, "last_modified": datetime.now().isoformat()}) |
|
if format == "kml": |
|
dataset['resources'].append({"name":dataset['title'] + " KML","description":"For use in web and desktop spatial data tools including Google Earth" ,"format":format,"url":url, "last_modified": datetime.now().isoformat()}) |
|
for format in ['csv','json']: |
|
url = ws_addr+"wfs?request=GetFeature&typeName="+table_name+"&outputFormat="+urllib.quote(format) |
|
if format == "csv": |
|
dataset['resources'].append({"name": dataset['title'] + " CSV","description":"For summary of the objects/data in this collection","format":format,"url":url, "last_modified": datetime.now().isoformat()}) |
|
if format == "json": |
|
dataset['resources'].append({"name":dataset['title'] + " GeoJSON","description":"For use in web-based data visualisation of this collection","format":format,"url":url, "last_modified": datetime.now().isoformat()}) |
|
dataset['resources'].append({"name":dataset['name'] + " - Preview this Dataset (WMS)","description":"View the data in this datasets online via web-based WMS viewer","format":"wms", |
|
"url":ws_addr+"wms?request=GetCapabilities", "last_modified": datetime.now().isoformat()}) |
|
dataset['resources'].append({"name":dataset['title'] + " WFS Link","description":"WFS Link for use of live data in Desktop GIS tools","format":"wfs", |
|
"url":ws_addr+"wfs?request=GetCapabilities", "last_modified": datetime.now().isoformat()}) |
|
|
|
pprint(dataset) |
|
ckan.action.package_update(id=dataset['id'],spatial=dataset['spatial'],resources=dataset['resources']) |
|
|
|
|
|
#delete tempdir |
|
shutil.rmtree(tempdir) |
|
success(msg) |
|
|