Source code for airbase.fetch

"""Helper functions encapsulating async HTTP request and file IO"""
from __future__ import annotations

import asyncio
import json
import warnings
from pathlib import Path
from types import SimpleNamespace
from typing import AsyncIterator, Awaitable, overload

import aiofiles
import aiohttp
from tqdm import tqdm

# Module-wide defaults for the keyword-only options shared by the fetch helpers
DEFAULT = SimpleNamespace(
    progress=False,
    raise_for_status=True,
    max_concurrent=10,
)


def fetch_text(
    url: str,
    *,
    timeout: float | None = None,
    encoding: str | None = None,
) -> str:
    """Request url and read response's body

    :param url: requested url
    :param timeout: maximum time to complete request (seconds)
    :param encoding: text encoding used for decoding the response's body

    :return: decoded text from response's body
    """

    async def fetch() -> str:
        timeout_ = aiohttp.ClientTimeout(total=timeout)
        async with aiohttp.ClientSession(timeout=timeout_) as session:
            async with session.get(url, ssl=False) as r:
                r.raise_for_status()
                text: str = await r.text(encoding=encoding)
                return text

    text = asyncio.run(fetch())
    return text

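A minimal usage sketch (not part of the module); the URL is a placeholder. Because fetch_text drives its own event loop via asyncio.run, it must be called from synchronous code, never from inside a running loop.

body = fetch_text(
    "https://example.com/report.csv",  # hypothetical endpoint
    timeout=30,
    encoding="utf-8",
)
print(body.splitlines()[0])  # first line of the decoded body
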
def fetch_json(
    url: str,
    *,
    timeout: float | None = None,
    encoding: str | None = None,
) -> list[dict[str, str]]:
    """Request url and read response's body as JSON

    :param url: requested url
    :param timeout: maximum time to complete request (seconds)
    :param encoding: text encoding used for decoding the response's body

    :return: decoded text from response's body as JSON
    """
    text = fetch_text(url, timeout=timeout, encoding=encoding)

    payload: dict[str, str] | list[dict[str, str]]
    payload = json.loads(text)
    if isinstance(payload, dict):
        return [payload]
    return payload

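A usage sketch under the same assumptions (placeholder URL): fetch_json wraps a single JSON object into a one-element list, so callers can always iterate over the result.

records = fetch_json("https://example.com/summary.json", timeout=30)
for record in records:  # a list even when the body is a single JSON object
    print(record)
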
@overload
def fetcher(
    urls: list[str],
    *,
    encoding: str | None = None,
    progress: bool = DEFAULT.progress,
    raise_for_status: bool = DEFAULT.raise_for_status,
    max_concurrent: int = DEFAULT.max_concurrent,
) -> AsyncIterator[str]:  # pragma: no cover
    ...


@overload
def fetcher(
    urls: dict[str, Path],
    *,
    encoding: str | None = None,
    progress: bool = DEFAULT.progress,
    raise_for_status: bool = DEFAULT.raise_for_status,
    max_concurrent: int = DEFAULT.max_concurrent,
) -> AsyncIterator[Path]:  # pragma: no cover
    ...

async def fetcher(
    urls: list[str] | dict[str, Path],
    *,
    encoding: str | None = None,
    progress: bool = DEFAULT.progress,
    raise_for_status: bool = DEFAULT.raise_for_status,
    max_concurrent: int = DEFAULT.max_concurrent,
) -> AsyncIterator[str | Path]:
    """Request multiple urls and write each response's text to its own path
    if a `dict[url, path]` is provided, or yield the decoded text from each
    request if only a `list[url]` is provided.

    :param urls: requested urls
    :param encoding: text encoding used for decoding each response's body
    :param progress: show progress bar
    :param raise_for_status: Raise exceptions if download links
        return "bad" HTTP status codes. If False,
        a :py:func:`warnings.warn` will be issued instead.
    :param max_concurrent: maximum concurrent requests

    :return: url text or path to downloaded text, one by one as the requests are completed
    """
    async with aiohttp.ClientSession() as session:
        semaphore = asyncio.Semaphore(max_concurrent)

        async def fetch(url: str) -> str:
            async with semaphore:
                async with session.get(url, ssl=False) as r:
                    r.raise_for_status()
                    text: str = await r.text(encoding=encoding)
                    return text

        async def download(url: str, path: Path) -> Path:
            text = await fetch(url)
            async with aiofiles.open(str(path), mode="w") as f:
                await f.write(text)
            return path

        jobs: list[Awaitable[str | Path]]
        if isinstance(urls, dict):
            jobs = [download(url, path) for url, path in urls.items()]
        else:
            jobs = [fetch(url) for url in urls]

        with tqdm(total=len(jobs), leave=True, disable=not progress) as p_bar:
            for result in asyncio.as_completed(jobs):
                p_bar.update(1)
                try:
                    yield await result
                except asyncio.CancelledError:
                    continue
                except aiohttp.ClientResponseError as e:
                    if raise_for_status:
                        raise
                    warnings.warn(str(e), category=RuntimeWarning)

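A usage sketch for the dict form, with placeholder URLs and paths: fetcher is an async generator, so it has to be consumed inside a coroutine, and it yields results in completion order rather than input order.

async def download_all() -> None:
    urls = {
        "https://example.com/2021.csv": Path("2021.csv"),  # hypothetical
        "https://example.com/2022.csv": Path("2022.csv"),  # hypothetical
    }
    async for path in fetcher(urls, progress=True, raise_for_status=False):
        print(f"saved {path}")

asyncio.run(download_all())
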
def fetch_unique_lines(
    urls: list[str],
    *,
    encoding: str | None = None,
    progress: bool = DEFAULT.progress,
    raise_for_status: bool = DEFAULT.raise_for_status,
    max_concurrent: int = DEFAULT.max_concurrent,
) -> set[str]:
    """Request a list of urls and return only the unique lines among all the responses

    :param urls: requested urls
    :param encoding: text encoding used for decoding each response's body
    :param progress: show progress bar
    :param raise_for_status: Raise exceptions if download links
        return "bad" HTTP status codes. If False,
        a :py:func:`warnings.warn` will be issued instead.
    :param max_concurrent: maximum concurrent requests

    :return: unique lines from all the responses
    """

    async def fetch() -> set[str]:
        lines = set()
        async for text in fetcher(
            urls,
            encoding=encoding,
            progress=progress,
            raise_for_status=raise_for_status,
            max_concurrent=max_concurrent,
        ):
            lines.update(text.splitlines())
        return lines

    return asyncio.run(fetch())

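A usage sketch with placeholder URLs: because the result is a set, lines shared between responses (for example repeated header lines) collapse to a single entry.

lines = fetch_unique_lines(
    ["https://example.com/2021.csv", "https://example.com/2022.csv"],
    progress=True,
)
print(len(lines), "unique lines")
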
def fetch_to_file(
    urls: list[str],
    path: Path,
    *,
    encoding: str | None = None,
    progress: bool = DEFAULT.progress,
    raise_for_status: bool = DEFAULT.raise_for_status,
    max_concurrent: int = DEFAULT.max_concurrent,
) -> None:
    """Request a list of urls and write all responses into a single text file

    :param urls: requested urls
    :param path: text file for all combined responses
    :param encoding: text encoding used for decoding each response's body
    :param progress: show progress bar
    :param raise_for_status: Raise exceptions if download links
        return "bad" HTTP status codes. If False,
        a :py:func:`warnings.warn` will be issued instead.
    :param max_concurrent: maximum concurrent requests
    """

    async def fetch() -> None:
        first = True
        async for text in fetcher(
            urls,
            encoding=encoding,
            progress=progress,
            raise_for_status=raise_for_status,
            max_concurrent=max_concurrent,
        ):
            if first:
                # keep the header line from the first response
                async with aiofiles.open(str(path), mode="w") as f:
                    await f.write(text)
                first = False
            else:
                # drop the 1st line (header) from subsequent responses
                lines = text.splitlines(keepends=True)[1:]
                async with aiofiles.open(str(path), mode="a") as f:
                    await f.writelines(lines)

    asyncio.run(fetch())

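A usage sketch with placeholder URLs: the header of the first completed response is kept, and the first line of every later response is dropped, so the inputs are assumed to share the same header.

fetch_to_file(
    ["https://example.com/2021.csv", "https://example.com/2022.csv"],
    Path("combined.csv"),
    progress=True,
)
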
def fetch_to_directory(
    urls: list[str],
    root: Path,
    *,
    skip_existing: bool = True,
    encoding: str | None = None,
    progress: bool = DEFAULT.progress,
    raise_for_status: bool = DEFAULT.raise_for_status,
    max_concurrent: int = DEFAULT.max_concurrent,
) -> None:
    """Request a list of urls and write each response to a different file

    :param urls: requested urls
    :param root: directory to write all responses
    :param bool skip_existing: Do not re-download url if the corresponding
        file is found in `root`
    :param encoding: text encoding used for decoding each response's body
    :param progress: show progress bar
    :param raise_for_status: Raise exceptions if download links
        return "bad" HTTP status codes. If False,
        a :py:func:`warnings.warn` will be issued instead.
    :param max_concurrent: maximum concurrent requests
    """
    url_paths: dict[str, Path] = {url: root / Path(url).name for url in urls}
    if skip_existing:
        url_paths = {
            url: path for url, path in url_paths.items() if not path.exists()
        }

    async def fetch() -> None:
        # consume the generator so every download is executed
        async for path in fetcher(
            url_paths,
            encoding=encoding,
            progress=progress,
            raise_for_status=raise_for_status,
            max_concurrent=max_concurrent,
        ):
            pass

    asyncio.run(fetch())

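A usage sketch with placeholder URLs: each file is named after the last component of its URL, and `root` is assumed to exist already.

fetch_to_directory(
    ["https://example.com/2021.csv", "https://example.com/2022.csv"],
    Path("downloads"),
    skip_existing=True,  # URLs whose files already exist are not re-fetched
)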