Upgrade origin-src to google transit feed 1.2.6
[bus.git] / origin-src / transitfeed-1.2.6 / test / testtransitfeed.py
blob:a/origin-src/transitfeed-1.2.6/test/testtransitfeed.py -> blob:b/origin-src/transitfeed-1.2.6/test/testtransitfeed.py
--- a/origin-src/transitfeed-1.2.6/test/testtransitfeed.py
+++ b/origin-src/transitfeed-1.2.6/test/testtransitfeed.py
@@ -1,1 +1,5748 @@
-
+#!/usr/bin/python2.5
+
+# Copyright (C) 2007 Google Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Unit tests for the transitfeed module.
+
+import datetime
+from datetime import date
+import dircache
+import os.path
+import re
+import sys
+import tempfile
+import time
+import transitfeed
+import types
+import unittest
+import util
+from util import RecordingProblemAccumulator
+from StringIO import StringIO
+import zipfile
+import zlib
+
+
+def DataPath(path):
+  here = os.path.dirname(__file__)
+  return os.path.join(here, 'data', path)
+
+def GetDataPathContents():
+  here = os.path.dirname(__file__)
+  return dircache.listdir(os.path.join(here, 'data'))
+
+
+class ExceptionProblemReporterNoExpiration(transitfeed.ProblemReporter):
+  """Ignores feed expiration problems.
+
+  Use TestFailureProblemReporter in new code because it fails more cleanly, is
+  easier to extend and does more thorough checking.
+  """
+
+  def __init__(self):
+    accumulator = transitfeed.ExceptionProblemAccumulator(raise_warnings=True)
+    transitfeed.ProblemReporter.__init__(self, accumulator)
+
+  def ExpirationDate(self, expiration, context=None):
+    pass  # We don't want to give errors about our test data
+
+
+def GetTestFailureProblemReporter(test_case,
+                                  ignore_types=("ExpirationDate",)):
+  accumulator = TestFailureProblemAccumulator(test_case, ignore_types)
+  problems = transitfeed.ProblemReporter(accumulator)
+  return problems
+
+
+class TestFailureProblemAccumulator(transitfeed.ProblemAccumulatorInterface):
+  """Causes a test failure immediately on any problem."""
+  def __init__(self, test_case, ignore_types=("ExpirationDate",)):
+    self.test_case = test_case
+    self._ignore_types = ignore_types or set()
+
+  def _Report(self, e):
+    # These should never crash
+    formatted_problem = e.FormatProblem()
+    formatted_context = e.FormatContext()
+    exception_class = e.__class__.__name__
+    if exception_class in self._ignore_types:
+      return
+    self.test_case.fail(
+        "%s: %s\n%s" % (exception_class, formatted_problem, formatted_context))
+
+
+class UnrecognizedColumnRecorder(transitfeed.ProblemReporter):
+  """Keeps track of unrecognized column errors."""
+  def __init__(self, test_case):
+    self.accumulator = RecordingProblemAccumulator(test_case,
+        ignore_types=("ExpirationDate",))
+    self.column_errors = []
+
+  def UnrecognizedColumn(self, file_name, column_name, context=None):
+    self.column_errors.append((file_name, column_name))
+
+
+class RedirectStdOutTestCaseBase(util.TestCase):
+  """Save stdout to the StringIO buffer self.this_stdout"""
+  def setUp(self):
+    self.saved_stdout = sys.stdout
+    self.this_stdout = StringIO()
+    sys.stdout = self.this_stdout
+
+  def tearDown(self):
+    sys.stdout = self.saved_stdout
+    self.this_stdout.close()
+
+
+# ensure that there are no exceptions when attempting to load
+# (so that the validator won't crash)
+class NoExceptionTestCase(RedirectStdOutTestCaseBase):
+  def runTest(self):
+    for feed in GetDataPathContents():
+      loader = transitfeed.Loader(DataPath(feed),
+                                  problems=transitfeed.ProblemReporter(),
+                                  extra_validation=True)
+      schedule = loader.Load()
+      schedule.Validate()
+
+
+class EndOfLineCheckerTestCase(util.TestCase):
+  def setUp(self):
+    self.accumulator = RecordingProblemAccumulator(self)
+    self.problems = transitfeed.ProblemReporter(self.accumulator)
+
+  def RunEndOfLineChecker(self, end_of_line_checker):
+    # Iterating using for calls end_of_line_checker.next() until a
+    # StopIteration is raised. EndOfLineChecker does the final check for a mix
+    # of CR LF and LF ends just before raising StopIteration.
+    for line in end_of_line_checker:
+      pass
+
+  def testInvalidLineEnd(self):
+    f = transitfeed.EndOfLineChecker(StringIO("line1\r\r\nline2"),
+                                     "<StringIO>",
+                                     self.problems)
+    self.RunEndOfLineChecker(f)
+    e = self.accumulator.PopException("InvalidLineEnd")
+    self.assertEqual(e.file_name, "<StringIO>")
+    self.assertEqual(e.row_num, 1)
+    self.assertEqual(e.bad_line_end, r"\r\r\n")
+    self.accumulator.AssertNoMoreExceptions()
+
+  def testInvalidLineEndToo(self):
+    f = transitfeed.EndOfLineChecker(
+        StringIO("line1\nline2\r\nline3\r\r\r\n"),
+        "<StringIO>", self.problems)
+    self.RunEndOfLineChecker(f)
+    e = self.accumulator.PopException("InvalidLineEnd")
+    self.assertEqual(e.file_name, "<StringIO>")
+    self.assertEqual(e.row_num, 3)
+    self.assertEqual(e.bad_line_end, r"\r\r\r\n")
+    e = self.accumulator.PopException("OtherProblem")
+    self.assertEqual(e.file_name, "<StringIO>")
+    self.assertTrue(e.description.find("consistent line end") != -1)
+    self.accumulator.AssertNoMoreExceptions()
+
+  def testEmbeddedCr(self):
+    f = transitfeed.EndOfLineChecker(
+        StringIO("line1\rline1b"),
+        "<StringIO>", self.problems)
+    self.RunEndOfLineChecker(f)
+    e = self.accumulator.PopException("OtherProblem")
+    self.assertEqual(e.file_name, "<StringIO>")
+    self.assertEqual(e.row_num, 1)
+    self.assertEqual(e.FormatProblem(),
+                     "Line contains ASCII Carriage Return 0x0D, \\r")
+    self.accumulator.AssertNoMoreExceptions()
+
+  def testEmbeddedUtf8NextLine(self):
+    f = transitfeed.EndOfLineChecker(
+        StringIO("line1b\xc2\x85"),
+        "<StringIO>", self.problems)
+    self.RunEndOfLineChecker(f)
+    e = self.accumulator.PopException("OtherProblem")
+    self.assertEqual(e.file_name, "<StringIO>")
+    self.assertEqual(e.row_num, 1)
+    self.assertEqual(e.FormatProblem(),
+                     "Line contains Unicode NEXT LINE SEPARATOR U+0085")
+    self.accumulator.AssertNoMoreExceptions()
+
+  def testEndOfLineMix(self):
+    f = transitfeed.EndOfLineChecker(
+        StringIO("line1\nline2\r\nline3\nline4"),
+        "<StringIO>", self.problems)
+    self.RunEndOfLineChecker(f)
+    e = self.accumulator.PopException("OtherProblem")
+    self.assertEqual(e.file_name, "<StringIO>")
+    self.assertEqual(e.FormatProblem(),
+                     "Found 1 CR LF \"\\r\\n\" line end (line 2) and "
+                     "2 LF \"\\n\" line ends (lines 1, 3). A file must use a "
+                     "consistent line end.")
+    self.accumulator.AssertNoMoreExceptions()
+
+  def testEndOfLineManyMix(self):
+    f = transitfeed.EndOfLineChecker(
+        StringIO("1\n2\n3\n4\n5\n6\n7\r\n8\r\n9\r\n10\r\n11\r\n"),
+        "<StringIO>", self.problems)
+    self.RunEndOfLineChecker(f)
+    e = self.accumulator.PopException("OtherProblem")
+    self.assertEqual(e.file_name, "<StringIO>")
+    self.assertEqual(e.FormatProblem(),
+                     "Found 5 CR LF \"\\r\\n\" line ends (lines 7, 8, 9, 10, "
+                     "11) and 6 LF \"\\n\" line ends (lines 1, 2, 3, 4, 5, "
+                     "...). A file must use a consistent line end.")
+    self.accumulator.AssertNoMoreExceptions()
+
+  def testLoad(self):
+    loader = transitfeed.Loader(
+      DataPath("bad_eol.zip"), problems=self.problems, extra_validation=True)
+    loader.Load()
+
+    e = self.accumulator.PopException("OtherProblem")
+    self.assertEqual(e.file_name, "calendar.txt")
+    self.assertTrue(re.search(
+      r"Found 1 CR LF.* \(line 2\) and 2 LF .*\(lines 1, 3\)",
+      e.FormatProblem()))
+
+    e = self.accumulator.PopException("InvalidLineEnd")
+    self.assertEqual(e.file_name, "routes.txt")
+    self.assertEqual(e.row_num, 5)
+    self.assertTrue(e.FormatProblem().find(r"\r\r\n") != -1)
+
+    e = self.accumulator.PopException("OtherProblem")
+    self.assertEqual(e.file_name, "trips.txt")
+    self.assertEqual(e.row_num, 1)
+    self.assertTrue(re.search(
+      r"contains ASCII Form Feed",
+      e.FormatProblem()))
+    # TODO(Tom): avoid this duplicate error for the same issue
+    e = self.accumulator.PopException("CsvSyntax")
+    self.assertEqual(e.row_num, 1)
+    self.assertTrue(re.search(
+      r"header row should not contain any space char",
+      e.FormatProblem()))
+
+    self.accumulator.AssertNoMoreExceptions()
+
+
+class LoadTestCase(util.TestCase):
+  def setUp(self):
+    self.accumulator = RecordingProblemAccumulator(self, ("ExpirationDate",))
+    self.problems = transitfeed.ProblemReporter(self.accumulator)
+
+  def Load(self, feed_name):
+    loader = transitfeed.Loader(
+      DataPath(feed_name), problems=self.problems, extra_validation=True)
+    loader.Load()
+
+  def ExpectInvalidValue(self, feed_name, column_name):
+    self.Load(feed_name)
+    self.accumulator.PopInvalidValue(column_name)
+    self.accumulator.AssertNoMoreExceptions()
+
+  def ExpectMissingFile(self, feed_name, file_name):
+    self.Load(feed_name)
+    e = self.accumulator.PopException("MissingFile")
+    self.assertEqual(file_name, e.file_name)
+    # Don't call AssertNoMoreExceptions() because a missing file causes
+    # many errors.
+
+
+class LoadFromZipTestCase(util.TestCase):
+  def runTest(self):
+    loader = transitfeed.Loader(
+      DataPath('good_feed.zip'),
+      problems = GetTestFailureProblemReporter(self),
+      extra_validation = True)
+    loader.Load()
+
+    # now try using Schedule.Load
+    schedule = transitfeed.Schedule(
+        problem_reporter=ExceptionProblemReporterNoExpiration())
+    schedule.Load(DataPath('good_feed.zip'), extra_validation=True)
+
+
+class LoadAndRewriteFromZipTestCase(util.TestCase):
+  def runTest(self):
+    schedule = transitfeed.Schedule(
+        problem_reporter=ExceptionProblemReporterNoExpiration())
+    schedule.Load(DataPath('good_feed.zip'), extra_validation=True)
+
+    # Finally see if write crashes
+    schedule.WriteGoogleTransitFeed(tempfile.TemporaryFile())
+
+
+class LoadFromDirectoryTestCase(util.TestCase):
+  def runTest(self):
+    loader = transitfeed.Loader(
+      DataPath('good_feed'),
+      problems = GetTestFailureProblemReporter(self),
+      extra_validation = True)
+    loader.Load()
+
+
+class LoadUnknownFeedTestCase(util.TestCase):
+  def runTest(self):
+    feed_name = DataPath('unknown_feed')
+    loader = transitfeed.Loader(
+      feed_name,
+      problems = ExceptionProblemReporterNoExpiration(),
+      extra_validation = True)
+    try:
+      loader.Load()
+      self.fail('FeedNotFound exception expected')
+    except transitfeed.FeedNotFound, e:
+      self.assertEqual(feed_name, e.feed_name)
+
+class LoadUnknownFormatTestCase(util.TestCase):
+  def runTest(self):
+    feed_name = DataPath('unknown_format.zip')
+    loader = transitfeed.Loader(
+      feed_name,
+      problems = ExceptionProblemReporterNoExpiration(),
+      extra_validation = True)
+    try:
+      loader.Load()
+      self.fail('UnknownFormat exception expected')
+    except transitfeed.UnknownFormat, e:
+      self.assertEqual(feed_name, e.feed_name)
+
+class LoadUnrecognizedColumnsTestCase(util.TestCase):
+  def runTest(self):
+    problems = UnrecognizedColumnRecorder(self)
+    loader = transitfeed.Loader(DataPath('unrecognized_columns'),
+                                problems=problems)
+    loader.Load()
+    found_errors = set(problems.column_errors)
+    expected_errors = set([
+      ('agency.txt', 'agency_lange'),
+      ('stops.txt', 'stop_uri'),
+      ('routes.txt', 'Route_Text_Color'),
+      ('calendar.txt', 'leap_day'),
+      ('calendar_dates.txt', 'leap_day'),
+      ('trips.txt', 'sharpe_id'),
+      ('stop_times.txt', 'shapedisttraveled'),
+      ('stop_times.txt', 'drop_off_time'),
+      ('fare_attributes.txt', 'transfer_time'),
+      ('fare_rules.txt', 'source_id'),
+      ('frequencies.txt', 'superfluous'),
+      ('transfers.txt', 'to_stop')
+    ])
+
+    # Now make sure we got the unrecognized column errors that we expected.
+    not_expected = found_errors.difference(expected_errors)
+    self.failIf(not_expected, 'unexpected errors: %s' % str(not_expected))
+    not_found = expected_errors.difference(found_errors)
+    self.failIf(not_found, 'expected but not found: %s' % str(not_found))
+
+class LoadExtraCellValidationTestCase(LoadTestCase):
+  """Check that the validation detects too many cells in a row."""
+  def runTest(self):
+    self.Load('extra_row_cells')
+    e = self.accumulator.PopException("OtherProblem")
+    self.assertEquals("routes.txt", e.file_name)
+    self.assertEquals(4, e.row_num)
+    self.accumulator.AssertNoMoreExceptions()
+
+
+class LoadMissingCellValidationTestCase(LoadTestCase):
+  """Check that the validation detects missing cells in a row."""
+  def runTest(self):
+    self.Load('missing_row_cells')
+    e = self.accumulator.PopException("OtherProblem")
+    self.assertEquals("routes.txt", e.file_name)
+    self.assertEquals(4, e.row_num)
+    self.accumulator.AssertNoMoreExceptions()
+
+class LoadUnknownFileTestCase(util.TestCase):
+  """Check that the validation detects unknown files."""
+  def runTest(self):
+    feed_name = DataPath('unknown_file')
+    self.accumulator = RecordingProblemAccumulator(self, ("ExpirationDate",))
+    self.problems = transitfeed.ProblemReporter(self.accumulator)
+    loader = transitfeed.Loader(
+      feed_name,
+      problems = self.problems,
+      extra_validation = True)
+    loader.Load()
+    e = self.accumulator.PopException('UnknownFile')
+    self.assertEqual('frecuencias.txt', e.file_name)
+    self.accumulator.AssertNoMoreExceptions()
+
+class LoadUTF8BOMTestCase(util.TestCase):
+  def runTest(self):
+    loader = transitfeed.Loader(
+      DataPath('utf8bom'),
+      problems = GetTestFailureProblemReporter(self),
+      extra_validation = True)
+    loader.Load()
+
+
+class LoadUTF16TestCase(util.TestCase):
+  def runTest(self):
+    # utf16 generated by `recode utf8..utf16 *'
+    accumulator = transitfeed.ExceptionProblemAccumulator()
+    problem_reporter = transitfeed.ProblemReporter(accumulator)
+    loader = transitfeed.Loader(
+      DataPath('utf16'),
+      problems = problem_reporter,
+      extra_validation = True)
+    try:
+      loader.Load()
+      # TODO: make sure processing proceeds beyond the problem
+      self.fail('FileFormat exception expected')
+    except transitfeed.FileFormat, e:
+      # make sure these don't raise an exception
+      self.assertTrue(re.search(r'encoded in utf-16', e.FormatProblem()))
+      e.FormatContext()
+
+
+class LoadNullTestCase(util.TestCase):
+  def runTest(self):
+    accumulator = transitfeed.ExceptionProblemAccumulator()
+    problem_reporter = transitfeed.ProblemReporter(accumulator)
+    loader = transitfeed.Loader(
+      DataPath('contains_null'),
+      problems = problem_reporter,
+      extra_validation = True)
+    try:
+      loader.Load()
+      self.fail('FileFormat exception expected')
+    except transitfeed.FileFormat, e:
+      self.assertTrue(re.search(r'contains a null', e.FormatProblem()))
+      # make sure these don't raise an exception
+      e.FormatContext()
+
+
+class ProblemReporterTestCase(RedirectStdOutTestCaseBase):
+  # Unittest for problem reporter
+  def testContextWithBadUnicodeProblem(self):
+    pr = transitfeed.ProblemReporter()
+    # Context has valid unicode values
+    pr.SetFileContext('filename.foo', 23,
+                      [u'Andr\202', u'Person \uc720 foo', None],
+                      [u'1\202', u'2\202', u'3\202'])
+    pr.OtherProblem('test string')
+    pr.OtherProblem(u'\xff\xfe\x80\x88')
+    # Invalid ascii and utf-8. encode('utf-8') and decode('utf-8') will fail
+    # for this value
+    pr.OtherProblem('\xff\xfe\x80\x88')
+    self.assertTrue(re.search(r"test string", self.this_stdout.getvalue()))
+    self.assertTrue(re.search(r"filename.foo:23", self.this_stdout.getvalue()))
+
+  def testNoContextWithBadUnicode(self):
+    pr = transitfeed.ProblemReporter()
+    pr.OtherProblem('test string')
+    pr.OtherProblem(u'\xff\xfe\x80\x88')
+    # Invalid ascii and utf-8. encode('utf-8') and decode('utf-8') will fail
+    # for this value
+    pr.OtherProblem('\xff\xfe\x80\x88')
+    self.assertTrue(re.search(r"test string", self.this_stdout.getvalue()))
+
+  def testBadUnicodeContext(self):
+    pr = transitfeed.ProblemReporter()
+    pr.SetFileContext('filename.foo', 23,
+                      [u'Andr\202', 'Person \xff\xfe\x80\x88 foo', None],
+                      [u'1\202', u'2\202', u'3\202'])
+    pr.OtherProblem("help, my context isn't utf-8!")
+    self.assertTrue(re.search(r"help, my context", self.this_stdout.getvalue()))
+    self.assertTrue(re.search(r"filename.foo:23", self.this_stdout.getvalue()))
+
+  def testLongWord(self):
+    # Make sure LineWrap doesn't puke
+    pr = transitfeed.ProblemReporter()
+    pr.OtherProblem('1111untheontuhoenuthoentuhntoehuontehuntoehuntoehunto'
+                    '2222oheuntheounthoeunthoeunthoeuntheontuheontuhoue')
+    self.assertTrue(re.search(r"1111.+2222", self.this_stdout.getvalue()))
+
+
+class BadProblemReporterTestCase(RedirectStdOutTestCaseBase):
+  """Make sure ProblemReporter doesn't crash when given bad unicode data and
+  does find some error"""
+  # tom.brown.code-utf8_weaknesses fixed a bug with problem reporter and bad
+  # utf-8 strings
+  def runTest(self):
+    loader = transitfeed.Loader(
+      DataPath('bad_utf8'),
+      problems = transitfeed.ProblemReporter(),
+      extra_validation = True)
+    loader.Load()
+    # raises exception if not found
+    self.this_stdout.getvalue().index('Invalid value')
+
+
+class BadUtf8TestCase(LoadTestCase):
+  def runTest(self):
+    self.Load('bad_utf8')
+    self.accumulator.PopException("UnrecognizedColumn")
+    self.accumulator.PopInvalidValue("agency_name", "agency.txt")
+    self.accumulator.PopInvalidValue("stop_name", "stops.txt")
+    self.accumulator.PopInvalidValue("route_short_name", "routes.txt")
+    self.accumulator.PopInvalidValue("route_long_name", "routes.txt")
+    self.accumulator.PopInvalidValue("trip_headsign", "trips.txt")
+    self.accumulator.PopInvalidValue("stop_headsign", "stop_times.txt")
+    self.accumulator.AssertNoMoreExceptions()
+
+
+class LoadMissingAgencyTestCase(LoadTestCase):
+  def runTest(self):
+    self.ExpectMissingFile('missing_agency', 'agency.txt')
+
+
+class LoadMissingStopsTestCase(LoadTestCase):
+  def runTest(self):
+    self.ExpectMissingFile('missing_stops', 'stops.txt')
+
+
+class LoadMissingRoutesTestCase(LoadTestCase):
+  def runTest(self):
+    self.ExpectMissingFile('missing_routes', 'routes.txt')
+
+
+class LoadMissingTripsTestCase(LoadTestCase):
+  def runTest(self):
+    self.ExpectMissingFile('missing_trips', 'trips.txt')
+
+
+class LoadMissingStopTimesTestCase(LoadTestCase):
+  def runTest(self):
+    self.ExpectMissingFile('missing_stop_times', 'stop_times.txt')
+
+
+class LoadMissingCalendarTestCase(LoadTestCase):
+  def runTest(self):
+    self.ExpectMissingFile('missing_calendar', 'calendar.txt')
+
+
+class EmptyFileTestCase(util.TestCase):
+  def runTest(self):
+    loader = transitfeed.Loader(
+      DataPath('empty_file'),
+      problems = ExceptionProblemReporterNoExpiration(),
+      extra_validation = True)
+    try:
+      loader.Load()
+      self.fail('EmptyFile exception expected')
+    except transitfeed.EmptyFile, e:
+      self.assertEqual('agency.txt', e.file_name)
+
+
+class MissingColumnTestCase(util.TestCase):
+  def runTest(self):
+    loader = transitfeed.Loader(
+      DataPath('missing_column'),
+      problems = ExceptionProblemReporterNoExpiration(),
+      extra_validation = True)
+    try:
+      loader.Load()
+      self.fail('MissingColumn exception expected')
+    except transitfeed.MissingColumn, e:
+      self.assertEqual('agency.txt', e.file_name)
+      self.assertEqual('agency_name', e.column_name)
+
+
+class ZeroBasedStopSequenceTestCase(LoadTestCase):
+  def runTest(self):
+    self.ExpectInvalidValue('negative_stop_sequence', 'stop_sequence')
+
+
+class DuplicateStopTestCase(util.TestCase):
+  def runTest(self):
+    schedule = transitfeed.Schedule(
+        problem_reporter=ExceptionProblemReporterNoExpiration())
+    try:
+      schedule.Load(DataPath('duplicate_stop'), extra_validation=True)
+      self.fail('OtherProblem exception expected')
+    except transitfeed.OtherProblem:
+      pass
+
+class DuplicateStopSequenceTestCase(util.TestCase):
+  def runTest(self):
+    accumulator = RecordingProblemAccumulator(self, ("ExpirationDate",
+                                                     "NoServiceExceptions"))
+    problems = transitfeed.ProblemReporter(accumulator)
+    schedule = transitfeed.Schedule(problem_reporter=problems)
+    schedule.Load(DataPath('duplicate_stop_sequence'), extra_validation=True)
+    e = accumulator.PopException('InvalidValue')
+    self.assertEqual('stop_sequence', e.column_name)
+    accumulator.AssertNoMoreExceptions()
+
+
+class MissingEndpointTimesTestCase(util.TestCase):
+  def runTest(self):
+    problems = ExceptionProblemReporterNoExpiration()
+    schedule = transitfeed.Schedule(problem_reporter=problems)
+    try:
+      schedule.Load(DataPath('missing_endpoint_times'), extra_validation=True)
+      self.fail('InvalidValue exception expected')
+    except transitfeed.InvalidValue, e:
+      self.assertEqual('departure_time', e.column_name)
+      self.assertEqual('', e.value)
+
+
+class DuplicateScheduleIDTestCase(util.TestCase):
+  def runTest(self):
+    schedule = transitfeed.Schedule(
+        problem_reporter=ExceptionProblemReporterNoExpiration())
+    try:
+      schedule.Load(DataPath('duplicate_schedule_id'), extra_validation=True)
+      self.fail('DuplicateID exception expected')
+    except transitfeed.DuplicateID:
+      pass
+
+class OverlappingBlockSchedule(transitfeed.Schedule):
+  """Special Schedule subclass that counts the number of calls to
+  GetServicePeriod() so we can verify service period overlap calculation
+  caching"""
+
+  _get_service_period_call_count = 0
+
+  def GetServicePeriod(self, service_id):
+    self._get_service_period_call_count += 1
+    return transitfeed.Schedule.GetServicePeriod(self,service_id)
+
+  def GetServicePeriodCallCount(self):
+    return self._get_service_period_call_count
+
+class OverlappingBlockTripsTestCase(util.TestCase):
+  """Builds a simple schedule for testing of overlapping block trips"""
+
+  def setUp(self):
+    self.accumulator = RecordingProblemAccumulator(
+        self, ("ExpirationDate", "NoServiceExceptions"))
+    self.problems = transitfeed.ProblemReporter(self.accumulator)
+
+    schedule = OverlappingBlockSchedule(problem_reporter=self.problems)
+    schedule.AddAgency("Demo Transit Authority", "http://dta.org",
+                       "America/Los_Angeles")
+
+    sp1 = transitfeed.ServicePeriod("SID1")
+    sp1.SetWeekdayService(True)
+    sp1.SetStartDate("20070605")
+    sp1.SetEndDate("20080605")
+    schedule.AddServicePeriodObject(sp1)
+
+    sp2 = transitfeed.ServicePeriod("SID2")
+    sp2.SetDayOfWeekHasService(0)
+    sp2.SetDayOfWeekHasService(2)
+    sp2.SetDayOfWeekHasService(4)
+    sp2.SetStartDate("20070605")
+    sp2.SetEndDate("20080605")
+    schedule.AddServicePeriodObject(sp2)
+
+    sp3 = transitfeed.ServicePeriod("SID3")
+    sp3.SetWeekendService(True)
+    sp3.SetStartDate("20070605")
+    sp3.SetEndDate("20080605")
+    schedule.AddServicePeriodObject(sp3)
+
+    self.stop1 = schedule.AddStop(lng=-116.75167,
+                                  lat=36.915682,
+                                  name="Stagecoach Hotel & Casino",
+                                  stop_id="S1")
+
+    self.stop2 = schedule.AddStop(lng=-116.76218,
+                                  lat=36.905697,
+                                  name="E Main St / S Irving St",
+                                  stop_id="S2")
+
+    self.route = schedule.AddRoute("", "City", "Bus", route_id="CITY")
+
+    self.schedule = schedule
+    self.sp1 = sp1
+    self.sp2 = sp2
+    self.sp3 = sp3
+
+  def testNoOverlap(self):
+
+    schedule, route, sp1 = self.schedule, self.route, self.sp1
+
+    trip1 = route.AddTrip(schedule, service_period=sp1, trip_id="CITY1")
+    trip1.block_id = "BLOCK"
+    trip1.AddStopTime(self.stop1, stop_time="6:00:00")
+    trip1.AddStopTime(self.stop2, stop_time="6:30:00")
+
+    trip2 = route.AddTrip(schedule, service_period=sp1, trip_id="CITY2")
+    trip2.block_id = "BLOCK"
+    trip2.AddStopTime(self.stop2, stop_time="6:30:00")
+    trip2.AddStopTime(self.stop1, stop_time="7:00:00")
+
+    schedule.Validate(self.problems)
+
+    self.accumulator.AssertNoMoreExceptions()
+
+  def testOverlapSameServicePeriod(self):
+
+    schedule, route, sp1 = self.schedule, self.route, self.sp1
+
+    trip1 = route.AddTrip(schedule, service_period=sp1, trip_id="CITY1")
+    trip1.block_id = "BLOCK"
+    trip1.AddStopTime(self.stop1, stop_time="6:00:00")
+    trip1.AddStopTime(self.stop2, stop_time="6:30:00")
+
+    trip2 = route.AddTrip(schedule, service_period=sp1, trip_id="CITY2")
+    trip2.block_id = "BLOCK"
+    trip2.AddStopTime(self.stop2, stop_time="6:20:00")
+    trip2.AddStopTime(self.stop1, stop_time="6:50:00")
+
+    schedule.Validate(self.problems)
+
+    e = self.accumulator.PopException('OverlappingTripsInSameBlock')
+    self.assertEqual(e.trip_id1, 'CITY1')
+    self.assertEqual(e.trip_id2, 'CITY2')
+    self.assertEqual(e.block_id, 'BLOCK')
+
+    self.accumulator.AssertNoMoreExceptions()
+
+  def testOverlapDifferentServicePeriods(self):
+
+    schedule, route, sp1, sp2 = self.schedule, self.route, self.sp1, self.sp2
+
+    trip1 = route.AddTrip(schedule, service_period=sp1, trip_id="CITY1")
+    trip1.block_id = "BLOCK"
+    trip1.AddStopTime(self.stop1, stop_time="6:00:00")
+    trip1.AddStopTime(self.stop2, stop_time="6:30:00")
+
+    trip2 = route.AddTrip(schedule, service_period=sp2, trip_id="CITY2")
+    trip2.block_id = "BLOCK"
+    trip2.AddStopTime(self.stop2, stop_time="6:20:00")
+    trip2.AddStopTime(self.stop1, stop_time="6:50:00")
+
+    trip3 = route.AddTrip(schedule, service_period=sp1, trip_id="CITY3")
+    trip3.block_id = "BLOCK"
+    trip3.AddStopTime(self.stop1, stop_time="7:00:00")
+    trip3.AddStopTime(self.stop2, stop_time="7:30:00")
+
+    trip4 = route.AddTrip(schedule, service_period=sp2, trip_id="CITY4")
+    trip4.block_id = "BLOCK"
+    trip4.AddStopTime(self.stop2, stop_time="7:20:00")
+    trip4.AddStopTime(self.stop1, stop_time="7:50:00")
+
+    schedule.Validate(self.problems)
+
+    e = self.accumulator.PopException('OverlappingTripsInSameBlock')
+    self.assertEqual(e.trip_id1, 'CITY1')
+    self.assertEqual(e.trip_id2, 'CITY2')
+    self.assertEqual(e.block_id, 'BLOCK')
+
+    e = self.accumulator.PopException('OverlappingTripsInSameBlock')
+    self.assertEqual(e.trip_id1, 'CITY3')
+    self.assertEqual(e.trip_id2, 'CITY4')
+    self.assertEqual(e.block_id, 'BLOCK')
+
+    self.accumulator.AssertNoMoreExceptions()
+
+    # If service period overlap calculation caching is working correctly,
+    # we expect only two calls to GetServicePeriod(), one each for sp1 and
+    # sp2, as oppossed four calls total for the four overlapping trips
+    self.assertEquals(2,schedule.GetServicePeriodCallCount())
+
+  def testNoOverlapDifferentServicePeriods(self):
+
+    schedule, route, sp1, sp3 = self.schedule, self.route, self.sp1, self.sp3
+
+    trip1 = route.AddTrip(schedule, service_period=sp1, trip_id="CITY1")
+    trip1.block_id = "BLOCK"
+    trip1.AddStopTime(self.stop1, stop_time="6:00:00")
+    trip1.AddStopTime(self.stop2, stop_time="6:30:00")
+
+    trip2 = route.AddTrip(schedule, service_period=sp3, trip_id="CITY2")
+    trip2.block_id = "BLOCK"
+    trip2.AddStopTime(self.stop2, stop_time="6:20:00")
+    trip2.AddStopTime(self.stop1, stop_time="6:50:00")
+
+    schedule.Validate(self.problems)
+
+    self.accumulator.AssertNoMoreExceptions()
+
+class ColorLuminanceTestCase(util.TestCase):
+  def runTest(self):
+    self.assertEqual(transitfeed.ColorLuminance('000000'), 0,
+        "ColorLuminance('000000') should be zero")
+    self.assertEqual(transitfeed.ColorLuminance('FFFFFF'), 255,
+        "ColorLuminance('FFFFFF') should be 255")
+    RGBmsg = ("ColorLuminance('RRGGBB') should be "
+              "0.299*<Red> + 0.587*<Green> + 0.114*<Blue>")
+    decimal_places_tested = 8
+    self.assertAlmostEqual(transitfeed.ColorLuminance('640000'), 29.9,
+                           decimal_places_tested, RGBmsg)
+    self.assertAlmostEqual(transitfeed.ColorLuminance('006400'), 58.7,
+                     decimal_places_tested, RGBmsg)
+    self.assertAlmostEqual(transitfeed.ColorLuminance('000064'), 11.4,
+                     decimal_places_tested, RGBmsg)
+    self.assertAlmostEqual(transitfeed.ColorLuminance('1171B3'),
+                     0.299*17 + 0.587*113 + 0.114*179,
+                     decimal_places_tested, RGBmsg)
+
+INVALID_VALUE = Exception()
+class ValidationTestCase(util.TestCase):
+  def setUp(self):
+    self.accumulator = RecordingProblemAccumulator(
+        self, ("ExpirationDate", "NoServiceExceptions"))
+    self.problems = transitfeed.ProblemReporter(self.accumulator)
+
+  def tearDown(self):
+    self.accumulator.TearDownAssertNoMoreExceptions()
+
+  def ExpectNoProblems(self, object):
+    self.accumulator.AssertNoMoreExceptions()
+    object.Validate(self.problems)
+    self.accumulator.AssertNoMoreExceptions()
+
+  # TODO: Get rid of Expect*Closure methods. With the
+  # RecordingProblemAccumulator it is now possible to replace
+  # self.ExpectMissingValueInClosure(lambda: o.method(...), foo)
+  # with
+  # o.method(...)
+  # self.ExpectMissingValueInClosure(foo)
+  # because problems don't raise an exception. This has the advantage of
+  # making it easy and clear to test the return value of o.method(...) and
+  # easier to test for a sequence of problems caused by one c