348 lines
11 KiB
Python
348 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
# Copyright 2022 Google LLC. All rights reserved.
|
|
"""Uploader tool for sending device keys to Widevine remote provisioning server.
|
|
|
|
This tool consumes an input file containing device info, which includes the
|
|
device's public key, and batch uploads it to Google. Once uploaded, the device
|
|
may use Widevine provisioning 4 to request OEM certificates from Google.
|
|
|
|
This tool is designed to be used with the widevine factory extraction tool.
|
|
Therefore, the JSON output from widevine factory extraction tool is the expected
|
|
input, with one JSON string per line of input.
|
|
"""
|
|
|
|
import argparse
|
|
import http.server
|
|
import json
|
|
import os
|
|
import sys
|
|
import urllib.parse
|
|
import urllib.request
|
|
import uuid
|
|
import webbrowser
|
|
|
|
# Default value of --endpoint. The organization name and UPLOAD_PATH are
# appended directly to it to form the upload URL.
DEFAULT_BASE = 'https://widevine.googleapis.com/v1/orgs/'
# Path suffix that follows the organization name in the upload URL.
UPLOAD_PATH = '/uniqueDeviceInfo:batchUpload'
# File in the user's home directory where the OAuth refresh token is cached.
TOKEN_CACHE_FILE = os.path.join(
    os.path.expanduser('~'), '.device_info_uploader.token')

# OAuth endpoints: OAUTH_AUTHN_URL is opened in the user's browser, and
# OAUTH_TOKEN_URL is where access tokens are requested.
OAUTH_SERVICE_BASE = 'https://accounts.google.com/o/oauth2'
OAUTH_AUTHN_URL = OAUTH_SERVICE_BASE + '/auth'
OAUTH_TOKEN_URL = OAUTH_SERVICE_BASE + '/token'
|
|
|
|
|
|
class OAuthHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
    """HTTP Handler used to accept the oauth response when the user logs in.

    On a successful redirect, the authorization code found in the query
    string is stored on the owning server object as ``server.code``.
    """

    def do_GET(self):  # pylint: disable=invalid-name
        """Handles GET, extracting the authorization code in the query params.

        Exits the process with status -1 when the query string carries an
        ``error`` parameter or has no ``code`` parameter.
        """
        # NOTE: this echoes the full request path, query string included.
        print(f'GET path: {self.path}')
        parsed_path = urllib.parse.urlparse(self.path)
        params = dict(urllib.parse.parse_qsl(parsed_path.query))

        if 'error' in params:
            error = params['error']
            self.respond(400, error,
                         f'Error received from the OAuth server: {error}.')
            sys.exit(-1)

        if 'code' not in params:
            self.respond(400, 'ERROR',
                         ('Response from OAuth server is missing the authorization '
                          f'code. Full response: "{self.path}"'))
            sys.exit(-1)

        self.respond(200, 'Success!',
                     'Success! You may close this browser window.')
        self.server.code = params['code']

    def do_POST(self):  # pylint: disable=invalid-name
        """Handles POST, which is not expected during the OAuth redirect."""
        print(f'POST path: {self.path}')
        # Always answer so the client is not left with an empty reply.
        self.respond(405, 'ERROR',
                     'Unexpected POST request; expected a GET redirect.')

    def respond(self, code, title, message):
        """Send a response to the HTTP client.

        The title and message may contain text taken from the incoming
        request, so both are HTML-escaped before being placed in the page.

        Args:
          code: The HTTP status code to send
          title: The page title to display
          message: The message to display to the user on the page
        """
        import html  # Local import: only this method needs it.

        if code != 200:
            eprint(message)

        safe_title = html.escape(title)
        safe_message = html.escape(message)

        self.send_response(code)
        # The body below is encoded as UTF-8, so say so explicitly.
        self.send_header('Content-type', 'text/html; charset=utf-8')
        self.end_headers()
        self.wfile.write(('<html>'
                          f' <title>{safe_title}</title>'
                          f' <body>'
                          f' <p style="font-size:24px;">{safe_message}</p>'
                          f' </body>'
                          '</html>').encode('utf-8'))
