data.gov.au customisation
data.gov.au customisation

file:a/README.md -> file:b/README.md
===================================
dga-spatialingestor
===================================

Provides ingestion capability for data.gov.au that can be triggered on dataset update with https://github.com/datagovuk/ckanext-os
   
   
CKAN Configuration:

ckan.plugins = os_wfs_server

ckanext-os.spatial-datastore.jdbc.url = {"dbname":"geodatastore", "user":"user", "password":"password", "host":"localhost"}

ckanext-os.spatial-ingester.filepath = /home/co/pyenv_dgu/src/os-spatialingester/spatial.ingester
   
Creating the PostGIS database:

owner=user

db=geodatastore

sudo -u postgres createdb -E UTF8 -O $owner $db

sudo -u postgres psql $db -c "CREATE EXTENSION postgis;"

sudo -u postgres psql $db -c "ALTER TABLE geometry_columns OWNER TO $owner; ALTER TABLE spatial_ref_sys OWNER TO $owner;"
grant select on table geometry_columns to ckandga_data; # grant select to read only user
   
   
INSERT into spatial_ref_sys (srid, auth_name, auth_srid, proj4text, srtext) values ( 96643, 'sr-org', 6643, '', 'PROJCS["Albers134",GEOGCS["GCS_GDA_1994",DATUM["D_GDA_1994",SPHEROID["GRS_1980",6378137.0,298.257222101]],PRIMEM["Greenwich",0.0],UNIT["Degree",0.017453292519943295]],PROJECTION["Albers"],PARAMETER["False_Easting",0.0],PARAMETER["False_Northing",0.0],PARAMETER["Central_Meridian",134.0],PARAMETER["Standard_Parallel_1",-18.0],PARAMETER["Standard_Parallel_2",-36.0],PARAMETER["Latitude_Of_Origin",0.0],UNIT["Meter",1.0]]');
   
   
Example run:

python dga-spatialingestor.py '{"dbname":"geodatastore", "user":"postgres", "password":"snmc", "host":"localhost"}' http://localhost:5000 256fa905-cf92-4d6c-8714-95e3da2ea3c2 geodataset
   
  #!/usr/bin/python
  # coding=utf-8
  '''
  spatial ingestor for data.gov.au
  <alex.sadleir@linkdigital.com.au>
  1.0 28/11/2013 initial implementation
  '''
import ckanapi #https://github.com/open-data/ckanapi import ckanapi #https://github.com/open-data/ckanapi
import errno, os, shutil, sys, glob import errno, os, shutil, sys, glob
from pprint import pprint from pprint import pprint
from email.mime.text import MIMEText from email.mime.text import MIMEText
from subprocess import Popen, PIPE from subprocess import Popen, PIPE
import tempfile import tempfile
  import smtplib
from zipfile import ZipFile from zipfile import ZipFile
  from datetime import datetime
import urllib import urllib
import fileinput import fileinput
import json import json
import psycopg2 import psycopg2
import requests import requests
   
# Deployment settings. Each value may be overridden through an environment
# variable so hosts and credentials need not be edited in the source; the
# defaults are the values that used to be hard-coded here.
# NOTE(review): the default passwords are committed in plain text -- rotate
# them and supply the real values through the environment.
# geoserver_addr must end with "/" because REST paths are appended to it directly.
geoserver_addr = os.environ.get("DGA_GEOSERVER_ADDR", "http://data.gov.au/geoserver/")
geoserver_user = os.environ.get("DGA_GEOSERVER_USER", "admin")
geoserver_passwd = os.environ.get("DGA_GEOSERVER_PASSWD", "oRu7chan")
# Recipient of the status mails sent by email()
email_addr = os.environ.get("DGA_EMAIL_ADDR", "alex.sadleir@linkdigital.com.au")
# Path of the shp2pgsql executable used to convert shapefiles to SQL
shp2pgsql = os.environ.get("DGA_SHP2PGSQL", "/usr/bin/shp2pgsql")
# Names of organisations whose datasets are never ingested
omitted_orgs = ['launcestoncitycouncil', 'gcc']
   
