Move ASOP factory extraction tool to its own directory

Moved some source to common folder.
Added uploading script which is also shared by CE CDM partners.
Added README.

Test: m wv_factory_extraction_tool
Bug: 414642286
Change-Id: I565027b75528ab28f9f1eb8d9086c0213de992d0
This commit is contained in:
conglin
2025-06-10 18:20:43 +00:00
parent 1f77085571
commit 7496c1c84c
14 changed files with 555 additions and 7 deletions

View File

@@ -0,0 +1,71 @@
// Copyright 2021 Google LLC. All Rights Reserved. This file and proprietary
// source code may only be used and distributed under the Widevine License
// Agreement.
#ifndef WIDEVINE_OEMCRYPTO_INTERFACE_H_
#define WIDEVINE_OEMCRYPTO_INTERFACE_H_
#include <cstdint>
#include <string>
#include <vector>
#include "OEMCryptoCENC.h"
namespace widevine {
class OEMCryptoInterface {
public:
OEMCryptoInterface() = default;
OEMCryptoInterface(const OEMCryptoInterface&) = delete;
OEMCryptoInterface& operator=(const OEMCryptoInterface&) = delete;
virtual ~OEMCryptoInterface();
// Initializes this interface by providing path to the OEMCrypto library.
bool Init(const std::string& oemcrypto_path);
// Retrieves the boot certificate chain from OEMCrypto implementation.
OEMCryptoResult GetBcc(std::vector<uint8_t>& bcc);
// Retrieves the build information of the OEMCrypto library from OEMCrypto
// implementation.
OEMCryptoResult GetOEMCryptoBuildInfo(std::string& build_info);
// Retrieves the verified device information of the OEMCrypto library from
// OEMCrypto implementation.
OEMCryptoResult GetVerifiedDeviceInformation(
std::vector<uint8_t>& verified_device_info);
// Generates device registration CSR payload and signs it with the leaf cert
// of BCC.
OEMCryptoResult GetSignedCsrPayload(const std::vector<uint8_t>& challenge,
const std::vector<uint8_t>& device_info,
std::vector<uint8_t>& signed_csr_payload);
private:
typedef OEMCryptoResult (*Initialize_t)();
typedef OEMCryptoResult (*Terminate_t)();
typedef OEMCryptoResult (*GetBootCertificateChain_t)(
uint8_t* bcc, size_t* bcc_size, uint8_t* additional_signature,
size_t* additional_signature_size);
typedef OEMCryptoResult (*BuildInformation_t)(char* buffer,
size_t* buffer_length);
typedef OEMCryptoResult (*GetDeviceInformation_t)(uint8_t* device_info,
size_t* device_info_length);
typedef OEMCryptoResult (*GetDeviceSignedCsrPayload_t)(
const uint8_t* challenge, size_t challenge_length,
const uint8_t* device_info, size_t device_info_length,
uint8_t* signed_csr_payload, size_t* signed_csr_payload_length);
Initialize_t Initialize = nullptr;
Terminate_t Terminate = nullptr;
GetBootCertificateChain_t GetBootCertificateChain = nullptr;
BuildInformation_t BuildInformation = nullptr;
GetDeviceInformation_t GetDeviceInformation = nullptr;
GetDeviceSignedCsrPayload_t GetDeviceSignedCsrPayload = nullptr;
void* handle_ = nullptr;
};
} // namespace widevine
#endif // WIDEVINE_OEMCRYPTO_INTERFACE_H_

View File

@@ -0,0 +1,35 @@
// Copyright 2018 Google LLC. All Rights Reserved. This file and proprietary
// source code may only be used and distributed under the Widevine License
// Agreement.
#ifndef WVCDM_CORE_PROPERTIES_H_
#define WVCDM_CORE_PROPERTIES_H_
#include <map>
#include <memory>
#include <mutex>
#include <string>
#include "disallow_copy_and_assign.h"
namespace wvcdm {
// This class gives device information/meta data.
class Properties {
public:
static bool GetBrandName(std::string* brand_name);
static bool GetCompanyName(std::string* company_name);
static bool GetModelName(std::string* model_name);
static bool GetArchitectureName(std::string* arch_name);
static bool GetDeviceName(std::string* device_name);
static bool GetProductName(std::string* product_name);
static bool GetBuildInfo(std::string* build_info);
static bool GetOEMCryptoPath(std::string* library_name);
private:
CORE_DISALLOW_COPY_AND_ASSIGN(Properties);
};
} // namespace wvcdm
#endif // WVCDM_CORE_PROPERTIES_H_

