--- a/origin-src/transitfeed-1.2.6/test/testtransitfeed.py +++ b/origin-src/transitfeed-1.2.6/test/testtransitfeed.py @@ -1,1 +1,5748 @@ - +#!/usr/bin/python2.5 + +# Copyright (C) 2007 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Unit tests for the transitfeed module. + +import datetime +from datetime import date +import dircache +import os.path +import re +import sys +import tempfile +import time +import transitfeed +import types +import unittest +import util +from util import RecordingProblemAccumulator +from StringIO import StringIO +import zipfile +import zlib + + +def DataPath(path): + here = os.path.dirname(__file__) + return os.path.join(here, 'data', path) + +def GetDataPathContents(): + here = os.path.dirname(__file__) + return dircache.listdir(os.path.join(here, 'data')) + + +class ExceptionProblemReporterNoExpiration(transitfeed.ProblemReporter): + """Ignores feed expiration problems. + + Use TestFailureProblemReporter in new code because it fails more cleanly, is + easier to extend and does more thorough checking. + """ + + def __init__(self): + accumulator = transitfeed.ExceptionProblemAccumulator(raise_warnings=True) + transitfeed.ProblemReporter.__init__(self, accumulator) + + def ExpirationDate(self, expiration, context=None): + pass # We don't want to give errors about our test data + + +def GetTestFailureProblemReporter(test_case, + ignore_types=("ExpirationDate",)): + accumulator = TestFailureProblemAccumulator(test_case, ignore_types) + problems = transitfeed.ProblemReporter(accumulator) + return problems + + +class TestFailureProblemAccumulator(transitfeed.ProblemAccumulatorInterface): + """Causes a test failure immediately on any problem.""" + def __init__(self, test_case, ignore_types=("ExpirationDate",)): + self.test_case = test_case + self._ignore_types = ignore_types or set() + + def _Report(self, e): + # These should never crash + formatted_problem = e.FormatProblem() + formatted_context = e.FormatContext() + exception_class = e.__class__.__name__ + if exception_class in self._ignore_types: + return + self.test_case.fail( + "%s: %s\n%s" % (exception_class, formatted_problem, formatted_context)) + + +class UnrecognizedColumnRecorder(transitfeed.ProblemReporter): + """Keeps track of unrecognized column errors.""" + def __init__(self, test_case): + self.accumulator = RecordingProblemAccumulator(test_case, + ignore_types=("ExpirationDate",)) + self.column_errors = [] + + def UnrecognizedColumn(self, file_name, column_name, context=None): + self.column_errors.append((file_name, column_name)) + + +class RedirectStdOutTestCaseBase(util.TestCase): + """Save stdout to the StringIO buffer self.this_stdout""" + def setUp(self): + self.saved_stdout = sys.stdout + self.this_stdout = StringIO() + sys.stdout = self.this_stdout + + def tearDown(self): + sys.stdout = self.saved_stdout + self.this_stdout.close() + + +# ensure that there are no exceptions when attempting to load +# (so that the validator won't crash) +class NoExceptionTestCase(RedirectStdOutTestCaseBase): + def runTest(self): + for feed in GetDataPathContents(): + loader = transitfeed.Loader(DataPath(feed), + problems=transitfeed.ProblemReporter(), + extra_validation=True) + schedule = loader.Load() + schedule.Validate() + + +class EndOfLineCheckerTestCase(util.TestCase): + def setUp(self): + self.accumulator = RecordingProblemAccumulator(self) + self.problems = transitfeed.ProblemReporter(self.accumulator) + + def RunEndOfLineChecker(self, end_of_line_checker): + # Iterating using for calls end_of_line_checker.next() until a + # StopIteration is raised. EndOfLineChecker does the final check for a mix + # of CR LF and LF ends just before raising StopIteration. + for line in end_of_line_checker: + pass + + def testInvalidLineEnd(self): + f = transitfeed.EndOfLineChecker(StringIO("line1\r\r\nline2"), + "<StringIO>", + self.problems) + self.RunEndOfLineChecker(f) + e = self.accumulator.PopException("InvalidLineEnd") + self.assertEqual(e.file_name, "<StringIO>") + self.assertEqual(e.row_num, 1) + self.assertEqual(e.bad_line_end, r"\r\r\n") + self.accumulator.AssertNoMoreExceptions() + + def testInvalidLineEndToo(self): + f = transitfeed.EndOfLineChecker( + StringIO("line1\nline2\r\nline3\r\r\r\n"), + "<StringIO>", self.problems) + self.RunEndOfLineChecker(f) + e = self.accumulator.PopException("InvalidLineEnd") + self.assertEqual(e.file_name, "<StringIO>") + self.assertEqual(e.row_num, 3) + self.assertEqual(e.bad_line_end, r"\r\r\r\n") + e = self.accumulator.PopException("OtherProblem") + self.assertEqual(e.file_name, "<StringIO>") + self.assertTrue(e.description.find("consistent line end") != -1) + self.accumulator.AssertNoMoreExceptions() + + def testEmbeddedCr(self): + f = transitfeed.EndOfLineChecker( + StringIO("line1\rline1b"), + "<StringIO>", self.problems) + self.RunEndOfLineChecker(f) + e = self.accumulator.PopException("OtherProblem") + self.assertEqual(e.file_name, "<StringIO>") + self.assertEqual(e.row_num, 1) + self.assertEqual(e.FormatProblem(), + "Line contains ASCII Carriage Return 0x0D, \\r") + self.accumulator.AssertNoMoreExceptions() + + def testEmbeddedUtf8NextLine(self): + f = transitfeed.EndOfLineChecker( + StringIO("line1b\xc2\x85"), + "<StringIO>", self.problems) + self.RunEndOfLineChecker(f) + e = self.accumulator.PopException("OtherProblem") + self.assertEqual(e.file_name, "<StringIO>") + self.assertEqual(e.row_num, 1) + self.assertEqual(e.FormatProblem(), + "Line contains Unicode NEXT LINE SEPARATOR U+0085") + self.accumulator.AssertNoMoreExceptions() + + def testEndOfLineMix(self): + f = transitfeed.EndOfLineChecker( + StringIO("line1\nline2\r\nline3\nline4"), + "<StringIO>", self.problems) + self.RunEndOfLineChecker(f) + e = self.accumulator.PopException("OtherProblem") + self.assertEqual(e.file_name, "<StringIO>") + self.assertEqual(e.FormatProblem(), + "Found 1 CR LF \"\\r\\n\" line end (line 2) and " + "2 LF \"\\n\" line ends (lines 1, 3). A file must use a " + "consistent line end.") + self.accumulator.AssertNoMoreExceptions() + + def testEndOfLineManyMix(self): + f = transitfeed.EndOfLineChecker( + StringIO("1\n2\n3\n4\n5\n6\n7\r\n8\r\n9\r\n10\r\n11\r\n"), + "<StringIO>", self.problems) + self.RunEndOfLineChecker(f) + e = self.accumulator.PopException("OtherProblem") + self.assertEqual(e.file_name, "<StringIO>") + self.assertEqual(e.FormatProblem(), + "Found 5 CR LF \"\\r\\n\" line ends (lines 7, 8, 9, 10, " + "11) and 6 LF \"\\n\" line ends (lines 1, 2, 3, 4, 5, " + "...). A file must use a consistent line end.") + self.accumulator.AssertNoMoreExceptions() + + def testLoad(self): + loader = transitfeed.Loader( + DataPath("bad_eol.zip"), problems=self.problems, extra_validation=True) + loader.Load() + + e = self.accumulator.PopException("OtherProblem") + self.assertEqual(e.file_name, "calendar.txt") + self.assertTrue(re.search( + r"Found 1 CR LF.* \(line 2\) and 2 LF .*\(lines 1, 3\)", + e.FormatProblem())) + + e = self.accumulator.PopException("InvalidLineEnd") + self.assertEqual(e.file_name, "routes.txt") + self.assertEqual(e.row_num, 5) + self.assertTrue(e.FormatProblem().find(r"\r\r\n") != -1) + + e = self.accumulator.PopException("OtherProblem") + self.assertEqual(e.file_name, "trips.txt") + self.assertEqual(e.row_num, 1) + self.assertTrue(re.search( + r"contains ASCII Form Feed", + e.FormatProblem())) + # TODO(Tom): avoid this duplicate error for the same issue + e = self.accumulator.PopException("CsvSyntax") + self.assertEqual(e.row_num, 1) + self.assertTrue(re.search( + r"header row should not contain any space char", + e.FormatProblem())) + + self.accumulator.AssertNoMoreExceptions() + + +class LoadTestCase(util.TestCase): + def setUp(self): + self.accumulator = RecordingProblemAccumulator(self, ("ExpirationDate",)) + self.problems = transitfeed.ProblemReporter(self.accumulator) + + def Load(self, feed_name): + loader = transitfeed.Loader( + DataPath(feed_name), problems=self.problems, extra_validation=True) + loader.Load() + + def ExpectInvalidValue(self, feed_name, column_name): + self.Load(feed_name) + self.accumulator.PopInvalidValue(column_name) + self.accumulator.AssertNoMoreExceptions() + + def ExpectMissingFile(self, feed_name, file_name): + self.Load(feed_name) + e = self.accumulator.PopException("MissingFile") + self.assertEqual(file_name, e.file_name) + # Don't call AssertNoMoreExceptions() because a missing file causes + # many errors. + + +class LoadFromZipTestCase(util.TestCase): + def runTest(self): + loader = transitfeed.Loader( + DataPath('good_feed.zip'), + problems = GetTestFailureProblemReporter(self), + extra_validation = True) + loader.Load() + + # now try using Schedule.Load + schedule = transitfeed.Schedule( + problem_reporter=ExceptionProblemReporterNoExpiration()) + schedule.Load(DataPath('good_feed.zip'), extra_validation=True) + + +class LoadAndRewriteFromZipTestCase(util.TestCase): + def runTest(self): + schedule = transitfeed.Schedule( + problem_reporter=ExceptionProblemReporterNoExpiration()) + schedule.Load(DataPath('good_feed.zip'), extra_validation=True) + + # Finally see if write crashes + schedule.WriteGoogleTransitFeed(tempfile.TemporaryFile()) + + +class LoadFromDirectoryTestCase(util.TestCase): + def runTest(self): + loader = transitfeed.Loader( + DataPath('good_feed'), + problems = GetTestFailureProblemReporter(self), + extra_validation = True) + loader.Load() + + +class LoadUnknownFeedTestCase(util.TestCase): + def runTest(self): + feed_name = DataPath('unknown_feed') + loader = transitfeed.Loader( + feed_name, + problems = ExceptionProblemReporterNoExpiration(), + extra_validation = True) + try: + loader.Load() + self.fail('FeedNotFound exception expected') + except transitfeed.FeedNotFound, e: + self.assertEqual(feed_name, e.feed_name) + +class LoadUnknownFormatTestCase(util.TestCase): + def runTest(self): + feed_name = DataPath('unknown_format.zip') + loader = transitfeed.Loader( + feed_name, + problems = ExceptionProblemReporterNoExpiration(), + extra_validation = True) + try: + loader.Load() + self.fail('UnknownFormat exception expected') + except transitfeed.UnknownFormat, e: + self.assertEqual(feed_name, e.feed_name) + +class LoadUnrecognizedColumnsTestCase(util.TestCase): + def runTest(self): + problems = UnrecognizedColumnRecorder(self) + loader = transitfeed.Loader(DataPath('unrecognized_columns'), + problems=problems) + loader.Load() + found_errors = set(problems.column_errors) + expected_errors = set([ + ('agency.txt', 'agency_lange'), + ('stops.txt', 'stop_uri'), + ('routes.txt', 'Route_Text_Color'), + ('calendar.txt', 'leap_day'), + ('calendar_dates.txt', 'leap_day'), + ('trips.txt', 'sharpe_id'), + ('stop_times.txt', 'shapedisttraveled'), + ('stop_times.txt', 'drop_off_time'), + ('fare_attributes.txt', 'transfer_time'), + ('fare_rules.txt', 'source_id'), + ('frequencies.txt', 'superfluous'), + ('transfers.txt', 'to_stop') + ]) + + # Now make sure we got the unrecognized column errors that we expected. + not_expected = found_errors.difference(expected_errors) + self.failIf(not_expected, 'unexpected errors: %s' % str(not_expected)) + not_found = expected_errors.difference(found_errors) + self.failIf(not_found, 'expected but not found: %s' % str(not_found)) + +class LoadExtraCellValidationTestCase(LoadTestCase): + """Check that the validation detects too many cells in a row.""" + def runTest(self): + self.Load('extra_row_cells') + e = self.accumulator.PopException("OtherProblem") + self.assertEquals("routes.txt", e.file_name) + self.assertEquals(4, e.row_num) + self.accumulator.AssertNoMoreExceptions() + + +class LoadMissingCellValidationTestCase(LoadTestCase): + """Check that the validation detects missing cells in a row.""" + def runTest(self): + self.Load('missing_row_cells') + e = self.accumulator.PopException("OtherProblem") + self.assertEquals("routes.txt", e.file_name) + self.assertEquals(4, e.row_num) + self.accumulator.AssertNoMoreExceptions() + +class LoadUnknownFileTestCase(util.TestCase): + """Check that the validation detects unknown files.""" + def runTest(self): + feed_name = DataPath('unknown_file') + self.accumulator = RecordingProblemAccumulator(self, ("ExpirationDate",)) + self.problems = transitfeed.ProblemReporter(self.accumulator) + loader = transitfeed.Loader( + feed_name, + problems = self.problems, + extra_validation = True) + loader.Load() + e = self.accumulator.PopException('UnknownFile') + self.assertEqual('frecuencias.txt', e.file_name) + self.accumulator.AssertNoMoreExceptions() + +class LoadUTF8BOMTestCase(util.TestCase): + def runTest(self): + loader = transitfeed.Loader( + DataPath('utf8bom'), + problems = GetTestFailureProblemReporter(self), + extra_validation = True) + loader.Load() + + +class LoadUTF16TestCase(util.TestCase): + def runTest(self): + # utf16 generated by `recode utf8..utf16 *' + accumulator = transitfeed.ExceptionProblemAccumulator() + problem_reporter = transitfeed.ProblemReporter(accumulator) + loader = transitfeed.Loader( + DataPath('utf16'), + problems = problem_reporter, + extra_validation = True) + try: + loader.Load() + # TODO: make sure processing proceeds beyond the problem + self.fail('FileFormat exception expected') + except transitfeed.FileFormat, e: + # make sure these don't raise an exception + self.assertTrue(re.search(r'encoded in utf-16', e.FormatProblem())) + e.FormatContext() + + +class LoadNullTestCase(util.TestCase): + def runTest(self): + accumulator = transitfeed.ExceptionProblemAccumulator() + problem_reporter = transitfeed.ProblemReporter(accumulator) + loader = transitfeed.Loader( + DataPath('contains_null'), + problems = problem_reporter, + extra_validation = True) + try: + loader.Load() + self.fail('FileFormat exception expected') + except transitfeed.FileFormat, e: + self.assertTrue(re.search(r'contains a null', e.FormatProblem())) + # make sure these don't raise an exception + e.FormatContext() + + +class ProblemReporterTestCase(RedirectStdOutTestCaseBase): + # Unittest for problem reporter + def testContextWithBadUnicodeProblem(self): + pr = transitfeed.ProblemReporter() + # Context has valid unicode values + pr.SetFileContext('filename.foo', 23, + [u'Andr\202', u'Person \uc720 foo', None], + [u'1\202', u'2\202', u'3\202']) + pr.OtherProblem('test string') + pr.OtherProblem(u'\xff\xfe\x80\x88') + # Invalid ascii and utf-8. encode('utf-8') and decode('utf-8') will fail + # for this value + pr.OtherProblem('\xff\xfe\x80\x88') + self.assertTrue(re.search(r"test string", self.this_stdout.getvalue())) + self.assertTrue(re.search(r"filename.foo:23", self.this_stdout.getvalue())) + + def testNoContextWithBadUnicode(self): + pr = transitfeed.ProblemReporter() + pr.OtherProblem('test string') + pr.OtherProblem(u'\xff\xfe\x80\x88') + # Invalid ascii and utf-8. encode('utf-8') and decode('utf-8') will fail + # for this value + pr.OtherProblem('\xff\xfe\x80\x88') + self.assertTrue(re.search(r"test string", self.this_stdout.getvalue())) + + def testBadUnicodeContext(self): + pr = transitfeed.ProblemReporter() + pr.SetFileContext('filename.foo', 23, + [u'Andr\202', 'Person \xff\xfe\x80\x88 foo', None], + [u'1\202', u'2\202', u'3\202']) + pr.OtherProblem("help, my context isn't utf-8!") + self.assertTrue(re.search(r"help, my context", self.this_stdout.getvalue())) + self.assertTrue(re.search(r"filename.foo:23", self.this_stdout.getvalue())) + + def testLongWord(self): + # Make sure LineWrap doesn't puke + pr = transitfeed.ProblemReporter() + pr.OtherProblem('1111untheontuhoenuthoentuhntoehuontehuntoehuntoehunto' + '2222oheuntheounthoeunthoeunthoeuntheontuheontuhoue') + self.assertTrue(re.search(r"1111.+2222", self.this_stdout.getvalue())) + + +class BadProblemReporterTestCase(RedirectStdOutTestCaseBase): + """Make sure ProblemReporter doesn't crash when given bad unicode data and + does find some error""" + # tom.brown.code-utf8_weaknesses fixed a bug with problem reporter and bad + # utf-8 strings + def runTest(self): + loader = transitfeed.Loader( + DataPath('bad_utf8'), + problems = transitfeed.ProblemReporter(), + extra_validation = True) + loader.Load() + # raises exception if not found + self.this_stdout.getvalue().index('Invalid value') + + +class BadUtf8TestCase(LoadTestCase): + def runTest(self): + self.Load('bad_utf8') + self.accumulator.PopException("UnrecognizedColumn") + self.accumulator.PopInvalidValue("agency_name", "agency.txt") + self.accumulator.PopInvalidValue("stop_name", "stops.txt") + self.accumulator.PopInvalidValue("route_short_name", "routes.txt") + self.accumulator.PopInvalidValue("route_long_name", "routes.txt") + self.accumulator.PopInvalidValue("trip_headsign", "trips.txt") + self.accumulator.PopInvalidValue("stop_headsign", "stop_times.txt") + self.accumulator.AssertNoMoreExceptions() + + +class LoadMissingAgencyTestCase(LoadTestCase): + def runTest(self): + self.ExpectMissingFile('missing_agency', 'agency.txt') + + +class LoadMissingStopsTestCase(LoadTestCase): + def runTest(self): + self.ExpectMissingFile('missing_stops', 'stops.txt') + + +class LoadMissingRoutesTestCase(LoadTestCase): + def runTest(self): + self.ExpectMissingFile('missing_routes', 'routes.txt') + + +class LoadMissingTripsTestCase(LoadTestCase): + def runTest(self): + self.ExpectMissingFile('missing_trips', 'trips.txt') + + +class LoadMissingStopTimesTestCase(LoadTestCase): + def runTest(self): + self.ExpectMissingFile('missing_stop_times', 'stop_times.txt') + + +class LoadMissingCalendarTestCase(LoadTestCase): + def runTest(self): + self.ExpectMissingFile('missing_calendar', 'calendar.txt') + + +class EmptyFileTestCase(util.TestCase): + def runTest(self): + loader = transitfeed.Loader( + DataPath('empty_file'), + problems = ExceptionProblemReporterNoExpiration(), + extra_validation = True) + try: + loader.Load() + self.fail('EmptyFile exception expected') + except transitfeed.EmptyFile, e: + self.assertEqual('agency.txt', e.file_name) + + +class MissingColumnTestCase(util.TestCase): + def runTest(self): + loader = transitfeed.Loader( + DataPath('missing_column'), + problems = ExceptionProblemReporterNoExpiration(), + extra_validation = True) + try: + loader.Load() + self.fail('MissingColumn exception expected') + except transitfeed.MissingColumn, e: + self.assertEqual('agency.txt', e.file_name) + self.assertEqual('agency_name', e.column_name) + + +class ZeroBasedStopSequenceTestCase(LoadTestCase): + def runTest(self): + self.ExpectInvalidValue('negative_stop_sequence', 'stop_sequence') + + +class DuplicateStopTestCase(util.TestCase): + def runTest(self): + schedule = transitfeed.Schedule( + problem_reporter=ExceptionProblemReporterNoExpiration()) + try: + schedule.Load(DataPath('duplicate_stop'), extra_validation=True) + self.fail('OtherProblem exception expected') + except transitfeed.OtherProblem: + pass + +class DuplicateStopSequenceTestCase(util.TestCase): + def runTest(self): + accumulator = RecordingProblemAccumulator(self, ("ExpirationDate", + "NoServiceExceptions")) + problems = transitfeed.ProblemReporter(accumulator) + schedule = transitfeed.Schedule(problem_reporter=problems) + schedule.Load(DataPath('duplicate_stop_sequence'), extra_validation=True) + e = accumulator.PopException('InvalidValue') + self.assertEqual('stop_sequence', e.column_name) + accumulator.AssertNoMoreExceptions() + + +class MissingEndpointTimesTestCase(util.TestCase): + def runTest(self): + problems = ExceptionProblemReporterNoExpiration() + schedule = transitfeed.Schedule(problem_reporter=problems) + try: + schedule.Load(DataPath('missing_endpoint_times'), extra_validation=True) + self.fail('InvalidValue exception expected') + except transitfeed.InvalidValue, e: + self.assertEqual('departure_time', e.column_name) + self.assertEqual('', e.value) + + +class DuplicateScheduleIDTestCase(util.TestCase): + def runTest(self): + schedule = transitfeed.Schedule( + problem_reporter=ExceptionProblemReporterNoExpiration()) + try: + schedule.Load(DataPath('duplicate_schedule_id'), extra_validation=True) + self.fail('DuplicateID exception expected') + except transitfeed.DuplicateID: + pass + +class OverlappingBlockSchedule(transitfeed.Schedule): + """Special Schedule subclass that counts the number of calls to + GetServicePeriod() so we can verify service period overlap calculation + caching""" + + _get_service_period_call_count = 0 + + def GetServicePeriod(self, service_id): + self._get_service_period_call_count += 1 + return transitfeed.Schedule.GetServicePeriod(self,service_id) + + def GetServicePeriodCallCount(self): + return self._get_service_period_call_count + +class OverlappingBlockTripsTestCase(util.TestCase): + """Builds a simple schedule for testing of overlapping block trips""" + + def setUp(self): + self.accumulator = RecordingProblemAccumulator( + self, ("ExpirationDate", "NoServiceExceptions")) + self.problems = transitfeed.ProblemReporter(self.accumulator) + + schedule = OverlappingBlockSchedule(problem_reporter=self.problems) + schedule.AddAgency("Demo Transit Authority", "http://dta.org", + "America/Los_Angeles") + + sp1 = transitfeed.ServicePeriod("SID1") + sp1.SetWeekdayService(True) + sp1.SetStartDate("20070605") + sp1.SetEndDate("20080605") + schedule.AddServicePeriodObject(sp1) + + sp2 = transitfeed.ServicePeriod("SID2") + sp2.SetDayOfWeekHasService(0) + sp2.SetDayOfWeekHasService(2) + sp2.SetDayOfWeekHasService(4) + sp2.SetStartDate("20070605") + sp2.SetEndDate("20080605") + schedule.AddServicePeriodObject(sp2) + + sp3 = transitfeed.ServicePeriod("SID3") + sp3.SetWeekendService(True) + sp3.SetStartDate("20070605") + sp3.SetEndDate("20080605") + schedule.AddServicePeriodObject(sp3) + + self.stop1 = schedule.AddStop(lng=-116.75167, + lat=36.915682, + name="Stagecoach Hotel & Casino", + stop_id="S1") + + self.stop2 = schedule.AddStop(lng=-116.76218, + lat=36.905697, + name="E Main St / S Irving St", + stop_id="S2") + + self.route = schedule.AddRoute("", "City", "Bus", route_id="CITY") + + self.schedule = schedule + self.sp1 = sp1 + self.sp2 = sp2 + self.sp3 = sp3 + + def testNoOverlap(self): + + schedule, route, sp1 = self.schedule, self.route, self.sp1 + + trip1 = route.AddTrip(schedule, service_period=sp1, trip_id="CITY1") + trip1.block_id = "BLOCK" + trip1.AddStopTime(self.stop1, stop_time="6:00:00") + trip1.AddStopTime(self.stop2, stop_time="6:30:00") + + trip2 = route.AddTrip(schedule, service_period=sp1, trip_id="CITY2") + trip2.block_id = "BLOCK" + trip2.AddStopTime(self.stop2, stop_time="6:30:00") + trip2.AddStopTime(self.stop1, stop_time="7:00:00") + + schedule.Validate(self.problems) + + self.accumulator.AssertNoMoreExceptions() + + def testOverlapSameServicePeriod(self): + + schedule, route, sp1 = self.schedule, self.route, self.sp1 + + trip1 = route.AddTrip(schedule, service_period=sp1, trip_id="CITY1") + trip1.block_id = "BLOCK" + trip1.AddStopTime(self.stop1, stop_time="6:00:00") + trip1.AddStopTime(self.stop2, stop_time="6:30:00") + + trip2 = route.AddTrip(schedule, service_period=sp1, trip_id="CITY2") + trip2.block_id = "BLOCK" + trip2.AddStopTime(self.stop2, stop_time="6:20:00") + trip2.AddStopTime(self.stop1, stop_time="6:50:00") + + schedule.Validate(self.problems) + + e = self.accumulator.PopException('OverlappingTripsInSameBlock') + self.assertEqual(e.trip_id1, 'CITY1') + self.assertEqual(e.trip_id2, 'CITY2') + self.assertEqual(e.block_id, 'BLOCK') + + self.accumulator.AssertNoMoreExceptions() + + def testOverlapDifferentServicePeriods(self): + + schedule, route, sp1, sp2 = self.schedule, self.route, self.sp1, self.sp2 + + trip1 = route.AddTrip(schedule, service_period=sp1, trip_id="CITY1") + trip1.block_id = "BLOCK" + trip1.AddStopTime(self.stop1, stop_time="6:00:00") + trip1.AddStopTime(self.stop2, stop_time="6:30:00") + + trip2 = route.AddTrip(schedule, service_period=sp2, trip_id="CITY2") + trip2.block_id = "BLOCK" + trip2.AddStopTime(self.stop2, stop_time="6:20:00") + trip2.AddStopTime(self.stop1, stop_time="6:50:00") + + trip3 = route.AddTrip(schedule, service_period=sp1, trip_id="CITY3") + trip3.block_id = "BLOCK" + trip3.AddStopTime(self.stop1, stop_time="7:00:00") + trip3.AddStopTime(self.stop2, stop_time="7:30:00") + + trip4 = route.AddTrip(schedule, service_period=sp2, trip_id="CITY4") + trip4.block_id = "BLOCK" + trip4.AddStopTime(self.stop2, stop_time="7:20:00") + trip4.AddStopTime(self.stop1, stop_time="7:50:00") + + schedule.Validate(self.problems) + + e = self.accumulator.PopException('OverlappingTripsInSameBlock') + self.assertEqual(e.trip_id1, 'CITY1') + self.assertEqual(e.trip_id2, 'CITY2') + self.assertEqual(e.block_id, 'BLOCK') + + e = self.accumulator.PopException('OverlappingTripsInSameBlock') + self.assertEqual(e.trip_id1, 'CITY3') + self.assertEqual(e.trip_id2, 'CITY4') + self.assertEqual(e.block_id, 'BLOCK') + + self.accumulator.AssertNoMoreExceptions() + + # If service period overlap calculation caching is working correctly, + # we expect only two calls to GetServicePeriod(), one each for sp1 and + # sp2, as oppossed four calls total for the four overlapping trips + self.assertEquals(2,schedule.GetServicePeriodCallCount()) + + def testNoOverlapDifferentServicePeriods(self): + + schedule, route, sp1, sp3 = self.schedule, self.route, self.sp1, self.sp3 + + trip1 = route.AddTrip(schedule, service_period=sp1, trip_id="CITY1") + trip1.block_id = "BLOCK" + trip1.AddStopTime(self.stop1, stop_time="6:00:00") + trip1.AddStopTime(self.stop2, stop_time="6:30:00") + + trip2 = route.AddTrip(schedule, service_period=sp3, trip_id="CITY2") + trip2.block_id = "BLOCK" + trip2.AddStopTime(self.stop2, stop_time="6:20:00") + trip2.AddStopTime(self.stop1, stop_time="6:50:00") + + schedule.Validate(self.problems) + + self.accumulator.AssertNoMoreExceptions() + +class ColorLuminanceTestCase(util.TestCase): + def runTest(self): + self.assertEqual(transitfeed.ColorLuminance('000000'), 0, + "ColorLuminance('000000') should be zero") + self.assertEqual(transitfeed.ColorLuminance('FFFFFF'), 255, + "ColorLuminance('FFFFFF') should be 255") + RGBmsg = ("ColorLuminance('RRGGBB') should be " + "0.299*<Red> + 0.587*<Green> + 0.114*<Blue>") + decimal_places_tested = 8 + self.assertAlmostEqual(transitfeed.ColorLuminance('640000'), 29.9, + decimal_places_tested, RGBmsg) + self.assertAlmostEqual(transitfeed.ColorLuminance('006400'), 58.7, + decimal_places_tested, RGBmsg) + self.assertAlmostEqual(transitfeed.ColorLuminance('000064'), 11.4, + decimal_places_tested, RGBmsg) + self.assertAlmostEqual(transitfeed.ColorLuminance('1171B3'), + 0.299*17 + 0.587*113 + 0.114*179, + decimal_places_tested, RGBmsg) + +INVALID_VALUE = Exception() +class ValidationTestCase(util.TestCase): + def setUp(self): + self.accumulator = RecordingProblemAccumulator( + self, ("ExpirationDate", "NoServiceExceptions")) + self.problems = transitfeed.ProblemReporter(self.accumulator) + + def tearDown(self): + self.accumulator.TearDownAssertNoMoreExceptions() + + def ExpectNoProblems(self, object): + self.accumulator.AssertNoMoreExceptions() + object.Validate(self.problems) + self.accumulator.AssertNoMoreExceptions() + + # TODO: Get rid of Expect*Closure methods. With the + # RecordingProblemAccumulator it is now possible to replace + # self.ExpectMissingValueInClosure(lambda: o.method(...), foo) + # with + # o.method(...) + # self.ExpectMissingValueInClosure(foo) + # because problems don't raise an exception. This has the advantage of + # making it easy and clear to test the return value of o.method(...) and + # easier to test for a sequence of problems caused by one c