#!/usr/bin/python
# coding=utf-8
'''
spatial ingestor for data.gov.au
<alex.sadleir@linkdigital.com.au>
1.0 28/11/2013 initial implementation
'''
import errno
import fileinput
import glob
import json
import os
import shutil
import smtplib
import sys
import tempfile
import urllib
from datetime import datetime
from email.mime.text import MIMEText
from pprint import pprint
from subprocess import Popen, PIPE
from zipfile import ZipFile

import ckanapi  # https://github.com/open-data/ckanapi
import psycopg2
import requests
|
|
# Ingestor configuration.
# NOTE(review): service credentials are hard-coded below; they should be
# moved to a config file or environment variables rather than committed
# to version control.
geoserver_addr = "http://data.gov.au/geoserver/"
geoserver_user = "admin"
geoserver_passwd = "oRu7chan"
email_addr = "alex.sadleir@linkdigital.com.au"
shp2pgsql = "/usr/bin/shp2pgsql"
# Organisations whose datasets must not be ingested.
omitted_orgs = ['launcestoncitycouncil', 'gcc']
|
|
def email(subject, body):
    '''Send a plain-text notification email via GMail SMTP.

    subject -- subject line of the message
    body -- plain-text message body
    '''
    msg = MIMEText(body)
    msg["From"] = "datagovau@gmail.com"
    msg["To"] = email_addr
    msg["Subject"] = subject
    # NOTE(review): SMTP credentials are hard-coded; move to configuration.
    s = smtplib.SMTP('smtp.gmail.com', 587)
    s.ehlo()
    s.starttls()
    # BUG FIX: original had "s.ehlo" without parentheses -- a no-op
    # attribute access. RFC 3207 requires EHLO again after STARTTLS.
    s.ehlo()
    s.login('datagovau@gmail.com', '3P4ELm9kjNAmKUL')
    s.sendmail(msg["From"], [msg["To"]], msg.as_string())
    s.quit()
|
|
def success(msg): |
def success(msg): |
print "Completed!" |
print "Completed!" |
email("geodata success",msg) |
email("geodata success",msg) |
sys.exit(errno.EACCES) |
sys.exit(errno.EACCES) |
|
|
def failure(msg): |
def failure(msg): |
print "ERROR -"+msg |
print "ERROR -"+msg |
email("geodata error",str(sys.argv)+msg) |
email("geodata error",str(sys.argv)+msg) |
sys.exit(errno.EACCES) |
sys.exit(errno.EACCES) |
|
|
def get_cursor(db_settings):
    '''Connect to PostGIS and return a (cursor, connection) pair.

    db_settings -- dict with 'dbname', 'user', 'password' and 'host' keys.

    The connection is set to autocommit (isolation level 0) so DDL such
    as DROP TABLE takes effect immediately.
    '''
    try:
        conn = psycopg2.connect(dbname=db_settings['dbname'], user=db_settings['user'],
                                password=db_settings['password'], host=db_settings['host'])
    except psycopg2.Error:
        # Narrowed from a bare "except:" so signals such as
        # KeyboardInterrupt are not silently swallowed.
        failure("I am unable to connect to the database.")
    cur = conn.cursor()
    conn.set_isolation_level(0)
    #cur.execute("create extension postgis")
    return (cur, conn)
|
|
def psql_load(proc):
    '''Stream SQL emitted on a subprocess' stdout into the database.

    proc -- a Popen object (e.g. shp2pgsql) whose stdout produces SQL.

    Lines are accumulated until a statement terminator ';' is seen and
    each complete statement is executed; any trailing SQL is executed
    at the end.
    '''
    (cur, conn) = get_cursor(db_settings)
    sql = ""
    for line in iter(proc.stdout.readline, ''):
        sql += line
        # BUG FIX: readline() keeps the trailing newline, so
        # sql.endswith(';') never matched and the whole dump was executed
        # as one giant batch at the end; strip whitespace before testing.
        if sql.rstrip().endswith(';'):
            cur.execute(sql)
            sql = ""
    if sql.strip() != "":
        cur.execute(sql)
    cur.close()
    conn.close()
