"""Download Oculus store metadata and build asset packs for QuestAppLauncher.

The script pages through a store section via the GraphQL endpoint in ``url``,
resolves a package name for every app id (cached in ``packages.db``), and then
either downloads cover images and builds an app-name JSON plus an icon zip
(``assets`` action) or writes a JSON file of free apps (``free`` action).
"""

import argparse
import json
import os
import queue
import sqlite3
import threading
import zipfile
from enum import Enum
from pathlib import Path

import requests

# Access token sent with every GraphQL request.
ACCESS_TOKEN = "OC|1317831034909742|"

# Seconds to wait on any HTTP request; without a timeout a stalled
# connection would block the script forever.
REQUEST_TIMEOUT = 30


def main(platform="quest", locale="en_US", action="assets", output_dir_param="."):
    """Run the full pipeline for one platform/locale.

    Args:
        platform: Name of an ``HMDTypes`` member (case-insensitive).
        locale: Locale code passed to the API as ``forced_locale``.
        action: ``"assets"`` to download images and build the name JSON and
            icon zip; ``"free"`` to build the free-apps JSON.
        output_dir_param: Directory that receives all generated files.

    Raises:
        KeyError: If ``platform`` is not a member of ``HMDTypes``.
    """
    global forced_locale
    global hmd_type
    global target_path
    global output_dir

    output_dir = Path(output_dir_param)
    hmd_type = HMDTypes[platform.upper()]
    forced_locale = locale
    target_path = output_dir / hmd_type.value["name"]
    target_path.mkdir(parents=True, exist_ok=True)

    try:
        print("Retrieving Appinfo")
        retrieve_app_info()

        print("Resolving package names")
        retrieve_all_package_names()

        if action == "assets":
            print("Downloading images")
            download_images()

            json_filename = (
                "appnames_" + hmd_type.value["name"] + "_" + forced_locale + ".json"
            )
            print("Creating " + json_filename)
            create_app_names_json(json_filename)

            zip_filename = "iconpack_" + hmd_type.value["name"] + ".zip"
            print("Creating " + zip_filename)
            create_iconpack(zip_filename)
        elif action == "free":
            json_filename = "free_apps_" + hmd_type.value["name"] + ".json"
            print("Creating " + json_filename)
            create_free_apps_json(json_filename)
    finally:
        # Release the package cache even when a step above fails.
        connection.close()


class HMDTypes(Enum):
    """Supported headsets and the request parameters used for each one."""

    QUEST = {"sectionID": "1888816384764129", "hmdType": "MONTEREY", "name": "quest"}
    RIFT = {"sectionID": "1736210353282450", "hmdType": "RIFT", "name": "rift"}
    # NOTE(review): GO and GEARVR share the same sectionID - confirm intended.
    GO = {"sectionID": "174868819587665", "hmdType": "PACIFIC", "name": "go"}
    GEARVR = {"sectionID": "174868819587665", "hmdType": "GEARVR", "name": "gearvr"}


# Run configuration; populated by main() before any other function is called.
hmd_type = None
forced_locale = None
target_path = None
output_dir = None

url = "https://graph.oculus.com/graphql"

# app_id -> {"name", "uri", "price", "package_name"}
app_list = dict()

# Persistent app_id -> package_name cache so lookups are not repeated per run.
connection = sqlite3.connect("packages.db")
cursor = connection.cursor()
# "PRIMARY KEY" must be two words: SQLite folds an unknown token such as
# "PRIMARY_KEY" into the column's type name and creates no constraint.
# Databases created earlier keep their old schema (IF NOT EXISTS).
cursor.execute(
    "CREATE TABLE IF NOT EXISTS packages (app_id TEXT PRIMARY KEY, package_name TEXT)"
)

# Pending image downloads, consumed by worker().
uri_queue = queue.Queue()


