Source code for Bcfg2.Server.Lint.Validate

"""Validate XML files.

Ensure that all XML files in the Bcfg2 repository validate according
to their respective schemas.
"""

import glob
import os
import sys

import lxml.etree
from subprocess import Popen, PIPE, STDOUT

import Bcfg2.Server.Lint

class Validate(Bcfg2.Server.Lint.ServerlessPlugin):
    """ Ensure that all XML files in the Bcfg2 repository validate
    according to their respective schemas.

    Files are first checked with lxml; when lxml rejects a file the
    external ``xmllint`` program is run on it to obtain the diagnostic
    text that is included in the lint error. """

    def __init__(self, *args, **kwargs):
        Bcfg2.Server.Lint.ServerlessPlugin.__init__(self, *args, **kwargs)

        #: A dict of <file glob>: <schema file> that maps files in the
        #: Bcfg2 specification to their schemas.  The globs are
        #: extended :mod:`fnmatch` globs that also support ``**``,
        #: which matches any number of any characters, including
        #: forward slashes.  The schema files are relative to the
        #: schema directory, which can be controlled by the
        #: ``bcfg2-lint --schema`` option.
        self.filesets = \
            {"Metadata/groups.xml": "metadata.xsd",
             "Metadata/clients.xml": "clients.xsd",
             "Cfg/**/info.xml": "info.xsd",
             "Cfg/**/privkey.xml": "privkey.xsd",
             "Cfg/**/pubkey.xml": "pubkey.xsd",
             "Cfg/**/authorizedkeys.xml": "authorizedkeys.xsd",
             "Cfg/**/authorized_keys.xml": "authorizedkeys.xsd",
             "SSHbase/**/info.xml": "info.xsd",
             "SSLCA/**/info.xml": "info.xsd",
             "TGenshi/**/info.xml": "info.xsd",
             "TCheetah/**/info.xml": "info.xsd",
             "Bundler/*.xml": "bundle.xsd",
             "Bundler/*.genshi": "bundle.xsd",
             "Pkgmgr/*.xml": "pkglist.xsd",
             "Base/*.xml": "base.xsd",
             "Rules/*.xml": "rules.xsd",
             "Defaults/*.xml": "defaults.xsd",
             "etc/report-configuration.xml": "report-configuration.xsd",
             "Deps/*.xml": "deps.xsd",
             "Decisions/*.xml": "decisions.xsd",
             "Packages/sources.xml": "packages.xsd",
             "GroupPatterns/config.xml": "grouppatterns.xsd",
             "AWSTags/config.xml": "awstags.xsd",
             "NagiosGen/config.xml": "nagiosgen.xsd",
             "FileProbes/config.xml": "fileprobes.xsd",
             "SSLCA/**/cert.xml": "sslca-cert.xsd",
             "SSLCA/**/key.xml": "sslca-key.xsd",
             "GroupLogic/groups.xml": "grouplogic.xsd"
             }

        #: A dict of <file glob>: <list of matching file paths>, filled
        #: in by :func:`get_filelists`.  The extra key ``props`` holds
        #: the Properties files.
        self.filelists = {}
        self.get_filelists()

    def Run(self):
        """ Validate every file matched by a glob in :attr:`filesets`
        against the schema mapped to that glob, then check the
        Properties files. """
        schemadir = self.config['schema']

        for path, schemaname in self.filesets.items():
            filelist = self.filelists.get(path)
            if not filelist:
                # avoid loading schemas for empty file lists
                continue
            schemafile = os.path.join(schemadir, schemaname)
            schema = self._load_schema(schemafile)
            if schema:
                for filename in filelist:
                    self.validate(filename, schemafile, schema=schema)

        self.check_properties()

    @classmethod
    def Errors(cls):
        """ Return a dict mapping each error name this plugin can emit
        to its default severity. """
        return {"schema-failed-to-parse": "warning",
                "properties-schema-not-found": "warning",
                "xml-failed-to-parse": "error",
                "xml-failed-to-read": "error",
                "xml-failed-to-verify": "error",
                "xinclude-does-not-exist": "error",
                "input-output-error": "error"}

    def check_properties(self):
        """ Check Properties files against their schemas.

        The schema for ``<name>.xml`` is expected next to it as
        ``<name>.xsd``.  A Properties file without a schema produces a
        ``properties-schema-not-found`` error and is only parsed. """
        for filename in self.filelists['props']:
            schemafile = "%s.xsd" % os.path.splitext(filename)[0]
            if os.path.exists(schemafile):
                self.validate(filename, schemafile)
            else:
                self.LintError("properties-schema-not-found",
                               "No schema found for %s" % filename)
                # ensure that it at least parses
                self.parse(filename)

    @staticmethod
    def _run_xmllint(cmd):
        """ Run an ``xmllint`` command line and collect its output.

        :param cmd: The full argument vector, starting with the
                    program name
        :type cmd: list of strings
        :returns: tuple - (exit status, combined stdout/stderr as
                  text)
        """
        lint = Popen(cmd, stdout=PIPE, stderr=STDOUT)
        # communicate() waits for the process, so returncode is set
        # once it returns
        output = lint.communicate()[0]
        # py3k fix: the pipe yields bytes there.  Undecodable bytes are
        # replaced so a diagnostic can never raise UnicodeDecodeError.
        if not isinstance(output, str):
            output = output.decode('utf-8', 'replace')
        return lint.returncode, output

    def parse(self, filename):
        """ Parse an XML file, raising the appropriate LintErrors if
        it can't be parsed or read.  Return the
        lxml.etree._ElementTree parsed from the file, or False if the
        file could not be parsed or read.

        XIncludes are resolved before the tree is returned.  Wildcard
        XIncludes are expanded first, but only when ``self.files`` is
        None.

        :param filename: The full path to the file to parse
        :type filename: string
        :returns: lxml.etree._ElementTree - the parsed data, or False
                  on failure"""
        try:
            xdata = lxml.etree.parse(filename)
            if self.files is None:
                self._expand_wildcard_xincludes(xdata)
            xdata.xinclude()
            return xdata
        except (lxml.etree.XIncludeError, SyntaxError):
            # lxml's XML syntax errors derive from SyntaxError.  Re-run
            # the file through xmllint to get its diagnostics for the
            # error message.
            cmd = ["xmllint", "--noout"]
            if self.files is None:
                cmd.append("--xinclude")
            cmd.append(filename)
            output = self._run_xmllint(cmd)[1]
            self.LintError("xml-failed-to-parse",
                           "%s fails to parse:\n%s" % (filename, output))
            return False
        except IOError:
            self.LintError("xml-failed-to-read",
                           "Failed to open file %s" % filename)
            return False

    def _expand_wildcard_xincludes(self, xdata):
        """ a lightweight version of
        :func:`Bcfg2.Server.Plugin.helpers.XMLFileBacked._follow_xincludes`

        Every XInclude element in ``xdata`` is replaced, in place, by
        one XInclude element per file matching its ``href`` glob.  The
        replacements are appended to the end of the original element's
        parent.  An ``href`` that matches nothing is logged at debug
        level if the element has a fallback child, and reported as
        ``xinclude-does-not-exist`` otherwise.

        :param xdata: The parsed document to modify
        :type xdata: lxml.etree._ElementTree
        """
        xinclude = '%sinclude' % Bcfg2.Server.XI_NAMESPACE
        for el in xdata.findall('//' + xinclude):
            name = el.get("href")
            if name is None:
                # nothing to expand; leave the element for xinclude()
                # to deal with instead of failing on None here
                continue
            if name.startswith("/"):
                fpath = name
            else:
                fpath = os.path.join(os.path.dirname(xdata.docinfo.URL),
                                     name)

            # expand globs in xinclude, a bcfg2-specific extension
            extras = glob.glob(fpath)
            if not extras:
                msg = "%s: %s does not exist, skipping: %s" % \
                    (xdata.docinfo.URL, name, self.RenderXML(el))
                if el.findall('./%sfallback' % Bcfg2.Server.XI_NAMESPACE):
                    self.logger.debug(msg)
                else:
                    self.LintError("xinclude-does-not-exist", msg)

            parent = el.getparent()
            parent.remove(el)
            for extra in extras:
                # never make a document include itself
                if extra != xdata.docinfo.URL:
                    lxml.etree.SubElement(parent, xinclude, href=extra)

    def validate(self, filename, schemafile, schema=None):
        """ Validate a file against the given schema.

        A file that lxml rejects is re-checked with ``xmllint``; an
        ``xml-failed-to-verify`` error is raised, and False returned,
        only if ``xmllint`` also exits with a non-zero status.

        :param filename: The full path to the file to validate
        :type filename: string
        :param schemafile: The full path to the schema file to
                           validate against
        :type schemafile: string
        :param schema: The loaded schema to validate against.  This
                       can be used to avoid parsing a single schema
                       file for every file that needs to be validate
                       against it.
        :type schema: lxml.etree.Schema
        :returns: bool - True if the file validates, false otherwise
        """
        if schema is None:
            # if no schema object was provided, instantiate one
            schema = self._load_schema(schemafile)
            if not schema:
                return False

        datafile = self.parse(filename)
        if not datafile:
            return False

        if not schema.validate(datafile):
            cmd = ["xmllint"]
            if self.files is None:
                cmd.append("--xinclude")
            cmd.extend(["--noout", "--schema", schemafile, filename])
            status, output = self._run_xmllint(cmd)
            if status:
                self.LintError("xml-failed-to-verify",
                               "%s fails to verify:\n%s" % (filename, output))
                return False
        return True

    def get_filelists(self):
        """ Get lists of different kinds of files to validate.  This
        doesn't return anything, but it sets
        :attr:`Bcfg2.Server.Lint.Validate.Validate.filelists` to a dict
        whose keys are path globs given in
        :attr:`Bcfg2.Server.Lint.Validate.Validate.filesets` and whose
        values are lists of the full paths to all files in the Bcfg2
        repository (or given with ``bcfg2-lint --stdin``) that match
        the glob.

        The additional key ``props`` is set to the files matching
        ``Properties/*.xml``. """
        for path in self.filesets:
            if '/**/' in path and self.files is None:
                # no explicit file list was given, so walk the
                # directory tree below the part of the glob that
                # precedes '/**/' and pick up files named like the
                # part that follows it
                fpath, fname = path.split('/**/')
                matches = []
                for root, _, files in \
                        os.walk(os.path.join(self.config['repo'], fpath)):
                    matches.extend([os.path.join(root, f)
                                    for f in files
                                    if f == fname])
                self.filelists[path] = matches
            else:
                self.filelists[path] = self.list_matching_files(path)

        self.filelists['props'] = self.list_matching_files("Properties/*.xml")

    def _load_schema(self, filename):
        """ Load an XML schema document, returning the Schema object
        and raising appropriate lint errors on failure.

        :param filename: The full path to the schema file to load.
        :type filename: string
        :returns: lxml.etree.Schema - The loaded schema data, or None
                  if the schema could not be read or processed
        """
        try:
            return lxml.etree.XMLSchema(lxml.etree.parse(filename))
        except IOError:
            err = sys.exc_info()[1]
            self.LintError("input-output-error", str(err))
        except (lxml.etree.XMLSchemaParseError, lxml.etree.XMLSyntaxError):
            # XMLSyntaxError: the schema file is not well-formed XML;
            # XMLSchemaParseError: it is XML but not a valid schema
            err = sys.exc_info()[1]
            self.LintError("schema-failed-to-parse",
                           "Failed to process schema %s: %s" %
                           (filename, err))
        return None