def email(subject, body):
    """Send a plain-text notification mail to ``email_addr``.

    The message is submitted to smtp.gmail.com on port 587 over STARTTLS,
    authenticated as the ``datagovau@gmail.com`` account.

    subject -- text of the Subject header
    body    -- plain-text message body
    """
    sender = "datagovau@gmail.com"
    msg = MIMEText(body)
    msg["From"] = sender
    msg["To"] = email_addr
    msg["Subject"] = subject

    s = smtplib.SMTP('smtp.gmail.com', 587)
    s.ehlo()
    s.starttls()
    # Re-identify on the encrypted channel. The original referenced
    # ``s.ehlo`` without calling it, which did nothing.
    s.ehlo()
    # NOTE(review): the account password is hard-coded; move it to configuration.
    s.login(sender, '3P4ELm9kjNAmKUL')
    s.sendmail(msg["From"], [msg["To"]], msg.as_string())
    s.quit()
   
def success(msg):
    """Announce successful completion, mail the summary and end the process.

    msg -- text used as the body of the "geodata success" mail

    Does not return: the process exits with status 0.
    """
    print("Completed!")
    email("geodata success", msg)
    # Exit 0. The original exited with errno.EACCES here, so every
    # successful run was reported to the caller as a failure.
    sys.exit(0)
   
def failure(msg):
    """Report a fatal problem on stdout and by mail, then abort the run.

    msg -- description of the problem; the mail body is the command line
           (``sys.argv``) followed by this text

    Does not return: the process exits with status errno.EACCES.
    """
    mail_body = str(sys.argv) + msg
    print("ERROR -" + msg)
    email("geodata error", mail_body)
    sys.exit(errno.EACCES)
   
def get_cursor(db_settings):
    """Connect to the PostGIS database and return ``(cursor, connection)``.

    db_settings -- dict providing the keys 'dbname', 'user', 'password'
                   and 'host'

    The connection is switched to isolation level 0 (psycopg2's autocommit
    level). If the connection cannot be established the run is aborted
    through failure(). The caller is responsible for closing both returned
    objects.
    """
    try:
        conn = psycopg2.connect(
            dbname=db_settings['dbname'],
            user=db_settings['user'],
            password=db_settings['password'],
            host=db_settings['host'],
        )
    except Exception:
        # ``Exception`` rather than a bare ``except:`` so that Ctrl-C and
        # SystemExit are not misreported as a database connection problem.
        failure("I am unable to connect to the database.")
    # Open a cursor to perform database operations
    cur = conn.cursor()
    conn.set_isolation_level(0)
    return (cur, conn)
   
def psql_load(proc):
    """Run the SQL that a subprocess writes to its stdout against PostGIS.

    proc -- a subprocess.Popen object created with ``stdout=PIPE``

    Output lines are accumulated until a line ends with ';' and are then
    executed as one statement. Any unterminated remainder is executed at
    the end. The connection comes from get_cursor() using the module-level
    ``db_settings`` and is always closed, even when a statement fails.
    """
    (cur, conn) = get_cursor(db_settings)
    try:
        sql = ""
        for line in iter(proc.stdout.readline, ''):
            sql += line
            # readline() keeps the trailing newline, so strip it before
            # looking for the terminator. The original test
            # ``sql.endswith(';')`` never matched, which buffered the whole
            # dump in memory and ran it as one giant statement.
            if line.rstrip().endswith(';'):
                cur.execute(sql)
                sql = ""
        # Ignore a whitespace-only tail: an empty query would be rejected
        if sql.strip():
            cur.execute(sql)
    finally:
        cur.close()
        conn.close()
   
# --- Command line ----------------------------------------------------------
# Expected arguments: postgis_url api_url api_key dataset_id
# (postgis_url is a JSON object holding the database connection settings).
if len(sys.argv) != 5:
    print("spatial ingester. command line: postgis_url api_url api_key dataset_id")
    sys.exit(errno.EACCES)

(path, db_settings_json, api_url, api_key, dataset_id) = sys.argv
db_settings = json.loads(db_settings_json)

# --- Fetch the dataset description from CKAN ---------------------------------
ckan = ckanapi.RemoteCKAN(address=api_url, apikey=api_key)
dataset = ckan.action.package_show(id=dataset_id)
print("loaded dataset" + dataset['name'])

# Datasets that belong to an omitted organisation are skipped with a clean exit
org_name = dataset['organization']['name']
if org_name in omitted_orgs:
    print(org_name + " in omitted_orgs")
    sys.exit(0)
   
