Getting Started with Great Expectations - Part 1


Introduction

A persistent, unsolved problem for many data organizations is poor data quality. Outages due to bad data, also known as data downtime, cost organizations millions of dollars per year. Despite the high impact, observability tooling for data quality remains immature compared with the equivalent tools for generic software outages. Over the last few years, a wide variety of data quality management products have hit the market to address this gap. A leading solution for validating data is the open-source library Great Expectations (GX), backed by a company of the same name. The library has over 9,000 stars on GitHub, and the company has raised $65 million to support development of “GX Cloud”, a SaaS offering that provides a hosted version of the GX core open-source library.

This article is the first part of a two-part series covering the basics of Great Expectations. This piece walks through the code for using some of the basic abstractions in the core library, including basic validation, persistent contexts, and checkpoints.

Great Expectations Core - Features

Basic Validation API

Let’s use some NBA game data for this walkthrough. You can gather this data using the nba_api Python package:

from nba_api.stats.endpoints import (
    playergamelogs,
)
import pandas as pd

data = pd.concat(
    [
        playergamelogs.PlayerGameLogs(
            season_nullable="2022-23", season_type_nullable=season_type
        ).player_game_logs.get_data_frame()
        for season_type in ["Pre Season", "Regular Season", "Playoffs"]
    ],
    ignore_index=True
)
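
A quick peek at the frame confirms what we’ll be validating (the row count here reflects the 2022-23 run used throughout this article):

# One row per player per game; this run yields 29,597 rows.
print(data.shape)
print(data[["PLAYER_ID", "GAME_ID", "PLAYER_NAME", "MATCHUP", "PTS"]].head())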

Before we run any validation on the underlying data, we’ll need to wrap our pandas DataFrame using from_pandas:

import great_expectations as gx

gx_data = gx.from_pandas(data)

type(gx_data) # great_expectations.dataset.pandas_dataset.PandasDataset

Now, let’s assert some characteristics of the data. First, let’s confirm that each (player, game) pair shows up only once:

gx_data.expect_compound_columns_to_be_unique(["PLAYER_ID", "GAME_ID"])

This check, like all checks, returns a result containing a variety of statistics in addition to the pass/fail outcome, rendered here as JSON:

{
  "success": true,
  "result": {
    "element_count": 29597,
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0,
    "partial_unexpected_list": []
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}
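
The printed JSON is just a rendering of the returned result object, so the same fields are also available programmatically; a minimal sketch:

result = gx_data.expect_compound_columns_to_be_unique(["PLAYER_ID", "GAME_ID"])

# The result object mirrors the JSON rendering above.
assert result.success
print(result.result["unexpected_count"]) # 0 in this run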

We can get a better understanding of these statistics by looking at a failing assertion. Let’s assume that no player in the NBA could possibly have scored more than 55 points in a game:

gx_data.expect_column_values_to_be_between("PTS", 0, 55)

The result shows us that our assumption is incorrect:

{
  "success": false,
  "result": {
    "element_count": 29597,
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_count": 8,
    "unexpected_percent": 0.027029766530391592,
    "unexpected_percent_total": 0.027029766530391592,
    "unexpected_percent_nonmissing": 0.027029766530391592,
    "partial_unexpected_list": [
      57,
      71,
      60,
      71,
      60,
      58,
      59,
      56
    ]
  },
  "meta": {},
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}

We see that GX provides some summary statistics on the failure proportions, and even includes some of the unexpected values. But what if we actually want to locate the individual game records? GX allows us to extend the returned result via the result_format argument, enabling us to retrieve the indices of the offending rows:

result = gx_data.expect_column_values_to_be_between("PTS", 0, 55, result_format={
    "result_format": "COMPLETE",
    "unexpected_index_column_names": ["PLAYER_ID", "GAME_ID"],
})

# Look up the offending rows in the original frame.
bad_indices = result.to_json_dict()["result"]["unexpected_index_list"]
records = gx_data.loc[bad_indices, ["PLAYER_NAME", "MATCHUP", "PTS"]]

Ah, it seems we’ve forgotten Jimmy Butler’s career-high 56-point game against the Bucks in our expectations. Good thing this was flagged by the framework!

Next, let’s take a look at how to formalize this a bit more using the context interface.

Contexts

The examples in the previous section illustrate the core functionality of GX. Contexts wrap this core functionality with scaffolding that allows us to operate at a higher level of abstraction, providing features like registered external data stores, reusable suites of expectations, batching data, and automatically constructing data documentation.

First, let’s create a context. If we inspect the active context from our previous examples, we’ll see that its type is EphemeralDataContext - a temporary context that is discarded at the end of our session. What we’d like instead is a filesystem-backed persistent context that we can use across Python invocations:

type(gx.get_context()) # great_expectations.data_context.data_context.ephemeral_data_context.EphemeralDataContext

from great_expectations.data_context import FileDataContext

persisted_context = FileDataContext.create(project_root_dir="/Users/Phil/Documents/Chew/great_expectations_article/ge")

We’ll see that this scaffolds a directory structure on our local filesystem:

(env) 23-11-14 17:33:29 /Users/Phil/Documents/Chew/great_expectations_article/ge: tree gx
gx
├── checkpoints
├── expectations
├── great_expectations.yml
├── plugins
│   └── custom_data_docs
│       ├── renderers
│       ├── styles
│       │   └── data_docs_custom_styles.css
│       └── views
├── profilers
└── uncommitted
    ├── config_variables.yml
    ├── data_docs
    └── validations

These files constitute the persisted state of our GX setup.

To actually use our context, we’ll need to add a data source, then use a validator to construct expectations. In this code sample, our expectation is evaluated by the validator and also persisted to the expectation store for later use:

source = persisted_context.sources.add_pandas(name="data")
asset = source.add_dataframe_asset(name="data")
batch_request = asset.build_batch_request(dataframe=data)

persisted_context.add_or_update_expectation_suite("suite")
validator = persisted_context.get_validator(batch_request=batch_request, expectation_suite_name="suite")

validator.expect_column_values_to_be_between("PTS", 0, 55, result_format={
    "result_format": "COMPLETE",
    "unexpected_index_column_names": ["PLAYER_ID", "GAME_ID"],
})

# This step is important, it allows the suite to persist across sessions.
validator.save_expectation_suite(discard_failed_expectations=False)

Note that here we are using an in-memory pandas data source, but GX also provides data source implementations that pull data from the filesystem or through a SQL connection.
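For instance, using the same fluent sources API, a filesystem- or SQL-backed source looks roughly like the following sketch (the directory, connection string, and table name here are placeholders):

# Filesystem-backed source: point GX at a directory of CSV files.
csv_source = persisted_context.sources.add_pandas_filesystem(
    name="local_files", base_directory="./data"
)
csv_asset = csv_source.add_csv_asset(name="games_csv")

# SQL-backed source: validate a table directly in the database.
sql_source = persisted_context.sources.add_sql(
    name="warehouse", connection_string="postgresql://user:pass@localhost/nba"
)
sql_asset = sql_source.add_table_asset(name="games_table", table_name="player_game_logs")

Since the expectation is now stored in our context, we can retrieve the suite and run it without configuring the expectation again: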

# In a fresh session, reload the persisted context from disk.
from great_expectations.data_context import FileDataContext

read_context = FileDataContext(project_root_dir="/Users/Phil/Documents/Chew/great_expectations_article/ge")
suite = read_context.get_expectation_suite("suite")

suite.show_expectations_by_expectation_type()
# [ { 'expect_column_values_to_be_between': { 'column': 'PTS',
#                                             'domain': 'column',
#                                             'max_value': 55,
#                                             'min_value': 0}}]

source = read_context.get_datasource("data")
# The dataframe itself is not persisted, so we re-attach it in this session.
asset = source.add_dataframe_asset("data")
batch_request = asset.build_batch_request(dataframe=data)
validator = read_context.get_validator(batch_request=batch_request, expectation_suite_name="suite")

# Will run the expectations saved in the previous session.
results = validator.validate()
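
The returned object is the suite-level validation result; in a pipeline, a typical pattern is to gate on its overall success flag, e.g.:

# Fail fast if any expectation in the suite did not pass.
if not results.success:
    raise ValueError("Data validation failed for suite 'suite'")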

Here we see that our context allows us to persist the state of our expectation suite and use it later. In this example, we accessed the validator directly. But GX also provides a higher-level abstraction called a checkpoint, which is the appropriate abstraction to use in production.

Checkpoints

A checkpoint combines the validation of batches against an expectation suite with optional actions to take at the end. We are able to save checkpoints similarly to the way we saved the validator; a sketch of configuring actions follows the basic example below:

checkpoint = read_context.add_or_update_checkpoint(
    name="my_checkpoint",
    validations=[
        {
            "expectation_suite_name": "suite",
        },
    ],
)

# Checkpoints persist in the context, so we can retrieve them later.
checkpoint = read_context.get_checkpoint(name="my_checkpoint")

checkpoint.run(batch_request=batch_request)
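
The optional actions mentioned above are configured through the checkpoint’s action_list. As a sketch, here is how you might store results and rebuild data docs after each run, using two of the built-in actions (StoreValidationResultAction and UpdateDataDocsAction):

checkpoint = read_context.add_or_update_checkpoint(
    name="my_checkpoint",
    validations=[{"expectation_suite_name": "suite"}],
    action_list=[
        # Persist each validation result to the context's validations store.
        {"name": "store_result", "action": {"class_name": "StoreValidationResultAction"}},
        # Rebuild the static data docs site to reflect the latest run.
        {"name": "update_docs", "action": {"class_name": "UpdateDataDocsAction"}},
    ],
)

result = checkpoint.run(batch_request=batch_request)
print(result.success) # Aggregate success across all validations in the run.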

Conclusion

Great Expectations provides a shortcut to Python-based data validation, allowing your organization to avoid rewriting a lot of boilerplate validation code. By configuring and persisting suite configurations, users can create an easily extensible and interpretable testing framework for their data assets. In the next part of this series, we will take a closer look at more of the GX ecosystem, including configuring external data stores, constructing expectations with the data assistant, execution engines, batching, and data documentation. The second part will also cover the company’s cloud offering, GX Cloud.