"""Helpers for querying the Modrinth v1 REST API about a project's versions.

The module keeps one shared HTTP session and an in-memory cache of API
responses so repeated questions about the same project do not hit the
network again.
"""
from os import error
from types import WrapperDescriptorType
import datetime
import re
import socket
import time
from urllib.parse import unquote

import iso8601
import pytz
from pytz.tzinfo import memorized_timedelta

# NOTE: ``requests`` is the project's wrapper module, not the upstream
# package; the ``family=`` keyword passed to ``session.get`` below is
# presumably handled by that wrapper (confirm in request_wrapper).
from request_wrapper import requests_wrapper as requests

debug = True
if debug:
    import logging
    from requests import api
    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)
    requests_log = logging.getLogger("requests.packages.urllib3")
    requests_log.setLevel(logging.DEBUG)
    requests_log.propagate = True

# One shared session lets every call re-use the same connection instead of
# opening a new one per request, which is faster and kinder to the API.
session = requests.Session()
base_api_url = "https://api.modrinth.com:443/api/v1"

# How many times a rate-limited (HTTP 429) request is retried before giving up.
_MAX_RATE_LIMIT_RETRIES = 3


def failCheck(response, functOrigin):
    """Inspect *response* and tell the caller whether it must retry.

    On HTTP 429 the function sleeps for the number of seconds announced in
    the ``X-Ratelimit-Reset`` header (plus one) and returns ``True`` so the
    caller can re-issue the request.  On HTTP 200 it returns ``None``.

    Raises:
        OSError: (``os.error``) for any other status code.
    """
    print("Status Code is: "+str(response.status_code))
    if response.status_code == 429:
        sleep_time = int(response.headers["X-Ratelimit-Reset"])+1
        print("Too many requests!"+'\n'+"Waiting for "+str(sleep_time)+" seconds...")
        print(response.headers)
        time.sleep(sleep_time)  # Wait until the API rate limit is over
        print("Retrying "+functOrigin+"...")
        return True
    if response.status_code != 200:
        raise error(
            functOrigin+" request failed with HTTP status "
            + str(response.status_code)
        )
    return None


def _get_json(path, origin):
    """GET ``base_api_url + path`` and return the decoded JSON body.

    Every response, including the ones obtained after a retry, is passed
    through ``failCheck`` so an error body is never parsed or cached.
    """
    url = base_api_url+path
    for _ in range(_MAX_RATE_LIMIT_RETRIES + 1):
        response = session.get(url, family=socket.AF_INET)
        if not failCheck(response, origin):
            return response.json()
    raise error(
        origin+" is still rate limited after "
        + str(_MAX_RATE_LIMIT_RETRIES)+" retries"
    )


# Data Caching
dataCache = {}


def _cache_key(function_name, project=None):
    """Return the ``dataCache`` key for *function_name* (and *project*)."""
    if project is None:
        return function_name
    return (function_name, project)


def cacheData(function_name, cached_data, project=None):
    """Store *cached_data* in ``dataCache``.

    When *project* is given the entry is keyed per project, so data fetched
    for one project is never returned for another.  Without *project* the
    entry is keyed by *function_name* alone, as before.
    """
    print("Caching data!")
    dataCache[_cache_key(function_name, project)] = cached_data
    print("Stored "+function_name+"'s data to cache")


def modInfo(project):
    """Return the JSON document of ``/mod/<project>``, cached per project."""
    print("Calling modInfo()...")
    key = _cache_key("modInfo", project)
    if key in dataCache:
        print("Returning cached data!")
        return dataCache[key]
    api_response = _get_json("/mod/"+project, "modInfo")
    cacheData("modInfo", api_response, project)
    return api_response


def getVersions(project):
    """Return the ``"versions"`` entry of the project's info document."""
    print("Calling getVersions()...")
    key = _cache_key("getVersions", project)
    if key in dataCache:
        print("Returning cached data!")
        return dataCache[key]
    versions = modInfo(project)["versions"]
    cacheData("getVersions", versions, project)
    return versions


def getAllModVersionInfo(project):
    """Return one ``/version/<id>`` document per id in ``getVersions``.

    The result list is in the same order as ``getVersions(project)``; other
    helpers in this module rely on that positional correspondence.
    """
    print("Calling getAllModVersionInfo()...")
    key = _cache_key("getAllModVersionInfo", project)
    if key in dataCache:
        print("Returning cached data!")
        return dataCache[key]
    responseList = [
        _get_json("/version/"+version_id, "getAllModVersionInfo")
        for version_id in getVersions(project)
    ]
    cacheData("getAllModVersionInfo", responseList, project)
    return responseList


def determine(project, whatToDetermine):
    """Return field *whatToDetermine* of every version document, in order.

    Raises:
        KeyError: if a version document lacks the requested field.
    """
    print("Calling determine()...")
    return [info[whatToDetermine] for info in getAllModVersionInfo(project)]


def _field_by_version_id(project, field):
    """Map every version id of *project* to its value of *field*."""
    return dict(zip(getVersions(project), determine(project, field)))


def _natural_key(text):
    """Build a sort key under which ``"0.10.0"`` ranks above ``"0.9.0"``.

    The text is split into digit runs (compared as integers) and the text
    between them (compared as strings); the leading tag keeps the two kinds
    from ever being compared with each other.
    """
    key = []
    # With a capturing group, re.split alternates text / digits / text ...
    for index, chunk in enumerate(re.split(r"([0-9]+)", str(text))):
        if not chunk:
            continue
        if index % 2:
            key.append((0, int(chunk), ""))
        else:
            key.append((1, 0, chunk))
    return key


