Module steamstore.client
Expand source code
import logging
import asyncio
import aiohttp
from .defaults import *
from .app import App
from .featured import FeaturedList
log = logging.getLogger(__name__)
class Client:
"""Main class for the Steam API"""
def __init__(self, *, loop=None, **opts):
self.loop = asyncio.get_event_loop() if loop is None else loop
self.ready = False
self.http = None
def __cleanup(self):
loop = self.loop
if self.http:
asyncio.ensure_future(self.http.close(), loop=loop)
def stop(self):
self.__cleanup()
def __build_api(self, path:str, qs:dict=None, **args):
url = base_api + path
if qs:
url += "?" + '&'.join([name + "=" + str(value) for name, value in qs.items()])
return url
async def __get_json(self, req:aiohttp.client_reqrep.ClientResponse, *args, **kwargs) -> dict:
json_resp = None
try:
json_resp = await req.json()
except:
print("Error")
return # TODO: Handle this
return json_resp
async def start(self, *args, **kwargs):
self.http = await aiohttp.ClientSession().__aenter__()
self.ready = True
def run(self, *args, **kwargs):
loop = self.loop
async def runner():
try:
await self.start(*args, **kwargs)
finally:
pass # TODO: Handle this
asyncio.ensure_future(runner(), loop=loop)
try:
loop.run_forever()
except KeyboardInterrupt:
self.__cleanup()
async def get_app_from_id(self, appid:int, currency_code:str="us", language_code:str="en"):
req = await self.http.get(self.__build_api("appdetails", {"appids":appid, "cc":currency_code, "l":language_code}))
json_resp = await self.__get_json(req)
for item in json_resp:
data = json_resp[item]
return App(data=data["data"]) if data["success"] else None # TODO: Handle This
async def get_featured(self, *args, **kwargs):
req = await self.http.get(self.__build_api("featured"))
json_resp = await self.__get_json(req)
return FeaturedList(data=json_resp)
Classes
class Client (*, loop=None, **opts)
-
Main class for the Steam API
Expand source code
class Client: """Main class for the Steam API""" def __init__(self, *, loop=None, **opts): self.loop = asyncio.get_event_loop() if loop is None else loop self.ready = False self.http = None def __cleanup(self): loop = self.loop if self.http: asyncio.ensure_future(self.http.close(), loop=loop) def stop(self): self.__cleanup() def __build_api(self, path:str, qs:dict=None, **args): url = base_api + path if qs: url += "?" + '&'.join([name + "=" + str(value) for name, value in qs.items()]) return url async def __get_json(self, req:aiohttp.client_reqrep.ClientResponse, *args, **kwargs) -> dict: json_resp = None try: json_resp = await req.json() except: print("Error") return # TODO: Handle this return json_resp async def start(self, *args, **kwargs): self.http = await aiohttp.ClientSession().__aenter__() self.ready = True def run(self, *args, **kwargs): loop = self.loop async def runner(): try: await self.start(*args, **kwargs) finally: pass # TODO: Handle this asyncio.ensure_future(runner(), loop=loop) try: loop.run_forever() except KeyboardInterrupt: self.__cleanup() async def get_app_from_id(self, appid:int, currency_code:str="us", language_code:str="en"): req = await self.http.get(self.__build_api("appdetails", {"appids":appid, "cc":currency_code, "l":language_code})) json_resp = await self.__get_json(req) for item in json_resp: data = json_resp[item] return App(data=data["data"]) if data["success"] else None # TODO: Handle This async def get_featured(self, *args, **kwargs): req = await self.http.get(self.__build_api("featured")) json_resp = await self.__get_json(req) return FeaturedList(data=json_resp)
Methods
async def get_app_from_id(self, appid: int, currency_code: str = 'us', language_code: str = 'en')
-
Expand source code
async def get_app_from_id(self, appid:int, currency_code:str="us", language_code:str="en"): req = await self.http.get(self.__build_api("appdetails", {"appids":appid, "cc":currency_code, "l":language_code})) json_resp = await self.__get_json(req) for item in json_resp: data = json_resp[item] return App(data=data["data"]) if data["success"] else None # TODO: Handle This
async def get_featured(self, *args, **kwargs)
-
Expand source code
async def get_featured(self, *args, **kwargs): req = await self.http.get(self.__build_api("featured")) json_resp = await self.__get_json(req) return FeaturedList(data=json_resp)
def run(self, *args, **kwargs)
-
Expand source code
def run(self, *args, **kwargs): loop = self.loop async def runner(): try: await self.start(*args, **kwargs) finally: pass # TODO: Handle this asyncio.ensure_future(runner(), loop=loop) try: loop.run_forever() except KeyboardInterrupt: self.__cleanup()
async def start(self, *args, **kwargs)
-
Expand source code
async def start(self, *args, **kwargs): self.http = await aiohttp.ClientSession().__aenter__() self.ready = True
def stop(self)
-
Expand source code
def stop(self): self.__cleanup()