Files
QuestAppLauncher_Assets/QuestAssetDL.py
oxmc7769 3d86f19c0f
Some checks failed
build-pack / build (push) Has been cancelled
Modernize
2026-03-18 09:16:56 -07:00

311 lines
9.0 KiB
Python

import json, os, queue, threading, requests, sqlite3, zipfile, argparse
from enum import Enum
from pathlib import Path
def main(platform="quest", locale="en_US", action="assets", output_dir_param="."):
    """Fetch store metadata for one platform and build the requested output.

    Args:
        platform: Case-insensitive name of an ``HMDTypes`` member
            (for example ``"quest"``).
        locale: Locale code sent to the store API as ``forced_locale``.
        action: ``"assets"`` downloads cover images and writes the app-name
            JSON plus the icon-pack zip; ``"free"`` writes the free-apps JSON.
            Any other value only refreshes the in-memory app list.
        output_dir_param: Directory that receives the generated files.

    Raises:
        KeyError: If ``platform`` does not name an ``HMDTypes`` member.
    """
    global forced_locale
    global hmd_type
    global target_path
    global output_dir
    output_dir = Path(output_dir_param)
    hmd_type = HMDTypes[platform.upper()]
    forced_locale = locale
    platform_name = hmd_type.value["name"]
    target_path = Path(output_dir) / platform_name
    target_path.mkdir(parents=True, exist_ok=True)
    try:
        print("Retrieving Appinfo")
        retrieve_app_info()
        print("Resolving package names")
        retrieve_all_package_names()
        if action == "assets":
            print("Downloading images")
            download_images()
            json_filename = f"appnames_{platform_name}_{forced_locale}.json"
            print("Creating " + json_filename)
            create_app_names_json(json_filename)
            zip_filename = f"iconpack_{platform_name}.zip"
            print("Creating " + zip_filename)
            create_iconpack(zip_filename)
        elif action == "free":
            json_filename = f"free_apps_{platform_name}.json"
            print("Creating " + json_filename)
            create_free_apps_json(json_filename)
    finally:
        # Close the package cache even when a step above fails, so the
        # SQLite handle is not left open on an error exit.
        connection.close()
class HMDTypes(Enum):
    """Per-platform lookup parameters for the store API.

    Every member's value is a dict with three string entries:
    ``sectionID`` and ``hmdType`` are sent as query variables when listing
    apps, and ``name`` is the lowercase platform name used in output paths
    and file names.
    """

    QUEST = dict(sectionID="1888816384764129", hmdType="MONTEREY", name="quest")
    RIFT = dict(sectionID="1736210353282450", hmdType="RIFT", name="rift")
    # GO and GEARVR use the same section id and differ only in hmdType/name.
    GO = dict(sectionID="174868819587665", hmdType="PACIFIC", name="go")
    GEARVR = dict(sectionID="174868819587665", hmdType="GEARVR", name="gearvr")
# Run configuration. All four are assigned by main() before any helper runs.
hmd_type = None
forced_locale = None
target_path = None
output_dir = None

# GraphQL endpoint used for both the app listing and the package-name lookup.
url = "https://graph.oculus.com/graphql"

# app_id -> {"name", "uri", "price", "package_name"} for every listed app.
app_list = dict()

# Schema of the on-disk package-name cache. The constraint must be spelled
# "PRIMARY KEY": with an underscore SQLite reads it as part of the column's
# type name and creates no key at all. Because of IF NOT EXISTS, a
# packages.db created earlier keeps whatever schema it already has.
PACKAGES_SCHEMA = (
    "CREATE TABLE IF NOT EXISTS packages "
    "(app_id TEXT PRIMARY KEY, package_name TEXT)"
)

# Opened at import time in the current working directory.
connection = sqlite3.connect("packages.db")
cursor = connection.cursor()
cursor.execute(PACKAGES_SCHEMA)

