Getting Started with Great Expectations - Part 2

Introduction

Data-driven organizations rely on the quality of their underlying data to succeed. One tool for ensuring data quality is Great Expectations (GX), an open-source library backed by a company of the same name that has raised $65 million to date. In my previous post on GX, I covered many of the library's basic primitives. This article serves as a follow-up, covering additional topics in the GX ecosystem: external data stores, execution engines, the data assistant, batching, data documentation, orchestrator integrations, and a brief overview of the company's SaaS offering, GX Cloud.

Great Expectations - Features

All examples will use the same data used in the first part of the series:

from nba_api.stats.endpoints import (
    playergamelogs,
)
import pandas as pd
from great_expectations.data_context import FileDataContext

data = pd.concat(
    [
        playergamelogs.PlayerGameLogs(
            season_nullable="2022-23", season_type_nullable=season_type
        ).player_game_logs.get_data_frame()
        for season_type in ["Pre Season", "Regular Season", "Playoffs"]
    ],
    ignore_index=True
)

context = FileDataContext.create(project_root_dir="/Users/Phil/Documents/Chew/great_expectations_article/ge")

In places where a database is required, we assume the data has been loaded into a Snowflake deployment as a table named player_game_logs.

External Data Stores and Execution Engines

GX can connect to data assets stored locally on the filesystem, to in-memory Spark/pandas data frames, and to SQL-compatible databases. SQL connections are handled through SQLAlchemy, so we first need to pip install snowflake-sqlalchemy. Once the library is available, we can connect to a database like so:

connection = "snowflake://<USER_NAME>:<PASSWORD>@<ACCOUNT_NAME>/<DATABASE_NAME>/<SCHEMA_NAME>?warehouse=<WAREHOUSE_NAME>&role=<ROLE_NAME>&application=great_expectations_oss"
snowflake_source = context.sources.add_sql(name="snowflake", connection_string=connection)
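Credentials often contain characters that are not URL-safe, and the connection string above is a URL. A small helper (the function name here is hypothetical, but urllib.parse.quote_plus is standard library) can escape the credential portions before they are spliced in:

```python
from urllib.parse import quote_plus

def snowflake_url(user, password, account, database, schema, warehouse, role):
    """Build a Snowflake SQLAlchemy URL, escaping characters in the credentials."""
    return (
        f"snowflake://{quote_plus(user)}:{quote_plus(password)}@{account}"
        f"/{database}/{schema}?warehouse={warehouse}&role={role}"
        "&application=great_expectations_oss"
    )

url = snowflake_url("phil", "p@ss/word", "acct", "db", "schema", "wh", "analyst")
```

This keeps a password like p@ss/word from being misparsed as part of the host portion of the URL.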

Now that the source is available, we can register the table by name using add_table_asset, then proceed with the standard validation path for the asset:

asset = snowflake_source.add_table_asset(name="PLAYER_GAME_LOGS", table_name="PLAYER_GAME_LOGS")
batch_request = asset.build_batch_request()

context.add_or_update_expectation_suite("suite")
validator = context.get_validator(batch_request=batch_request, expectation_suite_name="suite")

validator.expect_column_values_to_be_between("PTS", 0, 55, result_format={
    "result_format": "COMPLETE",
    "unexpected_index_column_names": ["PLAYER_ID", "GAME_ID"],
})

Note that every data source is associated with an execution engine that determines where computation for a given validation is performed. For our Snowflake data source, GX will use a SqlAlchemyExecutionEngine that applies validations as queries on the database itself.
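To make the engine's role concrete, here is a rough sketch (not GX's actual implementation) of how a between-expectation can be compiled into SQL that counts unexpected rows, which is essentially what the SqlAlchemyExecutionEngine arranges to have run inside the warehouse:

```python
def between_to_sql(table, column, min_value, max_value):
    # Count rows that violate the expectation. The database does the
    # computation, so no raw data has to leave the warehouse.
    return (
        f"SELECT COUNT(*) AS unexpected_count FROM {table} "
        f"WHERE {column} < {min_value} OR {column} > {max_value}"
    )

query = between_to_sql("PLAYER_GAME_LOGS", "PTS", 0, 55)
```

The key point is that the expectation is pushed down as a query rather than evaluated after pulling rows to the client.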

Using the Data Assistant

It is not always easy to determine the exact validation checks necessary for a given data asset. To assist with this, GX provides the DataAssistant, a tool for initializing expectations based on the current characteristics of a data asset. This onboarding flow is easily accessible via the context:

result = context.assistants.onboarding.run(batch_request=batch_request)

The assistant queries all columns in your data to determine a set of default expectations for each column. Examining result shows the list of expectations applied automatically by the assistant:

[expectation["expectation_type"] for expectation in result.get_expectation_suite("suite").to_json_dict()["expectations"]]
# ['expect_table_row_count_to_be_between',
#  'expect_table_columns_to_match_set',
#  'expect_column_values_to_not_be_null',
#  'expect_column_min_to_be_between',
#  'expect_column_max_to_be_between',
#  'expect_column_values_to_be_between',
#  'expect_column_quantile_values_to_be_between',
#  'expect_column_median_to_be_between',
#  'expect_column_mean_to_be_between',
#  'expect_column_stdev_to_be_between',
#  'expect_column_values_to_be_in_set',
#  'expect_column_unique_value_count_to_be_between',
#  'expect_column_proportion_of_unique_values_to_be_between']

Each of these expectations is configured with specific parameters based on the original dataset. For instance, if we inspect the parameters for expect_column_median_to_be_between, we see that the current median of 8 is enforced:

{'expectation_type': 'expect_column_median_to_be_between',
 'kwargs': {'strict_min': False,
  'strict_max': False,
  'column': 'PTS',
  'max_value': 8,
  'min_value': 8},
 'meta': {'profiler_details': {'metric_configuration': {'metric_name': 'column.median',
    'domain_kwargs': {'column': 'PTS'},
    'metric_value_kwargs': None},
   'num_batches': 1}}}

These data assistant rules are strict, but can serve as a good starting point for basic checks.
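Since the assistant pins parameters to the exact observed values (min_value == max_value == 8 above), a small post-processing step can loosen them before the suite is used in production. The helper below is a hypothetical sketch that widens the numeric bounds in an expectation's kwargs by a relative tolerance:

```python
def widen_bounds(expectation, tolerance=0.2):
    """Return a copy of the expectation with min/max widened by `tolerance`."""
    kwargs = dict(expectation["kwargs"])
    if kwargs.get("min_value") is not None:
        kwargs["min_value"] = kwargs["min_value"] * (1 - tolerance)
    if kwargs.get("max_value") is not None:
        kwargs["max_value"] = kwargs["max_value"] * (1 + tolerance)
    return {**expectation, "kwargs": kwargs}

median_exp = {
    "expectation_type": "expect_column_median_to_be_between",
    "kwargs": {"column": "PTS", "min_value": 8, "max_value": 8},
}
loosened = widen_bounds(median_exp)
```

With a 20% tolerance, the median check above would accept any value between roughly 6.4 and 9.6 rather than exactly 8.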

Batching

Batches are the fundamental unit of data for validation in GX. All batches are mutually exclusive, and collectively they make up an entire data asset. In previous examples, you may have noticed that data is always submitted via a batch_request argument. When a BatchRequest is created, it stores the information necessary for fetching the correct data at validation time.

When defining a data asset, users can specify different ways to batch the data using a "splitter". For example, we can take our data and organize it by year, month, and day, such that each batch contains the records for a given date. Once a splitter is added, the splitter arguments become viewable in the batch_request_options member of the asset.

asset.add_splitter_year_and_month_and_day("game_date")

asset.batch_request_options
# ('year', 'month', 'day')

Now, we can use these options to acquire and validate specific batches of the data. Suppose we are validating newly added data for our pipeline on 2022-10-14. Then, we can fetch only the batch containing this day’s data:

br = asset.build_batch_request(options={"year": 2022, "month": 10, "day": 14})

Batching is useful when you want to provide greater specificity on which data to validate, such as when you only want to validate the most recently added data to minimize validation costs.
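Conceptually, a year/month/day splitter partitions the asset into mutually exclusive batches keyed by date. A minimal pure-Python sketch of that grouping (using hypothetical record dicts rather than GX objects) looks like:

```python
from collections import defaultdict
from datetime import date

def split_by_day(records, date_field):
    """Group records into mutually exclusive batches keyed by (year, month, day)."""
    batches = defaultdict(list)
    for record in records:
        d = record[date_field]
        batches[(d.year, d.month, d.day)].append(record)
    return batches

records = [
    {"game_date": date(2022, 10, 14), "PTS": 21},
    {"game_date": date(2022, 10, 14), "PTS": 9},
    {"game_date": date(2022, 10, 15), "PTS": 33},
]
batches = split_by_day(records, "game_date")
```

Requesting the batch for 2022-10-14 then amounts to looking up a single key, leaving all other dates untouched.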

Generating Data Documentation

Another useful feature of Great Expectations is the ability to generate HTML files providing a readable overview of your data testing environment. First, let's create a new suite, add an expectation to it, run a validation with a checkpoint, persist the result, and build the data documentation using our context object. Note that we are using action_list here for the first time: a sequence of actions that the checkpoint performs after each validation run.

context.add_or_update_expectation_suite("suite")
validator = context.get_validator(batch_request=batch_request, expectation_suite_name="suite")

vresult = validator.expect_column_values_to_be_between("PTS", 0, 55, result_format={
    "result_format": "COMPLETE",
    "unexpected_index_column_names": ["PLAYER_ID", "GAME_ID"],
})

validator.save_expectation_suite(discard_failed_expectations=False)

# Need to use checkpoints in order to persist a validation result.
checkpoint = context.add_or_update_checkpoint(
    name="my_checkpoint",
    validations=[
        {
            "expectation_suite_name": "suite",
        },
    ],
    action_list=[
        {
            "name": "action",
            "action": {
                "class_name": "StoreValidationResultAction"
            }
        }
    ]
)
checkpoint.run(batch_request=batch_request)

context.build_data_docs()
# file:///Users/Phil/Documents/Chew/great_expectations_article/ge2/gx/uncommitted/data_docs/local_site/expectations/suite.html
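The action_list used in the checkpoint above is, at its core, a post-validation hook chain. As a rough illustration (not GX's internals), a checkpoint-style runner might dispatch each configured action against the validation result like so:

```python
def run_actions(validation_result, action_list):
    """Invoke each configured action with the validation result, in order."""
    outcomes = []
    for entry in action_list:
        action = entry["action"]
        outcomes.append((entry["name"], action(validation_result)))
    return outcomes

def store_validation_result(result):
    # Stand-in for StoreValidationResultAction: persist the result and report back.
    return {"stored": True, "success": result["success"]}

outcomes = run_actions(
    {"success": True},
    [{"name": "action", "action": store_validation_result}],
)
```

Storing results, updating data docs, and sending notifications are all expressed as actions in this same slot.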

Now we have generated HTML pages that let us easily navigate our validation results and defined suites.

Orchestrator Integrations

GX also supports a variety of commonly used orchestrators including Airflow and Prefect. For example, one can instantiate an Airflow operator that invokes a GX suite like so:

from great_expectations_provider.operators.great_expectations import GreatExpectationsOperator

validate_task = GreatExpectationsOperator(
    task_id="validate",
    conn_id=...,
    data_context_root_dir=...,
    schema=...,
    data_asset_name=...,
    expectation_suite_name=...,
)

GX Cloud

GX monetizes the library through GX Cloud, a managed environment for GX. Its primary features are an enhanced UI and hosted backend storage. The UI offers a variety of interactive functionality, including the ability to configure expectations and data sources visually instead of in code, and to trigger validation runs directly from the interface. Hosted backend storage means GX Cloud stores all of the data underlying your GX deployment, such as validation results and suite definitions. Lastly, GX Cloud offers a user abstraction with role-based access controls.

Conclusion

Data quality continues to be an important issue for many organizations. In this second part of the series on Great Expectations, we looked at a variety of features that are critical for building quality production deployments of GX. Though they are not needed for basic local validation, external data stores, execution engines, the data assistant, batching, data documentation, and orchestrator integrations are fundamental parts of using GX in a production setting. If managing these aspects of GX proves to be too much overhead for your team, GX Cloud offers a way to offload that burden.