Source release v3.3.0

This commit is contained in:
Gene Morgan
2017-05-04 14:01:27 -07:00
parent baa7b133d3
commit 8082775924
678 changed files with 51264 additions and 14200 deletions

View File

@@ -0,0 +1,52 @@
OEM certificate generation tool
===================================================
## Supports
- Generating CSR (certificate signing request)
- Generating OEM intermediate certificate (for testing)
- Generating OEM leaf certificate chain
- Erasing file securely
- Getting CSR/certificate/certificate chain information
## Prerequirements
- Install pip: https://pip.pypa.io/en/stable/installing/
- Install python cryptography: https://cryptography.io/en/latest/installation/
## Usage
Run `python oem_certificate.py --help` to see available commands.
The arguments can be partially or fully loaded from a configuration file, for
example, if file "location.cfg" is,
```
-C=US
-ST=CA
-L=Kirkland
-O=Some Company
-OU=Some Unit
```
A command of
```bash
python oem_certificate.py generate_csr @location.cfg -CN TestDevice1 \
--output_csr_file=csr.pem --output_private_key_file=key.der
```
is equivalent to
```bash
python oem_certificate.py generate_csr -CN TestDevice1 -C=US -ST=CA \
-L=Kirkland -O='Some Company' -OU='Some Unit' --output_csr_file=csr.pem \
--output_private_key_file=key.der.
```
Note that
- The arguments in the config file must be one per line;
- The arguments should not be quoted in the config file.
The script uses a default configuration file 'oem_certificate.cfg', which will
be loaded automatically if exists.

View File

