--- a/origin-src/transitfeed-1.2.6/test/testfeedvalidator.py +++ b/origin-src/transitfeed-1.2.6/test/testfeedvalidator.py @@ -1,1 +1,459 @@ - +#!/usr/bin/python2.5 + +# Copyright (C) 2007 Google Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Smoke tests feed validator. Make sure it runs and returns the right things +# for a valid feed and a feed with errors. + +import datetime +import feedvalidator +import os.path +import re +import StringIO +import transitfeed +import unittest +from urllib2 import HTTPError, URLError +import urllib2 +import util +import zipfile + + +class FullTests(util.TempDirTestCaseBase): + def testGoodFeed(self): + (out, err) = self.CheckCallWithPath( + [self.GetPath('feedvalidator.py'), '-n', '--latest_version', + transitfeed.__version__, self.GetPath('test', 'data', 'good_feed')]) + self.assertTrue(re.search(r'feed validated successfully', out)) + self.assertFalse(re.search(r'ERROR', out)) + htmlout = open('validation-results.html').read() + self.assertTrue(re.search(r'feed validated successfully', htmlout)) + self.assertFalse(re.search(r'ERROR', htmlout)) + self.assertFalse(os.path.exists('transitfeedcrash.txt')) + + def testGoodFeedConsoleOutput(self): + (out, err) = self.CheckCallWithPath( + [self.GetPath('feedvalidator.py'), '-n', '--latest_version', + transitfeed.__version__, + '--output=CONSOLE', self.GetPath('test', 'data', 'good_feed')]) + self.assertTrue(re.search(r'feed validated successfully', out)) + self.assertFalse(re.search(r'ERROR', out)) + self.assertFalse(os.path.exists('validation-results.html')) + self.assertFalse(os.path.exists('transitfeedcrash.txt')) + + def testMissingStops(self): + (out, err) = self.CheckCallWithPath( + [self.GetPath('feedvalidator.py'), '-n', '--latest_version', + transitfeed.__version__, + self.GetPath('test', 'data', 'missing_stops')], + expected_retcode=1) + self.assertTrue(re.search(r'ERROR', out)) + self.assertFalse(re.search(r'feed validated successfully', out)) + htmlout = open('validation-results.html').read() + self.assertTrue(re.search(r'Invalid value BEATTY_AIRPORT', htmlout)) + self.assertFalse(re.search(r'feed validated successfully', htmlout)) + self.assertFalse(os.path.exists('transitfeedcrash.txt')) + + def testMissingStopsConsoleOutput(self): + (out, err) = self.CheckCallWithPath( + [self.GetPath('feedvalidator.py'), '-n', '-o', 'console', + '--latest_version', transitfeed.__version__, + self.GetPath('test', 'data', 'missing_stops')], + expected_retcode=1) + self.assertTrue(re.search(r'ERROR', out)) + self.assertFalse(re.search(r'feed validated successfully', out)) + self.assertTrue(re.search(r'Invalid value BEATTY_AIRPORT', out)) + self.assertFalse(os.path.exists('validation-results.html')) + self.assertFalse(os.path.exists('transitfeedcrash.txt')) + + def testLimitedErrors(self): + (out, err) = self.CheckCallWithPath( + [self.GetPath('feedvalidator.py'), '-l', '2', '-n', + '--latest_version', transitfeed.__version__, + self.GetPath('test', 'data', 'missing_stops')], + expected_retcode=1) + self.assertTrue(re.search(r'ERROR', out)) + self.assertFalse(re.search(r'feed validated successfully', out)) + htmlout = open('validation-results.html').read() + self.assertEquals(2, len(re.findall(r'class="problem">stop_id<', htmlout))) + self.assertFalse(os.path.exists('transitfeedcrash.txt')) + + def testBadDateFormat(self): + (out, err) = self.CheckCallWithPath( + [self.GetPath('feedvalidator.py'), '-n', '--latest_version', + transitfeed.__version__, + self.GetPath('test', 'data', 'bad_date_format')], + expected_retcode=1) + self.assertTrue(re.search(r'ERROR', out)) + self.assertFalse(re.search(r'feed validated successfully', out)) + htmlout = open('validation-results.html').read() + self.assertTrue(re.search(r'in field <code>start_date', htmlout)) + self.assertTrue(re.search(r'in field <code>date', htmlout)) + self.assertFalse(re.search(r'feed validated successfully', htmlout)) + self.assertFalse(os.path.exists('transitfeedcrash.txt')) + + def testBadUtf8(self): + (out, err) = self.CheckCallWithPath( + [self.GetPath('feedvalidator.py'), '-n', '--latest_version', + transitfeed.__version__, self.GetPath('test', 'data', 'bad_utf8')], + expected_retcode=1) + self.assertTrue(re.search(r'ERROR', out)) + self.assertFalse(re.search(r'feed validated successfully', out)) + htmlout = open('validation-results.html').read() + self.assertTrue(re.search(r'Unicode error', htmlout)) + self.assertFalse(re.search(r'feed validated successfully', htmlout)) + self.assertFalse(os.path.exists('transitfeedcrash.txt')) + + def testFileNotFound(self): + (out, err) = self.CheckCallWithPath( + [self.GetPath('feedvalidator.py'), '-n', '--latest_version', + transitfeed.__version__, 'file-not-found.zip'], + expected_retcode=1) + self.assertFalse(os.path.exists('transitfeedcrash.txt')) + + def testBadOutputPath(self): + (out, err) = self.CheckCallWithPath( + [self.GetPath('feedvalidator.py'), '-n', '--latest_version', + transitfeed.__version__, '-o', 'path/does/not/exist.html', + self.GetPath('test', 'data', 'good_feed')], + expected_retcode=2) + self.assertFalse(os.path.exists('transitfeedcrash.txt')) + + def testCrashHandler(self): + (out, err) = self.CheckCallWithPath( + [self.GetPath('feedvalidator.py'), '-n', '--latest_version', + transitfeed.__version__, 'IWantMyvalidation-crash.txt'], + expected_retcode=127) + self.assertTrue(re.search(r'Yikes', out)) + self.assertFalse(re.search(r'feed validated successfully', out)) + crashout = open('transitfeedcrash.txt').read() + self.assertTrue(re.search(r'For testing the feed validator crash handler', + crashout)) + + def testCheckVersionIsRun(self): + (out, err) = self.CheckCallWithPath( + [self.GetPath('feedvalidator.py'), '-n', '--latest_version', + '100.100.100', self.GetPath('test', 'data', 'good_feed')]) + self.assertTrue(re.search(r'feed validated successfully', out)) + self.assertTrue(re.search(r'A new version 100.100.100', out)) + htmlout = open('validation-results.html').read() + self.assertTrue(re.search(r'A new version 100.100.100', htmlout)) + self.assertFalse(re.search(r'ERROR', htmlout)) + self.assertFalse(os.path.exists('transitfeedcrash.txt')) + + def testCheckVersionIsRunConsoleOutput(self): + (out, err) = self.CheckCallWithPath( + [self.GetPath('feedvalidator.py'), '-n', '-o', 'console', + '--latest_version=100.100.100', + self.GetPath('test', 'data', 'good_feed')]) + self.assertTrue(re.search(r'feed validated successfully', out)) + self.assertTrue(re.search(r'A new version 100.100.100', out)) + self.assertFalse(os.path.exists('validation-results.html')) + self.assertFalse(os.path.exists('transitfeedcrash.txt')) + + def testUsage(self): + (out, err) = self.CheckCallWithPath( + [self.GetPath('feedvalidator.py'), '--invalid_opt'], expected_retcode=2) + self.assertMatchesRegex(r'[Uu]sage: feedvalidator.py \[options\]', err) + self.assertMatchesRegex(r'wiki/FeedValidator', err) + self.assertMatchesRegex(r'--output', err) # output includes all usage info + self.assertFalse(os.path.exists('transitfeedcrash.txt')) + self.assertFalse(os.path.exists('validation-results.html')) + + +# Regression tests to ensure that CalendarSummary works properly +# even when the feed starts in the future or expires in less than +# 60 days +# See http://code.google.com/p/googletransitdatafeed/issues/detail?id=204 +class CalendarSummaryTestCase(util.TestCase): + + # Test feeds starting in the future + def testFutureFeedDoesNotCrashCalendarSummary(self): + today = datetime.date.today() + start_date = today + datetime.timedelta(days=20) + end_date = today + datetime.timedelta(days=80) + + schedule = transitfeed.Schedule() + service_period = schedule.GetDefaultServicePeriod() + + service_period.SetStartDate(start_date.strftime("%Y%m%d")) + service_period.SetEndDate(end_date.strftime("%Y%m%d")) + service_period.SetWeekdayService(True) + + result = feedvalidator.CalendarSummary(schedule) + + self.assertEquals(0, result['max_trips']) + self.assertEquals(0, result['min_trips']) + self.assertTrue(re.search("40 service dates", result['max_trips_dates'])) + + # Test feeds ending in less than 60 days + def testShortFeedDoesNotCrashCalendarSummary(self): + start_date = datetime.date.today() + end_date = start_date + datetime.timedelta(days=15) + + schedule = transitfeed.Schedule() + service_period = schedule.GetDefaultServicePeriod() + + service_period.SetStartDate(start_date.strftime("%Y%m%d")) + service_period.SetEndDate(end_date.strftime("%Y%m%d")) + service_period.SetWeekdayService(True) + + result = feedvalidator.CalendarSummary(schedule) + + self.assertEquals(0, result['max_trips']) + self.assertEquals(0, result['min_trips']) + self.assertTrue(re.search("15 service dates", result['max_trips_dates'])) + + # Test feeds starting in the future *and* ending in less than 60 days + def testFutureAndShortFeedDoesNotCrashCalendarSummary(self): + today = datetime.date.today() + start_date = today + datetime.timedelta(days=2) + end_date = today + datetime.timedelta(days=3) + + schedule = transitfeed.Schedule() + service_period = schedule.GetDefaultServicePeriod() + + service_period.SetStartDate(start_date.strftime("%Y%m%d")) + service_period.SetEndDate(end_date.strftime("%Y%m%d")) + service_period.SetWeekdayService(True) + + result = feedvalidator.CalendarSummary(schedule) + + self.assertEquals(0, result['max_trips']) + self.assertEquals(0, result['min_trips']) + self.assertTrue(re.search("1 service date", result['max_trips_dates'])) + + # Test feeds without service days + def testFeedWithNoDaysDoesNotCrashCalendarSummary(self): + schedule = transitfeed.Schedule() + result = feedvalidator.CalendarSummary(schedule) + + self.assertEquals({}, result) + + +class MockOptions: + """Pretend to be an optparse options object suitable for testing.""" + def __init__(self): + self.limit_per_type = 5 + self.memory_db = True + self.check_duplicate_trips = True + self.latest_version = transitfeed.__version__ + self.output = 'fake-filename.zip' + self.manual_entry = False + self.service_gap_interval = None + self.extension = None + + +class FeedValidatorTestCase(util.TempDirTestCaseBase): + def testBadEolContext(self): + """Make sure the filename is included in the report of a bad eol.""" + + filename = "routes.txt" + old_zip = zipfile.ZipFile( + self.GetPath('test', 'data', 'good_feed.zip'), 'r') + content_dict = self.ConvertZipToDict(old_zip) + old_routes = content_dict[filename] + new_routes = old_routes.replace('\n', '\r\n', 1) + self.assertNotEquals(old_routes, new_routes) + content_dict[filename] = new_routes + new_zipfile_mem = self.ConvertDictToZip(content_dict) + + options = MockOptions() + output_file = StringIO.StringIO() + feedvalidator.RunValidationOutputToFile( + new_zipfile_mem, options, output_file) + self.assertMatchesRegex(filename, output_file.getvalue()) + + +class LimitPerTypeProblemReporterTestCase(util.TestCase): + + def CreateLimitPerTypeProblemReporter(self, limit): + accumulator = feedvalidator.LimitPerTypeProblemAccumulator(limit) + problems = transitfeed.ProblemReporter(accumulator) + return problems + + def assertProblemsAttribute(self, problem_type, class_name, attribute_name, + expected): + """Join the value of each exception's attribute_name in order.""" + problem_attribute_list = [] + for e in self.problems.GetAccumulator().ProblemList( + problem_type, class_name).problems: + problem_attribute_list.append(getattr(e, attribute_name)) + self.assertEquals(expected, " ".join(problem_attribute_list)) + + def testLimitOtherProblems(self): + """The first N of each type should be kept.""" + self.problems = self.CreateLimitPerTypeProblemReporter(2) + self.accumulator = self.problems.GetAccumulator() + + self.problems.OtherProblem("e1", type=transitfeed.TYPE_ERROR) + self.problems.OtherProblem("w1", type=transitfeed.TYPE_WARNING) + self.problems.OtherProblem("e2", type=transitfeed.TYPE_ERROR) + self.problems.OtherProblem("e3", type=transitfeed.TYPE_ERROR) + self.problems.OtherProblem("w2", type=transitfeed.TYPE_WARNING) + self.assertEquals(2, self.accumulator.WarningCount()) + self.assertEquals(3, self.accumulator.ErrorCount()) + + # These are BoundedProblemList objects + warning_bounded_list = self.accumulator.ProblemList( + transitfeed.TYPE_WARNING, "OtherProblem") + error_bounded_list = self.accumulator.ProblemList( + transitfeed.TYPE_ERROR, "OtherProblem") + + self.assertEquals(2, warning_bounded_list.count) + self.assertEquals(3, error_bounded_list.count) + + self.assertEquals(0, warning_bounded_list.dropped_count) + self.assertEquals(1, error_bounded_list.dropped_count) + + self.assertProblemsAttribute(transitfeed.TYPE_ERROR, "OtherProblem", + "description", "e1 e2") + self.assertProblemsAttribute(transitfeed.TYPE_WARNING, "OtherProblem", + "description", "w1 w2") + + def testKeepUnsorted(self): + """An imperfect test that insort triggers ExceptionWithContext.__cmp__.""" + # If ExceptionWithContext.__cmp__ doesn't trigger TypeError in + # bisect.insort then the default comparison of object id will be used. The + # id values tend to be given out in order of creation so call + # problems._Report with objects in a different order. This test should + # break if ExceptionWithContext.__cmp__ is removed or changed to return 0 + # or cmp(id(self), id(y)). + exceptions = [] + for i in range(20): + exceptions.append(transitfeed.OtherProblem(description="e%i" % i)) + exceptions = exceptions[10:] + exceptions[:10] + self.problems = self.CreateLimitPerTypeProblemReporter(3) + self.accumulator = self.problems.GetAccumulator() + for e in exceptions: + self.problems.AddToAccumulator(e) + + self.assertEquals(0, self.accumulator.WarningCount()) + self.assertEquals(20, self.accumulator.ErrorCount()) + + bounded_list = self.accumulator.ProblemList( + transitfeed.TYPE_ERROR, "OtherProblem") + self.assertEquals(20, bounded_list.count) + self.assertEquals(17, bounded_list.dropped_count) + self.assertProblemsAttribute(transitfeed.TYPE_ERROR, "OtherProblem", + "description", "e10 e11 e12") + + def testLimitSortedTooFastTravel(self): + """Sort by decreasing distance, keeping the N greatest.""" + self.problems = self.CreateLimitPerTypeProblemReporter(3) + self.accumulator = self.problems.GetAccumulator() + self.problems.TooFastTravel("t1", "prev stop", "next stop", 11230.4, 5, + None) + self.problems.TooFastTravel("t2", "prev stop", "next stop", 1120.4, 5, None) + self.problems.TooFastTravel("t3", "prev stop", "next stop", 1130.4, 5, None) + self.problems.TooFastTravel("t4", "prev stop", "next stop", 1230.4, 5, None) + self.assertEquals(0, self.accumulator.WarningCount()) + self.assertEquals(4, self.accumulator.ErrorCount()) + self.assertProblemsAttribute(transitfeed.TYPE_ERROR, "TooFastTravel", + "trip_id", "t1 t4 t3") + + def testLimitSortedStopTooFarFromParentStation(self): + """Sort by decreasing distance, keeping the N greatest.""" + self.problems = self.CreateLimitPerTypeProblemReporter(3) + self.accumulator = self.problems.GetAccumulator() + for i, distance in enumerate((1000, 3002.0, 1500, 2434.1, 5023.21)): + self.problems.StopTooFarFromParentStation( + "s%d" % i, "S %d" % i, "p%d" % i, "P %d" % i, distance) + self.assertEquals(5, self.accumulator.WarningCount()) + self.assertEquals(0, self.accumulator.ErrorCount()) + self.assertProblemsAttribute(transitfeed.TYPE_WARNING, + "StopTooFarFromParentStation", "stop_id", "s4 s1 s3") + + def testLimitSortedStopsTooClose(self): + """Sort by increasing distance, keeping the N closest.""" + self.problems = self.CreateLimitPerTypeProblemReporter(3) + self.accumulator = self.problems.GetAccumulator() + for i, distance in enumerate((4.0, 3.0, 2.5, 2.2, 1.0, 0.0)): + self.problems.StopsTooClose( + "Sa %d" % i, "sa%d" % i, "Sb %d" % i, "sb%d" % i, distance) + self.assertEquals(6, self.accumulator.WarningCount()) + self.assertEquals(0, self.accumulator.ErrorCount()) + self.assertProblemsAttribute(transitfeed.TYPE_WARNING, + "StopsTooClose", "stop_id_a", "sa5 sa4 sa3") + + +class CheckVersionTestCase(util.TempDirTestCaseBase): + def setUp(self): + self.mock = MockURLOpen() + + def tearDown(self): + self.mock = None + feedvalidator.urlopen = urllib2.urlopen + + def testAssignedDifferentVersion(self): + problems = feedvalidator.CheckVersion('100.100.100') + self.assertTrue(re.search(r'A new version 100.100.100', problems)) + + def testAssignedSameVersion(self): + problems = feedvalidator.CheckVersion(transitfeed.__version__) + self.assertEquals(problems, None) + + def testGetCorrectReturns(self): + feedvalidator.urlopen = self.mock.mockedConnectSuccess + problems = feedvalidator.CheckVersion() + self.assertTrue(re.search(r'A new version 100.0.1', problems)) + + def testPageNotFound(self): + feedvalidator.urlopen = self.mock.mockedPageNotFound + problems = feedvalidator.CheckVersion() + self.assertTrue(re.search(r'The server couldn\'t', problems)) + self.assertTrue(re.search(r'Error code: 404', problems)) + + def testConnectionTimeOut(self): + feedvalidator.urlopen = self.mock.mockedConnectionTimeOut + problems = feedvalidator.CheckVersion() + self.assertTrue(re.search(r'We failed to reach', problems)) + self.assertTrue(re.search(r'Reason: Connection timed', problems)) + + def testGetAddrInfoFailed(self): + feedvalidator.urlopen = self.mock.mockedGetAddrInfoFailed + problems = feedvalidator.CheckVersion() + self.assertTrue(re.search(r'We failed to reach', problems)) + self.assertTrue(re.search(r'Reason: Getaddrinfo failed', problems)) + + def testEmptyIsReturned(self): + feedvalidator.urlopen = self.mock.mockedEmptyIsReturned + problems = feedvalidator.CheckVersion() + self.assertTrue(re.search(r'We had trouble parsing', problems)) + + +class MockURLOpen: + """Pretend to be a urllib2.urlopen suitable for testing.""" + def mockedConnectSuccess(self, request): + return StringIO.StringIO('<li><a href="transitfeed-1.0.0/">transitfeed-' + '1.0.0/</a></li><li><a href=transitfeed-100.0.1/>' + 'transitfeed-100.0.1/</a></li>') + + def mockedPageNotFound(self, request): + raise HTTPError(request.get_full_url(), 404, 'Not Found', + request.header_items(), None) + + def mockedConnectionTimeOut(self, request): + raise URLError('Connection timed out') + + def mockedGetAddrInfoFailed(self, request): + raise URLError('Getaddrinfo failed') + + def mockedEmptyIsReturned(self, request): + return StringIO.StringIO() + + +if __name__ == '__main__': + unittest.main() +