def retrieve_app_info():
    """Page through the store section of ``hmd_type`` and fill ``app_list``.

    Stops when the API reports no further page, returns no cursor, or
    answers with a payload that lacks the expected data.
    """
    section_id = hmd_type.value["sectionID"]
    hmd_name = hmd_type.value["hmdType"]
    section_cursor = None
    has_next_page = True

    while has_next_page:
        variables = {
            "sectionId": section_id,
            "sortOrder": None,
            "sectionItemCount": 100,
            "hmdType": hmd_name,
        }
        if section_cursor is not None:
            variables["sectionCursor"] = section_cursor

        parameters = {
            "forced_locale": forced_locale,
            "access_token": ACCESS_TOKEN,
            "variables": json.dumps(variables),
            "doc_id": "1934814353250664",
        }
        response = requests.get(url, params=parameters, timeout=REQUEST_TIMEOUT)
        json_response = response.json()

        if "data" not in json_response:
            print("API ERROR:", json_response)
            break

        # "node" can be null/missing for an unknown section; treat it like
        # an API error instead of crashing on a subscript of None.
        section_node = json_response["data"].get("node") or {}
        all_items = section_node.get("all_items")
        if not all_items:
            print("API ERROR:", json_response)
            break

        page_info = all_items["page_info"]
        has_next_page = page_info["has_next_page"]
        section_cursor = page_info.get("end_cursor")

        for edge in all_items["edges"]:
            node = edge["node"]
            # Cover image, offer and price may each be absent or null.
            image = node.get("cover_landscape_image") or {}
            offer = node.get("current_offer") or {}
            price_info = offer.get("price") or {}

            app_list[node["id"]] = {
                "name": node["display_name"],
                "uri": image.get("uri"),
                "price": price_info.get("offset_amount"),
                "package_name": None,
            }

        print(f"\rFetched {len(app_list)} apps...", end="", flush=True)

        # Safety: if cursor didn't advance, bail to avoid infinite loop
        if not section_cursor:
            break

    print()
    print("Retrieved: " + str(len(app_list)))


def retrieve_all_package_names():
    """Resolve all package names upfront and store them in app_list."""
    resolved = 0
    skipped = 0

    # Only values are mutated, so iterating the live dict is safe.
    for app_id, app_info in app_list.items():
        package_name = retrieve_package_name(app_id)
        app_info["package_name"] = package_name
        if package_name:
            resolved += 1
        else:
            skipped += 1

    print(f"Resolved: {resolved}, Skipped: {skipped}")


def retrieve_package_name(app_id):
    """Return the package name for ``app_id``, or ``None`` if unavailable.

    The local SQLite cache is consulted first; on a miss the API is queried
    and a successful result is stored in the cache. Failed lookups are not
    cached, so they are retried on the next run.
    """
    cursor.execute("SELECT package_name FROM packages WHERE app_id=?", (app_id,))
    result = cursor.fetchone()
    if result is not None:
        return result[0]

    print(f"Fetching package_name for app_id {app_id}")
    parameters = {
        "access_token": ACCESS_TOKEN,
        "variables": json.dumps({"id": str(app_id)}),
        "doc_id": "3326435794041206",
    }
    request = requests.get(url, params=parameters, timeout=REQUEST_TIMEOUT)
    response = request.json()

    if "data" not in response:
        print("API ERROR (package lookup):", response)
        return None

    node = response["data"].get("node") or {}
    binary = node.get("latestBinary") or {}
    package_name = binary.get("package_name")

    if not package_name:
        print(f"Skipping app_id {app_id} (no package_name)")
        return None

    # OR REPLACE keeps the insert from raising if the id is already present.
    cursor.execute(
        "INSERT OR REPLACE INTO packages (app_id, package_name) VALUES (?,?)",
        (app_id, package_name),
    )
    connection.commit()
    return package_name


def download_file(file_url, filename):
    """Download ``file_url`` to ``filename`` unless that file already exists.

    The payload is written to a sibling ``.part`` file and renamed into place
    only after a complete, successful transfer, so an interrupted or failed
    download never leaves a file that later runs would treat as finished.

    Raises:
        requests.RequestException: On connection errors or HTTP error status.
        OSError: If the file cannot be written.
    """
    destination = Path(filename)
    destination.parent.mkdir(parents=True, exist_ok=True)
    if destination.is_file():
        return

    print("Downloading:" + str(destination) + " from " + file_url)
    partial = destination.with_name(destination.name + ".part")
    try:
        with requests.get(
            file_url, stream=True, timeout=REQUEST_TIMEOUT
        ) as response:
            # Do not store an HTML error page under an image file name.
            response.raise_for_status()
            with open(partial, "wb") as out_file:
                for chunk in response.iter_content(chunk_size=1048576):
                    out_file.write(chunk)
        os.replace(partial, destination)
    finally:
        # Only present here when the transfer did not complete.
        if partial.exists():
            partial.unlink()


