Use PDO prepared statements for massive import speedup
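The win comes from preparing each INSERT once per file and then executing it for every CSV row, so Postgres parses and plans the statement a single time and only the values travel per row. A minimal sketch of the pattern (table and column names here are illustrative only, not from the feed):

$stmt = $pdconn->prepare("insert into stops (stop_id, stop_name) values (?, ?)");
foreach ($rows as $row) {
    $stmt->execute($row); // no per-row SQL string building or quoting
}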

# dependencies: tweepy (https://github.com/tweepy/tweepy), nltk, psycopg2, iniparse
   
# info
# http://stackoverflow.com/questions/4206882/named-entity-recognition-with-preset-list-of-names-for-python-php/4207128#4207128
# http://alias-i.com/lingpipe/demos/tutorial/ne/read-me.html approximate dictionary matching
# http://streamhacker.com/2008/12/29/how-to-train-a-nltk-chunker/ more training
# http://www.postgresql.org/docs/9.1/static/pgtrgm.html
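# pg_trgm could fuzzy-match extracted entity names against a street table,
# e.g. (sketch only; the "streets" table and "name" column are assumptions):
# >>> cur.execute("CREATE EXTENSION pg_trgm")
# >>> cur.execute("SELECT name FROM streets ORDER BY similarity(name, %s) DESC LIMIT 1", (entity,))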
   
# data sources
# http://twitter.com/#!/ACTEmergencyInf instant site wide
# http://twitter.com/#!/ACTPol_Traffic
# http://esa.act.gov.au/feeds/currentincidents.xml
   
# source: https://gist.github.com/322906/90dea659c04570757cccf0ce1e6d26c9d06f9283
# to install NLTK data: python -m nltk.downloader punkt maxent_treebank_pos_tagger maxent_ne_chunker words
import nltk
import tweepy
import psycopg2
import pickle
import sys

from iniparse import INIConfig
   
def insert_service_alert_sitewide(heading, message, url):
    # TODO: stub; site-wide alerts are not stored yet
    print "NaN"
   
def insert_service_alert_for_street(streets, heading, message, url):
    conn_string = "host='localhost' dbname='energymapper' user='postgres' password='snmc'"
    # print the connection string we will use to connect
    print "Connecting to database\n ->%s" % (conn_string)
    try:
        # get a connection, if a connect cannot be made an exception will be raised here
        conn = psycopg2.connect(conn_string)

        # conn.cursor will return a cursor object, you can use this cursor to perform queries
        cursor = conn.cursor()

        # execute our Query
        # TODO: the alert insert for matched streets is not written yet
        cursor.execute("")

        # retrieve the records from the database
        records = cursor.fetchall()

        ys = []
        for record in records:
            ys.append(record[0])
        # >>> cur.execute("INSERT INTO test (num, data) VALUES (%s, %s)", (42, 'bar'))
        # >>> cur.statusmessage
        # 'INSERT 0 1'
    except:
        # Get the most recent exception
        exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
        # Exit the script and print an error telling what happened.
        sys.exit("Database connection failed!\n ->%s" % (exceptionValue))
   
def extract_entity_names(t):
    # walk the NLTK parse tree, collecting the leaves of every NE subtree
    entity_names = []
    if hasattr(t, 'node') and t.node:
        if t.node == 'NE':
            entity_names.append(' '.join([child[0] for child in t]))
        else:
            for child in t:
                entity_names.extend(extract_entity_names(child))
    return entity_names

def extract_names(sample):
    sentences = nltk.sent_tokenize(sample)
    tokenized_sentences = [nltk.word_tokenize(sentence) for sentence in sentences]
    tagged_sentences = [nltk.pos_tag(sentence) for sentence in tokenized_sentences]
    chunked_sentences = nltk.batch_ne_chunk(tagged_sentences, binary=True)
    # chunked/tagged may be enough to just find and match the nouns

    entity_names = []
    for tree in chunked_sentences:
        # Print results per sentence
        # print extract_entity_names(tree)
        entity_names.extend(extract_entity_names(tree))

    # Print all entity names
    #print entity_names

    # Return unique entity names
    return set(entity_names)
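# usage sketch (hypothetical input): extract_names("Accident on the Barton Highway
# near Gundaroo Drive") should yield a set of entity names to match against streets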
   
cfg = INIConfig(open('/tmp/aws.ini'))

auth = tweepy.OAuthHandler(cfg.api_keys.twitter_consumer_key, cfg.api_keys.twitter_consumer_secret)
auth.set_access_token(cfg.api_keys.twitter_access_token, cfg.api_keys.twitter_access_token_secret)
   
#api = tweepy.API(auth)
api = tweepy.API()
# If the authentication was successful, you should
# see the name of the account print out
#print api.me().name
# https://github.com/tweepy/tweepy/blob/master/tweepy/api.py
for status in api.user_timeline(screen_name="ACTPol_Traffic", exclude_replies='true'):
    print status.text
    print status.created_at
    print extract_names(status.text)
    # print api.update_status(status="test")
   
# remember the newest tweet seen per feed between runs (placeholder values)
last_tweet_ids = {"lion": "111", "kitty": "2222"}
pickle.dump(last_tweet_ids, open("save.p", "wb"))
last_tweet_ids = pickle.load(open("save.p", "rb"))
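# sketch of resuming from the newest seen tweet on the next run (untested;
# keying by screen name is an assumption, since_id is a standard API parameter):
#last_id = last_tweet_ids.get("ACTPol_Traffic")
#for status in api.user_timeline(screen_name="ACTPol_Traffic", since_id=last_id):
#    last_tweet_ids["ACTPol_Traffic"] = status.id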
<?php
   
/*
 * Copyright 2010,2011 Alexander Sadleir

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

  http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
if (php_sapi_name() == "cli") {
	include ('include/common.inc.php');
	$pdconn = new PDO("pgsql:dbname=transitdata;user=postgres;password=snmc;host=localhost");
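	// Wrapping the whole import in one transaction would likely cut per-row
	// commit overhead further (untested sketch):
	//   $pdconn->beginTransaction(); ... import loop ... $pdconn->commit();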
   
	/*
	delete from agency;
	delete from calendar;
	delete from calendar_dates;
	delete from routes;
	delete from shapes;
	delete from stop_times;
	delete from stops;
	delete from trips;
	*/
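	// distance() is expected from include/common.inc.php; the haversine fallback
	// below is an assumption for completeness, not the original helper:
	if (!function_exists('distance')) {
		function distance($lat1, $lon1, $lat2, $lon2) {
			$radius = 6371000; // earth radius in metres
			$dlat = deg2rad($lat2 - $lat1);
			$dlon = deg2rad($lon2 - $lon1);
			$a = sin($dlat / 2) * sin($dlat / 2)
				+ cos(deg2rad($lat1)) * cos(deg2rad($lat2)) * sin($dlon / 2) * sin($dlon / 2);
			return $radius * 2 * atan2(sqrt($a), sqrt(1 - $a));
		}
	}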
   
	// Unzip cbrfeed.zip, import all csv files to database
	$unzip = false;
	$zip = zip_open(dirname(__FILE__) . "/cbrfeed.zip");
	$tmpdir = "c:/tmp/cbrfeed/";
	if (!is_dir($tmpdir)) mkdir($tmpdir);
	if ($unzip) {
		if (is_resource($zip)) {
			while ($zip_entry = zip_read($zip)) {
				$fp = fopen($tmpdir . zip_entry_name($zip_entry), "w");
				if (zip_entry_open($zip, $zip_entry, "r")) {
					echo "Extracting " . zip_entry_name($zip_entry) . "\n";
					$buf = zip_entry_read($zip_entry, zip_entry_filesize($zip_entry));
					fwrite($fp, "$buf");
					zip_entry_close($zip_entry);
					fclose($fp);
				}
			}
			zip_close($zip);
		}
	}

	foreach (scandir($tmpdir) as $file) {
		$headers = Array();
		if (strpos($file, ".txt") !== false) {
			$fieldseparator = ",";
			$lineseparator = "\n";
			$tablename = str_replace(".txt", "", $file);
			echo "Opening $file \n";
			$line = 0;
			$handle = fopen($tmpdir . $file, "r");
   
			$distance = 0;
			$lastshape = 0;
			$lastlat = 0;
			$lastlon = 0;
			$stmt = null;
			while (($data = fgetcsv($handle, 1000, ",")) !== FALSE) {
				if ($line == 0) {
					// first row of each GTFS file is the column header row;
					// build and prepare the parameterised insert once per file
					$headers = array_values($data);
					if ($tablename == "stops") {
						$headers[] = "position";
					}
					if ($tablename == "shapes") {
						$headers[] = "shape_pt";
					}
					$query = "insert into $tablename (";
					$valueCount = 0;
					foreach ($headers as $value) {
						// column names come from the CSV header; whitelist them as identifiers
						$query .= ($valueCount > 0 ? "," : "") . preg_replace('/[^a-zA-Z0-9_]/', '', $value);
						$valueCount++;
					}
					$query .= ") values( ";
					$valueCount = 0;
					foreach ($data as $value) {
						$query .= ($valueCount > 0 ? "," : "") . '?';
						$valueCount++;
					}
					if ($tablename == "stops") {
						$query .= ", ST_GeographyFromText(?));";
					} else if ($tablename == "shapes") {
						$query .= ", ST_GeographyFromText(?));";
					} else {
						$query .= ");";
					}

					echo $query . "\n";
					$stmt = $pdconn->prepare($query);
				} else {
					$values = array_values($data);
					if ($tablename == "stops") {
						// PostGIS wants POINT(lon lat); the wrong order fails with
						// "Coordinate values are out of range [-180 -90, 180 90]"
						$values[] = 'SRID=4326;POINT(' . $values[5] . ' ' . $values[4] . ')';
					}
					if ($tablename == "shapes") {
						if ($data[0] != $lastshape) {
							$distance = 0;
							$lastshape = $data[0];
						} else {
							$distance += distance($lastlat, $lastlon, $data[1], $data[2]);
						}
						$lastlat = $data[1];
						$lastlon = $data[2];
						$values[4] = $distance;
						$values[] = 'SRID=4326;POINT(' . $values[2] . ' ' . $values[1] . ')';
					}
					// GTFS allows times past midnight (eg. 24:10:00); clamp for the SQL time type
					if (substr($values[1], 0, 2) == '24') $values[1] = "23:59:59";
					if (substr($values[2], 0, 2) == '24') $values[2] = "23:59:59";
					$stmt->execute($values);
					$err = $pdconn->errorInfo();
					if ($err[2] != "" && strpos($err[2], "duplicate key") === false) {
						print_r($values);
						print_r($err);
						die("terminated import due to db error above");
					}
				}
				$line++;
				if ($line % 10000 == 0)
					echo "$line records... " . date('c') . "\n";
			}
			fclose($handle);
			$stmt->closeCursor();
			echo "Found a total of $line records in $file.\n";
		}
	}
}
?>