111 lines
4.5 KiB
Python
111 lines
4.5 KiB
Python
################################################################################
# Copyright 2016 Google LLC.
#
# This software is licensed under the terms defined in the Widevine Master
# License Agreement. For a copy of this agreement, please contact
# widevine-licensing@google.com.
################################################################################
|
|
|
|
import unittest
|
|
|
|
from certificate_type import CertificateType
|
|
import crypto_utility
|
|
import test_data_provider
|
|
import test_data_utility
|
|
from provisioning_engine import ProvisioningEngine
|
|
from provisioning_status import ProvisioningStatus
|
|
from protos.public import certificate_provisioning_pb2
|
|
from protos.public import signed_drm_certificate_pb2
|
|
|
|
|
|
class NewSessionTest(unittest.TestCase):
    """Tests for ProvisioningEngine session creation and message processing.

    Every test starts from a fresh engine prepared in ``setUp``. Tests that
    need an intermediate DRM certificate add it themselves, so the
    "missing certificate" case can be exercised by simply not adding one.
    """

    def setUp(self):
        """Create an engine initialized with test data plus a data provider."""
        self._engine = ProvisioningEngine()
        test_data_utility.InitProvisionEngineWithTestData(
            self._engine, verify_success=True)
        test_data_utility.SetCertificateStatusListWithTestData(
            self._engine, 0, verify_success=True)
        self._data_provider = test_data_provider.TestDataProvider(
            CertificateType.kCertificateTypeTesting)

    def testNewSessionSuccess(self):
        """A valid request is processed with OK and a verifiable response."""
        test_data_utility.AddDrmIntermediateCertificateWithTestData(
            self._engine, 2001, verify_success=True)

        new_session, _ = test_data_utility.NewProvisioningSessionWithTestData(
            self._engine, verify_success=True)
        status, raw_response, _ = new_session.ProcessMessage(
            self._data_provider.message)
        # Use assertEqual instead of a bare ``assert``: it matches the other
        # tests in this class, reports both values on failure, and is not
        # stripped when Python runs with -O.
        self.assertEqual(ProvisioningStatus.OK, status)

        # Recover the unsigned request that was sent to the session.
        signed_request = test_data_utility.ConvertToSignedProvisioningMessage(
            self._data_provider.message)
        unsigned_request = certificate_provisioning_pb2.ProvisioningRequest()
        unsigned_request.ParseFromString(signed_request.message)

        # Check the response signature before trusting its payload.
        signed_response = test_data_utility.ConvertToSignedProvisioningMessage(
            raw_response)
        self._VerifyMessageSignature(
            self._data_provider.service_public_key, signed_response)

        unsigned_response = certificate_provisioning_pb2.ProvisioningResponse()
        unsigned_response.ParseFromString(signed_response.message)

        self._VerifyProvisioningResponse(unsigned_request, unsigned_response)

    def testProcessInvalidMessage(self):
        """An unparsable message is rejected with INVALID_REQUEST_MESSAGE."""
        test_data_utility.AddDrmIntermediateCertificateWithTestData(
            self._engine, 2001, verify_success=True)
        # verify_success=True is passed for consistency with the other tests
        # that create a session through this helper.
        # NOTE(review): presumably this makes the helper fail the test right
        # away if session creation itself fails -- confirm against
        # test_data_utility.
        new_session, _ = test_data_utility.NewProvisioningSessionWithTestData(
            self._engine, verify_success=True)
        status, _, _ = new_session.ProcessMessage('INVALID_MESSAGE')
        self.assertEqual(ProvisioningStatus.INVALID_REQUEST_MESSAGE, status)

    def testNewSessionWithoutIntermediateCert(self):
        """Without an intermediate cert, processing reports it as missing."""
        # Deliberately no AddDrmIntermediateCertificateWithTestData call here.
        new_session, _ = test_data_utility.NewProvisioningSessionWithTestData(
            self._engine, verify_success=True)
        status, _, _ = new_session.ProcessMessage(self._data_provider.message)
        self.assertEqual(ProvisioningStatus.MISSING_DEVICE_MODEL_CERT, status)

    def testNewSessionInvalidDevicePublicKey(self):
        """Session creation rejects a malformed device public key."""
        test_data_utility.AddDrmIntermediateCertificateWithTestData(
            self._engine, 2001, verify_success=True)
        _, session_status = self._engine.NewProvisioningSession(
            'INVALID_PUBLIC_KEY',
            self._data_provider.device_private_key)
        self.assertEqual(
            ProvisioningStatus.INVALID_DRM_DEVICE_PUBLIC_KEY, session_status)

    def testNewSessionInvalidDevicePrivateKey(self):
        """Session creation rejects a malformed device private key."""
        test_data_utility.AddDrmIntermediateCertificateWithTestData(
            self._engine, 2001, verify_success=True)
        _, session_status = self._engine.NewProvisioningSession(
            self._data_provider.device_public_key,
            'INVALID_PRIVATE_KEY')
        self.assertEqual(
            ProvisioningStatus.INVALID_DRM_DEVICE_PRIVATE_KEY, session_status)

    def _VerifyMessageSignature(self, public_key, signed_response):
        """Verify ``signed_response.signature`` over ``signed_response.message``.

        Args:
            public_key: Key handed to ``crypto_utility.VerifySignature``.
            signed_response: Object exposing ``signature`` and ``message``.
        """
        # NOTE(review): the return value is ignored; presumably
        # VerifySignature raises on a bad signature -- confirm.
        crypto_utility.VerifySignature(
            public_key, signed_response.signature, signed_response.message)

    def _VerifyCertSignature(self, public_key, signed_cert):
        """Verify ``signed_cert.signature`` over ``signed_cert.drm_certificate``.

        Args:
            public_key: Key handed to ``crypto_utility.VerifySignature``.
            signed_cert: Object exposing ``signature`` and ``drm_certificate``.
        """
        # NOTE(review): the return value is ignored; presumably
        # VerifySignature raises on a bad signature -- confirm.
        crypto_utility.VerifySignature(
            public_key, signed_cert.signature, signed_cert.drm_certificate)

    def _VerifyProvisioningResponse(self, request, response):
        """Check that ``response`` answers ``request`` and carries a valid cert.

        Args:
            request: Parsed provisioning request; its ``nonce`` is compared.
            response: Parsed provisioning response providing ``nonce`` and
                the serialized ``device_certificate``.
        """
        self.assertEqual(request.nonce, response.nonce)

        signed_cert = signed_drm_certificate_pb2.SignedDrmCertificate()
        signed_cert.ParseFromString(response.device_certificate)

        self._VerifyCertSignature(
            self._data_provider.ca_public_key, signed_cert)
|
|
|
|
|
|
# Run every test case in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
|