|
|
|
|
|
|
class LocalOAuthReceiver(http.server.HTTPServer):
    """Loopback HTTP server that waits for an OAuth authorization code."""

    def __init__(self):
        """Binds to 127.0.0.1 on port 0 with OAuthHTTPRequestHandler."""
        bind_address = ('127.0.0.1', 0)
        super().__init__(bind_address, OAuthHTTPRequestHandler)
        # Remains None until something assigns the received code.
        self.code = None

    def port(self):
        """Returns the port number reported by the server socket."""
        socket_name = self.socket.getsockname()
        return socket_name[1]

    def wait_for_code(self):
        """Handles one request, then returns the value of ``self.code``."""
        for notice in (
                'Waiting for a response from the Google OAuth service.',
                'If you receive an error in your browser, interrupt this script.',
        ):
            print(notice)
        self.handle_request()
        return self.code
|
|
|
|
|
|
def eprint(message):
    """Prints the given message to standard error."""
    error_stream = sys.stderr
    print(message, file=error_stream)
|
|
|
|
|
|
def die(message):
    """Reports the message via eprint, then exits with status -1."""
    eprint(message)
    raise SystemExit(-1)
|
|
|
|
|
|
def parse_args(argv=None):
    """Parse and return the command line args.

    Args:
      argv: Optional list of argument strings to parse. When None (the
        default), argparse reads the process command line.

    Returns:
      An argparse.Namespace object populated with the arguments.
    """
    parser = argparse.ArgumentParser(description='Upload device info')
    parser.add_argument(
        '--json-csr',
        nargs='+',
        required=True,
        help='list of files containing JSON output from rkp_factory_extraction_tool'
    )
    parser.add_argument(
        '--credentials', required=True, help='JSON credentials file')

    parser.add_argument(
        '--endpoint', default=DEFAULT_BASE, help='destination server URL')

    parser.add_argument('--org-name', required=True, help='organization name')

    parser.add_argument(
        '--cache-token',
        action='store_true',
        help='Use a locally cached refresh token')
    return parser.parse_args(argv)
|
|
|
|
|
|
def parse_json_csrs(filename, batches):
    """Parse the given file and insert it into batches.

    The file is read one line at a time, each non-blank line being one JSON
    object. If a line is not a valid JSON CSR blob, exit the program.

    Args:
      filename: The file that contains a JSON-formatted build and CSR
      batches: Output dict containing a mapping from json dumped device
        metadata to BCCs. Entries with identical metadata share one list.
    """
    with open(filename, encoding='utf-8') as csr_file:
        for line_count, line in enumerate(csr_file, start=1):
            # Blank lines carry no record; skip them instead of aborting.
            if not line.strip():
                continue

            try:
                obj = json.loads(line)
            except json.JSONDecodeError as e:
                die(f'{e.msg} {filename}:{line_count}, char {e.pos}')

            # Valid JSON that is not an object (list, number, ...) cannot
            # hold the required fields.
            if not isinstance(obj, dict):
                die(f'Invalid object at {filename}:{line_count}, '
                    'expected a JSON object')

            try:
                bcc = {'boot_certificate_chain': obj['bcc']}
                # The serialized metadata doubles as the grouping key.
                device_metadata = json.dumps({
                    'company': obj['company'],
                    'architecture': obj['architecture'],
                    'name': obj['name'],
                    'model': obj['model'],
                    'product': obj['product'],
                    'build_info': obj['build_info']
                })
            except KeyError as e:
                die(f'Invalid object at {filename}:{line_count}, missing {e}')

            batches.setdefault(device_metadata, []).append(bcc)
|
|
|
|
|
|
def format_request_body(args, device_metadata, bccs):
    """Builds the UTF-8 encoded JSON request body for one batch.

    Args:
      args: Parsed command-line arguments; only ``org_name`` is read.
      device_metadata: JSON string describing the build; it is decoded and
        embedded under the 'metadata' key.
      bccs: The list placed under the 'device_info' key.

    Returns:
      The request serialized as JSON and encoded to bytes.
    """
    parent = 'orgs/' + args.org_name
    # A fresh random identifier is generated for every request body.
    request_id = uuid.uuid4().hex
    metadata = json.loads(device_metadata)

    payload = {
        'parent': parent,
        'request_id': request_id,
        'metadata': metadata,
        'device_info': bccs,
    }
    serialized = json.dumps(payload)
    return serialized.encode('utf-8')
