diff --git a/libwvdrmengine/tools/factory_upload_tool/wv_upload_tool.py b/libwvdrmengine/tools/factory_upload_tool/wv_upload_tool.py new file mode 100644 index 00000000..0e424638 --- /dev/null +++ b/libwvdrmengine/tools/factory_upload_tool/wv_upload_tool.py @@ -0,0 +1,668 @@ +#!/usr/bin/env python3 +# Copyright 2022 Google LLC. All rights reserved. +"""Uploader tool for sending device keys to Widevine remote provisioning server. + +This tool consumes an input file containing device info, which includes the +device's public key, and batch uploads it to Google. Once uploaded, the device +may use Widevine provisioning 4 to request OEM certificates from Google. + +This tool is designed to be used with the widevine factory extraction tool. +Therefore, the JSON output from widevine factory extraction tool is the expected +input, with one JSON string per line of input. +""" + +import argparse +import base64 +import http.server +import json +import os +import re +import subprocess +import sys +import urllib.parse +import urllib.request +import uuid +import webbrowser +import zipfile + +GOOGLE_AUTH_CLIENT_INSTALLED = False +try: + from google.auth.transport import requests + from google.oauth2 import service_account + GOOGLE_AUTH_CLIENT_INSTALLED = True +except ImportError: + pass + +""" +README: please fill in values for the following constants if not setting via command line. +* DEFAULT_ORG +* DEFAULT_SERVICE_ACCOUNT_INFO +""" +DEFAULT_ORG = '' +DEFAULT_SERVICE_ACCOUNT_INFO = dict( + type="service_account", + project_id="", + private_key_id= "", + private_key="", + client_email="", + client_id="", + auth_uri="https://accounts.google.com/o/oauth2/auth", + token_uri="https://oauth2.googleapis.com/token", + auth_provider_x509_cert_url="https://www.googleapis.com/oauth2/v1/certs", + client_x509_cert_url="", + universe_domain="googleapis.com" +) + +DEFAULT_BASE = 'https://widevine.googleapis.com/v1beta1' +UPLOAD_PATH = '/uniqueDeviceInfo:batchUpload' +TOKEN_CACHE_FILE = os.path.join( + os.path.expanduser('~'), '.device_info_uploader.token' +) + +OAUTH_SCOPE = 'https://www.googleapis.com/auth/widevine/frontend' +OAUTH_SERVICE_BASE = 'https://accounts.google.com/o/oauth2' +OAUTH_AUTHN_URL = OAUTH_SERVICE_BASE + '/auth' +OAUTH_TOKEN_URL = OAUTH_SERVICE_BASE + '/token' +SERVICE_ACCOUNT = None + +CALLOUT = """ +**For NON-GMS device testing only.** + +GMS devices please follow: +> https://docs.partner.android.com/gms/building/integrating/att-keys/rkp-test-upload +""" + +USAGE_DETAILS = """ +# Authentication: +1. authenticate with oauth credentials on command line: + > wv_upload_tool.py --oauth-credentials oauth_credentials.json +2. authenticate with service account credentials on command line: + > wv_upload_tool.py --service-account-credentials service_account_credentials.json +3. authenticate with DEFAULT_SERVICE_ACCOUNT_INFO embedded in script: + > wv_upload_tool.py + +# Usage Examples: +1. extract device metadata from android device(s) visible to adb and upload: + > wv_upload_tool.py +2. extract device metadata from bugreport(s) and upload: + > wv_upload_tool.py --bugreport ./bugreport.zip ./bugreport.txt +3. upload raw bcc's (device metadata must be supplied separately) + > wv_upload_tool.py --bcc-metadata ./device_metadata.json --bcc ./raw_bcc.bin +4. extract but don't upload; save output and upload later: + > wv_upload_tool.py -n > ./out.json ; wv_upload_tool.py --json-csr ./out.json +""" + +class OAuthHTTPRequestHandler(http.server.BaseHTTPRequestHandler): + """HTTP Handler used to accept the oauth response when the user logs in.""" + + def do_GET(self): # pylint: disable=invalid-name + """Handles GET, extracting the authorization code in the query params.""" + print(f'GET path: {self.path}') + parsed_path = urllib.parse.urlparse(self.path) + params = dict(urllib.parse.parse_qsl(parsed_path.query)) + if 'error' in params: + error = params['error'] + self.respond( + 400, error, f'Error received from the OAuth server: {error}.' + ) + sys.exit(-1) + elif 'code' not in params: + self.respond( + 400, + 'ERROR', + ( + 'Response from OAuth server is missing the authorization ' + f'code. Full response: "{self.path}"' + ), + ) + sys.exit(-1) + else: + self.respond( + 200, 'Success!', 'Success! You may close this browser window.' + ) + self.server.code = params['code'] + + def do_POST(self): # pylint: disable=invalid-name + print(f'POST path: {self.path}') + + def respond(self, code, title, message): + """Send a response to the HTTP client. + + Args: + code: The HTTP status code to send + title: The page title to display + message: The message to display to the user on the page + """ + if code != 200: + eprint(message) + self.send_response(code) + self.send_header('Content-type', 'text/html') + self.end_headers() + self.wfile.write( + ( + '' + f' {title}' + ' ' + f'