# Sort the dataset's resources into the groups the ingestor cares about.
ows_resources = []   # WMS/WFS links that already point at our GeoServer
kml_resources = []   # kept for the (currently disabled) KML ingestion path
shp_resources = []   # shapefile archives that can be ingested
data_modified_date = None  # last_modified of the most recently seen shapefile resource
for resource in dataset['resources']:
    if "wms" in resource['format'] or "wfs" in resource['format']:
        if geoserver_addr not in resource['url']:
            # A geo API hosted somewhere else: do not publish a second one
            failure(dataset['id'] + " already has geo api")
        else:
            ows_resources += [resource]
    # TODO: KML resources are not ingested yet.
    if "shp" in resource['format']:
        data_modified_date = resource['last_modified']
        shp_resources += [resource]

if len(shp_resources) == 0:
    print("No geodata format files detected")
    sys.exit(0)

# Continue only if the GeoServer API link does not exist yet or is older
# than the data. The original compared against the undefined name
# ``wms_resources`` and raised NameError whenever an API link was present.
if len(ows_resources) > 0 and data_modified_date <= ows_resources[0]['last_modified']:
    print("Already up to date")
    sys.exit(0)
   
# Tell the maintainer that processing has begun; ``msg`` is the text that is
# later handed to success().
email("geodata processing started for " + dataset['id'], "")
msg = dataset['id']

# The PostGIS table is named after the dataset id. The id contains "-", so
# the identifier is double-quoted in SQL instead of being rewritten.
table_name = dataset['id']
drop_sql = 'DROP TABLE IF EXISTS "' + table_name + '"'

# Remove any table left over from an earlier ingestion of this dataset
(cur, conn) = get_cursor(db_settings)
cur.execute(drop_sql)
cur.close()
conn.close()
   
# Work inside a scratch directory; it is removed again at the end of the run.
tempdir = tempfile.mkdtemp(dataset['id'])
os.chdir(tempdir)
print(tempdir + " created")

# Projection definition read from the .prj file. It stays None when the
# archive has none; the original left the name unbound in that case, so the
# later ``nativeCRS`` check raised NameError.
nativeCRS = None

# Load ESRI shapefiles: download the first SHP resource, unpack it and feed
# the shp2pgsql output into PostGIS.
if len(shp_resources) > 0:
    print("using SHP file " + shp_resources[0]['url'])
    (filepath, headers) = urllib.urlretrieve(shp_resources[0]['url'], "input.zip")
    print("shp downlaoded")
    with ZipFile(filepath, 'r') as myzip:
        myzip.extractall()
    print("shp unziped")
    shpfiles = glob.glob("*.[sS][hH][pP]")
    prjfiles = glob.glob("*.[pP][rR][jJ]")
    if len(shpfiles) == 0:
        failure("no shp files found in zip " + shp_resources[0]['url'])
    print("converting to pgsql " + shpfiles[0])
    process = Popen([shp2pgsql, shpfiles[0], table_name], stdout=PIPE, stderr=PIPE)
    psql_load(process)
    if len(prjfiles) > 0:
        # ``with`` closes the file handle, which the original left open
        with open(prjfiles[0], 'r') as prjfile:
            nativeCRS = prjfile.read()
# TODO: KML input is not handled yet; see
# http://gis.stackexchange.com/questions/33102/how-to-import-kml-file-with-custom-data-to-postgres-postgis-database
   
   
# Ask PostGIS for the extent of the loaded geometries, both as a BOX(...)
# string (``bbox``) and as GeoJSON (``bgjson``).
extent_sql = ('SELECT ST_Extent(geom) as box,ST_AsGeoJSON(ST_Extent(geom)) as geojson from "'
              + table_name + '"')
(cur, conn) = get_cursor(db_settings)
cur.execute(extent_sql)
(bbox, bgjson) = cur.fetchone()
cur.close()
conn.close()
print(bbox)
   
   
# Publish the table through the GeoServer REST API, see
# http://boundlessgeo.com/2012/10/adding-layers-to-geoserver-using-the-rest-api/
geoserver_auth = (geoserver_user, geoserver_passwd)
rest_workspaces = geoserver_addr + 'rest/workspaces'

# 1. Workspace, named after the dataset
workspace = dataset['name']
ws_payload = json.dumps({'workspace': {'name': workspace} })
ws = requests.post(rest_workspaces, data=ws_payload,
                   headers={'Content-type': 'application/json'}, auth=geoserver_auth)
pprint(ws)

