diff --git a/libwvdrmengine/tools/factory_upload_tool/src/WidevineOemcryptoInterface.cpp b/libwvdrmengine/tools/factory_upload_tool/src/WidevineOemcryptoInterface.cpp index ccb105a3..3e9a2b1b 100644 --- a/libwvdrmengine/tools/factory_upload_tool/src/WidevineOemcryptoInterface.cpp +++ b/libwvdrmengine/tools/factory_upload_tool/src/WidevineOemcryptoInterface.cpp @@ -48,6 +48,7 @@ std::unique_ptr FileSystem::Open(const std::string&, int) { return std::unique_ptr(new FileImpl()); } bool FileSystem::Exists(const std::string&) { return false; } +bool FileSystem::Exists(const std::string&, int*) { return false; } bool FileSystem::Remove(const std::string&) { return false; } ssize_t FileSystem::FileSize(const std::string&) { return false; } bool FileSystem::List(const std::string&, std::vector*) { @@ -141,7 +142,9 @@ OEMCryptoResult OEMCryptoInterface::GetOEMCryptoBuildInfo( result = BuildInformation(&build_info[0], &build_info_size); LOGI("BuildInformation second attempt result %d", result); } - + if (result == OEMCrypto_SUCCESS) { + build_info.resize(build_info_size); + } return result; } diff --git a/libwvdrmengine/tools/factory_upload_tool/wv_upload_tool.py b/libwvdrmengine/tools/factory_upload_tool/wv_upload_tool.py index 0e424638..ab3f5ff1 100644 --- a/libwvdrmengine/tools/factory_upload_tool/wv_upload_tool.py +++ b/libwvdrmengine/tools/factory_upload_tool/wv_upload_tool.py @@ -9,88 +9,39 @@ may use Widevine provisioning 4 to request OEM certificates from Google. This tool is designed to be used with the widevine factory extraction tool. Therefore, the JSON output from widevine factory extraction tool is the expected input, with one JSON string per line of input. -""" -import argparse -import base64 -import http.server -import json -import os -import re -import subprocess -import sys -import urllib.parse -import urllib.request -import uuid -import webbrowser -import zipfile - -GOOGLE_AUTH_CLIENT_INSTALLED = False -try: - from google.auth.transport import requests - from google.oauth2 import service_account - GOOGLE_AUTH_CLIENT_INSTALLED = True -except ImportError: - pass - -""" -README: please fill in values for the following constants if not setting via command line. -* DEFAULT_ORG -* DEFAULT_SERVICE_ACCOUNT_INFO -""" -DEFAULT_ORG = '' -DEFAULT_SERVICE_ACCOUNT_INFO = dict( - type="service_account", - project_id="", - private_key_id= "", - private_key="", - client_email="", - client_id="", - auth_uri="https://accounts.google.com/o/oauth2/auth", - token_uri="https://oauth2.googleapis.com/token", - auth_provider_x509_cert_url="https://www.googleapis.com/oauth2/v1/certs", - client_x509_cert_url="", - universe_domain="googleapis.com" -) - -DEFAULT_BASE = 'https://widevine.googleapis.com/v1beta1' -UPLOAD_PATH = '/uniqueDeviceInfo:batchUpload' -TOKEN_CACHE_FILE = os.path.join( - os.path.expanduser('~'), '.device_info_uploader.token' -) - -OAUTH_SCOPE = 'https://www.googleapis.com/auth/widevine/frontend' -OAUTH_SERVICE_BASE = 'https://accounts.google.com/o/oauth2' -OAUTH_AUTHN_URL = OAUTH_SERVICE_BASE + '/auth' -OAUTH_TOKEN_URL = OAUTH_SERVICE_BASE + '/token' -SERVICE_ACCOUNT = None - -CALLOUT = """ -**For NON-GMS device testing only.** +**This tool is for NON-GMS device testing only.** GMS devices please follow: > https://docs.partner.android.com/gms/building/integrating/att-keys/rkp-test-upload """ -USAGE_DETAILS = """ -# Authentication: -1. authenticate with oauth credentials on command line: - > wv_upload_tool.py --oauth-credentials oauth_credentials.json -2. authenticate with service account credentials on command line: - > wv_upload_tool.py --service-account-credentials service_account_credentials.json -3. authenticate with DEFAULT_SERVICE_ACCOUNT_INFO embedded in script: - > wv_upload_tool.py +import argparse +from http import HTTPStatus +import http.server +import json +import os +import sys +import urllib.parse +import urllib.request +import uuid +import webbrowser +from google.auth.transport import requests +from google.oauth2 import service_account + +DEFAULT_BASE = 'https://widevine.googleapis.com/v1beta1' +UPLOAD_PATH = '/uniqueDeviceInfo:batchUpload' +BATCH_CHECK_PATH = '/uniqueDeviceInfo:batchCheck' +TOKEN_CACHE_FILE = os.path.join( + os.path.expanduser('~'), '.device_info_uploader.token' +) + +OAUTH_SERVICE_BASE = 'https://accounts.google.com/o/oauth2' +OAUTH_AUTHN_URL = OAUTH_SERVICE_BASE + '/auth' +OAUTH_TOKEN_URL = OAUTH_SERVICE_BASE + '/token' + +OAUTH_SCOPES = ['https://www.googleapis.com/auth/widevine/frontend'] -# Usage Examples: -1. extract device metadata from android device(s) visible to adb and upload: - > wv_upload_tool.py -2. extract device metadata from bugreport(s) and upload: - > wv_upload_tool.py --bugreport ./bugreport.zip ./bugreport.txt -3. upload raw bcc's (device metadata must be supplied separately) - > wv_upload_tool.py --bcc-metadata ./device_metadata.json --bcc ./raw_bcc.bin -4. extract but don't upload; save output and upload later: - > wv_upload_tool.py -n > ./out.json ; wv_upload_tool.py --json-csr ./out.json -""" class OAuthHTTPRequestHandler(http.server.BaseHTTPRequestHandler): """HTTP Handler used to accept the oauth response when the user logs in.""" @@ -184,72 +135,61 @@ def parse_args(): Returns: An argparse.Namespace object populated with the arguments. """ - parser = argparse.ArgumentParser(description='Upload device info' + CALLOUT, - formatter_class=argparse.RawTextHelpFormatter, - epilog=USAGE_DETAILS) + parser = argparse.ArgumentParser(description='Widevine BCC Batch Upload/Check Tool') + + parser.add_argument("--version", action="version", version="20240822") #yyyymmdd + parser.add_argument( '--json-csr', - nargs='*', - default=[], - help="""list of files containing JSON with following keys: - 1. architecture - 2. bcc - 3. build_info - 4. company - 5. model - 6. name - 7. product - """, + nargs='+', + required=True, + help='list of files containing JSON output from factory extraction tool', ) - - parser.add_argument( - '--bugreport', - nargs='*', - default=[], - # type=argparse.FileType('rb'), - help='list of bugreports to extract bcc & device metadata from', - ) - - parser.add_argument( - '--bcc', - nargs='*', - default=[], - type=argparse.FileType('rb'), - help='list of raw bcc files', - ) - - parser.add_argument( - '--bcc-metadata', - type=argparse.FileType('r'), - help='Metadata for raw bcc files; required with --bcc.\n' - 'Same JSON fields are required as --json-csr except for `bcc`.', - ) - - parser.add_argument('--oauth-credentials', help='JSON OAuth credentials file') - - parser.add_argument('--service-account-credentials', help='JSON service account key file') + parser.add_argument('--credentials', help='JSON credentials file') parser.add_argument( '--endpoint', default=DEFAULT_BASE, help='destination server URL' ) - parser.add_argument( - '--org-name', default=DEFAULT_ORG, help='orgnization name' - ) + parser.add_argument('--org-name', required=True, help='orgnization name') parser.add_argument( '--cache-token', action='store_true', - default=True, help='Use a locally cached a refresh token', ) parser.add_argument( - '-n', - '--dry-run', + '--service-credentials', help='JSON credentials file for service account' + ) + + parser.add_argument( + '--die-on-error', action='store_true', - help='Print bcc and metadata to stdout instead of uploading, one bcc per line.\n' - 'Output can passed to --json-csr for uploads.', + help='exit on error and stop uploading more CSRs', + ) + + parser.add_argument( + '--dryrun', + action='store_true', + help=( + 'Do not upload anything. Instead print out what actions would have' + ' been taken if the --dryrun flag had not been specified.' + ), + ) + + parser.add_argument( + '--check', + action='store_true', + required=False, + help='Perform a batch check on the CSRs.', + ) + + parser.add_argument( + '--verbose', + action='store_true', + required=False, + help='Print request and response details.', ) return parser.parse_args() @@ -268,43 +208,31 @@ def parse_json_csrs(filename, batches): line_count = 0 for line in open(filename): line_count = line_count + 1 + obj = {} try: obj = json.loads(line) except json.JSONDecodeError as e: die(f'{e.msg} {filename}:{line_count}, char {e.pos}') - convert_bcc_for_upload(obj, batches) + try: + bcc = {'boot_certificate_chain': obj['bcc']} + device_metadata = json.dumps({ + 'company': obj['company'], + 'architecture': obj['architecture'], + 'name': obj['name'], + 'model': obj['model'], + 'product': obj['product'], + 'build_info': obj['build_info'], + 'oemcrypto_build_info': obj['oemcrypto_build_info'], + }) + if device_metadata not in batches: + batches[device_metadata] = [] + batches[device_metadata].append(bcc) + except KeyError as e: + die(f'Invalid object at {filename}:{line_count}, missing {e}') -def convert_bcc_for_upload(obj, batches): - """Convert python bcc dict into format suitable for upload - - If the input is not a valid python bcc dict, exit the program. - - Args: - obj: The python dict that contains bcc and device metadata fields - batches: Output dict containing a mapping from json dumped device metadata - to BCCs. - """ - - try: - bcc = {'boot_certificate_chain': obj['bcc']} - device_metadata = json.dumps( - { - 'company': obj['company'], - 'architecture': obj['architecture'], - 'name': obj['name'], - 'model': obj['model'], - 'product': obj['product'], - 'build_info': obj['build_info'], - }, - sort_keys=True, - ) - except KeyError as e: - die(f'Invalid object at {filename}:{line_count}, missing {e}') - - if device_metadata not in batches: - batches[device_metadata] = [] - batches[device_metadata].append(bcc) + if line_count == 0: + die('Empty BCC file!') def format_request_body(args, device_metadata, bccs): @@ -318,6 +246,17 @@ def format_request_body(args, device_metadata, bccs): return json.dumps(request).encode('utf-8') +def format_check_request_body(args, bccs): + """Generate a formatted BatchCheck request buffer for the given build and CSRs.""" + request = { + 'parent': 'orgs/' + args.org_name, + 'request_id': uuid.uuid4().hex, + 'device_info': bccs, + } + + return json.dumps(request).encode('utf-8') + + def load_refresh_token(): if not os.path.exists(TOKEN_CACHE_FILE): return None @@ -380,15 +319,16 @@ def fetch_access_token(creds, cache_token=False, code=None, redirect_uri=None): die(f'Failed to receive access token: {e.code} {e.reason}') -def load_and_validate_creds(credmap): +def load_and_validate_creds(credfile): """Loads the credentials from the given file and validates them. Args: - credmap: python dict containing the client credentials + credfile: the name of the file containing the client credentials Returns: A map containing the credentials for connecting to the APE backend. """ + credmap = json.load(open(credfile)) not_local_app_creds_error = ( 'ERROR: Invalid credential file.\n' @@ -405,14 +345,12 @@ def load_and_validate_creds(credmap): expected_keys = set(['client_id', 'client_secret', 'redirect_uris']) if not expected_keys.issubset(creds.keys()): - die( - ( - 'ERROR: Invalid credential file.\n' - ' The given credentials do not appear to be valid. Please\n' - ' re-download the client credentials file from the dashboard:\n' - ' https://console.cloud.google.com/apis/credentials' - ) - ) + die(( + 'ERROR: Invalid credential file.\n' + ' The given credentials do not appear to be valid. Please\n' + ' re-download the client credentials file from the dashboard:\n' + ' https://console.cloud.google.com/apis/credentials' + )) if 'http://localhost' not in creds['redirect_uris']: die(not_local_app_creds_error) @@ -420,42 +358,25 @@ def load_and_validate_creds(credmap): return creds -def fetch_service_account_token(args): - """Use a service account to get an access token.""" - if not GOOGLE_AUTH_CLIENT_INSTALLED: - die( - 'Attempting to use service account but you have not ' - 'installed Google\'s auth client library.\n' - 'Run the following command to install it:\n' - 'pip3 install google-auth==2.13.0 requests==2.28' - ) - - global SERVICE_ACCOUNT - - if SERVICE_ACCOUNT is None: - if args.service_account_credentials: - SERVICE_ACCOUNT = service_account.Credentials.from_service_account_file( - args.service_account_credentials, - scopes=[OAUTH_SCOPE], - ) - else: - SERVICE_ACCOUNT = service_account.Credentials.from_service_account_info( - DEFAULT_SERVICE_ACCOUNT_INFO, - scopes=[OAUTH_SCOPE], - ) - SERVICE_ACCOUNT.refresh(requests.Request()) - - return SERVICE_ACCOUNT.token - - def authenticate_and_fetch_token(args): - """Authenticate the user and fetch an OAUTH2 access token.""" - if args.oauth_credentials: - credmap = json.load(open(args.oauth_credentials)) - else: - return fetch_service_account_token(args) + """Authenticate and fetch an OAUTH2 access token.""" + # Auth for service account + if args.service_credentials: + if not os.path.exists(args.service_credentials): + die('Service account credentials file does not exist.') + svc_account = service_account.Credentials.from_service_account_file( + args.service_credentials, + scopes=OAUTH_SCOPES, + ) + svc_account.refresh(requests.Request()) + return svc_account.token - creds = load_and_validate_creds(credmap) + # Auth for user account + if args.credentials is None: + die('User credentials is not provided.') + if not os.path.exists(args.credentials): + die('User credentials file does not exist.') + creds = load_and_validate_creds(args.credentials) access_type = 'online' if args.cache_token: @@ -494,16 +415,52 @@ def upload_batch(args, device_metadata, bccs): device_metadata: The build for which we're uploading CSRs bccs: a list of BCCs to be uploaded for the given build """ - print("Uploading {} bcc(s) for build '{}'".format(len(bccs), device_metadata)) + print('Uploading {} bcc(s)'.format(len(bccs))) + if args.verbose: + print("Build: '{}'".format(device_metadata)) body = format_request_body(args, device_metadata, bccs) - print(body) - print(args.endpoint + UPLOAD_PATH) - request = urllib.request.Request(args.endpoint + UPLOAD_PATH) + return batch_action_single_attempt(args, UPLOAD_PATH, body) + + +def check_batch(args, device_metadata, bccs): + """Batch check all the CSRs. + + Args: + args: The parsed command-line arguments + device_metadata: The build for which we're checking CSRs + bccs: a list of BCCs to be checked for the given build + """ + print('Checking {} bcc(s)'.format(len(bccs))) + if args.verbose: + print("Build: '{}'".format(device_metadata)) + body = format_check_request_body(args, bccs) + return batch_action_single_attempt(args, BATCH_CHECK_PATH, body) + + +def batch_action_single_attempt(args, path, body): + """Batch action (upload or check existence) for all the CSRs in chunks. + + Args: + args: The parsed command-line arguments + csrs: a list of CSRs to be uploaded/checked for the given build + path: The endpoint url for the specific action + body: The formatted request body + """ + if args.verbose: + print('Request body:') + print(body) + print('Request target:') + print(args.endpoint + path) + request = urllib.request.Request(args.endpoint + path) request.add_header('Content-Type', 'application/json') request.add_header('X-GFE-SSL', 'yes') request.add_header( 'Authorization', 'Bearer ' + authenticate_and_fetch_token(args) ) + if args.dryrun: + print('dry run: would have reached to ' + request.full_url) + return HTTPStatus.OK + try: response = urllib.request.urlopen(request, body) except urllib.error.HTTPError as e: @@ -512,154 +469,38 @@ def upload_batch(args, device_metadata, bccs): eprint(line.decode('utf-8').rstrip()) sys.exit(1) - while chunk := response.read(1024): - print(chunk.decode('utf-8')) - - -def get_device_info_adb(s): - """Get bcc & device metadata via adb. - - Args: - s: device serial number - - Returns: - A python dict that contains bcc and device metadata fields - """ - obj = dict() - - sys_prop_names = dict( - company='ro.product.manufacturer', - model='ro.product.model', - name='ro.product.device', - product='ro.product.name', - architecture='ro.product.cpu.abi', - ) - for attr, prop in sys_prop_names.items(): - obj[attr] = subprocess.check_output( - f'adb -s {s} shell getprop {prop}'.split(), text=True - ).strip() - - wv_prop_names = dict( - boot_certificate_chain='bcc', - oemcrypto_build_info='build_info', - ) - dump = subprocess.check_output( - f'adb -s {s} shell dumpsys android.hardware.drm.IDrmFactory/widevine -p' - .split(), - text=True, - ).splitlines() - for line in dump[1:]: - kv = line.strip().split(': ', 1) - if len(kv) < 2: - continue - k, v = kv - if k in wv_prop_names: - obj[wv_prop_names[k]] = v.strip('"\'') - - obj['bcc'] = base64.b64encode(bytes.fromhex(obj['bcc'])).decode() - return obj - - -def get_device_info_bugreport(br): - """Get bcc & device metadata from bugreport. - - Args: - br: bugreport file - - Returns: - A python dict that contains bcc and device metadata fields - """ - z=None - section = '' - sections = set() - metadata = dict() - if zipfile.is_zipfile(br): - z=zipfile.ZipFile(br) - with z.open('main_entry.txt') as m: - fname = m.read().decode() - - if z: - f=z.open(fname) + response_body = response.read().decode('utf-8') + if args.verbose: + print('Response body:') + print(response_body) + res = json.loads(response_body) + if 'failedDeviceInfo' in res: + eprint('Failed to upload/check some device info! Response body:') + eprint(response_body) + eprint('Failed: {} bcc(s)'.format(len(res['failedDeviceInfo']))) + if args.die_on_error: + sys.exit(1) + elif 'successDeviceInfo' in res: + print('Success: {} bcc(s)'.format(len(res['successDeviceInfo']))) else: - f=open(br,'rb') - - for line in f: - if line.startswith( - b'DUMP OF SERVICE android.hardware.drm.IDrmFactory/widevine:' - ): - section = 'widevine' - elif line.find(b'- SYSTEM PROPERTIES (getprop) -') >= 0: - section = 'getprop' - elif line.startswith(b'-----') and section: - sections.add(section) - section = '' - - if len(sections) == 2: - break - - pattern = '' - if section == 'getprop': - pattern = b'^\\[(.+)\\]: \\[(.+)\\]$' - elif section == 'widevine': - pattern = b'\\s+([a-z_]+): (.+)$' - - if pattern: - match = re.match(pattern, line) - if match: - k, v = match.groups() - metadata[k.decode()] = v.decode().strip('"\'') - - prop_names = dict( - company='ro.product.manufacturer', - model='ro.product.model', - name='ro.product.device', - product='ro.product.name', - architecture='ro.product.cpu.abi', - bcc='boot_certificate_chain', - build_info='oemcrypto_build_info', - ) - - obj = dict() - for k in prop_names: - obj[k] = metadata[prop_names[k]] - obj['bcc'] = base64.b64encode(bytes.fromhex(obj['bcc'])).decode() - return obj - + eprint('Failed with unexpected response:') + eprint(response_body) def main(): args = parse_args() + if args.dryrun: + print('Dry run mode enabled. Service APIs will not be called.') batches = {} for filename in args.json_csr: parse_json_csrs(filename, batches) - for br in args.bugreport: - convert_bcc_for_upload(get_device_info_bugreport(br), batches) - - if args.bcc_metadata: - bcc_metadata = json.load(args.bcc_metadata) - for bcc in args.bcc: - if not args.bcc_metadata: - die('Missing --bcc-metadata') - tmp = bcc_metadata.copy() - tmp['bcc'] = base64.b64encode(bcc.read()).decode() - convert_bcc_for_upload(tmp, batches) - - if not batches: - lines = subprocess.check_output('adb devices'.split(), text=True).split( - '\n' - ) - for line in lines: - if line.endswith('device'): - serial = line.split()[0] - convert_bcc_for_upload(get_device_info_adb(serial), batches) + if len(batches) > 1: + print('WARNING: {} different device metadata'.format(len(batches))) for device_metadata, bccs in batches.items(): - if args.dry_run: - for bcc in bccs: - out = json.loads(device_metadata) - out['bcc'] = bcc['boot_certificate_chain'] - print(json.dumps(out)) + if args.check: + check_batch(args, device_metadata, bccs) else: upload_batch(args, device_metadata, bccs)