@@ -0,0 +1,492 @@
# Copyright 2017 Google Inc. All Rights Reserved.
"""OEM certificate generation tool.
Supports:
- Generating CSR (certificate signing request)
- Generating OEM intermediate certificate (for testing)
- Generating OEM leaf certificate chain
- Erasing file securely
- Getting CSR/certificate/certificate chain information
Prerequirements:
- Install pip: https://pip.pypa.io/en/stable/installing/
- Install python cryptography: https://cryptography.io/en/latest/installation/
Run 'python oem_certificate.py --help' to see available commands.
The arguments can be partially or fully loaded from a configuration file, for
example, if file "location.cfg" is,
-C=US
-ST=CA
-L=Kirkland
-O=Some Company
-OU=Some Unit
A command of
"python oem_certificate.py generate_csr @location.cfg -CN TestDevice1 "
"--output_csr_file=csr.pem --output_private_key_file=key.der",
is equivalent to
"python oem_certificate.py generate_csr -CN TestDevice1 -C=US -ST=CA "
"-L=Kirkland -O='Some Company' -OU='Some Unit' --output_csr_file=csr.pem "
"--output_private_key_file=key.der".
Note that (1) the arguments in the config file must be one per line; (2) the
arguments should not be quoted in the config file.
The script uses a default configuration file 'oem_certificate.cfg', which will
be loaded automatically if exists.
"""
import argparse
import datetime
import os
import sys
from cryptography import utils
from cryptography import x509
from cryptography.hazmat import backends
from cryptography.hazmat.backends import openssl
from cryptography.hazmat.backends.openssl import backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509 import oid
from pyasn1.codec.der import decoder
from pyasn1.codec.der import encoder
from pyasn1.type import univ
import six
class WidevineSystemId(x509.UnrecognizedExtension):
"""Implement WidevineSystemId x509 extension."""
# oid of Widevine system id.
oid = oid.ObjectIdentifier('1.3.6.1.4.1.11129.4.1.1')
def __init__(self, value):
"""Inits from raw bytes."""
super(WidevineSystemId, self).__init__(WidevineSystemId.oid, value)
@classmethod
def from_int_value(cls, int_value):
"""Construct from integer system id value."""
return cls(encoder.encode(univ.Integer(int_value)))
def int_value(self):
"""Return the integer value of the system id."""
return int(decoder.decode(self.value)[0])
class X509CertificateChain(object):
"""Implement x509 certificate chain object.
cryptography does not support certificate chain (pkcs7 container) right
now, so we have to implement it using low level functions directly.
"""
# Disable pylint on protected-access in class X509CertificateChain.
# pylint: disable=protected-access
def __init__(self, certificates):
"""Inits from certificate list."""
self._certificates = certificates
def __iter__(self):
"""Iterates through certificates."""
return self._certificates.__iter__()
@classmethod
def load_der(cls, certificate_chain_data):
"""Load from DER formatted data."""
bio = backend._bytes_to_bio(certificate_chain_data)
pkcs7 = backend._lib.d2i_PKCS7_bio(bio.bio, backend._ffi.NULL)
if pkcs7 == backend._ffi.NULL:
raise ValueError('Unable to load certificate chain')
pkcs7 = backend._ffi.gc(pkcs7, backend._lib.PKCS7_free)
if not backend._lib.PKCS7_type_is_signed(pkcs7):
raise ValueError('Invalid certificate chain')
x509_stack = pkcs7.d.sign.cert
certificates = []
for i in xrange(backend._lib.sk_X509_num(x509_stack)):
x509_value = backend._ffi.gc(
backend._lib.X509_dup(backend._lib.sk_X509_value(x509_stack, i)),
backend._lib.X509_free)
certificates.append(openssl.x509._Certificate(backend, x509_value))
return cls(certificates)
def der_bytes(self):
"""Return DER formatted bytes."""
x509_stack = backend._ffi.gc(backend._lib.sk_X509_new_null(),
backend._lib.sk_X509_free)
for certificate in self._certificates:
backend._lib.sk_X509_push(x509_stack, certificate._x509)
pkcs7_partial = 0x4000
p7 = backend._lib.PKCS7_sign(backend._ffi.NULL, backend._ffi.NULL,
x509_stack, backend._ffi.NULL, pkcs7_partial)
p7 = backend._ffi.gc(p7, backend._lib.PKCS7_free)
bio = backend._create_mem_bio_gc()
backend.openssl_assert(backend._lib.i2d_PKCS7_bio(bio, p7) == 1)
return backend._read_mem_bio(bio)
def _multiple_of_1024(key_size_str):
"""argparse custom type function for key size."""
key_size = int(key_size_str)
if key_size % 1024:
msg = '%r is not multiple of 1024' % key_size_str
raise argparse.ArgumentTypeError(msg)
return key_size
def _valid_date(date_str):
"""argparse custom type function dates."""
return datetime.datetime.strptime(date_str, '%Y-%m-%d')
def _get_encryption_algorithm(passphrase):
"""Convenient function to get the encryption algorithm."""
if passphrase:
return serialization.BestAvailableEncryption(passphrase)
else:
return serialization.NoEncryption()
def generate_csr(args):
"""Subparser handler for generating certificate signing request."""
key = rsa.generate_private_key(
public_exponent=65537,
key_size=args.key_size,
backend=backends.default_backend())
args.output_private_key_file.write(
key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=_get_encryption_algorithm(args.passphrase)))
x509_name = [
# Provide various details about who we are.
x509.NameAttribute(oid.NameOID.COUNTRY_NAME,
six.text_type(args.country_name)),
x509.NameAttribute(oid.NameOID.STATE_OR_PROVINCE_NAME,
six.text_type(args.state_or_province_name)),
x509.NameAttribute(oid.NameOID.LOCALITY_NAME,
six.text_type(args.locality_name)),
x509.NameAttribute(oid.NameOID.ORGANIZATION_NAME,
six.text_type(args.organization_name)),
x509.NameAttribute(oid.NameOID.ORGANIZATIONAL_UNIT_NAME,
six.text_type(args.organizational_unit_name)),
]
if args.common_name:
x509_name.append(x509.NameAttribute(oid.NameOID.COMMON_NAME,
six.text_type(args.common_name)))
csr = x509.CertificateSigningRequestBuilder().subject_name(
x509.Name(x509_name)).sign(key, hashes.SHA256(),
backends.default_backend())
args.output_csr_file.write(csr.public_bytes(serialization.Encoding.PEM))
def _random_serial_number():
"""Utility function to generate random serial number."""
return utils.int_from_bytes(os.urandom(16), byteorder='big')
def build_certificate(subject_name, issuer_name, system_id, not_valid_before,
valid_duration, public_key, signing_key, ca):
"""Utility function to build certificate."""
builder = x509.CertificateBuilder()
builder = builder.subject_name(subject_name).issuer_name(issuer_name)
if ca:
builder = builder.add_extension(
x509.SubjectKeyIdentifier.from_public_key(public_key), critical=False)
builder = builder.add_extension(
x509.AuthorityKeyIdentifier.from_issuer_public_key(
signing_key.public_key()),
critical=False)
builder = builder.add_extension(
x509.BasicConstraints(True, 0), critical=True)
if system_id:
builder = builder.add_extension(
WidevineSystemId.from_int_value(system_id), critical=False)
builder = builder.not_valid_before(not_valid_before).not_valid_after(
not_valid_before + datetime.timedelta(valid_duration)).serial_number(
_random_serial_number()).public_key(public_key)
return builder.sign(
private_key=signing_key,
algorithm=hashes.SHA256(),
backend=backends.default_backend())
def generate_intermediate_certificate(args):
"""Subparser handler for generating intermediate certificate."""
root_cert = x509.load_der_x509_certificate(args.root_certificate_file.read(),
backends.default_backend())
root_private_key = serialization.load_der_private_key(
args.root_private_key_file.read(),
password=args.root_private_key_passphrase,
backend=backends.default_backend())
if (root_private_key.public_key().public_numbers() !=
root_cert.public_key().public_numbers()):
raise ValueError('Root certificate does not match with root private key')
csr = x509.load_pem_x509_csr(args.csr_file.read(), backends.default_backend())
certificate = build_certificate(csr.subject, root_cert.subject,
args.system_id, args.not_valid_before,
args.valid_duration,
csr.public_key(), root_private_key, True)
args.output_certificate_file.write(
certificate.public_bytes(serialization.Encoding.DER))
def generate_leaf_certificate(args):
"""Subparser handler for generating leaf certificate."""
intermediate_cert_bytes = args.intermediate_certificate_file.read()
intermediate_cert = x509.load_der_x509_certificate(intermediate_cert_bytes,
backends.default_backend())
intermediate_private_key = serialization.load_der_private_key(
args.intermediate_private_key_file.read(),
password=args.intermediate_private_key_passphrase,
backend=backends.default_backend())
if (intermediate_private_key.public_key().public_numbers() !=
intermediate_cert.public_key().public_numbers()):
raise ValueError(
'Intermediate certificate does not match with intermediate private key')
system_id_raw_bytes = intermediate_cert.extensions.get_extension_for_oid(
WidevineSystemId.oid).value.value
system_id = WidevineSystemId(system_id_raw_bytes).int_value()
name_attributes = [
x509.NameAttribute(oid.NameOID.COMMON_NAME, u'{0}-leaf'.format(system_id))
]
name_attributes.extend([
attribute for attribute in intermediate_cert.subject
if attribute.oid != oid.NameOID.COMMON_NAME
])
subject_name = x509.Name(name_attributes)
leaf_private_key = rsa.generate_private_key(
public_exponent=65537,
key_size=args.key_size,
backend=backends.default_backend())
args.output_private_key_file.write(
leaf_private_key.private_bytes(
encoding=serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=_get_encryption_algorithm(args.passphrase)))
certificate = build_certificate(subject_name, intermediate_cert.subject,
system_id, args.not_valid_before,
args.valid_duration,
leaf_private_key.public_key(),
intermediate_private_key, False)
args.output_certificate_file.write(
X509CertificateChain([certificate, intermediate_cert]).der_bytes())
def secure_erase(args):
"""Subparser handler for secure erasing of a file."""
length = args.file.tell()
for _ in xrange(args.passes):
args.file.seek(0)
for _ in xrange(length):
args.file.write(os.urandom(1))
args.file.close()
os.remove(args.file.name)
def _certificate_as_string(cert):
"""Utility function to format certificate as string."""
lines = ['Certificate Subject Name:']
lines.extend([' {0}'.format(attribute) for attribute in cert.subject])
lines.append('Issuer Name:')
lines.extend([' {0}'.format(attribute) for attribute in cert.issuer])
lines.append('Key Size: {0.key_size}'.format(cert.public_key()))
try:
system_id_raw_bytes = cert.extensions.get_extension_for_oid(
WidevineSystemId.oid).value.value
lines.append('Widevine System Id: {0}'.format(
WidevineSystemId(system_id_raw_bytes).int_value()))
except x509.ExtensionNotFound:
pass
lines.append('Not valid before: {0.not_valid_before}'.format(cert))
lines.append('Not valid after: {0.not_valid_after}'.format(cert))
return '\n'.join(lines)
def _csr_as_string(csr):
"""Utility function to format csr as string."""
lines = ['CSR Subject Name:']
lines.extend([' {0}'.format(attribute) for attribute in csr.subject])
lines.append('Key Size: {0.key_size}'.format(csr.public_key()))
return '\n'.join(lines)
def _handle_csr(data):
"""Utility function for get_info to parse csr."""
return _csr_as_string(
x509.load_pem_x509_csr(data, backends.default_backend()))
def _handle_certificate(data):
"""Utility function for get_info to parse certificate."""
return _certificate_as_string(
x509.load_der_x509_certificate(data, backends.default_backend()))
def _handle_certificate_chain(data):
"""Utility function for get_info to parse certificate chain."""
return '\n\n'.join([
_certificate_as_string(certificate)
for certificate in X509CertificateChain.load_der(data)
])
def get_info(args, out=sys.stdout):
"""Subparser handler to get csr or certificate information."""
# The input is either a CSR or a certificate, or a certificate chain.
# Loop through the corresponding handlers one by one.
data = args.file.read()
for handler in [_handle_csr, _handle_certificate, _handle_certificate_chain]:
try:
out.write(handler(data))
return
except ValueError:
pass
print('Error occurred. The input file is not a valid CSR, nor certificate, '
'nor certificate chain.')
def create_parser():
"""Command line parsing."""
parser = argparse.ArgumentParser(fromfile_prefix_chars='@')
subparsers = parser.add_subparsers()
# Subparser for certificate signing request generation.
parser_csr = subparsers.add_parser(
'generate_csr', help='generate certificate signing request')
parser_csr.add_argument(
'--key_size',
type=_multiple_of_1024,
default=2048,
help='specify RSA key size.')
parser_csr.add_argument('-C', '--country_name', required=True)
parser_csr.add_argument('-ST', '--state_or_province_name', required=True)
parser_csr.add_argument('-L', '--locality_name', required=True)
parser_csr.add_argument('-O', '--organization_name', required=True)
parser_csr.add_argument('-OU', '--organizational_unit_name', required=True)
parser_csr.add_argument('-CN', '--common_name', required=False)
parser_csr.add_argument(
'--output_csr_file', type=argparse.FileType('wb'), required=True)
parser_csr.add_argument(
'--output_private_key_file', type=argparse.FileType('wb'), required=True)
parser_csr.add_argument(
'--passphrase',
help=('specify an optional passphrase to encrypt the private key. The '
'private key is not encrypted if omitted.'))
parser_csr.set_defaults(func=generate_csr)
# Subparser for intermediate certificate generation.
parser_intermediate_cert = subparsers.add_parser(
'generate_intermediate_certificate',
help=('generate intermediate certificate from csr. This should only be '
'used for testing.'))
parser_intermediate_cert.add_argument(
'--not_valid_before',
type=_valid_date,
default=datetime.datetime.today(),
help='certificate validity start date - format YYYY-MM-DD')
parser_intermediate_cert.add_argument(
'--valid_duration',
type=int,
default=3650,
help='the certificate will expire after the specified number of days.')
parser_intermediate_cert.add_argument('--system_id', type=int, required=True)
parser_intermediate_cert.add_argument(
'--csr_file', type=argparse.FileType('rb'), required=True)
parser_intermediate_cert.add_argument(
'--root_certificate_file', type=argparse.FileType('rb'), required=True)
parser_intermediate_cert.add_argument(
'--root_private_key_file', type=argparse.FileType('rb'), required=True)
parser_intermediate_cert.add_argument('--root_private_key_passphrase')
parser_intermediate_cert.add_argument(
'--output_certificate_file', type=argparse.FileType('wb'), required=True)
parser_intermediate_cert.set_defaults(func=generate_intermediate_certificate)
# Subparser for leaf certificate generation.
parser_leaf_cert = subparsers.add_parser(
'generate_leaf_certificate', help='generate leaf certificate')
parser_leaf_cert.add_argument(
'--key_size',
type=_multiple_of_1024,
default=2048,
help='specify RSA key size.')
parser_leaf_cert.add_argument(
'--not_valid_before',
type=_valid_date,
default=datetime.datetime.today(),
help='certificate validity start date - format YYYY-MM-DD')
parser_leaf_cert.add_argument(
'--valid_duration',
type=int,
default=3650,
help='the certificate will expire after the specified number of days.')
parser_leaf_cert.add_argument(
'--intermediate_certificate_file',
type=argparse.FileType('rb'),
required=True)
parser_leaf_cert.add_argument(
'--intermediate_private_key_file',
type=argparse.FileType('rb'),
required=True)
parser_leaf_cert.add_argument('--intermediate_private_key_passphrase')
parser_leaf_cert.add_argument(
'--output_certificate_file', type=argparse.FileType('wb'), required=True)
parser_leaf_cert.add_argument(
'--output_private_key_file', type=argparse.FileType('wb'), required=True)
parser_leaf_cert.add_argument(
'--passphrase',
help=('specify an optional passphrase to encrypt the private key. The '
'private key is not encrypted if omitted.'))
parser_leaf_cert.set_defaults(func=generate_leaf_certificate)
# Subparser for secure file erase.
parser_erase = subparsers.add_parser('erase', help='erase a file securely')
parser_erase.add_argument(
'-F', '--file', type=argparse.FileType('a'), required=True)
parser_erase.add_argument(
'--passes',
type=int,
default=3,
help=('the file will be overwritten with random patterns for the '
'specified number of passes'))
parser_erase.set_defaults(func=secure_erase)
# Subparser to get CSR or certificate or certificate chain metadata.
parser_info = subparsers.add_parser(
'info', help='get CSR or certificate metadata')
parser_info.add_argument(
'-F', '--file', type=argparse.FileType('rb'), required=True)
parser_info.set_defaults(func=get_info)
return parser
def main():
args = sys.argv[1:]
config_file_name = 'oem_certificate.cfg'
if os.path.isfile(config_file_name):
print 'Load from args default configuration file: ', config_file_name
args.append('@' + config_file_name)
parser_args = create_parser().parse_args(args)
parser_args.func(parser_args)
if __name__ == '__main__':
main()

