NON-GMS Android BCC upload tool

Based on CE upload tool: go/wvgerrit/153632

Bug: 321252577
Test: python3 ./wv_upload_tool.py
Change-Id: I266f11585c1025e49ed807bec0b91b83af6742ae
This commit is contained in:
Robert Shih
2024-01-23 15:41:01 -08:00
parent 8b52f39a58
commit 26be4cfdc2

View File

@@ -0,0 +1,668 @@
#!/usr/bin/env python3
# Copyright 2022 Google LLC. All rights reserved.
"""Uploader tool for sending device keys to Widevine remote provisioning server.
This tool consumes an input file containing device info, which includes the
device's public key, and batch uploads it to Google. Once uploaded, the device
may use Widevine provisioning 4 to request OEM certificates from Google.
This tool is designed to be used with the widevine factory extraction tool.
Therefore, the JSON output from widevine factory extraction tool is the expected
input, with one JSON string per line of input.
"""
import argparse
import base64
import http.server
import json
import os
import re
import subprocess
import sys
import urllib.parse
import urllib.request
import uuid
import webbrowser
import zipfile
GOOGLE_AUTH_CLIENT_INSTALLED = False
try:
from google.auth.transport import requests
from google.oauth2 import service_account
GOOGLE_AUTH_CLIENT_INSTALLED = True
except ImportError:
pass
"""
README: please fill in values for the following constants if not setting via command line.
* DEFAULT_ORG
* DEFAULT_SERVICE_ACCOUNT_INFO
"""
DEFAULT_ORG = ''
DEFAULT_SERVICE_ACCOUNT_INFO = dict(
type="service_account",
project_id="",
private_key_id= "",
private_key="",
client_email="",
client_id="",
auth_uri="https://accounts.google.com/o/oauth2/auth",
token_uri="https://oauth2.googleapis.com/token",
auth_provider_x509_cert_url="https://www.googleapis.com/oauth2/v1/certs",
client_x509_cert_url="",
universe_domain="googleapis.com"
)
DEFAULT_BASE = 'https://widevine.googleapis.com/v1beta1'
UPLOAD_PATH = '/uniqueDeviceInfo:batchUpload'
TOKEN_CACHE_FILE = os.path.join(
os.path.expanduser('~'), '.device_info_uploader.token'
)
OAUTH_SCOPE = 'https://www.googleapis.com/auth/widevine/frontend'
OAUTH_SERVICE_BASE = 'https://accounts.google.com/o/oauth2'
OAUTH_AUTHN_URL = OAUTH_SERVICE_BASE + '/auth'
OAUTH_TOKEN_URL = OAUTH_SERVICE_BASE + '/token'
SERVICE_ACCOUNT = None
CALLOUT = """
**For NON-GMS device testing only.**
GMS devices please follow:
> https://docs.partner.android.com/gms/building/integrating/att-keys/rkp-test-upload
"""
USAGE_DETAILS = """
# Authentication:
1. authenticate with oauth credentials on command line:
> wv_upload_tool.py --oauth-credentials oauth_credentials.json
2. authenticate with service account credentials on command line:
> wv_upload_tool.py --service-account-credentials service_account_credentials.json
3. authenticate with DEFAULT_SERVICE_ACCOUNT_INFO embedded in script:
> wv_upload_tool.py
# Usage Examples:
1. extract device metadata from android device(s) visible to adb and upload:
> wv_upload_tool.py
2. extract device metadata from bugreport(s) and upload:
> wv_upload_tool.py --bugreport ./bugreport.zip ./bugreport.txt
3. upload raw bcc's (device metadata must be supplied separately)
> wv_upload_tool.py --bcc-metadata ./device_metadata.json --bcc ./raw_bcc.bin
4. extract but don't upload; save output and upload later:
> wv_upload_tool.py -n > ./out.json ; wv_upload_tool.py --json-csr ./out.json
"""
class OAuthHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
"""HTTP Handler used to accept the oauth response when the user logs in."""
def do_GET(self): # pylint: disable=invalid-name
"""Handles GET, extracting the authorization code in the query params."""
print(f'GET path: {self.path}')
parsed_path = urllib.parse.urlparse(self.path)
params = dict(urllib.parse.parse_qsl(parsed_path.query))
if 'error' in params:
error = params['error']
self.respond(
400, error, f'Error received from the OAuth server: {error}.'
)
sys.exit(-1)
elif 'code' not in params:
self.respond(
400,
'ERROR',
(
'Response from OAuth server is missing the authorization '
f'code. Full response: "{self.path}"'
),
)
sys.exit(-1)
else:
self.respond(
200, 'Success!', 'Success! You may close this browser window.'
)
self.server.code = params['code']
def do_POST(self): # pylint: disable=invalid-name
print(f'POST path: {self.path}')
def respond(self, code, title, message):
"""Send a response to the HTTP client.
Args:
code: The HTTP status code to send
title: The page title to display
message: The message to display to the user on the page
"""
if code != 200:
eprint(message)
self.send_response(code)
self.send_header('Content-type', 'text/html')
self.end_headers()
self.wfile.write(
(
'<html>'
f' <title>{title}</title>'
' <body>'
f' <p style="font-size:24px;">{message}</p>'
' </body>'
'</html>'
).encode('utf-8')
)
class LocalOAuthReceiver(http.server.HTTPServer):
"""HTTP server that will wait for an OAuth authorization code."""
def __init__(self):
super(LocalOAuthReceiver, self).__init__(
('127.0.0.1', 0), OAuthHTTPRequestHandler
)
self.code = None
def port(self):
return self.socket.getsockname()[1]
def wait_for_code(self):
print('Waiting for a response from the Google OAuth service.')
print('If you receive an error in your browser, interrupt this script.')
self.handle_request()
return self.code
def eprint(message):
print(message, file=sys.stderr)
def die(message):
eprint(message)
sys.exit(-1)
def parse_args():
"""Parse and return the command line args.
Returns:
An argparse.Namespace object populated with the arguments.
"""
parser = argparse.ArgumentParser(description='Upload device info' + CALLOUT,
formatter_class=argparse.RawTextHelpFormatter,
epilog=USAGE_DETAILS)
parser.add_argument(
'--json-csr',
nargs='*',
default=[],
help="""list of files containing JSON with following keys:
1. architecture
2. bcc
3. build_info
4. company
5. model
6. name
7. product
""",
)
parser.add_argument(
'--bugreport',
nargs='*',
default=[],
# type=argparse.FileType('rb'),
help='list of bugreports to extract bcc & device metadata from',
)
parser.add_argument(
'--bcc',
nargs='*',
default=[],
type=argparse.FileType('rb'),
help='list of raw bcc files',
)
parser.add_argument(
'--bcc-metadata',
type=argparse.FileType('r'),
help='Metadata for raw bcc files; required with --bcc.\n'
'Same JSON fields are required as --json-csr except for `bcc`.',
)
parser.add_argument('--oauth-credentials', help='JSON OAuth credentials file')
parser.add_argument('--service-account-credentials', help='JSON service account key file')
parser.add_argument(
'--endpoint', default=DEFAULT_BASE, help='destination server URL'
)
parser.add_argument(
'--org-name', default=DEFAULT_ORG, help='orgnization name'
)
parser.add_argument(
'--cache-token',
action='store_true',
default=True,
help='Use a locally cached a refresh token',
)
parser.add_argument(
'-n',
'--dry-run',
action='store_true',
help='Print bcc and metadata to stdout instead of uploading, one bcc per line.\n'
'Output can passed to --json-csr for uploads.',
)
return parser.parse_args()
def parse_json_csrs(filename, batches):
"""Parse the given file and insert it into batches.
If the input is not a valid JSON CSR blob, exit the program.
Args:
filename: The file that contains a JSON-formatted build and CSR
batches: Output dict containing a mapping from json dumped device metadata
to BCCs.
"""
line_count = 0
for line in open(filename):
line_count = line_count + 1
try:
obj = json.loads(line)
except json.JSONDecodeError as e:
die(f'{e.msg} {filename}:{line_count}, char {e.pos}')
convert_bcc_for_upload(obj, batches)
def convert_bcc_for_upload(obj, batches):
"""Convert python bcc dict into format suitable for upload
If the input is not a valid python bcc dict, exit the program.
Args:
obj: The python dict that contains bcc and device metadata fields
batches: Output dict containing a mapping from json dumped device metadata
to BCCs.
"""
try:
bcc = {'boot_certificate_chain': obj['bcc']}
device_metadata = json.dumps(
{
'company': obj['company'],
'architecture': obj['architecture'],
'name': obj['name'],
'model': obj['model'],
'product': obj['product'],
'build_info': obj['build_info'],
},
sort_keys=True,
)
except KeyError as e:
die(f'Invalid object at {filename}:{line_count}, missing {e}')
if device_metadata not in batches:
batches[device_metadata] = []
batches[device_metadata].append(bcc)
def format_request_body(args, device_metadata, bccs):
"""Generate a formatted request buffer for the given build and CSRs."""
request = {
'parent': 'orgs/' + args.org_name,
'request_id': uuid.uuid4().hex,
'metadata': json.loads(device_metadata),
'device_info': bccs,
}
return json.dumps(request).encode('utf-8')
def load_refresh_token():
if not os.path.exists(TOKEN_CACHE_FILE):
return None
with open(TOKEN_CACHE_FILE) as f:
return f.readline()
def store_refresh_token(refresh_token):
with open(TOKEN_CACHE_FILE, 'w') as f:
f.write(refresh_token)
def fetch_access_token(creds, cache_token=False, code=None, redirect_uri=None):
"""Fetch an oauth2 access token.
If a code is passed, then it is used to get the token. If code
is None, then look for a persisted refresh token and use that to
get the access token instead.
Args:
creds: The OAuth client credentials, including client secret and id.
cache_token: If True, then the refresh token is cached on disk so that the
user does not have to reauthenticate when the script is used again.
code: The OAuth authorization code, returned by Google's OAuth service.
redirect_uri: If an authorization code is supplied, then the redirect_uri
used to fetch the code must be passed here.
Returns:
A base64-encode OAuth access token, suitable for including in a request.
"""
request = urllib.request.Request(OAUTH_TOKEN_URL)
request.add_header('Content-Type', 'application/x-www-form-urlencoded')
body = 'client_id=' + creds['client_id']
body += '&client_secret=' + creds['client_secret']
if code is not None:
if redirect_uri is None:
raise ValueError('"code" was supplied, but "redirect_uri" is None')
body += '&grant_type=authorization_code'
body += '&code=' + code
body += '&redirect_uri=' + redirect_uri
else:
refresh_token = load_refresh_token()
if refresh_token is None:
return None
body += '&grant_type=refresh_token'
body += '&refresh_token=' + refresh_token
try:
response = urllib.request.urlopen(request, body.encode('utf-8'))
parsed_response = json.load(response)
if cache_token:
store_refresh_token(parsed_response['refresh_token'])
return parsed_response['access_token']
except urllib.error.HTTPError as e:
# Catch bogus/expired refresh tokens, but bubble up errors when
# an authorization code is used.
if code is None:
return None
die(f'Failed to receive access token: {e.code} {e.reason}')
def load_and_validate_creds(credmap):
"""Loads the credentials from the given file and validates them.
Args:
credmap: python dict containing the client credentials
Returns:
A map containing the credentials for connecting to the APE backend.
"""
not_local_app_creds_error = (
'ERROR: Invalid credential file.\n'
' The given credentials do not appear to be for a locally installed\n'
' application. Please navigate to the credentials dashboard and\n'
' ensure that the "Type" of your client is "Desktop":\n'
' https://console.cloud.google.com/apis/credentials'
)
if 'installed' not in credmap:
die(not_local_app_creds_error)
creds = credmap['installed']
expected_keys = set(['client_id', 'client_secret', 'redirect_uris'])
if not expected_keys.issubset(creds.keys()):
die(
(
'ERROR: Invalid credential file.\n'
' The given credentials do not appear to be valid. Please\n'
' re-download the client credentials file from the dashboard:\n'
' https://console.cloud.google.com/apis/credentials'
)
)
if 'http://localhost' not in creds['redirect_uris']:
die(not_local_app_creds_error)
return creds
def fetch_service_account_token(args):
"""Use a service account to get an access token."""
if not GOOGLE_AUTH_CLIENT_INSTALLED:
die(
'Attempting to use service account but you have not '
'installed Google\'s auth client library.\n'
'Run the following command to install it:\n'
'pip3 install google-auth==2.13.0 requests==2.28'
)
global SERVICE_ACCOUNT
if SERVICE_ACCOUNT is None:
if args.service_account_credentials:
SERVICE_ACCOUNT = service_account.Credentials.from_service_account_file(
args.service_account_credentials,
scopes=[OAUTH_SCOPE],
)
else:
SERVICE_ACCOUNT = service_account.Credentials.from_service_account_info(
DEFAULT_SERVICE_ACCOUNT_INFO,
scopes=[OAUTH_SCOPE],
)
SERVICE_ACCOUNT.refresh(requests.Request())
return SERVICE_ACCOUNT.token
def authenticate_and_fetch_token(args):
"""Authenticate the user and fetch an OAUTH2 access token."""
if args.oauth_credentials:
credmap = json.load(open(args.oauth_credentials))
else:
return fetch_service_account_token(args)
creds = load_and_validate_creds(credmap)
access_type = 'online'
if args.cache_token:
token = fetch_access_token(creds)
if token is not None:
return token
access_type = 'offline'
httpd = LocalOAuthReceiver()
redirect_uri = f'http://127.0.0.1:{httpd.port()}'
url = (
OAUTH_AUTHN_URL
+ '?response_type=code'
+ '&client_id='
+ creds['client_id']
+ '&redirect_uri='
+ redirect_uri
+ '&scope=https://www.googleapis.com/auth/widevine/frontend'
+ '&access_type='
+ access_type
+ '&prompt=select_account'
)
print('Opening your web browser to authenticate...')
if not webbrowser.open(url, new=1, autoraise=True):
print('Error opening the browser. Please open this link in a browser')
print(f'that is running on this same system:\n {url}\n')
code = httpd.wait_for_code()
return fetch_access_token(creds, args.cache_token, code, redirect_uri)
def upload_batch(args, device_metadata, bccs):
"""Batch upload all the CSRs associated build device_metadata.
Args:
args: The parsed command-line arguments
device_metadata: The build for which we're uploading CSRs
bccs: a list of BCCs to be uploaded for the given build
"""
print("Uploading {} bcc(s) for build '{}'".format(len(bccs), device_metadata))
body = format_request_body(args, device_metadata, bccs)
print(body)
print(args.endpoint + UPLOAD_PATH)
request = urllib.request.Request(args.endpoint + UPLOAD_PATH)
request.add_header('Content-Type', 'application/json')
request.add_header('X-GFE-SSL', 'yes')
request.add_header(
'Authorization', 'Bearer ' + authenticate_and_fetch_token(args)
)
try:
response = urllib.request.urlopen(request, body)
except urllib.error.HTTPError as e:
eprint(f'Error uploading bccs. {e}')
for line in e:
eprint(line.decode('utf-8').rstrip())
sys.exit(1)
while chunk := response.read(1024):
print(chunk.decode('utf-8'))
def get_device_info_adb(s):
"""Get bcc & device metadata via adb.
Args:
s: device serial number
Returns:
A python dict that contains bcc and device metadata fields
"""
obj = dict()
sys_prop_names = dict(
company='ro.product.manufacturer',
model='ro.product.model',
name='ro.product.device',
product='ro.product.name',
architecture='ro.product.cpu.abi',
)
for attr, prop in sys_prop_names.items():
obj[attr] = subprocess.check_output(
f'adb -s {s} shell getprop {prop}'.split(), text=True
).strip()
wv_prop_names = dict(
boot_certificate_chain='bcc',
oemcrypto_build_info='build_info',
)
dump = subprocess.check_output(
f'adb -s {s} shell dumpsys android.hardware.drm.IDrmFactory/widevine -p'
.split(),
text=True,
).splitlines()
for line in dump[1:]:
kv = line.strip().split(': ', 1)
if len(kv) < 2:
continue
k, v = kv
if k in wv_prop_names:
obj[wv_prop_names[k]] = v.strip('"\'')
obj['bcc'] = base64.b64encode(bytes.fromhex(obj['bcc'])).decode()
return obj
def get_device_info_bugreport(br):
"""Get bcc & device metadata from bugreport.
Args:
br: bugreport file
Returns:
A python dict that contains bcc and device metadata fields
"""
z=None
section = ''
sections = set()
metadata = dict()
if zipfile.is_zipfile(br):
z=zipfile.ZipFile(br)
with z.open('main_entry.txt') as m:
fname = m.read().decode()
if z:
f=z.open(fname)
else:
f=open(br,'rb')
for line in f:
if line.startswith(
b'DUMP OF SERVICE android.hardware.drm.IDrmFactory/widevine:'
):
section = 'widevine'
elif line.find(b'- SYSTEM PROPERTIES (getprop) -') >= 0:
section = 'getprop'
elif line.startswith(b'-----') and section:
sections.add(section)
section = ''
if len(sections) == 2:
break
pattern = ''
if section == 'getprop':
pattern = b'^\\[(.+)\\]: \\[(.+)\\]$'
elif section == 'widevine':
pattern = b'\\s+([a-z_]+): (.+)$'
if pattern:
match = re.match(pattern, line)
if match:
k, v = match.groups()
metadata[k.decode()] = v.decode().strip('"\'')
prop_names = dict(
company='ro.product.manufacturer',
model='ro.product.model',
name='ro.product.device',
product='ro.product.name',
architecture='ro.product.cpu.abi',
bcc='boot_certificate_chain',
build_info='oemcrypto_build_info',
)
obj = dict()
for k in prop_names:
obj[k] = metadata[prop_names[k]]
obj['bcc'] = base64.b64encode(bytes.fromhex(obj['bcc'])).decode()
return obj
def main():
args = parse_args()
batches = {}
for filename in args.json_csr:
parse_json_csrs(filename, batches)
for br in args.bugreport:
convert_bcc_for_upload(get_device_info_bugreport(br), batches)
if args.bcc_metadata:
bcc_metadata = json.load(args.bcc_metadata)
for bcc in args.bcc:
if not args.bcc_metadata:
die('Missing --bcc-metadata')
tmp = bcc_metadata.copy()
tmp['bcc'] = base64.b64encode(bcc.read()).decode()
convert_bcc_for_upload(tmp, batches)
if not batches:
lines = subprocess.check_output('adb devices'.split(), text=True).split(
'\n'
)
for line in lines:
if line.endswith('device'):
serial = line.split()[0]
convert_bcc_for_upload(get_device_info_adb(serial), batches)
for device_metadata, bccs in batches.items():
if args.dry_run:
for bcc in bccs:
out = json.loads(device_metadata)
out['bcc'] = bcc['boot_certificate_chain']
print(json.dumps(out))
else:
upload_batch(args, device_metadata, bccs)
if __name__ == '__main__':
main()