--- /dev/null +++ b/README.md @@ -1,1 +1,33 @@ +=================================== +dga-spatialingestor +=================================== +Provides ingestion capability for data.gov.au that can be triggered on dataset update with https://github.com/datagovuk/ckanext-os + + +CKAN Configuration: + + ckan.plugins = os_wfs_server + + ckanext-os.spatial-datastore.jdbc.url = {"dbname":"geodatastore", "user":"user", "password":"password", "host":"localhost"} + + ckanext-os.spatial-ingester.filepath = /home/co/pyenv_dgu/src/os-spatialingester/spatial.ingester + +Creating the PostGIS database: + + owner=user + + db=geodatastore + + sudo -u postgres createdb -E UTF8 -O $owner $db + + sudo -u postgres psql $db -c "CREATE EXTENSION postgis;" + + sudo -u postgres psql $db -c "ALTER TABLE geometry_columns OWNER TO $owner; ALTER TABLE spatial_ref_sys OWNER TO $owner;" + +INSERT into spatial_ref_sys (srid, auth_name, auth_srid, proj4text, srtext) values ( 96643, 'sr-org', 6643, '', 'PROJCS["Albers134",GEOGCS["GCS_GDA_1994",DATUM["D_GDA_1994",SPHEROID["GRS_1980",6378137.0,298.257222101]],PRIMEM["Greenwich",0.0],UNIT["Degree",0.017453292519943295]],PROJECTION["Albers"],PARAMETER["False_Easting",0.0],PARAMETER["False_Northing",0.0],PARAMETER["Central_Meridian",134.0],PARAMETER["Standard_Parallel_1",-18.0],PARAMETER["Standard_Parallel_2",-36.0],PARAMETER["Latitude_Of_Origin",0.0],UNIT["Meter",1.0]]'); + + +example run +python dga-spatialingestor.py '{"dbname":"geodatastore", "user":"postgres", "password":"snmc", "host":"localhost"}' http://localhost:5000 256fa905-cf92-4d6c-8714-95e3da2ea3c2 geodataset +
--- /dev/null +++ b/dga-spatialingestor.py @@ -1,1 +1,235 @@ - +#!/usr/bin/python +# coding=utf-8 +''' +spatial ingestor for data.gov.au +<alex.sadleir@linkdigital.com.au> +1.0 28/11/2013 initial implementation +''' +import ckanapi #https://github.com/open-data/ckanapi +import errno, os, shutil, sys, glob +from pprint import pprint +from email.mime.text import MIMEText +from subprocess import Popen, PIPE +import tempfile +import smtplib +from zipfile import ZipFile +from datetime import datetime +import urllib +import fileinput +import json +import psycopg2 +import requests + +geoserver_addr = "http://data.disclosurelo.gs:8080/geoserver/" +geoserver_user = "admin" +geoserver_passwd = "geoserver" +email_addr = "maxious@lambdacomplex.org" +shp2pgsql = "/usr/lib/postgresql/9.2/bin/shp2pgsql" +omitted_orgs = [] + +def email(subject, body): + msg = MIMEText(body) + msg["From"] = "ckan@localhost" + msg["To"] = email_addr + msg["Subject"] = subject + # Send the message via our own SMTP server, but don't include the + # envelope header. + s = smtplib.SMTP('localhost') + s.sendmail(msg["From"], [msg["To"]], msg.as_string()) + s.quit() + +def success(msg): + print "Completed!" + email("geodata success",msg) + sys.exit(errno.EACCES) + +def failure(msg): + print "ERROR -"+msg + email("geodata error",str(sys.argv)+msg) + sys.exit(errno.EACCES) + +def get_cursor(db_settings): + + # Connect to an existing database + try: + conn = psycopg2.connect(dbname=db_settings['dbname'], user=db_settings['user'], password=db_settings['password'], host=db_settings['host']) + except: + failure("I am unable to connect to the database.") + # Open a cursor to perform database operations + cur = conn.cursor() + conn.set_isolation_level(0) + # Execute a command: this creates a new table + #cur.execute("create extension postgis") + return (cur,conn) + +def psql_load(proc): + (cur,conn) = get_cursor(db_settings) + sql = "" + for line in iter(proc.stdout.readline,''): + sql += line + if sql.endswith(';'): + cur.execute(sql) + sql = "" + if sql != "": + cur.execute(sql) + cur.close() + conn.close() + +if len(sys.argv) != 5: + print "spatial ingester. command line: postgis_url api_url api_key dataset_id" + sys.exit(errno.EACCES) +else: + (path, db_settings_json, api_url, api_key, dataset_id) = sys.argv + db_settings = json.loads(db_settings_json) + +ckan = ckanapi.RemoteCKAN(address=api_url, apikey=api_key) +dataset = ckan.action.package_show(id=dataset_id) +print "loaded dataset"+dataset['name'] +#pprint(dataset) +if dataset['owner_org'] in omitted_orgs: + print(dataset.owner_org + " in omitted_orgs") + sys.exit(0); + +ows_resources = [] +kml_resources = [] +shp_resources = [] +data_modified_date = None +for resource in dataset['resources']: + if "wms" in resource['format'] or "wfs" in resource['format']: + if geoserver_addr not in resource['url'] : + failure(dataset['id']+" already has geo api"); + else: + ows_resources += [resource] + + if "kml" in resource['format']: + kml_resources += [resource] + if "shp" in resource['format']: + shp_resources += [resource] + +if len(shp_resources) == 0: + print "No geodata format files detected" + sys.exit(0); + +#if geoserver api link does not exist or api link is out of date with data, continue +if len(ows_resources) > 0 and data_modified_date <= wms_resources[0]['last_modified']: + print "Already up to date" + sys.exit(0); + +email("geodata processing started for "+dataset['id'], str(sys.argv)) +msg = "" +#download resource to tmpfile + +#check filesize limit + +(cur,conn) = get_cursor(db_settings) +table_name = dataset['id'].replace("-","_") +cur.execute("DROP TABLE IF EXISTS "+table_name) +cur.close() +conn.close() + +tempdir = tempfile.mkdtemp(dataset['id']) +os.chdir(tempdir) +print tempdir+" created" +#load esri shapefiles +if len(shp_resources) > 0: + print "using SHP file "+shp_resources[0]['url'] + (filepath,headers) = urllib.urlretrieve(shp_resources[0]['url'], "input.zip" ) + print "shp downlaoded" + with ZipFile(filepath, 'r') as myzip: + myzip.extractall() + print "shp unziped" + shpfiles = glob.glob("*.[sS][hH][pP]") + prjfiles = glob.glob("*.[pP][rR][jJ]") + if len(shpfiles) == 0: + failure("no shp files found in zip "+shp_resources[0]['url']) + print "converting to pgsql "+shpfiles[0] + process = Popen([shp2pgsql,shpfiles[0], table_name], stdout=PIPE, stderr=PIPE) + psql_load(process) + if len(prjfiles) > 0: + nativeCRS = open(prjfiles[0], 'r').read() +#else: +# print "using KML file "+kml_resources[0]['url'] +# #if kml ogr2ogr http://gis.stackexchange.com/questions/33102/how-to-import-kml-file-with-custom-data-to-postgres-postgis-database +# (filepath,headers) = urllib.urlretrieve(kml_resources[0]['url'], "input.kml") + + +#load bounding boxes +(cur,conn) = get_cursor(db_settings) +cur.execute('SELECT ST_Extent(geom) as box,ST_AsGeoJSON(ST_Extent(geom)) as geojson from '+table_name) +(bbox,bgjson) = cur.fetchone() +cur.close() +conn.close() +print bbox + + +#create geoserver dataset http://boundlessgeo.com/2012/10/adding-layers-to-geoserver-using-the-rest-api/ +# name workspace after dataset +workspace = dataset['name'] +ws = requests.post(geoserver_addr+'rest/workspaces', data=json.dumps({'workspace': {'name': workspace} }), headers={'Content-type': 'application/json'}, auth=('admin', 'geoserver')) +pprint(ws) +#echo ws.status_code +#echo ws.text + +datastore = dataset['name']+'ds' +dsdata =json.dumps({'dataStore':{'name':datastore, + 'connectionParameters' : { + 'host':db_settings['host'], + 'port':5432, + 'database': db_settings['dbname'], + 'schema':'public', + 'user':db_settings['user'], + 'passwd':db_settings['password'], + 'dbtype':'postgis' + + }}}) +#print dsdata +r = requests.post(geoserver_addr+'rest/workspaces/'+workspace+'/datastores', data=dsdata, headers={'Content-type': 'application/json'}, auth=('admin', 'geoserver')) +pprint(r) +#echo r.status_code +#echo r.text + +# name layer after resource title +ftdata = {'featureType':{'name':table_name, 'title': resource['name']}} +(minx,miny, maxx, maxy) = bbox.replace("BOX","").replace("(","").replace(")","").replace(","," ").split(" ") +bbox_obj = { 'minx': minx,'maxx': maxx,'miny': miny,'maxy': maxy } + +if nativeCRS != None: + ftdata['featureType']['nativeCRS'] = nativeCRS +else: + ftdata['featureType']['nativeBoundingBox'] = bbox_obj + ftdata['featureType']['latLonBoundingBox'] = bbox_obj + ftdata['featureType']['srs'] = "EPSG:4326" +ftdata = json.dumps(ftdata) +r = requests.post(geoserver_addr+'rest/workspaces/'+workspace+'/datastores/'+datastore+"/featuretypes", data= ftdata, headers={'Content-Type': 'application/json'}, auth=('admin', 'geoserver')) +pprint(r) + +#generate wms/wfs api links, kml, png resources and add to package +print bgjson +dataset['spatial'] = bgjson + +ws_addr = geoserver_addr+dataset['name']+"/" +for format in ['image/png','kml']: + url = ws_addr+"wms?request=GetMap&layers="+table_name+"&bbox="+bbox_obj['minx']+","+bbox_obj['miny']+","+bbox_obj['maxx']+","+bbox_obj['maxy']+"&width=512&height=512&format="+urllib.quote(format) + if format == "image/png": + dataset['resources'].append({"name":dataset['title'] + " Preview Image","description":"View overview image of this dataset" ,"format":format,"url":url, "last_modified": datetime.now().isoformat()}) + if format == "kml": + dataset['resources'].append({"name":dataset['title'] + " KML","description":"For use in web and desktop spatial data tools including Google Earth" ,"format":format,"url":url, "last_modified": datetime.now().isoformat()}) +for format in ['csv','json']: + url = ws_addr+"wfs?request=GetFeature&typeName="+table_name+"&outputFormat="+urllib.quote(format) + if format == "csv": + dataset['resources'].append({"name": dataset['title'] + " CSV","description":"For summary of the objects/data in this collection","format":format,"url":url, "last_modified": datetime.now().isoformat()}) + if format == "json": + dataset['resources'].append({"name":dataset['title'] + " GeoJSON","description":"For use in web-based data visualisation of this collection","format":format,"url":url, "last_modified": datetime.now().isoformat()}) +dataset['resources'].append({"name":dataset['name'] + " - Preview this Dataset (WMS)","description":"View the data in this datasets online via web-based WMS viewer","format":"wms", + "url":ws_addr+"wms?request=GetCapabilities", "last_modified": datetime.now().isoformat()}) +dataset['resources'].append({"name":dataset['title'] + " WFS Link","description":"WFS Link for use of live data in Desktop GIS tools","format":"wfs", + "url":ws_addr+"wfs?request=GetCapabilities", "last_modified": datetime.now().isoformat()}) + +pprint(dataset) +ckan.action.package_update(id=dataset['id'],spatial=dataset['spatial'],resources=dataset['resources']) + + +#delete tempdir +shutil.rmtree(tempdir) +success(msg) +