{message}

' + ' ' + '' + ).encode('utf-8') + ) + + +class LocalOAuthReceiver(http.server.HTTPServer): + """HTTP server that will wait for an OAuth authorization code.""" + + def __init__(self): + super(LocalOAuthReceiver, self).__init__( + ('127.0.0.1', 0), OAuthHTTPRequestHandler + ) + self.code = None + + def port(self): + return self.socket.getsockname()[1] + + def wait_for_code(self): + print('Waiting for a response from the Google OAuth service.') + print('If you receive an error in your browser, interrupt this script.') + self.handle_request() + return self.code + + +def eprint(message): + print(message, file=sys.stderr) + + +def die(message): + eprint(message) + sys.exit(-1) + + +def parse_args(): + """Parse and return the command line args. + + Returns: + An argparse.Namespace object populated with the arguments. + """ + parser = argparse.ArgumentParser(description='Upload device info' + CALLOUT, + formatter_class=argparse.RawTextHelpFormatter, + epilog=USAGE_DETAILS) + parser.add_argument( + '--json-csr', + nargs='*', + default=[], + help="""list of files containing JSON with following keys: + 1. architecture + 2. bcc + 3. build_info + 4. company + 5. model + 6. name + 7. product + """, + ) + + parser.add_argument( + '--bugreport', + nargs='*', + default=[], + # type=argparse.FileType('rb'), + help='list of bugreports to extract bcc & device metadata from', + ) + + parser.add_argument( + '--bcc', + nargs='*', + default=[], + type=argparse.FileType('rb'), + help='list of raw bcc files', + ) + + parser.add_argument( + '--bcc-metadata', + type=argparse.FileType('r'), + help='Metadata for raw bcc files; required with --bcc.\n' + 'Same JSON fields are required as --json-csr except for `bcc`.', + ) + + parser.add_argument('--oauth-credentials', help='JSON OAuth credentials file') + + parser.add_argument('--service-account-credentials', help='JSON service account key file') + + parser.add_argument( + '--endpoint', default=DEFAULT_BASE, help='destination server URL' + ) + + parser.add_argument( + '--org-name', default=DEFAULT_ORG, help='orgnization name' + ) + + parser.add_argument( + '--cache-token', + action='store_true', + default=True, + help='Use a locally cached a refresh token', + ) + + parser.add_argument( + '-n', + '--dry-run', + action='store_true', + help='Print bcc and metadata to stdout instead of uploading, one bcc per line.\n' + 'Output can passed to --json-csr for uploads.', + ) + + return parser.parse_args() + + +def parse_json_csrs(filename, batches): + """Parse the given file and insert it into batches. + + If the input is not a valid JSON CSR blob, exit the program. + + Args: + filename: The file that contains a JSON-formatted build and CSR + batches: Output dict containing a mapping from json dumped device metadata + to BCCs. + """ + line_count = 0 + for line in open(filename): + line_count = line_count + 1 + try: + obj = json.loads(line) + except json.JSONDecodeError as e: + die(f'{e.msg} {filename}:{line_count}, char {e.pos}') + convert_bcc_for_upload(obj, batches) + + +def convert_bcc_for_upload(obj, batches): + """Convert python bcc dict into format suitable for upload + + If the input is not a valid python bcc dict, exit the program. + + Args: + obj: The python dict that contains bcc and device metadata fields + batches: Output dict containing a mapping from json dumped device metadata + to BCCs. + """ + + try: + bcc = {'boot_certificate_chain': obj['bcc']} + device_metadata = json.dumps( + { + 'company': obj['company'], + 'architecture': obj['architecture'], + 'name': obj['name'], + 'model': obj['model'], + 'product': obj['product'], + 'build_info': obj['build_info'], + }, + sort_keys=True, + ) + except KeyError as e: + die(f'Invalid object at {filename}:{line_count}, missing {e}') + + if device_metadata not in batches: + batches[device_metadata] = [] + batches[device_metadata].append(bcc) + + +def format_request_body(args, device_metadata, bccs): + """Generate a formatted request buffer for the given build and CSRs.""" + request = { + 'parent': 'orgs/' + args.org_name, + 'request_id': uuid.uuid4().hex, + 'metadata': json.loads(device_metadata), + 'device_info': bccs, + } + return json.dumps(request).encode('utf-8') + + +def load_refresh_token(): + if not os.path.exists(TOKEN_CACHE_FILE): + return None + with open(TOKEN_CACHE_FILE) as f: + return f.readline() + + +def store_refresh_token(refresh_token): + with open(TOKEN_CACHE_FILE, 'w') as f: + f.write(refresh_token) + + +def fetch_access_token(creds, cache_token=False, code=None, redirect_uri=None): + """Fetch an oauth2 access token. + + If a code is passed, then it is used to get the token. If code + is None, then look for a persisted refresh token and use that to + get the access token instead. + + Args: + creds: The OAuth client credentials, including client secret and id. + cache_token: If True, then the refresh token is cached on disk so that the + user does not have to reauthenticate when the script is used again. + code: The OAuth authorization code, returned by Google's OAuth service. + redirect_uri: If an authorization code is supplied, then the redirect_uri + used to fetch the code must be passed here. + + Returns: + A base64-encode OAuth access token, suitable for including in a request. + """ + request = urllib.request.Request(OAUTH_TOKEN_URL) + request.add_header('Content-Type', 'application/x-www-form-urlencoded') + body = 'client_id=' + creds['client_id'] + body += '&client_secret=' + creds['client_secret'] + + if code is not None: + if redirect_uri is None: + raise ValueError('"code" was supplied, but "redirect_uri" is None') + body += '&grant_type=authorization_code' + body += '&code=' + code + body += '&redirect_uri=' + redirect_uri + else: + refresh_token = load_refresh_token() + if refresh_token is None: + return None + body += '&grant_type=refresh_token' + body += '&refresh_token=' + refresh_token + + try: + response = urllib.request.urlopen(request, body.encode('utf-8')) + parsed_response = json.load(response) + if cache_token: + store_refresh_token(parsed_response['refresh_token']) + return parsed_response['access_token'] + except urllib.error.HTTPError as e: + # Catch bogus/expired refresh tokens, but bubble up errors when + # an authorization code is used. + if code is None: + return None + die(f'Failed to receive access token: {e.code} {e.reason}') + + +def load_and_validate_creds(credmap): + """Loads the credentials from the given file and validates them. + + Args: + credmap: python dict containing the client credentials + + Returns: + A map containing the credentials for connecting to the APE backend. + """ + + not_local_app_creds_error = ( + 'ERROR: Invalid credential file.\n' + ' The given credentials do not appear to be for a locally installed\n' + ' application. Please navigate to the credentials dashboard and\n' + ' ensure that the "Type" of your client is "Desktop":\n' + ' https://console.cloud.google.com/apis/credentials' + ) + + if 'installed' not in credmap: + die(not_local_app_creds_error) + + creds = credmap['installed'] + + expected_keys = set(['client_id', 'client_secret', 'redirect_uris']) + if not expected_keys.issubset(creds.keys()): + die( + ( + 'ERROR: Invalid credential file.\n' + ' The given credentials do not appear to be valid. Please\n' + ' re-download the client credentials file from the dashboard:\n' + ' https://console.cloud.google.com/apis/credentials' + ) + ) + + if 'http://localhost' not in creds['redirect_uris']: + die(not_local_app_creds_error) + + return creds + + +def fetch_service_account_token(args): + """Use a service account to get an access token.""" + if not GOOGLE_AUTH_CLIENT_INSTALLED: + die( + 'Attempting to use service account but you have not ' + 'installed Google\'s auth client library.\n' + 'Run the following command to install it:\n' + 'pip3 install google-auth==2.13.0 requests==2.28' + ) + + global SERVICE_ACCOUNT + + if SERVICE_ACCOUNT is None: + if args.service_account_credentials: + SERVICE_ACCOUNT = service_account.Credentials.from_service_account_file( + args.service_account_credentials, + scopes=[OAUTH_SCOPE], + ) + else: + SERVICE_ACCOUNT = service_account.Credentials.from_service_account_info( + DEFAULT_SERVICE_ACCOUNT_INFO, + scopes=[OAUTH_SCOPE], + ) + SERVICE_ACCOUNT.refresh(requests.Request()) + + return SERVICE_ACCOUNT.token + + +def authenticate_and_fetch_token(args): + """Authenticate the user and fetch an OAUTH2 access token.""" + if args.oauth_credentials: + credmap = json.load(open(args.oauth_credentials)) + else: + return fetch_service_account_token(args) + + creds = load_and_validate_creds(credmap) + + access_type = 'online' + if args.cache_token: + token = fetch_access_token(creds) + if token is not None: + return token + access_type = 'offline' + + httpd = LocalOAuthReceiver() + redirect_uri = f'http://127.0.0.1:{httpd.port()}' + url = ( + OAUTH_AUTHN_URL + + '?response_type=code' + + '&client_id=' + + creds['client_id'] + + '&redirect_uri=' + + redirect_uri + + '&scope=https://www.googleapis.com/auth/widevine/frontend' + + '&access_type=' + + access_type + + '&prompt=select_account' + ) + print('Opening your web browser to authenticate...') + if not webbrowser.open(url, new=1, autoraise=True): + print('Error opening the browser. Please open this link in a browser') + print(f'that is running on this same system:\n {url}\n') + code = httpd.wait_for_code() + return fetch_access_token(creds, args.cache_token, code, redirect_uri) + + +def upload_batch(args, device_metadata, bccs): + """Batch upload all the CSRs associated build device_metadata. + + Args: + args: The parsed command-line arguments + device_metadata: The build for which we're uploading CSRs + bccs: a list of BCCs to be uploaded for the given build + """ + print("Uploading {} bcc(s) for build '{}'".format(len(bccs), device_metadata)) + body = format_request_body(args, device_metadata, bccs) + print(body) + print(args.endpoint + UPLOAD_PATH) + request = urllib.request.Request(args.endpoint + UPLOAD_PATH) + request.add_header('Content-Type', 'application/json') + request.add_header('X-GFE-SSL', 'yes') + request.add_header( + 'Authorization', 'Bearer ' + authenticate_and_fetch_token(args) + ) + try: + response = urllib.request.urlopen(request, body) + except urllib.error.HTTPError as e: + eprint(f'Error uploading bccs. {e}') + for line in e: + eprint(line.decode('utf-8').rstrip()) + sys.exit(1) + + while chunk := response.read(1024): + print(chunk.decode('utf-8')) + + +def get_device_info_adb(s): + """Get bcc & device metadata via adb. + + Args: + s: device serial number + + Returns: + A python dict that contains bcc and device metadata fields + """ + obj = dict() + + sys_prop_names = dict( + company='ro.product.manufacturer', + model='ro.product.model', + name='ro.product.device', + product='ro.product.name', + architecture='ro.product.cpu.abi', + ) + for attr, prop in sys_prop_names.items(): + obj[attr] = subprocess.check_output( + f'adb -s {s} shell getprop {prop}'.split(), text=True + ).strip() + + wv_prop_names = dict( + boot_certificate_chain='bcc', + oemcrypto_build_info='build_info', + ) + dump = subprocess.check_output( + f'adb -s {s} shell dumpsys android.hardware.drm.IDrmFactory/widevine -p' + .split(), + text=True, + ).splitlines() + for line in dump[1:]: + kv = line.strip().split(': ', 1) + if len(kv) < 2: + continue + k, v = kv + if k in wv_prop_names: + obj[wv_prop_names[k]] = v.strip('"\'') + + obj['bcc'] = base64.b64encode(bytes.fromhex(obj['bcc'])).decode() + return obj + + +def get_device_info_bugreport(br): + """Get bcc & device metadata from bugreport. + + Args: + br: bugreport file + + Returns: + A python dict that contains bcc and device metadata fields + """ + z=None + section = '' + sections = set() + metadata = dict() + if zipfile.is_zipfile(br): + z=zipfile.ZipFile(br) + with z.open('main_entry.txt') as m: + fname = m.read().decode() + + if z: + f=z.open(fname) + else: + f=open(br,'rb') + + for line in f: + if line.startswith( + b'DUMP OF SERVICE android.hardware.drm.IDrmFactory/widevine:' + ): + section = 'widevine' + elif line.find(b'- SYSTEM PROPERTIES (getprop) -') >= 0: + section = 'getprop' + elif line.startswith(b'-----') and section: + sections.add(section) + section = '' + + if len(sections) == 2: + break + + pattern = '' + if section == 'getprop': + pattern = b'^\\[(.+)\\]: \\[(.+)\\]$' + elif section == 'widevine': + pattern = b'\\s+([a-z_]+): (.+)$' + + if pattern: + match = re.match(pattern, line) + if match: + k, v = match.groups() + metadata[k.decode()] = v.decode().strip('"\'') + + prop_names = dict( + company='ro.product.manufacturer', + model='ro.product.model', + name='ro.product.device', + product='ro.product.name', + architecture='ro.product.cpu.abi', + bcc='boot_certificate_chain', + build_info='oemcrypto_build_info', + ) + + obj = dict() + for k in prop_names: + obj[k] = metadata[prop_names[k]] + obj['bcc'] = base64.b64encode(bytes.fromhex(obj['bcc'])).decode() + return obj + + +def main(): + args = parse_args() + + batches = {} + for filename in args.json_csr: + parse_json_csrs(filename, batches) + + for br in args.bugreport: + convert_bcc_for_upload(get_device_info_bugreport(br), batches) + + if args.bcc_metadata: + bcc_metadata = json.load(args.bcc_metadata) + for bcc in args.bcc: + if not args.bcc_metadata: + die('Missing --bcc-metadata') + tmp = bcc_metadata.copy() + tmp['bcc'] = base64.b64encode(bcc.read()).decode() + convert_bcc_for_upload(tmp, batches) + + if not batches: + lines = subprocess.check_output('adb devices'.split(), text=True).split( + '\n' + ) + for line in lines: + if line.endswith('device'): + serial = line.split()[0] + convert_bcc_for_upload(get_device_info_adb(serial), batches) + + for device_metadata, bccs in batches.items(): + if args.dry_run: + for bcc in bccs: + out = json.loads(device_metadata) + out['bcc'] = bcc['boot_certificate_chain'] + print(json.dumps(out)) + else: + upload_batch(args, device_metadata, bccs) + + +if __name__ == '__main__': + main()