Minecraft-Updater/parsers/modrinth.py

195 lines
7.6 KiB
Python
Raw Normal View History

2021-12-19 18:00:57 -06:00
from os import error
2021-12-20 09:38:59 -06:00
from types import WrapperDescriptorType
2021-12-19 23:42:31 -06:00
from pytz.tzinfo import memorized_timedelta
2021-12-20 09:38:59 -06:00
#import requests
import socket
2022-02-24 13:29:47 -06:00
import time
2021-12-20 09:38:59 -06:00
from request_wrapper import requests_wrapper as requests
2021-12-19 23:42:31 -06:00
import datetime
import iso8601
import pytz
from urllib.parse import unquote
2022-02-27 20:20:41 -06:00
import re
2021-12-20 09:38:59 -06:00
debug = True
2021-12-19 18:00:57 -06:00
if debug == True:
import logging
from requests import api
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.setLevel(logging.DEBUG)
requests_log.propagate = True
timeoutTime = 1
2022-02-27 20:20:41 -06:00
#Setup re
#regex = re.compile(r'*')
# Setup session, this lets the parser re-use the connection instead of establishing a new connection for EACH request, not only does this cause a HUGE performance boost, it's also nicer to the API.
session = requests.Session()
2021-12-20 09:38:59 -06:00
base_api_url = "https://api.modrinth.com:443/api/v1"
2022-02-24 13:29:47 -06:00
def failCheck(response, functOrigin):
2021-12-19 18:00:57 -06:00
print("Status Code is: "+str(response.status_code))
2022-02-24 13:29:47 -06:00
if response.status_code == 429:
sleep_time = int(response.headers["X-Ratelimit-Reset"])+1
print("Too many requests!"+'\n'+"Waiting for "+str(sleep_time)+" seconds...")
print(response.headers)
time.sleep(sleep_time) # Wait until API ratelimit is over
2022-02-24 13:30:36 -06:00
print("Retrying "+functOrigin+"...")
2022-02-24 13:29:47 -06:00
return True
elif response.status_code != 200:
2021-12-19 18:00:57 -06:00
raise error
# Data Caching
dataCache = {}
def cacheData(function_name, cached_data):
print("Caching data!")
dataCache[function_name] = cached_data
#print(dataCache)
2021-12-20 09:38:59 -06:00
print("Stored "+function_name+"'s data to cache")
2021-12-19 18:00:57 -06:00
def modInfo(project):
print("Calling modInfo()...")
if "modInfo" in dataCache:
2021-12-20 09:38:59 -06:00
print("Returning cached data!")
2021-12-19 18:00:57 -06:00
return dataCache["modInfo"]
else:
response = session.get(base_api_url+"/mod/"+project, family=socket.AF_INET)
if failCheck(response, "modInfo") == True: #Attempt to requery API
response = session.get(base_api_url+"/mod/"+project, family=socket.AF_INET)
api_response = response.json()
cacheData("modInfo", api_response)
return api_response
2021-12-19 18:00:57 -06:00
def getVersions(project):
print("Calling getVersions()...")
if "getVersions" in dataCache:
2021-12-20 09:38:59 -06:00
print("Returning cached data!")
2021-12-19 18:00:57 -06:00
return dataCache["getVersions"]
else:
workingDict = modInfo(project)
versions = workingDict["versions"]
cacheData("getVersions", versions)
return versions
2021-12-19 18:00:57 -06:00
def getAllModVersionInfo(project):
print("Calling getAllModVersionInfo()...")
if "getAllModVersionInfo" in dataCache:
2021-12-20 09:38:59 -06:00
print("Returning cached data!")
return dataCache["getAllModVersionInfo"]
else:
versions = getVersions(project)
responseList = []
numberOfVersions = len(versions)
for item in range(numberOfVersions):
response = session.get(base_api_url+"/version/"+versions[item], family=socket.AF_INET)
if failCheck(response, "getAllModVersionInfo") == True: #Attempt to requery API
response = session.get(base_api_url+"/version/"+versions[item], family=socket.AF_INET)
api_response = response.json()
responseList.append(api_response)
cacheData("getAllModVersionInfo", responseList)
return responseList
2021-12-19 18:00:57 -06:00
2021-12-19 23:42:31 -06:00
def determine(project, whatToDetermine):
2021-12-20 09:38:59 -06:00
print("Calling determine()...")
2021-12-19 18:00:57 -06:00
modInfo = getAllModVersionInfo(project)
numberOfVersions = len(modInfo)
2021-12-19 23:42:31 -06:00
determine = []
for item in range(numberOfVersions):
workingDict = modInfo[item]
determine.append(workingDict[whatToDetermine])
#print(str(item)+" "+str(determine[item]))
return determine
def getLatestVersion(project, **kwargs):
2021-12-20 09:38:59 -06:00
print("Calling getLatestVersion()...")
targetted_versions = kwargs.get('targetted_versions', None)
if targetted_versions != None:
versions = targetted_versions
else:
versions = getVersions(project)
2022-02-27 20:20:41 -06:00
#print(versions)
2021-12-19 23:42:31 -06:00
publishDates = determine(project, "date_published")
#print(publishDates)
# Get current date
currentDate = pytz.utc.localize(datetime.datetime.utcnow())
#print(currentDate)
convertedDates = {}
numberOfVersions = len(versions)
for item in range(numberOfVersions):
2021-12-19 23:42:31 -06:00
convertTime = iso8601.parse_date(publishDates[item])
convertedDates[versions[item]] = convertTime
shortestDate = {}
for item in range(numberOfVersions):
2021-12-19 23:42:31 -06:00
shortestDate[versions[item]] = currentDate - convertedDates[versions[item]]
#print(shortestDate)
# Sort the dictionary to find the most recent version
latest = {key: val for key, val in sorted(shortestDate.items(), key = lambda ele: ele[1])}
return list(latest.keys())[0]
2021-12-19 18:00:57 -06:00
2022-02-27 20:20:41 -06:00
def key_filter(project, dict_to_filter, key_to_grab, type_to_grab):
print("Calling key_filter()...")
versions = dict_to_filter
build_type = determine(project, key_to_grab)
# Build a dictionary that ties the versions to the build type
build_type_dict = {}
number_of_versions = len(versions)
for item in range(number_of_versions):
build_type_dict[versions[item]] = build_type[item]
2022-02-27 20:20:41 -06:00
#print(build_type_dict)
# Sort dictionary to filter out only the release builds
stable = []
2022-02-27 20:20:41 -06:00
print("Looking for "+str(type_to_grab))
for key, value in build_type_dict.items():
2022-02-27 20:20:41 -06:00
#print("looking at "+str(value))
search = re.search(str(type_to_grab), str(value))
if search != None:
print("Match!")
#print(key)
stable.append(key)
2022-02-27 20:20:41 -06:00
# Call getLatestVersion, tell it to use our output
2022-02-27 20:20:41 -06:00
return stable
def getForMinecraftVersion(project, version, stability):
print("Calling getForMinecraftVersion()...")
print("Downloading",stability,"for Minecraft version", version)
if stability == "stable":
#Filter Game versions
targetted_versions=key_filter(project, getVersions(project), "game_versions", version)
#Filter Stable versions
stable_versions=key_filter(project, targetted_versions, "version_type", "release")
result = getLatestVersion(project, targetted_versions=stable_versions)
elif stability == "latest":
#Filter Game versions
targetted_versions=key_filter(project, getVersions(project), "game_versions", version)
#print(targetted_versions)
#Filter Latest versions
stable_versions=key_filter(project, targetted_versions, "version_type", "release")
result = getLatestVersion(project, targetted_versions=stable_versions)
print("latest build for "+version+" is "+result)
return result
def getLatestStable(project):
print("Calling getLatestStable()...")
return getLatestVersion(project, getVersions(project) ,targetted_versions=key_filter(project, "version_type", "release"))
2021-12-19 23:42:31 -06:00
def getDownloadURL(project, versionID):
2021-12-20 09:38:59 -06:00
print("Calling getDownloadURL()...")
2021-12-19 23:42:31 -06:00
versions = getVersions(project)
versionInfo = getAllModVersionInfo(project)
downloadURLs = {}
downloadSHA1 = {}
downloadFilenames = {}
# Iterate through the nested lists/dicts
for item in range(len(versions)):
workingDict = versionInfo[item]
#print("workingDict: "+str(workingDict))
workingList = workingDict["files"]
#print("workingList: "+str(workingList))
workingDict2 = workingList[0]
workingDict3 = workingDict2["hashes"]
#print(workingDict3)
downloadURLs[versions[item]] = unquote(workingDict2["url"])
2021-12-19 23:42:31 -06:00
downloadSHA1[versions[item]] = workingDict3["sha1"]
downloadFilenames[versions[item]] = workingDict2["filename"]
#print(downloadURLs)
return [downloadURLs[versionID], downloadSHA1[versionID], downloadFilenames[versionID]]