254 lines
10 KiB
Python
254 lines
10 KiB
Python
from os import error
|
|
from types import WrapperDescriptorType
|
|
from pytz.tzinfo import memorized_timedelta
|
|
#import requests
|
|
import socket
|
|
import time
|
|
from request_wrapper import requests_wrapper as requests
|
|
import datetime
|
|
import iso8601
|
|
import pytz
|
|
from urllib.parse import unquote
|
|
import re
|
|
# Verbose diagnostics switch: when set, the root logger and the
# "requests.packages.urllib3" logger are both raised to DEBUG.
debug = True

if debug:
    import logging
    from requests import api

    logging.basicConfig()
    logging.getLogger().setLevel(logging.DEBUG)
    requests_log = logging.getLogger("requests.packages.urllib3")
    requests_log.setLevel(logging.DEBUG)
    requests_log.propagate = True

# One shared session for every request made by this module, so the
# connection can be re-used instead of being re-established per request.
session = requests.Session()

# Root URL that every endpoint path in this module is appended to.
base_api_url = "https://api.modrinth.com:443/v2"
|
|
def failCheck(response, functOrigin):
    """Inspect an API response and tell the caller whether to retry.

    Args:
        response: Response-like object exposing ``status_code`` and a
            mapping-style ``headers`` attribute.
        functOrigin: Name of the calling function; used in messages only.

    Returns:
        True when the response was a 429 and the rate-limit window has been
        slept out (the caller should repeat the request); False when the
        status is 200.

    Raises:
        OSError: For any status other than 200 or 429. The message names the
            caller and the status code.
    """
    print("Status Code is: "+str(response.status_code))

    if response.status_code == 429:
        # The header is read as a number of seconds; one extra second is
        # added as a safety margin. A missing header falls back to that
        # one-second margin, and the result is clamped because
        # time.sleep() rejects negative values.
        reset_after = response.headers.get("X-Ratelimit-Reset", 0)
        sleep_time = max(0, int(reset_after) + 1)
        print("Too many requests!"+'\n'+"Waiting for "+str(sleep_time)+" seconds...")
        print(response.headers)
        time.sleep(sleep_time)  # Wait until API ratelimit is over
        print("Retrying "+functOrigin+"...")
        return True

    if response.status_code != 200:
        # OSError is the same class the module-level ``os.error`` alias
        # refers to, so existing ``except`` clauses keep working.
        raise OSError(
            str(functOrigin) + ": unexpected HTTP status " + str(response.status_code)
        )

    return False
|
|
# Data caching: module-level store written by cacheData(), keyed by
# whatever name the caller passes in.
dataCache = {}


def cacheData(function_name, cached_data):
    """Store *cached_data* in ``dataCache`` under *function_name*.

    Args:
        function_name: Cache key; it is concatenated into the progress
            message, so it has to be a string.
        cached_data: Value to remember; stored as-is (no copy).

    Returns:
        None. Progress messages are printed before and after the store.
    """
    print("Caching data!")
    dataCache.update({function_name: cached_data})
    stored_message = "Stored " + function_name + "'s data to cache"
    print(stored_message)
|
|
|
|
def modInfo(project):
    """Return the decoded JSON project record for *project*, cached per project.

    Args:
        project: Project identifier appended to the ``/project/`` endpoint
            path (string).

    Returns:
        Whatever ``response.json()`` yields for the project endpoint.

    Raises:
        OSError: Propagated from failCheck() for a status other than 200/429.
    """
    print("Calling modInfo()...")

    # The key includes the project so that looking up a second project
    # does not return the first project's cached record.
    cache_key = "modInfo:" + project
    if cache_key in dataCache:
        print("Returning cached data!")
        return dataCache[cache_key]

    url = base_api_url + "/project/" + project
    response = session.get(url, family=socket.AF_INET)
    # failCheck() sleeps out a 429 and returns True; repeat the SAME request
    # and re-check it, so a still-rate-limited or failed retry is never
    # parsed as if it had succeeded.
    while failCheck(response, "modInfo"):
        response = session.get(url, family=socket.AF_INET)

    api_response = response.json()
    cacheData(cache_key, api_response)
    return api_response
|
|
|
|
def getVersions(project):
    """Return the ``"versions"`` entry of the project record, cached per project.

    Args:
        project: Project identifier (string) forwarded to modInfo().

    Returns:
        The value stored under ``"versions"`` in the record returned by
        modInfo(). Other functions in this module iterate it as a sequence
        of version IDs.
    """
    print("Calling getVersions()...")

    # Keyed per project so two different projects never share an entry.
    cache_key = "getVersions:" + project
    if cache_key in dataCache:
        print("Returning cached data!")
        return dataCache[cache_key]

    versions = modInfo(project)["versions"]
    cacheData(cache_key, versions)
    return versions
