From 7b2732169cf7cf126847d5e10a7adeeaf624bbdc Mon Sep 17 00:00:00 2001 From: Fred Gylys-Colwell Date: Thu, 4 May 2017 12:54:15 -0700 Subject: [PATCH] Deliver oem_certificate_generator Merge from Widevine repo of http://go/wvgerrit/26880 and http://go/wvgerrit/26062 Deliver oem_certificate_generator b/37996004 Testing: Tool delivery only Change-Id: Id95bd3cdd7cf5b48d225624747ab1f8499e4f438 --- .../oem_certificate_generator/README.md | 52 ++ .../oem_certificate.py | 492 ++++++++++++++++++ 2 files changed, 544 insertions(+) create mode 100644 libwvdrmengine/oem_certificate_generator/README.md create mode 100644 libwvdrmengine/oem_certificate_generator/oem_certificate.py diff --git a/libwvdrmengine/oem_certificate_generator/README.md b/libwvdrmengine/oem_certificate_generator/README.md new file mode 100644 index 00000000..92b4c5eb --- /dev/null +++ b/libwvdrmengine/oem_certificate_generator/README.md @@ -0,0 +1,52 @@ +OEM certificate generation tool +=================================================== + +## Supports + + - Generating CSR (certificate signing request) + - Generating OEM intermediate certificate (for testing) + - Generating OEM leaf certificate chain + - Erasing file securely + - Getting CSR/certificate/certificate chain information + +## Prerequirements + + - Install pip: https://pip.pypa.io/en/stable/installing/ + - Install python cryptography: https://cryptography.io/en/latest/installation/ + +## Usage + +Run `python oem_certificate.py --help` to see available commands. + +The arguments can be partially or fully loaded from a configuration file, for +example, if file "location.cfg" is, + +``` + -C=US + -ST=CA + -L=Kirkland + -O=Some Company + -OU=Some Unit +``` + +A command of + +```bash + python oem_certificate.py generate_csr @location.cfg -CN TestDevice1 \ + --output_csr_file=csr.pem --output_private_key_file=key.der +``` + +is equivalent to + +```bash + python oem_certificate.py generate_csr -CN TestDevice1 -C=US -ST=CA \ + -L=Kirkland -O='Some Company' -OU='Some Unit' --output_csr_file=csr.pem \ + --output_private_key_file=key.der. +``` + +Note that + - The arguments in the config file must be one per line; + - The arguments should not be quoted in the config file. + +The script uses a default configuration file 'oem_certificate.cfg', which will +be loaded automatically if exists. diff --git a/libwvdrmengine/oem_certificate_generator/oem_certificate.py b/libwvdrmengine/oem_certificate_generator/oem_certificate.py new file mode 100644 index 00000000..07080db5 --- /dev/null +++ b/libwvdrmengine/oem_certificate_generator/oem_certificate.py @@ -0,0 +1,492 @@ +# Copyright 2017 Google Inc. All Rights Reserved. + +"""OEM certificate generation tool. + +Supports: + - Generating CSR (certificate signing request) + - Generating OEM intermediate certificate (for testing) + - Generating OEM leaf certificate chain + - Erasing file securely + - Getting CSR/certificate/certificate chain information + +Prerequirements: + - Install pip: https://pip.pypa.io/en/stable/installing/ + - Install python cryptography: https://cryptography.io/en/latest/installation/ + +Run 'python oem_certificate.py --help' to see available commands. + +The arguments can be partially or fully loaded from a configuration file, for +example, if file "location.cfg" is, + + -C=US + -ST=CA + -L=Kirkland + -O=Some Company + -OU=Some Unit + +A command of + "python oem_certificate.py generate_csr @location.cfg -CN TestDevice1 " + "--output_csr_file=csr.pem --output_private_key_file=key.der", +is equivalent to + "python oem_certificate.py generate_csr -CN TestDevice1 -C=US -ST=CA " + "-L=Kirkland -O='Some Company' -OU='Some Unit' --output_csr_file=csr.pem " + "--output_private_key_file=key.der". + +Note that (1) the arguments in the config file must be one per line; (2) the +arguments should not be quoted in the config file. + +The script uses a default configuration file 'oem_certificate.cfg', which will +be loaded automatically if exists. +""" + +import argparse +import datetime +import os +import sys + +from cryptography import utils +from cryptography import x509 +from cryptography.hazmat import backends +from cryptography.hazmat.backends import openssl +from cryptography.hazmat.backends.openssl import backend +from cryptography.hazmat.primitives import hashes +from cryptography.hazmat.primitives import serialization +from cryptography.hazmat.primitives.asymmetric import rsa +from cryptography.x509 import oid +from pyasn1.codec.der import decoder +from pyasn1.codec.der import encoder +from pyasn1.type import univ +import six + + +class WidevineSystemId(x509.UnrecognizedExtension): + """Implement WidevineSystemId x509 extension.""" + + # oid of Widevine system id. + oid = oid.ObjectIdentifier('1.3.6.1.4.1.11129.4.1.1') + + def __init__(self, value): + """Inits from raw bytes.""" + super(WidevineSystemId, self).__init__(WidevineSystemId.oid, value) + + @classmethod + def from_int_value(cls, int_value): + """Construct from integer system id value.""" + return cls(encoder.encode(univ.Integer(int_value))) + + def int_value(self): + """Return the integer value of the system id.""" + return int(decoder.decode(self.value)[0]) + + +class X509CertificateChain(object): + """Implement x509 certificate chain object. + + cryptography does not support certificate chain (pkcs7 container) right + now, so we have to implement it using low level functions directly. + """ + + # Disable pylint on protected-access in class X509CertificateChain. + # pylint: disable=protected-access + + def __init__(self, certificates): + """Inits from certificate list.""" + self._certificates = certificates + + def __iter__(self): + """Iterates through certificates.""" + return self._certificates.__iter__() + + @classmethod + def load_der(cls, certificate_chain_data): + """Load from DER formatted data.""" + bio = backend._bytes_to_bio(certificate_chain_data) + pkcs7 = backend._lib.d2i_PKCS7_bio(bio.bio, backend._ffi.NULL) + if pkcs7 == backend._ffi.NULL: + raise ValueError('Unable to load certificate chain') + pkcs7 = backend._ffi.gc(pkcs7, backend._lib.PKCS7_free) + if not backend._lib.PKCS7_type_is_signed(pkcs7): + raise ValueError('Invalid certificate chain') + + x509_stack = pkcs7.d.sign.cert + certificates = [] + for i in xrange(backend._lib.sk_X509_num(x509_stack)): + x509_value = backend._ffi.gc( + backend._lib.X509_dup(backend._lib.sk_X509_value(x509_stack, i)), + backend._lib.X509_free) + certificates.append(openssl.x509._Certificate(backend, x509_value)) + return cls(certificates) + + def der_bytes(self): + """Return DER formatted bytes.""" + x509_stack = backend._ffi.gc(backend._lib.sk_X509_new_null(), + backend._lib.sk_X509_free) + for certificate in self._certificates: + backend._lib.sk_X509_push(x509_stack, certificate._x509) + + pkcs7_partial = 0x4000 + p7 = backend._lib.PKCS7_sign(backend._ffi.NULL, backend._ffi.NULL, + x509_stack, backend._ffi.NULL, pkcs7_partial) + p7 = backend._ffi.gc(p7, backend._lib.PKCS7_free) + + bio = backend._create_mem_bio_gc() + backend.openssl_assert(backend._lib.i2d_PKCS7_bio(bio, p7) == 1) + return backend._read_mem_bio(bio) + + +def _multiple_of_1024(key_size_str): + """argparse custom type function for key size.""" + key_size = int(key_size_str) + if key_size % 1024: + msg = '%r is not multiple of 1024' % key_size_str + raise argparse.ArgumentTypeError(msg) + return key_size + + +def _valid_date(date_str): + """argparse custom type function dates.""" + return datetime.datetime.strptime(date_str, '%Y-%m-%d') + + +def _get_encryption_algorithm(passphrase): + """Convenient function to get the encryption algorithm.""" + if passphrase: + return serialization.BestAvailableEncryption(passphrase) + else: + return serialization.NoEncryption() + + +def generate_csr(args): + """Subparser handler for generating certificate signing request.""" + key = rsa.generate_private_key( + public_exponent=65537, + key_size=args.key_size, + backend=backends.default_backend()) + args.output_private_key_file.write( + key.private_bytes( + encoding=serialization.Encoding.DER, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=_get_encryption_algorithm(args.passphrase))) + + x509_name = [ + # Provide various details about who we are. + x509.NameAttribute(oid.NameOID.COUNTRY_NAME, + six.text_type(args.country_name)), + x509.NameAttribute(oid.NameOID.STATE_OR_PROVINCE_NAME, + six.text_type(args.state_or_province_name)), + x509.NameAttribute(oid.NameOID.LOCALITY_NAME, + six.text_type(args.locality_name)), + x509.NameAttribute(oid.NameOID.ORGANIZATION_NAME, + six.text_type(args.organization_name)), + x509.NameAttribute(oid.NameOID.ORGANIZATIONAL_UNIT_NAME, + six.text_type(args.organizational_unit_name)), + ] + if args.common_name: + x509_name.append(x509.NameAttribute(oid.NameOID.COMMON_NAME, + six.text_type(args.common_name))) + csr = x509.CertificateSigningRequestBuilder().subject_name( + x509.Name(x509_name)).sign(key, hashes.SHA256(), + backends.default_backend()) + args.output_csr_file.write(csr.public_bytes(serialization.Encoding.PEM)) + + +def _random_serial_number(): + """Utility function to generate random serial number.""" + return utils.int_from_bytes(os.urandom(16), byteorder='big') + + +def build_certificate(subject_name, issuer_name, system_id, not_valid_before, + valid_duration, public_key, signing_key, ca): + """Utility function to build certificate.""" + builder = x509.CertificateBuilder() + builder = builder.subject_name(subject_name).issuer_name(issuer_name) + + if ca: + builder = builder.add_extension( + x509.SubjectKeyIdentifier.from_public_key(public_key), critical=False) + builder = builder.add_extension( + x509.AuthorityKeyIdentifier.from_issuer_public_key( + signing_key.public_key()), + critical=False) + builder = builder.add_extension( + x509.BasicConstraints(True, 0), critical=True) + if system_id: + builder = builder.add_extension( + WidevineSystemId.from_int_value(system_id), critical=False) + + builder = builder.not_valid_before(not_valid_before).not_valid_after( + not_valid_before + datetime.timedelta(valid_duration)).serial_number( + _random_serial_number()).public_key(public_key) + return builder.sign( + private_key=signing_key, + algorithm=hashes.SHA256(), + backend=backends.default_backend()) + + +def generate_intermediate_certificate(args): + """Subparser handler for generating intermediate certificate.""" + root_cert = x509.load_der_x509_certificate(args.root_certificate_file.read(), + backends.default_backend()) + root_private_key = serialization.load_der_private_key( + args.root_private_key_file.read(), + password=args.root_private_key_passphrase, + backend=backends.default_backend()) + if (root_private_key.public_key().public_numbers() != + root_cert.public_key().public_numbers()): + raise ValueError('Root certificate does not match with root private key') + csr = x509.load_pem_x509_csr(args.csr_file.read(), backends.default_backend()) + + certificate = build_certificate(csr.subject, root_cert.subject, + args.system_id, args.not_valid_before, + args.valid_duration, + csr.public_key(), root_private_key, True) + args.output_certificate_file.write( + certificate.public_bytes(serialization.Encoding.DER)) + + +def generate_leaf_certificate(args): + """Subparser handler for generating leaf certificate.""" + intermediate_cert_bytes = args.intermediate_certificate_file.read() + intermediate_cert = x509.load_der_x509_certificate(intermediate_cert_bytes, + backends.default_backend()) + intermediate_private_key = serialization.load_der_private_key( + args.intermediate_private_key_file.read(), + password=args.intermediate_private_key_passphrase, + backend=backends.default_backend()) + if (intermediate_private_key.public_key().public_numbers() != + intermediate_cert.public_key().public_numbers()): + raise ValueError( + 'Intermediate certificate does not match with intermediate private key') + + system_id_raw_bytes = intermediate_cert.extensions.get_extension_for_oid( + WidevineSystemId.oid).value.value + system_id = WidevineSystemId(system_id_raw_bytes).int_value() + + name_attributes = [ + x509.NameAttribute(oid.NameOID.COMMON_NAME, u'{0}-leaf'.format(system_id)) + ] + name_attributes.extend([ + attribute for attribute in intermediate_cert.subject + if attribute.oid != oid.NameOID.COMMON_NAME + ]) + subject_name = x509.Name(name_attributes) + + leaf_private_key = rsa.generate_private_key( + public_exponent=65537, + key_size=args.key_size, + backend=backends.default_backend()) + args.output_private_key_file.write( + leaf_private_key.private_bytes( + encoding=serialization.Encoding.DER, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=_get_encryption_algorithm(args.passphrase))) + + certificate = build_certificate(subject_name, intermediate_cert.subject, + system_id, args.not_valid_before, + args.valid_duration, + leaf_private_key.public_key(), + intermediate_private_key, False) + args.output_certificate_file.write( + X509CertificateChain([certificate, intermediate_cert]).der_bytes()) + + +def secure_erase(args): + """Subparser handler for secure erasing of a file.""" + length = args.file.tell() + for _ in xrange(args.passes): + args.file.seek(0) + for _ in xrange(length): + args.file.write(os.urandom(1)) + args.file.close() + os.remove(args.file.name) + + +def _certificate_as_string(cert): + """Utility function to format certificate as string.""" + lines = ['Certificate Subject Name:'] + lines.extend([' {0}'.format(attribute) for attribute in cert.subject]) + lines.append('Issuer Name:') + lines.extend([' {0}'.format(attribute) for attribute in cert.issuer]) + lines.append('Key Size: {0.key_size}'.format(cert.public_key())) + try: + system_id_raw_bytes = cert.extensions.get_extension_for_oid( + WidevineSystemId.oid).value.value + lines.append('Widevine System Id: {0}'.format( + WidevineSystemId(system_id_raw_bytes).int_value())) + except x509.ExtensionNotFound: + pass + lines.append('Not valid before: {0.not_valid_before}'.format(cert)) + lines.append('Not valid after: {0.not_valid_after}'.format(cert)) + return '\n'.join(lines) + + +def _csr_as_string(csr): + """Utility function to format csr as string.""" + lines = ['CSR Subject Name:'] + lines.extend([' {0}'.format(attribute) for attribute in csr.subject]) + lines.append('Key Size: {0.key_size}'.format(csr.public_key())) + return '\n'.join(lines) + + +def _handle_csr(data): + """Utility function for get_info to parse csr.""" + return _csr_as_string( + x509.load_pem_x509_csr(data, backends.default_backend())) + + +def _handle_certificate(data): + """Utility function for get_info to parse certificate.""" + return _certificate_as_string( + x509.load_der_x509_certificate(data, backends.default_backend())) + + +def _handle_certificate_chain(data): + """Utility function for get_info to parse certificate chain.""" + return '\n\n'.join([ + _certificate_as_string(certificate) + for certificate in X509CertificateChain.load_der(data) + ]) + + +def get_info(args, out=sys.stdout): + """Subparser handler to get csr or certificate information.""" + # The input is either a CSR or a certificate, or a certificate chain. + # Loop through the corresponding handlers one by one. + data = args.file.read() + for handler in [_handle_csr, _handle_certificate, _handle_certificate_chain]: + try: + out.write(handler(data)) + return + except ValueError: + pass + print('Error occurred. The input file is not a valid CSR, nor certificate, ' + 'nor certificate chain.') + + +def create_parser(): + """Command line parsing.""" + parser = argparse.ArgumentParser(fromfile_prefix_chars='@') + subparsers = parser.add_subparsers() + + # Subparser for certificate signing request generation. + parser_csr = subparsers.add_parser( + 'generate_csr', help='generate certificate signing request') + parser_csr.add_argument( + '--key_size', + type=_multiple_of_1024, + default=2048, + help='specify RSA key size.') + parser_csr.add_argument('-C', '--country_name', required=True) + parser_csr.add_argument('-ST', '--state_or_province_name', required=True) + parser_csr.add_argument('-L', '--locality_name', required=True) + parser_csr.add_argument('-O', '--organization_name', required=True) + parser_csr.add_argument('-OU', '--organizational_unit_name', required=True) + parser_csr.add_argument('-CN', '--common_name', required=False) + parser_csr.add_argument( + '--output_csr_file', type=argparse.FileType('wb'), required=True) + parser_csr.add_argument( + '--output_private_key_file', type=argparse.FileType('wb'), required=True) + parser_csr.add_argument( + '--passphrase', + help=('specify an optional passphrase to encrypt the private key. The ' + 'private key is not encrypted if omitted.')) + parser_csr.set_defaults(func=generate_csr) + + # Subparser for intermediate certificate generation. + parser_intermediate_cert = subparsers.add_parser( + 'generate_intermediate_certificate', + help=('generate intermediate certificate from csr. This should only be ' + 'used for testing.')) + parser_intermediate_cert.add_argument( + '--not_valid_before', + type=_valid_date, + default=datetime.datetime.today(), + help='certificate validity start date - format YYYY-MM-DD') + parser_intermediate_cert.add_argument( + '--valid_duration', + type=int, + default=3650, + help='the certificate will expire after the specified number of days.') + parser_intermediate_cert.add_argument('--system_id', type=int, required=True) + parser_intermediate_cert.add_argument( + '--csr_file', type=argparse.FileType('rb'), required=True) + parser_intermediate_cert.add_argument( + '--root_certificate_file', type=argparse.FileType('rb'), required=True) + parser_intermediate_cert.add_argument( + '--root_private_key_file', type=argparse.FileType('rb'), required=True) + parser_intermediate_cert.add_argument('--root_private_key_passphrase') + parser_intermediate_cert.add_argument( + '--output_certificate_file', type=argparse.FileType('wb'), required=True) + parser_intermediate_cert.set_defaults(func=generate_intermediate_certificate) + + # Subparser for leaf certificate generation. + parser_leaf_cert = subparsers.add_parser( + 'generate_leaf_certificate', help='generate leaf certificate') + parser_leaf_cert.add_argument( + '--key_size', + type=_multiple_of_1024, + default=2048, + help='specify RSA key size.') + parser_leaf_cert.add_argument( + '--not_valid_before', + type=_valid_date, + default=datetime.datetime.today(), + help='certificate validity start date - format YYYY-MM-DD') + parser_leaf_cert.add_argument( + '--valid_duration', + type=int, + default=3650, + help='the certificate will expire after the specified number of days.') + parser_leaf_cert.add_argument( + '--intermediate_certificate_file', + type=argparse.FileType('rb'), + required=True) + parser_leaf_cert.add_argument( + '--intermediate_private_key_file', + type=argparse.FileType('rb'), + required=True) + parser_leaf_cert.add_argument('--intermediate_private_key_passphrase') + parser_leaf_cert.add_argument( + '--output_certificate_file', type=argparse.FileType('wb'), required=True) + parser_leaf_cert.add_argument( + '--output_private_key_file', type=argparse.FileType('wb'), required=True) + parser_leaf_cert.add_argument( + '--passphrase', + help=('specify an optional passphrase to encrypt the private key. The ' + 'private key is not encrypted if omitted.')) + parser_leaf_cert.set_defaults(func=generate_leaf_certificate) + + # Subparser for secure file erase. + parser_erase = subparsers.add_parser('erase', help='erase a file securely') + parser_erase.add_argument( + '-F', '--file', type=argparse.FileType('a'), required=True) + parser_erase.add_argument( + '--passes', + type=int, + default=3, + help=('the file will be overwritten with random patterns for the ' + 'specified number of passes')) + parser_erase.set_defaults(func=secure_erase) + + # Subparser to get CSR or certificate or certificate chain metadata. + parser_info = subparsers.add_parser( + 'info', help='get CSR or certificate metadata') + parser_info.add_argument( + '-F', '--file', type=argparse.FileType('rb'), required=True) + parser_info.set_defaults(func=get_info) + + return parser + + +def main(): + args = sys.argv[1:] + config_file_name = 'oem_certificate.cfg' + if os.path.isfile(config_file_name): + print 'Load from args default configuration file: ', config_file_name + args.append('@' + config_file_name) + parser_args = create_parser().parse_args(args) + parser_args.func(parser_args) + + +if __name__ == '__main__': + main()