# Pending image downloads, filled by download_images() and drained by worker().
uri_queue = queue.Queue()
def retrieve_app_info():
    """Page through the store listing and fill ``app_list``.

    Uses the module-level ``hmd_type`` and ``forced_locale`` set by
    ``main()``. Each listed app is stored under its app id with its display
    name, cover image URI, price (the offer's ``offset_amount``, or ``None``
    when there is no offer) and a ``package_name`` placeholder of ``None``.

    Paging stops when the API reports no further page, when a response has
    no ``data`` member, or when the cursor is missing or unchanged.

    Raises:
        requests.RequestException: On network failure or timeout.
    """
    # These do not change between pages, so look them up once.
    section_id = hmd_type.value["sectionID"]
    hmd_name = hmd_type.value["hmdType"]
    section_cursor = None
    has_next_page = True
    while has_next_page:
        variables = {
            "sectionId": section_id,
            "sortOrder": None,
            "sectionItemCount": 100,
            "hmdType": hmd_name,
        }
        if section_cursor is not None:
            variables["sectionCursor"] = section_cursor
        parameters = {
            "forced_locale": forced_locale,
            "access_token": "OC|1317831034909742|",
            "variables": json.dumps(variables),
            "doc_id": "1934814353250664",
        }
        # Without a timeout a stalled connection would block the run forever.
        response = requests.get(url, params=parameters, timeout=30)
        json_response = response.json()
        if "data" not in json_response:
            print("API ERROR:", json_response)
            break
        all_items = json_response["data"]["node"]["all_items"]
        edges = all_items["edges"]
        page_info = all_items["page_info"]
        has_next_page = page_info["has_next_page"]
        previous_cursor = section_cursor
        section_cursor = page_info.get("end_cursor")
        for edge in edges:
            node = edge["node"]
            offer = node.get("current_offer") or {}
            price_info = offer.get("price") or {}
            app_list[node["id"]] = {
                "name": node["display_name"],
                "uri": node["cover_landscape_image"]["uri"],
                "price": price_info.get("offset_amount"),
                "package_name": None,
            }
        print(f"\rFetched {len(app_list)} apps...", end="", flush=True)
        # Safety: a missing or repeated cursor would request the same page
        # again, so bail out instead of looping forever.
        if not section_cursor or section_cursor == previous_cursor:
            break
    print()
    print("Retrieved: " + str(len(app_list)))
def retrieve_all_package_names():
    """Look up the package name of every app in ``app_list`` and record it.

    Each entry's ``"package_name"`` is set to what ``retrieve_package_name``
    returns. A summary of how many lookups gave a usable name and how many
    did not is printed at the end.
    """
    # Snapshot the ids so the loop does not iterate the live dict.
    app_ids = tuple(app_list)
    for app_id in app_ids:
        app_list[app_id]["package_name"] = retrieve_package_name(app_id)
    resolved = sum(1 for app_id in app_ids if app_list[app_id]["package_name"])
    skipped = len(app_ids) - resolved
    print(f"Resolved: {resolved}, Skipped: {skipped}")
def retrieve_package_name(app_id):
    """Return the package name for ``app_id``, using the SQLite cache first.

    A row in the ``packages`` table is returned as-is. Otherwise the store
    API is queried and a name that is found is inserted into the cache and
    committed before being returned.

    Args:
        app_id: Store id of the app.

    Returns:
        The package name, or ``None`` when the API response has no ``data``
        member or carries no package name. Failed lookups are not cached,
        so they are retried on the next run.

    Raises:
        requests.RequestException: On network failure or timeout.
    """
    cursor.execute("SELECT package_name FROM packages WHERE app_id=?", (app_id,))
    result = cursor.fetchone()
    if result is not None:
        return result[0]
    print("Fetching package_name for app_id " + app_id)
    parameters = {
        "access_token": "OC|1317831034909742|",
        "variables": json.dumps({"id": str(app_id)}),
        "doc_id": "3326435794041206",
    }
    # One lookup is made per uncached app; a timeout keeps a single stalled
    # request from hanging the whole run.
    request = requests.get(url, params=parameters, timeout=30)
    response = request.json()
    if "data" not in response:
        print("API ERROR (package lookup):", response)
        return None
    # Any level of the response may be null, hence the "or {}" fallbacks.
    node = response["data"].get("node") or {}
    binary = node.get("latestBinary") or {}
    package_name = binary.get("package_name")
    if not package_name:
        print(f"Skipping app_id {app_id} (no package_name)")
        return None
    cursor.execute(
        "INSERT INTO packages (app_id, package_name) VALUES (?,?)",
        (app_id, package_name),
    )
    connection.commit()
    return package_name
