In this project, let’s assume that you are working for an e-commerce company called Olist. One day, the CEO asks you to create a denormalized table and build a revenue dashboard to show order information in 2017.
There are 8 different datasets in total, containing information on around 100,000 orders placed between 2016 and 2018 across multiple marketplaces in Brazil. They are linked to each other as shown in the schema below:
The goal of this project is to build an ELT pipeline to extract data from source databases, load it into the data warehouse for transformation, and then create a dashboard for descriptive analysis.
Tech stack: Google Cloud Storage, Snowflake, dbt, Tableau
The ELT pipeline is shown in the chart below:
As can be seen, raw data comes from different sources, including internal databases, SAP, Salesforce (for CRM data), and Stripe (for payment data). The first step is to extract data from these sources and load it into the data warehouse for storage and later use. This can be done with simple SQL scripts or with managed services like Stitch or Fivetran.
In this project, I use Google Cloud Storage as a data lake to replicate the source databases. In simple terms, a data lake is a central location where a company can store all of its raw, unstructured, and unprocessed data for later use.
Next, I'll write SQL scripts to build a Snowflake data warehouse and database to load and store the collected datasets. Depending on the business requirements, we'll run SQL queries to transform the raw data into usable tables in the warehouse, prepared for end users like business analysts or data scientists.
But instead of writing SQL queries in the Snowflake editor, I'll use a tool called dbt, which provides an easy way to create, transform, and validate data inside a data warehouse. The final table, called order_customers_2017, can be used to answer ad-hoc questions and to build the revenue dashboard in Tableau.
The first step in the pipeline is to extract data from the different sources. Since this is not the main focus, I skip this step and download the datasets directly from Kaggle. To replicate the source databases, I create a bucket in Google Cloud Storage called olist_datalake and load the 8 datasets into it. We end up with the following:
--Use an admin role
USE ROLE accountadmin;
--Create a role called 'analyst'
CREATE ROLE IF NOT EXISTS analyst;
GRANT ROLE analyst TO ROLE accountadmin;
--Create the default data warehouse
CREATE WAREHOUSE IF NOT EXISTS olist_wh;
GRANT operate ON WAREHOUSE olist_wh TO ROLE analyst;
--Create a user called 'vinh' and assign to role
CREATE USER IF NOT EXISTS vinh
PASSWORD='VinhLe111'
LOGIN_NAME='vinh'
MUST_CHANGE_PASSWORD=FALSE
DEFAULT_WAREHOUSE='olist_wh'
DEFAULT_ROLE='analyst'
DEFAULT_NAMESPACE='olist_raw'
COMMENT='Use dbt to transform data in Snowflake warehouse';
GRANT ROLE analyst to USER vinh;
--Create the olist database and olist_raw schema
CREATE DATABASE IF NOT EXISTS olist;
CREATE SCHEMA IF NOT EXISTS olist_raw;
--Set up permissions to role 'analyst'
GRANT ALL ON WAREHOUSE olist_wh TO ROLE analyst;
GRANT ALL ON DATABASE olist to ROLE analyst;
GRANT ALL ON ALL SCHEMAS IN DATABASE olist to ROLE analyst;
GRANT ALL ON FUTURE SCHEMAS IN DATABASE olist to ROLE analyst;
GRANT ALL ON ALL TABLES IN SCHEMA olist_raw to ROLE analyst;
GRANT ALL ON FUTURE TABLES IN SCHEMA olist_raw to ROLE analyst;
--Set up the default warehouse, database, and schema
USE WAREHOUSE olist_wh;
USE DATABASE olist;
USE SCHEMA olist_raw;
-- Configuring an Integration for Google Cloud Storage on Snowflake
CREATE STORAGE INTEGRATION gsc_olist
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'GCS'
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = ('gcs://olist_datalake/');

DESC STORAGE INTEGRATION gsc_olist;

CREATE STAGE gcs_olist_stage
URL = 'gcs://olist_datalake/'
STORAGE_INTEGRATION = gsc_olist;
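The DESC STORAGE INTEGRATION output includes a STORAGE_GCP_SERVICE_ACCOUNT property; that service account needs read access on the olist_datalake bucket (for example the Storage Object Viewer role in GCP) before the stage can read any files. Once that is granted, an optional check (not part of the original scripts) confirms the stage can see the 8 uploaded CSV files:

-- Optional: the stage should list the CSV files uploaded to the bucket
LIST @gcs_olist_stage;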
-- Load data from Google Cloud Storage to Snowflake
-- Create and load data to 'orders_raw' table
CREATE OR REPLACE TABLE orders_raw
(order_id string,
customer_id string,
order_status string,
order_purchase_timestamp timestamp,
order_approved_at timestamp,
order_delivered_carrier_date timestamp,
order_delivered_customer_date timestamp,
order_estimated_delivery_date timestamp);

COPY INTO orders_raw
(order_id,
customer_id,
order_status,
order_purchase_timestamp,
order_approved_at,
order_delivered_carrier_date,
order_delivered_customer_date,
order_estimated_delivery_date)
FROM @gcs_olist_stage
PATTERN = 'olist_orders_dataset.csv'
FILE_FORMAT = (type = 'CSV' skip_header = 1
FIELD_OPTIONALLY_ENCLOSED_BY = '"');
--Create and load data to 'order_items_raw' table
CREATE OR REPLACE TABLE order_items_raw
(order_id string,
order_item_id string,
product_id string,
seller_id string,
shipping_limit_date timestamp,
price numeric,
freight_value numeric);

COPY INTO order_items_raw
(order_id,
order_item_id,
product_id,
seller_id,
shipping_limit_date,
price,
freight_value)
FROM @gcs_olist_stage
PATTERN = 'olist_order_items_dataset.csv'
FILE_FORMAT = (type = 'CSV' skip_header = 1
FIELD_OPTIONALLY_ENCLOSED_BY = '"');
-- Create and load data to 'order_reviews_raw' table
CREATE OR REPLACE TABLE order_reviews_raw
(review_id string,
order_id string,
review_score numeric,
review_comment_title string,
review_comment_message string,
review_creation_date timestamp,
review_answer_timestamp timestamp);

COPY INTO order_reviews_raw
(review_id,
order_id,
review_score,
review_comment_title,
review_comment_message,
review_creation_date,
review_answer_timestamp)
FROM @gcs_olist_stage
PATTERN = 'olist_order_reviews_dataset.csv'
FILE_FORMAT = (type = 'CSV' skip_header = 1
FIELD_OPTIONALLY_ENCLOSED_BY = '"');
--Create and load data to 'order_payments_raw' table
CREATE OR REPLACE TABLE order_payments_raw
(order_id string,
payment_sequential integer,
payment_type string,
payment_installments integer,
payment_value numeric);

COPY INTO order_payments_raw
(order_id,
payment_sequential,
payment_type,
payment_installments,
payment_value)
FROM @gcs_olist_stage
PATTERN = 'olist_order_payments_dataset.csv'
FILE_FORMAT = (type = 'CSV' skip_header = 1
FIELD_OPTIONALLY_ENCLOSED_BY = '"');
--Create and load data to 'customers_raw' table
CREATE OR REPLACE TABLE customers_raw
(customer_id string,
customer_unique_id string,
customer_zip_code_prefix string,
customer_city string,
customer_state string);

COPY INTO customers_raw
(customer_id,
customer_unique_id,
customer_zip_code_prefix,
customer_city,
customer_state)
FROM @gcs_olist_stage
PATTERN = 'olist_customers_dataset.csv'
FILE_FORMAT = (type = 'CSV' skip_header = 1
FIELD_OPTIONALLY_ENCLOSED_BY = '"');
--Create and load data to 'products_raw' table
CREATE OR REPLACE TABLE products_raw
(product_id string,
product_category_name_english string,
product_name_lenght string,
product_description_lenght string,
product_photos_qty integer,
product_weight_g integer,
product_length_cm integer,
product_height_cm integer,
product_width_cm integer);

COPY INTO products_raw
(product_id,
product_category_name_english,
product_name_lenght,
product_description_lenght,
product_photos_qty,
product_weight_g,
product_length_cm,
product_height_cm,
product_width_cm)
FROM @gcs_olist_stage
PATTERN = 'olist_products_dataset.csv'
FILE_FORMAT = (type = 'CSV' skip_header = 1
FIELD_OPTIONALLY_ENCLOSED_BY = '"');
--Create and load data to 'sellers_raw' table
CREATE OR REPLACE TABLE sellers_raw
(seller_id string,
seller_zip_code_prefix integer,
seller_city string,
seller_state string);

COPY INTO sellers_raw
(seller_id,
seller_zip_code_prefix,
seller_city,
seller_state)
FROM @gcs_olist_stage
PATTERN = 'olist_sellers_dataset.csv'
FILE_FORMAT = (type = 'CSV' skip_header = 1
FIELD_OPTIONALLY_ENCLOSED_BY = '"');
--Create and load data to 'geolocation_raw' table
CREATE OR REPLACE TABLE geolocation_raw
(geolocation_zip_code_prefix integer,
geolocation_lat string,
geolocation_lng string,
geolocation_city string,
geolocation_state string);

COPY INTO geolocation_raw
(geolocation_zip_code_prefix,
geolocation_lat,
geolocation_lng,
geolocation_city,
geolocation_state)
FROM @gcs_olist_stage
PATTERN = 'olist_geolocation_dataset.csv'
FILE_FORMAT = (type = 'CSV' skip_header = 1
FIELD_OPTIONALLY_ENCLOSED_BY = '"');
Run the above SQL scripts in the Snowflake editor and we'll have the following tables in the olist database:
Once the raw data has been loaded into the data warehouse, it's time to transform it into suitable formats for later analysis. In dbt, we build models, which are simply SQL files with SELECT statements. By doing this, we can create dependencies between models, write tests to check data quality, and materialize the models as views or tables in the data warehouse.
The transformation process is divided into separate layers: staging, core, and final.
In the staging layer, we create staging models from the raw data. For the requirements of this project, 4 staging models are needed:
-- stg_customers.sql
WITH stg_customers AS (SELECT * FROM OLIST.OLIST_RAW.CUSTOMERS_RAW)
SELECT * FROM stg_customers
-- stg_orders.sql
WITH stg_orders AS (SELECT * FROM OLIST.OLIST_RAW.ORDERS_RAW)
SELECT * FROM stg_orders
-- stg_order_items.sql
WITH stg_order_items AS (SELECT * FROM OLIST.OLIST_RAW.ORDER_ITEMS_RAW)
SELECT * FROM stg_order_items
-- stg_products.sql
WITH stg_products AS (SELECT * FROM OLIST.OLIST_RAW.PRODUCTS_RAW)
SELECT * FROM stg_products
In this step, we can use generic tests, which are predefined in dbt, to check the data quality of the staging models.
models:
- name: stg_customers
columns:
- name: customer_id
tests:
- unique
- not_null
- name: customer_state
tests:
- accepted_values:
values: ['AC','AL','AP','AM','BA','CE','DF','ES','GO',
'MA','MT','MS','MG','PA','PB','PR','PE','PI',
'RN','RS','RJ','RO','RR','SC','SP','SE','TO']
- name: stg_products
columns:
- name: product_id
tests:
- unique
- not_null
- name: stg_orders
columns:
- name: order_id
tests:
- unique
- not_null
- name: customer_id
tests:
- unique
- not_null
- name: order_status
tests:
- accepted_values:
values: ['delivered', 'invoiced', 'shipped', 'processing',
'unavailable','canceled','created', 'approved']
- name: stg_order_items
columns:
- name: order_id
tests:
- not_null
The above code specifies that, in the stg_customers model, the customer_id column should contain unique values and no null values, while the customer_state column should contain only the values listed under accepted_values. The other staging models are tested in the same way.
The core layer defines the fact and dim tables that will be used in the final layer. Since this project only requires customer information, I'll create one fact table for orders and one dim table for customers.
The code below specifies the dim_customers model. Here the ref function is used to show the dependency of this model on stg_customers.
-- dim_customers.sql
WITH customers AS (
SELECT * FROM {{ ref('stg_customers') }})
SELECT
customer_id,
customer_unique_id,
customer_zip_code_prefix,
INITCAP(customer_city) AS customer_city,
CASE
WHEN customer_state = 'AC' THEN 'Acre'
WHEN customer_state = 'AL' THEN 'Alagoas'
WHEN customer_state = 'AP' THEN 'Amapá'
WHEN customer_state = 'AM' THEN 'Amazonas'
WHEN customer_state = 'BA' THEN 'Bahia'
WHEN customer_state = 'CE' THEN 'Ceará'
WHEN customer_state = 'DF' THEN 'Distrito Federal'
WHEN customer_state = 'ES' THEN 'Espírito Santo'
WHEN customer_state = 'GO' THEN 'Goiás'
WHEN customer_state = 'MA' THEN 'Maranhão'
WHEN customer_state = 'MT' THEN 'Mato Grosso'
WHEN customer_state = 'MS' THEN 'Mato Grosso do Sul'
WHEN customer_state = 'MG' THEN 'Minas Gerais'
WHEN customer_state = 'PA' THEN 'Pará'
WHEN customer_state = 'PB' THEN 'Paraíba'
WHEN customer_state = 'PR' THEN 'Paraná'
WHEN customer_state = 'PE' THEN 'Pernambuco'
WHEN customer_state = 'PI' THEN 'Piauí'
WHEN customer_state = 'RJ' THEN 'Rio de Janeiro'
WHEN customer_state = 'RN' THEN 'Rio Grande do Norte'
WHEN customer_state = 'RS' THEN 'Rio Grande do Sul'
WHEN customer_state = 'RO' THEN 'Rondônia'
WHEN customer_state = 'RR' THEN 'Roraima'
WHEN customer_state = 'SC' THEN 'Santa Catarina'
WHEN customer_state = 'SP' THEN 'São Paulo'
WHEN customer_state = 'SE' THEN 'Sergipe'
ELSE 'Tocantins'
END AS customer_state
FROM customers
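As an aside, the long CASE expression could also be replaced with a lookup table loaded as a dbt seed. A sketch, assuming a hypothetical seeds/brazil_states.csv with columns state_code and state_name:

-- dim_customers.sql (alternative sketch): join a hypothetical brazil_states seed instead of the CASE expression
WITH customers AS (
SELECT * FROM {{ ref('stg_customers') }}),

states AS (
SELECT * FROM {{ ref('brazil_states') }})

SELECT
customers.customer_id,
customers.customer_unique_id,
customers.customer_zip_code_prefix,
INITCAP(customers.customer_city) AS customer_city,
states.state_name AS customer_state
FROM customers
LEFT JOIN states ON customers.customer_state = states.state_code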
To get the fact_orders table, I join 3 different tables: stg_orders, stg_order_items, and stg_products.
-- fact_orders.sql
WITH orders AS (
SELECT * FROM {{ ref('stg_orders') }}),

items AS (
SELECT * FROM {{ ref('stg_order_items') }}),

products AS (
SELECT * FROM {{ ref('stg_products') }})

SELECT
orders.order_id,
orders.customer_id,
orders.order_status,
-- extract date from datetime
CAST(orders.order_purchase_timestamp AS DATE) AS purchased_date,
CAST(orders.order_delivered_customer_date AS DATE) AS delivered_date,
-- create a new column for the difference between the delivered and purchase date
ABS(DATEDIFF(DAY, delivered_date, purchased_date)) AS total_delivery_day,
items.product_id,
items.order_item_id AS item_number,
-- convert to USD
items.price*0.3 AS price_per_item_USD,
item_number*price_per_item_USD AS total_price,
items.freight_value*0.3 AS freight_USD,
-- remove _
REPLACE(products.product_category_name_english,'_',' ') AS product_category,
CONCAT(UPPER(LEFT(product_category,1)), LOWER(RIGHT(product_category, LENGTH(product_category)-1))) AS product_category_upper
FROM orders
LEFT JOIN items USING (order_id)
LEFT JOIN products USING (product_id)
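The 0.3 BRL-to-USD conversion rate is hard-coded in two places. A small improvement would be to read it from a dbt project variable so it can be updated in one place; a sketch, assuming a hypothetical variable brl_to_usd defined under vars in dbt_project.yml:

-- Excerpt sketch only: brl_to_usd is a hypothetical project variable, e.g. vars: {brl_to_usd: 0.3}
SELECT
items.price * {{ var('brl_to_usd', 0.3) }} AS price_per_item_USD,
items.freight_value * {{ var('brl_to_usd', 0.3) }} AS freight_USD
FROM {{ ref('stg_order_items') }} AS items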
Lastly, I join the fact_orders and dim_customers models to create the last model we need for further analysis.
-- order_customers_2017.sql
WITH orders AS (
SELECT * FROM {{ ref('fact_orders') }}),

customers AS (
SELECT * FROM {{ ref('dim_customers') }})

SELECT
orders.order_id,
orders.order_status,
orders.purchased_date,
orders.delivered_date,
orders.total_delivery_day,
orders.product_id,
orders.item_number,
orders.price_per_item_USD,
orders.total_price,
orders.freight_USD,
orders.product_category_upper AS product_category,
customers.customer_unique_id,
customers.customer_zip_code_prefix,
customers.customer_city,
customers.customer_state
FROM orders
LEFT JOIN customers USING (customer_id)
WHERE orders.purchased_date BETWEEN '2017-01-01' AND '2017-12-31'
ORDER BY orders.purchased_date ASC
After setting up all the necessary models, we can run them with the command dbt run
Cool, the code runs successfully. As you can see, dbt creates 7 models: the 4 models in the staging layer and the 2 models in the core layer are materialized as views in the data warehouse, while the model in the final layer is materialized as a table.
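Assuming the project follows the common dbt setup of defaulting models to views (the per-folder materialization could equally be set in dbt_project.yml), the table materialization for the final model can be requested with a config() block at the top of its SQL file:

-- Top of order_customers_2017.sql: override the default (view) materialization for this model
{{ config(materialized='table') }}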
Next, we use the command dbt test to run the generic tests we defined in the previous section.
All 11 tests pass, which means there are no null or invalid values in the columns we specified.
The last step in this project is to build a dashboard in Tableau that shows an overview of revenue and other order metrics for the business in 2017. The picture below is a static dashboard; for the interactive version, visit my Tableau Public profile.
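The revenue figures behind the dashboard can also be reproduced with ad-hoc queries against the final table. A hypothetical example for monthly revenue in 2017 (the database/schema prefix depends on how the dbt profile is configured):

-- Hypothetical ad-hoc query: monthly revenue in 2017 from the final model
SELECT
DATE_TRUNC('month', purchased_date) AS order_month,
SUM(total_price) AS revenue_usd
FROM order_customers_2017
GROUP BY order_month
ORDER BY order_month;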
Some quick insights from the dashboard:
There are many ideas that could make this ELT project better, but due to time constraints, I cannot implement all of them. These include:
Schedule and orchestrate the pipeline with Apache Airflow or dbt Cloud