Use PDO prepared statements for massive import speedup
--- a/servicealerts/importer.py
+++ b/servicealerts/importer.py
@@ -1,5 +1,3 @@
-#dependencies http://code.google.com/p/python-twitter/
-
# info
# http://stackoverflow.com/questions/4206882/named-entity-recognition-with-preset-list-of-names-for-python-php/4207128#4207128
# http://alias-i.com/lingpipe/demos/tutorial/ne/read-me.html approximate dist
@@ -12,11 +10,17 @@
# http://esa.act.gov.au/feeds/currentincidents.xml
# source: https://gist.github.com/322906/90dea659c04570757cccf0ce1e6d26c9d06f9283
+# To install the required NLTK tokenizer data, run: python -m nltk.downloader punkt
import nltk
-import twitter
+import tweepy
import psycopg2
+import pickle
+
+from iniparse import INIConfig
+
def insert_service_alert_sitewide(heading, message, url):
-
+ print "NaN"
+
def insert_service_alert_for_street(streets, heading, message, url):
conn_string = "host='localhost' dbname='energymapper' user='postgres' password='snmc'"
# print the connection string we will use to connect
@@ -29,30 +33,22 @@
cursor = conn.cursor()
# execute our Query
- cursor.execute("select max(value), extract(dow from max(time)) as dow, \
-extract(year from max(time))::text || lpad(extract(month from max(time))::text,2,'0') \
-|| lpad(extract(month from max(time))::text,2,'0') as yearmonthweek, to_char(max(time),'J') \
-from environmentdata_values where \"dataSourceID\"='NSWAEMODemand' \
-group by extract(dow from time), extract(year from time), extract(week from time) \
-order by extract(year from time), extract(week from time), extract(dow from time)")
+ cursor.execute("")
# retrieve the records from the database
records = cursor.fetchall()
for record in records:
ys.append(record[0])
-# >>> cur.execute("INSERT INTO test (num, data) VALUES (%s, %s)", (42, 'bar'))
-#>>> cur.statusmessage
-#'INSERT 0 1'
+ # >>> cur.execute("INSERT INTO test (num, data) VALUES (%s, %s)", (42, 'bar'))
+ #>>> cur.statusmessage
+ #'INSERT 0 1'
except:
# Get the most recent exception
exceptionType, exceptionValue, exceptionTraceback = sys.exc_info()
# Exit the script and print an error telling what happened.
sys.exit("Database connection failed!\n ->%s" % (exceptionValue))
-def get_tweets(user):
- tapi = twitter.Api()
- return tapi.GetUserTimeline(user)
def extract_entity_names(t):
entity_names = []
@@ -86,3 +82,23 @@
# Print unique entity names
print set(entity_names)
+cfg = INIConfig(open('/tmp/aws.ini'))
+
+auth = tweepy.OAuthHandler(cfg.api_keys.twitter_consumer_key, cfg.api_keys.twitter_consumer_secret)
+auth.set_access_token(cfg.api_keys.twitter_access_token, cfg.api_keys.twitter_access_token_secret)
+
+#api = tweepy.API(auth)
+api = tweepy.API()
+# If the authentication was successful, you should
+# see the name of the account printed out
+#print api.me().name
+# https://github.com/tweepy/tweepy/blob/master/tweepy/api.py
+for status in api.user_timeline(screen_name="ACTPol_Traffic",exclude_replies='true'):
+ print status.text
+ print status.created_at
+ print extract_names(status.text)
+# print api.update_status(status="test")
+
+last_tweet_ids = { "lion": "111", "kitty": "2222" }
+pickle.dump( last_tweet_ids, open( "save.p", "wb" ) )
+last_tweet_ids = pickle.load( open( "save.p", "rb" ) )
--- /dev/null
+++ b/servicealerts/punkt.zip
--- a/updatedb.php
+++ b/updatedb.php
@@ -17,7 +17,6 @@
*/
if (php_sapi_name() == "cli") {
include ('include/common.inc.php');
- $conn = pg_connect("dbname=transitdata user=postgres password=snmc host=localhost") or die('connection failed');
$pdconn = new PDO("pgsql:dbname=transitdata;user=postgres;password=snmc;host=localhost");
/*
@@ -34,7 +33,7 @@
// Unzip cbrfeed.zip, import all csv files to database
$unzip = false;
$zip = zip_open(dirname(__FILE__) . "/cbrfeed.zip");
- $tmpdir = "c:/tmp/";
+ $tmpdir = "c:/tmp/cbrfeed/";
mkdir($tmpdir);
if ($unzip) {
if (is_resource($zip)) {
@@ -53,6 +52,7 @@
}
foreach (scandir($tmpdir) as $file) {
+ $headers = Array();
if (!strpos($file, ".txt") === false) {
$fieldseparator = ",";
$lineseparator = "\n";
@@ -60,33 +60,50 @@
echo "Opening $file \n";
$line = 0;
$handle = fopen($tmpdir . $file, "r");
- if ($tablename == "stop_times") {
- $stmt = $pdconn->prepare("insert into stop_times (trip_id,stop_id,stop_sequence,arrival_time,departure_time) values(:trip_id, :stop_id, :stop_sequence,:arrival_time,:departure_time);");
- $stmt->bindParam(':trip_id', $trip_id);
- $stmt->bindParam(':stop_id', $stop_id);
- $stmt->bindParam(':stop_sequence', $stop_sequence);
- $stmt->bindParam(':arrival_time', $time);
- $stmt->bindParam(':departure_time', $time);
- }
$distance = 0;
$lastshape = 0;
$lastlat = 0;
$lastlon = 0;
+ $stmt = null;
while (($data = fgetcsv($handle, 1000, ",")) !== FALSE) {
if ($line == 0) {
-
- } else {
- $query = "insert into $tablename values(";
+ $headers = array_values($data);
+ if ($tablename == "stops") {
+ $headers[] = "position";
+ }
+ if ($tablename == "shapes") {
+ $headers[] = "shape_pt";
+ }
+ $query = "insert into $tablename (";
+ $valueCount = 0;
+ foreach ($headers as $value) {
+ $query.=($valueCount > 0 ? "," : "") . pg_escape_string($value);
+ $valueCount++;
+ }
+ $query.= ") values( ";
$valueCount = 0;
foreach ($data as $value) {
- $query.=($valueCount > 0 ? "','" : "'") . pg_escape_string($value);
+ $query.=($valueCount > 0 ? "," : "") . '?';
$valueCount++;
}
+ if ($tablename == "stops") {
+ $query.= ", ST_GeographyFromText(?));";
+ } else if ($tablename == "shapes") {
+ $query.= ", ST_GeographyFromText(?));";
+ } else {
+ $query.= ");";
+ }
+ echo $query;
+ $stmt = $pdconn->prepare($query);
+ } else {
+ $values = array_values($data);
if ($tablename == "stops") {
- $query.= "', ST_GeographyFromText('SRID=4326;POINT({$data[2]} {$data[0]})'));";
- } else if ($tablename == "shapes") {
+ // Coordinate values are out of range [-180 -90, 180 90]
+ $values[] = 'SRID=4326;POINT('.$values[5].' '.$values[4].')';
+ }
+ if ($tablename == "shapes") {
if ($data[0] != $lastshape) {
$distance = 0;
$lastshape = $data[0];
@@ -95,28 +112,26 @@
}
$lastlat = $data[1];
$lastlon = $data[2];
- $query.= "', $distance, ST_GeographyFromText('SRID=4326;POINT({$data[2]} {$data[1]})'));";
- } else {
- $query.= "');";
+
+ $values[4] = $distance;
+ $values[] = 'SRID=4326;POINT('.$values[2].' '.$values[1].')';
}
- if ($tablename == "stop_times") {
- // $query = "insert into $tablename (trip_id,stop_id,stop_sequence) values('{$data[0]}','{$data[3]}','{$data[4]}');";
- $trip_id = $data[0];
- $stop_id = $data[3];
- $stop_sequence = $data[4];
- $time = ($data[1] == "" ? null : $data[1]);
+if (substr($values[1],0,2) == '24') $values[1] = "23:59:59";
+if (substr($values[2],0,2) == '24') $values[2] = "23:59:59";
+ $stmt->execute($values);
+ $err = $pdconn->errorInfo();
+ if ($err[2] != "" && strpos($err[2], "duplicate key") === false) {
+ print_r($values);
+ print_r($err);
+ die("terminated import due to db error above");
}
- }
- if ($tablename == "stop_times") {
- $stmt->execute();
- } else {
- $result = pg_query($conn, $query);
}
$line++;
if ($line % 10000 == 0)
echo "$line records... " . date('c') . "\n";
}
fclose($handle);
+ $stmt->closeCursor();
echo "Found a total of $line records in $file.\n";
}
}