|
|
|
|
|
|
def load_refresh_token():
    """Loads the cached refresh token from TOKEN_CACHE_FILE.

    Returns:
      The first line of the cache file with surrounding whitespace removed,
      or None if the file does not exist or that line is empty.
    """
    try:
        with open(TOKEN_CACHE_FILE) as f:
            # Strip so a trailing newline never ends up in a request body.
            token = f.readline().strip()
    except FileNotFoundError:
        return None
    return token or None
|
|
|
|
|
|
def store_refresh_token(refresh_token):
    """Writes the refresh token to TOKEN_CACHE_FILE, replacing old content.

    The token is a long-lived credential, so the file is created with
    owner-only permissions and an existing file is tightened to the same
    mode before the token is written.

    Args:
      refresh_token: The refresh token string to persist.
    """
    flags = os.O_WRONLY | os.O_CREAT | os.O_TRUNC
    fd = os.open(TOKEN_CACHE_FILE, flags, 0o600)
    with os.fdopen(fd, 'w') as f:
        # The mode passed to os.open only applies to newly created files.
        os.chmod(TOKEN_CACHE_FILE, 0o600)
        f.write(refresh_token)
|
|
|
|
|
|
def fetch_access_token(creds, cache_token=False, code=None, redirect_uri=None):
    """Fetch an oauth2 access token.

    If a code is passed, then it is used to get the token. If code
    is None, then look for a persisted refresh token and use that to
    get the access token instead.

    Args:
      creds: The OAuth client credentials, including client secret and id.
      cache_token: If True, then the refresh token is cached on disk so that
        the user does not have to reauthenticate when the script is used
        again.
      code: The OAuth authorization code, returned by Google's OAuth service.
      redirect_uri: If an authorization code is supplied, then the
        redirect_uri used to fetch the code must be passed here.

    Returns:
      The access token from the token response, suitable for including in a
      request. None if no code was given and either no refresh token is
      cached or the server rejected the refresh request.

    Raises:
      ValueError: If "code" is supplied without "redirect_uri".
    """
    form = {
        'client_id': creds['client_id'],
        'client_secret': creds['client_secret'],
    }

    if code is not None:
        if redirect_uri is None:
            raise ValueError('"code" was supplied, but "redirect_uri" is None')
        form['grant_type'] = 'authorization_code'
        form['code'] = code
        form['redirect_uri'] = redirect_uri
    else:
        refresh_token = load_refresh_token()
        if refresh_token is None:
            return None
        form['grant_type'] = 'refresh_token'
        # Strip in case the cached value carries a trailing newline.
        form['refresh_token'] = refresh_token.strip()

    request = urllib.request.Request(OAUTH_TOKEN_URL)
    request.add_header('Content-Type', 'application/x-www-form-urlencoded')
    # urlencode percent-escapes reserved characters in the form values.
    body = urllib.parse.urlencode(form).encode('utf-8')

    try:
        with urllib.request.urlopen(request, body) as response:
            parsed_response = json.load(response)
    except urllib.error.HTTPError as e:
        # Catch bogus/expired refresh tokens, but bubble up errors when
        # an authorization code is used.
        if code is None:
            return None
        die(f'Failed to receive access token: {e.code} {e.reason}')

    if cache_token:
        # Only cache a refresh token when the response actually has one.
        new_refresh_token = parsed_response.get('refresh_token')
        if new_refresh_token:
            store_refresh_token(new_refresh_token)
    return parsed_response['access_token']
|
|
|
|
|
|
def load_and_validate_creds(credfile):
    """Loads the credentials from the given file and validates them.

    Exits the program if the file has no 'installed' section, if that section
    lacks a required key, or if 'http://localhost' is not among its redirect
    URIs.

    Args:
      credfile: the name of the file containing the client credentials

    Returns:
      The 'installed' section of the credentials file.
    """
    # Use a context manager so the file handle is closed promptly.
    with open(credfile) as f:
        credmap = json.load(f)

    not_local_app_creds_error = (
        'ERROR: Invalid credential file.\n'
        ' The given credentials do not appear to be for a locally installed\n'
        ' application. Please navigate to the credentials dashboard and\n'
        ' ensure that the "Type" of your client is "Desktop":\n'
        ' https://console.cloud.google.com/apis/credentials')

    if 'installed' not in credmap:
        die(not_local_app_creds_error)

    creds = credmap['installed']

    expected_keys = {'client_id', 'client_secret', 'redirect_uris'}
    if not expected_keys.issubset(creds.keys()):
        die(('ERROR: Invalid credential file.\n'
             ' The given credentials do not appear to be valid. Please\n'
             ' re-download the client credentials file from the dashboard:\n'
             ' https://console.cloud.google.com/apis/credentials'))

    if 'http://localhost' not in creds['redirect_uris']:
        die(not_local_app_creds_error)

    return creds
