Release provisioning sdk 84f66d2

This commit is contained in:
Widevine Buildbot
2017-03-29 20:21:58 +00:00
parent 7ab27e9629
commit a0a0e7d4e6
14 changed files with 278 additions and 135 deletions

Binary file not shown.

Binary file not shown.

View File

@@ -85,8 +85,9 @@ message ProvisioningResponse {
// The message authentication key. // The message authentication key.
message SignedProvisioningMessage { message SignedProvisioningMessage {
enum ProtocolVersion { enum ProtocolVersion {
VERSION_2 = 2; // Keybox factory-provisioned devices. PROVISIONING_20 = 2; // Keybox factory-provisioned devices.
VERSION_3 = 3; // OEM certificate factory-provisioned devices. PROVISIONING_30 = 3; // OEM certificate factory-provisioned devices.
INTEL_SIGMA_101 = 101; // Intel Sigma 1.0.1 protocol.
} }
// Serialized ProvisioningRequest or ProvisioningResponse. Required. // Serialized ProvisioningRequest or ProvisioningResponse. Required.
@@ -94,5 +95,5 @@ message SignedProvisioningMessage {
// HMAC-SHA256 (Keybox) or RSASSA-PSS (OEM) signature of message. Required. // HMAC-SHA256 (Keybox) or RSASSA-PSS (OEM) signature of message. Required.
optional bytes signature = 2; optional bytes signature = 2;
// Version number of provisioning protocol. // Version number of provisioning protocol.
optional ProtocolVersion protocol_version = 3 [default = VERSION_2]; optional ProtocolVersion protocol_version = 3 [default = PROVISIONING_20];
} }

View File

@@ -29,7 +29,7 @@ class ProvisioningSession;
class ProvisioningEngine { class ProvisioningEngine {
public: public:
ProvisioningEngine(); ProvisioningEngine();
~ProvisioningEngine(); virtual ~ProvisioningEngine();
// Initializes the provisioning engine with required credentials. // Initializes the provisioning engine with required credentials.
// * |certificate_type| indicates which type of certificate chains will be // * |certificate_type| indicates which type of certificate chains will be
@@ -66,7 +66,7 @@ class ProvisioningEngine {
// |certificate_status_list| expires after its creation time // |certificate_status_list| expires after its creation time
// (creation_time_seconds). Zero means it will never expire. // (creation_time_seconds). Zero means it will never expire.
// * Returns OK on success, or an appropriate error status code otherwise. // * Returns OK on success, or an appropriate error status code otherwise.
ProvisioningStatus SetCertificateStatusList( virtual ProvisioningStatus SetCertificateStatusList(
const std::string& certificate_status_list, const std::string& certificate_status_list,
uint32_t expiration_period_seconds); uint32_t expiration_period_seconds);
@@ -94,7 +94,7 @@ class ProvisioningEngine {
// * |cert_private_key_passphrase| is the passphrase for cert_private_key, // * |cert_private_key_passphrase| is the passphrase for cert_private_key,
// if any. // if any.
// * Returns OK on success, or an appropriate error status code otherwise. // * Returns OK on success, or an appropriate error status code otherwise.
ProvisioningStatus AddDrmIntermediateCertificate( virtual ProvisioningStatus AddDrmIntermediateCertificate(
const std::string& intermediate_cert, const std::string& intermediate_cert,
const std::string& cert_private_key, const std::string& cert_private_key,
const std::string& cert_private_key_passphrase); const std::string& cert_private_key_passphrase);
@@ -113,7 +113,7 @@ class ProvisioningEngine {
// message. // message.
// NOTE: All ProvisioningSession objects must be deleted before the // NOTE: All ProvisioningSession objects must be deleted before the
// ProvisioningEngine which created them. // ProvisioningEngine which created them.
ProvisioningStatus NewProvisioningSession( virtual ProvisioningStatus NewProvisioningSession(
const std::string& device_public_key, const std::string& device_public_key,
const std::string& device_private_key, const std::string& device_private_key,
std::unique_ptr<ProvisioningSession>* new_session) const; std::unique_ptr<ProvisioningSession>* new_session) const;

View File

@@ -22,7 +22,7 @@ class ProvisioningSessionImpl;
// Class which is used to implement the provisioning session state machine. // Class which is used to implement the provisioning session state machine.
class ProvisioningSession { class ProvisioningSession {
public: public:
~ProvisioningSession(); virtual ~ProvisioningSession();
// Process a message from the client device. // Process a message from the client device.
// * |message| is the message received from the client device. // * |message| is the message received from the client device.
@@ -31,13 +31,16 @@ class ProvisioningSession {
// * |done| will indicate, upon successful return, whether the provisioning // * |done| will indicate, upon successful return, whether the provisioning
// exchange is complete, and the ProvisioningSession can be deleted. // exchange is complete, and the ProvisioningSession can be deleted.
// Returns OK if successful, or an appropriate error status code otherwise. // Returns OK if successful, or an appropriate error status code otherwise.
ProvisioningStatus ProcessMessage(const std::string& message, virtual ProvisioningStatus ProcessMessage(const std::string& message,
std::string* response, std::string* response,
bool* done); bool* done);
// * Returns a ProvisioneddeviceInfo message containing information about the // * Returns a ProvisioneddeviceInfo message containing information about the
// type of device being provisioned. May return nullptr. // type of device being provisioned. May return nullptr.
const ProvisionedDeviceInfo* GetDeviceInfo() const; virtual const ProvisionedDeviceInfo* GetDeviceInfo() const;
protected:
ProvisioningSession(); // To enable mocking.
private: private:
#ifndef SWIGPYTHON #ifndef SWIGPYTHON

View File

@@ -23,5 +23,5 @@ typedef oldtype newtype;
%apply oldtype & INOUT { newtype & INOUT }; %apply oldtype & INOUT { newtype & INOUT };
%enddef %enddef
COPY_TYPEMAPS(int, int32); COPY_TYPEMAPS(int, int32_t);
COPY_TYPEMAPS(unsigned int, uint32); COPY_TYPEMAPS(unsigned int, uint32_t);

View File

@@ -8,6 +8,8 @@
"""Utility functions for cryptography.""" """Utility functions for cryptography."""
import logging
from cryptography.hazmat import backends from cryptography.hazmat import backends
from cryptography.hazmat.primitives import hashes from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization from cryptography.hazmat.primitives import serialization
@@ -18,6 +20,7 @@ def VerifySignature(public_key, signature, data):
hash_algorithm = hashes.SHA1() hash_algorithm = hashes.SHA1()
salt_len = 20 salt_len = 20
logging.info('Verying signature.')
key = serialization.load_der_public_key( key = serialization.load_der_public_key(
public_key, backend=backends.default_backend()) public_key, backend=backends.default_backend())
key.verify(signature, data, key.verify(signature, data,

View File

@@ -9,8 +9,10 @@
import unittest import unittest
import crypto_utility import crypto_utility
import pywrapcertificate_type
import pywrapprovisioning_engine import pywrapprovisioning_engine
import pywrapprovisioning_status import pywrapprovisioning_status
import test_data_provider
import test_data_utility import test_data_utility
from protos.public import signed_device_certificate_pb2 from protos.public import signed_device_certificate_pb2
@@ -25,21 +27,23 @@ class EngineGenerateCertificateTest(unittest.TestCase):
self._engine, 0, verify_success=True) self._engine, 0, verify_success=True)
test_data_utility.AddDrmIntermediateCertificateWithTestData( test_data_utility.AddDrmIntermediateCertificateWithTestData(
self._engine, 2001, verify_success=True) self._engine, 2001, verify_success=True)
self._data_provider = test_data_provider.TestDataProvider(
pywrapcertificate_type.kCertTesting)
def testSuccess(self): def testSuccess(self):
status, signed_cert_string = self._engine.GenerateDeviceDrmCertificate( status, signed_cert_string = self._engine.GenerateDeviceDrmCertificate(
2001, test_data_utility.DEVICE_PUBLIC_KEY, 'DEVICE_SERIAL_NUMBER') 2001, self._data_provider.device_public_key, 'DEVICE_SERIAL_NUMBER')
self.assertEqual(pywrapprovisioning_status.OK, status) self.assertEqual(pywrapprovisioning_status.OK, status)
signed_cert = signed_device_certificate_pb2.SignedDrmDeviceCertificate() signed_cert = signed_device_certificate_pb2.SignedDrmDeviceCertificate()
signed_cert.ParseFromString(signed_cert_string) signed_cert.ParseFromString(signed_cert_string)
crypto_utility.VerifySignature(test_data_utility.CA_PUBLIC_KEY, crypto_utility.VerifySignature(self._data_provider.ca_public_key,
signed_cert.signature, signed_cert.signature,
signed_cert.drm_certificate) signed_cert.drm_certificate)
def testEmptySerialNumber(self): def testEmptySerialNumber(self):
status, _ = self._engine.GenerateDeviceDrmCertificate( status, _ = self._engine.GenerateDeviceDrmCertificate(
2001, test_data_utility.DEVICE_PUBLIC_KEY, '') 2001, self._data_provider.device_public_key, '')
self.assertEqual(pywrapprovisioning_status.INVALID_SERIAL_NUMBER, status) self.assertEqual(pywrapprovisioning_status.INVALID_SERIAL_NUMBER, status)
def testEmptyPublicKey(self): def testEmptyPublicKey(self):
@@ -57,7 +61,7 @@ class EngineGenerateCertificateTest(unittest.TestCase):
def testMissingIntermediateCertificate(self): def testMissingIntermediateCertificate(self):
status, _ = self._engine.GenerateDeviceDrmCertificate( status, _ = self._engine.GenerateDeviceDrmCertificate(
2002, test_data_utility.DEVICE_PUBLIC_KEY, 'DEVICE_SERIAL_NUMBER') 2002, self._data_provider.device_public_key, 'DEVICE_SERIAL_NUMBER')
self.assertEqual(pywrapprovisioning_status.DEVICE_REVOKED, status) self.assertEqual(pywrapprovisioning_status.DEVICE_REVOKED, status)
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -11,6 +11,7 @@ import unittest
import pywrapcertificate_type import pywrapcertificate_type
import pywrapprovisioning_engine import pywrapprovisioning_engine
import pywrapprovisioning_status import pywrapprovisioning_status
import test_data_provider
import test_data_utility import test_data_utility
@@ -18,6 +19,8 @@ class InitEngineTest(unittest.TestCase):
def setUp(self): def setUp(self):
self._engine = pywrapprovisioning_engine.ProvisioningEngine() self._engine = pywrapprovisioning_engine.ProvisioningEngine()
self._data_provider = test_data_provider.TestDataProvider(
pywrapcertificate_type.kCertTesting)
def testInitEngineSucceed(self): def testInitEngineSucceed(self):
test_data_utility.InitProvisionEngineWithTestData( test_data_utility.InitProvisionEngineWithTestData(
@@ -58,112 +61,111 @@ class InitEngineTest(unittest.TestCase):
def testInitEngineInvalidServiceDrmCert(self): def testInitEngineInvalidServiceDrmCert(self):
status = self._engine.Initialize( status = self._engine.Initialize(
pywrapcertificate_type.kCertTesting, 'INVALID_CERT', pywrapcertificate_type.kCertTesting, 'INVALID_CERT',
test_data_utility.SERVICE_PRIVATE_KEY, self._data_provider.service_private_key,
test_data_utility.SERVICE_PRIVATE_KEY_PASS, self._data_provider.service_private_key_passphrase,
test_data_utility.PROVISIONER_DRM_CERT, self._data_provider.provisioner_drm_cert,
test_data_utility.PROVISIONER_PRIVATE_KEY, self._data_provider.provisioner_private_key,
test_data_utility.PROVISIONER_PRIVATE_KEY_PASS, self._data_provider.provisioner_private_key_passphrase,
test_data_utility.PROVISIONER_SPOID_SECRET) self._data_provider.provisioner_spoid_secret)
self.assertEqual(pywrapprovisioning_status.INVALID_SERVICE_DRM_CERTIFICATE, self.assertEqual(pywrapprovisioning_status.INVALID_SERVICE_DRM_CERTIFICATE,
status) status)
def testInitEngineInvalidServicePrivateKey(self): def testInitEngineInvalidServicePrivateKey(self):
status = self._engine.Initialize( status = self._engine.Initialize(
pywrapcertificate_type.kCertTesting, pywrapcertificate_type.kCertTesting,
test_data_utility.SERVICE_DRM_CERT, 'INVALID_KEY', self._data_provider.service_drm_cert, 'INVALID_KEY',
test_data_utility.SERVICE_PRIVATE_KEY_PASS, self._data_provider.service_private_key_passphrase,
test_data_utility.PROVISIONER_DRM_CERT, self._data_provider.provisioner_drm_cert,
test_data_utility.PROVISIONER_PRIVATE_KEY, self._data_provider.provisioner_private_key,
test_data_utility.PROVISIONER_PRIVATE_KEY_PASS, self._data_provider.provisioner_private_key_passphrase,
test_data_utility.PROVISIONER_SPOID_SECRET) self._data_provider.provisioner_spoid_secret)
self.assertEqual(pywrapprovisioning_status.INVALID_SERVICE_PRIVATE_KEY, self.assertEqual(pywrapprovisioning_status.INVALID_SERVICE_PRIVATE_KEY,
status) status)
def testInitEngineWrongServicePrivateKey(self): def testInitEngineWrongServicePrivateKey(self):
status = self._engine.Initialize( status = self._engine.Initialize(
pywrapcertificate_type.kCertTesting, pywrapcertificate_type.kCertTesting,
test_data_utility.SERVICE_DRM_CERT, self._data_provider.service_drm_cert,
test_data_utility.PROVISIONER_PRIVATE_KEY, self._data_provider.provisioner_private_key,
test_data_utility.SERVICE_PRIVATE_KEY_PASS, self._data_provider.service_private_key_passphrase,
test_data_utility.PROVISIONER_DRM_CERT, self._data_provider.provisioner_drm_cert,
test_data_utility.PROVISIONER_PRIVATE_KEY, self._data_provider.provisioner_private_key,
test_data_utility.PROVISIONER_PRIVATE_KEY_PASS, self._data_provider.provisioner_private_key_passphrase,
test_data_utility.PROVISIONER_SPOID_SECRET) self._data_provider.provisioner_spoid_secret)
self.assertEqual(pywrapprovisioning_status.INVALID_SERVICE_PRIVATE_KEY, self.assertEqual(pywrapprovisioning_status.INVALID_SERVICE_PRIVATE_KEY,
status) status)
def testInitEngineInvalidServicePrivateKeyPassphrase(self): def testInitEngineInvalidServicePrivateKeyPassphrase(self):
status = self._engine.Initialize( status = self._engine.Initialize(
pywrapcertificate_type.kCertTesting, pywrapcertificate_type.kCertTesting,
test_data_utility.SERVICE_DRM_CERT, self._data_provider.service_drm_cert,
test_data_utility.SERVICE_PRIVATE_KEY, 'INVALID_PASSPHRASE', self._data_provider.service_private_key, 'INVALID_PASSPHRASE',
test_data_utility.PROVISIONER_DRM_CERT, self._data_provider.provisioner_drm_cert,
test_data_utility.PROVISIONER_PRIVATE_KEY, self._data_provider.provisioner_private_key,
test_data_utility.PROVISIONER_PRIVATE_KEY_PASS, self._data_provider.provisioner_private_key_passphrase,
test_data_utility.PROVISIONER_SPOID_SECRET) self._data_provider.provisioner_spoid_secret)
self.assertEqual(pywrapprovisioning_status.INVALID_SERVICE_PRIVATE_KEY, self.assertEqual(pywrapprovisioning_status.INVALID_SERVICE_PRIVATE_KEY,
status) status)
def testInitEngineInvalidDrmCert(self): def testInitEngineInvalidDrmCert(self):
status = self._engine.Initialize( status = self._engine.Initialize(
pywrapcertificate_type.kCertTesting, pywrapcertificate_type.kCertTesting,
test_data_utility.SERVICE_DRM_CERT, self._data_provider.service_drm_cert,
test_data_utility.SERVICE_PRIVATE_KEY, self._data_provider.service_private_key,
test_data_utility.SERVICE_PRIVATE_KEY_PASS, 'INVALID_CERT', self._data_provider.service_private_key_passphrase, 'INVALID_CERT',
test_data_utility.PROVISIONER_PRIVATE_KEY, self._data_provider.provisioner_private_key,
test_data_utility.PROVISIONER_PRIVATE_KEY_PASS, self._data_provider.provisioner_private_key_passphrase,
test_data_utility.PROVISIONER_SPOID_SECRET) self._data_provider.provisioner_spoid_secret)
self.assertEqual( self.assertEqual(
pywrapprovisioning_status.INVALID_PROVISIONER_DRM_CERTIFICATE, status) pywrapprovisioning_status.INVALID_PROVISIONER_DRM_CERTIFICATE, status)
def testInitEngineInvalidDrmPrivateKey(self): def testInitEngineInvalidDrmPrivateKey(self):
status = self._engine.Initialize( status = self._engine.Initialize(
pywrapcertificate_type.kCertTesting, pywrapcertificate_type.kCertTesting,
test_data_utility.SERVICE_DRM_CERT, self._data_provider.service_drm_cert,
test_data_utility.SERVICE_PRIVATE_KEY, self._data_provider.service_private_key,
test_data_utility.SERVICE_PRIVATE_KEY_PASS, self._data_provider.service_private_key_passphrase,
test_data_utility.PROVISIONER_DRM_CERT, 'INVALID_KEY', self._data_provider.provisioner_drm_cert, 'INVALID_KEY',
test_data_utility.PROVISIONER_PRIVATE_KEY_PASS, self._data_provider.provisioner_private_key_passphrase,
test_data_utility.PROVISIONER_SPOID_SECRET) self._data_provider.provisioner_spoid_secret)
self.assertEqual(pywrapprovisioning_status.INVALID_PROVISIONER_PRIVATE_KEY, self.assertEqual(pywrapprovisioning_status.INVALID_PROVISIONER_PRIVATE_KEY,
status) status)
def testInitEngineWrongDrmPrivateKey(self): def testInitEngineWrongDrmPrivateKey(self):
status = self._engine.Initialize( status = self._engine.Initialize(
pywrapcertificate_type.kCertTesting, pywrapcertificate_type.kCertTesting,
test_data_utility.SERVICE_DRM_CERT, self._data_provider.service_drm_cert,
test_data_utility.SERVICE_PRIVATE_KEY, self._data_provider.service_private_key,
test_data_utility.SERVICE_PRIVATE_KEY_PASS, self._data_provider.service_private_key_passphrase,
test_data_utility.PROVISIONER_DRM_CERT, self._data_provider.provisioner_drm_cert,
test_data_utility.SERVICE_PRIVATE_KEY, self._data_provider.service_private_key,
test_data_utility.PROVISIONER_PRIVATE_KEY_PASS, self._data_provider.provisioner_private_key_passphrase,
test_data_utility.PROVISIONER_SPOID_SECRET) self._data_provider.provisioner_spoid_secret)
self.assertEqual(pywrapprovisioning_status.INVALID_PROVISIONER_PRIVATE_KEY, self.assertEqual(pywrapprovisioning_status.INVALID_PROVISIONER_PRIVATE_KEY,
status) status)
def testInitEngineInvalidDrmPrivateKeyPassphrase(self): def testInitEngineInvalidDrmPrivateKeyPassphrase(self):
status = self._engine.Initialize( status = self._engine.Initialize(
pywrapcertificate_type.kCertTesting, pywrapcertificate_type.kCertTesting,
test_data_utility.SERVICE_DRM_CERT, self._data_provider.service_drm_cert,
test_data_utility.SERVICE_PRIVATE_KEY, self._data_provider.service_private_key,
test_data_utility.SERVICE_PRIVATE_KEY_PASS, self._data_provider.service_private_key_passphrase,
test_data_utility.PROVISIONER_DRM_CERT, self._data_provider.provisioner_drm_cert,
test_data_utility.PROVISIONER_PRIVATE_KEY, self._data_provider.provisioner_private_key_passphrase,
'INVALID_PASSPHRASE', 'INVALID_PASSPHRASE',
test_data_utility.PROVISIONER_SPOID_SECRET) self._data_provider.provisioner_spoid_secret)
self.assertEqual(pywrapprovisioning_status.INVALID_PROVISIONER_PRIVATE_KEY, self.assertEqual(pywrapprovisioning_status.INVALID_PROVISIONER_PRIVATE_KEY,
status) status)
def testInitEngineInvalidSpoidSecret(self): def testInitEngineInvalidSpoidSecret(self):
status = self._engine.Initialize( status = self._engine.Initialize(
pywrapcertificate_type.kCertTesting, pywrapcertificate_type.kCertTesting,
test_data_utility.SERVICE_DRM_CERT, self._data_provider.service_drm_cert,
test_data_utility.SERVICE_PRIVATE_KEY, self._data_provider.service_private_key,
test_data_utility.SERVICE_PRIVATE_KEY_PASS, self._data_provider.service_private_key_passphrase,
test_data_utility.PROVISIONER_DRM_CERT, self._data_provider.provisioner_drm_cert,
test_data_utility.PROVISIONER_PRIVATE_KEY, self._data_provider.provisioner_private_key,
test_data_utility.PROVISIONER_PRIVATE_KEY_PASS, self._data_provider.provisioner_private_key_passphrase, '')
'')
self.assertEqual(pywrapprovisioning_status.INVALID_SPOID_SAUCE, status) self.assertEqual(pywrapprovisioning_status.INVALID_SPOID_SAUCE, status)
if __name__ == '__main__': if __name__ == '__main__':

View File

@@ -9,8 +9,10 @@
import unittest import unittest
import crypto_utility import crypto_utility
import pywrapcertificate_type
import pywrapprovisioning_engine import pywrapprovisioning_engine
import pywrapprovisioning_status import pywrapprovisioning_status
import test_data_provider
import test_data_utility import test_data_utility
from protos.public import certificate_provisioning_pb2 from protos.public import certificate_provisioning_pb2
from protos.public import signed_device_certificate_pb2 from protos.public import signed_device_certificate_pb2
@@ -24,6 +26,8 @@ class NewSessionTest(unittest.TestCase):
self._engine, verify_success=True) self._engine, verify_success=True)
test_data_utility.SetCertificateStatusListWithTestData( test_data_utility.SetCertificateStatusListWithTestData(
self._engine, 0, verify_success=True) self._engine, 0, verify_success=True)
self._data_provider = test_data_provider.TestDataProvider(
pywrapcertificate_type.kCertTesting)
def testNewSessionSuccess(self): def testNewSessionSuccess(self):
test_data_utility.AddDrmIntermediateCertificateWithTestData( test_data_utility.AddDrmIntermediateCertificateWithTestData(
@@ -32,11 +36,11 @@ class NewSessionTest(unittest.TestCase):
(_, new_session) = test_data_utility.NewProvisioningSessionWithTestData( (_, new_session) = test_data_utility.NewProvisioningSessionWithTestData(
self._engine, verify_success=True) self._engine, verify_success=True)
(status, raw_response, (status, raw_response,
_) = new_session.ProcessMessage(test_data_utility.MESSAGE) _) = new_session.ProcessMessage(self._data_provider.message)
test_data_utility.AssertSuccess(status, 'Failed to create session.') test_data_utility.AssertSuccess(status, 'Failed to create session.')
signed_request = test_data_utility.ConvertToSignedProvisioningMessage( signed_request = test_data_utility.ConvertToSignedProvisioningMessage(
test_data_utility.MESSAGE) self._data_provider.message)
unsigned_request = certificate_provisioning_pb2.ProvisioningRequest() unsigned_request = certificate_provisioning_pb2.ProvisioningRequest()
unsigned_request.ParseFromString(signed_request.message) unsigned_request.ParseFromString(signed_request.message)
@@ -44,7 +48,7 @@ class NewSessionTest(unittest.TestCase):
signed_response = test_data_utility.ConvertToSignedProvisioningMessage( signed_response = test_data_utility.ConvertToSignedProvisioningMessage(
raw_response) raw_response)
self._VerifyMessageSignature(test_data_utility.SERVICE_PUBLIC_KEY, self._VerifyMessageSignature(self._data_provider.service_public_key,
signed_response) signed_response)
unsigned_response = certificate_provisioning_pb2.ProvisioningResponse() unsigned_response = certificate_provisioning_pb2.ProvisioningResponse()
@@ -63,7 +67,8 @@ class NewSessionTest(unittest.TestCase):
def testNewSessionWithoutIntermediateCert(self): def testNewSessionWithoutIntermediateCert(self):
(_, new_session) = test_data_utility.NewProvisioningSessionWithTestData( (_, new_session) = test_data_utility.NewProvisioningSessionWithTestData(
self._engine, verify_success=True) self._engine, verify_success=True)
(status, _, _) = new_session.ProcessMessage(test_data_utility.MESSAGE) (status, _, _) = new_session.ProcessMessage(
self._data_provider.message)
self.assertEqual(pywrapprovisioning_status.MISSING_DRM_INTERMEDIATE_CERT, self.assertEqual(pywrapprovisioning_status.MISSING_DRM_INTERMEDIATE_CERT,
status) status)
@@ -71,7 +76,7 @@ class NewSessionTest(unittest.TestCase):
test_data_utility.AddDrmIntermediateCertificateWithTestData( test_data_utility.AddDrmIntermediateCertificateWithTestData(
self._engine, 2001, verify_success=True) self._engine, 2001, verify_success=True)
(session_status, _) = self._engine.NewProvisioningSession( (session_status, _) = self._engine.NewProvisioningSession(
'INVALID_PUBLIC_KEY', test_data_utility.DEVICE_PRIVATE_KEY) 'INVALID_PUBLIC_KEY', self._data_provider.device_private_key)
self.assertEqual(pywrapprovisioning_status.INVALID_DEVICE_PUBLIC_KEY, self.assertEqual(pywrapprovisioning_status.INVALID_DEVICE_PUBLIC_KEY,
session_status) session_status)
@@ -79,7 +84,7 @@ class NewSessionTest(unittest.TestCase):
test_data_utility.AddDrmIntermediateCertificateWithTestData( test_data_utility.AddDrmIntermediateCertificateWithTestData(
self._engine, 2001, verify_success=True) self._engine, 2001, verify_success=True)
(session_status, _) = self._engine.NewProvisioningSession( (session_status, _) = self._engine.NewProvisioningSession(
test_data_utility.DEVICE_PUBLIC_KEY, 'INVALID_PRIVATE_KEY') self._data_provider.device_public_key, 'INVALID_PRIVATE_KEY')
self.assertEqual(pywrapprovisioning_status.INVALID_DEVICE_PRIVATE_KEY, self.assertEqual(pywrapprovisioning_status.INVALID_DEVICE_PRIVATE_KEY,
session_status) session_status)
@@ -97,7 +102,8 @@ class NewSessionTest(unittest.TestCase):
signed_cert = signed_device_certificate_pb2.SignedDrmDeviceCertificate() signed_cert = signed_device_certificate_pb2.SignedDrmDeviceCertificate()
signed_cert.ParseFromString(response.device_certificate) signed_cert.ParseFromString(response.device_certificate)
self._VerifyCertSignature(test_data_utility.CA_PUBLIC_KEY, signed_cert) self._VerifyCertSignature(self._data_provider.ca_public_key,
signed_cert)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()

View File

@@ -18,9 +18,6 @@
UNIQUE_PTR_ARGOUT(widevine::ProvisioningSession, new_session); UNIQUE_PTR_ARGOUT(widevine::ProvisioningSession, new_session);
%apply int { CertificateType, ProvisioningStatus }; %apply int { CertificateType, ProvisioningStatus };
%apply int32 { int32_t };
%apply uint32 { uint32_t };
%apply std::string* OUTPUT { std::string* certificate }; %apply std::string* OUTPUT { std::string* certificate };
%{ %{

View File

@@ -0,0 +1,101 @@
################################################################################
# Copyright 2017 Google Inc.
#
# This software is licensed under the terms defined in the Widevine Master
# License Agreement. For a copy of this agreement, please contact
# widevine-licensing@google.com.
################################################################################
"""Class that provides test data for Provisioning SDK testing."""
import os
import pywrapcertificate_type
_TEST_CERT_DATA_FOLDER = os.path.join('example', 'example_data')
_DEV_CERT_DATA_FOLDER = os.path.join('example', 'dev_cert_example_data')
class TestDataProvider(object):
"""For for Test Data."""
def __init__(self, cert_type):
"""Initializes the TestData for Provisioning SDK tests."""
assert (cert_type in (
pywrapcertificate_type.kCertDevelopment,
pywrapcertificate_type.kCertTesting))
self._cert_type = cert_type
def _GetTestData(self, filename):
"""Helps read test data files such as certs and keys for SDK testing."""
current_dir = os.path.realpath(os.path.dirname(__file__))
if self._cert_type == pywrapcertificate_type.kCertDevelopment:
subfolder_path = _DEV_CERT_DATA_FOLDER
elif self._cert_type == pywrapcertificate_type.kCertTesting:
subfolder_path = _TEST_CERT_DATA_FOLDER
while not os.path.isdir(os.path.join(current_dir, subfolder_path)):
current_dir = os.path.dirname(current_dir)
filename = os.path.join(current_dir, subfolder_path, filename)
with open(filename, 'rb') as data_file:
data = data_file.read()
return data
@property
def service_drm_cert(self):
return self._GetTestData('service.cert')
@property
def service_public_key(self):
return self._GetTestData('service.public')
@property
def service_private_key(self):
return self._GetTestData('service.encrypted.private')
@property
def service_private_key_passphrase(self):
return self._GetTestData('service.passphrase')
@property
def provisioner_drm_cert(self):
return self._GetTestData('provisioner.cert')
@property
def provisioner_private_key(self):
return self._GetTestData('provisioner.encrypted.private')
@property
def provisioner_private_key_passphrase(self):
return self._GetTestData('provisioner.passphrase')
@property
def provisioner_spoid_secret(self):
return self._GetTestData('provisioner.spoid_secret')
@property
def ca_public_key(self):
return self._GetTestData('intermediate.public')
@property
def ca_private_key(self):
return self._GetTestData('intermediate.encrypted.private')
@property
def ca_private_key_passphrase(self):
return self._GetTestData('intermediate.passphrase')
@property
def device_public_key(self):
return self._GetTestData('user.public')
@property
def device_private_key(self):
return self._GetTestData('user.private')
@property
def message(self):
return self._GetTestData('message')
@property
def certificate_list(self):
return self._GetTestData('certificate_list')

View File

@@ -8,63 +8,48 @@
"""Utility class for Provisioning SDK testing.""" """Utility class for Provisioning SDK testing."""
import os import logging
import pywrapcertificate_type import pywrapcertificate_type
import pywrapprovisioning_status import pywrapprovisioning_status
import test_data_provider
from protos.public import certificate_provisioning_pb2 from protos.public import certificate_provisioning_pb2
TEST_DATA_FOLDER = os.path.join('example', 'example_data') logging.basicConfig(level=logging.DEBUG)
def GetTestData(filename): def InitProvisionEngineWithTestData(
current_dir = os.path.realpath(os.path.dirname(__file__)) engine, verify_success=False,
while not os.path.isdir(os.path.join(current_dir, TEST_DATA_FOLDER)): cert_type=pywrapcertificate_type.kCertTesting):
current_dir = os.path.dirname(current_dir)
filename = os.path.join(current_dir, TEST_DATA_FOLDER, filename)
with open(filename, 'rb') as data_file:
data = data_file.read()
return data
SERVICE_DRM_CERT = GetTestData('service.cert')
SERVICE_PUBLIC_KEY = GetTestData('service.public')
SERVICE_PRIVATE_KEY = GetTestData('service.encrypted.private')
SERVICE_PRIVATE_KEY_PASS = GetTestData('service.passphrase')
PROVISIONER_DRM_CERT = GetTestData('provisioner.cert')
PROVISIONER_PRIVATE_KEY = GetTestData('provisioner.encrypted.private')
PROVISIONER_PRIVATE_KEY_PASS = GetTestData('provisioner.passphrase')
PROVISIONER_SPOID_SECRET = GetTestData('provisioner.spoid_secret')
CA_PUBLIC_KEY = GetTestData('intermediate.public')
DEVICE_PUBLIC_KEY = GetTestData('user.public')
DEVICE_PRIVATE_KEY = GetTestData('user.private')
MESSAGE = GetTestData('message')
def InitProvisionEngineWithTestData(engine, verify_success=False):
"""Initialize the provisioning engine with sample credentials. """Initialize the provisioning engine with sample credentials.
Args: Args:
engine: a pywrapprovisioning_engine.ProvisioningEngine instance engine: a pywrapprovisioning_engine.ProvisioningEngine instance
verify_success: whether to verify that resulting status code equals OK verify_success: whether to verify that resulting status code equals OK
cert_type: The type of certificate to use for initializing SDK -
{kCertTesting/kCertDevelopment}
Returns: Returns:
OK on success, or an appropriate error status code otherwise. OK on success, or an appropriate error status code otherwise.
""" """
status = engine.Initialize(pywrapcertificate_type.kCertTesting, logging.info('Initializing provisioning engine with test data.')
SERVICE_DRM_CERT, SERVICE_PRIVATE_KEY, data_provider = test_data_provider.TestDataProvider(cert_type)
SERVICE_PRIVATE_KEY_PASS, PROVISIONER_DRM_CERT, status = engine.Initialize(cert_type,
PROVISIONER_PRIVATE_KEY, data_provider.service_drm_cert,
PROVISIONER_PRIVATE_KEY_PASS, data_provider.service_private_key,
PROVISIONER_SPOID_SECRET) data_provider.service_private_key_passphrase,
data_provider.provisioner_drm_cert,
data_provider.provisioner_private_key,
data_provider.provisioner_private_key_passphrase,
data_provider.provisioner_spoid_secret)
if verify_success: if verify_success:
AssertSuccess(status, 'Failed to initialize.') AssertSuccess(status, 'Failed to initialize.')
return status return status
def SetCertificateStatusListWithTestData(engine, def SetCertificateStatusListWithTestData(
expiration_period_seconds, engine, expiration_period_seconds, verify_success=False,
verify_success=False): cert_type=pywrapcertificate_type.kCertTesting):
"""Set the certificate status list with sample certificate status list. """Set the certificate status list with sample certificate status list.
Args: Args:
@@ -72,11 +57,15 @@ def SetCertificateStatusListWithTestData(engine,
expiration_period_seconds: number of seconds until certificate_status_list expiration_period_seconds: number of seconds until certificate_status_list
expires after its creation time expires after its creation time
verify_success: whether to verify that resulting status code equals OK verify_success: whether to verify that resulting status code equals OK
cert_type: The type of certificate to use for initializing SDK -
{kCertTesting/kCertDevelopment}
Returns: Returns:
OK on success, or an appropriate error status code otherwise. OK on success, or an appropriate error status code otherwise.
""" """
certificate_status_list = GetTestData('certificate_list') logging.info('Setting certificate status list with test data.')
data_provider = test_data_provider.TestDataProvider(cert_type)
certificate_status_list = data_provider.certificate_list
status = engine.SetCertificateStatusList(certificate_status_list, status = engine.SetCertificateStatusList(certificate_status_list,
expiration_period_seconds) expiration_period_seconds)
@@ -87,9 +76,9 @@ def SetCertificateStatusListWithTestData(engine,
return status return status
def AddDrmIntermediateCertificateWithTestData(engine, def AddDrmIntermediateCertificateWithTestData(
system_id, engine, system_id, verify_success=False,
verify_success=False): cert_type=pywrapcertificate_type.kCertTesting):
"""Generate an intermediate DRM cert and add it to provisioning engine. """Generate an intermediate DRM cert and add it to provisioning engine.
The intermediate DRM certificate is generated with sample public key and The intermediate DRM certificate is generated with sample public key and
@@ -100,19 +89,23 @@ def AddDrmIntermediateCertificateWithTestData(engine,
engine: a pywrapprovisioning_engine.ProvisioningEngine instance engine: a pywrapprovisioning_engine.ProvisioningEngine instance
system_id: Widevine system ID for the type of device system_id: Widevine system ID for the type of device
verify_success: whether to verify that resulting status code equals OK verify_success: whether to verify that resulting status code equals OK
cert_type: The type of certificate to use for initializing SDK -
{kCertTesting/kCertDevelopment}
Returns: Returns:
OK on success, or an appropriate error status code otherwise. OK on success, or an appropriate error status code otherwise.
""" """
ca_private_key = GetTestData('intermediate.encrypted.private') logging.info(
ca_private_key_passphrase = GetTestData('intermediate.passphrase') 'Generating DRM intermediate certificate for system_id <%d>.', system_id)
data_provider = test_data_provider.TestDataProvider(cert_type)
gen_status, ca_certificate = engine.GenerateDrmIntermediateCertificate( gen_status, ca_certificate = engine.GenerateDrmIntermediateCertificate(
system_id, CA_PUBLIC_KEY) system_id, data_provider.ca_public_key)
AssertSuccess(gen_status, 'Failed to generate intermediate certificate.') AssertSuccess(gen_status, 'Failed to generate intermediate certificate.')
logging.info('Adding DRM intermediate certificate.')
add_ca_status = engine.AddDrmIntermediateCertificate( add_ca_status = engine.AddDrmIntermediateCertificate(
ca_certificate, ca_private_key, ca_private_key_passphrase) ca_certificate, data_provider.ca_private_key,
data_provider.ca_private_key_passphrase)
if verify_success: if verify_success:
AssertSuccess(add_ca_status, 'Failed to add intermediate certificate.') AssertSuccess(add_ca_status, 'Failed to add intermediate certificate.')
@@ -120,24 +113,57 @@ def AddDrmIntermediateCertificateWithTestData(engine,
return add_ca_status return add_ca_status
def NewProvisioningSessionWithTestData(engine, verify_success=False): def GenerateDeviceDrmCertificate(engine, system_id, serial_number,
verify_success=False,
cert_type=pywrapcertificate_type.kCertTesting):
"""Generate a device DRM certificate.
Args:
engine: a pywrapprovisioning_engine.ProvisioningEngine instance
system_id: Widevine system ID for the type of device
serial_number: The serial number for the device DRM certificate.
verify_success: whether to verify that resulting status code equals OK
cert_type: The type of certificate to use for initializing SDK -
{kCertTesting/kCertDevelopment}
Returns:
OK on success, or an appropriate error status code otherwise.
"""
logging.info(
'Generating Device cert for system_id <%d> and serial_number <%s>.',
system_id, serial_number)
data_provider = test_data_provider.TestDataProvider(cert_type)
gen_status, ca_certificate = engine.GenerateDeviceDrmCertificate(
system_id, data_provider.device_public_key, serial_number)
if verify_success:
AssertSuccess(gen_status, 'Failed to generate device DRM certificate.')
return ca_certificate
def NewProvisioningSessionWithTestData(
engine, verify_success=False,
cert_type=pywrapcertificate_type.kCertTesting):
"""Create a provisioning session with sample device public and private keys. """Create a provisioning session with sample device public and private keys.
Args: Args:
engine: a pywrapprovisioning_engine.ProvisioningEngine instance engine: a pywrapprovisioning_engine.ProvisioningEngine instance
verify_success: whether to verify that resulting status code equals OK verify_success: whether to verify that resulting status code equals OK
cert_type: The type of certificate to use for initializing SDK -
{kCertTesting/kCertDevelopment}
Returns: Returns:
status: OK on success, or an appropriate error status code otherwise. status: OK on success, or an appropriate error status code otherwise.
new_session: A new provisioning_session. new_session: A new provisioning_session.
""" """
status, new_session = engine.NewProvisioningSession(DEVICE_PUBLIC_KEY, logging.info('Starting a new provisioning session with'
DEVICE_PRIVATE_KEY) 'sample device public and private keys.')
data_provider = test_data_provider.TestDataProvider(cert_type)
status, new_session = engine.NewProvisioningSession(
data_provider.device_public_key, data_provider.device_private_key)
if verify_success: if verify_success:
AssertSuccess(status, 'Failed to create session.') AssertSuccess(status, 'Failed to create session.')
return (status, new_session) return status, new_session
def AssertSuccess(status, message=None): def AssertSuccess(status, message=None):