diff --git a/example/example_data/user.private b/example/example_data/user.private index 5eb66de..eaf856f 100644 Binary files a/example/example_data/user.private and b/example/example_data/user.private differ diff --git a/example/provisioning_message_generator b/example/provisioning_message_generator index 0cd22eb..c80267d 100755 Binary files a/example/provisioning_message_generator and b/example/provisioning_message_generator differ diff --git a/libprovisioning_sdk.so b/libprovisioning_sdk.so index 3450896..a12ce21 100755 Binary files a/libprovisioning_sdk.so and b/libprovisioning_sdk.so differ diff --git a/protos/public/certificate_provisioning.proto b/protos/public/certificate_provisioning.proto index 3aacf45..06e7b94 100644 --- a/protos/public/certificate_provisioning.proto +++ b/protos/public/certificate_provisioning.proto @@ -85,8 +85,9 @@ message ProvisioningResponse { // The message authentication key. message SignedProvisioningMessage { enum ProtocolVersion { - VERSION_2 = 2; // Keybox factory-provisioned devices. - VERSION_3 = 3; // OEM certificate factory-provisioned devices. + PROVISIONING_20 = 2; // Keybox factory-provisioned devices. + PROVISIONING_30 = 3; // OEM certificate factory-provisioned devices. + INTEL_SIGMA_101 = 101; // Intel Sigma 1.0.1 protocol. } // Serialized ProvisioningRequest or ProvisioningResponse. Required. @@ -94,5 +95,5 @@ message SignedProvisioningMessage { // HMAC-SHA256 (Keybox) or RSASSA-PSS (OEM) signature of message. Required. optional bytes signature = 2; // Version number of provisioning protocol. - optional ProtocolVersion protocol_version = 3 [default = VERSION_2]; + optional ProtocolVersion protocol_version = 3 [default = PROVISIONING_20]; } diff --git a/provisioning_sdk/public/provisioning_engine.h b/provisioning_sdk/public/provisioning_engine.h index c6c88b2..d2f1590 100644 --- a/provisioning_sdk/public/provisioning_engine.h +++ b/provisioning_sdk/public/provisioning_engine.h @@ -29,7 +29,7 @@ class ProvisioningSession; class ProvisioningEngine { public: ProvisioningEngine(); - ~ProvisioningEngine(); + virtual ~ProvisioningEngine(); // Initializes the provisioning engine with required credentials. // * |certificate_type| indicates which type of certificate chains will be @@ -66,7 +66,7 @@ class ProvisioningEngine { // |certificate_status_list| expires after its creation time // (creation_time_seconds). Zero means it will never expire. // * Returns OK on success, or an appropriate error status code otherwise. - ProvisioningStatus SetCertificateStatusList( + virtual ProvisioningStatus SetCertificateStatusList( const std::string& certificate_status_list, uint32_t expiration_period_seconds); @@ -94,7 +94,7 @@ class ProvisioningEngine { // * |cert_private_key_passphrase| is the passphrase for cert_private_key, // if any. // * Returns OK on success, or an appropriate error status code otherwise. - ProvisioningStatus AddDrmIntermediateCertificate( + virtual ProvisioningStatus AddDrmIntermediateCertificate( const std::string& intermediate_cert, const std::string& cert_private_key, const std::string& cert_private_key_passphrase); @@ -113,7 +113,7 @@ class ProvisioningEngine { // message. // NOTE: All ProvisioningSession objects must be deleted before the // ProvisioningEngine which created them. - ProvisioningStatus NewProvisioningSession( + virtual ProvisioningStatus NewProvisioningSession( const std::string& device_public_key, const std::string& device_private_key, std::unique_ptr* new_session) const; diff --git a/provisioning_sdk/public/provisioning_session.h b/provisioning_sdk/public/provisioning_session.h index c83455d..abdce8f 100644 --- a/provisioning_sdk/public/provisioning_session.h +++ b/provisioning_sdk/public/provisioning_session.h @@ -22,7 +22,7 @@ class ProvisioningSessionImpl; // Class which is used to implement the provisioning session state machine. class ProvisioningSession { public: - ~ProvisioningSession(); + virtual ~ProvisioningSession(); // Process a message from the client device. // * |message| is the message received from the client device. @@ -31,13 +31,16 @@ class ProvisioningSession { // * |done| will indicate, upon successful return, whether the provisioning // exchange is complete, and the ProvisioningSession can be deleted. // Returns OK if successful, or an appropriate error status code otherwise. - ProvisioningStatus ProcessMessage(const std::string& message, - std::string* response, - bool* done); + virtual ProvisioningStatus ProcessMessage(const std::string& message, + std::string* response, + bool* done); // * Returns a ProvisioneddeviceInfo message containing information about the // type of device being provisioned. May return nullptr. - const ProvisionedDeviceInfo* GetDeviceInfo() const; + virtual const ProvisionedDeviceInfo* GetDeviceInfo() const; + + protected: + ProvisioningSession(); // To enable mocking. private: #ifndef SWIGPYTHON diff --git a/provisioning_sdk/public/python/base.i b/provisioning_sdk/public/python/base.i index db4d783..9154f77 100644 --- a/provisioning_sdk/public/python/base.i +++ b/provisioning_sdk/public/python/base.i @@ -23,5 +23,5 @@ typedef oldtype newtype; %apply oldtype & INOUT { newtype & INOUT }; %enddef -COPY_TYPEMAPS(int, int32); -COPY_TYPEMAPS(unsigned int, uint32); +COPY_TYPEMAPS(int, int32_t); +COPY_TYPEMAPS(unsigned int, uint32_t); diff --git a/provisioning_sdk/public/python/crypto_utility.py b/provisioning_sdk/public/python/crypto_utility.py index fde1f05..a637f98 100644 --- a/provisioning_sdk/public/python/crypto_utility.py +++ b/provisioning_sdk/public/python/crypto_utility.py @@ -8,6 +8,8 @@ """Utility functions for cryptography.""" +import logging + from cryptography.hazmat import backends from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import serialization @@ -18,6 +20,7 @@ def VerifySignature(public_key, signature, data): hash_algorithm = hashes.SHA1() salt_len = 20 + logging.info('Verying signature.') key = serialization.load_der_public_key( public_key, backend=backends.default_backend()) key.verify(signature, data, diff --git a/provisioning_sdk/public/python/engine_generate_certificate_test.py b/provisioning_sdk/public/python/engine_generate_certificate_test.py index 8c74cc3..a297a87 100644 --- a/provisioning_sdk/public/python/engine_generate_certificate_test.py +++ b/provisioning_sdk/public/python/engine_generate_certificate_test.py @@ -9,8 +9,10 @@ import unittest import crypto_utility +import pywrapcertificate_type import pywrapprovisioning_engine import pywrapprovisioning_status +import test_data_provider import test_data_utility from protos.public import signed_device_certificate_pb2 @@ -25,21 +27,23 @@ class EngineGenerateCertificateTest(unittest.TestCase): self._engine, 0, verify_success=True) test_data_utility.AddDrmIntermediateCertificateWithTestData( self._engine, 2001, verify_success=True) + self._data_provider = test_data_provider.TestDataProvider( + pywrapcertificate_type.kCertTesting) def testSuccess(self): status, signed_cert_string = self._engine.GenerateDeviceDrmCertificate( - 2001, test_data_utility.DEVICE_PUBLIC_KEY, 'DEVICE_SERIAL_NUMBER') + 2001, self._data_provider.device_public_key, 'DEVICE_SERIAL_NUMBER') self.assertEqual(pywrapprovisioning_status.OK, status) signed_cert = signed_device_certificate_pb2.SignedDrmDeviceCertificate() signed_cert.ParseFromString(signed_cert_string) - crypto_utility.VerifySignature(test_data_utility.CA_PUBLIC_KEY, + crypto_utility.VerifySignature(self._data_provider.ca_public_key, signed_cert.signature, signed_cert.drm_certificate) def testEmptySerialNumber(self): status, _ = self._engine.GenerateDeviceDrmCertificate( - 2001, test_data_utility.DEVICE_PUBLIC_KEY, '') + 2001, self._data_provider.device_public_key, '') self.assertEqual(pywrapprovisioning_status.INVALID_SERIAL_NUMBER, status) def testEmptyPublicKey(self): @@ -57,7 +61,7 @@ class EngineGenerateCertificateTest(unittest.TestCase): def testMissingIntermediateCertificate(self): status, _ = self._engine.GenerateDeviceDrmCertificate( - 2002, test_data_utility.DEVICE_PUBLIC_KEY, 'DEVICE_SERIAL_NUMBER') + 2002, self._data_provider.device_public_key, 'DEVICE_SERIAL_NUMBER') self.assertEqual(pywrapprovisioning_status.DEVICE_REVOKED, status) if __name__ == '__main__': diff --git a/provisioning_sdk/public/python/init_engine_test.py b/provisioning_sdk/public/python/init_engine_test.py index 5306208..238baa6 100644 --- a/provisioning_sdk/public/python/init_engine_test.py +++ b/provisioning_sdk/public/python/init_engine_test.py @@ -11,6 +11,7 @@ import unittest import pywrapcertificate_type import pywrapprovisioning_engine import pywrapprovisioning_status +import test_data_provider import test_data_utility @@ -18,6 +19,8 @@ class InitEngineTest(unittest.TestCase): def setUp(self): self._engine = pywrapprovisioning_engine.ProvisioningEngine() + self._data_provider = test_data_provider.TestDataProvider( + pywrapcertificate_type.kCertTesting) def testInitEngineSucceed(self): test_data_utility.InitProvisionEngineWithTestData( @@ -58,112 +61,111 @@ class InitEngineTest(unittest.TestCase): def testInitEngineInvalidServiceDrmCert(self): status = self._engine.Initialize( pywrapcertificate_type.kCertTesting, 'INVALID_CERT', - test_data_utility.SERVICE_PRIVATE_KEY, - test_data_utility.SERVICE_PRIVATE_KEY_PASS, - test_data_utility.PROVISIONER_DRM_CERT, - test_data_utility.PROVISIONER_PRIVATE_KEY, - test_data_utility.PROVISIONER_PRIVATE_KEY_PASS, - test_data_utility.PROVISIONER_SPOID_SECRET) + self._data_provider.service_private_key, + self._data_provider.service_private_key_passphrase, + self._data_provider.provisioner_drm_cert, + self._data_provider.provisioner_private_key, + self._data_provider.provisioner_private_key_passphrase, + self._data_provider.provisioner_spoid_secret) self.assertEqual(pywrapprovisioning_status.INVALID_SERVICE_DRM_CERTIFICATE, status) def testInitEngineInvalidServicePrivateKey(self): status = self._engine.Initialize( pywrapcertificate_type.kCertTesting, - test_data_utility.SERVICE_DRM_CERT, 'INVALID_KEY', - test_data_utility.SERVICE_PRIVATE_KEY_PASS, - test_data_utility.PROVISIONER_DRM_CERT, - test_data_utility.PROVISIONER_PRIVATE_KEY, - test_data_utility.PROVISIONER_PRIVATE_KEY_PASS, - test_data_utility.PROVISIONER_SPOID_SECRET) + self._data_provider.service_drm_cert, 'INVALID_KEY', + self._data_provider.service_private_key_passphrase, + self._data_provider.provisioner_drm_cert, + self._data_provider.provisioner_private_key, + self._data_provider.provisioner_private_key_passphrase, + self._data_provider.provisioner_spoid_secret) self.assertEqual(pywrapprovisioning_status.INVALID_SERVICE_PRIVATE_KEY, status) def testInitEngineWrongServicePrivateKey(self): status = self._engine.Initialize( pywrapcertificate_type.kCertTesting, - test_data_utility.SERVICE_DRM_CERT, - test_data_utility.PROVISIONER_PRIVATE_KEY, - test_data_utility.SERVICE_PRIVATE_KEY_PASS, - test_data_utility.PROVISIONER_DRM_CERT, - test_data_utility.PROVISIONER_PRIVATE_KEY, - test_data_utility.PROVISIONER_PRIVATE_KEY_PASS, - test_data_utility.PROVISIONER_SPOID_SECRET) + self._data_provider.service_drm_cert, + self._data_provider.provisioner_private_key, + self._data_provider.service_private_key_passphrase, + self._data_provider.provisioner_drm_cert, + self._data_provider.provisioner_private_key, + self._data_provider.provisioner_private_key_passphrase, + self._data_provider.provisioner_spoid_secret) self.assertEqual(pywrapprovisioning_status.INVALID_SERVICE_PRIVATE_KEY, status) def testInitEngineInvalidServicePrivateKeyPassphrase(self): status = self._engine.Initialize( pywrapcertificate_type.kCertTesting, - test_data_utility.SERVICE_DRM_CERT, - test_data_utility.SERVICE_PRIVATE_KEY, 'INVALID_PASSPHRASE', - test_data_utility.PROVISIONER_DRM_CERT, - test_data_utility.PROVISIONER_PRIVATE_KEY, - test_data_utility.PROVISIONER_PRIVATE_KEY_PASS, - test_data_utility.PROVISIONER_SPOID_SECRET) + self._data_provider.service_drm_cert, + self._data_provider.service_private_key, 'INVALID_PASSPHRASE', + self._data_provider.provisioner_drm_cert, + self._data_provider.provisioner_private_key, + self._data_provider.provisioner_private_key_passphrase, + self._data_provider.provisioner_spoid_secret) self.assertEqual(pywrapprovisioning_status.INVALID_SERVICE_PRIVATE_KEY, status) def testInitEngineInvalidDrmCert(self): status = self._engine.Initialize( pywrapcertificate_type.kCertTesting, - test_data_utility.SERVICE_DRM_CERT, - test_data_utility.SERVICE_PRIVATE_KEY, - test_data_utility.SERVICE_PRIVATE_KEY_PASS, 'INVALID_CERT', - test_data_utility.PROVISIONER_PRIVATE_KEY, - test_data_utility.PROVISIONER_PRIVATE_KEY_PASS, - test_data_utility.PROVISIONER_SPOID_SECRET) + self._data_provider.service_drm_cert, + self._data_provider.service_private_key, + self._data_provider.service_private_key_passphrase, 'INVALID_CERT', + self._data_provider.provisioner_private_key, + self._data_provider.provisioner_private_key_passphrase, + self._data_provider.provisioner_spoid_secret) self.assertEqual( pywrapprovisioning_status.INVALID_PROVISIONER_DRM_CERTIFICATE, status) def testInitEngineInvalidDrmPrivateKey(self): status = self._engine.Initialize( pywrapcertificate_type.kCertTesting, - test_data_utility.SERVICE_DRM_CERT, - test_data_utility.SERVICE_PRIVATE_KEY, - test_data_utility.SERVICE_PRIVATE_KEY_PASS, - test_data_utility.PROVISIONER_DRM_CERT, 'INVALID_KEY', - test_data_utility.PROVISIONER_PRIVATE_KEY_PASS, - test_data_utility.PROVISIONER_SPOID_SECRET) + self._data_provider.service_drm_cert, + self._data_provider.service_private_key, + self._data_provider.service_private_key_passphrase, + self._data_provider.provisioner_drm_cert, 'INVALID_KEY', + self._data_provider.provisioner_private_key_passphrase, + self._data_provider.provisioner_spoid_secret) self.assertEqual(pywrapprovisioning_status.INVALID_PROVISIONER_PRIVATE_KEY, status) def testInitEngineWrongDrmPrivateKey(self): status = self._engine.Initialize( pywrapcertificate_type.kCertTesting, - test_data_utility.SERVICE_DRM_CERT, - test_data_utility.SERVICE_PRIVATE_KEY, - test_data_utility.SERVICE_PRIVATE_KEY_PASS, - test_data_utility.PROVISIONER_DRM_CERT, - test_data_utility.SERVICE_PRIVATE_KEY, - test_data_utility.PROVISIONER_PRIVATE_KEY_PASS, - test_data_utility.PROVISIONER_SPOID_SECRET) + self._data_provider.service_drm_cert, + self._data_provider.service_private_key, + self._data_provider.service_private_key_passphrase, + self._data_provider.provisioner_drm_cert, + self._data_provider.service_private_key, + self._data_provider.provisioner_private_key_passphrase, + self._data_provider.provisioner_spoid_secret) self.assertEqual(pywrapprovisioning_status.INVALID_PROVISIONER_PRIVATE_KEY, status) def testInitEngineInvalidDrmPrivateKeyPassphrase(self): status = self._engine.Initialize( pywrapcertificate_type.kCertTesting, - test_data_utility.SERVICE_DRM_CERT, - test_data_utility.SERVICE_PRIVATE_KEY, - test_data_utility.SERVICE_PRIVATE_KEY_PASS, - test_data_utility.PROVISIONER_DRM_CERT, - test_data_utility.PROVISIONER_PRIVATE_KEY, + self._data_provider.service_drm_cert, + self._data_provider.service_private_key, + self._data_provider.service_private_key_passphrase, + self._data_provider.provisioner_drm_cert, + self._data_provider.provisioner_private_key_passphrase, 'INVALID_PASSPHRASE', - test_data_utility.PROVISIONER_SPOID_SECRET) + self._data_provider.provisioner_spoid_secret) self.assertEqual(pywrapprovisioning_status.INVALID_PROVISIONER_PRIVATE_KEY, status) def testInitEngineInvalidSpoidSecret(self): status = self._engine.Initialize( pywrapcertificate_type.kCertTesting, - test_data_utility.SERVICE_DRM_CERT, - test_data_utility.SERVICE_PRIVATE_KEY, - test_data_utility.SERVICE_PRIVATE_KEY_PASS, - test_data_utility.PROVISIONER_DRM_CERT, - test_data_utility.PROVISIONER_PRIVATE_KEY, - test_data_utility.PROVISIONER_PRIVATE_KEY_PASS, - '') + self._data_provider.service_drm_cert, + self._data_provider.service_private_key, + self._data_provider.service_private_key_passphrase, + self._data_provider.provisioner_drm_cert, + self._data_provider.provisioner_private_key, + self._data_provider.provisioner_private_key_passphrase, '') self.assertEqual(pywrapprovisioning_status.INVALID_SPOID_SAUCE, status) if __name__ == '__main__': diff --git a/provisioning_sdk/public/python/new_session_test.py b/provisioning_sdk/public/python/new_session_test.py index 8702b41..826a08c 100644 --- a/provisioning_sdk/public/python/new_session_test.py +++ b/provisioning_sdk/public/python/new_session_test.py @@ -9,8 +9,10 @@ import unittest import crypto_utility +import pywrapcertificate_type import pywrapprovisioning_engine import pywrapprovisioning_status +import test_data_provider import test_data_utility from protos.public import certificate_provisioning_pb2 from protos.public import signed_device_certificate_pb2 @@ -24,6 +26,8 @@ class NewSessionTest(unittest.TestCase): self._engine, verify_success=True) test_data_utility.SetCertificateStatusListWithTestData( self._engine, 0, verify_success=True) + self._data_provider = test_data_provider.TestDataProvider( + pywrapcertificate_type.kCertTesting) def testNewSessionSuccess(self): test_data_utility.AddDrmIntermediateCertificateWithTestData( @@ -32,11 +36,11 @@ class NewSessionTest(unittest.TestCase): (_, new_session) = test_data_utility.NewProvisioningSessionWithTestData( self._engine, verify_success=True) (status, raw_response, - _) = new_session.ProcessMessage(test_data_utility.MESSAGE) + _) = new_session.ProcessMessage(self._data_provider.message) test_data_utility.AssertSuccess(status, 'Failed to create session.') signed_request = test_data_utility.ConvertToSignedProvisioningMessage( - test_data_utility.MESSAGE) + self._data_provider.message) unsigned_request = certificate_provisioning_pb2.ProvisioningRequest() unsigned_request.ParseFromString(signed_request.message) @@ -44,7 +48,7 @@ class NewSessionTest(unittest.TestCase): signed_response = test_data_utility.ConvertToSignedProvisioningMessage( raw_response) - self._VerifyMessageSignature(test_data_utility.SERVICE_PUBLIC_KEY, + self._VerifyMessageSignature(self._data_provider.service_public_key, signed_response) unsigned_response = certificate_provisioning_pb2.ProvisioningResponse() @@ -63,7 +67,8 @@ class NewSessionTest(unittest.TestCase): def testNewSessionWithoutIntermediateCert(self): (_, new_session) = test_data_utility.NewProvisioningSessionWithTestData( self._engine, verify_success=True) - (status, _, _) = new_session.ProcessMessage(test_data_utility.MESSAGE) + (status, _, _) = new_session.ProcessMessage( + self._data_provider.message) self.assertEqual(pywrapprovisioning_status.MISSING_DRM_INTERMEDIATE_CERT, status) @@ -71,7 +76,7 @@ class NewSessionTest(unittest.TestCase): test_data_utility.AddDrmIntermediateCertificateWithTestData( self._engine, 2001, verify_success=True) (session_status, _) = self._engine.NewProvisioningSession( - 'INVALID_PUBLIC_KEY', test_data_utility.DEVICE_PRIVATE_KEY) + 'INVALID_PUBLIC_KEY', self._data_provider.device_private_key) self.assertEqual(pywrapprovisioning_status.INVALID_DEVICE_PUBLIC_KEY, session_status) @@ -79,7 +84,7 @@ class NewSessionTest(unittest.TestCase): test_data_utility.AddDrmIntermediateCertificateWithTestData( self._engine, 2001, verify_success=True) (session_status, _) = self._engine.NewProvisioningSession( - test_data_utility.DEVICE_PUBLIC_KEY, 'INVALID_PRIVATE_KEY') + self._data_provider.device_public_key, 'INVALID_PRIVATE_KEY') self.assertEqual(pywrapprovisioning_status.INVALID_DEVICE_PRIVATE_KEY, session_status) @@ -97,7 +102,8 @@ class NewSessionTest(unittest.TestCase): signed_cert = signed_device_certificate_pb2.SignedDrmDeviceCertificate() signed_cert.ParseFromString(response.device_certificate) - self._VerifyCertSignature(test_data_utility.CA_PUBLIC_KEY, signed_cert) + self._VerifyCertSignature(self._data_provider.ca_public_key, + signed_cert) if __name__ == '__main__': unittest.main() diff --git a/provisioning_sdk/public/python/provisioning_engine.i b/provisioning_sdk/public/python/provisioning_engine.i index 64c7c1e..7159148 100644 --- a/provisioning_sdk/public/python/provisioning_engine.i +++ b/provisioning_sdk/public/python/provisioning_engine.i @@ -18,9 +18,6 @@ UNIQUE_PTR_ARGOUT(widevine::ProvisioningSession, new_session); %apply int { CertificateType, ProvisioningStatus }; -%apply int32 { int32_t }; -%apply uint32 { uint32_t }; - %apply std::string* OUTPUT { std::string* certificate }; %{ diff --git a/provisioning_sdk/public/python/test_data_provider.py b/provisioning_sdk/public/python/test_data_provider.py new file mode 100644 index 0000000..cb7e3d5 --- /dev/null +++ b/provisioning_sdk/public/python/test_data_provider.py @@ -0,0 +1,101 @@ +################################################################################ +# Copyright 2017 Google Inc. +# +# This software is licensed under the terms defined in the Widevine Master +# License Agreement. For a copy of this agreement, please contact +# widevine-licensing@google.com. +################################################################################ + +"""Class that provides test data for Provisioning SDK testing.""" + +import os + +import pywrapcertificate_type + +_TEST_CERT_DATA_FOLDER = os.path.join('example', 'example_data') +_DEV_CERT_DATA_FOLDER = os.path.join('example', 'dev_cert_example_data') + + +class TestDataProvider(object): + """For for Test Data.""" + + def __init__(self, cert_type): + """Initializes the TestData for Provisioning SDK tests.""" + assert (cert_type in ( + pywrapcertificate_type.kCertDevelopment, + pywrapcertificate_type.kCertTesting)) + self._cert_type = cert_type + + def _GetTestData(self, filename): + """Helps read test data files such as certs and keys for SDK testing.""" + current_dir = os.path.realpath(os.path.dirname(__file__)) + if self._cert_type == pywrapcertificate_type.kCertDevelopment: + subfolder_path = _DEV_CERT_DATA_FOLDER + elif self._cert_type == pywrapcertificate_type.kCertTesting: + subfolder_path = _TEST_CERT_DATA_FOLDER + while not os.path.isdir(os.path.join(current_dir, subfolder_path)): + current_dir = os.path.dirname(current_dir) + filename = os.path.join(current_dir, subfolder_path, filename) + with open(filename, 'rb') as data_file: + data = data_file.read() + return data + + @property + def service_drm_cert(self): + return self._GetTestData('service.cert') + + @property + def service_public_key(self): + return self._GetTestData('service.public') + + @property + def service_private_key(self): + return self._GetTestData('service.encrypted.private') + + @property + def service_private_key_passphrase(self): + return self._GetTestData('service.passphrase') + + @property + def provisioner_drm_cert(self): + return self._GetTestData('provisioner.cert') + + @property + def provisioner_private_key(self): + return self._GetTestData('provisioner.encrypted.private') + + @property + def provisioner_private_key_passphrase(self): + return self._GetTestData('provisioner.passphrase') + + @property + def provisioner_spoid_secret(self): + return self._GetTestData('provisioner.spoid_secret') + + @property + def ca_public_key(self): + return self._GetTestData('intermediate.public') + + @property + def ca_private_key(self): + return self._GetTestData('intermediate.encrypted.private') + + @property + def ca_private_key_passphrase(self): + return self._GetTestData('intermediate.passphrase') + + @property + def device_public_key(self): + return self._GetTestData('user.public') + + @property + def device_private_key(self): + return self._GetTestData('user.private') + + @property + def message(self): + return self._GetTestData('message') + + @property + def certificate_list(self): + return self._GetTestData('certificate_list') diff --git a/provisioning_sdk/public/python/test_data_utility.py b/provisioning_sdk/public/python/test_data_utility.py index f794132..c1ca62f 100644 --- a/provisioning_sdk/public/python/test_data_utility.py +++ b/provisioning_sdk/public/python/test_data_utility.py @@ -8,63 +8,48 @@ """Utility class for Provisioning SDK testing.""" -import os +import logging import pywrapcertificate_type import pywrapprovisioning_status +import test_data_provider from protos.public import certificate_provisioning_pb2 -TEST_DATA_FOLDER = os.path.join('example', 'example_data') +logging.basicConfig(level=logging.DEBUG) -def GetTestData(filename): - current_dir = os.path.realpath(os.path.dirname(__file__)) - while not os.path.isdir(os.path.join(current_dir, TEST_DATA_FOLDER)): - current_dir = os.path.dirname(current_dir) - filename = os.path.join(current_dir, TEST_DATA_FOLDER, filename) - with open(filename, 'rb') as data_file: - data = data_file.read() - return data - - -SERVICE_DRM_CERT = GetTestData('service.cert') -SERVICE_PUBLIC_KEY = GetTestData('service.public') -SERVICE_PRIVATE_KEY = GetTestData('service.encrypted.private') -SERVICE_PRIVATE_KEY_PASS = GetTestData('service.passphrase') -PROVISIONER_DRM_CERT = GetTestData('provisioner.cert') -PROVISIONER_PRIVATE_KEY = GetTestData('provisioner.encrypted.private') -PROVISIONER_PRIVATE_KEY_PASS = GetTestData('provisioner.passphrase') -PROVISIONER_SPOID_SECRET = GetTestData('provisioner.spoid_secret') -CA_PUBLIC_KEY = GetTestData('intermediate.public') -DEVICE_PUBLIC_KEY = GetTestData('user.public') -DEVICE_PRIVATE_KEY = GetTestData('user.private') -MESSAGE = GetTestData('message') - - -def InitProvisionEngineWithTestData(engine, verify_success=False): +def InitProvisionEngineWithTestData( + engine, verify_success=False, + cert_type=pywrapcertificate_type.kCertTesting): """Initialize the provisioning engine with sample credentials. Args: engine: a pywrapprovisioning_engine.ProvisioningEngine instance verify_success: whether to verify that resulting status code equals OK + cert_type: The type of certificate to use for initializing SDK - + {kCertTesting/kCertDevelopment} Returns: OK on success, or an appropriate error status code otherwise. """ - status = engine.Initialize(pywrapcertificate_type.kCertTesting, - SERVICE_DRM_CERT, SERVICE_PRIVATE_KEY, - SERVICE_PRIVATE_KEY_PASS, PROVISIONER_DRM_CERT, - PROVISIONER_PRIVATE_KEY, - PROVISIONER_PRIVATE_KEY_PASS, - PROVISIONER_SPOID_SECRET) + logging.info('Initializing provisioning engine with test data.') + data_provider = test_data_provider.TestDataProvider(cert_type) + status = engine.Initialize(cert_type, + data_provider.service_drm_cert, + data_provider.service_private_key, + data_provider.service_private_key_passphrase, + data_provider.provisioner_drm_cert, + data_provider.provisioner_private_key, + data_provider.provisioner_private_key_passphrase, + data_provider.provisioner_spoid_secret) if verify_success: AssertSuccess(status, 'Failed to initialize.') return status -def SetCertificateStatusListWithTestData(engine, - expiration_period_seconds, - verify_success=False): +def SetCertificateStatusListWithTestData( + engine, expiration_period_seconds, verify_success=False, + cert_type=pywrapcertificate_type.kCertTesting): """Set the certificate status list with sample certificate status list. Args: @@ -72,11 +57,15 @@ def SetCertificateStatusListWithTestData(engine, expiration_period_seconds: number of seconds until certificate_status_list expires after its creation time verify_success: whether to verify that resulting status code equals OK + cert_type: The type of certificate to use for initializing SDK - + {kCertTesting/kCertDevelopment} Returns: OK on success, or an appropriate error status code otherwise. """ - certificate_status_list = GetTestData('certificate_list') + logging.info('Setting certificate status list with test data.') + data_provider = test_data_provider.TestDataProvider(cert_type) + certificate_status_list = data_provider.certificate_list status = engine.SetCertificateStatusList(certificate_status_list, expiration_period_seconds) @@ -87,9 +76,9 @@ def SetCertificateStatusListWithTestData(engine, return status -def AddDrmIntermediateCertificateWithTestData(engine, - system_id, - verify_success=False): +def AddDrmIntermediateCertificateWithTestData( + engine, system_id, verify_success=False, + cert_type=pywrapcertificate_type.kCertTesting): """Generate an intermediate DRM cert and add it to provisioning engine. The intermediate DRM certificate is generated with sample public key and @@ -100,19 +89,23 @@ def AddDrmIntermediateCertificateWithTestData(engine, engine: a pywrapprovisioning_engine.ProvisioningEngine instance system_id: Widevine system ID for the type of device verify_success: whether to verify that resulting status code equals OK + cert_type: The type of certificate to use for initializing SDK - + {kCertTesting/kCertDevelopment} Returns: OK on success, or an appropriate error status code otherwise. """ - ca_private_key = GetTestData('intermediate.encrypted.private') - ca_private_key_passphrase = GetTestData('intermediate.passphrase') - + logging.info( + 'Generating DRM intermediate certificate for system_id <%d>.', system_id) + data_provider = test_data_provider.TestDataProvider(cert_type) gen_status, ca_certificate = engine.GenerateDrmIntermediateCertificate( - system_id, CA_PUBLIC_KEY) + system_id, data_provider.ca_public_key) AssertSuccess(gen_status, 'Failed to generate intermediate certificate.') + logging.info('Adding DRM intermediate certificate.') add_ca_status = engine.AddDrmIntermediateCertificate( - ca_certificate, ca_private_key, ca_private_key_passphrase) + ca_certificate, data_provider.ca_private_key, + data_provider.ca_private_key_passphrase) if verify_success: AssertSuccess(add_ca_status, 'Failed to add intermediate certificate.') @@ -120,24 +113,57 @@ def AddDrmIntermediateCertificateWithTestData(engine, return add_ca_status -def NewProvisioningSessionWithTestData(engine, verify_success=False): +def GenerateDeviceDrmCertificate(engine, system_id, serial_number, + verify_success=False, + cert_type=pywrapcertificate_type.kCertTesting): + """Generate a device DRM certificate. + + Args: + engine: a pywrapprovisioning_engine.ProvisioningEngine instance + system_id: Widevine system ID for the type of device + serial_number: The serial number for the device DRM certificate. + verify_success: whether to verify that resulting status code equals OK + cert_type: The type of certificate to use for initializing SDK - + {kCertTesting/kCertDevelopment} + + Returns: + OK on success, or an appropriate error status code otherwise. + """ + logging.info( + 'Generating Device cert for system_id <%d> and serial_number <%s>.', + system_id, serial_number) + data_provider = test_data_provider.TestDataProvider(cert_type) + gen_status, ca_certificate = engine.GenerateDeviceDrmCertificate( + system_id, data_provider.device_public_key, serial_number) + if verify_success: + AssertSuccess(gen_status, 'Failed to generate device DRM certificate.') + return ca_certificate + + +def NewProvisioningSessionWithTestData( + engine, verify_success=False, + cert_type=pywrapcertificate_type.kCertTesting): """Create a provisioning session with sample device public and private keys. Args: engine: a pywrapprovisioning_engine.ProvisioningEngine instance verify_success: whether to verify that resulting status code equals OK + cert_type: The type of certificate to use for initializing SDK - + {kCertTesting/kCertDevelopment} Returns: status: OK on success, or an appropriate error status code otherwise. new_session: A new provisioning_session. """ - status, new_session = engine.NewProvisioningSession(DEVICE_PUBLIC_KEY, - DEVICE_PRIVATE_KEY) - + logging.info('Starting a new provisioning session with' + 'sample device public and private keys.') + data_provider = test_data_provider.TestDataProvider(cert_type) + status, new_session = engine.NewProvisioningSession( + data_provider.device_public_key, data_provider.device_private_key) if verify_success: AssertSuccess(status, 'Failed to create session.') - return (status, new_session) + return status, new_session def AssertSuccess(status, message=None):