Source code for snug.query

"""Types and functionality relating to queries"""
import typing as t
import urllib.request
from functools import partial

from .clients import send, send_async
from .http import basic_auth

__all__ = [
    "Query",
    "execute",
    "execute_async",
    "executor",
    "async_executor",
    "related",
]

T = t.TypeVar("T")


def _identity(obj):
    return obj


[docs]class Query(t.Generic[T]): """Abstract base class for query-like objects. Any object whose :meth:`~object.__iter__` returns a :class:`~snug.http.Request`/:class:`~snug.http.Response` generator implements it. Note ---- Generator iterators themselves also implement this interface (i.e. :meth:`~object.__iter__` returns the generator itself). Note ---- Query is a :class:`~typing.Generic`. This means you may write ``Query[<returntype>]`` as a descriptive type annotation. For example: ``Query[bool]`` indicates a query which returns a boolean. Examples -------- Creating a query from a generator function: >>> def repo(name: str, owner: str) -> snug.Query[dict]: ... \"\"\"a repo lookup by owner and name\"\"\" ... req = snug.GET(f'https://api.github.com/repos/{owner}/{name}') ... response = yield req ... return json.loads(response.content) ... >>> query = repo('Hello-World', owner='octocat') Creating a class-based query: >>> # actually subclassing `Query` is not required >>> class repo(snug.Query[dict]): ... \"\"\"a repository lookup by owner and name\"\"\" ... def __init__(self, name: str, owner: str): ... self.name, self.owner = name, owner ... ... def __iter__(self): ... owner, name = self.owner, self.name ... request = snug.GET( ... f'https://api.github.com/repos/{owner}/{name}') ... response = yield request ... return json.loads(response.content) ... >>> query = repo('Hello-World', owner='octocat') Creating a fully customized query (for rare cases): >>> # actually subclassing `Query` is not required >>> class asset_download(snug.Query): ... \"\"\"streaming download of a repository asset. ... Can only be executed with particular clients\"\"\" ... def __init__(self, repo_name, repo_owner, id): ... self.request = snug.GET( ... f'https://api.github.com/repos/{repo_owner}' ... f'/{repo_name}/releases/assets/{id}', ... headers={'Accept': 'application/octet-stream'}) ... ... def __execute__(self, client, authenticate): ... \"\"\"execute, returning a streaming requests response\"\"\" ... assert isinstance(client, requests.Session) ... req = authenticate(self.request) ... return client.request(req.method, req.url, ... data=req.content, ... params=req.params, ... headers=req.headers, ... stream=True) ... ... async def __execute_async__(self, client, authenticate): ... ... ... >>> query = asset_download('hub', rep_owner='github', id=4187895) >>> response = snug.execute(download, client=requests.Session()) >>> for chunk in response.iter_content(): ... ... """ __slots__ = ()
[docs] def __iter__(self): """A generator iterator which resolves the query Returns ------- ~typing.Generator[Request, Response, T] """ raise NotImplementedError()
[docs] def __execute__(self, client, auth): """Default execution logic for a query, which uses the query's :meth:`~Query.__iter__`. May be overriden for full control of query execution, at the cost of reusability. .. versionadded:: 1.1 Note ---- You shouldn't need to override this method, except in rare cases where implementing :meth:`Query.__iter__` does not suffice. Parameters ---------- client the client instance passed to :func:`execute` auth: ~typing.Callable[[Request], Request] a callable to authenticate a :class:`~snug.http.Request` Returns ------- T the query result """ gen = iter(self) request = next(gen) while True: response = send(client, auth(request)) try: request = gen.send(response) except StopIteration as e: return e.args[0]
[docs] async def __execute_async__(self, client, auth): """Default asynchronous execution logic for a query, which uses the query's :meth:`~Query.__iter__`. May be overriden for full control of query execution, at the cost of reusability. .. versionadded:: 1.1 Note ---- You shouldn't need to override this method, except in rare cases where implementing :meth:`Query.__iter__` does not suffice. Parameters ---------- client the client instance passed to :func:`execute_async` auth: ~typing.Callable[[Request], Request] a callable to authenticate a :class:`~snug.http.Request` Returns ------- T the query result """ gen = iter(self) request = next(gen) while True: response = await send_async(client, auth(request)) try: request = gen.send(response) except StopIteration as e: return e.value
def _make_auth(auth): if auth is None: return _identity elif callable(auth): return auth else: return basic_auth(auth)
[docs]def execute(query, auth=None, client=urllib.request.build_opener()): """Execute a query, returning its result Parameters ---------- query: Query[T] The query to resolve auth: ~typing.Tuple[str, str] \ or ~typing.Callable[[Request], Request] or None This may be: * A (username, password)-tuple for basic authentication * A callable to authenticate requests. * ``None`` (no authentication) client The HTTP client to use. Its type must have been registered with :func:`~snug.clients.send`. If not given, the built-in :mod:`urllib` module is used. Returns ------- T the query result """ exec_fn = getattr(type(query), "__execute__", Query.__execute__) return exec_fn(query, client, _make_auth(auth))
[docs]def execute_async(query, auth=None, client=None): """Execute a query asynchronously, returning its result Parameters ---------- query: Query[T] The query to resolve auth: ~typing.Tuple[str, str] \ or ~typing.Callable[[Request], Request] or None This may be: * A (username, password)-tuple for basic authentication * A callable to authenticate requests. * ``None`` (no authentication) client The HTTP client to use. Its type must have been registered with :func:`~snug.clients.send_async`. If not given, the current event loop from :mod:`asyncio` is used. Returns ------- T the query result Note ---- The default client is very rudimentary. Consider using a :class:`aiohttp.ClientSession` instance as ``client``. """ exc_fn = getattr(type(query), "__execute_async__", Query.__execute_async__) return exc_fn( query, client, _make_auth(auth), )
[docs]def executor(**kwargs): """Create a version of :func:`execute` with bound arguments. Parameters ---------- **kwargs arguments to pass to :func:`execute` Returns ------- ~typing.Callable[[Query[T]], T] an :func:`execute`-like function """ return partial(execute, **kwargs)
[docs]def async_executor(**kwargs): """Create a version of :func:`execute_async` with bound arguments. Parameters ---------- **kwargs arguments to pass to :func:`execute_async` Returns ------- ~typing.Callable[[Query[T]], ~typing.Awaitable[T]] an :func:`execute_async`-like function """ return partial(execute_async, **kwargs)