View File

@@ -0,0 +1,166 @@
// Copyright 2021 Google LLC. All Rights Reserved. This file and proprietary
// source code may only be used and distributed under the Widevine License
// Agreement.
#include "WidevineOemcryptoInterface.h"
#include <dlfcn.h>
#include "OEMCryptoCENC.h"
#include "log.h"
// These macros lookup the obfuscated name used for OEMCrypto.
#define QUOTE_DEFINE(A) #A
#define QUOTE(A) QUOTE_DEFINE(A)
#define LOOKUP(handle, name) dlsym(handle, QUOTE(name))
#define LOAD_SYM(name) \
name = reinterpret_cast<name##_t>(LOOKUP(handle_, OEMCrypto_##name)); \
if (name == nullptr) { \
LOGE("%s", dlerror()); \
return false; \
}
#define LOAD_SYM_IF_EXIST(name) \
name = reinterpret_cast<name##_t>(LOOKUP(handle_, OEMCrypto_##name));
namespace widevine {
OEMCryptoInterface::~OEMCryptoInterface() {
if (Terminate != nullptr) {
Terminate();
}
if (handle_ != nullptr) {
dlclose(handle_);
}
}
bool OEMCryptoInterface::Init(const std::string& oemcrypto_path) {
dlerror();
handle_ = dlopen(oemcrypto_path.c_str(), RTLD_LAZY | RTLD_GLOBAL);
if (handle_ == nullptr) {
LOGE("Can't open OEMCrypto library: %s", dlerror());
return false;
}
LOGI("OEMCrypto library opened.");
LOAD_SYM(Initialize);
LOAD_SYM(Terminate);
LOAD_SYM(GetBootCertificateChain);
LOAD_SYM(BuildInformation);
LOAD_SYM_IF_EXIST(GetDeviceInformation);
LOAD_SYM_IF_EXIST(GetDeviceSignedCsrPayload);
OEMCryptoResult status = Initialize();
if (status != OEMCrypto_SUCCESS) {
LOGE("OEMCrypto Initialize failed: %d", status);
return false;
}
return true;
}
OEMCryptoResult OEMCryptoInterface::GetBcc(std::vector<uint8_t>& bcc) {
if (handle_ == nullptr) {
return OEMCrypto_ERROR_INIT_FAILED;
}
bcc.resize(0);
size_t bcc_size = 0;
std::vector<uint8_t> additional_signature; // It should be empty.
size_t additional_signature_size = 0;
OEMCryptoResult result = GetBootCertificateChain(bcc.data(), &bcc_size,
additional_signature.data(),
&additional_signature_size);
LOGI("GetBootCertificateChain first attempt result %d", result);
if (additional_signature_size != 0) {
LOGW(
"The additional_signature_size required by OEMCrypto is %zu, while it "
"is expected to be zero.",
additional_signature_size);
}
if (result == OEMCrypto_ERROR_SHORT_BUFFER) {
bcc.resize(bcc_size);
additional_signature.resize(additional_signature_size);
result = GetBootCertificateChain(bcc.data(), &bcc_size,
additional_signature.data(),
&additional_signature_size);
if (result == OEMCrypto_SUCCESS) bcc.resize(bcc_size);
LOGI("GetBootCertificateChain second attempt result %d", result);
}
return result;
}
OEMCryptoResult OEMCryptoInterface::GetOEMCryptoBuildInfo(
std::string& build_info) {
if (handle_ == nullptr) {
return OEMCrypto_ERROR_INIT_FAILED;
}
build_info.resize(0);
size_t build_info_size = 0;
OEMCryptoResult result = BuildInformation(&build_info[0], &build_info_size);
LOGI("BuildInformation first attempt result %d", result);
if (result == OEMCrypto_ERROR_SHORT_BUFFER) {
build_info.resize(build_info_size);
result = BuildInformation(&build_info[0], &build_info_size);
LOGI("BuildInformation second attempt result %d", result);
}
return result;
}
OEMCryptoResult OEMCryptoInterface::GetVerifiedDeviceInformation(
std::vector<uint8_t>& verified_device_info) {
if (handle_ == nullptr) {
return OEMCrypto_ERROR_INIT_FAILED;
}
if (GetDeviceInformation == nullptr) {
return OEMCrypto_ERROR_NOT_IMPLEMENTED;
}
verified_device_info.resize(0);
size_t verified_device_info_size = 0;
OEMCryptoResult result = GetDeviceInformation(verified_device_info.data(),
&verified_device_info_size);
LOGI("GetVerifiedDeviceInformation first attempt result %d", result);
if (result == OEMCrypto_ERROR_SHORT_BUFFER) {
verified_device_info.resize(verified_device_info_size);
result = GetDeviceInformation(verified_device_info.data(),
&verified_device_info_size);
verified_device_info.resize(verified_device_info_size);
LOGI("GetVerifiedDeviceInformation second attempt result %d", result);
}
return result;
}
OEMCryptoResult OEMCryptoInterface::GetSignedCsrPayload(
const std::vector<uint8_t>& challenge,
const std::vector<uint8_t>& device_info,
std::vector<uint8_t>& signed_csr_payload) {
if (handle_ == nullptr) {
return OEMCrypto_ERROR_INIT_FAILED;
}
if (GetDeviceSignedCsrPayload == nullptr) {
return OEMCrypto_ERROR_NOT_IMPLEMENTED;
}
size_t signed_csr_payload_size = signed_csr_payload.size();
OEMCryptoResult result = GetDeviceSignedCsrPayload(
challenge.data(), challenge.size(), device_info.data(),
device_info.size(), signed_csr_payload.data(), &signed_csr_payload_size);
LOGI("GetDeviceSignedCsrPayload first attempt result %d", result);
if (result == OEMCrypto_ERROR_SHORT_BUFFER) {
signed_csr_payload.resize(signed_csr_payload_size);
result = GetDeviceSignedCsrPayload(challenge.data(), challenge.size(),
device_info.data(), device_info.size(),
signed_csr_payload.data(),
&signed_csr_payload_size);
signed_csr_payload.resize(signed_csr_payload_size);
LOGI("GetDeviceSignedCsrPayload second attempt result %d", result);
}
return result;
}
} // namespace widevine