View File

@@ -0,0 +1,150 @@
# Copyright 2017 Google Inc. All Rights Reserved.
"""Common utility functions for OEM certificate generation."""
import datetime
import StringIO
from cryptography import x509
from cryptography.hazmat import backends
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa
from cryptography.x509 import oid
import oem_certificate
_COUNTRY_NAME = 'US'
_STATE_OR_PROVINCE_NAME = 'WA'
_LOCALITY_NAME = 'Kirkland'
_ORGANIZATION_NAME = 'CompanyXYZ'
_ORGANIZATIONAL_UNIT_NAME = 'ContentProtection'
_NOT_VALID_BEFORE = datetime.datetime(2001, 8, 9)
_VALID_DURATION = 100
_LEAF_CERT_VALID_DURATION = 8000
_SYSTEM_ID = 2001
_ROOT_PRIVATE_KEY_PASSPHRASE = 'root_passphrase'
class ArgParseObject(object):
"""A convenient object to allow adding arbitrary attribute to it."""
def create_root_certificate_and_key():
"""Creates a root certificate and key."""
key = rsa.generate_private_key(
public_exponent=65537,
key_size=3072,
backend=backends.default_backend())
subject_name = x509.Name(
[x509.NameAttribute(oid.NameOID.COMMON_NAME, u'root_cert')])
certificate = oem_certificate.build_certificate(
subject_name, subject_name, None,
datetime.datetime(2001, 8, 9), 1000, key.public_key(), key, True)
return (key, certificate)
def setup_csr_args(country_name=_COUNTRY_NAME,
state_or_province_name=_STATE_OR_PROVINCE_NAME,
locality_name=_LOCALITY_NAME,
organization_name=_ORGANIZATION_NAME,
organizational_unit_name=_ORGANIZATIONAL_UNIT_NAME,
key_size=4096,
output_csr_file=None,
output_private_key_file=None,
passphrase=None,
common_name=None):
"""Sets up arguments to OEM Certificate generator for generating csr."""
args = ArgParseObject()
args.key_size = key_size
args.country_name = country_name
args.state_or_province_name = state_or_province_name
args.locality_name = locality_name
args.organization_name = organization_name
args.organizational_unit_name = organizational_unit_name
args.common_name = common_name
if output_csr_file:
args.output_csr_file = output_csr_file
else:
args.output_csr_file = StringIO.StringIO()
if output_private_key_file:
args.output_private_key_file = output_private_key_file
else:
args.output_private_key_file = StringIO.StringIO()
args.passphrase = passphrase
return args
def setup_intermediate_cert_args(
csr_bytes, root_key, root_certificate, not_valid_before=_NOT_VALID_BEFORE,
valid_duration=_VALID_DURATION, system_id=_SYSTEM_ID,
root_private_key_passphrase=_ROOT_PRIVATE_KEY_PASSPHRASE,
output_certificate_file=None):
"""Sets up args to OEM Cert generator for generating intermediate cert."""
args = ArgParseObject()
args.not_valid_before = not_valid_before
args.valid_duration = valid_duration
args.system_id = system_id
args.csr_file = StringIO.StringIO(csr_bytes)
args.root_private_key_passphrase = root_private_key_passphrase
if output_certificate_file:
args.output_certificate_file = output_certificate_file
else:
args.output_certificate_file = StringIO.StringIO()
serialized_private_key = root_key.private_bytes(
serialization.Encoding.DER,
format=serialization.PrivateFormat.PKCS8,
encryption_algorithm=serialization.BestAvailableEncryption(
args.root_private_key_passphrase))
serialized_certificate = root_certificate.public_bytes(
serialization.Encoding.DER)
args.root_certificate_file = StringIO.StringIO(serialized_certificate)
args.root_private_key_file = StringIO.StringIO(serialized_private_key)
return args
def setup_leaf_cert_args(intermediate_key_bytes,
intermediate_certificate_bytes,
key_size=1024,
passphrase=None,
not_valid_before=_NOT_VALID_BEFORE,
valid_duration=_LEAF_CERT_VALID_DURATION,
output_certificate_file=None,
output_private_key_file=None):
"""Sets up args to OEM Certificate generator for generating leaf cert."""
args = ArgParseObject()
args.key_size = key_size
args.not_valid_before = not_valid_before
args.valid_duration = valid_duration
args.intermediate_private_key_passphrase = None
if output_certificate_file:
args.output_certificate_file = output_certificate_file
else:
args.output_certificate_file = StringIO.StringIO()
if output_private_key_file:
args.output_private_key_file = output_private_key_file
else:
args.output_private_key_file = StringIO.StringIO()
args.passphrase = passphrase
args.intermediate_private_key_file = StringIO.StringIO(
intermediate_key_bytes)
args.intermediate_certificate_file = StringIO.StringIO(
intermediate_certificate_bytes)
return args
def create_intermediate_certificate_and_key_bytes(key_size=4096,
passphrase=None):
"""Creates an intermediate certificate and key."""
csr_args = setup_csr_args(key_size=key_size, passphrase=passphrase)
oem_certificate.generate_csr(csr_args)
csr_bytes = csr_args.output_csr_file.getvalue()
root_key, root_certificate = create_root_certificate_and_key()
args = setup_intermediate_cert_args(csr_bytes, root_key, root_certificate)
oem_certificate.generate_intermediate_certificate(args)
return (csr_args.output_private_key_file.getvalue(),
args.output_certificate_file.getvalue())