def download_images():
    """Download the cover image of every resolved app using worker threads."""
    queued = set()
    for app_info in app_list.values():
        package_name = app_info["package_name"]
        image_uri = app_info["uri"]
        # Skip unresolved apps, apps without an image, and repeated package
        # names (two threads must not write the same target file).
        if not package_name or not image_uri or package_name in queued:
            continue
        queued.add(package_name)
        uri_queue.put(
            {
                "package_name": target_path / (package_name + ".jpg"),
                "uri": image_uri,
            }
        )

    num_worker_threads = 8
    threads = [threading.Thread(target=worker) for _ in range(num_worker_threads)]
    for thread in threads:
        thread.start()

    uri_queue.join()
    for thread in threads:
        thread.join()


def worker():
    """Drain ``uri_queue``, downloading each queued image until it is empty."""
    while True:
        try:
            queue_item = uri_queue.get(block=False)
        except queue.Empty:
            break

        try:
            download_file(queue_item["uri"], queue_item["package_name"])
        except (requests.RequestException, OSError) as error:
            print(f"Download failed for {queue_item['uri']}: {error}")
        finally:
            # Always acknowledge the item; otherwise uri_queue.join() in
            # download_images() would block forever after a failure.
            uri_queue.task_done()


def create_app_names_json(json_filename):
    """Write a ``package_name -> {"name": ...}`` JSON file to ``output_dir``."""
    app_names = {
        app_info["package_name"]: {"name": app_info["name"]}
        for app_info in app_list.values()
        if app_info["package_name"]
    }

    with open(Path(output_dir) / json_filename, "w", encoding="utf-8") as file:
        json.dump(app_names, file, sort_keys=True, indent=0, ensure_ascii=False)


def _is_free(price):
    """Return True when ``price`` is numerically zero.

    Accepts both numbers and numeric strings.
    NOTE(review): the API may deliver the amount as a string - confirm.
    """
    if price is None:
        return False
    try:
        return float(price) == 0
    except (TypeError, ValueError):
        return False


def create_free_apps_json(json_filename):
    """Merge all currently free apps into the JSON file in ``output_dir``.

    Entries already in the file are kept; apps that are free now are added
    or overwritten, keyed by app id.
    """
    free_apps = dict()
    json_path = Path(output_dir) / json_filename

    if json_path.is_file():
        # Read with the same encoding the file is written with below.
        with open(json_path, encoding="utf-8") as file:
            free_apps = json.load(file)

    for app_id, app_info in app_list.items():
        package_name = app_info["package_name"]
        if not package_name or not _is_free(app_info["price"]):
            continue
        free_apps[app_id] = {
            "name": app_info["name"],
            "package": package_name,
        }

    with open(json_path, "w", encoding="utf-8") as file:
        json.dump(free_apps, file, sort_keys=True, indent=0, ensure_ascii=False)


def create_iconpack(zip_filename):
    """Zip every downloaded icon from ``target_path`` into ``output_dir``.

    Icons that are missing on disk (e.g. because their download failed) are
    reported and skipped, and each archive name is written at most once.
    """
    written = set()
    with zipfile.ZipFile(Path(output_dir) / zip_filename, "w") as iconpack:
        for app_info in app_list.values():
            package_name = app_info["package_name"]
            if not package_name:
                continue

            archive_name = package_name + ".jpg"
            if archive_name in written:
                continue

            icon_path = target_path / archive_name
            if not icon_path.is_file():
                print(f"Skipping missing icon: {icon_path}")
                continue

            iconpack.write(icon_path, archive_name)
            written.add(archive_name)


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Download assets for QuestAppLauncher")
    parser.add_argument(
        "-p", "--platform", default="quest", choices=["quest", "rift", "go", "gearvr"]
    )
    parser.add_argument(
        "-l",
        "--locale",
        default="en_US",
        choices=[
            "en_US",
            "en_GB",
            "de_DE",
            "cs_CZ",
            "da_DK",
            "el_GR",
            "es_ES",
            "es_LA",
            "fi_FI",
            "fr_FR",
            "it_IT",
            "ja_JP",
            "ko_KR",
            "nb_NO",
            "nl_NL",
            "pl_PL",
            "pt_BR",
            "pt_PT",
            "ro_RO",
            "ru_RU",
            "sv_SE",
            "tr_TR",
            "zh_CN",
            "zh_HK",
            "zh_TW",
        ],
    )
    parser.add_argument(
        "-a",
        "--action",
        default="assets",
        choices=["assets", "free"],
    )
    parser.add_argument(
        "-o",
        "--output-dir",
        default=".",
        help="directory that receives the generated files",
    )

    args = parser.parse_args()
    main(args.platform, args.locale, args.action, args.output_dir)