Sync wv_upload_tool.py from CDM udc-widevine-dev
wv_upload_tool.py is out of date. Copy the latest version from CDM udc-widevine-dev. Also cherry-pick changes from: https://widevine-internal-review.git.corp.google.com/c/cdm/+/208714 Bug: 372933063 Change-Id: I53643af88819031d3fb0d762a0c2c40a4828c594
This commit is contained in:
@@ -48,6 +48,7 @@ std::unique_ptr<File> FileSystem::Open(const std::string&, int) {
|
||||
return std::unique_ptr<File>(new FileImpl());
|
||||
}
|
||||
bool FileSystem::Exists(const std::string&) { return false; }
|
||||
bool FileSystem::Exists(const std::string&, int*) { return false; }
|
||||
bool FileSystem::Remove(const std::string&) { return false; }
|
||||
ssize_t FileSystem::FileSize(const std::string&) { return false; }
|
||||
bool FileSystem::List(const std::string&, std::vector<std::string>*) {
|
||||
@@ -141,7 +142,9 @@ OEMCryptoResult OEMCryptoInterface::GetOEMCryptoBuildInfo(
|
||||
result = BuildInformation(&build_info[0], &build_info_size);
|
||||
LOGI("BuildInformation second attempt result %d", result);
|
||||
}
|
||||
|
||||
if (result == OEMCrypto_SUCCESS) {
|
||||
build_info.resize(build_info_size);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
|
||||
@@ -9,88 +9,39 @@ may use Widevine provisioning 4 to request OEM certificates from Google.
|
||||
This tool is designed to be used with the widevine factory extraction tool.
|
||||
Therefore, the JSON output from widevine factory extraction tool is the expected
|
||||
input, with one JSON string per line of input.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import base64
|
||||
import http.server
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import subprocess
|
||||
import sys
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
import uuid
|
||||
import webbrowser
|
||||
import zipfile
|
||||
|
||||
GOOGLE_AUTH_CLIENT_INSTALLED = False
|
||||
try:
|
||||
from google.auth.transport import requests
|
||||
from google.oauth2 import service_account
|
||||
GOOGLE_AUTH_CLIENT_INSTALLED = True
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
"""
|
||||
README: please fill in values for the following constants if not setting via command line.
|
||||
* DEFAULT_ORG
|
||||
* DEFAULT_SERVICE_ACCOUNT_INFO
|
||||
"""
|
||||
DEFAULT_ORG = ''
|
||||
DEFAULT_SERVICE_ACCOUNT_INFO = dict(
|
||||
type="service_account",
|
||||
project_id="",
|
||||
private_key_id= "",
|
||||
private_key="",
|
||||
client_email="",
|
||||
client_id="",
|
||||
auth_uri="https://accounts.google.com/o/oauth2/auth",
|
||||
token_uri="https://oauth2.googleapis.com/token",
|
||||
auth_provider_x509_cert_url="https://www.googleapis.com/oauth2/v1/certs",
|
||||
client_x509_cert_url="",
|
||||
universe_domain="googleapis.com"
|
||||
)
|
||||
|
||||
DEFAULT_BASE = 'https://widevine.googleapis.com/v1beta1'
|
||||
UPLOAD_PATH = '/uniqueDeviceInfo:batchUpload'
|
||||
TOKEN_CACHE_FILE = os.path.join(
|
||||
os.path.expanduser('~'), '.device_info_uploader.token'
|
||||
)
|
||||
|
||||
OAUTH_SCOPE = 'https://www.googleapis.com/auth/widevine/frontend'
|
||||
OAUTH_SERVICE_BASE = 'https://accounts.google.com/o/oauth2'
|
||||
OAUTH_AUTHN_URL = OAUTH_SERVICE_BASE + '/auth'
|
||||
OAUTH_TOKEN_URL = OAUTH_SERVICE_BASE + '/token'
|
||||
SERVICE_ACCOUNT = None
|
||||
|
||||
CALLOUT = """
|
||||
**For NON-GMS device testing only.**
|
||||
**This tool is for NON-GMS device testing only.**
|
||||
|
||||
GMS devices please follow:
|
||||
> https://docs.partner.android.com/gms/building/integrating/att-keys/rkp-test-upload
|
||||
"""
|
||||
|
||||
USAGE_DETAILS = """
|
||||
# Authentication:
|
||||
1. authenticate with oauth credentials on command line:
|
||||
> wv_upload_tool.py --oauth-credentials oauth_credentials.json
|
||||
2. authenticate with service account credentials on command line:
|
||||
> wv_upload_tool.py --service-account-credentials service_account_credentials.json
|
||||
3. authenticate with DEFAULT_SERVICE_ACCOUNT_INFO embedded in script:
|
||||
> wv_upload_tool.py
|
||||
import argparse
|
||||
from http import HTTPStatus
|
||||
import http.server
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import urllib.parse
|
||||
import urllib.request
|
||||
import uuid
|
||||
import webbrowser
|
||||
from google.auth.transport import requests
|
||||
from google.oauth2 import service_account
|
||||
|
||||
DEFAULT_BASE = 'https://widevine.googleapis.com/v1beta1'
|
||||
UPLOAD_PATH = '/uniqueDeviceInfo:batchUpload'
|
||||
BATCH_CHECK_PATH = '/uniqueDeviceInfo:batchCheck'
|
||||
TOKEN_CACHE_FILE = os.path.join(
|
||||
os.path.expanduser('~'), '.device_info_uploader.token'
|
||||
)
|
||||
|
||||
OAUTH_SERVICE_BASE = 'https://accounts.google.com/o/oauth2'
|
||||
OAUTH_AUTHN_URL = OAUTH_SERVICE_BASE + '/auth'
|
||||
OAUTH_TOKEN_URL = OAUTH_SERVICE_BASE + '/token'
|
||||
|
||||
OAUTH_SCOPES = ['https://www.googleapis.com/auth/widevine/frontend']
|
||||
|
||||
# Usage Examples:
|
||||
1. extract device metadata from android device(s) visible to adb and upload:
|
||||
> wv_upload_tool.py
|
||||
2. extract device metadata from bugreport(s) and upload:
|
||||
> wv_upload_tool.py --bugreport ./bugreport.zip ./bugreport.txt
|
||||
3. upload raw bcc's (device metadata must be supplied separately)
|
||||
> wv_upload_tool.py --bcc-metadata ./device_metadata.json --bcc ./raw_bcc.bin
|
||||
4. extract but don't upload; save output and upload later:
|
||||
> wv_upload_tool.py -n > ./out.json ; wv_upload_tool.py --json-csr ./out.json
|
||||
"""
|
||||
|
||||
class OAuthHTTPRequestHandler(http.server.BaseHTTPRequestHandler):
|
||||
"""HTTP Handler used to accept the oauth response when the user logs in."""
|
||||
@@ -184,72 +135,61 @@ def parse_args():
|
||||
Returns:
|
||||
An argparse.Namespace object populated with the arguments.
|
||||
"""
|
||||
parser = argparse.ArgumentParser(description='Upload device info' + CALLOUT,
|
||||
formatter_class=argparse.RawTextHelpFormatter,
|
||||
epilog=USAGE_DETAILS)
|
||||
parser = argparse.ArgumentParser(description='Widevine BCC Batch Upload/Check Tool')
|
||||
|
||||
parser.add_argument("--version", action="version", version="20240822") #yyyymmdd
|
||||
|
||||
parser.add_argument(
|
||||
'--json-csr',
|
||||
nargs='*',
|
||||
default=[],
|
||||
help="""list of files containing JSON with following keys:
|
||||
1. architecture
|
||||
2. bcc
|
||||
3. build_info
|
||||
4. company
|
||||
5. model
|
||||
6. name
|
||||
7. product
|
||||
""",
|
||||
nargs='+',
|
||||
required=True,
|
||||
help='list of files containing JSON output from factory extraction tool',
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--bugreport',
|
||||
nargs='*',
|
||||
default=[],
|
||||
# type=argparse.FileType('rb'),
|
||||
help='list of bugreports to extract bcc & device metadata from',
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--bcc',
|
||||
nargs='*',
|
||||
default=[],
|
||||
type=argparse.FileType('rb'),
|
||||
help='list of raw bcc files',
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--bcc-metadata',
|
||||
type=argparse.FileType('r'),
|
||||
help='Metadata for raw bcc files; required with --bcc.\n'
|
||||
'Same JSON fields are required as --json-csr except for `bcc`.',
|
||||
)
|
||||
|
||||
parser.add_argument('--oauth-credentials', help='JSON OAuth credentials file')
|
||||
|
||||
parser.add_argument('--service-account-credentials', help='JSON service account key file')
|
||||
parser.add_argument('--credentials', help='JSON credentials file')
|
||||
|
||||
parser.add_argument(
|
||||
'--endpoint', default=DEFAULT_BASE, help='destination server URL'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--org-name', default=DEFAULT_ORG, help='orgnization name'
|
||||
)
|
||||
parser.add_argument('--org-name', required=True, help='orgnization name')
|
||||
|
||||
parser.add_argument(
|
||||
'--cache-token',
|
||||
action='store_true',
|
||||
default=True,
|
||||
help='Use a locally cached a refresh token',
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'-n',
|
||||
'--dry-run',
|
||||
'--service-credentials', help='JSON credentials file for service account'
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--die-on-error',
|
||||
action='store_true',
|
||||
help='Print bcc and metadata to stdout instead of uploading, one bcc per line.\n'
|
||||
'Output can passed to --json-csr for uploads.',
|
||||
help='exit on error and stop uploading more CSRs',
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--dryrun',
|
||||
action='store_true',
|
||||
help=(
|
||||
'Do not upload anything. Instead print out what actions would have'
|
||||
' been taken if the --dryrun flag had not been specified.'
|
||||
),
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--check',
|
||||
action='store_true',
|
||||
required=False,
|
||||
help='Perform a batch check on the CSRs.',
|
||||
)
|
||||
|
||||
parser.add_argument(
|
||||
'--verbose',
|
||||
action='store_true',
|
||||
required=False,
|
||||
help='Print request and response details.',
|
||||
)
|
||||
|
||||
return parser.parse_args()
|
||||
@@ -268,43 +208,31 @@ def parse_json_csrs(filename, batches):
|
||||
line_count = 0
|
||||
for line in open(filename):
|
||||
line_count = line_count + 1
|
||||
obj = {}
|
||||
try:
|
||||
obj = json.loads(line)
|
||||
except json.JSONDecodeError as e:
|
||||
die(f'{e.msg} {filename}:{line_count}, char {e.pos}')
|
||||
convert_bcc_for_upload(obj, batches)
|
||||
|
||||
try:
|
||||
bcc = {'boot_certificate_chain': obj['bcc']}
|
||||
device_metadata = json.dumps({
|
||||
'company': obj['company'],
|
||||
'architecture': obj['architecture'],
|
||||
'name': obj['name'],
|
||||
'model': obj['model'],
|
||||
'product': obj['product'],
|
||||
'build_info': obj['build_info'],
|
||||
'oemcrypto_build_info': obj['oemcrypto_build_info'],
|
||||
})
|
||||
if device_metadata not in batches:
|
||||
batches[device_metadata] = []
|
||||
batches[device_metadata].append(bcc)
|
||||
except KeyError as e:
|
||||
die(f'Invalid object at {filename}:{line_count}, missing {e}')
|
||||
|
||||
def convert_bcc_for_upload(obj, batches):
|
||||
"""Convert python bcc dict into format suitable for upload
|
||||
|
||||
If the input is not a valid python bcc dict, exit the program.
|
||||
|
||||
Args:
|
||||
obj: The python dict that contains bcc and device metadata fields
|
||||
batches: Output dict containing a mapping from json dumped device metadata
|
||||
to BCCs.
|
||||
"""
|
||||
|
||||
try:
|
||||
bcc = {'boot_certificate_chain': obj['bcc']}
|
||||
device_metadata = json.dumps(
|
||||
{
|
||||
'company': obj['company'],
|
||||
'architecture': obj['architecture'],
|
||||
'name': obj['name'],
|
||||
'model': obj['model'],
|
||||
'product': obj['product'],
|
||||
'build_info': obj['build_info'],
|
||||
},
|
||||
sort_keys=True,
|
||||
)
|
||||
except KeyError as e:
|
||||
die(f'Invalid object at {filename}:{line_count}, missing {e}')
|
||||
|
||||
if device_metadata not in batches:
|
||||
batches[device_metadata] = []
|
||||
batches[device_metadata].append(bcc)
|
||||
if line_count == 0:
|
||||
die('Empty BCC file!')
|
||||
|
||||
|
||||
def format_request_body(args, device_metadata, bccs):
|
||||
@@ -318,6 +246,17 @@ def format_request_body(args, device_metadata, bccs):
|
||||
return json.dumps(request).encode('utf-8')
|
||||
|
||||
|
||||
def format_check_request_body(args, bccs):
|
||||
"""Generate a formatted BatchCheck request buffer for the given build and CSRs."""
|
||||
request = {
|
||||
'parent': 'orgs/' + args.org_name,
|
||||
'request_id': uuid.uuid4().hex,
|
||||
'device_info': bccs,
|
||||
}
|
||||
|
||||
return json.dumps(request).encode('utf-8')
|
||||
|
||||
|
||||
def load_refresh_token():
|
||||
if not os.path.exists(TOKEN_CACHE_FILE):
|
||||
return None
|
||||
@@ -380,15 +319,16 @@ def fetch_access_token(creds, cache_token=False, code=None, redirect_uri=None):
|
||||
die(f'Failed to receive access token: {e.code} {e.reason}')
|
||||
|
||||
|
||||
def load_and_validate_creds(credmap):
|
||||
def load_and_validate_creds(credfile):
|
||||
"""Loads the credentials from the given file and validates them.
|
||||
|
||||
Args:
|
||||
credmap: python dict containing the client credentials
|
||||
credfile: the name of the file containing the client credentials
|
||||
|
||||
Returns:
|
||||
A map containing the credentials for connecting to the APE backend.
|
||||
"""
|
||||
credmap = json.load(open(credfile))
|
||||
|
||||
not_local_app_creds_error = (
|
||||
'ERROR: Invalid credential file.\n'
|
||||
@@ -405,14 +345,12 @@ def load_and_validate_creds(credmap):
|
||||
|
||||
expected_keys = set(['client_id', 'client_secret', 'redirect_uris'])
|
||||
if not expected_keys.issubset(creds.keys()):
|
||||
die(
|
||||
(
|
||||
'ERROR: Invalid credential file.\n'
|
||||
' The given credentials do not appear to be valid. Please\n'
|
||||
' re-download the client credentials file from the dashboard:\n'
|
||||
' https://console.cloud.google.com/apis/credentials'
|
||||
)
|
||||
)
|
||||
die((
|
||||
'ERROR: Invalid credential file.\n'
|
||||
' The given credentials do not appear to be valid. Please\n'
|
||||
' re-download the client credentials file from the dashboard:\n'
|
||||
' https://console.cloud.google.com/apis/credentials'
|
||||
))
|
||||
|
||||
if 'http://localhost' not in creds['redirect_uris']:
|
||||
die(not_local_app_creds_error)
|
||||
@@ -420,42 +358,25 @@ def load_and_validate_creds(credmap):
|
||||
return creds
|
||||
|
||||
|
||||
def fetch_service_account_token(args):
|
||||
"""Use a service account to get an access token."""
|
||||
if not GOOGLE_AUTH_CLIENT_INSTALLED:
|
||||
die(
|
||||
'Attempting to use service account but you have not '
|
||||
'installed Google\'s auth client library.\n'
|
||||
'Run the following command to install it:\n'
|
||||
'pip3 install google-auth==2.13.0 requests==2.28'
|
||||
)
|
||||
|
||||
global SERVICE_ACCOUNT
|
||||
|
||||
if SERVICE_ACCOUNT is None:
|
||||
if args.service_account_credentials:
|
||||
SERVICE_ACCOUNT = service_account.Credentials.from_service_account_file(
|
||||
args.service_account_credentials,
|
||||
scopes=[OAUTH_SCOPE],
|
||||
)
|
||||
else:
|
||||
SERVICE_ACCOUNT = service_account.Credentials.from_service_account_info(
|
||||
DEFAULT_SERVICE_ACCOUNT_INFO,
|
||||
scopes=[OAUTH_SCOPE],
|
||||
)
|
||||
SERVICE_ACCOUNT.refresh(requests.Request())
|
||||
|
||||
return SERVICE_ACCOUNT.token
|
||||
|
||||
|
||||
def authenticate_and_fetch_token(args):
|
||||
"""Authenticate the user and fetch an OAUTH2 access token."""
|
||||
if args.oauth_credentials:
|
||||
credmap = json.load(open(args.oauth_credentials))
|
||||
else:
|
||||
return fetch_service_account_token(args)
|
||||
"""Authenticate and fetch an OAUTH2 access token."""
|
||||
# Auth for service account
|
||||
if args.service_credentials:
|
||||
if not os.path.exists(args.service_credentials):
|
||||
die('Service account credentials file does not exist.')
|
||||
svc_account = service_account.Credentials.from_service_account_file(
|
||||
args.service_credentials,
|
||||
scopes=OAUTH_SCOPES,
|
||||
)
|
||||
svc_account.refresh(requests.Request())
|
||||
return svc_account.token
|
||||
|
||||
creds = load_and_validate_creds(credmap)
|
||||
# Auth for user account
|
||||
if args.credentials is None:
|
||||
die('User credentials is not provided.')
|
||||
if not os.path.exists(args.credentials):
|
||||
die('User credentials file does not exist.')
|
||||
creds = load_and_validate_creds(args.credentials)
|
||||
|
||||
access_type = 'online'
|
||||
if args.cache_token:
|
||||
@@ -494,16 +415,52 @@ def upload_batch(args, device_metadata, bccs):
|
||||
device_metadata: The build for which we're uploading CSRs
|
||||
bccs: a list of BCCs to be uploaded for the given build
|
||||
"""
|
||||
print("Uploading {} bcc(s) for build '{}'".format(len(bccs), device_metadata))
|
||||
print('Uploading {} bcc(s)'.format(len(bccs)))
|
||||
if args.verbose:
|
||||
print("Build: '{}'".format(device_metadata))
|
||||
body = format_request_body(args, device_metadata, bccs)
|
||||
print(body)
|
||||
print(args.endpoint + UPLOAD_PATH)
|
||||
request = urllib.request.Request(args.endpoint + UPLOAD_PATH)
|
||||
return batch_action_single_attempt(args, UPLOAD_PATH, body)
|
||||
|
||||
|
||||
def check_batch(args, device_metadata, bccs):
|
||||
"""Batch check all the CSRs.
|
||||
|
||||
Args:
|
||||
args: The parsed command-line arguments
|
||||
device_metadata: The build for which we're checking CSRs
|
||||
bccs: a list of BCCs to be checked for the given build
|
||||
"""
|
||||
print('Checking {} bcc(s)'.format(len(bccs)))
|
||||
if args.verbose:
|
||||
print("Build: '{}'".format(device_metadata))
|
||||
body = format_check_request_body(args, bccs)
|
||||
return batch_action_single_attempt(args, BATCH_CHECK_PATH, body)
|
||||
|
||||
|
||||
def batch_action_single_attempt(args, path, body):
|
||||
"""Batch action (upload or check existence) for all the CSRs in chunks.
|
||||
|
||||
Args:
|
||||
args: The parsed command-line arguments
|
||||
csrs: a list of CSRs to be uploaded/checked for the given build
|
||||
path: The endpoint url for the specific action
|
||||
body: The formatted request body
|
||||
"""
|
||||
if args.verbose:
|
||||
print('Request body:')
|
||||
print(body)
|
||||
print('Request target:')
|
||||
print(args.endpoint + path)
|
||||
request = urllib.request.Request(args.endpoint + path)
|
||||
request.add_header('Content-Type', 'application/json')
|
||||
request.add_header('X-GFE-SSL', 'yes')
|
||||
request.add_header(
|
||||
'Authorization', 'Bearer ' + authenticate_and_fetch_token(args)
|
||||
)
|
||||
if args.dryrun:
|
||||
print('dry run: would have reached to ' + request.full_url)
|
||||
return HTTPStatus.OK
|
||||
|
||||
try:
|
||||
response = urllib.request.urlopen(request, body)
|
||||
except urllib.error.HTTPError as e:
|
||||
@@ -512,154 +469,38 @@ def upload_batch(args, device_metadata, bccs):
|
||||
eprint(line.decode('utf-8').rstrip())
|
||||
sys.exit(1)
|
||||
|
||||
while chunk := response.read(1024):
|
||||
print(chunk.decode('utf-8'))
|
||||
|
||||
|
||||
def get_device_info_adb(s):
|
||||
"""Get bcc & device metadata via adb.
|
||||
|
||||
Args:
|
||||
s: device serial number
|
||||
|
||||
Returns:
|
||||
A python dict that contains bcc and device metadata fields
|
||||
"""
|
||||
obj = dict()
|
||||
|
||||
sys_prop_names = dict(
|
||||
company='ro.product.manufacturer',
|
||||
model='ro.product.model',
|
||||
name='ro.product.device',
|
||||
product='ro.product.name',
|
||||
architecture='ro.product.cpu.abi',
|
||||
)
|
||||
for attr, prop in sys_prop_names.items():
|
||||
obj[attr] = subprocess.check_output(
|
||||
f'adb -s {s} shell getprop {prop}'.split(), text=True
|
||||
).strip()
|
||||
|
||||
wv_prop_names = dict(
|
||||
boot_certificate_chain='bcc',
|
||||
oemcrypto_build_info='build_info',
|
||||
)
|
||||
dump = subprocess.check_output(
|
||||
f'adb -s {s} shell dumpsys android.hardware.drm.IDrmFactory/widevine -p'
|
||||
.split(),
|
||||
text=True,
|
||||
).splitlines()
|
||||
for line in dump[1:]:
|
||||
kv = line.strip().split(': ', 1)
|
||||
if len(kv) < 2:
|
||||
continue
|
||||
k, v = kv
|
||||
if k in wv_prop_names:
|
||||
obj[wv_prop_names[k]] = v.strip('"\'')
|
||||
|
||||
obj['bcc'] = base64.b64encode(bytes.fromhex(obj['bcc'])).decode()
|
||||
return obj
|
||||
|
||||
|
||||
def get_device_info_bugreport(br):
|
||||
"""Get bcc & device metadata from bugreport.
|
||||
|
||||
Args:
|
||||
br: bugreport file
|
||||
|
||||
Returns:
|
||||
A python dict that contains bcc and device metadata fields
|
||||
"""
|
||||
z=None
|
||||
section = ''
|
||||
sections = set()
|
||||
metadata = dict()
|
||||
if zipfile.is_zipfile(br):
|
||||
z=zipfile.ZipFile(br)
|
||||
with z.open('main_entry.txt') as m:
|
||||
fname = m.read().decode()
|
||||
|
||||
if z:
|
||||
f=z.open(fname)
|
||||
response_body = response.read().decode('utf-8')
|
||||
if args.verbose:
|
||||
print('Response body:')
|
||||
print(response_body)
|
||||
res = json.loads(response_body)
|
||||
if 'failedDeviceInfo' in res:
|
||||
eprint('Failed to upload/check some device info! Response body:')
|
||||
eprint(response_body)
|
||||
eprint('Failed: {} bcc(s)'.format(len(res['failedDeviceInfo'])))
|
||||
if args.die_on_error:
|
||||
sys.exit(1)
|
||||
elif 'successDeviceInfo' in res:
|
||||
print('Success: {} bcc(s)'.format(len(res['successDeviceInfo'])))
|
||||
else:
|
||||
f=open(br,'rb')
|
||||
|
||||
for line in f:
|
||||
if line.startswith(
|
||||
b'DUMP OF SERVICE android.hardware.drm.IDrmFactory/widevine:'
|
||||
):
|
||||
section = 'widevine'
|
||||
elif line.find(b'- SYSTEM PROPERTIES (getprop) -') >= 0:
|
||||
section = 'getprop'
|
||||
elif line.startswith(b'-----') and section:
|
||||
sections.add(section)
|
||||
section = ''
|
||||
|
||||
if len(sections) == 2:
|
||||
break
|
||||
|
||||
pattern = ''
|
||||
if section == 'getprop':
|
||||
pattern = b'^\\[(.+)\\]: \\[(.+)\\]$'
|
||||
elif section == 'widevine':
|
||||
pattern = b'\\s+([a-z_]+): (.+)$'
|
||||
|
||||
if pattern:
|
||||
match = re.match(pattern, line)
|
||||
if match:
|
||||
k, v = match.groups()
|
||||
metadata[k.decode()] = v.decode().strip('"\'')
|
||||
|
||||
prop_names = dict(
|
||||
company='ro.product.manufacturer',
|
||||
model='ro.product.model',
|
||||
name='ro.product.device',
|
||||
product='ro.product.name',
|
||||
architecture='ro.product.cpu.abi',
|
||||
bcc='boot_certificate_chain',
|
||||
build_info='oemcrypto_build_info',
|
||||
)
|
||||
|
||||
obj = dict()
|
||||
for k in prop_names:
|
||||
obj[k] = metadata[prop_names[k]]
|
||||
obj['bcc'] = base64.b64encode(bytes.fromhex(obj['bcc'])).decode()
|
||||
return obj
|
||||
|
||||
eprint('Failed with unexpected response:')
|
||||
eprint(response_body)
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
if args.dryrun:
|
||||
print('Dry run mode enabled. Service APIs will not be called.')
|
||||
|
||||
batches = {}
|
||||
for filename in args.json_csr:
|
||||
parse_json_csrs(filename, batches)
|
||||
|
||||
for br in args.bugreport:
|
||||
convert_bcc_for_upload(get_device_info_bugreport(br), batches)
|
||||
|
||||
if args.bcc_metadata:
|
||||
bcc_metadata = json.load(args.bcc_metadata)
|
||||
for bcc in args.bcc:
|
||||
if not args.bcc_metadata:
|
||||
die('Missing --bcc-metadata')
|
||||
tmp = bcc_metadata.copy()
|
||||
tmp['bcc'] = base64.b64encode(bcc.read()).decode()
|
||||
convert_bcc_for_upload(tmp, batches)
|
||||
|
||||
if not batches:
|
||||
lines = subprocess.check_output('adb devices'.split(), text=True).split(
|
||||
'\n'
|
||||
)
|
||||
for line in lines:
|
||||
if line.endswith('device'):
|
||||
serial = line.split()[0]
|
||||
convert_bcc_for_upload(get_device_info_adb(serial), batches)
|
||||
if len(batches) > 1:
|
||||
print('WARNING: {} different device metadata'.format(len(batches)))
|
||||
|
||||
for device_metadata, bccs in batches.items():
|
||||
if args.dry_run:
|
||||
for bcc in bccs:
|
||||
out = json.loads(device_metadata)
|
||||
out['bcc'] = bcc['boot_certificate_chain']
|
||||
print(json.dumps(out))
|
||||
if args.check:
|
||||
check_batch(args, device_metadata, bccs)
|
||||
else:
|
||||
upload_batch(args, device_metadata, bccs)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user