author      Tim Harder <radhermit@gmail.com>    2018-04-11 05:42:50 -0400
committer   Tim Harder <radhermit@gmail.com>    2018-04-11 06:00:12 -0400
commit      4c2ba4079bbd542523baf37970e4c583e29537a8 (patch)
tree        ac2c099f3576f580420a6591287bc1af9bdb1779 /tests/merge
parent      move to >=py3.6 only (diff)
download    pkgcore-4c2ba4079bbd542523baf37970e4c583e29537a8.tar.gz
            pkgcore-4c2ba4079bbd542523baf37970e4c583e29537a8.tar.bz2
            pkgcore-4c2ba4079bbd542523baf37970e4c583e29537a8.zip
move tests to root dir
Diffstat (limited to 'tests/merge')
-rw-r--r--  tests/merge/test_engine.py    131
-rw-r--r--  tests/merge/test_triggers.py  651
-rw-r--r--  tests/merge/util.py            43
3 files changed, 825 insertions(+), 0 deletions(-)
diff --git a/tests/merge/test_engine.py b/tests/merge/test_engine.py
new file mode 100644
index 000000000..b2874c838
--- /dev/null
+++ b/tests/merge/test_engine.py
@@ -0,0 +1,131 @@
+# Copyright: 2007-2010 Brian Harring <ferringb@gmail.com>
+# License: GPL2/BSD
+
+import os
+
+from snakeoil.osutils import pjoin
+from snakeoil.test import TestCase
+from snakeoil.test.mixins import tempdir_decorator
+
+from pkgcore.fs import livefs
+from pkgcore.fs.contents import contentsSet
+from pkgcore.merge import engine
+from pkgcore.test.fs.fs_util import fsFile, fsDir, fsSymlink
+from pkgcore.test.merge.util import fake_engine
+
+
+class fake_pkg(object):
+
+    def __init__(self, contents, label=None):
+        self.label = label
+        self.contents = contents
+
+    def __str__(self):
+        return "fake_pkg: %s" % self.label
+
+
+class Test_MergeEngineCsets(TestCase):
+
+    simple_cset = list(fsFile(x) for x in ("/foon", "/usr/dar", "/blah"))
+    simple_cset.extend(fsDir(x) for x in ("/usr", "/usr/lib"))
+    simple_cset.append(fsSymlink("/usr/lib/blah", "../../blah"))
+    simple_cset.append(fsSymlink("/broken-symlink", "dar"))
+    simple_cset = contentsSet(simple_cset, mutable=False)
+
+    kls = engine.MergeEngine
+
+    def assertCsetEqual(self, cset1, cset2):
+        if not isinstance(cset1, contentsSet):
+            cset1 = contentsSet(cset1)
+        if not isinstance(cset2, contentsSet):
+            cset2 = contentsSet(cset2)
+        self.assertEqual(cset1, cset2, reflective=False)
+
+    def assertCsetNotEqual(self, cset1, cset2):
+        if not isinstance(cset1, contentsSet):
+            cset1 = contentsSet(cset1)
+        if not isinstance(cset2, contentsSet):
+            cset2 = contentsSet(cset2)
+        self.assertNotEqual(cset1, cset2, reflective=False)
+
+    def run_cset(self, target, engine, *args):
+        return getattr(self.kls, target)(engine, engine.csets, *args)
+
+    def test_generate_offset_cset(self):
+        engine = fake_engine(csets={"new_cset":self.simple_cset},
+            offset='/')
+        def run(engine, cset):
+            return self.run_cset('generate_offset_cset', engine,
+                lambda e, c:c[cset])
+
+        self.assertCsetEqual(self.simple_cset, run(engine, 'new_cset'))
+        engine.offset = '/foon/'
+        run(engine, 'new_cset')
+        self.assertCsetEqual(self.simple_cset.insert_offset(engine.offset),
+            run(engine, 'new_cset'))
+
+    def test_get_pkg_contents(self):
+        new_cset = self.kls.get_pkg_contents(None, None, fake_pkg(self.simple_cset))
+        self.assertCsetEqual(self.simple_cset, new_cset)
+        # must differ; shouldn't be modifying the original cset
+        self.assertNotIdentical(self.simple_cset, new_cset)
+
+    def test_get_remove_cset(self):
+        files = contentsSet(self.simple_cset.iterfiles(invert=True))
+        engine = fake_engine(csets={'install':files,
+            'old_cset':self.simple_cset})
+        self.assertCsetEqual(self.simple_cset.iterfiles(),
+            self.run_cset('get_remove_cset', engine))
+
+    def test_get_replace_cset(self):
+        files = contentsSet(self.simple_cset.iterfiles(invert=True))
+        engine = fake_engine(csets={'install':files,
+            'old_cset':self.simple_cset})
+        self.assertCsetEqual(files,
+            self.run_cset('get_replace_cset', engine))
+
+    @tempdir_decorator
+    def test_rewrite_awareness(self):
+        src = contentsSet(self.simple_cset)
+        src.add(fsFile("/usr/lib/donkey"))
+        trg = src.difference(["/usr/lib/donkey"])
+        trg.add(fsFile("/usr/lib64/donkey"))
+        trg = trg.insert_offset(self.dir)
+        os.mkdir(pjoin(self.dir, 'usr'))
+        os.mkdir(pjoin(self.dir, 'usr', 'lib64'))
+        os.symlink('lib64', pjoin(self.dir, 'usr', 'lib'))
+        pkg = fake_pkg(src)
+        engine = self.kls.install(self.dir, pkg, offset=self.dir)
+        result = engine.csets['resolved_install']
+        self.assertEqual(sorted(result.iterfiles()), sorted(trg.iterfiles()))
+
+    @tempdir_decorator
+    def test_symlink_awareness(self):
+        src = contentsSet(self.simple_cset)
+        src.add(fsFile("/usr/lib/blah/donkey"))
+        trg = src.difference(["/usr/lib/blah/donkey"])
+        trg.add(fsFile("/blah/donkey"))
+        trg = trg.insert_offset(self.dir)
+        pkg = fake_pkg(src)
+        engine = self.kls.install(self.dir, pkg, offset=self.dir)
+        result = engine.csets['new_cset']
+        self.assertEqual(sorted(result.iterfiles()), sorted(trg.iterfiles()))
+    test_symlink_awareness.skip = "contentset should handle this"
+
+    @tempdir_decorator
+    def test__get_livefs_intersect_cset(self):
+        old_cset = self.simple_cset.insert_offset(self.dir)
+        # have to add it; scan adds the root node
+        old_cset.add(fsDir(self.dir))
+        os.mkdir(pjoin(self.dir, "usr"))
+        open(pjoin(self.dir, "usr", "dar"), 'w').close()
+        open(pjoin(self.dir, 'foon'), 'w').close()
+        # note that this *is* a sym in the cset; adding this specific
+        # check so that if the code differs, the test breaks, and the tests
+        # get updated (additionally, folks may not be aware of the potential)
+        open(pjoin(self.dir, 'broken-symlink'), 'w').close()
+        engine = fake_engine(csets={'test':old_cset})
+        existent = livefs.scan(self.dir)
+        generated = self.run_cset('_get_livefs_intersect_cset', engine,
+            'test')
+        self.assertEqual(generated, existent)
diff --git a/tests/merge/test_triggers.py b/tests/merge/test_triggers.py
new file mode 100644
index 000000000..30c4a2a94
--- /dev/null
+++ b/tests/merge/test_triggers.py
@@ -0,0 +1,651 @@
+# Copyright: 2007-2011 Brian Harring <ferringb@gmail.com>
+# License: GPL2/BSD
+
+from functools import partial
+from itertools import izip
+from math import floor, ceil
+import os
+import shutil
+import time
+
+from snakeoil import process
+from snakeoil.currying import post_curry
+from snakeoil.osutils import pjoin, ensure_dirs, normpath
+from snakeoil.test import mixins, TestCase, SkipTest
+
+from pkgcore.fs import fs
+from pkgcore.merge import triggers, const
+from pkgcore.fs.contents import contentsSet
+from pkgcore.fs.livefs import gen_obj, scan
+from pkgcore.test.merge.util import fake_trigger, fake_engine, fake_reporter
+
+
+def _render_msg(func, msg, *args, **kwargs):
+    func(msg % (args if args else kwargs))
+
+def make_fake_reporter(**kwargs):
+    kwargs = dict((key, partial(_render_msg, val))
+        for key, val in kwargs.iteritems())
+    return fake_reporter(**kwargs)
+
+class TestBase(TestCase):
+
+    kls = fake_trigger
+
+    def mk_trigger(self, kls=None, **kwargs):
+        if kls is None:
+            kls = self.kls
+        return kls(**kwargs)
+
+    def test_default_attrs(self):
+        for x in ("required_csets", "_label", "_hooks", "_engine_types"):
+            self.assertEqual(None, getattr(self.kls, x),
+                msg="%s must exist and be None" % x)
+        self.assertEqual(50, self.kls.priority)
+
+    def test_label(self):
+        self.assertEqual(self.mk_trigger().label, str(self.kls.__name__))
+        self.assertEqual(fake_trigger().label, str(fake_trigger.__name__))
+        self.assertEqual(fake_trigger(_label='foon').label, 'foon')
+
+    def test_localize(self):
+        o = self.mk_trigger()
+        self.assertEqual(o, o.localize(None))
+
+    def test_get_required_csets(self):
+        self.assertEqual(fake_trigger(required_csets=None).get_required_csets(
+            None), None)
+        self.assertEqual(fake_trigger(required_csets=None).get_required_csets(
+            1), None)
+        self.assertEqual(fake_trigger(required_csets=None).get_required_csets(
+            ""), None)
+        o = fake_trigger(required_csets={"foo":["dar"], "bar":1})
+        self.assertEqual(o.get_required_csets("foo"), ["dar"])
+        self.assertEqual(o.get_required_csets("bar"), 1)
+        self.assertEqual(fake_trigger(required_csets=("dar", "foo"))
+            .get_required_csets("bar"), ("dar", "foo"))
+        self.assertEqual(fake_trigger(required_csets=())
+            .get_required_csets(""), ())
+
+    def test_register(self):
+        engine = fake_engine(mode=1)
+        self.assertRaises(TypeError, self.mk_trigger(mode=1).register, engine)
+        self.assertRaises(TypeError, self.mk_trigger(mode=1, _hooks=2).register,
+            engine)
+        self.assertFalse(engine._triggers)
+
+        # shouldn't puke.
+        o = self.mk_trigger(mode=1, _hooks=("2"))
+        o.register(engine)
+        self.assertEqual(engine._triggers, [('2', o, None)])
+        engine._triggers = []
+
+        # verify it's treating "all csets" differently from "no csets"
+        o = self.mk_trigger(mode=1, _hooks=("2"), required_csets=())
+        o.register(engine)
+        self.assertEqual(engine._triggers, [('2', o, ())])
+
+        # should handle keyerror thrown from the engine for missing hooks.
+        engine = fake_engine(mode=1, blocked_hooks=("foon", "dar"))
+        self.mk_trigger(mode=1, _hooks="foon").register(engine)
+        self.mk_trigger(mode=1, _hooks=("foon", "dar")).register(engine)
+        self.assertFalse(engine._triggers)
+
+        o = self.mk_trigger(mode=1, _hooks=("foon", "bar"), required_csets=(3,))
+        o.register(engine)
+        self.assertEqual(engine._triggers, [('bar', o, (3,))])
+        engine._triggers = []
+        o = self.mk_trigger(mode=1, _hooks="bar", required_csets=None)
+        o.register(engine)
+        self.assertEqual(engine._triggers, [('bar', o, None)])
+
+    def test_call(self):
+        # test "I want all csets"
+        def get_csets(required_csets, csets, fallback=None):
+            o = self.mk_trigger(required_csets={1:required_csets, 2:fallback},
+                mode=(1,))
+            engine = fake_engine(csets=csets, mode=1)
+            o(engine, csets)
+            self.assertEqual([x[0] for x in o._called],
+                [engine]*len(o._called))
+            return [list(x[1:]) for x in o._called]
+
+        d = object()
+        self.assertEqual(get_csets(None, d, [1]), [[d]],
+            msg="raw csets mapping should be passed through without conversion"
+                " for required_csets=None")
+
+        self.assertEqual(get_csets([1,2], {1:1,2:2}), [[1, 2]],
+            msg="basic mapping through failed")
+        self.assertEqual(get_csets([], {}), [[]],
+            msg="for no required csets, must have no args passed")
+
+
+class test_module(TestCase):
+
+    def test_constants(self):
+        self.assertEqual(sorted([const.REPLACE_MODE, const.UNINSTALL_MODE]),
+            sorted(triggers.UNINSTALLING_MODES))
+        self.assertEqual(sorted([const.REPLACE_MODE, const.INSTALL_MODE]),
+            sorted(triggers.INSTALLING_MODES))
+
+
+class Test_mtime_watcher(mixins.TempDirMixin, TestCase):
+
+    kls = triggers.mtime_watcher
+
+    def test_identification(self):
+        o = [gen_obj(self.dir)]
+        t = self.kls()
+        t.set_state([self.dir])
+        self.assertEqual(list(t.saved_mtimes), o)
+        open(pjoin(self.dir, 'file'), 'w').close()
+        t.set_state([self.dir, pjoin(self.dir, 'file')])
+        self.assertEqual(list(t.saved_mtimes), o)
+        loc = pjoin(self.dir, 'dir')
+        os.mkdir(loc)
+        o.append(gen_obj(pjoin(self.dir, 'dir')))
+        o.sort()
+        t.set_state([x.location for x in o])
+        self.assertEqual(sorted(t.saved_mtimes), o)
+
+        # test syms.
+        src = pjoin(self.dir, 'dir2')
+        os.mkdir(src)
+        loc = pjoin(self.dir, 'foo')
+        os.symlink(src, loc)
+        locs = [x.location for x in o]
+
+        # insert a crap location to ensure it handles it.
+        locs.append(pjoin(self.dir, "asdfasdfasdfasfdasdfasdfasdfasdf"))
+
+        locs.append(src)
+        i = gen_obj(src, stat=os.stat(src))
+        o.append(i)
+        o.sort()
+        t.set_state(locs)
+        self.assertEqual(sorted(t.saved_mtimes), o)
+        locs[-1] = loc
+        o.remove(i)
+        i = i.change_attributes(location=loc)
+        o.append(i)
+        o.sort()
+        t.set_state(locs)
+        self.assertEqual(sorted(t.saved_mtimes), o)
+
+        o.remove(i)
+        os.rmdir(src)
+
+        # check stat_func usage; if lstat, the sym won't be derefed,
+        # thus ignored.
+        t.set_state(locs, stat_func=os.lstat)
+        self.assertEqual(sorted(t.saved_mtimes), o)
+        open(pjoin(self.dir, 'bar'), 'w').close()
+        self.assertTrue(t.check_state())
+
+        # test dead sym filtering for stat.
+        t.set_state(locs)
+        self.assertEqual(sorted(t.saved_mtimes), o)
+        self.assertFalse(t.check_state())
+
+    def test_float_mtime(self):
+        t = self.kls()
+        t.set_state([self.dir])
+        l = list(t.saved_mtimes)
+        self.assertEqual(len(l), 1)
+        l = l[0]
+        # mtime *must* be a float.
+        self.assertInstance(l.mtime, float)
+
+    def test_race_protection(self):
+        # note this isn't perfect- being a race, triggering it on
+        # demand is tricky.
+        # hence the 10x loop; can trigger it pretty much each loop
+        # for my 1ghz, so... it's a start.
+        # the race specifically will only rear its head on extremely
+        # fast io (crazy hardware, or async mount), fs's lacking subsecond,
+        # and just severely crappy chance.
+        # faster the io actions, easier it is to trigger.
+        t = self.kls()
+        for x in xrange(100):
+            now = ceil(time.time()) + 1
+            os.utime(self.dir, (now + 100, now + 100))
+            t.set_state([self.dir])
+            now, st_mtime = time.time(), os.stat(self.dir).st_mtime
+            now, st_mtime = ceil(now), floor(st_mtime)
+            self.assertTrue(now > st_mtime,
+                msg="%r must be > %r" % (now, st_mtime))
+
+
+def castrate_trigger(base_kls, **kwargs):
+    class castrated_trigger(base_kls):
+
+        enable_regen = False
+        def __init__(self, *args2, **kwargs2):
+            self._passed_in_args = []
+            base_kls.__init__(self, *args2, **kwargs2)
+
+        def regen(self, *args):
+            self._passed_in_args.append(list(args))
+            if self.enable_regen:
+                return base_kls.regen(self, *args)
+            return []
+
+        locals().update(kwargs)
+
+    return castrated_trigger
+
+
+class trigger_mixin(mixins.TempDirMixin):
+
+    def setUp(self):
+        mixins.TempDirMixin.setUp(self)
+        self.reset_objects()
+
+    def reset_objects(self, mode=const.INSTALL_MODE):
+        self.engine = fake_engine(offset=self.dir, mode=mode)
+        self.trigger = self.kls()
+
+    def assertPaths(self, expected, tested):
+        expected = sorted(expected)
+        tested = sorted(tested)
+        self.assertEqual(expected, tested,
+            msg="expected %r, got %r" % (expected, tested))
+
+
+class Test_ldconfig(trigger_mixin, TestCase):
+
+    # use the kls indirection for when *bsd version of ldconfig trigger
+    # is derived; will be pretty much the same, sans the trigger call.
+
+    kls = castrate_trigger(triggers.ldconfig)
+
+    def test_read_ld_so_conf(self):
+        # test the defaults first. should create etc and the file.
+        self.assertPaths(self.trigger.read_ld_so_conf(self.dir),
+            [pjoin(self.dir, x) for x in self.trigger.default_ld_path])
+        o = gen_obj(pjoin(self.dir, 'etc'))
+        self.assertEqual(o.mode, 0755)
+        self.assertTrue(fs.isdir(o))
+        self.assertTrue(os.path.exists(pjoin(self.dir, 'etc/ld.so.conf')))
+
+        # test normal functioning.
+        with open(pjoin(self.dir, 'etc/ld.so.conf'), 'w') as f:
+            f.write("\n".join(["/foon", "dar", "blarnsball", "#comment"]))
+        self.assertPaths(self.trigger.read_ld_so_conf(self.dir),
+            [pjoin(self.dir, x) for x in ["foon", "dar", "blarnsball"]])
+
+    def assertTrigger(self, touches, ran, dirs=['test-lib', 'test-lib2'],
+                      hook='merge', mode=const.INSTALL_MODE, mkdirs=True, same_mtime=False):
+
+        # wipe whats there.
+        for x in scan(self.dir).iterdirs():
+            if x.location == self.dir:
+                continue
+            shutil.rmtree(x.location)
+        for x in scan(self.dir).iterdirs(True):
+            os.unlink(x.location)
+
+        ensure_dirs(pjoin(self.dir, "etc"))
+        with open(pjoin(self.dir, "etc/ld.so.conf"), "w") as f:
+            f.write("\n".join('/' + x for x in dirs))
+        # force directory mtime to 1s less.
+        past = time.time() - 10.0
+        if mkdirs:
+            for x in dirs:
+                ensure_dirs(pjoin(self.dir, x))
+                os.utime(pjoin(self.dir, x), (past, past))
+
+        self.reset_objects()
+        self.engine.phase = 'pre_%s' % hook
+        self.engine.mode = mode
+        self.trigger(self.engine, {})
+        self.assertFalse(self.trigger._passed_in_args)
+        resets = set()
+        for x in touches:
+            fp = pjoin(self.dir, x.lstrip('/'))
+            open(pjoin(fp), "w").close()
+            if same_mtime:
+                os.utime(fp, (past, past))
+                resets.add(os.path.dirname(fp))
+
+        for x in resets:
+            os.utime(x, (past, past))
+
+        self.engine.phase = 'post_%s' % hook
+        self.trigger(self.engine, {})
+
+        self.assertEqual([[getattr(x, 'offset', None) for x in y]
+            for y in self.trigger._passed_in_args],
+            [[self.dir]])
+
+    def test_trigger(self):
+        # ensure it doesn't explode for missing dirs.
+        #self.assertTrigger([], False, mkdirs=False)
+        #self.assertTrigger([], False)
+        self.assertTrigger(['test-lib/foon'], True)
+        self.assertTrigger(['test-lib/foon'], False, same_mtime=True)
+
+
+class TestInfoRegen(trigger_mixin, TestCase):
+
+    raw_kls = triggers.InfoRegen
+    @property
+    def kls(self):
+        return castrate_trigger(self.raw_kls, locations=['/'])
+
+    info_data = \
+"""INFO-DIR-SECTION Network Applications
+START-INFO-DIR-ENTRY
+* Wget: (wget). The non-interactive network downloader.
+END-INFO-DIR-ENTRY
+"""
+
+    def reset_objects(self, **kwargs):
+        trigger_mixin.reset_objects(self, **kwargs)
+        self.trigger.location = [self.dir]
+
+    def test_binary_path(self):
+        existing = os.environ.get("PATH", self)
+        try:
+            try:
+                path = process.find_binary('install-info')
+            except process.CommandNotFound:
+                path = None
+            self.assertEqual(path, self.trigger.get_binary_path())
+            if path is not self:
+                os.environ["PATH"] = ""
+                self.assertEqual(None, self.trigger.get_binary_path())
+        finally:
+            if existing is self:
+                os.environ.pop("PATH", None)
+            else:
+                os.environ["PATH"] = existing
+
+    def test_regen(self):
+        o = self.raw_kls()
+        path = o.get_binary_path()
+        if path is None:
+            raise SkipTest("can't verify regen behaviour due to install-info "
+                "not being available")
+        # test it without the directory existing.
+        self.assertEqual(list(o.regen(path, pjoin(self.dir, 'foo'))), [])
+        self.assertFalse(os.path.exists(pjoin(self.dir, 'foo')))
+        with open(pjoin(self.dir, "foo.info"), 'w') as f:
+            f.write(self.info_data)
+        # no issues.
+        self.assertEqual(list(o.regen(path, self.dir)), [])
+        self.assertTrue(os.path.exists(pjoin(self.dir, 'dir')),
+            msg="info dir file wasn't created")
+
+        # drop the last line, verify it returns that file.
+        with open(pjoin(self.dir, "foo2.info"), 'w') as f:
+            f.write('\n'.join(self.info_data.splitlines()[:-1]))
+        # should ignore \..* files
+        open(pjoin(self.dir, ".foo.info"), 'w').close()
+        os.unlink(pjoin(self.dir, 'dir'))
+        self.assertEqual(list(o.regen(path, self.dir)),
+            [pjoin(self.dir, 'foo2.info')])
+        self.assertTrue(os.path.exists(pjoin(self.dir, 'dir')),
+            msg="info dir file wasn't created")
+
+    def run_trigger(self, phase, expected_regen=[]):
+        l = []
+        self.engine.observer = make_fake_reporter(warn=l.append)
+        self.trigger._passed_in_args = []
+        self.engine.phase = phase
+        self.trigger(self.engine, {})
+        self.assertEqual(map(normpath, (x[1] for x in self.trigger._passed_in_args)),
+            map(normpath, expected_regen))
+        return l
+
+    def test_trigger(self):
+        if self.raw_kls().get_binary_path() is None:
+            raise SkipTest(
+                "can't verify regen behaviour due to install-info not being available")
+
+        cur = os.environ.get("PATH", self)
+        try:
+            os.environ.pop("PATH", None)
+            # shouldn't run if the binary is missing
+            # although it should warn, and this code will explode when it does.
+            self.engine.phase = 'post_merge'
+            self.assertEqual(None, self.trigger(self.engine, {}))
+        finally:
+            if cur is not self:
+                os.environ["PATH"] = cur
+
+        # verify it runs when dir is missing.
+        # doesn't create the file since no info files.
+        self.reset_objects()
+        self.assertFalse(self.run_trigger('pre_merge', []))
+        self.assertFalse(self.run_trigger('post_merge', [self.dir]))
+
+        # add an info, and verify it generated.
+        with open(pjoin(self.dir, 'foo.info'), 'w') as f:
+            f.write(self.info_data)
+        self.reset_objects()
+        self.trigger.enable_regen = True
+        self.assertFalse(self.run_trigger('pre_merge', []))
+        self.assertFalse(self.run_trigger('post_merge', [self.dir]))
+
+        # verify it doesn't; mtime is fine
+        self.reset_objects()
+        self.trigger.enable_regen = True
+        self.assertFalse(self.run_trigger('pre_merge', []))
+        self.assertFalse(self.run_trigger('post_merge', []))
+
+        # verify it handles quoting properly, and that it ignores
+        # complaints about duplicates.
+        self.reset_objects()
+        self.trigger.enable_regen = True
+        self.assertFalse(self.run_trigger('pre_merge', []))
+        with open(pjoin(self.dir, "blaidd drwg.info"), "w") as f:
+            f.write(self.info_data)
+        self.assertFalse(self.run_trigger('post_merge', [self.dir]))
+
+        # verify it passes back failures.
+        self.reset_objects()
+        self.trigger.enable_regen = True
+        self.assertFalse(self.run_trigger('pre_merge', []))
+        with open(pjoin(self.dir, "tiza grande.info"), "w") as f:
+            f.write('\n'.join(self.info_data.splitlines()[:-1]))
+        l = self.run_trigger('post_merge', [self.dir])
+        self.assertEqual(len(l), 1)
+        self.assertIn('tiza grande.info', l[0])
+
+        # verify it holds off on info regen till after unmerge for replaces.
+        self.reset_objects(mode=const.REPLACE_MODE)
+        self.assertFalse(self.run_trigger('pre_merge', []))
+        self.assertFalse(self.run_trigger('post_merge', []))
+        self.assertFalse(self.run_trigger('pre_unmerge', []))
+        os.unlink(pjoin(self.dir, "tiza grande.info"))
+        self.assertFalse(self.run_trigger('post_unmerge', [self.dir]))
+
+
+class single_attr_change_base(object):
+
+    kls = triggers.fix_uid_perms
+    attr = None
+
+    bad_val = 1
+
+    @staticmethod
+    def good_val(val):
+        return 2
+
+    def test_metadata(self):
+        self.assertEqual(self.kls._engine_types, triggers.INSTALLING_MODES)
+        self.assertEqual(self.kls.required_csets, ('new_cset',))
+        self.assertEqual(self.kls._hooks, ('pre_merge',))
+
+    @property
+    def trigger(self):
+        return self.kls(1, 2)
+
+    def assertContents(self, cset=()):
+        orig = sorted(cset)
+        new = contentsSet(orig)
+        self.trigger(fake_engine(mode=const.INSTALL_MODE),
+            {'new_cset':new})
+        new = sorted(new)
+        self.assertEqual(len(orig), len(new))
+        for x, y in izip(orig, new):
+            self.assertEqual(orig.__class__, new.__class__)
+            for attr in x.__attrs__:
+                if self.attr == attr:
+                    val = getattr(x, attr)
+                    if self.bad_val is not None and val == self.bad_val:
+                        self.assertEqual(self.good_val(val), getattr(y, attr))
+                    else:
+                        self.assertEqual(self.good_val(val), getattr(y, attr))
+                elif attr != 'chksums':
+                    # abuse self as unique singleton.
+                    self.assertEqual(getattr(x, attr, self),
+                        getattr(y, attr, self))
+
+    def test_trigger(self):
+        self.assertContents()
+        self.assertContents([fs.fsFile("/foon", mode=0644, uid=2, gid=1,
+            strict=False)])
+        self.assertContents([fs.fsFile("/foon", mode=0646, uid=1, gid=1,
+            strict=False)])
+        self.assertContents([fs.fsFile("/foon", mode=04766, uid=1, gid=2,
+            strict=False)])
+        self.assertContents([fs.fsFile("/blarn", mode=02700, uid=2, gid=2,
+            strict=False),
+            fs.fsDir("/dir", mode=0500, uid=2, gid=2, strict=False)])
+        self.assertContents([fs.fsFile("/blarn", mode=02776, uid=2, gid=2,
+            strict=False),
+            fs.fsDir("/dir", mode=02777, uid=1, gid=2, strict=False)])
+        self.assertContents([fs.fsFile("/blarn", mode=06772, uid=2, gid=2,
+            strict=False),
+            fs.fsDir("/dir", mode=04774, uid=1, gid=1, strict=False)])
+
+
+class Test_fix_uid_perms(single_attr_change_base, TestCase):
+
+    kls = triggers.fix_uid_perms
+    attr = 'uid'
+
+
+class Test_fix_gid_perms(single_attr_change_base, TestCase):
+
+    kls = triggers.fix_gid_perms
+    attr = 'gid'
+
+
+class Test_fix_set_bits(single_attr_change_base, TestCase):
+
+    kls = triggers.fix_set_bits
+    trigger = property(lambda self:self.kls())
+    attr = 'mode'
+
+    @staticmethod
+    def good_val(val):
+        if val & 06000 and val & 0002:
+            return val & ~06002
+        return val
+
+
+class Test_detect_world_writable(single_attr_change_base, TestCase):
+
+    kls = triggers.detect_world_writable
+    _trigger_override = None
+
+    attr = 'mode'
+
+    @property
+    def trigger(self):
+        if self._trigger_override is None:
+            return self.kls(fix_perms=True)
+        return self._trigger_override()
+
+    def good_val(self, val):
+        self.assertEqual(self._trigger_override, None,
+            msg="bug in test code; good_val should not be invoked when a "
+                "trigger override is in place.")
+        return val & ~0002
+
+    def test_lazyness(self):
+        # ensure it doesn't even look if it won't make noise, and no reporter
+        # cset is intentionally *not* a contentset; it'll explode it it tries
+        # to access it.
+        self.kls().trigger(fake_engine(), None)
+        # now verify that the explosion would occur if either settings are on.
+        self.assertRaises((AttributeError, TypeError),
+            self.kls().trigger, fake_engine(observer=object()), None)
+        self.assertRaises((AttributeError, TypeError),
+            self.kls(fix_perms=True).trigger, fake_engine(), None)
+
+    def test_observer_warn(self):
+        warnings = []
+        engine = fake_engine(observer=make_fake_reporter(warn=warnings.append))
+
+        self._trigger_override = self.kls()
+
+        def run(fs_objs, fix_perms=False):
+            self.kls(fix_perms=fix_perms).trigger(engine,
+                contentsSet(fs_objs))
+
+        run([fs.fsFile('/foon', mode=0770, strict=False)])
+        self.assertFalse(warnings)
+        run([fs.fsFile('/foon', mode=0772, strict=False)])
+        self.assertEqual(len(warnings), 1)
+        self.assertIn('/foon', warnings[0])
+
+        warnings[:] = []
+
+        run([fs.fsFile('/dar', mode=0776, strict=False),
+            fs.fsFile('/bar', mode=0776, strict=False),
+            fs.fsFile('/far', mode=0770, strict=False)])
+
+        self.assertEqual(len(warnings), 2)
+        self.assertIn('/dar', ' '.join(warnings))
+        self.assertIn('/bar', ' '.join(warnings))
+        self.assertNotIn('/far', ' '.join(warnings))
+
+
+class TestPruneFiles(TestCase):
+
+    kls = triggers.PruneFiles
+
+    def test_metadata(self):
+        self.assertEqual(self.kls.required_csets, ('new_cset',))
+        self.assertEqual(self.kls._hooks, ('pre_merge',))
+        self.assertEqual(self.kls._engine_types, triggers.INSTALLING_MODES)
+
+    def test_it(self):
+        orig = contentsSet([
+            fs.fsFile('/cheddar', strict=False),
+            fs.fsFile('/sporks-suck', strict=False),
+            fs.fsDir('/foons-rule', strict=False),
+            fs.fsDir('/mango', strict=False)
+        ])
+
+        engine = fake_engine(mode=const.INSTALL_MODE)
+        def run(func):
+            new = contentsSet(orig)
+            self.kls(func)(engine, {'new_cset':new})
+            return new
+
+        self.assertEqual(orig, run(lambda s:False))
+        self.assertEqual([], run(post_curry(isinstance, fs.fsDir)).dirs())
+        self.assertEqual(sorted(orig.files()),
+            sorted(run(post_curry(isinstance, fs.fsDir)).dirs(True)))
+
+        # check noisyness.
+        info = []
+        engine = fake_engine(observer=make_fake_reporter(info=info.append),
+            mode=const.REPLACE_MODE)
+
+        run(lambda s:False)
+        self.assertFalse(info)
+        run(post_curry(isinstance, fs.fsDir))
+        self.assertEqual(len(info), 2)
+
+        # ensure only the relevant files show.
+        self.assertNotIn('/cheddar', ' '.join(info))
+        self.assertNotIn('/sporks-suck', ' '.join(info))
+        self.assertIn('/foons-rule', ' '.join(info))
+        self.assertIn('/mango', ' '.join(info))
diff --git a/tests/merge/util.py b/tests/merge/util.py
new file mode 100644
index 000000000..170b249b0
--- /dev/null
+++ b/tests/merge/util.py
@@ -0,0 +1,43 @@
+# Copyright: 2007 Brian Harring <ferringb@gmail.com>
+# License: GPL2/BSD
+
+from functools import partial
+
+from pkgcore.merge import triggers
+
+
+class fake_trigger(triggers.base):
+
+    def __init__(self, **kwargs):
+        self._called = []
+        if isinstance(kwargs.get('_hooks', False), basestring):
+            kwargs['_hooks'] = (kwargs['_hooks'],)
+        for k, v in kwargs.iteritems():
+            if callable(v):
+                v = partial(v, self)
+            setattr(self, k, v)
+
+    def trigger(self, *args):
+        self._called.append(args)
+
+
+class fake_engine(object):
+
+    def __init__(self, **kwargs):
+        kwargs.setdefault('observer', None)
+        self._triggers = []
+        for k, v in kwargs.iteritems():
+            if callable(v):
+                v = partial(v, self)
+            setattr(self, k, v)
+
+    def add_trigger(self, hook_point, trigger, required_csets):
+        if hook_point in getattr(self, "blocked_hooks", []):
+            raise KeyError(hook_point)
+        self._triggers.append((hook_point, trigger, required_csets))
+
+
+class fake_reporter(object):
+    def __init__(self, **kwargs):
+        for k, v in kwargs.iteritems():
+            setattr(self, k, v)