# 2. Datastore pointing at the PostGIS database
datastore = dataset['name'] + 'ds'
connection_parameters = {
    'host': db_settings['host'],
    'port': 5432,
    'database': db_settings['dbname'],
    'schema': 'public',
    'user': db_settings['user'] + "_data",  # use read only user
    'passwd': db_settings['password'],
    'dbtype': 'postgis'
}
dsdata = json.dumps({'dataStore': {'name': datastore,
                                   'connectionParameters': connection_parameters}})
print(dsdata)
datastores_url = rest_workspaces + '/' + workspace + '/datastores'
r = requests.post(datastores_url, data=dsdata,
                  headers={'Content-type': 'application/json'}, auth=geoserver_auth)
pprint(r)

# 3. Feature type (layer) for the table, titled after the dataset
ftdata = {'featureType': {'name': table_name, 'title': dataset['title']}}

# Turn "BOX(minx miny,maxx maxy)" into its four coordinate strings
bbox_text = bbox.replace("BOX", "").replace("(", "").replace(")", "")
(minx, miny, maxx, maxy) = bbox_text.replace(",", " ").split(" ")
bbox_obj = { 'minx': minx, 'maxx': maxx, 'miny': miny, 'maxy': maxy }

feature_type = ftdata['featureType']
if nativeCRS is None:
    # Without a projection definition, describe the layer as EPSG:4326 and
    # use the computed extent for both bounding boxes.
    feature_type['nativeBoundingBox'] = bbox_obj
    feature_type['latLonBoundingBox'] = bbox_obj
    feature_type['srs'] = "EPSG:4326"
else:
    feature_type['nativeCRS'] = nativeCRS
ftdata = json.dumps(ftdata)

featuretypes_url = rest_workspaces + '/' + workspace + '/datastores/' + datastore + "/featuretypes"
print(featuretypes_url)
print(ftdata)
r = requests.post(featuretypes_url, data=ftdata,
                  headers={'Content-Type': 'application/json'}, auth=geoserver_auth)
pprint(r)
   
# Generate WMS/WFS API links plus PNG, KML, CSV and GeoJSON resources and
# add them to the in-memory package description.
print(bgjson)
dataset['spatial'] = bgjson
ws_addr = geoserver_addr + dataset['name'] + "/"
title = dataset['title']
bbox_param = ",".join([bbox_obj['minx'], bbox_obj['miny'], bbox_obj['maxx'], bbox_obj['maxy']])

# Rendered outputs served by WMS GetMap: (format, name suffix, description)
wms_outputs = [
    ('image/png', " Preview Image", "View overview image of this dataset"),
    ('kml', " KML", "For use in web and desktop spatial data tools including Google Earth"),
]
for (fmt, suffix, description) in wms_outputs:
    url = (ws_addr + "wms?request=GetMap&layers=" + table_name + "&bbox=" + bbox_param
           + "&width=512&height=512&format=" + urllib.quote(fmt))
    dataset['resources'].append({"name": title + suffix, "description": description,
                                 "format": fmt, "url": url,
                                 "last_modified": datetime.now().isoformat()})

# Data exports served by WFS GetFeature: (format, name suffix, description)
wfs_outputs = [
    ('csv', " CSV", "For summary of the objects/data in this collection"),
    ('json', " GeoJSON", "For use in web-based data visualisation of this collection"),
]
for (fmt, suffix, description) in wfs_outputs:
    url = ws_addr + "wfs?request=GetFeature&typeName=" + table_name + "&outputFormat=" + urllib.quote(fmt)
    dataset['resources'].append({"name": title + suffix, "description": description,
                                 "format": fmt, "url": url,
                                 "last_modified": datetime.now().isoformat()})

# Service endpoints themselves
dataset['resources'].append({"name": title + " - Preview this Dataset (WMS)",
                             "description": "View the data in this datasets online via web-based WMS viewer",
                             "format": "wms",
                             "url": ws_addr + "wms?request=GetCapabilities",
                             "last_modified": datetime.now().isoformat()})
dataset['resources'].append({"name": title + " WFS Link",
                             "description": "WFS Link for use of live data in Desktop GIS tools",
                             "format": "wfs",
                             "url": ws_addr + "wfs?request=GetCapabilities",
                             "last_modified": datetime.now().isoformat()})

pprint(dataset)
# NOTE: writing the changes back to CKAN is currently disabled:
#ckan.action.package_update(id=dataset['id'],spatial=dataset['spatial'],resources=dataset['resources'])

# Delete the scratch directory and finish
shutil.rmtree(tempdir)
success(msg)