Refactor and cleanup codes. No functional changes.

This commit is contained in:
KongQun Yang
2019-01-23 15:16:31 -08:00
parent 84f66d2320
commit 93265ab9d1
207 changed files with 14893 additions and 3332 deletions

View File

@@ -1,11 +1,12 @@
################################################################################
# Copyright 2016 Google Inc.
# Copyright 2016 Google LLC.
#
# This software is licensed under the terms defined in the Widevine Master
# License Agreement. For a copy of this agreement, please contact
# widevine-licensing@google.com.
################################################################################
import base64
import datetime
import os
import shutil
@@ -21,7 +22,7 @@ from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.x509 import oid
import oem_certificate
import oem_certificate_generator_helper as oem_cert_gen_helper
import oem_certificate_test_helper as oem_cert_test_helper
class ArgParseObject(object):
@@ -38,7 +39,7 @@ class OemCertificateTest(unittest.TestCase):
system_id)
def test_generate_csr(self):
args = oem_cert_gen_helper.setup_csr_rgs()
args = oem_cert_test_helper.setup_csr_args()
oem_certificate.generate_csr(args)
# Verify CSR.
csr = x509.load_pem_x509_csr(args.output_csr_file.getvalue(),
@@ -59,6 +60,44 @@ class OemCertificateTest(unittest.TestCase):
self.assertEqual(
subject.get_attributes_for_oid(oid.NameOID.ORGANIZATIONAL_UNIT_NAME)[0]
.value, args.organizational_unit_name)
self.assertEqual(
len(subject.get_attributes_for_oid(oid.NameOID.COMMON_NAME)), 0)
private_key = serialization.load_der_private_key(
args.output_private_key_file.getvalue(),
args.passphrase,
backend=backends.default_backend())
self.assertEqual(private_key.key_size, args.key_size)
self.assertEqual(csr.public_key().key_size, args.key_size)
# Verify csr and private key match.
self.assertEqual(csr.public_key().public_numbers(),
private_key.public_key().public_numbers())
def test_generate_csr_(self):
args = oem_cert_test_helper.setup_csr_args(common_name='MyCommonName')
oem_certificate.generate_csr(args)
# Verify CSR.
csr = x509.load_pem_x509_csr(args.output_csr_file.getvalue(),
backends.default_backend())
subject = csr.subject
self.assertEqual(
subject.get_attributes_for_oid(oid.NameOID.COUNTRY_NAME)[0].value,
args.country_name)
self.assertEqual(
subject.get_attributes_for_oid(oid.NameOID.STATE_OR_PROVINCE_NAME)[0]
.value, args.state_or_province_name)
self.assertEqual(
subject.get_attributes_for_oid(oid.NameOID.LOCALITY_NAME)[0].value,
args.locality_name)
self.assertEqual(
subject.get_attributes_for_oid(oid.NameOID.ORGANIZATION_NAME)[0].value,
args.organization_name)
self.assertEqual(
subject.get_attributes_for_oid(oid.NameOID.ORGANIZATIONAL_UNIT_NAME)[0]
.value, args.organizational_unit_name)
self.assertEqual(
subject.get_attributes_for_oid(oid.NameOID.COMMON_NAME)[0]
.value, args.common_name)
private_key = serialization.load_der_private_key(
args.output_private_key_file.getvalue(),
@@ -71,7 +110,7 @@ class OemCertificateTest(unittest.TestCase):
private_key.public_key().public_numbers())
def test_generate_csr_with_keysize4096_and_passphrase(self):
args = oem_cert_gen_helper.setup_csr_rgs(
args = oem_cert_test_helper.setup_csr_args(
key_size=4096, passphrase='passphrase_4096')
oem_certificate.generate_csr(args)
private_key = serialization.load_der_private_key(
@@ -87,14 +126,14 @@ class OemCertificateTest(unittest.TestCase):
private_key.public_key().public_numbers())
def test_generate_intermediate_certificate(self):
csr_args = oem_cert_gen_helper.setup_csr_rgs()
csr_args = oem_cert_test_helper.setup_csr_args()
oem_certificate.generate_csr(csr_args)
csr_bytes = csr_args.output_csr_file.getvalue()
csr = x509.load_pem_x509_csr(csr_bytes, backends.default_backend())
root_key, root_certificate = (
oem_cert_gen_helper.create_root_certificate_and_key())
args = oem_cert_gen_helper.setup_intermediate_cert_args(
oem_cert_test_helper.create_root_certificate_and_key())
args = oem_cert_test_helper.setup_intermediate_cert_args(
csr_bytes, root_key, root_certificate)
oem_certificate.generate_intermediate_certificate(args)
@@ -118,18 +157,20 @@ class OemCertificateTest(unittest.TestCase):
def test_generate_intermediate_with_cert_mismatch_root_cert_and_key(self):
root_key1, _ = (
oem_cert_gen_helper.create_root_certificate_and_key())
_, root_certificate2 = oem_cert_gen_helper.create_root_certificate_and_key()
args = oem_cert_gen_helper.setup_intermediate_cert_args(
oem_cert_test_helper.create_root_certificate_and_key())
_, root_certificate2 = oem_cert_test_helper.create_root_certificate_and_key(
)
args = oem_cert_test_helper.setup_intermediate_cert_args(
'some csr data', root_key1, root_certificate2)
with self.assertRaises(ValueError) as context:
oem_certificate.generate_intermediate_certificate(args)
self.assertTrue('certificate does not match' in str(context.exception))
def test_generate_leaf_certificate(self):
def test_generate_leaf_certificate_from_pem_intermediate_cert(self):
intermediate_key_bytes, intermediate_certificate_bytes = (
oem_cert_gen_helper.create_intermediate_certificate_and_key_bytes())
args = oem_cert_gen_helper.setup_leaf_cert_args(
oem_cert_test_helper.create_intermediate_certificate_and_key_bytes(
pem_format=True))
args = oem_cert_test_helper.setup_leaf_cert_args(
intermediate_key_bytes, intermediate_certificate_bytes)
oem_certificate.generate_leaf_certificate(args)
@@ -141,7 +182,7 @@ class OemCertificateTest(unittest.TestCase):
intermediate_cert = certificates[1]
leaf_cert = certificates[0]
self.assertEqual(
intermediate_cert.public_bytes(serialization.Encoding.DER),
intermediate_cert.public_bytes(serialization.Encoding.PEM),
intermediate_certificate_bytes)
intermediate_cert.public_key().verify(leaf_cert.signature,
leaf_cert.tbs_certificate_bytes,
@@ -167,23 +208,46 @@ class OemCertificateTest(unittest.TestCase):
self.assertEqual(leaf_cert.public_key().public_numbers(),
leaf_key.public_key().public_numbers())
def test_generate_leaf_certificate_from_der_intermediate_cert(self):
intermediate_key_bytes, intermediate_certificate_bytes = (
oem_cert_test_helper.create_intermediate_certificate_and_key_bytes(
pem_format=False))
args = oem_cert_test_helper.setup_leaf_cert_args(
intermediate_key_bytes, intermediate_certificate_bytes)
oem_certificate.generate_leaf_certificate(args)
certificate_chain = oem_certificate.X509CertificateChain.load_der(
args.output_certificate_file.getvalue())
certificates = list(certificate_chain)
self.assertEqual(len(certificates), 2)
intermediate_cert = certificates[1]
leaf_cert = certificates[0]
self.assertEqual(
intermediate_cert.public_bytes(serialization.Encoding.DER),
intermediate_certificate_bytes)
intermediate_cert.public_key().verify(leaf_cert.signature,
leaf_cert.tbs_certificate_bytes,
padding.PKCS1v15(),
leaf_cert.signature_hash_algorithm)
def test_generate_leaf_certificate_with_keysize4096_and_passphrase(self):
intermediate_key_bytes, intermediate_certificate_bytes = (
oem_cert_gen_helper.create_intermediate_certificate_and_key_bytes())
args = oem_cert_gen_helper.setup_leaf_cert_args(
oem_cert_test_helper.create_intermediate_certificate_and_key_bytes())
args = oem_cert_test_helper.setup_leaf_cert_args(
intermediate_key_bytes,
intermediate_certificate_bytes,
key_size=4096,
passphrase='leaf passphrase')
oem_certificate.generate_leaf_certificate(args)
serialization.load_der_private_key(
leaf_key = serialization.load_der_private_key(
args.output_private_key_file.getvalue(),
'leaf passphrase',
backend=backends.default_backend())
self.assertEqual(4096, args.key_size)
self.assertEqual(4096, leaf_key.key_size)
def test_get_csr_info(self):
args = oem_cert_gen_helper.setup_csr_rgs()
args = oem_cert_test_helper.setup_csr_args()
oem_certificate.generate_csr(args)
args.file = StringIO.StringIO(args.output_csr_file.getvalue())
output = StringIO.StringIO()
@@ -198,9 +262,33 @@ class OemCertificateTest(unittest.TestCase):
Key Size: 4096"""
self.assertEqual(output.getvalue(), textwrap.dedent(expected_info))
def test_get_certificate_info(self):
def test_get_pem_certificate_info(self):
_, intermediate_certificate_bytes = (
oem_cert_gen_helper.create_intermediate_certificate_and_key_bytes())
oem_cert_test_helper.create_intermediate_certificate_and_key_bytes(
pem_format=True))
args = ArgParseObject()
args.file = StringIO.StringIO(intermediate_certificate_bytes)
output = StringIO.StringIO()
oem_certificate.get_info(args, output)
expected_info = """\
Certificate Subject Name:
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.6, name=countryName)>, value=u'US')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.8, name=stateOrProvinceName)>, value=u'WA')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.7, name=localityName)>, value=u'Kirkland')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.10, name=organizationName)>, value=u'CompanyXYZ')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.11, name=organizationalUnitName)>, value=u'ContentProtection')>
Issuer Name:
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.3, name=commonName)>, value=u'root_cert')>
Key Size: 4096
Widevine System Id: 2001
Not valid before: 2001-08-09 00:00:00
Not valid after: 2001-11-17 00:00:00"""
self.assertEqual(output.getvalue(), textwrap.dedent(expected_info))
def test_get_der_certificate_info(self):
_, intermediate_certificate_bytes = (
oem_cert_test_helper.create_intermediate_certificate_and_key_bytes(
pem_format=False))
args = ArgParseObject()
args.file = StringIO.StringIO(intermediate_certificate_bytes)
output = StringIO.StringIO()
@@ -222,8 +310,8 @@ class OemCertificateTest(unittest.TestCase):
def test_get_certificate_chain_info(self):
intermediate_key_bytes, intermediate_certificate_bytes = (
oem_cert_gen_helper.create_intermediate_certificate_and_key_bytes())
args = oem_cert_gen_helper.setup_leaf_cert_args(
oem_cert_test_helper.create_intermediate_certificate_and_key_bytes())
args = oem_cert_test_helper.setup_leaf_cert_args(
intermediate_key_bytes, intermediate_certificate_bytes)
oem_certificate.generate_leaf_certificate(args)
args.file = StringIO.StringIO(args.output_certificate_file.getvalue())
@@ -262,6 +350,92 @@ class OemCertificateTest(unittest.TestCase):
Not valid after: 2001-11-17 00:00:00"""
self.assertEqual(output.getvalue(), textwrap.dedent(expected_info))
def test_get_certificate_chain_info_fixed_input(self):
# This was generated from args.output_certificate_file in the test above.
data_b64 = (
'MIIJCQYJKoZIhvcNAQcCoIII+jCCCPYCAQExADAPBgkqhkiG9w0BBwGgAgQAoIII2jCC'
'A+wwggHUoAMCAQICEF8YrBKJsoFPrBNHO2LSenUwDQYJKoZIhvcNAQELBQAwXjELMAkG'
'A1UEBhMCVVMxCzAJBgNVBAgMAldBMREwDwYDVQQHDAhLaXJrbGFuZDETMBEGA1UECgwK'
'Q29tcGFueVhZWjEaMBgGA1UECwwRQ29udGVudFByb3RlY3Rpb24wHhcNMDEwODA5MDAw'
'MDAwWhcNMjMwNzA1MDAwMDAwWjByMRIwEAYDVQQDDAkyMDAxLWxlYWYxCzAJBgNVBAYT'
'AlVTMQswCQYDVQQIDAJXQTERMA8GA1UEBwwIS2lya2xhbmQxEzARBgNVBAoMCkNvbXBh'
'bnlYWVoxGjAYBgNVBAsMEUNvbnRlbnRQcm90ZWN0aW9uMIGfMA0GCSqGSIb3DQEBAQUA'
'A4GNADCBiQKBgQCvY7KZbrMNw/ltcDqTlB+Bu3E5Cbv/JV5Adhnwuk9OiPPJhjx+fx4r'
'Jo05hM1HImHZSB7NtSjUP2Z9tbL8Fa3DgtI6nAJdQZRGrMxfY3EKe2FoQFbbJFMMXqw9'
'yNWLjQzBBB7AkQekdXuJHsiZYAoARa2sSbYYLgQhaLLLj5VeVwIDAQABoxYwFDASBgor'
'BgEEAdZ5BAEBBAQCAgfRMA0GCSqGSIb3DQEBCwUAA4ICAQCO0eQY2brYOiRQLKsoCHhI'
'4/Mi3FOM+rfbqQzM+vesrwahDPLE389igMcNkYTX7QFwdeYMnoqtAeyiPGL42ussqY0h'
'xTdEOAdXJ2cz99ce8d9EnLWTWU4k1Bk/DAZRbIEPmEi2yigr/0pL1oU6J+uGWx3vf3Eh'
'2vVwDtU4ptSwOR9pcT3UgIfVnxVB49i94PqeQQv4JAQ3jezEzt91NkvGMAHTR602hWUU'
'nVlIfnzu9KZWVfr4iyh5vCMcT9YIuBzY5EaoCAcfx8hO7xXLRKfQ9MKfzLjbmoCvOp1o'
'kSLATonsY44JO1e6Uf+RfhtPslk7PoyTlkELBwyRJ+wIwJK9VYxvn5Wm72fjXgReNdDR'
'xL1hB9U6ccQKwdof+C1TVAANCYejPjGQNN9PzgpxFPMbmmphQWqU/K0c6NzP6WoGJFoS'
'HJBa2Mlvi41g1GsGHaJCqpI9bXOxuHhQr5jzEh650S1VuhGfzsO1ycfxIxXWqj1SNm/w'
'mIA504LbQ4o400Ym/QXL4pkeI3XNrIDCYm6Zp8lHuOeqsC7JUulg9O4X9BaQfUHWzbxP'
'UOSzkPaSMMZ6XP4f8ziUbi0OoDU9e2EnHqyc/csXqZJFdKqyySBYwMqDJi2U9nIc4fmT'
'rLKRLG3YcPspgo4fuNtiQMPVwYHypr0siAAuoHEo68Fle9lyijCCBOYwggNOoAMCAQIC'
'EF77T7iz1YiiXO156wo7yZYwDQYJKoZIhvcNAQELBQAwFDESMBAGA1UEAwwJcm9vdF9j'
'ZXJ0MB4XDTAxMDgwOTAwMDAwMFoXDTAxMTExNzAwMDAwMFowXjELMAkGA1UEBhMCVVMx'
'CzAJBgNVBAgMAldBMREwDwYDVQQHDAhLaXJrbGFuZDETMBEGA1UECgwKQ29tcGFueVhZ'
'WjEaMBgGA1UECwwRQ29udGVudFByb3RlY3Rpb24wggIiMA0GCSqGSIb3DQEBAQUAA4IC'
'DwAwggIKAoICAQC4rLWKHrw+TLvwZW3iZiDGbqVzXLTy7d78PmuAUfJEWW23hZUhG6vN'
'YzFpaoYgzDjaQ4O5wivdKKsL+oN0/ZY4cK5N8fcPhpY/iHJTC8/AdbC1qEnrBWsE2ptx'
'M1DKeA2a0Kk5sYQn55WyJaTuGjDf2C0AxveQuua7L4rzC+JadP5HczJngQlsqikXexxO'
'hHenvdwvovnPks93sdAz8OFopIpT/pXag8bPL224RpVoN5LGPDT9yZ8fCm2w5w2USQA6'
'Ir4HnCHpgekcY/lPU3CXhMecjXcBZ1k3fOm9j8U4hq9WM6lSSOUYdVpsZ+ZuIhDdhDuP'
'sxnjo5KSIU9gvDp4m8fgKuxfskKXv7CxkWVRM2AuX80eOBiIYK7UZOGasGmR9QeYQOHu'
'Nrak+JuhK1iQEcbOsX6TEYhLX5ihGxNo03V5XoURjiSo3y7g6NQ1stBiiAqIV7f9Iu9t'
'oTDvwsl7pBUKxO8FeW8W68Cp5M+RdUT853X/9DUWlpJvIecS7oe6/MVfyEKPfGUj1cOr'
'WFHTOKzNtbM+d4YEKjpIrD47RwtffJeZGfKfeWcvHq3gL3hR3O9VUVTl/m3rGz8/JYVW'
'9uJcAoR/ZoCzS71/fclzVOmZu/OHpsIYAicdkhUJLjMZgtdjfOR9VCFe9Aop7+yrrjiA'
'9iQZ38FHf33EZRUrUwIDAQABo2owaDAdBgNVHQ4EFgQUxyronsrwZ7qDmZgUz/xqFMnE'
'riswHwYDVR0jBBgwFoAUV1kgQN4+tYWvzhkBQvOuPtOlo0gwEgYDVR0TAQH/BAgwBgEB'
'/wIBADASBgorBgEEAdZ5BAEBBAQCAgfRMA0GCSqGSIb3DQEBCwUAA4IBgQAhXwS6/bTY'
'9ViWOfWGPYiGqpdvJ7B8ta/rdD3OwTgmTMOHTNNUt2YzsUYTTeW+yS5FKc4EC/51FRGT'
'sE658qNi/V5B87o30aA6z2C8YtJJgBBw1T2uHBJVTul9YyXprJTuBO0nHjx+gbGSoiDr'
'WG9SPG80ZwqTnG0EiHJeCiXfRfAWyYMqjMy0lJnQNNTKPeOh/U1iqMKQkGi0v5dHczXQ'
'bAtcnTowJeNn5zmhbAZTsVRds0Rp3QhlTshZRkYIjs38bPaKv87NG1wyhQvXwIwiLj0b'
'mWhkLpp2+Ug2DZhEOuGIYRdfcgR0bUh54FBHM+PttUc49OaNOMTZNFi2KZmYO30vT256'
'OSGis06hC6pggIzGA7tKP4ATSOBA1fe27ef0YD6pD2dAdQnhaXguDo3/eHbpUXXpqLr6'
'nm0mTbNTgcC673L5YA8qpQkAzk9vLg4UaslMbPfeKM8rqduJFcjTyVY3C4jBC0qxf6z6'
'vpWbEO7UpHHdfvWe9DEBODFbyXMxAA==')
args = oem_cert_test_helper.ArgParseObject()
args.file = StringIO.StringIO(base64.b64decode(data_b64))
output = StringIO.StringIO()
oem_certificate.get_info(args, output)
expected_info = """\
Certificate Subject Name:
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.3, name=commonName)>, value=u'2001-leaf')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.6, name=countryName)>, value=u'US')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.8, name=stateOrProvinceName)>, value=u'WA')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.7, name=localityName)>, value=u'Kirkland')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.10, name=organizationName)>, value=u'CompanyXYZ')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.11, name=organizationalUnitName)>, value=u'ContentProtection')>
Issuer Name:
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.6, name=countryName)>, value=u'US')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.8, name=stateOrProvinceName)>, value=u'WA')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.7, name=localityName)>, value=u'Kirkland')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.10, name=organizationName)>, value=u'CompanyXYZ')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.11, name=organizationalUnitName)>, value=u'ContentProtection')>
Key Size: 1024
Widevine System Id: 2001
Not valid before: 2001-08-09 00:00:00
Not valid after: 2023-07-05 00:00:00
Certificate Subject Name:
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.6, name=countryName)>, value=u'US')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.8, name=stateOrProvinceName)>, value=u'WA')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.7, name=localityName)>, value=u'Kirkland')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.10, name=organizationName)>, value=u'CompanyXYZ')>
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.11, name=organizationalUnitName)>, value=u'ContentProtection')>
Issuer Name:
<NameAttribute(oid=<ObjectIdentifier(oid=2.5.4.3, name=commonName)>, value=u'root_cert')>
Key Size: 4096
Widevine System Id: 2001
Not valid before: 2001-08-09 00:00:00
Not valid after: 2001-11-17 00:00:00"""
self.assertEqual(output.getvalue(), textwrap.dedent(expected_info))
def test_secure_erase(self):
args = ArgParseObject()
args.file = tempfile.NamedTemporaryFile(delete=False)
@@ -303,6 +477,32 @@ class OemCertificateArgParseTest(unittest.TestCase):
self.assertEqual(args.output_private_key_file.mode, 'wb')
self.assertEqual(args.passphrase, 'pass')
self.assertEqual(args.func, oem_certificate.generate_csr)
self.assertIsNone(args.common_name)
def test_generate_csr_with_cn(self):
cmds = ('generate_csr --key_size 4096 -C USA -ST WA '
'-L Kirkland -O Company -OU Widevine -CN MyCommonName').split()
output_private_key_file = os.path.join(self.test_dir, 'private_key')
output_csr_file = os.path.join(self.test_dir, 'csr')
cmds.extend([
'--output_csr_file', output_csr_file, '--output_private_key_file',
output_private_key_file, '--passphrase', 'pass'
])
args = self.parser.parse_args(cmds)
self.assertEqual(args.key_size, 4096)
self.assertEqual(args.country_name, 'USA')
self.assertEqual(args.state_or_province_name, 'WA')
self.assertEqual(args.locality_name, 'Kirkland')
self.assertEqual(args.organization_name, 'Company')
self.assertEqual(args.organizational_unit_name, 'Widevine')
self.assertEqual(args.output_csr_file.name, output_csr_file)
self.assertEqual(args.output_csr_file.mode, 'wb')
self.assertEqual(args.output_private_key_file.name, output_private_key_file)
self.assertEqual(args.output_private_key_file.mode, 'wb')
self.assertEqual(args.passphrase, 'pass')
self.assertEqual(args.common_name, 'MyCommonName')
self.assertEqual(args.func, oem_certificate.generate_csr)
def _fill_file_with_dummy_contents(self, file_name):
with open(file_name, 'wb') as f: