"""Source code for contessa.rules: SQL-based data-quality rule classes and the rule-type registry."""

import logging
from itertools import islice

from contessa.base_rules import Rule
from contessa.db import Connector
from contessa.executor import get_executor, SqlExecutor
from contessa.failed_examples import ExampleSelector, default_example_selector
from contessa.utils import AggregatedResult, render_jinja_sql


class SqlRule(Rule):
    """
    Rule that executes a custom-written SQL query.

    Subclasses supply the query through the ``sql`` property. The query is
    rendered as a Jinja template with the context from ``get_sql_parameters``
    and extended with a ``WHERE`` clause built by the executor
    (see ``sql_with_where``).

    Result contract enforced by ``apply``:
      * normal mode - the first column of each row is a boolean validity flag;
        ``NULL`` is accepted and counted only towards the total;
      * ``only_failures_mode`` - every returned row is counted as a failure.
    """

    executor_cls = SqlExecutor
    # Class-level default; subclasses may override it per instance.
    only_failures_mode = False

    def get_sql_parameters(self):
        """
        Return the template context used to render the rule's SQL.

        :return: the ``context`` of the executor returned by
                 ``get_executor`` for this rule's concrete class.
        """
        executor = get_executor(self.__class__)
        return executor.context

    @property
    def sql(self):
        """
        SQL query to perform on column.
        Can use context from Executor. The base implementation returns an
        empty string; subclasses override it.
        """
        return ""

    def render_sql(self, sql):
        """
        Render ``sql`` as a Jinja template with ``get_sql_parameters()``.

        :param sql: str, SQL template
        :return: str, formatted sql
        """
        return render_jinja_sql(sql, self.get_sql_parameters())

    @property
    def sql_with_where(self):
        """
        Return ``sql`` with a ``WHERE`` clause appended, rendered via ``render_sql``.

        The clause combines the executor's time filter and user-defined
        condition. They are joined with ``AND`` when both are present, and the
        clause is omitted entirely when both are empty. The clause is appended
        verbatim to the end of ``sql``, so the query must be written so that a
        trailing ``WHERE`` is valid at its end.

        :return: str, rendered SQL
        """
        # NOTE(review): unlike get_sql_parameters, this looks up the executor
        # registered for SqlRule rather than self.__class__ - confirm intended.
        e = get_executor(SqlRule)
        where_clause = "WHERE "
        where_time_filter = e.compose_where_time_filter(self)
        where_condition = e.compose_where_condition(self)
        if where_time_filter == "" and where_condition == "":
            where_clause = ""
        elif where_time_filter != "" and where_condition != "":
            where_clause = f"{where_clause} {where_time_filter} AND {where_condition}"
        else:
            # Exactly one part is non-empty; the empty one adds only whitespace.
            where_clause = f"{where_clause} {where_time_filter} {where_condition}"
        final_sql = f"{self.sql} {where_clause}"
        return self.render_sql(final_sql)

    def apply(
        self,
        conn: Connector,
        example_selector: ExampleSelector = default_example_selector,
    ):
        """
        Execute the rendered SQL and aggregate its rows into a result.

        Rows are streamed (``stream_results=True``), so the whole result set
        is not loaded at once.

        :param conn: connector whose ``engine`` is used to open a connection.
        :param example_selector: picks which failed rows are reported as examples.
        :raises ValueError: in normal mode, if a row's first column is neither
                            a bool nor NULL.
        :return: AggregatedResult; ``total_records`` is 0 in ``only_failures_mode``.
        """
        sql = self.sql_with_where
        logging.debug(sql)

        failed = passed = total = 0
        # A set, so identical failing rows are reported as examples only once
        # (the ``failed`` counter still counts every row).
        failed_rows = set()

        with conn.engine.connect() as con:
            result = con.execution_options(stream_results=True).execute(sql)
            for row in result:
                if self.only_failures_mode:
                    # The query selects only failing rows; keep the whole row.
                    failed += 1
                    failed_rows.add(tuple(row))
                    continue

                valid = row[0]
                if valid is not None and not isinstance(valid, bool):
                    raise ValueError(
                        f"Your query for rule `{self.name}` of type `{self.type}` does not return list of booleans in column `valid`."
                    )
                total += 1
                if valid is True:
                    passed += 1
                elif valid is False:
                    failed += 1
                    # Drop the leading validity flag; the remaining columns
                    # form the failed example. NOTE(review): relies on the
                    # legacy SQLAlchemy RowProxy.values() API - confirm version.
                    failed_rows.add(tuple(islice(row.values(), 1, None)))

        failed_examples = example_selector.select_examples(failed_rows)

        return AggregatedResult(
            total_records=0 if self.only_failures_mode else total,
            failed=failed,
            passed=passed,
            failed_example=list(failed_examples),
        )


