sampledoc

Source code for Bcfg2.Server.Plugins.GroupPatterns

""" set group membership based on client hostnames """

import os
import re
import sys
import Bcfg2.Server.Lint
import Bcfg2.Server.Plugin
from Bcfg2.Utils import PackedDigitRange


class PatternMap(object):
    """ Handler for a single pattern or range """

    def __init__(self, pattern, rangestr, groups):
        self.pattern = pattern
        self.rangestr = rangestr
        self.groups = groups
        if pattern is not None:
            self.re = re.compile(pattern)
            self.process = self.process_re
        elif rangestr is not None:
            if '\\' in rangestr:
                raise Exception("Backslashes are not allowed in NameRanges")
            range_finder = r'\[\[[\d\-,]+\]\]'
            self.process = self.process_range
            self.re = re.compile(r'^' + re.sub(range_finder, r'(\d+)',
                                               rangestr))
            dmatcher = re.compile(re.sub(range_finder,
                                         r'\[\[([\d\-,]+)\]\]',
                                         rangestr))
            self.dranges = [PackedDigitRange(x)
                            for x in dmatcher.match(rangestr).groups()]
        else:
            raise Exception("No pattern or range given")

    def process_range(self, name):
        """ match the given hostname against a range-based NameRange """
        match = self.re.match(name)
        if not match:
            return None
        digits = match.groups()
        for grp in range(len(digits)):
            if not self.dranges[grp].includes(digits[grp]):
                return None
        return self.groups

    def process_re(self, name):
        """ match the given hostname against a regex-based NamePattern """
        match = self.re.search(name)
        if not match:
            return None
        ret = list()
        sub = match.groups()
        for group in self.groups:
            newg = group
            for idx in range(len(sub)):
                newg = newg.replace('$%s' % (idx + 1), sub[idx])
            ret.append(newg)
        return ret

    def __str__(self):
        return "%s: %s %s" % (self.__class__.__name__, self.pattern,
                              self.groups)


class PatternFile(Bcfg2.Server.Plugin.XMLFileBacked):
    """ representation of GroupPatterns config.xml """
    __identifier__ = None
    create = 'GroupPatterns'

    def __init__(self, filename, core=None):
        try:
            fam = core.fam
        except AttributeError:
            fam = None
        Bcfg2.Server.Plugin.XMLFileBacked.__init__(self, filename, fam=fam,
                                                   should_monitor=True)
        self.core = core
        self.patterns = []

    def Index(self):
        Bcfg2.Server.Plugin.XMLFileBacked.Index(self)
        if (self.core and
            self.core.metadata_cache_mode in ['cautious', 'aggressive']):
            self.core.metadata_cache.expire()
        self.patterns = []
        for entry in self.xdata.xpath('//GroupPattern'):
            try:
                groups = [g.text for g in entry.findall('Group')]
                for pat_ent in entry.findall('NamePattern'):
                    pat = pat_ent.text
                    self.patterns.append(PatternMap(pat, None, groups))
                for range_ent in entry.findall('NameRange'):
                    rng = range_ent.text
                    self.patterns.append(PatternMap(None, rng, groups))
            except:  # pylint: disable=W0702
                self.logger.error("GroupPatterns: Failed to initialize "
                                  "pattern %s: %s" % (entry.text,
                                                      sys.exc_info()[1]))

    def process_patterns(self, hostname):
        """ return a list of groups that should be added to the given
        client based on patterns that match the hostname """
        ret = []
        for pattern in self.patterns:
            try:
                grps = pattern.process(hostname)
                if grps is not None:
                    ret.extend(grps)
            except:  # pylint: disable=W0702
                self.logger.error("GroupPatterns: Failed to process pattern "
                                  "%s for %s" % (pattern.pattern, hostname),
                                  exc_info=1)
        return ret


class GroupPatterns(Bcfg2.Server.Plugin.Plugin,
                    Bcfg2.Server.Plugin.Connector):
    """ set group membership based on client hostnames """

    def __init__(self, core, datastore):
        Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore)
        Bcfg2.Server.Plugin.Connector.__init__(self)
        self.config = PatternFile(os.path.join(self.data, 'config.xml'),
                                  core=core)

    def get_additional_groups(self, metadata):
        return self.config.process_patterns(metadata.hostname)


[docs]class GroupPatternsLint(Bcfg2.Server.Lint.ServerPlugin): """ ``bcfg2-lint`` plugin to check all given :ref:`GroupPatterns <server-plugins-grouping-grouppatterns>` patterns for validity. This is simply done by trying to create a :class:`Bcfg2.Server.Plugins.GroupPatterns.PatternMap` object for each pattern, and catching exceptions and presenting them as ``bcfg2-lint`` errors.""" def Run(self): cfg = self.core.plugins['GroupPatterns'].config for entry in cfg.xdata.xpath('//GroupPattern'): groups = [g.text for g in entry.findall('Group')] self.check(entry, groups, ptype='NamePattern') self.check(entry, groups, ptype='NameRange') @classmethod def Errors(cls): return {"pattern-fails-to-initialize": "error"}
[docs] def check(self, entry, groups, ptype="NamePattern"): """ Check a single pattern for validity """ for el in entry.findall(ptype): pat = el.text try: if ptype == "NamePattern": PatternMap(pat, None, groups) else: PatternMap(None, pat, groups) except: # pylint: disable=W0702 err = sys.exc_info()[1] self.LintError("pattern-fails-to-initialize", "Failed to initialize %s %s for %s: %s" % (ptype, pat, entry.get('pattern'), err))