|
|
|
|
|
|
def authenticate_and_fetch_token(args):
    """Authenticate the user and fetch an OAUTH2 access token.

    When --cache-token is set, a cached refresh token is tried first. If that
    yields no token, the user is sent through the browser login flow and the
    authorization code is received by a temporary local HTTP server.

    Args:
      args: The parsed command-line arguments; ``credentials`` and
        ``cache_token`` are read.

    Returns:
      The OAuth access token.
    """
    creds = load_and_validate_creds(args.credentials)

    access_type = 'online'
    if args.cache_token:
        token = fetch_access_token(creds)
        if token is not None:
            return token
        access_type = 'offline'

    httpd = LocalOAuthReceiver()
    try:
        redirect_uri = f'http://127.0.0.1:{httpd.port()}'
        # urlencode percent-escapes the URI-valued parameters.
        query = urllib.parse.urlencode({
            'response_type': 'code',
            'client_id': creds['client_id'],
            'redirect_uri': redirect_uri,
            'scope': 'https://www.googleapis.com/auth/widevine/frontend',
            'access_type': access_type,
            'prompt': 'select_account',
        })
        url = OAUTH_AUTHN_URL + '?' + query
        print('Opening your web browser to authenticate...')
        if not webbrowser.open(url, new=1, autoraise=True):
            print('Error opening the browser. Please open this link in a browser')
            print(f'that is running on this same system:\n {url}\n')
        code = httpd.wait_for_code()
    finally:
        # Release the listening socket; this function runs once per batch.
        httpd.server_close()

    if code is None:
        # Without a code the token request would silently fall back to the
        # refresh-token path, so fail with a clear message instead.
        die('No authorization code was received from the OAuth service.')
    return fetch_access_token(creds, args.cache_token, code, redirect_uri)
|
|
|
|
|
|
def upload_batch(args, device_metadata, bccs):
    """Batch upload all the CSRs associated build device_metadata.

    Prints the request body, the target URL and the server response. On an
    HTTP error the error and its body are written to stderr and the program
    exits with status 1.

    Args:
      args: The parsed command-line arguments
      device_metadata: The build for which we're uploading CSRs
      bccs: a list of BCCs to be uploaded for the given build
    """
    print(f"Uploading {len(bccs)} bcc(s) for build '{device_metadata}'")
    body = format_request_body(args, device_metadata, bccs)
    print(body)
    url = args.endpoint + args.org_name + UPLOAD_PATH
    print(url)

    request = urllib.request.Request(url)
    request.add_header('Content-Type', 'application/json')
    request.add_header('X-GFE-SSL', 'yes')
    request.add_header('Authorization',
                       'Bearer ' + authenticate_and_fetch_token(args))
    try:
        with urllib.request.urlopen(request, body) as response:
            # Read everything before decoding: decoding fixed-size chunks can
            # split a multi-byte UTF-8 character across two chunks.
            payload = response.read()
    except urllib.error.HTTPError as e:
        eprint(f'Error uploading bccs. {e}')
        for line in e:
            eprint(line.decode('utf-8', errors='replace').rstrip())
        sys.exit(1)

    print(payload.decode('utf-8', errors='replace'))
|
|
|
|
|
|
def main():
    """Parses every input file into batches, then uploads each batch."""
    args = parse_args()

    # Serialized device metadata -> list of BCC entries sharing it.
    batches = {}
    for csr_path in args.json_csr:
        parse_json_csrs(csr_path, batches)

    for metadata_key in batches:
        upload_batch(args, metadata_key, batches[metadata_key])
|
|
|
|
|
|
# Run the uploader only when this file is executed as a script.
if __name__ == '__main__':
    main()
|