|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 | #!/usr/bin/python2.5 # Copyright (C) 2007 Google Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. """Google has a homegrown database for managing the company shuttle. The database dumps its contents in XML. This scripts converts the proprietary XML format into a Google Transit Feed Specification file. """ import datetime from optparse import OptionParser import os.path import re import transitfeed import urllib try: import xml.etree.ElementTree as ET # python 2.5 except ImportError, e: import elementtree.ElementTree as ET # older pythons class NoUnusedStopExceptionProblemReporter(transitfeed.ProblemReporter): """The company shuttle database has a few unused stops for reasons unrelated to this script. Ignore them. """ def __init__(self): accumulator = transitfeed.ExceptionProblemAccumulator() transitfeed.ProblemReporter.__init__(self, accumulator) def UnusedStop(self, stop_id, stop_name): pass def SaveFeed(input, output): tree = ET.parse(urllib.urlopen(input)) schedule = transitfeed.Schedule() service_period = schedule.GetDefaultServicePeriod() service_period.SetWeekdayService() service_period.SetStartDate('20070314') service_period.SetEndDate('20071231') # Holidays for 2007 service_period.SetDateHasService('20070528', has_service=False) service_period.SetDateHasService('20070704', has_service=False) service_period.SetDateHasService('20070903', has_service=False) service_period.SetDateHasService('20071122', has_service=False) service_period.SetDateHasService('20071123', has_service=False) service_period.SetDateHasService('20071224', has_service=False) service_period.SetDateHasService('20071225', has_service=False) service_period.SetDateHasService('20071226', has_service=False) service_period.SetDateHasService('20071231', has_service=False) stops = {} # Map from xml stop id to python Stop object agency = schedule.NewDefaultAgency(name='GBus', url='http://shuttle/', timezone='America/Los_Angeles') for xml_stop in tree.getiterator('stop'): stop = schedule.AddStop(lat=float(xml_stop.attrib['lat']), lng=float(xml_stop.attrib['lng']), name=xml_stop.attrib['name']) stops[xml_stop.attrib['id']] = stop for xml_shuttleGroup in tree.getiterator('shuttleGroup'): if xml_shuttleGroup.attrib['name'] == 'Test': continue r = schedule.AddRoute(short_name="", long_name=xml_shuttleGroup.attrib['name'], route_type='Bus') for xml_route in xml_shuttleGroup.getiterator('route'): t = r.AddTrip(schedule=schedule, headsign=xml_route.attrib['name'], trip_id=xml_route.attrib['id']) trip_stops = [] # Build a list of (time, Stop) tuples for xml_schedule in xml_route.getiterator('schedule'): trip_stops.append( (int(xml_schedule.attrib['time']) / 1000, stops[xml_schedule.attrib['stopId']]) ) trip_stops.sort() # Sort by time for (time, stop) in trip_stops: t.AddStopTime(stop=stop, arrival_secs=time, departure_secs=time) schedule.Validate(problems=NoUnusedStopExceptionProblemReporter()) schedule.WriteGoogleTransitFeed(output) def main(): parser = OptionParser() parser.add_option('--input', dest='input', help='Path or URL of input') parser.add_option('--output', dest='output', help='Path of output file. Should end in .zip and if it ' 'contains the substring YYYYMMDD it will be replaced with ' 'today\'s date. It is impossible to include the literal ' 'string YYYYYMMDD in the path of the output file.') parser.add_option('--execute', dest='execute', help='Commands to run to copy the output. %(path)s is ' 'replaced with full path of the output and %(name)s is ' 'replaced with name part of the path. Try ' 'scp %(path)s myhost:www/%(name)s', action='append') parser.set_defaults(input=None, output=None, execute=[]) (options, args) = parser.parse_args() today = datetime.date.today().strftime('%Y%m%d') options.output = re.sub(r'YYYYMMDD', today, options.output) (_, name) = os.path.split(options.output) path = options.output SaveFeed(options.input, options.output) for command in options.execute: import subprocess def check_call(cmd): """Convenience function that is in the docs for subprocess but not installed on my system.""" retcode = subprocess.call(cmd, shell=True) if retcode < 0: raise Exception("Child '%s' was terminated by signal %d" % (cmd, -retcode)) elif retcode != 0: raise Exception("Child '%s' returned %d" % (cmd, retcode)) # path_output and filename_current can be used to run arbitrary commands check_call(command % locals()) if __name__ == '__main__': main() |