|
|
if len(sys.argv) != 5: |
if len(sys.argv) != 5: |
print "spatial ingester. command line: postgis_url api_url api_key dataset_id" |
print "spatial ingester. command line: postgis_url api_url api_key dataset_id" |
sys.exit(errno.EACCES) |
sys.exit(errno.EACCES) |
else: |
else: |
(path, db_settings_json, api_url, api_key, dataset_id) = sys.argv |
(path, db_settings_json, api_url, api_key, dataset_id) = sys.argv |
db_settings = json.loads(db_settings_json) |
db_settings = json.loads(db_settings_json) |
|
|
ckan = ckanapi.RemoteCKAN(address=api_url, apikey=api_key) |
ckan = ckanapi.RemoteCKAN(address=api_url, apikey=api_key) |
dataset = ckan.action.package_show(id=dataset_id) |
dataset = ckan.action.package_show(id=dataset_id) |
print "loaded dataset"+dataset['name'] |
print "loaded dataset"+dataset['name'] |
#pprint(dataset) |
#pprint(dataset) |
if dataset['owner_org'] in omitted_orgs: |
if dataset['organization']['name'] in omitted_orgs: |
print(dataset.owner_org + " in omitted_orgs") |
print(dataset['organization']['name'] + " in omitted_orgs") |
sys.exit(0); |
sys.exit(0); |
|
|
ows_resources = [] |
ows_resources = [] |
kml_resources = [] |
kml_resources = [] |
shp_resources = [] |
shp_resources = [] |
data_modified_date = None |
data_modified_date = None |
for resource in dataset['resources']: |
for resource in dataset['resources']: |
if "wms" in resource['format'] or "wfs" in resource['format']: |
if "wms" in resource['format'] or "wfs" in resource['format']: |
if geoserver_addr not in resource['url'] : |
if geoserver_addr not in resource['url'] : |
failure(dataset['id']+" already has geo api"); |
failure(dataset['id']+" already has geo api"); |
else: |
else: |
ows_resources += [resource] |
ows_resources += [resource] |
|
|
if "kml" in resource['format']: |
# if "kml" in resource['format']: |
kml_resources += [resource] |
# data_modified_date = resource['last_modified'] |
|
# kml_resources += [resource] |
if "shp" in resource['format']: |
if "shp" in resource['format']: |
|
data_modified_date = resource['last_modified'] |
shp_resources += [resource] |
shp_resources += [resource] |
|
|
if len(shp_resources) == 0: |
if len(shp_resources) == 0: |
print "No geodata format files detected" |
print "No geodata format files detected" |
sys.exit(0); |
sys.exit(0); |
|
|
#if geoserver api link does not exist or api link is out of date with data, continue |
#if geoserver api link does not exist or api link is out of date with data, continue |
if len(ows_resources) > 0 and data_modified_date <= wms_resources[0]['last_modified']: |
if len(ows_resources) > 0 and data_modified_date <= wms_resources[0]['last_modified']: |
print "Already up to date" |
print "Already up to date" |
sys.exit(0); |
sys.exit(0); |
|
|
email("geodata processing started for "+dataset['id'], str(sys.argv)) |
email("geodata processing started for "+dataset['id'], "") |
msg = "" |
msg = dataset['id'] |
#download resource to tmpfile |
#download resource to tmpfile |
|
|
#check filesize limit |
#check filesize limit |
|
|
# Drop any previous copy of this dataset's table. The CKAN dataset id
# (a UUID containing hyphens) is used as the table name, so it must be
# double-quoted in SQL.
(cur, conn) = get_cursor(db_settings)
table_name = dataset['id']
cur.execute('DROP TABLE IF EXISTS "' + table_name + '"')
cur.close()
conn.close()
|
|
tempdir = tempfile.mkdtemp(dataset['id']) |
tempdir = tempfile.mkdtemp(dataset['id']) |
os.chdir(tempdir) |
os.chdir(tempdir) |
print tempdir+" created" |
print tempdir+" created" |
#load esri shapefiles |
#load esri shapefiles |
if len(shp_resources) > 0: |
if len(shp_resources) > 0: |
print "using SHP file "+shp_resources[0]['url'] |
print "using SHP file "+shp_resources[0]['url'] |
(filepath,headers) = urllib.urlretrieve(shp_resources[0]['url'], "input.zip" ) |
(filepath,headers) = urllib.urlretrieve(shp_resources[0]['url'], "input.zip" ) |
print "shp downlaoded" |
print "shp downlaoded" |
with ZipFile(filepath, 'r') as myzip: |
with ZipFile(filepath, 'r') as myzip: |
myzip.extractall() |
myzip.extractall() |
print "shp unziped" |
print "shp unziped" |
shpfiles = glob.glob("*.[sS][hH][pP]") |
shpfiles = glob.glob("*.[sS][hH][pP]") |
prjfiles = glob.glob("*.[pP][rR][jJ]") |
prjfiles = glob.glob("*.[pP][rR][jJ]") |
if len(shpfiles) == 0: |
if len(shpfiles) == 0: |
failure("no shp files found in zip "+shp_resources[0]['url']) |
failure("no shp files found in zip "+shp_resources[0]['url']) |
print "converting to pgsql "+shpfiles[0] |
print "converting to pgsql "+shpfiles[0] |
process = Popen([shp2pgsql,shpfiles[0], table_name], stdout=PIPE, stderr=PIPE) |
process = Popen([shp2pgsql,shpfiles[0], table_name], stdout=PIPE, stderr=PIPE) |
psql_load(process) |
psql_load(process) |
if len(prjfiles) > 0: |
if len(prjfiles) > 0: |
nativeCRS = open(prjfiles[0], 'r').read() |
nativeCRS = open(prjfiles[0], 'r').read() |
#else: |
#else: |
# print "using KML file "+kml_resources[0]['url'] |
# print "using KML file "+kml_resources[0]['url'] |
# #if kml ogr2ogr http://gis.stackexchange.com/questions/33102/how-to-import-kml-file-with-custom-data-to-postgres-postgis-database |
# #if kml ogr2ogr http://gis.stackexchange.com/questions/33102/how-to-import-kml-file-with-custom-data-to-postgres-postgis-database |
# (filepath,headers) = urllib.urlretrieve(kml_resources[0]['url'], "input.kml") |
# (filepath,headers) = urllib.urlretrieve(kml_resources[0]['url'], "input.kml") |
|
|
|
|
#load bounding boxes |
#load bounding boxes |
(cur,conn) = get_cursor(db_settings) |
(cur,conn) = get_cursor(db_settings) |
cur.execute('SELECT ST_Extent(geom) as box,ST_AsGeoJSON(ST_Extent(geom)) as geojson from '+table_name) |
cur.execute('SELECT ST_Extent(geom) as box,ST_AsGeoJSON(ST_Extent(geom)) as geojson from "'+table_name+'"') |
(bbox,bgjson) = cur.fetchone() |
(bbox,bgjson) = cur.fetchone() |
cur.close() |
cur.close() |
conn.close() |
conn.close() |
print bbox |
print bbox |
|
|
|
|
#create geoserver dataset http://boundlessgeo.com/2012/10/adding-layers-to-geoserver-using-the-rest-api/
# name workspace after dataset
workspace = dataset['name']
ws = requests.post(geoserver_addr + 'rest/workspaces',
                   data=json.dumps({'workspace': {'name': workspace}}),
                   headers={'Content-type': 'application/json'},
                   auth=(geoserver_user, geoserver_passwd))