class OneColumnRuleSQL(SqlRule):
    """
    SQL rule that checks a single column.

    Adds ``target_column`` (and ``value``, when the instance has one) to the
    template context, so rule SQL can reference ``{{target_column}}`` and
    ``{{value}}``.
    """

    def __init__(
        self, name, type, column, description, only_failures_mode=False, **kwargs
    ):
        """
        :param name: rule name, forwarded to the base rule.
        :param type: rule type key, forwarded to the base rule.
        :param column: column checked by the rule; exposed as ``attribute``.
        :param description: mandatory human-readable description.
        :param only_failures_mode: if True, every row returned by the query
                                   is counted as a failure.
        :raises TypeError: if ``description`` is empty or None.
        """
        if description == "" or description is None:
            raise TypeError("Description is mandatory")
        super().__init__(name, type, description=description, **kwargs)
        self.column = column
        self.only_failures_mode = only_failures_mode

    @property
    def attribute(self):
        """Name of the checked column."""
        return self.column

    def get_sql_parameters(self):
        """
        Return the executor context extended with rule-specific parameters.

        A copy is returned, so the per-rule keys do not leak into the
        executor's own ``context``. That object may be reused across rules.
        """
        context = dict(super().get_sql_parameters())
        context["target_column"] = self.column
        if hasattr(self, "value"):
            context["value"] = self.value
        return context

    def __str__(self):
        # Only mention the time filter when one is actually set; previously the
        # `or ""` fallback was dead code and "- None" was rendered.
        tf = f"- {self.time_filter}" if self.time_filter else ""
        return f"Rule {self.name} - {self.type} - {self.attribute} {tf}"


class CustomSqlRule(OneColumnRuleSQL):
    """Rule whose SQL query is written by the user instead of being generated."""

    def __init__(self, name, type, column, sql, description, **kwargs):
        super().__init__(name, type, column, description=description, **kwargs)
        # Stored as-is; template rendering happens later in SqlRule.render_sql.
        self.custom_sql = sql

    @property
    def sql(self):
        """The user-supplied SQL template."""
        return self.custom_sql
class NotNullRule(OneColumnRuleSQL):
    """
    Valid when ``target_column`` is not NULL.

    The default description previously said "True when data is null.",
    which contradicts the generated ``IS NOT NULL`` check.
    """

    def __init__(
        self, name, type, column, description="True when data is not null.", **kwargs
    ):
        super().__init__(name, type, column, description=description, **kwargs)

    @property
    def sql(self):
        """SQL template: validity flag followed by the checked column."""
        return """ SELECT {{target_column}} IS NOT NULL, {{target_column}} FROM {{table_fullname}} """
class GtRule(OneColumnRuleSQL):
    """Valid when ``target_column`` is strictly greater than ``value``."""

    def __init__(
        self,
        name,
        type,
        column,
        value,
        description="True when data is greater than input value.",
        **kwargs,
    ):
        super().__init__(name, type, column, description=description, **kwargs)
        # Exposed to the SQL template as {{value}}.
        self.value = value

    @property
    def sql(self):
        """SQL template: validity flag followed by the checked column."""
        return (
            " SELECT {{target_column}} > {{value}}, {{target_column}}"
            " FROM {{table_fullname}} "
        )
class GteRule(OneColumnRuleSQL):
    """
    Valid when ``target_column`` is greater than or equal to ``value``.

    The default description previously read "greater or even than", which
    misstates the ``>=`` comparison.
    """

    def __init__(
        self,
        name,
        type,
        column,
        value,
        description="True when data is greater than or equal to input value.",
        **kwargs,
    ):
        super().__init__(name, type, column, description=description, **kwargs)
        # Exposed to the SQL template as {{value}}.
        self.value = value

    @property
    def sql(self):
        """SQL template: validity flag followed by the checked column."""
        return """ SELECT {{target_column}} >= {{value}}, {{target_column}} FROM {{table_fullname}} """