def download_file(file_url, filename):
    """Download ``file_url`` to ``filename`` unless that file already exists.

    The body is streamed into a temporary sibling file and renamed into
    place only after the transfer completes, so an interrupted download
    never leaves a truncated file that a later run would treat as complete.
    A non-success HTTP status is reported and nothing is written.

    Args:
        file_url: URL to fetch.
        filename: Destination path (``str`` or ``Path``). Missing parent
            directories are created.

    Raises:
        requests.RequestException: On network failure or timeout.
        OSError: If the destination cannot be written.
    """
    destination = Path(filename)
    destination.parent.mkdir(parents=True, exist_ok=True)
    if destination.is_file():
        return
    # Per-process, per-thread temp name so concurrent workers writing the
    # same destination cannot clobber each other's partial file.
    partial = destination.with_name(
        f"{destination.name}.{os.getpid()}.{threading.get_ident()}.part"
    )
    try:
        with requests.get(file_url, stream=True, timeout=30) as response:
            if not response.ok:
                # Do not store an HTTP error page under an image file name.
                print(f"Download failed ({response.status_code}): {file_url}")
                return
            print("Downloading:" + str(filename) + " from " + file_url)
            with open(partial, "wb") as out_file:
                for chunk in response.iter_content(chunk_size=1048576):
                    out_file.write(chunk)
        os.replace(partial, destination)
    finally:
        # After a successful rename the partial file no longer exists; this
        # only removes the leftovers of a failed transfer.
        if partial.exists():
            partial.unlink()
def download_images():
    """Download the cover image of every app that has a package name.

    One job per app is queued on ``uri_queue`` with the image URI and the
    destination ``<target_path>/<package_name>.jpg``. Eight ``worker``
    threads then drain the queue, and the call returns once the queue is
    fully processed and every thread has finished.
    """
    for info in app_list.values():
        pkg = info["package_name"]
        if pkg:
            job = {
                "package_name": target_path / (pkg + ".jpg"),
                "uri": info["uri"],
            }
            uri_queue.put(job)

    pool = [threading.Thread(target=worker) for _ in range(8)]
    for thread in pool:
        thread.start()

    uri_queue.join()
    for thread in pool:
        thread.join()
def worker():
    """Thread body: download queued images until ``uri_queue`` is empty.

    Each job is a dict with ``"uri"`` and ``"package_name"`` (the
    destination path). A failed download is reported and skipped so the
    remaining jobs are still processed.
    """
    while True:
        try:
            queue_item = uri_queue.get(block=False)
        except queue.Empty:
            break
        try:
            download_file(queue_item["uri"], queue_item["package_name"])
        except Exception as error:
            # Thread boundary: report and carry on so that one bad download
            # does not kill this worker.
            print(f"Download failed for {queue_item['uri']}: {error}")
        finally:
            # Must run for every get(), even on failure; otherwise
            # uri_queue.join() in the caller would block forever.
            uri_queue.task_done()
