Source code for airbase.summary.db

from __future__ import annotations

import sqlite3
import sys
from contextlib import closing, contextmanager
from functools import cached_property
from itertools import chain
from pathlib import Path
from typing import TYPE_CHECKING, Iterator, NamedTuple

if sys.version_info >= (3, 11):  # pragma: no cover
    from importlib import resources
else:  # pragma: no cover
    import importlib_resources as resources

if TYPE_CHECKING:
    from airbase.parquet_api.types import (
        CityJSON,
        CountryJSON,
        PollutantJSON,
    )


[docs]def summary() -> Path: source = resources.files(__package__) / "summary.sqlite" path: Path with resources.as_file(source) as path: return path
[docs]class Pollutant(NamedTuple): notation: str id: int
[docs]class SummaryDB: """ In DB containing the available country and pollutants cached data from https://eeadmz1-downloads-api-appservice.azurewebsites.net/City https://eeadmz1-downloads-api-appservice.azurewebsites.net/Country https://eeadmz1-downloads-api-appservice.azurewebsites.net/Property """ db = sqlite3.connect(f"file:{summary()}?mode=ro", uri=True)
[docs] @classmethod @contextmanager def cursor(cls) -> Iterator[sqlite3.Cursor]: """db cursor as a "self closing" context manager""" with closing(cls.db.cursor()) as cur: yield cur
[docs] def countries(cls) -> list[str]: """ Unique country codes. :return: list of available country codes """ with cls.cursor() as cur: cur.execute("SELECT country_code FROM countries;") return list(row[0] for row in cur.fetchall())
@cached_property def COUNTRY_CODES(self) -> frozenset[str]: """All unique country codes""" return frozenset(self.countries())
[docs] def pollutants(self) -> dict[str, set[int]]: """ Pollutant notations and unique ids. :return: The available pollutants, as a dictionary with with notation as key and IDs as value, e.g. {"NO": {38}, ...} """ with self.cursor() as cur: cur.execute("SELECT pollutant, ids FROM pollutant_ids;") return { pollutant: set(map(int, ids.split(","))) for pollutant, ids in cur.fetchall() }
@cached_property def POLLUTANTS(self) -> frozenset[str]: """All unique pollutant names/notations""" return frozenset(self.pollutants()) @cached_property def POLLUTANT_IDS(self) -> frozenset[int]: """All unique pollutant IDs""" return frozenset(chain.from_iterable(self.pollutants().values()))
[docs] def properties(self, *pollutants: str) -> list[str]: """ Pollutant description URLs https://dd.eionet.europa.eu/vocabulary/aq/pollutant """ if not pollutants: return [] with self.cursor() as cur: cur.execute( f""" SELECT definition_url FROM pollutant WHERE pollutant in ({",".join("?"*len(pollutants))}); """, pollutants, ) return [url for (url,) in cur]
[docs] def search_pollutant( self, query: str, *, limit: int | None = None ) -> Iterator[Pollutant]: """ Search for a pollutant's ID number based on its name. :param query: The pollutant to search for. :param limit: (optional) Max number of results. :return: The best pollutant matches, as tuples of notation and ID, e.g. ("NO", 38) """ with self.cursor() as cur: cur.execute( f""" SELECT pollutant, pollutant_id FROM pollutants WHERE pollutant LIKE ? {f"LIMIT {limit}" if limit else ""}; """, (f"%{query}%",), ) for pollutant, pollutant_id in cur.fetchall(): yield Pollutant(pollutant, pollutant_id)
[docs] def search_pollutants(self, *pollutants: str) -> Iterator[int]: """ Search for a pollutant ID numbers based from exact matches to pollutant names. :param pollutants: The pollutant name(s)/notation(s) to search for. :return: ID(s) corresponding to the name(s)/notation(s), e.g. "NO" --> 38 """ with self.cursor() as cur: cur.execute( f""" SELECT pollutant_id FROM pollutants WHERE pollutant in ({",".join("?"*len(pollutants))}); """, pollutants, ) for row in cur.fetchall(): yield row[0]
[docs] def search_city(self, city: str) -> str | None: """ Search for a country code from city name :param city: City name. :return: country code, e.g. "NO" for "Oslo" """ with self.cursor() as cur: cur.execute( "SELECT country_code FROM city WHERE city_name IS ?;", (city,), ) row: tuple[str] | None = cur.fetchone() return None if row is None else row[0]
[docs] def city_json(self) -> CityJSON: """ simulate a request to https://eeadmz1-downloads-api-appservice.azurewebsites.net/City """ with self.cursor() as cur: cur.execute( "SELECT country_code, city_name FROM city WHERE city_name IS NOT NULL;" ) return [ dict(countryCode=country_code, cityName=city_name) for (country_code, city_name) in cur ]
[docs] def country_json(self) -> CountryJSON: """ simulate a request to https://eeadmz1-downloads-api-appservice.azurewebsites.net/Country """ with self.cursor() as cur: cur.execute("SELECT country_code, country_name FROM country;") return [ dict(countryCode=country_code, countryName=country_name) for (country_code, country_name) in cur ]
[docs] def pollutant_json(self) -> PollutantJSON: """ simulate a request to https://eeadmz1-downloads-api-appservice.azurewebsites.net/Pollutant """ with self.cursor() as cur: cur.execute("SELECT pollutant, definition_url FROM pollutant;") return [ dict(notation=pollutant, id=definition_url) for (pollutant, definition_url) in cur ]
DB = SummaryDB()