class NotRule(OneColumnRuleSQL):
    """
    Valid when ``target_column`` differs from ``value``.

    Uses ``IS DISTINCT FROM``, so a NULL column compared with a non-NULL
    value yields true rather than NULL.
    """

    def __init__(
        self,
        name,
        type,
        column,
        value,
        description="True when data is not input value.",
        **kwargs,
    ):
        super().__init__(name, type, column, description=description, **kwargs)
        # Exposed to the SQL template as {{value}}.
        self.value = value

    @property
    def sql(self):
        """SQL template: validity flag followed by the checked column."""
        return (
            " SELECT {{target_column}} is distinct from {{value}}, {{target_column}}"
            " FROM {{table_fullname}} "
        )
class LtRule(OneColumnRuleSQL):
    """Valid when ``target_column`` is strictly less than ``value``."""

    def __init__(
        self,
        name,
        type,
        column,
        value,
        description="True when data is less than input value.",
        **kwargs,
    ):
        super().__init__(name, type, column, description=description, **kwargs)
        # Exposed to the SQL template as {{value}}.
        self.value = value

    @property
    def sql(self):
        """SQL template: validity flag followed by the checked column."""
        return (
            " SELECT {{target_column}} < {{value}}, {{target_column}}"
            " FROM {{table_fullname}} "
        )
class LteRule(OneColumnRuleSQL):
    """
    Valid when ``target_column`` is less than or equal to ``value``.

    The default description previously read "less or even than", which
    misstates the ``<=`` comparison.
    """

    def __init__(
        self,
        name,
        type,
        column,
        value,
        description="True when data is less than or equal to input value.",
        **kwargs,
    ):
        super().__init__(name, type, column, description=description, **kwargs)
        # Exposed to the SQL template as {{value}}.
        self.value = value

    @property
    def sql(self):
        """SQL template: validity flag followed by the checked column."""
        return """ SELECT {{target_column}} <= {{value}}, {{target_column}} FROM {{table_fullname}} """
class EqRule(OneColumnRuleSQL):
    """
    Valid when ``target_column`` equals ``value``.

    Uses ``IS NOT DISTINCT FROM``, so two NULLs compare as equal instead of
    yielding NULL.
    """

    def __init__(
        self,
        name,
        type,
        column,
        value,
        description="True when data is the same as input value.",
        **kwargs,
    ):
        super().__init__(name, type, column, description=description, **kwargs)
        # Exposed to the SQL template as {{value}}.
        self.value = value

    @property
    def sql(self):
        """SQL template: validity flag followed by the checked column."""
        return (
            " SELECT {{target_column}} IS NOT DISTINCT FROM {{value}}, {{target_column}}"
            " FROM {{table_fullname}} "
        )
NOT_NULL = "not_null"
NOT_COLUMN = "not_column"
GT = "gt"
GTE = "gte"
NOT = "not"
SQL = "sql"
LT = "lt"
LTE = "lte"
EQ = "eq"

# Maps a rule-type key to the class implementing that rule.
# NOTE(review): NOT_COLUMN is defined above but has no entry here, so
# get_rule_cls rejects it - confirm whether that is intended.
RULES = {
    NOT_NULL: NotNullRule,
    GT: GtRule,
    GTE: GteRule,
    NOT: NotRule,
    SQL: CustomSqlRule,
    LT: LtRule,
    LTE: LteRule,
    EQ: EqRule,
}


def get_rule_cls(key):
    """
    Return the rule class registered under ``key`` in ``RULES``.

    :param key: rule-type key, e.g. ``"not_null"`` or ``"gt"``.
    :raises ValueError: if ``key`` is not registered; the message lists the
                        known keys.
    """
    if key in RULES:
        return RULES[key]
    raise ValueError(
        f"I dont know this kind of rule - '{key}'. "
        f"Possible rules are - {list(RULES.keys())}"
    )