def create_app_names_json(json_filename):
    """Write a package-name to display-name mapping as JSON.

    Apps without a package name are left out. The file is written to
    ``output_dir`` as UTF-8 with sorted keys and non-ASCII characters kept
    as-is.

    Args:
        json_filename: File name, relative to ``output_dir``.
    """
    names_by_package = {
        info["package_name"]: {"name": info["name"]}
        for info in app_list.values()
        if info["package_name"]
    }
    destination = Path(output_dir) / json_filename
    with destination.open("w", encoding="utf-8") as handle:
        json.dump(
            names_by_package,
            handle,
            sort_keys=True,
            indent=0,
            ensure_ascii=False,
        )
def create_free_apps_json(json_filename):
    """Merge the currently free apps into a JSON file in ``output_dir``.

    Entries already in the file are kept, so the file accumulates every app
    that was free on any run. Apps whose price is ``0`` and that have a
    package name are added or overwritten, keyed by app id.

    Args:
        json_filename: File name, relative to ``output_dir``.
    """
    free_apps = dict()
    json_path = Path(output_dir) / json_filename
    if json_path.is_file():
        # Read with the same encoding the file is written in; the platform
        # default may not be UTF-8, and names are stored unescaped.
        with open(json_path, encoding="utf-8") as file:
            free_apps = json.load(file)
    for app_id, app_info in app_list.items():
        # NOTE(review): this matches a numeric 0 only. If the API delivers
        # the amount as a string, nothing is ever treated as free - confirm.
        if app_info["price"] != 0:
            continue
        package_name = app_info["package_name"]
        if not package_name:
            continue
        free_apps[app_id] = {
            "name": app_info["name"],
            "package": package_name,
        }
    with open(json_path, "w", encoding="utf-8") as file:
        json.dump(free_apps, file, sort_keys=True, indent=0, ensure_ascii=False)
def create_iconpack(zip_filename):
    """Bundle the downloaded cover images into a zip archive in ``output_dir``.

    Every app with a package name contributes
    ``<target_path>/<package_name>.jpg``, stored as ``<package_name>.jpg``.
    An image missing from disk is reported and skipped rather than aborting
    the archive, and a package name shared by several apps is added once.

    Args:
        zip_filename: Archive file name, relative to ``output_dir``.
    """
    added = set()
    with zipfile.ZipFile(Path(output_dir) / zip_filename, "w") as iconpack:
        for app_info in app_list.values():
            package_name = app_info["package_name"]
            if not package_name or package_name in added:
                continue
            image_name = package_name + ".jpg"
            image_path = target_path / image_name
            if not image_path.is_file():
                # The download may have failed; keep building the pack.
                print("Skipping missing image: " + str(image_path))
                continue
            iconpack.write(image_path, image_name)
            added.add(package_name)
# Command-line entry point: parse the options and pass them on to main().
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Download assets for QuestAppLauncher")
    parser.add_argument(
        "-p",
        "--platform",
        default="quest",
        choices=["quest", "rift", "go", "gearvr"],
        help="headset platform whose store section is queried",
    )
    parser.add_argument(
        "-l",
        "--locale",
        default="en_US",
        choices=[
            "en_US",
            "en_GB",
            "de_DE",
            "cs_CZ",
            "da_DK",
            "el_GR",
            "es_ES",
            "es_LA",
            "fi_FI",
            "fr_FR",
            "it_IT",
            "ja_JP",
            "ko_KR",
            "nb_NO",
            "nl_NL",
            "pl_PL",
            "pt_BR",
            "pt_PT",
            "ro_RO",
            "ru_RU",
            "sv_SE",
            "tr_TR",
            "zh_CN",
            "zh_HK",
            "zh_TW",
        ],
        help="locale passed to the store API",
    )
    parser.add_argument(
        "-a",
        "--action",
        default="assets",
        choices=["assets", "free"],
        help="'assets' builds the names JSON and icon pack, 'free' the free-apps JSON",
    )
    # main() already accepts an output directory; expose it on the command
    # line with the same default so existing invocations behave as before.
    parser.add_argument(
        "-o",
        "--output-dir",
        default=".",
        help="directory that receives the generated files",
    )
    args = parser.parse_args()
    main(args.platform, args.locale, args.action, args.output_dir)