pprint(ws)
|
|
datastore = dataset['name']+'ds' |
datastore = dataset['name']+'ds' |
dsdata =json.dumps({'dataStore':{'name':datastore, |
dsdata =json.dumps({'dataStore':{'name':datastore, |
'connectionParameters' : { |
'connectionParameters' : { |
'host':db_settings['host'], |
'host':db_settings['host'], |
'port':5432, |
'port':5432, |
'database': db_settings['dbname'], |
'database': db_settings['dbname'], |
'schema':'public', |
'schema':'public', |
'user':db_settings['user'], |
'user':db_settings['user'] + "_data", #use read only user |
'passwd':db_settings['password'], |
'passwd':db_settings['password'], |
'dbtype':'postgis' |
'dbtype':'postgis' |
|
|
}}}) |
}}}) |
#print dsdata |
print dsdata |
r = requests.post(geoserver_addr+'rest/workspaces/'+workspace+'/datastores', data=dsdata, headers={'Content-type': 'application/json'}, auth=('admin', 'geoserver')) |
r = requests.post(geoserver_addr+'rest/workspaces/'+workspace+'/datastores', data=dsdata, headers={'Content-type': 'application/json'}, auth=(geoserver_user, geoserver_passwd)) |
pprint(r) |
pprint(r) |
#echo r.status_code |
#echo r.status_code |
#echo r.text |
#echo r.text |
|
|
# name layer after resource title |
# name layer after resource title |
ftdata = {'featureType':{'name':table_name, 'title': resource['name']}} |
ftdata = {'featureType':{'name':table_name, 'title': dataset['title']}} |
(minx,miny, maxx, maxy) = bbox.replace("BOX","").replace("(","").replace(")","").replace(","," ").split(" ") |
(minx,miny, maxx, maxy) = bbox.replace("BOX","").replace("(","").replace(")","").replace(","," ").split(" ") |
bbox_obj = { 'minx': minx,'maxx': maxx,'miny': miny,'maxy': maxy } |
bbox_obj = { 'minx': minx,'maxx': maxx,'miny': miny,'maxy': maxy } |
|
|
if nativeCRS != None: |
if nativeCRS != None: |
ftdata['featureType']['nativeCRS'] = nativeCRS |
ftdata['featureType']['nativeCRS'] = nativeCRS |
else: |
else: |
ftdata['featureType']['nativeBoundingBox'] = bbox_obj |
ftdata['featureType']['nativeBoundingBox'] = bbox_obj |
ftdata['featureType']['latLonBoundingBox'] = bbox_obj |
ftdata['featureType']['latLonBoundingBox'] = bbox_obj |
ftdata['featureType']['srs'] = "EPSG:4326" |
ftdata['featureType']['srs'] = "EPSG:4326" |
ftdata = json.dumps(ftdata) |
ftdata = json.dumps(ftdata) |
r = requests.post(geoserver_addr+'rest/workspaces/'+workspace+'/datastores/'+datastore+"/featuretypes", data= ftdata, headers={'Content-Type': 'application/json'}, auth=('admin', 'geoserver')) |
print geoserver_addr+'rest/workspaces/'+workspace+'/datastores/'+datastore+"/featuretypes" |
|
print ftdata |
|
r = requests.post(geoserver_addr+'rest/workspaces/'+workspace+'/datastores/'+datastore+"/featuretypes", data= ftdata, headers={'Content-Type': 'application/json'}, auth=(geoserver_user, geoserver_passwd)) |
pprint(r) |
pprint(r) |
|
|
#generate wms/wfs api links, kml, png resources and add to package |
#generate wms/wfs api links, kml, png resources and add to package |
print bgjson |
print bgjson |
dataset['spatial'] = bgjson |
dataset['spatial'] = bgjson |
|
|
ws_addr = geoserver_addr+dataset['name']+"/" |
ws_addr = geoserver_addr+dataset['name']+"/" |
for format in ['image/png','kml']: |
for format in ['image/png','kml']: |
url = ws_addr+"wms?request=GetMap&layers="+table_name+"&bbox="+bbox_obj['minx']+","+bbox_obj['miny']+","+bbox_obj['maxx']+","+bbox_obj['maxy']+"&width=512&height=512&format="+urllib.quote(format) |
url = ws_addr+"wms?request=GetMap&layers="+table_name+"&bbox="+bbox_obj['minx']+","+bbox_obj['miny']+","+bbox_obj['maxx']+","+bbox_obj['maxy']+"&width=512&height=512&format="+urllib.quote(format) |
if format == "image/png": |
if format == "image/png": |
dataset['resources'].append({"name":dataset['title'] + " Preview Image","description":"View overview image of this dataset" ,"format":format,"url":url, "last_modified": datetime.now().isoformat()}) |
dataset['resources'].append({"name":dataset['title'] + " Preview Image","description":"View overview image of this dataset" ,"format":format,"url":url, "last_modified": datetime.now().isoformat()}) |
if format == "kml": |
if format == "kml": |
dataset['resources'].append({"name":dataset['title'] + " KML","description":"For use in web and desktop spatial data tools including Google Earth" ,"format":format,"url":url, "last_modified": datetime.now().isoformat()}) |
dataset['resources'].append({"name":dataset['title'] + " KML","description":"For use in web and desktop spatial data tools including Google Earth" ,"format":format,"url":url, "last_modified": datetime.now().isoformat()}) |
for format in ['csv','json']: |
for format in ['csv','json']: |
url = ws_addr+"wfs?request=GetFeature&typeName="+table_name+"&outputFormat="+urllib.quote(format) |
url = ws_addr+"wfs?request=GetFeature&typeName="+table_name+"&outputFormat="+urllib.quote(format) |
if format == "csv": |
if format == "csv": |
dataset['resources'].append({"name": dataset['title'] + " CSV","description":"For summary of the objects/data in this collection","format":format,"url":url, "last_modified": datetime.now().isoformat()}) |
dataset['resources'].append({"name": dataset['title'] + " CSV","description":"For summary of the objects/data in this collection","format":format,"url":url, "last_modified": datetime.now().isoformat()}) |
if format == "json": |
if format == "json": |
dataset['resources'].append({"name":dataset['title'] + " GeoJSON","description":"For use in web-based data visualisation of this collection","format":format,"url":url, "last_modified": datetime.now().isoformat()}) |
dataset['resources'].append({"name":dataset['title'] + " GeoJSON","description":"For use in web-based data visualisation of this collection","format":format,"url":url, "last_modified": datetime.now().isoformat()}) |
dataset['resources'].append({"name":dataset['name'] + " - Preview this Dataset (WMS)","description":"View the data in this datasets online via web-based WMS viewer","format":"wms", |
dataset['resources'].append({"name":dataset['title'] + " - Preview this Dataset (WMS)","description":"View the data in this datasets online via web-based WMS viewer","format":"wms", |
"url":ws_addr+"wms?request=GetCapabilities", "last_modified": datetime.now().isoformat()}) |
"url":ws_addr+"wms?request=GetCapabilities", "last_modified": datetime.now().isoformat()}) |
dataset['resources'].append({"name":dataset['title'] + " WFS Link","description":"WFS Link for use of live data in Desktop GIS tools","format":"wfs", |
dataset['resources'].append({"name":dataset['title'] + " WFS Link","description":"WFS Link for use of live data in Desktop GIS tools","format":"wfs", |
"url":ws_addr+"wfs?request=GetCapabilities", "last_modified": datetime.now().isoformat()}) |
"url":ws_addr+"wfs?request=GetCapabilities", "last_modified": datetime.now().isoformat()}) |
|
|
pprint(dataset)
# NOTE(review): the package_update call is disabled in this revision;
# re-enable it once the generated resources have been verified, otherwise
# the new API links are never written back to CKAN.
#ckan.action.package_update(id=dataset['id'],spatial=dataset['spatial'],resources=dataset['resources'])

#delete tempdir
shutil.rmtree(tempdir)
success(msg)
|
|