+# Code shared between tests.
+import os
+import os.path
+import re
+import cStringIO as StringIO
+import shutil
+import subprocess
+import sys
+import tempfile
+import traceback
+import transitfeed
+import unittest
+import zipfile
+def check_call(cmd, expected_retcode=0, stdin_str="", **kwargs):
+  """Convenience function that is in the docs for subprocess but not
+  installed on my system. Raises an Exception if the return code is not
+  expected_retcode. Returns a tuple of strings, (stdout, stderr)."""
+  try:
+    if 'stdout' in kwargs or 'stderr' in kwargs or 'stdin' in kwargs:
+      raise Exception("Don't pass stdout or stderr")
+    p = subprocess.Popen(cmd, stdout=subprocess.PIPE,
+                         stderr=subprocess.PIPE, stdin=subprocess.PIPE,
+                         **kwargs)
+    (out, err) = p.communicate(stdin_str)
+    retcode = p.returncode
+  except Exception, e:
+    raise Exception("When running %s: %s" % (cmd, e))
+  if retcode < 0:
+    raise Exception(
+        "Child '%s' was terminated by signal %d. Output:\n%s\n%s\n" %
+        (cmd, -retcode, out, err))
+  elif retcode != expected_retcode:
+    raise Exception(
+        "Child '%s' returned %d. Output:\n%s\n%s\n" %
+        (cmd, retcode, out, err))
+  return (out, err)
+class TestCase(unittest.TestCase):
+  """Base of every TestCase class in this project.
+  This adds some methods that perhaps should be in unittest.TestCase.
+  """
+  # Note from Tom, Dec 9 2009: Be careful about adding setUp or tearDown
+  # because they will be run a few hundred times.
+  def assertMatchesRegex(self, regex, string):
+    """Assert that regex is found in string."""
+    if not re.search(regex, string):
+      self.fail("string %r did not match regex %r" % (string, regex))
+class GetPathTestCase(TestCase):
+  """TestCase with method to get paths to files in the distribution."""
+  def setUp(self):
+    super(GetPathTestCase, self).setUp()
+    self._origcwd = os.getcwd()
+  def GetExamplePath(self, name):
+    """Return the full path of a file in the examples directory"""
+    return self.GetPath('examples', name)
+  def GetTestDataPath(self, *path):
+    """Return the full path of a file in the test/data directory"""
+    return self.GetPath('test', 'data', *path)
+  def GetPath(self, *path):
+    """Return absolute path of path. path is relative main source directory."""
+    here = os.path.dirname(__file__)  # Relative to _origcwd
+    return os.path.join(self._origcwd, here, '..', *path)
+class TempDirTestCaseBase(GetPathTestCase):
+  """Make a temporary directory the current directory before running the test
+  and remove it after the test.
+  """
+  def setUp(self):
+    GetPathTestCase.setUp(self)
+    self.tempdirpath = tempfile.mkdtemp()
+    os.chdir(self.tempdirpath)
+  def tearDown(self):
+    os.chdir(self._origcwd)
+    shutil.rmtree(self.tempdirpath)
+    GetPathTestCase.tearDown(self)
+  def CheckCallWithPath(self, cmd, expected_retcode=0, stdin_str=""):
+    """Run python script cmd[0] with args cmd[1:], making sure 'import
+    transitfeed' will use the module in this source tree. Raises an Exception
+    if the return code is not expected_retcode. Returns a tuple of strings,
+    (stdout, stderr)."""
+    tf_path = transitfeed.__file__
+    # Path of the directory containing transitfeed. When this is added to
+    # sys.path importing transitfeed should work independent of if
+    # transitfeed.__file__ is <parent>/transitfeed.py or
+    # <parent>/transitfeed/__init__.py
+    transitfeed_parent = tf_path[:tf_path.rfind("transitfeed")]
+    transitfeed_parent = transitfeed_parent.replace("\\", "/").rstrip("/")
+    script_path = cmd[0].replace("\\", "/")
+    script_args = cmd[1:]
+    # Propogate sys.path of this process to the subprocess. This is done
+    # because I assume that if this process has a customized sys.path it is
+    # meant to be used for all processes involved in the tests.  The downside
+    # of this is that the subprocess is no longer a clean version of what you
+    # get when running "python" after installing transitfeed. Hopefully if this
+    # process uses a customized sys.path you know what you are doing.
+    env = {"PYTHONPATH": ":".join(sys.path)}
+    # Instead of directly running the script make sure that the transitfeed
+    # module in this source directory is at the front of sys.path. Then
+    # adjust sys.argv so it looks like the script was run directly. This lets
+    # OptionParser use the correct value for %proj.
+    cmd = [sys.executable, "-c",
+           "import sys; "
+           "sys.path.insert(0,'%s'); "
+           "sys.argv = ['%s'] + sys.argv[1:]; "
+           "exec(open('%s'))" %
+           (transitfeed_parent, script_path, script_path)] + script_args
+    return check_call(cmd, expected_retcode=expected_retcode, shell=False,
+                      env=env, stdin_str=stdin_str)
+  def ConvertZipToDict(self, zip):
+    """Converts a zip file into a dictionary.
+    Arguments:
+        zip: The zipfile whose contents are to be converted to a dictionary.
+    Returns:
+        A dictionary mapping filenames to file contents."""
+    zip_dict = {}
+    for archive_name in zip.namelist():
+      zip_dict[archive_name] = zip.read(archive_name)
+    zip.close()
+    return zip_dict
+  def ConvertDictToZip(self, dict):
+    """Converts a dictionary to an in-memory zipfile.
+    Arguments:
+        dict: A dictionary mapping file names to file contents
+    Returns:
+        The new file's in-memory contents as a file-like object."""
+    zipfile_mem = StringIO.StringIO()
+    zip = zipfile.ZipFile(zipfile_mem, 'a')
+    for arcname, contents in dict.items():
+      zip.writestr(arcname, contents)
+    zip.close()
+    return zipfile_mem
+#TODO(anog): Revisit this after we implement proper per-exception level change
+class RecordingProblemAccumulator(transitfeed.ProblemAccumulatorInterface):
+  """Save all problems for later inspection.
+  Args:
+    test_case: a unittest.TestCase object on which to report problems
+    ignore_types: sequence of string type names that will be ignored by the
+    ProblemAccumulator"""
+  def __init__(self, test_case, ignore_types=None):
+    self.exceptions = []
+    self._test_case = test_case
+    self._ignore_types = ignore_types or set()
+  def _Report(self, e):
+    # Ensure that these don't crash
+    e.FormatProblem()
+    e.FormatContext()
+    if e.__class__.__name__ in self._ignore_types:
+      return
+    # Keep the 7 nearest stack frames. This should be enough to identify
+    # the code path that created the exception while trimming off most of the
+    # large test framework's stack.
+    traceback_list = traceback.format_list(traceback.extract_stack()[-7:-1])
+    self.exceptions.append((e, ''.join(traceback_list)))
+  def PopException(self, type_name):
+    """Return the first exception, which must be a type_name."""
+    e = self.exceptions.pop(0)
+    e_name = e[0].__class__.__name__
+    self._test_case.assertEqual(e_name, type_name,
+                                "%s != %s\n%s" %
+                                (e_name, type_name, self.FormatException(*e)))
+    return e[0]
+  def FormatException(self, exce, tb):
+    return ("%s\nwith gtfs file context %s\nand traceback\n%s" %
+            (exce.FormatProblem(), exce.FormatContext(), tb))
+  def TearDownAssertNoMoreExceptions(self):
+    """Assert that there are no unexpected problems left after a test has run.
+       This function should be called on a test's tearDown. For more information
+       please see AssertNoMoreExceptions"""
+    assert len(self.exceptions) == 0, \
+        "see util.RecordingProblemAccumulator.AssertNoMoreExceptions"
+  def AssertNoMoreExceptions(self):
+    """Check that no unexpected problems were reported.
+    Every test that uses a RecordingProblemReporter should end with a call to
+    this method. If setUp creates a RecordingProblemReporter it is good for
+    tearDown to double check that the exceptions list was emptied.
+    """
+    exceptions_as_text = []
+    for e, tb in self.exceptions:
+      exceptions_as_text.append(self.FormatException(e, tb))
+    # If the assertFalse below fails the test will abort and tearDown is
+    # called. Some tearDown methods assert that self.exceptions is empty as
+    # protection against a test that doesn't end with AssertNoMoreExceptions
+    # and has exceptions remaining in the RecordingProblemReporter. It would
+    # be nice to trigger a normal test failure in tearDown but the idea was
+    # rejected (http://bugs.python.org/issue5531).
+    self.exceptions = []
+    self._test_case.assertFalse(exceptions_as_text,
+                                "\n".join(exceptions_as_text))
+  def PopInvalidValue(self, column_name, file_name=None):
+    e = self.PopException("InvalidValue")
+    self._test_case.assertEquals(column_name, e.column_name)
+    if file_name:
+      self._test_case.assertEquals(file_name, e.file_name)
+    return e
+  def PopMissingValue(self, column_name, file_name=None):
+    e = self.PopException("MissingValue")
+    self._test_case.assertEquals(column_name, e.column_name)
+    if file_name:
+      self._test_case.assertEquals(file_name, e.file_name)
+    return e
+  def PopDuplicateColumn(self, file_name, header, count):
+    e = self.PopException("DuplicateColumn")
+    self._test_case.assertEquals(file_name, e.file_name)
+    self._test_case.assertEquals(header, e.header)
+    self._test_case.assertEquals(count, e.count)
+    return e