def getLatestVersion(project, **kwargs):
    """Return the id of the most recently published version.

    Keyword Args:
        targetted_versions: optional iterable of version ids to choose from;
            defaults to every version of the project.

    Raises:
        IndexError: if there is no version to choose from.
        KeyError: if a targetted id does not belong to the project.
    """
    print("Calling getLatestVersion()...")
    targetted_versions = kwargs.get('targetted_versions', None)
    # Look dates up by id: the targetted list is generally a subset, so
    # pairing it with the full date list by position would mix versions up.
    published = _field_by_version_id(project, "date_published")
    if targetted_versions is not None:
        versions = list(targetted_versions)
    else:
        versions = list(published)
    if not versions:
        raise IndexError("no versions available for "+str(project))
    # The newest publish date is the smallest distance from "now".
    return max(versions, key=lambda vid: iso8601.parse_date(published[vid]))


def key_filter(dict_to_filter, type_to_grab, **kwargs):
    """Select the entries of *dict_to_filter* whose value matches a pattern.

    ``str(type_to_grab)`` is used as a regular expression and searched for
    in ``str(value)`` of every entry, so characters such as ``.`` act as
    regex metacharacters.

    Keyword Args:
        return_both: when false (default) return the list of matching keys;
            when true return a dict mapping each matching value to its key.
    """
    print("Calling key_filter()...")
    return_both = kwargs.get('return_both', False)
    print("Looking for "+str(type_to_grab))
    pattern = re.compile(str(type_to_grab))
    output_version = []
    output_query = []
    for key, value in dict_to_filter.items():
        if pattern.search(str(value)) is not None:
            print("Match!")
            output_version.append(key)
            output_query.append(value)
    if not return_both:
        return output_version
    # Tie the two lists back together (value -> key), one pair per match.
    return dict(zip(output_query, output_version))


def getLatestVersion_by_number(project, version, **kwargs):
    """Return the id of the highest build whose version number matches.

    A version matches when ``key_filter`` finds *version* in its
    ``version_number``.  Among the matches, the text after the last run of
    ``-`` is taken as the build number and compared numerically.

    Keyword Args:
        targetted_versions: optional iterable of version ids to choose from;
            defaults to every version of the project.

    Raises:
        ValueError: if no candidate matches *version*.
        KeyError: if a targetted id does not belong to the project.
    """
    print("Calling getLatestVersion_by_number()...")
    targetted_versions = kwargs.get('targetted_versions', None)
    all_numbers = _field_by_version_id(project, "version_number")
    if targetted_versions is not None:
        # Restrict by id so each id keeps its own version number.
        version_numbers = {vid: all_numbers[vid] for vid in targetted_versions}
    else:
        version_numbers = all_numbers
    print("Looking for latest build for Minecraft version: "+ str(version))
    print(version_numbers)

    candidates = key_filter(version_numbers, version)
    if not candidates:
        raise ValueError(
            "no build of "+str(project)+" matches version "+str(version)
        )

    def build_number(version_id):
        return re.split('-+', str(version_numbers[version_id]))[-1]

    latest_id = max(candidates, key=lambda vid: _natural_key(build_number(vid)))
    print(build_number(latest_id))
    return str(latest_id)


def getForMinecraftVersion(project, version, stability):
    """Return the id of the latest build for a Minecraft version.

    Args:
        project: project identifier used in the API URLs.
        version: Minecraft version string to look for.
        stability: ``"stable"`` to consider only versions whose
            ``version_type`` matches ``release`` and whose ``game_versions``
            match *version*; ``"latest"`` to consider every version.

    Raises:
        ValueError: for an unknown *stability* or when nothing matches.
    """
    print("Calling getForMinecraftVersion()...")
    print("Downloading",stability,"for Minecraft version", version)
    if stability == "stable":
        # Filter game versions
        for_game = key_filter(
            _field_by_version_id(project, "game_versions"), version
        )
        # Filter stable versions
        releases = set(key_filter(
            _field_by_version_id(project, "version_type"), "release"
        ))
        candidates = [vid for vid in for_game if vid in releases]
    elif stability == "latest":
        candidates = getVersions(project)
    else:
        raise ValueError(
            "stability must be 'stable' or 'latest', got "+repr(stability)
        )
    result = getLatestVersion_by_number(
        project, version, targetted_versions=candidates
    )
    print("latest build for "+version+" is "+result)
    return result


def getLatestStable(project):
    """Return the id of the most recently published release build.

    Versions are considered stable when their ``version_type`` matches
    ``release``; the newest one by publish date is returned.

    Raises:
        IndexError: if the project has no release build.
    """
    print("Calling getLatestStable()...")
    # Dictionary that ties each version id to its build type
    build_type_dict = _field_by_version_id(project, "version_type")
    print(build_type_dict)
    release_ids = key_filter(build_type_dict, "release")
    return str(getLatestVersion(project, targetted_versions=release_ids))


def getDownloadURL(project, versionID):
    """Return ``[url, sha1, filename]`` for the first file of *versionID*.

    The URL is percent-decoded with ``urllib.parse.unquote``.

    Raises:
        KeyError: if *versionID* is not one of the project's versions.
    """
    print("Calling getDownloadURL()...")
    versions = getVersions(project)
    if versionID not in versions:
        raise KeyError(versionID)
    # Version documents are stored in the same order as the id list.
    version_info = getAllModVersionInfo(project)[versions.index(versionID)]
    first_file = version_info["files"][0]
    return [
        unquote(first_file["url"]),
        first_file["hashes"]["sha1"],
        first_file["filename"],
    ]