Building ELT pipeline with Snowflake, dbt, and Tableau

Vinh Hung le vinhhle.github.io
04/21/2023

Overview

About the project

In this project, let’s assume that you are working for an e-commerce company called Olist. One day, the CEO asks you to create a denormalized table and build a revenue dashboard that shows order information for 2017.

There are 8 datasets in total, containing information on around 100,000 orders placed between 2016 and 2018 across multiple marketplaces in Brazil. They are linked to each other as shown in the schema below:

The goal of this project is to build an ELT pipeline that extracts data from the source databases, loads it into a data warehouse for transformation, and then feeds a dashboard for descriptive analysis.

Tech stack: Google Cloud Storage, Snowflake, dbt, Tableau

ELT design

The ELT pipeline is shown in the chart below:

As can be seen, raw data comes from different sources, including internal databases, SAP, Salesforce (for CRM data), and Stripe (for payment data). The first step is to extract data from these sources and load it into the data warehouse for storage and later use. This can be done with simple SQL scripts or with managed services like Stitch or Fivetran.

In this project, I use Google Cloud Storage as a data lake to replicate the source databases. In simple terms, a data lake is a central location where a company can store all of its raw, unstructured, or unprocessed data for later use.

Next, I’ll write SQL scripts to set up a Snowflake data warehouse and database to load and store the collected datasets. Depending on the business requirements, we’ll run SQL queries to transform the raw data into usable tables in the warehouse, prepared for end users such as business analysts or data scientists.

But instead of writing SQL queries in the Snowflake editor, I’ll use a tool called dbt, which provides an easy way to transform, test, and validate data inside a data warehouse. The final table, order_customers_2017, can be used to answer ad-hoc questions and to build the revenue dashboard in Tableau.

Extract and load data to Google Cloud Storage

The first step in the pipeline is to extract data from the different sources. Since this is not the main focus, I skip this step and download the datasets directly from Kaggle. To replicate the source databases, I create a bucket in Google Cloud Storage called olist_datalake and load the 8 datasets into it. We end up with the following:

Load data to Snowflake data warehouse

Set up default data warehouse and database

--Use an admin role
USE ROLE accountadmin;

--Create a role called 'analyst'
CREATE ROLE IF NOT EXISTS analyst;
GRANT ROLE analyst TO ROLE accountadmin;

--Create the default data warehouse 
CREATE WAREHOUSE IF NOT EXISTS olist_wh;
GRANT OPERATE ON WAREHOUSE olist_wh TO ROLE analyst;

--Create a user called 'vinh' and assign to role
CREATE USER IF NOT EXISTS vinh
  PASSWORD ='VinhLe111'
  LOGIN_NAME='vinh'
  MUST_CHANGE_PASSWORD=FALSE
  DEFAULT_WAREHOUSE='olist_wh'
  DEFAULT_ROLE='analyst'
  DEFAULT_NAMESPACE='olist.olist_raw'
  COMMENT='Use dbt to transform data in Snowflake warehouse';
GRANT ROLE analyst TO USER vinh;

--Create the olist database and olist_raw schema 
CREATE DATABASE IF NOT EXISTS olist;
CREATE SCHEMA IF NOT EXISTS olist.olist_raw;

--Grant permissions to the 'analyst' role
GRANT ALL ON WAREHOUSE olist_wh TO ROLE analyst;
GRANT ALL ON DATABASE olist TO ROLE analyst;
GRANT ALL ON ALL SCHEMAS IN DATABASE olist TO ROLE analyst;
GRANT ALL ON FUTURE SCHEMAS IN DATABASE olist TO ROLE analyst;
GRANT ALL ON ALL TABLES IN SCHEMA olist.olist_raw TO ROLE analyst;
GRANT ALL ON FUTURE TABLES IN SCHEMA olist.olist_raw TO ROLE analyst;

--Set up the default warehouse, database, and schema
USE WAREHOUSE olist_wh;
USE DATABASE olist;
USE SCHEMA olist_raw;

Load data

-- Configuring an Integration for Google Cloud Storage on Snowflake
CREATE STORAGE INTEGRATION gsc_olist
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'GCS'
  ENABLED = TRUE
  STORAGE_ALLOWED_LOCATIONS = ('gcs://olist_datalake/');
DESC STORAGE INTEGRATION gsc_olist;

CREATE STAGE gcs_olist_stage
  URL = 'gcs://olist_datalake/'
  STORAGE_INTEGRATION = gsc_olist;
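
Note that DESC STORAGE INTEGRATION returns a Google Cloud service account, which must be granted read access to the olist_datalake bucket before Snowflake can see the files. Once that is done, an optional sanity check (not part of the original scripts) is to list the files visible through the stage:

-- List the files in the stage; the 8 Olist CSV files should appear
LIST @gcs_olist_stage;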

-- Load data from Google Cloud Storage to Snowflake  
-- Create and load data to 'orders_raw' table
CREATE OR REPLACE TABLE orders_raw
                    (order_id string,
                    customer_id string,
                    order_status string,
                    order_purchase_timestamp timestamp,
                    order_approved_at timestamp,
                    order_delivered_carrier_date timestamp,
                    order_delivered_customer_date timestamp,
                    order_estimated_delivery_date timestamp);
COPY INTO orders_raw
                    (order_id,
                    customer_id,
                    order_status,
                    order_purchase_timestamp,
                    order_approved_at,
                    order_delivered_carrier_date,
                    order_delivered_customer_date,
                    order_estimated_delivery_date)
                FROM @gcs_olist_stage
                    PATTERN = 'olist_orders_dataset.csv'
                    FILE_FORMAT = (type = 'CSV' skip_header = 1
                    FIELD_OPTIONALLY_ENCLOSED_BY = '"');
                    
--Create and load data to 'order_items_raw' table
CREATE OR REPLACE TABLE order_items_raw
                    (order_id string,
                    order_item_id string,
                    product_id string,
                    seller_id string,
                    shipping_limit_date timestamp,
                    price numeric,
                    freight_value numeric);
COPY INTO order_items_raw
                    (order_id,
                    order_item_id,
                    product_id,
                    seller_id,
                    shipping_limit_date,
                    price,
                    freight_value)
                FROM @gcs_olist_stage
                    PATTERN = 'olist_order_items_dataset.csv'
                    FILE_FORMAT = (type = 'CSV' skip_header = 1
                    FIELD_OPTIONALLY_ENCLOSED_BY = '"');
                    
-- Create and load data to 'order_reviews_raw' table                   
CREATE OR REPLACE TABLE order_reviews_raw
                    (review_id string,
                    order_id string,
                    review_score numeric,
                    review_comment_title string,
                    review_comment_message string,
                    review_creation_date timestamp, 
                    review_answer_timestamp timestamp);
COPY INTO order_reviews_raw
                    (review_id,
                    order_id,
                    review_score,
                    review_comment_title,
                    review_comment_message,
                    review_creation_date,
                    review_answer_timestamp)
                FROM @gcs_olist_stage
                    PATTERN = 'olist_order_reviews_dataset.csv'
                    FILE_FORMAT = (type = 'CSV' skip_header = 1
                    FIELD_OPTIONALLY_ENCLOSED_BY = '"');

--Create and load data to 'order_payments_raw' table                    
CREATE OR REPLACE TABLE order_payments_raw
                    (order_id string,
                    payment_sequential integer,
                    payment_type string,
                    payment_installments integer,
                    payment_value numeric);
COPY INTO order_payments_raw
                    (order_id,
                    payment_sequential,
                    payment_type,
                    payment_installments,
                    payment_value)
                FROM @gcs_olist_stage
                    PATTERN = 'olist_order_payments_dataset.csv'
                    FILE_FORMAT = (type = 'CSV' skip_header = 1
                    FIELD_OPTIONALLY_ENCLOSED_BY = '"');

--Create and load data to 'customers_raw' table
CREATE OR REPLACE TABLE customers_raw
                    (customer_id string,
                    customer_unique_id string,
                    customer_zip_code_prefix string,
                    customer_city string,
                    customer_state string);
COPY INTO customers_raw
                    (customer_id,
                    customer_unique_id,
                    customer_zip_code_prefix,
                    customer_city,
                    customer_state)
                FROM @gcs_olist_stage
                    PATTERN = 'olist_customers_dataset.csv'
                    FILE_FORMAT = (type = 'CSV' skip_header = 1
                    FIELD_OPTIONALLY_ENCLOSED_BY = '"');

--Create and load data to 'products_raw' table              
CREATE OR REPLACE TABLE products_raw
                    (product_id string,
                    product_category_name_english string,
                    product_name_lenght string,
                    product_description_lenght string,
                    product_photos_qty integer,
                    product_weight_g integer,
                    product_length_cm integer,
                    product_height_cm integer,
                    product_width_cm integer);
COPY INTO products_raw
                    (product_id,
                    product_category_name_english,
                    product_name_lenght,
                    product_description_lenght,
                    product_photos_qty,
                    product_weight_g,
                    product_length_cm,
                    product_height_cm,
                    product_width_cm)
                FROM @gcs_olist_stage
                    PATTERN = 'olist_products_dataset.csv'
                    FILE_FORMAT = (type = 'CSV' skip_header = 1
                    FIELD_OPTIONALLY_ENCLOSED_BY = '"');

--Create and load data to 'sellers_raw' table
CREATE OR REPLACE TABLE sellers_raw
                    (seller_id string,
                    seller_zip_code_prefix integer,
                    seller_city string,
                    seller_state string);
COPY INTO sellers_raw
                    (seller_id,
                    seller_zip_code_prefix,
                    seller_city,
                    seller_state)
                FROM @gcs_olist_stage
                    PATTERN = 'olist_sellers_dataset.csv'
                    FILE_FORMAT = (type = 'CSV' skip_header = 1
                    FIELD_OPTIONALLY_ENCLOSED_BY = '"');

--Create and load data to 'geolocation_raw' table
CREATE OR REPLACE TABLE geolocation_raw
                    (geolocation_zip_code_prefix integer,
                    geolocation_lat string,
                    geolocation_lng string,
                    geolocation_city string,
                    geolocation_state string);
COPY INTO geolocation_raw
                    (geolocation_zip_code_prefix,
                    geolocation_lat,
                    geolocation_lng,
                    geolocation_city,
                    geolocation_state)
                FROM @gcs_olist_stage
                    PATTERN = 'olist_geolocation_dataset.csv'
                    FILE_FORMAT = (type = 'CSV' skip_header = 1
                    FIELD_OPTIONALLY_ENCLOSED_BY = '"');
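
Before moving on, it is worth confirming that each table was populated. A quick, optional row-count check (not part of the original scripts):

-- Optional sanity check: count the rows loaded into a few of the raw tables
SELECT 'orders_raw' AS table_name, COUNT(*) AS row_count FROM orders_raw
UNION ALL
SELECT 'order_items_raw', COUNT(*) FROM order_items_raw
UNION ALL
SELECT 'customers_raw', COUNT(*) FROM customers_raw;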

Run the above SQL scripts in the Snowflake editor, and we’ll have the following tables in the olist database:

Data transformation with dbt

Once the raw data has been loaded into the data warehouse, it’s time to transform it into suitable formats for later analysis. In dbt, we’ll build models, which are simply SQL files containing SELECT statements. This lets us define dependencies between models, write tests to check data quality, and materialize the models as views or tables in the data warehouse.
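
If you run dbt Core locally, the connection to Snowflake is configured in a profiles.yml file. A minimal sketch of what it might look like for this project (the account identifier and password are placeholders, and the target schema name olist_analytics is just an example):

olist:
  target: dev
  outputs:
    dev:
      type: snowflake
      account: <your_account_identifier>
      user: vinh
      password: <your_password>
      role: analyst
      warehouse: olist_wh
      database: olist
      schema: olist_analytics
      threads: 4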

The transformation process is divided into separate layers:

Staging layer

In this layer, we will create staging models from raw data. For the requirements of this project, 4 staging models are needed:

-- stg_customers.sql
WITH stg_customers AS (SELECT * FROM OLIST.OLIST_RAW.CUSTOMERS_RAW)
SELECT * FROM stg_customers

-- stg_orders.sql
WITH stg_orders AS (SELECT * FROM OLIST.OLIST_RAW.ORDERS_RAW)
SELECT * FROM stg_orders

-- stg_order_items.sql
WITH stg_order_items AS (SELECT * FROM OLIST.OLIST_RAW.ORDER_ITEMS_RAW)
SELECT * FROM stg_order_items

-- stg_products.sql
WITH stg_products AS (SELECT * FROM OLIST.OLIST_RAW.PRODUCTS_RAW)
SELECT * FROM stg_products
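
The staging models above select directly from the fully qualified raw tables. An alternative (not used here) is to declare the raw tables as dbt sources and reference them with the source function, which also lets dbt track lineage back to the warehouse tables. A sketch, assuming the source is named olist_raw:

# sources.yml (sketch)
version: 2
sources:
  - name: olist_raw
    database: olist
    schema: olist_raw
    tables:
      - name: customers_raw
      - name: orders_raw
      - name: order_items_raw
      - name: products_raw

Each staging model would then start with SELECT * FROM {{ source('olist_raw', 'customers_raw') }} instead of the hard-coded table name.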

In this step, we can use generic tests, which are predefined in dbt, to check the data quality of the staging models.

version: 2

models:
  - name: stg_customers
    columns:
      - name: customer_id
        tests:
          - unique
          - not_null
      - name: customer_state
        tests:
          - accepted_values:
              values: ['AC','AL','AP','AM','BA','CE','DF','ES','GO',
                       'MA','MT','MS','MG','PA','PB','PR','PE','PI',
                       'RN','RS','RJ','RO','RR','SC','SP','SE','TO']
  - name: stg_products
    columns:
      - name: product_id
        tests:
          - unique
          - not_null
  - name: stg_orders
    columns:
      - name: order_id
        tests:
          - unique
          - not_null
      - name: customer_id
        tests:
          - unique
          - not_null
      - name: order_status
        tests:
          - accepted_values:
              values: ['delivered', 'invoiced', 'shipped', 'processing', 
                       'unavailable','canceled','created', 'approved']
  - name: stg_order_items
    columns:
      - name: order_id
        tests:
          - not_null

The above code specifies that customer_id in stg_customers, product_id in stg_products, and order_id and customer_id in stg_orders must be unique and not null; that order_id in stg_order_items must not be null; and that customer_state and order_status may only contain the listed accepted values.
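
Besides generic tests, dbt also supports singular tests: plain SELECT statements saved under the tests/ folder that fail whenever they return any rows. A hypothetical example (not part of this project) that checks no order was delivered before it was purchased:

-- tests/assert_delivery_after_purchase.sql (hypothetical)
SELECT order_id
FROM {{ ref('stg_orders') }}
WHERE order_delivered_customer_date < order_purchase_timestamp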

Core layer

The core layer defines the fact and dim tables that will be used in the final layer. Since this project only requires order and customer information, I’ll create one fact table for orders and one dim table for customers.

The code below defines the dim_customers model. Here the ref function is used to declare this model’s dependency on stg_customers.

-- dim_customers.sql
WITH customers AS (
    SELECT * FROM {{ ref('stg_customers') }})
SELECT
    customer_id,
    customer_unique_id,
    customer_zip_code_prefix,
    INITCAP(customer_city) AS customer_city,
    CASE 
        WHEN customer_state = 'AC' THEN 'Acre'
        WHEN customer_state = 'AL' THEN 'Alagoas'
        WHEN customer_state = 'AP' THEN 'Amapá'
        WHEN customer_state = 'AM' THEN 'Amazonas'
        WHEN customer_state = 'BA' THEN 'Bahia'
        WHEN customer_state = 'CE' THEN 'Ceará'
        WHEN customer_state = 'DF' THEN 'Distrito Federal'
        WHEN customer_state = 'ES' THEN 'Espírito Santo'
        WHEN customer_state = 'GO' THEN 'Goiás'
        WHEN customer_state = 'MA' THEN 'Maranhão'
        WHEN customer_state = 'MT' THEN 'Mato Grosso'
        WHEN customer_state = 'MS' THEN 'Mato Grosso do Sul'
        WHEN customer_state = 'MG' THEN 'Minas Gerais'
        WHEN customer_state = 'PA' THEN 'Pará'
        WHEN customer_state = 'PB' THEN 'Paraíba'
        WHEN customer_state = 'PR' THEN 'Paraná'
        WHEN customer_state = 'PE' THEN 'Pernambuco'
        WHEN customer_state = 'PI' THEN 'Piauí'
        WHEN customer_state = 'RJ' THEN 'Rio de Janeiro'
        WHEN customer_state = 'RN' THEN 'Rio Grande do Norte'
        WHEN customer_state = 'RS' THEN 'Rio Grande do Sul'
        WHEN customer_state = 'RO' THEN 'Rondônia'
        WHEN customer_state = 'RR' THEN 'Roraima'
        WHEN customer_state = 'SC' THEN 'Santa Catarina'
        WHEN customer_state = 'SP' THEN 'São Paulo'
        WHEN customer_state = 'SE' THEN 'Sergipe'
    ELSE 'Tocantins'
    END AS customer_state
FROM customers
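
A possible refactor (not used in this project) is to keep the state-code-to-name mapping in a dbt seed, a small CSV checked into the project, and join to it instead of maintaining the long CASE expression. A sketch, assuming a seed file brazil_states.csv with columns state_code and state_name:

-- dim_customers.sql (alternative sketch using a seed)
WITH customers AS (
    SELECT * FROM {{ ref('stg_customers') }}),
states AS (
    SELECT * FROM {{ ref('brazil_states') }})
SELECT
    customers.customer_id,
    customers.customer_unique_id,
    customers.customer_zip_code_prefix,
    INITCAP(customers.customer_city) AS customer_city,
    COALESCE(states.state_name, 'Unknown') AS customer_state
FROM customers
LEFT JOIN states ON customers.customer_state = states.state_code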

To build the fact_orders model, I join 3 staging models: stg_orders, stg_order_items, and stg_products.

-- fact_orders.sql
WITH orders AS (
    SELECT * FROM {{ ref('stg_orders') }}),
items AS (
    SELECT * FROM {{ ref('stg_order_items') }}),
products AS (
    SELECT * FROM {{ ref('stg_products') }})
SELECT 
    orders.order_id,
    orders.customer_id,
    orders.order_status,
    -- extract date from datetime
    CAST(orders.order_purchase_timestamp AS DATE) AS purchased_date,
    CAST(orders.order_delivered_customer_date AS DATE) AS delivered_date,
    -- create a new column for the difference between the delivered and purchase date
    ABS(DATEDIFF(DAY, delivered_date,purchased_date)) AS total_delivery_day,
    items.product_id,
    items.order_item_id AS item_number,
    -- convert price from BRL to USD
    items.price*0.3 AS price_per_item_USD,
    item_number*price_per_item_USD AS total_price,
    items.freight_value*0.3 AS freight_USD,
    -- replace underscores with spaces and capitalize the category name
    REPLACE(products.product_category_name_english,'_',' ') AS product_category,
    CONCAT(UPPER(LEFT(product_category,1)), LOWER(RIGHT(product_category, LENGTH(product_category)-1))) AS product_category_upper
FROM orders 
LEFT JOIN items USING (order_id) 
LEFT JOIN products USING (product_id)

Final layer

Lastly, I join fact_orders and dim_customers to create the final model needed for further analysis.

-- order_customers_2017.sql
WITH orders AS (
    SELECT * FROM {{ ref('fact_orders') }}),
customers AS (
    SELECT * FROM {{ ref('dim_customers') }})
SELECT 
    orders.order_id,
    orders.order_status,
    orders.purchased_date,
    orders.delivered_date,
    orders.total_delivery_day,
    orders.product_id,
    orders.item_number,
    orders.price_per_item_USD,
    orders.total_price,
    orders.freight_USD,
    orders.product_category_upper AS product_category,
    customers.customer_unique_id,
    customers.customer_zip_code_prefix,
    customers.customer_city,
    customers.customer_state
FROM orders
LEFT JOIN customers USING (customer_id)
WHERE orders.purchased_date BETWEEN '2017-01-01' AND '2017-12-31'
ORDER BY orders.purchased_date ASC

Running dbt code

After setting up all the necessary models, we can run them with the command dbt run.

Cool, the run completes successfully. As you can see, dbt builds 7 models: the 4 models in the staging layer and the 2 models in the core layer are materialized as views in the data warehouse, while the model in the final layer is materialized as a table.
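
Materializations like these can be configured per folder in dbt_project.yml. A sketch, assuming the project is named olist and the models live in staging, core, and final subfolders (the project and folder names are assumptions):

# dbt_project.yml (excerpt)
models:
  olist:
    staging:
      +materialized: view
    core:
      +materialized: view
    final:
      +materialized: table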

Next, we use the command dbt test to run the generic tests we defined in the previous section.

All 11 tests pass, which means there are no null, duplicate, or invalid values in the columns we specified.

Revenue dashboard with Tableau

The last step in this project is to build a dashboard in Tableau that gives an overview of revenue and other order metrics for the business in 2017. The picture below is a static version of the dashboard. For the interactive version, visit my Tableau Public profile.

Some quick insights from the dashboard:

Potential improvements

There are many ideas that could make this ELT project better, but due to time constraints, I could not implement all of them. These include: