[cig-commits] commit: Refactor ForestSnapshot into the Forest class.
Mercurial
hg at geodynamics.org
Mon Nov 24 11:27:09 PST 2008
changeset: 61:45f4c6176e27
user: Simon Law <simon at akoha.org>
date: Tue Aug 28 15:27:29 2007 -0400
files: forest.py tests/test-class-forest.py tests/test-forest.py
description:
Refactor ForestSnapshot into the Forest class.
Forest is a lot bigger but it's also a lot more flexible. This will
let us do more clever things without having to twiddle booleans.
diff -r df4e33bd7149 -r 45f4c6176e27 forest.py
--- a/forest.py Mon Aug 27 13:38:49 2007 -0400
+++ b/forest.py Tue Aug 28 15:27:29 2007 -0400
@@ -45,6 +45,7 @@ walkhg = (0|no|false|1|yes|true)
"""
import ConfigParser
+import errno
import os
import re
@@ -69,6 +70,11 @@ except AttributeError:
except AttributeError:
findcmd.findcmd = commands.findcmd
findcmd.__doc__ = commands.findcmd.__doc__
+
+try:
+ parseurl = hg.parseurl
+except:
+ parseurl = cmdutil.parseurl
cmdtable = None
@@ -344,69 +350,397 @@ def tree_sections(cfg, withtop=True):
return secs
-def mq_patches_applied(rpath):
- if rpath.startswith("ssh:"):
- raise util.Abort(_("'%s' starts with ssh:") % rpath)
- elif rpath.startswith("http:"):
- raise util.Abort(_("'%s' starts with http:") % rpath)
- elif rpath.startswith("file:"):
- rpath = rpath[len("file:"):]
- rpath = util.localpath(rpath)
- rpath = os.path.join(rpath, ".hg")
- entries = os.listdir(rpath)
- for e in entries:
- path = os.path.join(rpath, e)
- if os.path.isdir(path):
- series = os.path.join(path, "series")
- if os.path.isfile(series):
- s = os.stat(os.path.join(path, "status"))
- if s.st_size > 0:
- return True
- return False
+def die_on_numeric_revs(revs):
+ """Check to ensure that the revs passed in are not numeric.
+
+ Numeric revisions make no sense when searching a forest. You want
+ only named branches and tags. The only special exception is
+ revision -1, which occurs before the first checkin.
+ """
+ if revs is None:
+ return
+ for strrev in revs:
+ try:
+ intrev = int(strrev)
+ except:
+ continue # String-based revision
+ if intrev == 0 and strrev.startswith("00"):
+ continue # Revision -1
+ raise util.Abort(_("numeric revision '%s'") % strrev)
+
+
+def relpath(root, pathname):
+ """Returns the relative path of a local pathname from a local root."""
+ root = os.path.abspath(root)
+ pathname = os.path.abspath(pathname)
+ if root == pathname or pathname.startswith(root + os.sep):
+ pathname = os.path.normpath(pathname[len(root)+1:])
+ return pathname
+
+
+def urltopath(url):
+ if url and hg.islocal(url):
+ if url.startswith("file://"):
+ url = url[7:]
+ elif url.startswith("file:"):
+ url = url[5:]
+ return url
+
+
+class Forest(object):
+ """Describes the state of the forest within the current repository.
+
+ This data structure describes the Forest contained within the
+ current repository. It contains a list of Trees that describe
+ each sub-repository.
+ """
+
+ class SnapshotError(ConfigParser.NoSectionError):
+ pass
+
+ class Tree(object):
+ """Describe a local sub-repository within a forest."""
+
+ class Skip(Warning):
+ """Exception that signals this tree should be skipped."""
+ pass
+
+ __slots__ = ('_repo', '_root', 'revs', 'paths')
+
+ def __init__(self, repo=None, root=None, revs=[], paths={}):
+ """Create a Tree object.
+
+ repo may be any mercurial.localrepo object
+ root is the absolute path of this repo object
+ rev is the desired revision for this repository, None meaning the tip
+ paths is a dictionary of path aliases to real paths
+ """
+ self._repo = repo
+ self.revs = revs
+ if repo:
+ self.paths = {}
+ self.setrepo(repo)
+ self.paths.update(paths)
+ else:
+ self.setroot(root)
+ self.paths = paths
+
+ def die_on_mq(self, rootpath=None):
+ """Raises a util.Abort exception if self has mq patches applied."""
+ if self.mq_applied():
+ rpath = self.root
+ if rootpath:
+ if not isinstance(rootpath, str):
+ rootpath = rootpath.root
+ rpath = relpath(rootpath, rpath)
+ raise util.Abort(_("'%s' has mq patches applied") % rpath)
+
+ def mq_applied(self):
+ rpath = urltopath(self.root)
+ if not hg.islocal(rpath):
+ raise util.Abort(_("'%s' is not a local repository") % rpath)
+ rpath = util.localpath(rpath)
+ rpath = os.path.join(rpath, ".hg")
+ if not os.path.isdir(rpath):
+ return False
+ for entry in os.listdir(rpath):
+ path = os.path.join(rpath, entry)
+ if (os.path.isdir(path) and
+ os.path.isfile(os.path.join(path, 'series'))):
+ try:
+ s = os.stat(os.path.join(path, "status"))
+ if s.st_size > 0:
+ return path
+ except OSError, err:
+ if err.errno != errno.ENOENT:
+ raise
+ return False
+
+ def getpath(self, paths):
+ assert(type(paths) != str)
+ if paths is None:
+ return None
+ for path in paths:
+ if not hg.islocal(path):
+ return path
+ result = urltopath(path)
+ if os.path.isdir(result):
+ return result
+ result = urltopath(self.paths.get(path, None))
+ if result is not None:
+ return result
+ return None
+
+ def getrepo(self, ui=False):
+ if not self._repo and self._root:
+ if ui is False:
+ raise AttributeError("getrepo() requires 'ui' parameter")
+ self._repo = hg.repository(ui, self._root)
+ return self._repo
+
+ def setrepo(self, repo):
+ self._root = None
+ self._repo = repo
+ if repo.ui:
+ self.paths.update(dict(repo.ui.configitems('paths')))
+
+ def getroot(self):
+ if self._repo:
+ return self._repo.root
+ else:
+ return self._root
+
+ def setroot(self, root):
+ self._repo = None
+ self._root = root
+
+ @staticmethod
+ def skip(function):
+ """Decorator that turns any exception into a Forest.Tree.Skip"""
+ def skipme(*args, **keywords):
+ try:
+ function(*args, **keywords)
+ except Exception, err:
+ raise Forest.Tree.Skip(err)
+ return skipme
+
+ @staticmethod
+ def warn(function):
+ """Decorator that turns any exception into a Warning"""
+ def warnme(*args, **keywords):
+ try:
+ function(*args, **keywords)
+ except Exception, err:
+ raise Warning(err)
+ return warnme
+
+ def working_revs(self):
+ """Returns the revision of the working copy."""
+ ctx = self.repo.workingctx()
+ parents = ctx.parents()
+ return [node.hex(parents[0].node())]
+
+ def __repr__(self):
+ return ("<forest.Tree object "
+ "- repo: %s "
+ "- revs: %s "
+ "- root: %s "
+ "- paths: %s>") % (self.repo, self.revs,
+ self.root, self.paths)
+
+ repo = property(getrepo, setrepo, None, None)
+ root = property(getroot, setroot, None, None)
+
+ __slots__ = ('trees', 'snapfile')
+
+ def __init__(self, error=None, top=None, snapfile=None, walkhg=True):
+ """Create a Forest object.
+
+ top is the mercurial.localrepo object at the top of the forest.
+ snapfile is the filename of the snapshot file.
+ walkhg controls if we descend into .hg directories.
+
+ If you provide no snapfile, the top repo will be searched for
+ sub-repositories.
+
+ If you do provide a snapfile, then the snapfile will be read
+ for sub-repositories and no searching of the filesystem will
+ be done. The top repository is queried for the root of all
+ relative paths, but if it's missing, then the current
+ directory will be assumed.
+ """
+ if error:
+ raise AttributeError("__init__() takes only named arguments")
+ self.trees = []
+ self.snapfile = None
+ if snapfile:
+ self.snapfile = snapfile
+ if top is None:
+ toppath = ""
+ else:
+ toppath = top.root
+ self.read(snapfile, toppath)
+ elif top:
+ self.trees.append(Forest.Tree(repo=top))
+ self.scan(walkhg)
+
+ def apply(self, ui, function, paths, opts, prehooks=[]):
+ """Apply function(repo, targetpath, opts) to the entire forest.
+
+ path is a path provided on the command line.
+ function is a function that should be called for every repository.
+ opts is a list of options provided to the function
+ prehooks is a list of hook(tree) that are run before function()
+
+ Useful for the vast majority of commands that scan a local
+ forest and perform some command on each sub-repository.
+
+ Skips a sub-repository skipping it isn't actually a repository
+ or if it has mq patches applied.
+
+ In function(), targetpath will be /-separated. You may have
+ to util.localpath() it.
+ """
+ opts['force'] = None # Acting on unrelated repos is BAD
+ if paths:
+ # Extract revisions from # syntax in path.
+ paths[0], revs = parseurl(paths[0], opts['rev'])
+ else:
+ revs = opts['rev']
+ die_on_numeric_revs(revs)
+ for tree in self.trees:
+ rpath = relpath(self.top().root, tree.root)
+ ui.status("[%s]\n" % rpath)
+ try:
+ for hook in prehooks:
+ try:
+ hook(tree)
+ except Forest.Tree.Skip:
+ raise
+ except Warning, message:
+ ui.warn(_("warning: %s\n") % message)
+ except Forest.Tree.Skip, message:
+ ui.warn(_("skipped: %s\n") % message)
+ ui.status("\n")
+ continue
+ except util.Abort:
+ raise
+ if revs:
+ opts['rev'] = revs
+ else:
+ opts['rev'] = tree.revs
+ targetpath = paths or None
+ if paths:
+ targetpath = tree.getpath(paths)
+ if targetpath:
+ if targetpath == paths[0] and rpath != os.curdir:
+ targetpath = '/'.join((targetpath, util.pconvert(rpath)))
+ function(tree, targetpath, opts)
+ ui.status("\n")
+
+ def read(self, snapfile, toppath="."):
+ """Loads the information in snapfile into this forest.
+
+ snapfile is the filename of a snapshot file
+ toppath is the path of the top of this forest
+ """
+ if not toppath:
+ toppath = "."
+ cfg = ConfigParser.RawConfigParser()
+ if not cfg.read([snapfile]):
+ raise util.Abort("%s: %s" % (snapfile, os.strerror(errno.ENOENT)))
+ seen_root = False
+ sections = {}
+ for section in cfg.sections():
+ if section.endswith('.paths'):
+ # Compatibility with old Forest snapshot files
+ paths = dict(cfg.items(section))
+ section = section[:-6]
+ if section in sections:
+ sections[section].paths.update(paths)
+ else:
+ sections[section] = Forest.Tree(paths=paths)
+ else:
+ root = cfg.get(section, 'root')
+ if root == '.':
+ seen_root = True
+ root = toppath
+ else:
+ root = os.path.join(toppath, util.localpath(root))
+ root = os.path.normpath(root)
+ rev = cfg.get(section, 'revision')
+ if not rev:
+ rev = []
+ paths = dict([(k[5:], v)
+ for k, v in cfg.items(section)
+ if k.startswith('path')])
+ if section in sections:
+ sections[section].root = root
+ sections[section].revs = [rev]
+ sections[section].paths.update(paths)
+ else:
+ sections[section] = Forest.Tree(root=root,
+ revs=[rev],
+ paths=paths)
+ if not seen_root:
+ raise Forest.SnapshotError("Could not find 'root = .' in '%s'" %
+ snapfile)
+ self.trees = sections.values()
+ self.trees.sort(key=(lambda tree: tree.root))
+
+ def scan(self, walkhg):
+ """Scans for sub-repositories within this forest.
+
+ This method modifies this forest in-place. It searches within the
+ forest's directories and enumerates all the repositories it finds.
+ """
+ trees = []
+ top = self.top()
+ ui = top.repo.ui
+ for relpath in top.repo.forests(walkhg):
+ if relpath != '.':
+ abspath = os.path.join(top.root, util.localpath(relpath))
+ trees.append(Forest.Tree(hg.repository(ui, abspath)))
+ trees.sort(key=(lambda tree: tree.root))
+ trees.insert(0, Forest.Tree(hg.repository(ui, top.root)))
+ self.trees = trees
+
+ def top(self):
+ """Returns the top Forest.Tree in this forest."""
+ if len(self.trees):
+ return self.trees[0]
+ else:
+ return None
+
+ def update(self, ui=None):
+ """Gets the most recent information about repos."""
+ try:
+ if not ui:
+ ui = self.top().repo.ui
+ except:
+ pass
+ for tree in self.trees:
+ try:
+ repo = hg.repository(ui, tree.root)
+ except RepoError:
+ repo = None
+ tree.repo = repo
+
+ def write(self, fd, oldstyle=False):
+ """Writes a snapshot file to a file descriptor."""
+ counter = 1
+ for tree in self.trees:
+ fd.write("[tree%s]\n" % counter)
+ root = relpath(self.top().root, tree.root)
+ if root == os.curdir:
+ root = '.'
+ root = util.normpath(root)
+ fd.write("root = %s\n" % root)
+ if tree.revs:
+ fd.write("revision = %s\n" % tree.revs[0])
+ else:
+ fd.write("revision = None\n")
+ if not oldstyle:
+ for name, path in tree.paths.items():
+ fd.write("path.%s = %s\n" % (name, path))
+ else:
+ fd.write("\n[tree%s.paths]\n" % counter)
+ for name, path in tree.paths.items():
+ fd.write("%s = %s\n" % (name, path))
+ fd.write("\n")
+ counter += 1
+
+
+ def __repr__(self):
+ return ("<forest.Forest object - trees: %s> ") % self.trees
class ForestSnapshot(object):
- class Tree(object):
-
- __slots__ = ('root', 'rev', 'paths')
-
- def __init__(self, root, rev, paths={}):
- self.root = root
- self.rev = rev
- self.paths = paths
-
- def info(self, pathalias):
- return self.root, self.rev, self.paths.get(pathalias, None)
-
- def update(self, rev, paths):
- self.rev = rev
- for name, path in paths.items():
- if self.paths.has_key(name):
- self.paths[name] = path
-
- def write(self, ui, section):
- ui.write("root = %s\n" % self.root)
- ui.write("revision = %s\n" % self.rev)
- ui.write("\n[%s]\n" % (section + ".paths"))
- for name, path in self.paths.items():
- ui.write("%s = %s\n" % (name, path))
-
-
- __slots__ = ('rootmap', 'trees')
+ __slots__ = ('forest')
def __init__(self, snapfile=None):
- self.rootmap = {}
- self.trees = []
- if snapfile is not None:
- cfg = ConfigParser.RawConfigParser()
- cfg.read([snapfile])
- for section in tree_sections(cfg):
- root = cfg.get(section, 'root')
- tree = ForestSnapshot.Tree(root, cfg.get(section, 'revision'),
- dict(cfg.items(section + '.paths')))
- self.rootmap[root] = tree
- self.trees.append(tree)
+ self.forest = Forest(snapfile=snapfile)
def __call__(self, ui, toprepo, func, pathalias=None, mq_check=True):
"""Apply a function to trees matching a snapshot entry.
@@ -415,28 +749,26 @@ class ForestSnapshot(object):
toprepo and its nested repositories where repo matches a
snapshot entry.
"""
-
- repo = None
+ if self.forest.snapfile:
+ self.forest = Forest(snapfile=self.forest.snapfile,
+ top=toprepo)
+ self.forest.update(ui)
pfx = toprepo.url()
- for t in self.trees:
- root, rev, path = t.info(pathalias)
- ui.write("[%s]\n" % root)
+ for t in self.forest.trees:
+ root = relpath(self.forest.top().root, t.root)
+ ui.status("[%s]\n" % root)
+ path = t.paths.get(pathalias, None)
if pathalias is not None and path is None:
- ui.write(_("skipped, no path alias '%s' defined\n\n")
+ ui.warn(_("skipped, no path alias '%s' defined\n\n")
% pathalias)
continue
- if repo is None:
- repo = toprepo
- else:
- try:
- rpath = os.path.join(pfx, util.localpath(root))
- repo = hg.repository(ui, util.pconvert(rpath))
- except RepoError:
- ui.write(_("skipped, no valid repo found\n\n"))
- continue
- func(repo, root, path, rev,
- mq_check and mq_patches_applied(repo.url()))
- ui.write("\n")
+ if not t.repo:
+ ui.warn(_("skipped, no valid repo found\n\n"))
+ rev = None
+ if t.revs:
+ rev = t.revs[0]
+ func(t.repo, root, path, rev, (mq_check and t.mq_applied()))
+ ui.status("\n")
def update(self, ui, repo, mq_fatal, walkhg='', tip=False):
@@ -449,39 +781,21 @@ class ForestSnapshot(object):
from the corresponding repository.
"""
- rootmap = {}
- self.trees = []
- top = repo.url()
- if hasattr(repo, "root"):
- top = repo.root
- for relpath in repo.forests(walkhg):
- abspath = os.path.join(top, util.localpath(relpath))
- if relpath != '.':
- repo = hg.repository(ui, abspath)
- if mq_fatal and mq_patches_applied(abspath):
- raise util.Abort(_("'%s' has mq patches applied") % relpath)
- if tip:
- rev = None
- else:
- rev = node.hex(repo.dirstate.parents()[0])
- paths = dict(repo.ui.configitems('paths'))
- if self.rootmap.has_key(relpath):
- tree = self.rootmap[relpath]
- tree.update(rev, paths)
- else:
- tree = ForestSnapshot.Tree(relpath, rev, paths)
- rootmap[relpath] = tree
- self.trees.append(tree)
- self.rootmap = rootmap
+ if self.forest.top():
+ self.forest.update(ui)
+ else:
+ if repo:
+ self.forest = Forest(top=repo)
+ self.forest.scan(walkhg)
+ if mq_fatal or not tip:
+ for tree in self.forest.trees:
+ if mq_fatal:
+ tree.die_on_mq(self.forest.top())
+ if not tip:
+ tree.revs = tree.working_revs()
def write(self, ui):
- index = 1
- for t in self.trees:
- section = 'tree' + str(index)
- ui.write("[%s]\n" % section)
- t.write(ui, section)
- ui.write("\n")
- index += 1
+ self.forest.write(ui, oldstyle=True)
def clone(ui, source, dest, walkhg, **opts):
diff -r df4e33bd7149 -r 45f4c6176e27 tests/test-class-forest.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/test-class-forest.py Tue Aug 28 15:27:29 2007 -0400
@@ -0,0 +1,324 @@
+import nose
+from nose.tools import (assert_equals, assert_false, assert_not_equals,
+ assert_raises, assert_true, with_setup)
+
+import os
+from subprocess import (Popen, PIPE)
+import shutil
+import sys
+
+from mercurial import (hg, localrepo, ui, util)
+from mercurial.repo import RepoError
+from forest import (Forest, relpath)
+from forest import (die_on_numeric_revs, relpath, urltopath, Forest)
+
+TESTREPO = os.path.join(os.path.dirname(__file__), 'repo')
+
+
+def run(cmd, cwd=None):
+ print "$", " ".join(cmd)
+ p = Popen(cmd, cwd=cwd, stdout=PIPE, stderr=PIPE)
+ output = p.communicate()[0]
+ run.returncode = p.returncode
+ sys.stdout.write(output)
+ return output
+
+
+def create_repo():
+ paths = [TESTREPO,
+ os.path.join(TESTREPO, 'a'),
+ os.path.join(TESTREPO, 'b'),
+ os.path.join(TESTREPO, 'c')]
+ if os.path.isdir(TESTREPO):
+ shutil.rmtree(TESTREPO)
+ for path in paths:
+ os.mkdir(path)
+ run(["hg", "init"], cwd=path)
+ run(["hg", "qinit", "-c"], cwd=path)
+ fd = open(os.path.join(TESTREPO, "README"), 'w')
+ print >>fd, "zero"
+ fd.close()
+ run(["hg", "add", "README"], cwd=TESTREPO)
+ run(["hg", "ci", "-m", "zero"], cwd=TESTREPO)
+ fd = open(os.path.join(TESTREPO, "README"), 'w')
+ print >>fd, "one"
+ fd.close()
+ run(["hg", "add", "README"], cwd=TESTREPO)
+ run(["hg", "ci", "-m", "one"], cwd=TESTREPO)
+ fd = open(os.path.join(TESTREPO, "README"), 'w')
+ print >>fd, "two"
+ fd.close()
+ run(["hg", "add", "README"], cwd=TESTREPO)
+ run(["hg", "ci", "-m", "two"], cwd=TESTREPO)
+
+
+@with_setup(create_repo)
+def test_Tree_init():
+ top = hg.repository(None, TESTREPO)
+
+ tree = Forest.Tree()
+ assert_equals(tree.repo, None)
+ assert_equals(tree.root, None)
+ assert_equals(tree.revs, [])
+ assert_equals(tree.paths, {})
+
+ tree = Forest.Tree(repo=top)
+ assert_equals(tree.repo, top)
+ assert_equals(tree.root, TESTREPO)
+ assert_equals(tree.revs, [])
+ assert_equals(tree.paths, {})
+
+
+@with_setup(create_repo)
+def test_Tree_die_on_mq():
+ forest = Forest(top=hg.repository(None, TESTREPO))
+ tree = forest.top()
+ tree.die_on_mq()
+ try:
+ os.unlink(os.path.join(TESTREPO, ".hg", "patches", "status"))
+ except OSError:
+ pass
+ tree.die_on_mq()
+ print run(["hg", "qnew", "patch"], cwd=TESTREPO)
+ try:
+ tree.die_on_mq()
+ except util.Abort, err:
+ assert_equals(str(err), "'%s' has mq patches applied" % TESTREPO)
+ print run(["hg", "qpop", "-a"], cwd=TESTREPO)
+ tree.die_on_mq()
+ print run(["hg", "qpush", "-a"], cwd=TESTREPO)
+ try:
+ tree.die_on_mq(tree)
+ except util.Abort, err:
+ assert_equals(str(err), "'.' has mq patches applied")
+
+
+@with_setup(create_repo)
+def test_Tree_mq_applied():
+ forest = Forest(top=hg.repository(None, TESTREPO))
+ tree = forest.top()
+ assert_false(tree.mq_applied())
+ try:
+ os.unlink(os.path.join(TESTREPO, ".hg", "patches", "status"))
+ except OSError:
+ pass
+ assert_false(tree.mq_applied())
+ print run(["hg", "qnew", "patch"], cwd=TESTREPO)
+ assert_true(tree.mq_applied())
+ print run(["hg", "qpop", "-a"], cwd=TESTREPO)
+ assert_false(tree.mq_applied())
+ print run(["hg", "qpush", "-a"], cwd=TESTREPO)
+ assert_true(tree.mq_applied())
+
+
+@with_setup(create_repo)
+def test_Tree_getpath():
+ forest = Forest(top=hg.repository(None, TESTREPO))
+ tree = forest.top()
+ assert_equals(tree.getpath(None), None)
+ assert_equals(tree.getpath(["/tmp"]), "/tmp")
+ assert_equals(tree.getpath(["file:///tmp"]), "/tmp")
+ assert_equals(tree.getpath(["file:/tmp"]), "/tmp")
+ assert_equals(tree.getpath(["hg://localhost"]), "hg://localhost")
+ assert_equals(tree.getpath(["http://localhost"]), "http://localhost")
+ assert_equals(tree.getpath(["https://localhost"]), "https://localhost")
+ assert_equals(tree.getpath(["old-http://localhost"]), "old-http://localhost")
+ assert_equals(tree.getpath(["static-http://localhost"]), "static-http://localhost")
+ assert_equals(tree.getpath(["ssh://localhost"]), "ssh://localhost")
+ assert_equals(tree.getpath(["default"]), None)
+ fd = open(os.path.join(TESTREPO, ".hg", "hgrc"), "a")
+ fd.write("\n"
+ "[paths]\n"
+ "default = /tmp\n"
+ "default-push = http://localhost\n"
+ "simon = file:///tmp\n")
+ fd.close()
+ assert_equals(tree.getpath(["default"]), None)
+ print "tree.paths =", tree.paths
+ forest.update()
+ print "tree.paths =", tree.paths
+ assert_equals(tree.getpath(["default"]), "/tmp")
+ assert_equals(tree.getpath(["simon"]), "/tmp")
+ assert_equals(tree.getpath(["default-push"]), "http://localhost")
+ assert_equals(tree.getpath(["law", "default-push", "default"]),
+ "http://localhost")
+
+
+@with_setup(create_repo)
+def test_Tree_repo():
+ tree = Forest.Tree(root=TESTREPO)
+ assert_raises(AttributeError, tree.getrepo)
+ repo = tree.getrepo(None)
+ assert_true(isinstance(repo, localrepo.localrepository), type(repo))
+ assert_equals(tree.root, TESTREPO)
+ assert_equals(tree.revs, [])
+ assert_equals(tree.paths, {})
+ tree = Forest.Tree(root="")
+ assert_equals(tree.repo, None)
+ assert_equals(tree.root, "")
+ top = hg.repository(None, TESTREPO)
+ tree.repo = top
+ assert_equals(tree.repo, top)
+ assert_equals(tree.root, TESTREPO)
+ tree.root = TESTREPO
+ try:
+ tree.repo
+ except AttributeError, err:
+ assert_equals(str(err), "getrepo() requires 'ui' parameter")
+ assert_equals(tree.getrepo(None).root, TESTREPO)
+ assert_equals(tree.root, TESTREPO)
+ tree.root = "bad-repo"
+ try:
+ tree.repo
+ except AttributeError, err:
+ assert_equals(str(err), "getrepo() requires 'ui' parameter")
+ try:
+ tree.getrepo(None).root
+ except RepoError, err:
+ assert_equals(str(err), "repository bad-repo not found")
+ assert_equals(tree.root, "bad-repo")
+
+
+@with_setup(create_repo)
+def test_Tree_root():
+ top = hg.repository(None, TESTREPO)
+ tree = Forest.Tree(repo=top)
+ assert_equals(tree.repo, top)
+ assert_equals(tree.root, TESTREPO)
+ tree.root = ""
+ assert_equals(tree.repo, None)
+ assert_equals(tree.root, "")
+ tree.root = None
+ assert_equals(tree.repo, None)
+ assert_equals(tree.root, None)
+
+
+def test_Tree_skip():
+ @Forest.Tree.skip
+ def failing(message):
+ raise Exception(message)
+ assert_raises(Forest.Tree.Skip, failing)
+ assert_raises(Forest.Tree.Skip, failing, "message")
+ try:
+ failing("hello")
+ except Exception, message:
+ assert_equals(str(message), "hello")
+
+
+def test_Tree_warn():
+ @Forest.Tree.warn
+ def warning(message):
+ raise Exception(message)
+ assert_raises(Warning, warning)
+ assert_raises(Warning, warning, "message")
+ try:
+ warning("hello")
+ except Exception, message:
+ assert_equals(str(message), "hello")
+ pass
+
+
+@with_setup(create_repo)
+def test_Tree_working_revs():
+ forest = Forest(top=hg.repository(None, TESTREPO))
+ print "forest =", forest, "\n"
+ rev = run(["hg", "identify", "--debug"], cwd=TESTREPO)
+ print rev
+ rev = rev.split()[0]
+ assert_equals(forest.trees[0].working_revs(), [rev])
+ assert_equals(forest.trees[1].working_revs(),
+ ['0000000000000000000000000000000000000000'])
+ print run(["hg", "up", "0"], cwd=TESTREPO)
+ rev = run(["hg", "identify", "--debug"], cwd=TESTREPO)
+ print rev
+ rev = rev.split()[0]
+ forest.scan(walkhg=True)
+ print "forest =", forest, "\n"
+ assert_equals(forest.trees[0].working_revs(), [rev])
+ assert_equals(forest.trees[1].working_revs(),
+ ['0000000000000000000000000000000000000000'])
+
+
+def test_init():
+ forest = Forest()
+ print "forest =", forest
+ assert_equals(forest.snapfile, None)
+ assert_equals(forest.trees, [])
+
+
+def test_apply():
+ pass
+
+
+def test_read():
+ pass
+
+
+@with_setup(create_repo)
+def test_scan():
+ user_interface = ui.ui()
+ top = hg.repository(user_interface, TESTREPO)
+ print "top.root =", top.root
+ forest = Forest(top=top)
+ print "forest =", forest
+ forest.scan(walkhg=True)
+ roots = [TESTREPO,
+ os.path.join(TESTREPO, '.hg', 'patches'),
+ os.path.join(TESTREPO, 'a'),
+ os.path.join(TESTREPO, 'a', '.hg', 'patches'),
+ os.path.join(TESTREPO, 'b'),
+ os.path.join(TESTREPO, 'b', '.hg', 'patches'),
+ os.path.join(TESTREPO, 'c'),
+ os.path.join(TESTREPO, 'c', '.hg', 'patches')]
+ assert_not_equals(forest.trees[0].repo, top)
+ assert_equals(forest.trees[0].repo.root, roots[0])
+ assert_equals(forest.trees[1].repo.root, roots[1])
+ assert_equals(forest.trees[2].repo.root, roots[2])
+ assert_equals(forest.trees[3].repo.root, roots[3])
+ assert_equals(forest.trees[4].repo.root, roots[4])
+ assert_equals(forest.trees[5].repo.root, roots[5])
+ assert_equals(forest.trees[6].repo.root, roots[6])
+ assert_equals(forest.trees[7].repo.root, roots[7])
+ try:
+ forest.trees[8].repo.root
+ except Exception, err:
+ assert_true(isinstance(err, IndexError))
+ for tree in forest.trees:
+ assert_equals(tree.repo.ui.parentui, user_interface)
+
+
+@with_setup(create_repo)
+def test_top():
+ forest = Forest()
+ assert_equals(forest.top(), None)
+
+ top = hg.repository(None, TESTREPO)
+ forest = Forest(top=top)
+ assert_equals(forest.top().root, TESTREPO)
+
+
+@with_setup(create_repo)
+def test_update():
+ forest = Forest(top=hg.repository(None, TESTREPO))
+ print "forest =", forest, "\n"
+ rev = run(["hg", "identify", "--debug"], cwd=TESTREPO)
+ print rev
+ rev = rev.split()[0]
+ assert_equals(forest.top().working_revs(), [rev])
+ print run(["hg", "up", "0"], cwd=TESTREPO)
+ forest.update()
+ print "forest =", forest, "\n"
+ user_interface = ui.ui()
+ forest.update(ui=user_interface)
+ for tree in forest.trees:
+ assert_equals(tree.repo.ui.parentui, user_interface)
+ rev = run(["hg", "identify", "--debug"], cwd=TESTREPO)
+ print rev
+ rev = rev.split()[0]
+ assert_equals(forest.trees[0].working_revs(), [rev])
+
+
+def test_write():
+ pass
+
+
diff -r df4e33bd7149 -r 45f4c6176e27 tests/test-forest.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/test-forest.py Tue Aug 28 15:27:29 2007 -0400
@@ -0,0 +1,41 @@
+import nose
+from nose.tools import (assert_equals, assert_false, assert_not_equals,
+ assert_raises, assert_true, with_setup)
+
+import os
+
+from mercurial import util
+from forest import (die_on_numeric_revs, relpath, urltopath)
+
+TESTREPO = "/tmp"
+
+def test_die_on_numeric_revs():
+ die_on_numeric_revs(None)
+ die_on_numeric_revs([""])
+ die_on_numeric_revs(["000"])
+ die_on_numeric_revs(["0000"])
+ assert_raises(util.Abort, die_on_numeric_revs, ["0001"])
+ assert_raises(util.Abort, die_on_numeric_revs, ["15"])
+ die_on_numeric_revs(["0.9.4"])
+ die_on_numeric_revs(["default"])
+ die_on_numeric_revs(["tip"])
+
+
+def test_relpath():
+ assert_equals(relpath(TESTREPO, TESTREPO), '.')
+ assert_equals(relpath(TESTREPO, TESTREPO + os.sep), '.')
+ assert_equals(relpath(TESTREPO, os.path.join(TESTREPO, '.')), '.')
+ assert_equals(relpath(TESTREPO, os.path.join(TESTREPO, '.hg', 'patches')),
+ os.path.join('.hg', 'patches'))
+
+
+def test_urltopath():
+ assert_equals(urltopath(None), None)
+ assert_equals(urltopath(""), "")
+ assert_equals(urltopath("foo"), "foo")
+ assert_equals(urltopath("/tmp"), "/tmp")
+ assert_equals(urltopath("file:tmp"), "tmp")
+ assert_equals(urltopath("file:/tmp"), "/tmp")
+ assert_equals(urltopath("file://tmp"), "tmp")
+ assert_equals(urltopath("file:///tmp"), "/tmp")
+ assert_equals(urltopath("http://localhost"), "http://localhost")
More information about the CIG-COMMITS
mailing list