View File

@@ -0,0 +1,518 @@
# Copyright 2017 Google Inc. All Rights Reserved.
import datetime
import os
import shutil
import StringIO
import tempfile
import textwrap
import unittest
from cryptography import x509
from cryptography.hazmat import backends
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.x509 import oid
import oem_certificate
import oem_certificate_generator_helper as oem_cert_gen_helper
class ArgParseObject(object):
"""A convenient object to allow adding arbitrary attribute to it."""
pass
class OemCertificateTest(unittest.TestCase):
def test_widevine_system_id(self):
system_id = 1234567890123
self.assertEqual(
oem_certificate.WidevineSystemId.from_int_value(system_id).int_value(),
system_id)
def test_generate_csr(self):
args = oem_cert_gen_helper.setup_csr_args()
oem_certificate.generate_csr(args)
# Verify CSR.
csr = x509.load_pem_x509_csr(args.output_csr_file.getvalue(),
backends.default_backend())
subject = csr.subject
self.assertEqual(
subject.get_attributes_for_oid(oid.NameOID.COUNTRY_NAME)[0].value,
args.country_name)
self.assertEqual(
subject.get_attributes_for_oid(oid.NameOID.STATE_OR_PROVINCE_NAME)[0]
.value, args.state_or_province_name)
self.assertEqual(
subject.get_attributes_for_oid(oid.NameOID.LOCALITY_NAME)[0].value,
args.locality_name)
self.assertEqual(
subject.get_attributes_for_oid(oid.NameOID.ORGANIZATION_NAME)[0].value,
args.organization_name)
self.assertEqual(
subject.get_attributes_for_oid(oid.NameOID.ORGANIZATIONAL_UNIT_NAME)[0]
.value, args.organizational_unit_name)
self.assertEqual(
len(subject.get_attributes_for_oid(oid.NameOID.COMMON_NAME)), 0)
private_key = serialization.load_der_private_key(
args.output_private_key_file.getvalue(),
args.passphrase,
backend=backends.default_backend())
self.assertEqual(private_key.key_size, args.key_size)
self.assertEqual(csr.public_key().key_size, args.key_size)
# Verify csr and private key match.
self.assertEqual(csr.public_key().public_numbers(),
private_key.public_key().public_numbers())
def test_generate_csr_(self):
args = oem_cert_gen_helper.setup_csr_args(common_name='MyCommonName')
oem_certificate.generate_csr(args)
# Verify CSR.
csr = x509.load_pem_x509_csr(args.output_csr_file.getvalue(),
backends.default_backend())
subject = csr.subject
self.assertEqual(
subject.get_attributes_for_oid(oid.NameOID.COUNTRY_NAME)[0].value,
args.country_name)
self.assertEqual(
subject.get_attributes_for_oid(oid.NameOID.STATE_OR_PROVINCE_NAME)[0]
.value, args.state_or_province_name)
self.assertEqual(
subject.get_attributes_for_oid(oid.NameOID.LOCALITY_NAME)[0].value,
args.locality_name)
self.assertEqual(
subject.get_attributes_for_oid(oid.NameOID.ORGANIZATION_NAME)[0].value,
args.organization_name)
self.assertEqual(
subject.get_attributes_for_oid(oid.NameOID.ORGANIZATIONAL_UNIT_NAME)[0]
.value, args.organizational_unit_name)
self.assertEqual(
subject.get_attributes_for_oid(oid.NameOID.COMMON_NAME)[0]
.value, args.common_name)
private_key = serialization.load_der_private_key(
args.output_private_key_file.getvalue(),
args.passphrase,
backend=backends.default_backend())
self.assertEqual(private_key.key_size, args.key_size)
self.assertEqual(csr.public_key().key_size, args.key_size)
# Verify csr and private key match.
self.assertEqual(csr.public_key().public_numbers(),
private_key.public_key().public_numbers())
def test_generate_csr_with_keysize4096_and_passphrase(self):
args = oem_cert_gen_helper.setup_csr_args(
key_size=4096, passphrase='passphrase_4096')
oem_certificate.generate_csr(args)
private_key = serialization.load_der_private_key(
args.output_private_key_file.getvalue(),
'passphrase_4096',
backend=backends.default_backend())
csr = x509.load_pem_x509_csr(args.output_csr_file.getvalue(),
backends.default_backend())
self.assertEqual(private_key.key_size, 4096)
self.assertEqual(csr.public_key().key_size, 4096)
# Verify csr and private key match.
self.assertEqual(csr.public_key().public_numbers(),
private_key.public_key().public_numbers())
def test_generate_intermediate_certificate(self):
csr_args = oem_cert_gen_helper.setup_csr_args()
oem_certificate.generate_csr(csr_args)
csr_bytes = csr_args.output_csr_file.getvalue()
csr = x509.load_pem_x509_csr(csr_bytes, backends.default_backend())
root_key, root_certificate = (
oem_cert_gen_helper.create_root_certificate_and_key())
args = oem_cert_gen_helper.setup_intermediate_cert_args(
csr_bytes, root_key, root_certificate)
oem_certificate.generate_intermediate_certificate(args)
cert = x509.load_der_x509_certificate(
args.output_certificate_file.getvalue(), backends.default_backend())
self.assertEqual(cert.issuer, root_certificate.subject)
self.assertEqual(cert.subject, csr.subject)
system_id_raw_bytes = cert.extensions.get_extension_for_oid(
oem_certificate.WidevineSystemId.oid).value.value
self.assertEqual(
oem_certificate.WidevineSystemId(system_id_raw_bytes).int_value(),
args.system_id)
self.assertEqual(cert.not_valid_before, datetime.datetime(2001, 8, 9))
self.assertEqual(cert.not_valid_after, datetime.datetime(2001, 11, 17))
root_key.public_key().verify(cert.signature, cert.tbs_certificate_bytes,
padding.PKCS1v15(),
cert.signature_hash_algorithm)
def test_generate_intermediate_with_cert_mismatch_root_cert_and_key(self):
root_key1, _ = (
oem_cert_gen_helper.create_root_certificate_and_key())
_, root_certificate2 = oem_cert_gen_helper.create_root_certificate_and_key()
args = oem_cert_gen_helper.setup_intermediate_cert_args(
'some csr data', root_key1, root_certificate2)
with self.assertRaises(ValueError) as context:
oem_certificate.generate_intermediate_certificate(args)
self.assertTrue('certificate does not match' in str(context.exception))
def test_generate_leaf_certificate(self):
intermediate_key_bytes, intermediate_certificate_bytes = (
oem_cert_gen_helper.create_intermediate_certificate_and_key_bytes())
args = oem_cert_gen_helper.setup_leaf_cert_args(
intermediate_key_bytes, intermediate_certificate_bytes)
oem_certificate.generate_leaf_certificate(args)
certificate_chain = oem_certificate.X509CertificateChain.load_der(
args.output_certificate_file.getvalue())
certificates = list(certificate_chain)
self.assertEqual(len(certificates), 2)
intermediate_cert = certificates[1]
leaf_cert = certificates[0]
self.assertEqual(
intermediate_cert.public_bytes(serialization.Encoding.DER),
intermediate_certificate_bytes)
intermediate_cert.public_key().verify(leaf_cert.signature,
leaf_cert.tbs_certificate_bytes,
padding.PKCS1v15(),
leaf_cert.signature_hash_algorithm)
self.assertEqual(leaf_cert.not_valid_before, datetime.datetime(2001, 8, 9))
self.assertEqual(leaf_cert.not_valid_after, datetime.datetime(2023, 7, 5))
system_id_raw_bytes = leaf_cert.extensions.get_extension_for_oid(
oem_certificate.WidevineSystemId.oid).value.value
self.assertEqual(
oem_certificate.WidevineSystemId(system_id_raw_bytes).int_value(),
2001)
leaf_key = serialization.load_der_private_key(
args.output_private_key_file.getvalue(),
args.passphrase,
backend=backends.default_backend())
self.assertEqual(leaf_key.key_size, args.key_size)
self.assertEqual(leaf_cert.public_key().key_size, args.key_size)
# Verify cert and private key match.
self.assertEqual(leaf_cert.public_key().public_numbers(),
leaf_key.public_key().public_numbers())
def test_generate_leaf_certificate_with_keysize4096_and_passphrase(self):
intermediate_key_bytes, intermediate_certificate_bytes = (
oem_cert_gen_helper.create_intermediate_certificate_and_key_bytes())
args = oem_cert_gen_helper.setup_leaf_cert_args(
intermediate_key_bytes,
intermediate_certificate_bytes,
key_size=4096,
passphrase='leaf passphrase')
oem_certificate.generate_leaf_certificate(args)
serialization.load_der_private_key(
args.output_private_key_file.getvalue(),
'leaf passphrase',
backend=backends.default_backend())
self.assertEqual(4096, args.key_size)
def test_get_csr_info(self):
args = oem_cert_gen_helper.setup_csr_args()
oem_certificate.generate_csr(args)
args.file = StringIO.StringIO(args.output_csr_file.getvalue())
output = StringIO.StringIO()
oem_certificate.get_info(args, output)
expected_info = """\
CSR Subject Name:
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.6, name=countryName)>, value=u'US')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.8, name=stateOrProvinceName)>, value=u'WA')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.7, name=localityName)>, value=u'Kirkland')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.10, name=organizationName)>, value=u'CompanyXYZ')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.11, name=organizationalUnitName)>, value=u'ContentProtection')>
Key Size: 4096"""
self.assertEqual(output.getvalue(), textwrap.dedent(expected_info))
def test_get_certificate_info(self):
_, intermediate_certificate_bytes = (
oem_cert_gen_helper.create_intermediate_certificate_and_key_bytes())
args = ArgParseObject()
args.file = StringIO.StringIO(intermediate_certificate_bytes)
output = StringIO.StringIO()
oem_certificate.get_info(args, output)
expected_info = """\
Certificate Subject Name:
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.6, name=countryName)>, value=u'US')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.8, name=stateOrProvinceName)>, value=u'WA')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.7, name=localityName)>, value=u'Kirkland')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.10, name=organizationName)>, value=u'CompanyXYZ')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.11, name=organizationalUnitName)>, value=u'ContentProtection')>
Issuer Name:
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.3, name=commonName)>, value=u'root_cert')>
Key Size: 4096
Widevine System Id: 2001
Not valid before: 2001-08-09 00:00:00
Not valid after: 2001-11-17 00:00:00"""
self.assertEqual(output.getvalue(), textwrap.dedent(expected_info))
def test_get_certificate_chain_info(self):
intermediate_key_bytes, intermediate_certificate_bytes = (
oem_cert_gen_helper.create_intermediate_certificate_and_key_bytes())
args = oem_cert_gen_helper.setup_leaf_cert_args(
intermediate_key_bytes, intermediate_certificate_bytes)
oem_certificate.generate_leaf_certificate(args)
args.file = StringIO.StringIO(args.output_certificate_file.getvalue())
output = StringIO.StringIO()
oem_certificate.get_info(args, output)
expected_info = """\
Certificate Subject Name:
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.3, name=commonName)>, value=u'2001-leaf')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.6, name=countryName)>, value=u'US')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.8, name=stateOrProvinceName)>, value=u'WA')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.7, name=localityName)>, value=u'Kirkland')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.10, name=organizationName)>, value=u'CompanyXYZ')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.11, name=organizationalUnitName)>, value=u'ContentProtection')>
Issuer Name:
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.6, name=countryName)>, value=u'US')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.8, name=stateOrProvinceName)>, value=u'WA')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.7, name=localityName)>, value=u'Kirkland')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.10, name=organizationName)>, value=u'CompanyXYZ')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.11, name=organizationalUnitName)>, value=u'ContentProtection')>
Key Size: 1024
Widevine System Id: 2001
Not valid before: 2001-08-09 00:00:00
Not valid after: 2023-07-05 00:00:00
Certificate Subject Name:
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.6, name=countryName)>, value=u'US')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.8, name=stateOrProvinceName)>, value=u'WA')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.7, name=localityName)>, value=u'Kirkland')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.10, name=organizationName)>, value=u'CompanyXYZ')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.11, name=organizationalUnitName)>, value=u'ContentProtection')>
Issuer Name:
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.3, name=commonName)>, value=u'root_cert')>
Key Size: 4096
Widevine System Id: 2001
Not valid before: 2001-08-09 00:00:00
Not valid after: 2001-11-17 00:00:00"""
self.assertEqual(output.getvalue(), textwrap.dedent(expected_info))
def test_secure_erase(self):
args = ArgParseObject()
args.file = tempfile.NamedTemporaryFile(delete=False)
args.passes = 2
self.assertTrue(os.path.exists(args.file.name))
oem_certificate.secure_erase(args)
self.assertFalse(os.path.exists(args.file.name))
class OemCertificateArgParseTest(unittest.TestCase):
def setUp(self):
self.parser = oem_certificate.create_parser()
self.test_dir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.test_dir)
def test_generate_csr(self):
cmds = ('generate_csr --key_size 4096 -C USA -ST WA '
'-L Kirkland -O Company -OU Widevine').split()
output_private_key_file = os.path.join(self.test_dir, 'private_key')
output_csr_file = os.path.join(self.test_dir, 'csr')
cmds.extend([
'--output_csr_file', output_csr_file, '--output_private_key_file',
output_private_key_file, '--passphrase', 'pass'
])
args = self.parser.parse_args(cmds)
self.assertEqual(args.key_size, 4096)
self.assertEqual(args.country_name, 'USA')
self.assertEqual(args.state_or_province_name, 'WA')
self.assertEqual(args.locality_name, 'Kirkland')
self.assertEqual(args.organization_name, 'Company')
self.assertEqual(args.organizational_unit_name, 'Widevine')
self.assertEqual(args.output_csr_file.name, output_csr_file)
self.assertEqual(args.output_csr_file.mode, 'wb')
self.assertEqual(args.output_private_key_file.name, output_private_key_file)
self.assertEqual(args.output_private_key_file.mode, 'wb')
self.assertEqual(args.passphrase, 'pass')
self.assertEqual(args.func, oem_certificate.generate_csr)
self.assertIsNone(args.common_name)
def test_generate_csr_with_cn(self):
cmds = ('generate_csr --key_size 4096 -C USA -ST WA '
'-L Kirkland -O Company -OU Widevine -CN MyCommonName').split()
output_private_key_file = os.path.join(self.test_dir, 'private_key')
output_csr_file = os.path.join(self.test_dir, 'csr')
cmds.extend([
'--output_csr_file', output_csr_file, '--output_private_key_file',
output_private_key_file, '--passphrase', 'pass'
])
args = self.parser.parse_args(cmds)
self.assertEqual(args.key_size, 4096)
self.assertEqual(args.country_name, 'USA')
self.assertEqual(args.state_or_province_name, 'WA')
self.assertEqual(args.locality_name, 'Kirkland')
self.assertEqual(args.organization_name, 'Company')
self.assertEqual(args.organizational_unit_name, 'Widevine')
self.assertEqual(args.output_csr_file.name, output_csr_file)
self.assertEqual(args.output_csr_file.mode, 'wb')
self.assertEqual(args.output_private_key_file.name, output_private_key_file)
self.assertEqual(args.output_private_key_file.mode, 'wb')
self.assertEqual(args.passphrase, 'pass')
self.assertEqual(args.common_name, 'MyCommonName')
self.assertEqual(args.func, oem_certificate.generate_csr)
def _fill_file_with_dummy_contents(self, file_name):
with open(file_name, 'wb') as f:
f.write('dummy')
def test_generate_csr_invalid_key_size(self):
cmds = ('generate_csr --key_size unknown -C USA -ST WA '
'-L Kirkland -O Company -OU Widevine').split()
output_private_key_file = os.path.join(self.test_dir, 'private_key')
output_csr_file = os.path.join(self.test_dir, 'csr')
cmds.extend([
'--output_csr_file', output_csr_file, '--output_private_key_file',
output_private_key_file, '--passphrase', 'pass'
])
with self.assertRaises(SystemExit) as context:
self.parser.parse_args(cmds)
self.assertEqual(context.exception.code, 2)
def test_generate_intermediate_cert(self):
cmds = (
'generate_intermediate_certificate --valid_duration 10 --system_id 100'
).split()
csr_file = os.path.join(self.test_dir, 'csr')
self._fill_file_with_dummy_contents(csr_file)
root_certificate_file = os.path.join(self.test_dir, 'root_cert')
self._fill_file_with_dummy_contents(root_certificate_file)
root_private_key_file = os.path.join(self.test_dir, 'root_private_key')
self._fill_file_with_dummy_contents(root_private_key_file)
output_certificate_file = os.path.join(self.test_dir, 'cert')
cmds.extend([
'--csr_file', csr_file, '--root_certificate_file',
root_certificate_file, '--root_private_key_file', root_private_key_file,
'--root_private_key_passphrase', 'root_key',
'--output_certificate_file', output_certificate_file
])
args = self.parser.parse_args(cmds)
self.assertAlmostEqual(
args.not_valid_before,
datetime.datetime.today(),
delta=datetime.timedelta(seconds=60))
self.assertEqual(args.valid_duration, 10)
self.assertEqual(args.system_id, 100)
self.assertEqual(args.csr_file.name, csr_file)
self.assertEqual(args.csr_file.mode, 'rb')
self.assertEqual(args.root_certificate_file.name, root_certificate_file)
self.assertEqual(args.root_certificate_file.mode, 'rb')
self.assertEqual(args.root_private_key_file.name, root_private_key_file)
self.assertEqual(args.root_private_key_file.mode, 'rb')
self.assertEqual(args.root_private_key_passphrase, 'root_key')
self.assertEqual(args.output_certificate_file.name, output_certificate_file)
self.assertEqual(args.output_certificate_file.mode, 'wb')
self.assertEqual(args.func,
oem_certificate.generate_intermediate_certificate)
def test_generate_leaf_cert(self):
cmds = ('generate_leaf_certificate --not_valid_before 2016-01-02 '
'--valid_duration 10').split()
intermediate_certificate_file = os.path.join(self.test_dir,
'intermediate_cert')
self._fill_file_with_dummy_contents(intermediate_certificate_file)
intermediate_private_key_file = os.path.join(self.test_dir,
'intermediate_private_key')
self._fill_file_with_dummy_contents(intermediate_private_key_file)
output_certificate_file = os.path.join(self.test_dir, 'cert')
output_private_key_file = os.path.join(self.test_dir, 'key')
cmds.extend([
'--intermediate_certificate_file', intermediate_certificate_file,
'--intermediate_private_key_file', intermediate_private_key_file,
'--intermediate_private_key_passphrase', 'intermediate_key',
'--output_certificate_file', output_certificate_file,
'--output_private_key_file', output_private_key_file, '--passphrase',
'leaf_key'
])
args = self.parser.parse_args(cmds)
self.assertEqual(args.not_valid_before, datetime.datetime(2016, 1, 2))
self.assertEqual(args.valid_duration, 10)
self.assertEqual(args.intermediate_certificate_file.name,
intermediate_certificate_file)
self.assertEqual(args.intermediate_certificate_file.mode, 'rb')
self.assertEqual(args.intermediate_private_key_file.name,
intermediate_private_key_file)
self.assertEqual(args.intermediate_private_key_file.mode, 'rb')
self.assertEqual(args.intermediate_private_key_passphrase,
'intermediate_key')
self.assertEqual(args.output_certificate_file.name, output_certificate_file)
self.assertEqual(args.output_certificate_file.mode, 'wb')
self.assertEqual(args.output_private_key_file.name, output_private_key_file)
self.assertEqual(args.output_private_key_file.mode, 'wb')
self.assertEqual(args.passphrase, 'leaf_key')
self.assertEqual(args.func, oem_certificate.generate_leaf_certificate)
def test_generate_leaf_cert_invalid_date(self):
cmds = ('generate_leaf_certificate --not_valid_before invaid-date '
'--valid_duration 10').split()
intermediate_certificate_file = os.path.join(self.test_dir,
'intermediate_cert')
self._fill_file_with_dummy_contents(intermediate_certificate_file)
intermediate_private_key_file = os.path.join(self.test_dir,
'intermediate_private_key')
self._fill_file_with_dummy_contents(intermediate_private_key_file)
output_certificate_file = os.path.join(self.test_dir, 'cert')
output_private_key_file = os.path.join(self.test_dir, 'key')
cmds.extend([
'--intermediate_certificate_file', intermediate_certificate_file,
'--intermediate_private_key_file', intermediate_private_key_file,
'--intermediate_private_key_passphrase', 'intermediate_key',
'--output_certificate_file', output_certificate_file,
'--output_private_key_file', output_private_key_file, '--passphrase',
'leaf_key'
])
with self.assertRaises(SystemExit) as context:
self.parser.parse_args(cmds)
self.assertEqual(context.exception.code, 2)
def test_secure_erase(self):
file_path = os.path.join(self.test_dir, 'file')
self._fill_file_with_dummy_contents(file_path)
cmds = ['erase', '-F', file_path, '--passes', '2']
args = self.parser.parse_args(cmds)
self.assertEqual(args.passes, 2)
self.assertEqual(args.file.name, file_path)
self.assertEqual(args.file.mode, 'a')
self.assertEqual(args.func, oem_certificate.secure_erase)
def test_get_info(self):
file_path = os.path.join(self.test_dir, 'file')
self._fill_file_with_dummy_contents(file_path)
cmds = ['info', '-F', file_path]
args = self.parser.parse_args(cmds)
self.assertEqual(args.file.name, file_path)
self.assertEqual(args.file.mode, 'rb')
self.assertEqual(args.func, oem_certificate.get_info)
def test_arbitrary_commands(self):
with self.assertRaises(SystemExit) as context:
self.parser.parse_args(['unsupport', '--commands'])
self.assertEqual(context.exception.code, 2)
def test_no_argument(self):
with self.assertRaises(SystemExit) as context:
self.parser.parse_args([])
self.assertEqual(context.exception.code, 2)
if __name__ == '__main__':
unittest.main()