|
|
|
|
def getAllModVersionInfo(project):
    """Fetch the version record for every version ID of *project*.

    Args:
        project: Project identifier (string) forwarded to getVersions().

    Returns:
        A list with one decoded JSON record per ID returned by
        getVersions(), in the same order. Cached per project.

    Raises:
        OSError: Propagated from failCheck() for a status other than 200/429.
    """
    print("Calling getAllModVersionInfo()...")

    # Keyed per project so two different projects never share an entry.
    cache_key = "getAllModVersionInfo:" + project
    if cache_key in dataCache:
        print("Returning cached data!")
        return dataCache[cache_key]

    responseList = []
    for version_id in getVersions(project):
        url = base_api_url + "/version/" + version_id
        response = session.get(url, family=socket.AF_INET)
        # Repeat while failCheck() reports a (slept-out) rate limit, so the
        # retried response is validated as well before it is parsed.
        while failCheck(response, "getAllModVersionInfo"):
            response = session.get(url, family=socket.AF_INET)
        responseList.append(response.json())

    cacheData(cache_key, responseList)
    return responseList
|
|
|
|
def determine(project, whatToDetermine):
    """Collect one field from every version record of *project*.

    Args:
        project: Project identifier forwarded to getAllModVersionInfo().
        whatToDetermine: Key to read from each version record.

    Returns:
        A list holding ``record[whatToDetermine]`` for each record, in the
        order getAllModVersionInfo() returned them.
    """
    print("Calling determine()...")
    version_records = getAllModVersionInfo(project)
    return [record[whatToDetermine] for record in version_records]
|
|
|
|
def getLatestVersion(project, **kwargs):
    """Return the ID of the most recently published version.

    Args:
        project: Project identifier.
        **kwargs: ``targetted_versions`` (optional) restricts the search to
            the given version IDs; by default every ID from getVersions()
            is considered.

    Returns:
        The version ID with the newest ``date_published`` value.

    Raises:
        IndexError: When there are no versions to choose from.
        KeyError: When a targetted ID does not belong to the project.
    """
    print("Calling getLatestVersion()...")
    targetted_versions = kwargs.get('targetted_versions', None)

    all_versions = getVersions(project)
    versions = all_versions if targetted_versions is None else targetted_versions

    # determine() yields one date per ID from getVersions(), in the same
    # order. Pair dates with IDs explicitly, so a targetted subset is
    # matched with its own dates rather than with whatever sits at the same
    # list position.
    published_by_id = dict(zip(all_versions, determine(project, "date_published")))

    if not versions:
        raise IndexError("getLatestVersion: no versions to choose from")

    # The newest publish date is the latest version; on a tie the earliest
    # listed ID wins.
    return max(
        versions,
        key=lambda version_id: iso8601.parse_date(published_by_id[version_id]),
    )
|
|
|
|
def key_filter(dict_to_filter, type_to_grab, **kwargs):
    """Select the entries of a dict whose value matches a pattern.

    Each value is converted with ``str()`` and searched with ``re.search``
    using ``str(type_to_grab)`` as the regular expression.

    Args:
        dict_to_filter: Mapping to scan.
        type_to_grab: Pattern to look for (stringified before use).
        **kwargs: ``return_both`` (default False) selects the return shape.

    Returns:
        With ``return_both`` equal to False: the list of matching keys, in
        iteration order. With ``return_both`` equal to True: a dict mapping
        each matching value to its key (values must be hashable; a repeated
        value keeps the last key). Any other ``return_both`` yields None.
    """
    print("Calling key_filter()...")
    return_both = kwargs.get('return_both', False)

    pattern = str(type_to_grab)
    print("Looking for "+pattern)

    matched_keys = []
    matched_values = []
    for key, value in dict_to_filter.items():
        if re.search(pattern, str(value)) is None:
            continue
        print("Match!")
        print(key, value)
        matched_keys.append(key)
        matched_values.append(value)

    # Equality (not truthiness) is used on purpose: only values equal to
    # False/True pick a branch, anything else falls through to None.
    if return_both == False:
        return matched_keys
    if return_both == True:
        # Tie the two lists back together as value -> key.
        return dict(zip(matched_values, matched_keys))
    return None
|
|
|
|
def _natural_sort_key(text):
    """Split *text* into text/number chunks so digit runs compare as integers."""
    # With a capturing group, re.split() alternates non-digit chunks (even
    # positions) and digit runs (odd positions), so equal positions always
    # hold the same type and the resulting lists are safely comparable.
    chunks = re.split(r'(\d+)', text)
    return [int(chunk) if index % 2 else chunk for index, chunk in enumerate(chunks)]


