""" Bcfg2.Encryption provides a number of convenience methods for
handling encryption in Bcfg2. See :ref:`server-encryption` for more
details. """
import os
import sys
from M2Crypto import Rand
from M2Crypto.EVP import Cipher, EVPError
from Bcfg2.Compat import StringIO, md5, b64encode, b64decode
#: Constant representing the encryption operation for
#: :class:`M2Crypto.EVP.Cipher`, which uses a simple integer.  Using
#: a named constant makes our code more readable.
ENCRYPT = 1
#: Constant representing the decryption operation for
#: :class:`M2Crypto.EVP.Cipher`, which uses a simple integer.  Using
#: a named constant makes our code more readable.
DECRYPT = 0
#: Default cipher algorithm.  To get a full list of valid algorithms,
#: you can run::
#:
#:     openssl list-cipher-algorithms | grep -v ' => ' | \
#:         tr 'A-Z-' 'a-z_' | sort -u
ALGORITHM = "aes_256_cbc"
#: Default initialization vector, used by :func:`str_encrypt` and
#: :func:`str_decrypt` when the caller does not supply one.  For best
#: security, you should use a unique IV for each message.
#: :func:`ssl_encrypt` does this in an automated fashion.
#:
#: NOTE(review): ``r'\0'`` is a raw string, i.e. the two characters
#: backslash and zero, so this value is 32 characters long rather than
#: 16 NUL bytes.  If NUL bytes were intended this is a latent bug, but
#: changing it would alter the default IV for data that has already
#: been encrypted with it -- confirm before touching.
IV = r'\0' * 16
#: The config file section in which encryption options and
#: passphrases are stored
CFG_SECTION = "encryption"
#: The config option used to store the algorithm
CFG_ALGORITHM = "algorithm"
#: The config option used to store the decryption strictness
CFG_DECRYPT = "decrypt"
# Seed M2Crypto's random number generator with 1024 bytes from
# os.urandom.  This runs once, as a side effect of importing the module.
Rand.rand_seed(os.urandom(1024))
def _cipher_filter(cipher, instr):
""" M2Crypto reads and writes file-like objects, so this uses
StringIO to pass data through it """
inbuf = StringIO(instr)
outbuf = StringIO()
while 1:
buf = inbuf.read()
if not buf:
break
outbuf.write(cipher.update(buf))
outbuf.write(cipher.final())
rv = outbuf.getvalue()
inbuf.close()
outbuf.close()
return rv
def str_encrypt(plaintext, key, iv=IV, algorithm=ALGORITHM, salt=None):
    """ Encrypt a string with a key.  For a higher-level encryption
    interface, see :func:`ssl_encrypt`.

    :param plaintext: The plaintext data to encrypt
    :type plaintext: string
    :param key: The key to encrypt the data with
    :type key: string
    :param iv: The initialization vector
    :type iv: string
    :param algorithm: The cipher algorithm to use
    :type algorithm: string
    :param salt: The salt to use; handed to the cipher unchanged
    :type salt: string
    :returns: string - The encrypted data
    """
    encryptor = Cipher(alg=algorithm, key=key, iv=iv, op=ENCRYPT, salt=salt)
    ciphertext = _cipher_filter(encryptor, plaintext)
    return ciphertext
def str_decrypt(crypted, key, iv=IV, algorithm=ALGORITHM):
    """ Decrypt a string with a key.  For a higher-level decryption
    interface, see :func:`ssl_decrypt`.

    :param crypted: The raw binary encrypted data
    :type crypted: string
    :param key: The encryption key to decrypt with
    :type key: string
    :param iv: The initialization vector
    :type iv: string
    :param algorithm: The cipher algorithm to use
    :type algorithm: string
    :returns: string - The decrypted data
    """
    decryptor = Cipher(alg=algorithm, key=key, iv=iv, op=DECRYPT)
    plaintext = _cipher_filter(decryptor, crypted)
    return plaintext
def ssl_decrypt(data, passwd, algorithm=ALGORITHM):
    """ Decrypt openssl-encrypted data.  This can decrypt data
    encrypted by :func:`ssl_encrypt`, or ``openssl enc``.  The data
    is always base64-decoded first; the salt is then read from the
    decoded data, and the key and initialization vector are derived
    from the password and that salt.

    :param data: The base64-encoded encrypted data to decrypt
    :type data: string
    :param passwd: The password to use to decrypt the data
    :type passwd: string
    :param algorithm: The cipher algorithm to use
    :type algorithm: string
    :returns: string - The decrypted data
    :raises: TypeError, if the data cannot be base64-decoded
    """
    # base64-decode the data
    try:
        data = b64decode(data)
    except (TypeError, ValueError):
        # Python 2's b64decode raises TypeError on bad input, while
        # Python 3 raises binascii.Error, which is a subclass of
        # ValueError.  Both are reported as TypeError so callers see
        # the same exception regardless of interpreter.
        #
        # we do not include the data in the error message, because one
        # of the common causes of this is data that claims to be
        # encrypted but is not.  we don't want to include a plaintext
        # secret in the error logs.
        raise TypeError("Could not decode base64 data: %s" %
                        sys.exc_info()[1])
    # The first 8 bytes are the header (ssl_encrypt writes "Salted__"
    # there); the next 8 bytes are the salt and the rest is ciphertext.
    salt = data[8:16]
    # Chain three MD5 digests over password + salt, each one also
    # consuming the previous digest.  ssl_encrypt derives its key and
    # IV the same way.
    # pylint: disable=E1101,E1121
    hashes = [md5(passwd + salt).digest()]
    for i in range(1, 3):
        hashes.append(md5(hashes[i - 1] + passwd + salt).digest())
    # pylint: enable=E1101,E1121
    # The first two digests form the key; the third is the IV.
    key = hashes[0] + hashes[1]
    iv = hashes[2]
    return str_decrypt(data[16:], key=key, iv=iv, algorithm=algorithm)
