Advanced topics#

This section continues where the tutorial left off, describing more advanced functionality.

Executors#

To make it easier to call execute()/execute_async() repeatedly with specific arguments, the executor()/async_executor() shortcut can be used.

import requests
import snug

exec = snug.executor(auth=('me', 'password'),
                     client=requests.Session())
exec(some_query)
exec(other_query)

# we can still override arguments
exec(another_query, auth=('bob', 'hunter2'))
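
One way to picture an executor is as an execute function with some keyword arguments filled in ahead of time. The sketch below imitates that behaviour with functools.partial and a stand-in fake_execute function (hypothetical, not part of snug), so it runs without any HTTP client. It illustrates the idea only and makes no claim about how executor() is implemented.

```python
from functools import partial


def fake_execute(query, auth=None, client=None):
    """Hypothetical stand-in for an execute function: reports what it was given."""
    return {'query': query, 'auth': auth, 'client': client}


# pre-fill the arguments we want to reuse on every call
run = partial(fake_execute, auth=('me', 'password'), client='shared-session')

first = run('some_query')
# keyword arguments given at call time take precedence over pre-filled ones
second = run('another_query', auth=('bob', 'hunter2'))
```

Here first carries the pre-filled credentials, while second uses the overriding ones and still shares the same client.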

Authentication methods#

The default authentication method is HTTP Basic authentication. To use another type of authentication, pass a callable as the auth parameter of execute()/execute_async() or executor()/async_executor().

This callable takes a Request, and should return an authenticated Request.

To illustrate, here is a simple example for token-based authentication:

class Token:
    def __init__(self, token):
        self._headers = {'Authorization': f'token {token}'}

    def __call__(self, request):
        return request.with_headers(self._headers)

snug.execute(my_query, auth=Token('my token'))

See the Slack API example for a real-world use-case.
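
The contract for an auth callable (take a request, return an authenticated request) can be tried out without any network access. The sketch below uses FakeRequest, a hypothetical stand-in that only imitates the with_headers method used above, and BearerToken, an illustrative variant of the Token class with a different header format. Neither name is part of snug.

```python
from dataclasses import dataclass, field, replace


@dataclass(frozen=True)
class FakeRequest:
    """Hypothetical, immutable stand-in for a request object."""
    method: str
    url: str
    headers: dict = field(default_factory=dict)

    def with_headers(self, headers):
        # return a copy with the extra headers merged in
        return replace(self, headers={**self.headers, **headers})


class BearerToken:
    """Auth callable adding a bearer-style Authorization header."""

    def __init__(self, token):
        self._headers = {'Authorization': f'Bearer {token}'}

    def __call__(self, request):
        return request.with_headers(self._headers)


original = FakeRequest('GET', 'https://example.com/items',
                       headers={'Accept': 'application/json'})
authenticated = BearerToken('s3cret')(original)
```

Because the callable returns a new request instead of mutating its argument, original is left untouched and the same auth object can be reused across queries.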

Pagination#

Often in web APIs, results are obtained through pagination. We can define queries in such a way that page content can be iterated over with ease.

To support pagination, a query should return a Page (or Pagelike) object.

This can be illustrated by GitHub’s organizations endpoint.

import snug
import json

def organizations(since: int=None):
    """retrieve a page of organizations since a particular id"""
    response = yield snug.GET('https://api.github.com/organizations',
                              params={'since': since} if since else {})
    orgs = json.loads(response.content)
    next_query = organizations(since=orgs[-1]['id']) if orgs else None
    return snug.Page(orgs, next_query=next_query)

The query can be executed as-is…

>>> snug.execute(organizations())
Page([{"id": 44, ...}, ...])

…but it can also be wrapped with paginated. Executing the paginated query returns an iterator which automatically fetches the content of next pages as needed.

paginated_query = snug.paginated(organizations(since=44))
for org_list in snug.execute(paginated_query):
    ...

Note that:

  • The pagination iterator yields the page content (not the Page itself).

  • paginated objects are reusable (if the wrapped query is).

  • The page content may be any type.

Asynchronous execution results in an asynchronous iterator. Note that the result of execute_async() itself is not awaited in this case.

async for org_list in snug.execute_async(paginated_query):
    ...
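
The idea behind iterating over pages can be sketched in plain Python: execute a query, yield the page content, then continue with the page's next query until there is none. Everything below (FakePage, fetch_page, iter_pages and the ORGS data) is a hypothetical stand-in written for this illustration. It is not how paginated is implemented.

```python
from collections import namedtuple

# Hypothetical stand-in: a page holds content plus the query for the next page
FakePage = namedtuple('FakePage', 'content next_query')

ORGS = [{'id': i} for i in (1, 3, 7, 20, 21)]  # pretend server-side data
PAGE_SIZE = 2


def fetch_page(since=0):
    """Pretend to execute a query: return organizations with id > since."""
    items = [org for org in ORGS if org['id'] > since][:PAGE_SIZE]
    next_query = {'since': items[-1]['id']} if items else None
    return FakePage(items, next_query)


def iter_pages(since=0):
    """Yield the content of each page, following next_query until it is None."""
    query = {'since': since}
    while query is not None:
        page = fetch_page(**query)
        yield page.content
        query = page.next_query


pages = list(iter_pages())
```

As in the organizations query above, the next query is only dropped once an empty page comes back, so the last item in pages is an empty list.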

Custom page types#

It is possible to implement more full-featured page-like objects. This can be useful, for example, to implement an iterable interface, or to add attributes. Custom page types can be created by subclassing Pagelike, or implementing its interface.

See the Slack API example for a practical use-case.

Low-level control#

In some cases it is necessary to have more control over query execution. For example, when:

  • HTTP client-specific features are needed (e.g. streaming responses, multipart data)

  • implementing advanced execution logic

For this purpose, you can use the __execute__()/__execute_async__() hook. Implementing this method allows full customization of a query’s execution. The downside is that the query may become dependent on the client, which limits its reusability.

The __execute__()/__execute_async__() methods take two (positional) arguments:

  • client – the client which was passed to execute().

  • auth – a callable which takes a Request, and returns an authenticated Request.

The following example shows how this can be used to implement streaming responses to download GitHub repository assets.

import aiohttp
import requests
import snug

class asset_download(snug.Query):
    """streaming download of a repository asset.
    Can only be executed with the `requests` or `aiohttp` client"""

    def __init__(self, repo_name, repo_owner, id):
        self.request = snug.GET(
            f'https://api.github.com/repos/{repo_owner}'
            f'/{repo_name}/releases/assets/{id}',
            headers={'Accept': 'application/octet-stream'})

    def __execute__(self, client, auth):
        """executes the query, returning a streaming requests response"""
        assert isinstance(client, requests.Session)
        req = auth(self.request)
        return client.request(req.method, req.url,
                              data=req.content,
                              params=req.params,
                              headers=req.headers,
                              stream=True)

    async def __execute_async__(self, client, auth):
        """executes the query, returning an aiohttp response"""
        assert isinstance(client, aiohttp.ClientSession)
        req = auth(self.request)
        # await the request so that the caller receives the response itself
        return await client.request(req.method, req.url,
                                    data=req.content,
                                    params=req.params,
                                    headers=req.headers)

We can then write:

download = asset_download('hub', repo_owner='github', id=4187895)

# with requests:
response = snug.execute(download, client=requests.Session())
for chunk in response.iter_content():
    ...

# with aiohttp (inside a coroutine)
async with aiohttp.ClientSession() as s:
    response = await snug.execute_async(download, client=s)

    # chunk_size: the number of bytes to read per call
    while True:
        chunk = await response.content.read(chunk_size)
        if not chunk:  # an empty chunk signals the end of the stream
            break
        ...

Note

You shouldn’t have to use this feature very often. Its purpose is to provide an “escape hatch” to circumvent the usual query execution logic. If you find yourself using it often, consider posting a feature request for your use-case.

Registering HTTP clients#

By default, clients for requests, httpx, and aiohttp are registered. Register new clients with send() or send_async().

These functions are singledispatch() functions. A new client type can be registered as follows:

@snug.send.register(MyClientType)
def _send(client: MyClientType, req: snug.Request) -> snug.Response:
    # unpack the snug.Request into a client call
    raw_response = client.send_request(
        url=req.url,
        data=req.content,
        ...)

    # be sure to wrap the response in a snug.Response before returning
    return snug.Response(
        status_code=raw_response.status_code,
        ...)
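
For readers unfamiliar with functools.singledispatch, the sketch below shows the dispatch mechanism on its own: the implementation is chosen by the type of the first argument. The names send, MyClient and FakeResponse are hypothetical stand-ins defined here for illustration. The send function in this sketch is not snug.send, and its signature is an assumption.

```python
from functools import singledispatch


class FakeResponse:
    """Hypothetical stand-in for a client-independent response."""
    def __init__(self, status_code, content=b''):
        self.status_code = status_code
        self.content = content


class MyClient:
    """Hypothetical HTTP client with its own calling convention."""
    def send_request(self, url, data):
        return (200, b'hello from ' + url.encode())


@singledispatch
def send(client, url, data=None):
    """Fallback when no implementation is registered for the client's type."""
    raise TypeError(f'client {type(client).__name__!r} not registered')


@send.register(MyClient)
def _send_myclient(client, url, data=None):
    status, body = client.send_request(url=url, data=data)
    # wrap the client-specific result in the common response type
    return FakeResponse(status, body)


response = send(MyClient(), 'example.com')
```

Calling send with an unregistered client type falls through to the base implementation, which raises TypeError in this sketch.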

Composing queries#

To keep everything nice and modular, queries may be composed and extended. In the github API example, we may wish to define common logic for:

  • prefixing urls with https://api.github.com

  • setting the required headers

  • parsing responses to JSON

  • deserializing JSON into objects

  • raising descriptive exceptions from responses

  • following redirects

We can use a function-based approach with gentools, or a class-based approach by subclassing Query. We’ll explore the functional style first.

Function-based approach#

Preparing requests#

Outgoing requests of a query can be modified with the map_yield decorator.

import json
import snug
from gentools import map_yield, reusable

add_prefix = snug.prefix_adder('https://api.github.com')
add_headers = snug.header_adder({
    'Accept': 'application/vnd.github.v3+json',
    'User-Agent': 'my awesome app',
})

@reusable
@map_yield(add_headers, add_prefix, snug.GET)
def repo(name: str, owner: str) -> snug.Query[dict]:
    """a repository lookup by owner and name"""
    return json.loads((yield f'/repos/{owner}/{name}').content)

@reusable
@map_yield(add_headers, add_prefix, snug.PUT)
def follow(username: str) -> snug.Query[bool]:
    """follow a user"""
    return (yield f'/user/following/{username}').status_code == 204
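
To make the mechanics concrete, here is a simplified, standard-library-only imitation of such a decorator. map_yield_sketch, add_prefix and as_get are hypothetical names for this illustration, and the code is not gentools' implementation. It assumes functions are applied right to left, which is the order the examples above imply (the yielded string must become a request before a prefix or headers can be added).

```python
import functools


def map_yield_sketch(*funcs):
    """Simplified stand-in: pass each yielded value through funcs, right to left."""
    def decorator(genfunc):
        @functools.wraps(genfunc)
        def wrapper(*args, **kwargs):
            gen = genfunc(*args, **kwargs)
            try:
                item = next(gen)
                while True:
                    for func in reversed(funcs):
                        item = func(item)
                    # hand the transformed value out, relay the reply back in
                    item = gen.send((yield item))
            except StopIteration as stop:
                return stop.value
        return wrapper
    return decorator


def add_prefix(url):
    return 'https://api.example.com' + url


def as_get(url):
    return ('GET', url)


@map_yield_sketch(as_get, add_prefix)
def repo(name, owner):
    response = yield f'/repos/{owner}/{name}'
    return response.upper()


query = repo('hub', owner='github')
request = next(query)            # the transformed request
try:
    query.send('fake response')  # feed a pretend response back in
except StopIteration as stop:
    result = stop.value
```

The query body only deals with a short path, while the value that reaches the executor is the fully prepared request.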

Parsing responses#

Responses being sent to a query can be modified with the map_send decorator.

from gentools import reusable, map_yield, map_send

class ApiException(Exception):
    """an error from the github API"""

def handle_errors(resp):
    """raise a descriptive exception on a "bad request" response"""
    if resp.status_code == 400:
        raise ApiException(json.loads(resp.content).get('message'))
    return resp

def load_json_content(resp):
    """get the response body as JSON"""
    return json.loads(resp.content)

@reusable
@map_send(load_json_content, handle_errors)
@map_yield(add_headers, add_prefix, snug.GET)
def repo(name: str, owner: str) -> snug.Query[dict]:
    """a repository lookup by owner and name"""
    return (yield f'/repos/{owner}/{name}')

@reusable
@map_send(handle_errors)
@map_yield(add_headers, add_prefix, snug.PUT)
def follow(username: str) -> snug.Query[bool]:
    """follow a user"""
    return (yield f'/user/following/{username}').status_code == 204
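
The counterpart for incoming values can be imitated in the same way. The sketch below is a simplified stand-in using only the standard library. map_send_sketch, ApiError, check_status, load_json and run are hypothetical names, responses are modelled as (status, body) tuples, and the right-to-left order is assumed from the examples above (errors are checked before the body is parsed).

```python
import functools
import json


def map_send_sketch(*funcs):
    """Simplified stand-in: pass each sent value through funcs, right to left."""
    def decorator(genfunc):
        @functools.wraps(genfunc)
        def wrapper(*args, **kwargs):
            gen = genfunc(*args, **kwargs)
            try:
                item = next(gen)
                while True:
                    sent = yield item
                    for func in reversed(funcs):
                        sent = func(sent)
                    item = gen.send(sent)
            except StopIteration as stop:
                return stop.value
        return wrapper
    return decorator


class ApiError(Exception):
    """Hypothetical error type for this sketch."""


def check_status(resp):
    status, body = resp
    if status == 400:
        raise ApiError(json.loads(body).get('message'))
    return resp


def load_json(resp):
    return json.loads(resp[1])


@map_send_sketch(load_json, check_status)
def repo(name):
    data = yield f'/repos/{name}'
    return data['name']


def run(query, response):
    """Drive a single request/response exchange by hand."""
    next(query)
    try:
        query.send(response)
    except StopIteration as stop:
        return stop.value


ok = run(repo('hub'), (200, '{"name": "hub"}'))
try:
    run(repo('hub'), (400, '{"message": "bad request"}'))
    error = None
except ApiError as exc:
    error = str(exc)
```

The query body receives already-parsed data, and a "bad request" response surfaces as an exception before the body ever sees it.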

Relaying queries#

For advanced cases, each request/response interaction of a query can be relayed through another generator with the relay decorator. This is useful when response handling depends on the request, or when more complex control flow is needed. The following example shows how this can be used to implement redirects.

from gentools import reusable, map_yield, map_send, relay

def follow_redirects(req):
    resp = yield req
    while resp.status_code in (301, 302, 307):
        resp = yield req.replace(url=resp.headers['Location'])
    return resp

@reusable
@relay(follow_redirects)
@map_send(load_json_content, handle_errors)
@map_yield(add_headers, add_prefix, snug.GET)
def repo(name: str, owner: str) -> snug.Query[dict]:
    """a repository lookup by owner and name"""
    return (yield f'/repos/{owner}/{name}')

See the recipes for more examples.
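
It can help to look at the relaying generator in isolation. The sketch below drives a follow_redirects generator by hand against canned responses. Req, Resp, SERVER and drive are hypothetical stand-ins (namedtuples use _replace where the example above uses replace), and the sketch does not show how relay itself connects the generator to a query.

```python
from collections import namedtuple

# Hypothetical stand-ins for request and response objects
Req = namedtuple('Req', 'method url')
Resp = namedtuple('Resp', 'status_code headers')


def follow_redirects(req):
    resp = yield req
    while resp.status_code in (301, 302, 307):
        resp = yield req._replace(url=resp.headers['Location'])
    return resp


# canned server behaviour: /old redirects to /new
SERVER = {
    '/old': Resp(301, {'Location': '/new'}),
    '/new': Resp(200, {}),
}


def drive(gen):
    """Answer every yielded request from SERVER; return (urls visited, result)."""
    visited = []
    try:
        req = next(gen)
        while True:
            visited.append(req.url)
            req = gen.send(SERVER[req.url])
    except StopIteration as stop:
        return visited, stop.value


visited, final = drive(follow_redirects(Req('GET', '/old')))
```

The generator turns one logical request into two actual exchanges and returns only the final response, which is what the wrapped query ends up receiving.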

Loading return values#

To modify the return value of a generator, use the map_return decorator.

from collections import namedtuple
from gentools import reusable, map_yield, map_send, map_return, relay

Repository = namedtuple(...)

def load_repo(jsondata: dict) -> Repository:
    ...  # deserialization logic

@reusable
@relay(follow_redirects)
@map_return(load_repo)
@map_send(load_json_content, handle_errors)
@map_yield(add_headers, add_prefix, snug.GET)
def repo(name: str, owner: str) -> snug.Query[Repository]:
    """a repository lookup by owner and name"""
    return (yield f'/repos/{owner}/{name}')
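
A simplified imitation of this behaviour needs little more than yield from. In the sketch below, map_return_sketch is a hypothetical stand-in (not gentools' implementation) that accepts a single function, and the Repository fields and the shape of the JSON data are assumptions made for illustration.

```python
import functools
from collections import namedtuple

Repository = namedtuple('Repository', 'name owner')  # hypothetical fields


def map_return_sketch(func):
    """Simplified stand-in: apply func to the generator's return value."""
    def decorator(genfunc):
        @functools.wraps(genfunc)
        def wrapper(*args, **kwargs):
            # `yield from` relays all yields and sends unchanged,
            # and evaluates to the inner generator's return value
            result = yield from genfunc(*args, **kwargs)
            return func(result)
        return wrapper
    return decorator


def load_repo(data):
    return Repository(name=data['name'], owner=data['owner']['login'])


@map_return_sketch(load_repo)
def repo(name, owner):
    return (yield f'/repos/{owner}/{name}')


query = repo('hub', owner='github')
request = next(query)
try:
    query.send({'name': 'hub', 'owner': {'login': 'github'}})
except StopIteration as stop:
    result = stop.value
```

The request/response exchange passes through untouched, and only the final value is converted into a Repository.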

Object-oriented approach#

Below is a roughly equivalent, object-oriented approach:

import json
import snug
import typing as t
from collections import namedtuple

Repository = namedtuple(...)

class ApiException(Exception):
    """an error from the github API"""

def load_repo(jsondata: dict) -> Repository:
    ...  # deserialization logic

T = t.TypeVar('T')

# optional: the TypeVar lets subclasses parametrize `BaseQuery`, as in BaseQuery[Repository]
class BaseQuery(snug.Query[T]):
    """base class for github queries"""

    def prepare(self, request):
        """add headers and stuff"""
        return (request.with_prefix('https://api.github.com')
                .with_headers({
                    'Accept': 'application/vnd.github.v3+json',
                    'User-Agent': 'my awesome app',
                }))

    def __iter__(self):
        """perform the query, while handling redirects"""
        req = self.prepare(self.request)
        resp = yield req
        while resp.status_code in (301, 302, 307):
            resp = yield req.replace(url=resp.headers['Location'])
        return self.load(self.check_response(resp))

    def check_response(self, resp):
        """raise a descriptive exception on a "bad request" response"""
        if resp.status_code == 400:
            raise ApiException(json.loads(resp.content).get('message'))
        return resp

class repo(BaseQuery[Repository]):
    """a repository lookup by owner and name"""
    def __init__(self, name: str, owner: str):
        self.request = snug.GET(f'/repos/{owner}/{name}')

    def load(self, response):
        return load_repo(json.loads(response.content))


class follow(BaseQuery[bool]):
    """follow another user"""
    def __init__(self, name: str):
        self.request = snug.PUT(f'/user/following/{name}')

    def load(self, response):
        return response.status_code == 204