def getLatestVersion_by_number(project, version, **kwargs):
    """Return the ID of the highest build whose version number matches *version*.

    The ``version_number`` of each candidate is searched for *version* (via
    key_filter(), i.e. as a regular expression). Among the matches, the text
    after the last run of ``-`` is treated as the build number and the
    highest one wins.

    Args:
        project: Project identifier.
        version: Minecraft version to look for inside the version numbers.
        **kwargs: ``targetted_versions`` (optional) restricts the candidates
            to the given version IDs; defaults to every ID of the project.

    Returns:
        The winning version ID, as a string.

    Raises:
        ValueError: When no candidate's version number matches *version*.
        KeyError: When a targetted ID does not belong to the project.
    """
    print("Calling getLatestVersion_by_number()...")
    targetted_versions = kwargs.get('targetted_versions', None)

    all_versions = getVersions(project)
    versions = all_versions if targetted_versions is None else targetted_versions

    # determine() yields one entry per ID from getVersions(), in the same
    # order; pair by ID so a targetted subset keeps its own version numbers.
    number_by_id = dict(zip(all_versions, determine(project, "version_number")))
    print("Looking for latest build for Minecraft version: "+ str(version))

    version_numbers = {version_id: number_by_id[version_id] for version_id in versions}
    print(version_numbers)

    # Ask key_filter for the builds that correspond to the requested
    # version; the result maps version_number -> version ID.
    versions_to_target = key_filter(version_numbers, version, return_both=True)
    if not versions_to_target:
        raise ValueError(
            "No version of " + str(project) + " matches " + str(version)
        )

    def build_suffix(version_number):
        # Last piece after splitting on runs of '-'.
        return re.split(r'-+', version_number)[-1]

    # Compare build suffixes in natural order, so that e.g. "0.4.10" ranks
    # above "0.4.9" (a plain string comparison would rank it below).
    latest_number = max(
        versions_to_target,
        key=lambda version_number: _natural_sort_key(build_suffix(version_number)),
    )
    print(latest_number)

    # Return the ID tied to the winning version number directly, instead of
    # searching for it again by pattern, which could hit another version.
    return str(versions_to_target[latest_number])
|
|
|
|
def getForMinecraftVersion(project, version, stability):
    """Return the latest build of *project* for a Minecraft version.

    Args:
        project: Project identifier.
        version: Minecraft version (string).
        stability: ``"stable"`` to consider only versions whose
            ``game_versions`` match *version* and whose ``version_type``
            matches ``"release"``; ``"latest"`` to consider every version.

    Returns:
        The version ID chosen by getLatestVersion_by_number().

    Raises:
        ValueError: When *stability* is neither ``"stable"`` nor ``"latest"``.
    """
    print("Calling getForMinecraftVersion()...")
    print("Downloading",stability,"for Minecraft version", version)

    if stability == "stable":
        all_versions = getVersions(project)

        # Filter game versions: key_filter() needs a mapping, so tie every
        # version ID to its "game_versions" value first.
        game_versions_by_id = dict(zip(all_versions, determine(project, "game_versions")))
        targetted_versions = key_filter(game_versions_by_id, version)

        # Filter stable versions among the ones kept above.
        type_by_id = dict(zip(all_versions, determine(project, "version_type")))
        targetted_types = {
            version_id: type_by_id[version_id] for version_id in targetted_versions
        }
        candidate_versions = key_filter(targetted_types, "release")
    elif stability == "latest":
        candidate_versions = getVersions(project)
    else:
        # Previously this fell through to an unbound local variable.
        raise ValueError(
            "stability must be 'stable' or 'latest', got " + repr(stability)
        )

    result = getLatestVersion_by_number(project, version, targetted_versions=candidate_versions)
    print("latest build for "+version+" is "+result)
    return result
|
|
|
|
def getLatestStable(project):
    """Return the ID of the most recently published release of *project*.

    A version counts as stable when its ``version_type`` matches
    ``"release"`` (checked with key_filter()).

    Args:
        project: Project identifier.

    Returns:
        The version ID of the newest stable version by ``date_published``.

    Raises:
        ValueError: When the project has no version of type "release".
    """
    print("Calling getLatestStable()...")
    versions = getVersions(project)

    # Build a dictionary that ties the versions to the build type.
    # determine() yields one entry per ID from getVersions(), same order.
    build_type_dict = dict(zip(versions, determine(project, "version_type")))
    print(build_type_dict)

    stable_versions = key_filter(build_type_dict, "release")
    if not stable_versions:
        raise ValueError("No stable (release) version found for " + str(project))

    # NOTE(review): no Minecraft version is available in this function, so
    # "latest" is taken to mean newest publish date - confirm with callers.
    # Each ID is paired with its own date before the subset is compared.
    published_by_id = dict(zip(versions, determine(project, "date_published")))
    return max(
        stable_versions,
        key=lambda version_id: iso8601.parse_date(published_by_id[version_id]),
    )
|
|
|
|
|
|
def getDownloadURL(project, versionID):
    """Return download details for the first file of one version.

    Args:
        project: Project identifier.
        versionID: ID of the version whose file is wanted.

    Returns:
        A three-item list: the URL-unquoted download URL, the SHA-1 hash and
        the filename of the first entry in the version's ``"files"`` list.

    Raises:
        KeyError: When *versionID* is not one of the project's versions.
        IndexError: When the requested version has no files.
    """
    print("Calling getDownloadURL()...")
    versions = getVersions(project)
    versionInfo = getAllModVersionInfo(project)

    # getAllModVersionInfo() returns one record per ID from getVersions(),
    # in the same order. Only the requested record is inspected, so a
    # different version with no files cannot break this lookup.
    info_by_id = dict(zip(versions, versionInfo))
    first_file = info_by_id[versionID]["files"][0]

    return [
        unquote(first_file["url"]),
        first_file["hashes"]["sha1"],
        first_file["filename"],
    ]
|