Source code for intake_sql.intake_sql

from intake.source import base
from . import __version__


class SQLSource(base.DataSource):
    """
    One-shot SQL to dataframe reader (no partitioning)

    Caches entire dataframe in memory.

    Parameters
    ----------
    uri: str or None
        Full connection string in sqlalchemy syntax
    sql_expr: str
        Query expression to pass to the DB backend
    sql_kwargs: dict
        Further arguments to pass to pandas.read_sql
    """
    name = 'sql'
    version = __version__
    container = 'dataframe'
    partition_access = True

    def __init__(self, uri, sql_expr, sql_kwargs={}, metadata={}):
        self._init_args = {
            'uri': uri,
            'sql_expr': sql_expr,
            'sql_kwargs': sql_kwargs,
            'metadata': metadata,
        }
        self._uri = uri
        self._sql_expr = sql_expr
        self._sql_kwargs = sql_kwargs
        self._dataframe = None
        super(SQLSource, self).__init__(metadata=metadata)

    def _load(self):
        import pandas as pd
        self._dataframe = pd.read_sql(self._sql_expr, self._uri,
                                      **self._sql_kwargs)

    def _get_schema(self):
        if self._dataframe is None:
            # TODO: could do read_sql with chunksize to get likely schema from
            # first few records, rather than loading the whole thing
            self._load()
        return base.Schema(datashape=None,
                           dtype=self._dataframe.dtypes,
                           shape=self._dataframe.shape,
                           npartitions=1,
                           extra_metadata={})

    def _get_partition(self, _):
        if self._dataframe is None:
            self._load_metadata()
        return self._dataframe

    def read(self):
        return self._get_partition(None)

    def _close(self):
        self._dataframe = None
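
# Example (illustrative sketch, not part of the library): constructing
# SQLSource directly and reading the whole query result into memory. The
# connection string, table and column names below are hypothetical.
#
#     source = SQLSource('sqlite:///mydata.db',
#                        'SELECT * FROM mytable',
#                        sql_kwargs={'index_col': 'id'})
#     df = source.read()   # pandas.DataFrame with the full query result
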
class SQLSourceAutoPartition(base.DataSource):
    """
    SQL table reader with automatic partitioning

    Only reads existing tables, not arbitrary SQL expressions.

    For partitioning, you must provide the column to be used, which should
    be indexed in the database. You can then provide a list of boundaries,
    a number of partitions or a target partition size; see
    dask.dataframe.read_sql_table and the examples for a list of
    possibilities.

    Parameters
    ----------
    uri: str or None
        Full connection string in sqlalchemy syntax
    table: str
        Table to read
    index: str
        Column to use for partitioning and as the index of the resulting
        dataframe
    sql_kwargs: dict
        Further arguments to pass to dask.dataframe.read_sql_table
    """
    name = 'sql_auto'
    version = __version__
    container = 'dataframe'
    partition_access = True

    def __init__(self, uri, table, index, sql_kwargs={}, metadata={}):
        self._init_args = {
            'uri': uri,
            'sql_expr': table,
            'index': index,
            'sql_kwargs': sql_kwargs,
            'metadata': metadata,
        }
        self._uri = uri
        self._sql_expr = table
        self._sql_kwargs = sql_kwargs
        self._index = index
        self._dataframe = None
        super(SQLSourceAutoPartition, self).__init__(metadata=metadata)

    def _load(self):
        import dask.dataframe as dd
        self._dataframe = dd.read_sql_table(self._sql_expr, self._uri,
                                            self._index, **self._sql_kwargs)

    def _get_schema(self):
        if self._dataframe is None:
            self._load()
        return base.Schema(datashape=None,
                           dtype=self._dataframe,
                           shape=(None, len(self._dataframe.columns)),
                           npartitions=self._dataframe.npartitions,
                           extra_metadata={})

    def _get_partition(self, i):
        if self._dataframe is None:
            self._load_metadata()
        return self._dataframe.get_partition(i).compute()

    def to_dask(self):
        self._get_schema()
        return self._dataframe

    def read(self):
        self._get_schema()
        return self._dataframe.compute()

    def _close(self):
        self._dataframe = None
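
# Example (illustrative sketch, not part of the library): reading a table
# with automatic partitioning on an indexed column. The connection string,
# table name, index column and npartitions value are hypothetical;
# sql_kwargs is passed straight through to dask.dataframe.read_sql_table.
#
#     source = SQLSourceAutoPartition('postgresql://user@host/db',
#                                     table='mytable',
#                                     index='id',
#                                     sql_kwargs={'npartitions': 4})
#     ddf = source.to_dask()   # lazy dask dataframe with 4 partitions
#     df = source.read()       # or compute everything into pandas
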
class SQLSourceManualPartition(base.DataSource):
    """
    SQL expression reader with explicit partitioning

    Reads any arbitrary SQL expressions into a partitioned data-frame, but
    requires a full specification of the boundaries.

    The boundaries are specified as either a set of strings with `WHERE`
    clauses to be applied to the main SQL expression, or a string to be
    formatted with a set of values to produce the complete SQL expressions.

    Note, if not supplying a `meta` argument, dask will load the first
    partition in order to determine the schema. If some of the partitions
    are empty, loading without a meta will likely fail.

    Parameters
    ----------
    uri: str or None
        Full connection string in sqlalchemy syntax
    sql_expr: str
        SQL expression to evaluate
    where_values: list of str or list of values/tuples
        Either a set of explicit partitioning statements (e.g.,
        `"WHERE index_col < 50"`...) or pairs of values to be entered into
        where_template, if using
    where_template: str (optional)
        Template for generating partition selection clauses, using the
        values from where_values, e.g.,
        `"WHERE index_col >= {} AND index_col < {}"`
    sql_kwargs: dict
        Further arguments to pass to pandas.read_sql
    """
    name = 'sql_manual'
    version = __version__
    container = 'dataframe'
    partition_access = True

    def __init__(self, uri, sql_expr, where_values, where_template=None,
                 sql_kwargs={}, metadata={}):
        self._init_args = {
            'uri': uri,
            'sql_expr': sql_expr,
            'where': where_values,
            'where_tmp': where_template,
            'sql_kwargs': sql_kwargs,
            'metadata': metadata,
        }
        self._uri = uri
        # TODO: may check for table and expand to "SELECT * FROM {table}"
        self._sql_expr = sql_expr
        self._sql_kwargs = sql_kwargs
        self._where = where_values
        self._where_tmp = where_template
        self._dataframe = None
        self._meta = self._sql_kwargs.pop('meta', None)
        super(SQLSourceManualPartition, self).__init__(metadata=metadata)

    def _load(self):
        self._dataframe = read_sql_query(self._uri, self._sql_expr,
                                         self._where,
                                         where_tmp=self._where_tmp,
                                         meta=self._meta,
                                         kwargs=self._sql_kwargs)

    def _get_schema(self):
        if self._dataframe is None:
            self._load()
        return base.Schema(datashape=None,
                           dtype=self._dataframe,
                           shape=(None, len(self._dataframe.columns)),
                           npartitions=self._dataframe.npartitions,
                           extra_metadata={})

    def _get_partition(self, i):
        if self._dataframe is None:
            self._load_metadata()
        return self._dataframe.get_partition(i).compute()

    def to_dask(self):
        self._get_schema()
        return self._dataframe

    def read(self):
        self._get_schema()
        return self._dataframe.compute()

    def _close(self):
        self._dataframe = None
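
# Example (illustrative sketch, not part of the library): explicit
# partitioning with a list of WHERE clauses. The connection string, query
# and boundaries are hypothetical; each clause is appended to the SQL
# expression to define one partition.
#
#     source = SQLSourceManualPartition(
#         'postgresql://user@host/db',
#         'SELECT * FROM mytable',
#         where_values=['WHERE id < 100', 'WHERE id >= 100'])
#     ddf = source.to_dask()   # two-partition dask dataframe
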
def load_part(sql, engine, where, kwargs, meta=None):
    import pandas as pd
    sql = sql + ' ' + where
    df = pd.read_sql(sql, engine, **kwargs)
    if meta is not None:
        if df.empty:
            df = meta
        else:
            df = df.astype(meta.dtypes.to_dict(), copy=False)
    return df


def read_sql_query(uri, sql, where, where_tmp=None, meta=None, kwargs=None):
    """
    Create a dask dataframe from SQL using explicit partitioning

    Parameters
    ----------
    uri: str
        Connection string (see sqlalchemy documentation)
    sql: str
        SQL query to execute
    where: list of str or list of tuple
        Either a set of explicit partitioning statements (e.g.,
        `"WHERE index_col < 50"`...) or pairs of values to be entered into
        where_tmp, if using
    where_tmp: str (optional)
        Template for generating partition selection clauses, using the
        values from where, e.g.,
        `"WHERE index_col >= {} AND index_col < {}"`
    meta: dataframe metadata (optional)
        If given, a zero-length version of the dataframe structure, with
        index and column names and types correctly specified. Can also be
        the same information in dictionary or tuple of tuples format
    kwargs: dict
        Any further parameters to pass to pandas.read_sql, see its
        documentation
    """
    import dask
    import dask.dataframe as dd
    if where_tmp is not None:
        where = [where_tmp.format(values) for values in where]
    if kwargs is None:
        kwargs = {}
    dload = dask.delayed(load_part)
    # forward meta to each partition load so that empty partitions are
    # replaced by the zero-length frame with the correct dtypes
    parts = [dload(sql, uri, w, kwargs, meta=meta) for w in where]
    return dd.from_delayed(parts, meta=meta)
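
# Example (illustrative sketch, not part of the library): calling
# read_sql_query directly, outside of any intake source. The connection
# string, query and boundaries are hypothetical.
#
#     ddf = read_sql_query('sqlite:///mydata.db',
#                          'SELECT * FROM mytable',
#                          where=['WHERE id < 100', 'WHERE id >= 100'])
#     df = ddf.compute()   # loads both partitions with pandas.read_sql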