View File

@@ -0,0 +1,513 @@
#!/usr/bin/env python3
# Copyright 2022 Google LLC. All rights reserved.
"""Uploader tool for sending device keys to Widevine remote provisioning server.
This tool consumes an input file containing device info, which includes the
device's public key, and batch uploads it to Google. Once uploaded, the device
may use Widevine provisioning 4 to request OEM certificates from Google.
This tool is designed to be used with the widevine factory extraction tool.
Therefore, the JSON output from widevine factory extraction tool is the expected
input, with one JSON string per line of input.
"""
import argparse
from http import HTTPStatus
import http.server
import json
import os
import sys
import urllib.parse
import urllib.request
import uuid
import webbrowser
from google.auth.transport import requests
from google.oauth2 import service_account
DEFAULT_BASE = 'https://widevine.googleapis.com/v1beta1'
UPLOAD_PATH = '/uniqueDeviceInfo:batchUpload'
BATCH_CHECK_PATH = '/uniqueDeviceInfo:batchCheck'
TOKEN_CACHE_FILE = os.path.join(
os.path.expanduser('~'), '.device_info_uploader.token'
)
OAUTH_SERVICE_BASE = 'https://accounts.google.com/o/oauth2'
OAUTH_AUTHN_URL = OAUTH_SERVICE_BASE + '/auth'
OAUTH_TOKEN_URL = OAUTH_SERVICE_BASE + '/token'
OAUTH_SCOPES = ['https://www.googleapis.com/auth/widevine/frontend']
class OAuthHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
"""HTTP Handler used to accept the oauth response when the user logs in."""
def do_GET(self): # pylint: disable=invalid-name
"""Handles GET, extracting the authorization code in the query params."""
print(f'GET path: {self.path}')
parsed_path = urllib.parse.urlparse(self.path)
params = dict(urllib.parse.parse_qsl(parsed_path.query))
if 'error' in params:
error = params['error']
self.respond(
400, error, f'Error received from the OAuth server: {error}.'
)
sys.exit(-1)
elif 'code' not in params:
self.respond(
400,
'ERROR',
(
'Response from OAuth server is missing the authorization '
f'code. Full response: "{self.path}"'
),
)
sys.exit(-1)
else:
self.respond(
200, 'Success!', 'Success! You may close this browser window.'
)
self.server.code = params['code']
def do_POST(self): # pylint: disable=invalid-name
print(f'POST path: {self.path}')
def respond(self, code, title, message):
"""Send a response to the HTTP client.
Args:
code: The HTTP status code to send
title: The page title to display
message: The message to display to the user on the page
"""
if code != 200:
eprint(message)
self.send_response(code)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(
(
'<html>'
f' <title>{title}</title>'
' <body>'
f' <p style="font-size:24px;">{message}</p>'
' </body>'
'</html>'
).encode('utf-8')
)
class LocalOAuthReceiver(http.server.HTTPServer):
"""HTTP server that will wait for an OAuth authorization code."""
def __init__(self):
super(LocalOAuthReceiver, self).__init__(
('127.0.0.1', 0), OAuthHTTPRequestHandler
)
self.code = None
def port(self):
return self.socket.getsockname()[1]
def wait_for_code(self):
print('Waiting for a response from the Google OAuth service.')
print('If you receive an error in your browser, interrupt this script.')
self.handle_request()
return self.code
def eprint(message):
print(message, file=sys.stderr)
def die(message):
eprint(message)
sys.exit(-1)
def parse_args():
"""Parse and return the command line args.
Returns:
An argparse.Namespace object populated with the arguments.
"""
parser = argparse.ArgumentParser(
description='Widevine BCC Batch Upload/Check Tool'
)
parser.add_argument(
'--version', action='version', version='20240822'
) # yyyymmdd
parser.add_argument(
'--json-csr',
nargs='+',
required=True,
help='list of files containing JSON output from factory extraction tool',
)
parser.add_argument('--credentials', help='JSON credentials file')
parser.add_argument(
'--endpoint', default=DEFAULT_BASE, help='destination server URL'
)
parser.add_argument('--org-name', required=True, help='orgnization name')
parser.add_argument(
'--cache-token',
action='store_true',
help='Use a locally cached a refresh token',
)
parser.add_argument(
'--service-credentials', help='JSON credentials file for service account'
)
parser.add_argument(
'--die-on-error',
action='store_true',
help='exit on error and stop uploading more CSRs',
)
parser.add_argument(
'--dryrun',
action='store_true',
help=(
'Do not upload anything. Instead print out what actions would have'
' been taken if the --dryrun flag had not been specified.'
),
)
parser.add_argument(
'--check',
action='store_true',
required=False,
help='Perform a batch check on the CSRs.',
)
parser.add_argument(
'--verbose',
action='store_true',
required=False,
help='Print request and response details.',
)
return parser.parse_args()
def parse_json_csrs(filename, batches):
"""Parse the given file and insert it into batches.
If the input is not a valid JSON CSR blob, exit the program.
Args:
filename: The file that contains a JSON-formatted build and CSR
batches: Output dict containing a mapping from json dumped device metadata
to BCCs.
"""
base_filename = os.path.basename(filename)
line_count = 0
for line in open(filename):
line_count = line_count + 1
obj = {}
try:
obj = json.loads(line)
except json.JSONDecodeError as e:
die(f'{e.msg} {filename}:{line_count}, char {e.pos}')
try:
bcc = {
'boot_certificate_chain': obj['bcc'],
'name': f'{base_filename}#{line_count}',
}
device_metadata = json.dumps({
'company': obj['company'],
'architecture': obj['architecture'],
'name': obj['name'],
'model': obj['model'],
'product': obj['product'],
'build_info': obj['build_info'],
'oemcrypto_build_info': obj['oemcrypto_build_info'],
})
if device_metadata not in batches:
batches[device_metadata] = []
batches[device_metadata].append(bcc)
except KeyError as e:
die(f'Invalid object at {filename}:{line_count}, missing {e}')
if line_count == 0:
die('Empty BCC file!')
def format_request_body(args, device_metadata, bccs):
"""Generate a formatted request buffer for the given build and CSRs."""
request = {
'parent': 'orgs/' + args.org_name,
'request_id': uuid.uuid4().hex,
'metadata': json.loads(device_metadata),
'device_info': bccs,
}
return json.dumps(request).encode('utf-8')
def format_check_request_body(args, bccs):
"""Generate a formatted BatchCheck request buffer for the given build and CSRs."""
request = {
'parent': 'orgs/' + args.org_name,
'request_id': uuid.uuid4().hex,
'device_info': bccs,
}
return json.dumps(request).encode('utf-8')
def load_refresh_token():
if not os.path.exists(TOKEN_CACHE_FILE):
return None
with open(TOKEN_CACHE_FILE) as f:
return f.readline()
def store_refresh_token(refresh_token):
with open(TOKEN_CACHE_FILE, 'w') as f:
f.write(refresh_token)
def fetch_access_token(creds, cache_token=False, code=None, redirect_uri=None):
"""Fetch an oauth2 access token.
If a code is passed, then it is used to get the token. If code
is None, then look for a persisted refresh token and use that to
get the access token instead.
Args:
creds: The OAuth client credentials, including client secret and id.
cache_token: If True, then the refresh token is cached on disk so that the
user does not have to reauthenticate when the script is used again.
code: The OAuth authorization code, returned by Google's OAuth service.
redirect_uri: If an authorization code is supplied, then the redirect_uri
used to fetch the code must be passed here.
Returns:
A base64-encode OAuth access token, suitable for including in a request.
"""
request = urllib.request.Request(OAUTH_TOKEN_URL)
request.add_header('Content-Type', 'application/x-www-form-urlencoded')
body = 'client_id=' + creds['client_id']
body += '&client_secret=' + creds['client_secret']
if code is not None:
if redirect_uri is None:
raise ValueError('"code" was supplied, but "redirect_uri" is None')
body += '&grant_type=authorization_code'
body += '&code=' + code
body += '&redirect_uri=' + redirect_uri
else:
refresh_token = load_refresh_token()
if refresh_token is None:
return None
body += '&grant_type=refresh_token'
body += '&refresh_token=' + refresh_token
try:
response = urllib.request.urlopen(request, body.encode('utf-8'))
parsed_response = json.load(response)
if cache_token:
store_refresh_token(parsed_response['refresh_token'])
return parsed_response['access_token']
except urllib.error.HTTPError as e:
# Catch bogus/expired refresh tokens, but bubble up errors when
# an authorization code is used.
if code is None:
return None
die(f'Failed to receive access token: {e.code} {e.reason}')
def load_and_validate_creds(credfile):
"""Loads the credentials from the given file and validates them.
Args:
credfile: the name of the file containing the client credentials
Returns:
A map containing the credentials for connecting to the APE backend.
"""
credmap = json.load(open(credfile))
not_local_app_creds_error = (
'ERROR: Invalid credential file.\n'
' The given credentials do not appear to be for a locally installed\n'
' application. Please navigate to the credentials dashboard and\n'
' ensure that the "Type" of your client is "Desktop":\n'
' https://console.cloud.google.com/apis/credentials'
)
if 'installed' not in credmap:
die(not_local_app_creds_error)
creds = credmap['installed']
expected_keys = set(['client_id', 'client_secret', 'redirect_uris'])
if not expected_keys.issubset(creds.keys()):
die((
'ERROR: Invalid credential file.\n'
' The given credentials do not appear to be valid. Please\n'
' re-download the client credentials file from the dashboard:\n'
' https://console.cloud.google.com/apis/credentials'
))
if 'http://localhost' not in creds['redirect_uris']:
die(not_local_app_creds_error)
return creds
def authenticate_and_fetch_token(args):
"""Authenticate and fetch an OAUTH2 access token."""
# Auth for service account
if args.service_credentials:
if not os.path.exists(args.service_credentials):
die('Service account credentials file does not exist.')
svc_account = service_account.Credentials.from_service_account_file(
args.service_credentials,
scopes=OAUTH_SCOPES,
)
svc_account.refresh(requests.Request())
return svc_account.token
# Auth for user account
if args.credentials is None:
die('User credentials is not provided.')
if not os.path.exists(args.credentials):
die('User credentials file does not exist.')
creds = load_and_validate_creds(args.credentials)
access_type = 'online'
if args.cache_token:
token = fetch_access_token(creds)
if token is not None:
return token
access_type = 'offline'
httpd = LocalOAuthReceiver()
redirect_uri = f'http://127.0.0.1:{httpd.port()}'
url = (
OAUTH_AUTHN_URL
+ '?response_type=code'
+ '&client_id='
+ creds['client_id']
+ '&redirect_uri='
+ redirect_uri
+ '&scope=https://www.googleapis.com/auth/widevine/frontend'
+ '&access_type='
+ access_type
+ '&prompt=select_account'
)
print('Opening your web browser to authenticate...')
if not webbrowser.open(url, new=1, autoraise=True):
print('Error opening the browser. Please open this link in a browser')
print(f'that is running on this same system:\n {url}\n')
code = httpd.wait_for_code()
return fetch_access_token(creds, args.cache_token, code, redirect_uri)
def upload_batch(args, device_metadata, bccs):
"""Batch upload all the CSRs associated build device_metadata.
Args:
args: The parsed command-line arguments
device_metadata: The build for which we're uploading CSRs
bccs: a list of BCCs to be uploaded for the given build
"""
print('Uploading {} bcc(s)'.format(len(bccs)))
if args.verbose:
print("Build: '{}'".format(device_metadata))
body = format_request_body(args, device_metadata, bccs)
return batch_action_single_attempt(args, UPLOAD_PATH, body)
def check_batch(args, device_metadata, bccs):
"""Batch check all the CSRs.
Args:
args: The parsed command-line arguments
device_metadata: The build for which we're checking CSRs
bccs: a list of BCCs to be checked for the given build
"""
print('Checking {} bcc(s)'.format(len(bccs)))
if args.verbose:
print("Build: '{}'".format(device_metadata))
body = format_check_request_body(args, bccs)
return batch_action_single_attempt(args, BATCH_CHECK_PATH, body)
def batch_action_single_attempt(args, path, body):
"""Batch action (upload or check existence) for all the CSRs in chunks.
Args:
args: The parsed command-line arguments
csrs: a list of CSRs to be uploaded/checked for the given build
path: The endpoint url for the specific action
body: The formatted request body
"""
if args.verbose:
print('Request body:')
print(body)
print('Request target:')
print(args.endpoint + path)
request = urllib.request.Request(args.endpoint + path)
request.add_header('Content-Type', 'application/json')
request.add_header('X-GFE-SSL', 'yes')
request.add_header(
'Authorization', 'Bearer ' + authenticate_and_fetch_token(args)
)
if args.dryrun:
print('dry run: would have reached to ' + request.full_url)
return HTTPStatus.OK
try:
response = urllib.request.urlopen(request, body)
except urllib.error.HTTPError as e:
eprint(f'Error uploading bccs. {e}')
for line in e:
eprint(line.decode('utf-8').rstrip())
sys.exit(1)
response_body = response.read().decode('utf-8')
if args.verbose:
print('Response body:')
print(response_body)
res = json.loads(response_body)
if 'failedDeviceInfo' in res:
eprint('Failed to upload/check some device info! Response body:')
eprint(response_body)
eprint('Failed: {} bcc(s)'.format(len(res['failedDeviceInfo'])))
if args.die_on_error:
sys.exit(1)
elif 'successDeviceInfo' in res:
print('Success: {} bcc(s)'.format(len(res['successDeviceInfo'])))
else:
eprint('Failed with unexpected response:')
eprint(response_body)
def main():
args = parse_args()
if args.dryrun:
print('Dry run mode enabled. Service APIs will not be called.')
batches = {}
for filename in args.json_csr:
parse_json_csrs(filename, batches)
if len(batches) > 1:
print('WARNING: {} different device metadata'.format(len(batches)))
for device_metadata, bccs in batches.items():
if args.check:
check_batch(args, device_metadata, bccs)
else:
upload_batch(args, device_metadata, bccs)
if __name__ == '__main__':
main()