def ssl_encrypt(plaintext, passwd, algorithm=ALGORITHM, salt=None):
    """ Encrypt data in a format that is openssl compatible.

    :param plaintext: The plaintext data to encrypt
    :type plaintext: string
    :param passwd: The password to use to encrypt the data
    :type passwd: string
    :param algorithm: The cipher algorithm to use
    :type algorithm: string
    :param salt: The salt to use.  If none is provided, 8 random
                 bytes are generated.
    :type salt: bytes
    :returns: string - The base64-encoded, salted, encrypted string.
              The string includes a trailing newline to make it fully
              compatible with openssl command-line tools.
    """
    if salt is None:
        salt = Rand.rand_bytes(8)

    # Three chained MD5 digests over password + salt; every digest
    # after the first also consumes the one before it.
    # pylint: disable=E1101,E1121
    salted_passwd = passwd + salt
    first = md5(salted_passwd).digest()
    second = md5(first + salted_passwd).digest()
    third = md5(second + salted_passwd).digest()
    # pylint: enable=E1101,E1121

    # The first two digests form the key; the third is the IV.
    crypted = str_encrypt(plaintext, key=first + second, salt=salt,
                          iv=third, algorithm=algorithm)
    payload = "Salted__" + salt + crypted
    return b64encode(payload) + "\n"
def get_algorithm(setup):
    """ Get the cipher algorithm from the config file.  This is used
    in case someone uses the OpenSSL algorithm name (e.g.,
    "AES-256-CBC") instead of the M2Crypto name (e.g., "aes_256_cbc"),
    and to handle errors in a sensible way and deduplicate this code.

    The configured value is lower-cased and its dashes are replaced
    with underscores.  :data:`ALGORITHM` is passed to the config
    lookup as the default.

    :param setup: The Bcfg2 option set to read the algorithm from
    :type setup: Bcfg2.Options.OptionParser
    :returns: string - the normalized name of the cipher algorithm
    """
    configured = setup.cfp.get(CFG_SECTION, CFG_ALGORITHM,
                               default=ALGORITHM)
    lowered = configured.lower()
    return lowered.replace("-", "_")
def get_passphrases(setup):
    """ Get all candidate encryption passphrases from the config file.

    Every option in the encryption section is treated as a passphrase,
    except the ones that hold the algorithm and the decryption
    strictness.

    :param setup: The Bcfg2 option set to extract passphrases from
    :type setup: Bcfg2.Options.OptionParser
    :returns: dict - a dict of ``<passphrase name>``: ``<passphrase>``;
              empty if the config has no encryption section
    """
    if not setup.cfp.has_section(CFG_SECTION):
        return dict()

    # These options live in the same section but are settings, not
    # passphrases.
    reserved = [CFG_ALGORITHM, CFG_DECRYPT]
    found = dict()
    for name in setup.cfp.options(CFG_SECTION):
        if name in reserved:
            continue
        found[name] = setup.cfp.get(CFG_SECTION, name)
    return found
def bruteforce_decrypt(crypted, passphrases=None, setup=None,
                       algorithm=ALGORITHM):
    """ Convenience method to decrypt the given encrypted string by
    trying the given passphrases or all passphrases (as returned by
    :func:`get_passphrases`) sequentially until one is found that
    works.

    Either ``passphrases`` or ``setup`` must be provided.  ``setup``
    is only consulted when ``passphrases`` is None.

    :param crypted: The data to decrypt
    :type crypted: string
    :param passphrases: The passphrases to try.
    :type passphrases: list
    :param setup: A Bcfg2 option set to extract passphrases from
    :type setup: Bcfg2.Options.OptionParser
    :param algorithm: The cipher algorithm to use
    :type algorithm: string
    :returns: string - The decrypted data
    :raises: :class:`M2Crypto.EVP.EVPError`, if the data cannot be decrypted
    """
    candidates = passphrases
    if candidates is None:
        candidates = get_passphrases(setup).values()

    for candidate in candidates:
        try:
            plaintext = ssl_decrypt(crypted, candidate, algorithm=algorithm)
        except EVPError:
            # Only EVPError is swallowed, so the next candidate is
            # tried; any other exception propagates to the caller.
            continue
        return plaintext

    raise EVPError("Failed to decrypt")