This post will step through the basics of dbt model setup, using Snowflake as the data warehouse and NBA game statistics as our sample data.
Data Setup
First, we will use the NBA API to acquire a list of the 2022-2023 season’s games, and the per-player statistics for each game. We’ll then write these to our local disk for later use.
import pandas as pd
from nba_api.stats.endpoints import leaguegamefinder, playergamelogs

# Team-level results ("T") for the 2022-23 NBA season (league ID "00").
games_df = leaguegamefinder.LeagueGameFinder(
    player_or_team_abbreviation="T", season_nullable="2022-23", league_id_nullable="00"
).league_game_finder_results.get_data_frame()

# Per-player box scores for each portion of the season.
pgl_df = pd.concat(
    [
        playergamelogs.PlayerGameLogs(
            season_nullable="2022-23", season_type_nullable=season_type
        ).player_game_logs.get_data_frame()
        for season_type in ["Pre Season", "Regular Season", "Playoffs"]
    ]
)

# Filter out play-in tournaments etc. by keeping only games that have player logs.
games_df = games_df.loc[games_df["GAME_ID"].isin(pgl_df["GAME_ID"].unique())]

games_df.to_csv("games.csv", index=False)
pgl_df.to_csv("player_game_logs.csv", index=False)
Next, you will need a Snowflake account and access to the snowsql interface. Once you are logged in, create a database for this work, and define a file format to use when loading our local files.
phildakin#COMPUTE_WH@(no database).(no schema)>create database DBT_SNOWFLAKE_DEMO;
+---------------------------------------------------+
| status |
|---------------------------------------------------|
| Database DBT_SNOWFLAKE_DEMO successfully created. |
+---------------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.215s
phildakin#COMPUTE_WH@DBT_SNOWFLAKE_DEMO.PUBLIC>use database DBT_SNOWFLAKE_DEMO;
+----------------------------------+
| status |
|----------------------------------|
| Statement executed successfully. |
+----------------------------------+
1 Row(s) produced. Time Elapsed: 0.085s
phildakin#COMPUTE_WH@DBT_SNOWFLAKE_DEMO.PUBLIC>create file format csv_basic type=csv parse_header=true;
+---------------------------------------------+
| status |
|---------------------------------------------|
| File format CSV_BASIC successfully created. |
+---------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.214s
We’ll use Snowflake’s INFER_SCHEMA functionality to create our tables and avoid the tedium of writing out column definitions by hand. To do this, we’ll need to create a stage and upload our local files to it.
phildakin#COMPUTE_WH@DBT_SNOWFLAKE_DEMO.PUBLIC>create stage dbt_snowflake_demo_stage;
+-----------------------------------------------------------+
| status |
|-----------------------------------------------------------|
| Stage area DBT_SNOWFLAKE_DEMO_STAGE successfully created. |
+-----------------------------------------------------------+
1 Row(s) produced. Time Elapsed: 0.239s
phildakin#COMPUTE_WH@DBT_SNOWFLAKE_DEMO.PUBLIC>put file://player_game_logs.csv @dbt_snowflake_demo_stage;
+----------------------+-------------------------+-------------+-------------+--------------------+--------------------+----------+---------+
| source | target | source_size | target_size | source_compression | target_compression | status | message |
|----------------------+-------------------------+-------------+-------------+--------------------+--------------------+----------+---------|
| player_game_logs.csv | player_game_logs.csv.gz | 10048774 | 2180368 | NONE | GZIP | UPLOADED | |
+----------------------+-------------------------+-------------+-------------+--------------------+--------------------+----------+---------+
1 Row(s) produced. Time Elapsed: 2.454s
phildakin#COMPUTE_WH@DBT_SNOWFLAKE_DEMO.PUBLIC>put file://games.csv @dbt_snowflake_demo_stage;
+-----------+--------------+-------------+-------------+--------------------+--------------------+----------+---------+
| source | target | source_size | target_size | source_compression | target_compression | status | message |
|-----------+--------------+-------------+-------------+--------------------+--------------------+----------+---------|
| games.csv | games.csv.gz | 394195 | 91072 | NONE | GZIP | UPLOADED | |
+-----------+--------------+-------------+-------------+--------------------+--------------------+----------+---------+
1 Row(s) produced. Time Elapsed: 0.886s
Then, create the required tables, inferring their schemas from the staged files.
phildakin#COMPUTE_WH@DBT_SNOWFLAKE_DEMO.PUBLIC>CREATE TABLE games
USING TEMPLATE (
SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
WITHIN GROUP (ORDER BY order_id)
FROM TABLE(
INFER_SCHEMA(
LOCATION=>'@dbt_snowflake_demo_stage/games.csv',
FILE_FORMAT=>'csv_basic'
)
));
+-----------------------------------+
| status |
|-----------------------------------|
| Table GAMES successfully created. |
+-----------------------------------+
1 Row(s) produced. Time Elapsed: 1.334s
phildakin#COMPUTE_WH@DBT_SNOWFLAKE_DEMO.PUBLIC>CREATE TABLE player_game_logs
USING TEMPLATE (
SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
WITHIN GROUP (ORDER BY order_id)
FROM TABLE(
INFER_SCHEMA(
LOCATION=>'@dbt_snowflake_demo_stage/player_game_logs.csv',
FILE_FORMAT=>'csv_basic'
)
));
+----------------------------------------------+
| status |
|----------------------------------------------|
| Table PLAYER_GAME_LOGS successfully created. |
+----------------------------------------------+
1 Row(s) produced. Time Elapsed: 4.174s
Finally, copy the staged file data into our tables.
phildakin#COMPUTE_WH@DBT_SNOWFLAKE_DEMO.PUBLIC>copy into player_game_logs from @dbt_snowflake_demo_stage/player_game_logs.csv file_format=(skip_header = 1);
+--------------------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------+
| file | status | rows_parsed | rows_loaded | error_limit | errors_seen | first_error | first_error_line | first_error_character | first_error_column_name |
|--------------------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------|
| dbt_snowflake_demo_stage/player_game_logs.csv.gz | LOADED | 29597 | 29597 | 1 | 0 | NULL | NULL | NULL | NULL |
+--------------------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------+
1 Row(s) produced. Time Elapsed: 1.883s
phildakin#COMPUTE_WH@DBT_SNOWFLAKE_DEMO.PUBLIC>copy into games from @dbt_snowflake_demo_stage/games.csv file_format=(skip_header = 1);
+---------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------+
| file | status | rows_parsed | rows_loaded | error_limit | errors_seen | first_error | first_error_line | first_error_character | first_error_column_name |
|---------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------|
| dbt_snowflake_demo_stage/games.csv.gz | LOADED | 2768 | 2768 | 1 | 0 | NULL | NULL | NULL | NULL |
+---------------------------------------+--------+-------------+-------------+-------------+-------------+-------------+------------------+-----------------------+-------------------------+
1 Row(s) produced. Time Elapsed: 0.810s
Defining the Models
First, let’s define a basic model that gives us each player’s average points scored against each opponent. This will allow us to find a player’s most advantageous matchup. Note that we will need a join between the games and player_game_logs tables to get a consistent naming scheme for our opponents. We can use the following query to compute this:
select player_name, team_name as opponent, avg(pts) as avg_pts
from (
    select player_game_logs.player_name, player_game_logs.pts, player_game_logs.matchup, games.team_name
    from games
    inner join player_game_logs
        on games.game_id = player_game_logs.game_id
        and games.team_id != player_game_logs.team_id
)
group by player_name, team_name
order by player_name;
Once this query is defined, we can use the dbt CLI to initialize a new dbt project via dbt init. This command will also prompt you to set up the profiles.yml file used to authenticate with Snowflake.
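The exact contents of profiles.yml depend on your account and authentication method; a minimal password-based sketch (placeholder values throughout, with the top-level key matching the profile name chosen during dbt init) might look like:
dbt_snowflake_demo:        # must match the profile name in dbt_project.yml
  target: dev
  outputs:
    dev:
      type: snowflake
      account: <account_identifier>   # placeholder: your Snowflake account identifier
      user: <username>                # placeholder
      password: <password>            # placeholder; key-pair or SSO auth are also supported
      role: <role>                    # placeholder
      warehouse: COMPUTE_WH
      database: DBT_SNOWFLAKE_DEMO
      schema: PUBLIC
      threads: 1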
Once this is complete, verify that your connection is working via the dbt debug command. Then, in the models/ directory, create a new file opponent_point_avgs.sql and paste in our query with a dbt materialization configuration:
{{ config(materialized='table') }}

select player_name, team_name as opponent, avg(pts) as avg_pts
from (
    select player_game_logs.player_name, player_game_logs.pts, player_game_logs.matchup, games.team_name
    from games
    inner join player_game_logs
        on games.game_id = player_game_logs.game_id
        and games.team_id != player_game_logs.team_id
)
group by player_name, team_name
order by player_name
Invoking dbt run will build this model:
(dbt-env) 23-09-26 15:14:57 /Users/Phil/Documents/Chew/DbtPlay/dbt_snowflake_demo: dbt run
19:15:49 Running with dbt=1.6.3
19:15:49 Registered adapter: snowflake=1.6.3
19:15:49 Unable to do partial parsing because a project config has changed
19:15:50 Found 1 model, 0 sources, 0 exposures, 0 metrics, 372 macros, 0 groups, 0 semantic models
19:15:50
19:15:53 Concurrency: 1 threads (target='dev')
19:15:53
19:15:53 1 of 1 START sql table model PUBLIC.opponent_point_avgs ........................ [RUN]
19:15:55 1 of 1 OK created sql table model PUBLIC.opponent_point_avgs ................... [SUCCESS 1 in 1.52s]
19:15:55
19:15:55 Finished running 1 table model in 0 hours 0 minutes and 4.97 seconds (4.97s).
19:15:55
19:15:55 Completed successfully
19:15:55
19:15:55 Done. PASS=1 WARN=0 ERROR=0 SKIP=0 TOTAL=1
We’ll now see that there is a table available in Snowflake with the model data:
phildakin#COMPUTE_WH@DBT_SNOWFLAKE_DEMO.PUBLIC>select count(*) from opponent_point_avgs;
+----------+
| COUNT(*) |
|----------|
| 12698 |
+----------+
1 Row(s) produced. Time Elapsed: 0.161s
Try switching the materialized configuration to 'view'. After building the model with dbt run, you’ll see that the old table is deleted, and a new view has been created for the model.
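For opponent_point_avgs.sql, that just means changing the config line at the top of the file:
{{ config(materialized='view') }}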
Now, let’s create another model that builds on our original model via the ref function. Create a new file player_best_opponents.sql that selects each player’s highest-scoring opponent from the previous model:
{{ config(materialized='view') }}

select player_name, opponent, avg_pts as pts
from {{ ref('opponent_point_avgs') }}
qualify row_number() over (partition by player_name order by avg_pts desc) <= 1
Running dbt run here will produce another view, dependent on the previous model.
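If you want to spot-check the result, an ad-hoc query against the new view (run from snowsql, not part of the dbt project) lists each player’s single best matchup:
select player_name, opponent, pts
from player_best_opponents
order by pts desc
limit 10;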
Generating Schema Files and Tests
The dbt package can also be used to run assertions on data quality as models are built. To do this, we’ll need to construct a schema file for our models, which we’ll generate with the dbt-codegen package.
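dbt-codegen is installed like any other dbt package: add it to a packages.yml file at the project root and run dbt deps. A minimal sketch (the version pin below is just an example; check the dbt package hub for the current release):
packages:
  - package: dbt-labs/codegen
    version: 0.9.0   # example pin; use the latest release listed on hub.getdbt.com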
Once the package is installed, we can generate our schema files using dbt run-operation generate_model_yaml --args '{"model_names": ["opponent_point_avgs", "player_best_opponents"]}':
models:
  - name: opponent_point_avgs
    description: ""
    columns:
      - name: player_name
        data_type: varchar
        description: ""
      - name: opponent
        data_type: varchar
        description: ""
      - name: avg_pts
        data_type: number
        description: ""
  - name: player_best_opponents
    description: ""
    columns:
      - name: player_name
        data_type: varchar
        description: ""
      - name: opponent
        data_type: varchar
        description: ""
      - name: pts
        data_type: number
        description: ""
Add this to a models.yml file in the models/ directory. Once that’s in place, the built-in unique test can be applied to the player_name column of the player_best_opponents model to confirm that each player has been assigned only one best opponent in the final view.
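A minimal sketch of that change, showing only the player_best_opponents portion of models.yml:
  - name: player_best_opponents
    description: ""
    columns:
      - name: player_name
        data_type: varchar
        description: ""
        tests:
          - unique   # each player should have exactly one best opponent
      # opponent and pts columns unchanged
Running dbt test will then execute this assertion against the view.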