A Data Engineer (DE) can be thought of as an "extended version" of older roles such as BI, ETL, and Data Warehouse Developers. A DE builds data applications and ETLs, integrates numerous separate data sources, maintains dimensional models inside the data warehouse (DWH), prepares dashboards, and helps data analysts and scientists obtain and prepare data. A common desirable skill set for a DE includes:
- Coding - although a DE is not a hardcore software engineer, they need to be more than familiar with basics such as data structures, basic algorithms, complexity, OOP, application design, and clean, reusable code.
- SQL mastery - with DDL, DML and advanced analytical queries.
- Data warehousing - dimensional data modeling, schema design, ETLs and other DWH concepts
- Data Visualization
- Big Data and (Near)Real-Time streaming technologies
- Product sense
A great overview of the DE role can be found, for example, here.
The following material presents a full data engineering project/pipeline that exercises many of the listed skills. It starts with the creation of a made-up operational system and a dimensional model for the DWH. Next, a script is built that inserts and updates rows in the operational database so it more closely resembles a real-world system. Then a full-blown ETL process that populates the DWH is implemented. Finally, the main KPIs are chosen and visualized on a dashboard.
Although the codebase for all the steps described here is available on GitHub, anyone interested in the topic is challenged to create their own solutions and extend the project. Some basic suggestions for extensions are: improvements to the operational system, a user management framework, reconciliation of DWH data against the operational DB, or different levels of views and permissions based on user roles.
As the material is already quite extensive, related topics are not covered in depth. For example, the operational system and DWH data models are presented at the beginning, but general modeling techniques are not discussed.
Operational System and DWH Model
As a starting point, a fake operational system was created. The system enables the made-up company OceanRecords to sell its products. Fig.1 presents the ER diagram for the operational database. It is relatively simple:
- there are customers who can place orders
- a customer can have multiple addresses
- one address can be shared by multiple customers
- an order is placed by a customer at a certain address
- an order has a status and is realized using a certain payment method
- within one order there can be many products
- products have types, subtypes, and an associated supplier
PostgreSQL was used as the RDBMS. To execute SQL DDL statements and populate data in the proper order, Python and Luigi were used. For readers not familiar with Luigi, an overview can be found in its documentation. In short, Luigi helps you build a graph of tasks and the dependencies between them, and it handles failures: if something goes wrong and the script is re-executed, only the unfinished tasks will run.
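Luigi itself is a third-party package, but the core idea can be sketched in a few lines of plain Python (a toy stand-in, not Luigi's actual API): each task declares its dependencies and an output, and a task runs only if its output does not already exist, so a re-run after a failure executes only the unfinished work.

```python
# Toy illustration of Luigi-style task graphs (not Luigi's real API):
# a task runs only when its output is missing, so re-runs skip finished work.
completed_outputs = set()   # stand-in for marker files on disk
run_log = []                # records which tasks actually executed


class Task:
    requires = []           # upstream Task classes

    def output(self):
        return self.__class__.__name__ + ".done"

    def run_task(self):
        run_log.append(self.__class__.__name__)
        completed_outputs.add(self.output())


def build(task_cls):
    """Run dependencies first, then the task itself, skipping completed ones."""
    task = task_cls()
    for dep in task.requires:
        build(dep)
    if task.output() not in completed_outputs:
        task.run_task()


class CreateTables(Task):
    pass


class PopulateCustomers(Task):
    requires = [CreateTables]


class PopulateOrders(Task):
    requires = [PopulateCustomers]


build(PopulateOrders)        # first run executes all three tasks
first_run = list(run_log)

completed_outputs.discard("PopulateOrders.done")  # simulate one failed task
run_log.clear()
build(PopulateOrders)        # re-run executes only the unfinished task
second_run = list(run_log)
```

In real Luigi the same behavior comes from implementing `requires()`, `output()`, and `run()` on `luigi.Task` subclasses.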
To populate the database tables with data, another useful package was used - Faker. Faker is a Python data generator. It supports mocking a wide range of entities, including names, addresses, phone numbers, emails, and much more.
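Faker exposes provider methods such as `name()`, `address()`, and `email()` on a `Faker()` instance. To keep this sketch self-contained, the snippet below mimics the idea with the standard library only: seedable random picks from small sample pools (the pools and field names are made up for illustration, not Faker's output).

```python
import random

# A stripped-down, stdlib-only stand-in for Faker: seedable random
# generators that produce plausible-looking customer records.
FIRST_NAMES = ["Anna", "John", "Maria", "Piotr", "Lena"]
LAST_NAMES = ["Smith", "Kowalski", "Nowak", "Garcia", "Lee"]
CITIES = ["Gdansk", "Berlin", "Lisbon", "Oslo", "Porto"]


def fake_customer(rng):
    """Generate one mock customer row for the operational database."""
    first = rng.choice(FIRST_NAMES)
    last = rng.choice(LAST_NAMES)
    return {
        "name": f"{first} {last}",
        "email": f"{first.lower()}.{last.lower()}@example.com",
        "city": rng.choice(CITIES),
        "phone": "".join(rng.choice("0123456789") for _ in range(9)),
    }


rng = random.Random(42)                      # fixed seed -> reproducible data
customers = [fake_customer(rng) for _ in range(3)]
```

Seeding the generator is worth copying from Faker's real usage as well: reproducible mock data makes the population scripts repeatable.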
To make OceanRecords more closely resemble a real-world system, a script that performs data updates and inserts has been created. Whatever operation the script performs, it maintains all relations and keys inside the database. What is more, each action performed by the script was designed to trigger a specific operation on the DWH side. That is presented in Tab.1.
| Action in operational system | Will allow in DWH |
| --- | --- |
| Place order | Insert new rows into fact table |
| Update order | Update rows in fact table |
| Insert new customer_address | Insert new data into dimensions |
| Update product price | SCD type 2 in product dimension |
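The product-price update is the most interesting case: in the DWH it is handled as a Slowly Changing Dimension type 2, i.e. the old dimension row is closed (its validity interval is ended) and a new row with a fresh surrogate key is inserted. A minimal sketch of the technique, using SQLite in place of Postgres and a simplified schema (not the project's actual table layout):

```python
import sqlite3

# SCD type 2 sketch: a price change closes the current dimension row
# and inserts a new current row with a fresh surrogate key.
conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE dim_product (
        product_sk   INTEGER PRIMARY KEY AUTOINCREMENT,  -- surrogate key
        product_id   INTEGER,                            -- natural key
        price        REAL,
        valid_from   TEXT,
        valid_to     TEXT,                               -- NULL = still valid
        is_current   INTEGER
    )
""")
conn.execute(
    "INSERT INTO dim_product (product_id, price, valid_from, valid_to, is_current) "
    "VALUES (101, 9.99, '2020-01-01', NULL, 1)"
)


def apply_price_change(conn, product_id, new_price, change_date):
    """Close the current version of the product and open a new one."""
    conn.execute(
        "UPDATE dim_product SET valid_to = ?, is_current = 0 "
        "WHERE product_id = ? AND is_current = 1",
        (change_date, product_id),
    )
    conn.execute(
        "INSERT INTO dim_product (product_id, price, valid_from, valid_to, is_current) "
        "VALUES (?, ?, ?, NULL, 1)",
        (product_id, new_price, change_date),
    )


apply_price_change(conn, 101, 12.49, "2020-06-01")
history = conn.execute(
    "SELECT price, valid_from, valid_to, is_current "
    "FROM dim_product WHERE product_id = 101 ORDER BY product_sk"
).fetchall()
```

The payoff is that facts always join to the product version that was valid when the order was placed, rather than to the latest price.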
As a next step, the operational system data model was transformed into a DWH dimensional model. During design, two fact tables were identified - order and order line. The grain of the former is a single order, and it is surrounded by dimensions such as address, customer, payment, status, and date. The grain of the latter fact table is a single product within an order; it is surrounded by the product and supplier dimensions. The order fact table has a one-to-many relationship with the order line fact table.
The transformation was accompanied by some denormalization. The product type and subtype tables became part of a single product dimension. The table linking customers and addresses ("customers_addresses") disappeared completely; in the DWH, the link between those two entities can be found in the order table. In all tables, some additional ETL-specific columns were added. The resulting model is a classic set of star schemas, depicted in Fig.2.
Extract, Transform, Load (ETL)
In the next step, a physical DWH model was created along with the ETL process that moves and reshapes the source data. The DWH was placed in the same Postgres database as the operational system, but within a different schema. Even though both systems technically reside in the same RDBMS, the ETL behaves as though they were in completely separate environments. Fig.3 presents a high-level overview of the ETL process.
Firstly, Python scripts extract rows for a given table from the source system and save the results locally as CSV files. Along with the results, some additional files are created:
- status files used later to determine which files were already copied into DWH
- metadata files with extraction watermarks used to load only new/changed records
- DDL statements for delta and stage tables creation
- other auxiliary information like column names or primary key
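A simplified sketch of such an extraction step (the file layout and names are illustrative, not the project's actual ones): the batch is dumped to CSV, the extraction watermark and primary key go into a metadata file, and a status file signals that the CSV is complete.

```python
import csv
import json
import tempfile
from pathlib import Path


# Sketch of one extraction step: dump source rows to CSV and write
# the metadata files (watermark, status) next to the data.
def extract_table(rows, table, out_dir):
    out_dir = Path(out_dir)
    data_file = out_dir / f"{table}.csv"
    with data_file.open("w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=["id", "updated_at", "name"])
        writer.writeheader()
        writer.writerows(rows)
    # metadata: extraction watermark = newest updated_at seen in this batch
    watermark = max(r["updated_at"] for r in rows)
    (out_dir / f"{table}.meta.json").write_text(
        json.dumps({"table": table, "watermark": watermark, "pk": "id"})
    )
    # status file: signals to the load step that this CSV is complete
    (out_dir / f"{table}.status").write_text("READY")
    return data_file


source_rows = [
    {"id": 1, "updated_at": "2020-05-01", "name": "Alice"},
    {"id": 2, "updated_at": "2020-05-03", "name": "Bob"},
]
tmp = tempfile.mkdtemp()
extract_table(source_rows, "customer", tmp)
meta = json.loads((Path(tmp) / "customer.meta.json").read_text())
status = (Path(tmp) / "customer.status").read_text()
```

Writing the status file last is the important detail: a consumer that only picks up CSVs with a READY marker can never load a half-written file.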
After extraction, the CSVs are copied into "Delta" tables within the DWH. The reason for copying data into delta instead of directly into the staging area is that the COPY command does not offer enough flexibility during insertion. For example, it cannot determine whether a row should simply be inserted or whether it is an update to an already existing row. That logic is implemented later, while moving data from "Delta" into "Stage". When data lands in the "Stage" area, it mirrors the operational system and is ready for transformations.
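The delta-to-stage move is essentially an upsert: rows whose primary key already exists in the stage table are updates, everything else is an insert. In Postgres this can be expressed with `INSERT ... ON CONFLICT DO UPDATE`; the sketch below uses the same syntax in SQLite with an illustrative schema (the `WHERE true` is SQLite's required disambiguation when combining `SELECT` with `ON CONFLICT`):

```python
import sqlite3

# Delta-to-stage sketch: a COPY-style bulk load lands in delta, then an
# upsert into stage decides per row between insert and update.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE delta_customer (id INTEGER, name TEXT)")
conn.execute("CREATE TABLE stage_customer (id INTEGER PRIMARY KEY, name TEXT)")

# Stage already holds an older version of customer 1.
conn.execute("INSERT INTO stage_customer VALUES (1, 'Alice')")

# The CSV was bulk-copied into delta: one update (id=1) and one new row (id=2).
conn.executemany("INSERT INTO delta_customer VALUES (?, ?)",
                 [(1, "Alice Cooper"), (2, "Bob")])

# Upsert: existing keys are updated in place, new keys are inserted.
conn.execute("""
    INSERT INTO stage_customer (id, name)
    SELECT id, name FROM delta_customer WHERE true
    ON CONFLICT(id) DO UPDATE SET name = excluded.name
""")
stage = conn.execute("SELECT id, name FROM stage_customer ORDER BY id").fetchall()
```

After this step, `stage_customer` mirrors the source table, which is exactly the property the "Stage" area is meant to have.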
The transformation step is performed in the so-called "Work" area. All cleaning, modifications, Slowly Changing Dimension (SCD) logic, etc. takes place there. The "Work" area is also where it is resolved whether a given row will be inserted into the "DWH" tables or is an update; that helps with the proper maintenance of surrogate keys. Lastly, data from "Work" is loaded into the final "DWH" schema. Those tables are expected to be customer-facing: at that point, the data is integrated, cleaned, and fully transformed into the dimensional model.
To glue all the ETL tasks together, Python and Airflow were used. At a high level, the concept behind Airflow and Luigi (used earlier in this project) is the same - both let you create workflows with tasks and dependencies between them. That capability makes these tools ideal for defining ETLs. While creating workflows in Airflow, the user defines Directed Acyclic Graphs (DAGs) in which nodes are tasks and edges are dependencies. In this project two DAGs were defined. The first spins up the initial state of the DWH - the creation of all "raw" dimensions, facts, and additional ETL control tables is defined there. The second DAG is the actual ETL. It consists of three SubDAGs: one for extraction, the second for loading dimensions, and the last for loading facts.
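In real Airflow, tasks are operator instances and dependencies are chained with `>>`. Since the package itself is heavyweight, the snippet below sketches only the underlying idea with the standard library: the DAG as a dependency mapping executed in topological order, mirroring the extract, load-dimensions, load-facts structure of the ETL DAG (task names are illustrative).

```python
from graphlib import TopologicalSorter

# Sketch of the ETL DAG structure: each task maps to the set of tasks
# it depends on, mirroring the extract -> dims -> facts SubDAG ordering.
dag = {
    "extract_customer": set(),
    "extract_order": set(),
    "load_dim_customer": {"extract_customer"},
    "load_fact_order": {"extract_order", "load_dim_customer"},
}

# A scheduler (Airflow, Luigi, or this stdlib helper) runs tasks so that
# every task starts only after all of its dependencies have finished.
execution_order = list(TopologicalSorter(dag).static_order())
```

Because the fact load depends on both an extract and a dimension load, it is guaranteed to run last, no matter how the independent upstream tasks are interleaved.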
Fig.4 shows the Airflow DAG for creating the initial state of the DWH tables. It begins with a dummy start task. Then the dimension tables are created in parallel. After those DDLs are executed, two ETL control tables are built - the reprocessing queue and the watermarks table. Next, the two fact tables are created in sequence, and a dummy end task finishes the DAG.
The reprocessing queue table stores rows from fact tables that should be reprocessed. This situation occurs when loading a dimension fails: the affected rows in the fact table will not have a valid dimension surrogate key to join on. To recover from such a failure automatically, the surrogate key "-1" is assigned in the fact table and the row is marked for reprocessing in the next ETL cycle. When the dimension table is back to normal, the marked facts are processed again and proper surrogate keys are assigned.
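A sketch of that recovery mechanism with a deliberately simplified schema (not the project's actual tables): the surrogate-key lookup falls back to -1 when the dimension row is missing, and the fact row's key is pushed onto the reprocessing queue so a later cycle can fix it up.

```python
import sqlite3

# Sketch of the "-1 surrogate key" recovery: facts that cannot resolve a
# dimension key get -1 and are queued for reprocessing in a later ETL cycle.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dim_customer (customer_sk INTEGER, customer_id INTEGER)")
conn.execute("CREATE TABLE fact_order (order_id INTEGER, customer_sk INTEGER)")
conn.execute("CREATE TABLE reprocessing_queue (fact_table TEXT, order_id INTEGER)")

conn.execute("INSERT INTO dim_customer VALUES (10, 1)")  # customer 2 failed to load


def load_fact(conn, order_id, customer_id):
    """Insert a fact row, falling back to the dummy key if the dim is missing."""
    row = conn.execute(
        "SELECT customer_sk FROM dim_customer WHERE customer_id = ?",
        (customer_id,),
    ).fetchone()
    sk = row[0] if row else -1                 # fall back to the dummy key
    conn.execute("INSERT INTO fact_order VALUES (?, ?)", (order_id, sk))
    if sk == -1:                               # mark the row for a later retry
        conn.execute("INSERT INTO reprocessing_queue VALUES ('fact_order', ?)",
                     (order_id,))


load_fact(conn, 500, 1)   # dimension present -> proper surrogate key
load_fact(conn, 501, 2)   # dimension missing -> -1 and queued

facts = conn.execute(
    "SELECT order_id, customer_sk FROM fact_order ORDER BY order_id").fetchall()
queued = conn.execute("SELECT order_id FROM reprocessing_queue").fetchall()
```

The dummy -1 row lets the fact load complete instead of failing outright, while the queue guarantees the gap is closed once the dimension recovers.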
The watermarks table is used to incrementally load fact rows without the need for an expensive join against the DWH table to determine what has changed.
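In code, the idea is just a filter on a change timestamp: read the stored watermark, pull only rows modified after it, then advance the watermark. A minimal sketch, with in-memory stand-ins for the watermarks table and the source:

```python
# Watermark sketch: only rows changed since the last stored watermark are
# extracted, and the watermark then advances to the newest change seen.
watermarks = {"order": "2020-05-01"}          # stand-in for the watermarks table

source_orders = [
    {"order_id": 1, "updated_at": "2020-04-28"},
    {"order_id": 2, "updated_at": "2020-05-02"},
    {"order_id": 3, "updated_at": "2020-05-04"},
]


def incremental_extract(table, rows):
    """Return rows changed after the stored watermark and advance it."""
    last = watermarks[table]
    new_rows = [r for r in rows if r["updated_at"] > last]
    if new_rows:
        watermarks[table] = max(r["updated_at"] for r in new_rows)
    return new_rows


batch = incremental_extract("order", source_orders)   # picks up orders 2 and 3
empty = incremental_extract("order", source_orders)   # nothing new second time
```

The watermark should only be advanced after the batch has been committed downstream; otherwise a failed load would silently skip those rows forever.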
Metrics and Dashboard
In the final step, the company's Key Performance Indicators (KPIs) were chosen and visualized on a dashboard, shown in Fig.5. The dashboard was created in Superset, an open-source BI platform.
Potential areas of focus for the OceanRecords company should be revenue and customer satisfaction. As the company is in the retail segment, "Sales" could be a good proxy for revenue. If everything works as it should, the higher the sales, the better for the company's financial health. In addition, "Average Order Value" could provide some valuable insight. The company may prefer a higher average order size, as it may increase sales and at the same time reduce shipping costs.
Another important value to look at is "Lost Order Deviation from limit". It is normal that a certain share of orders gets lost: customers may cancel an order or request a refund for products that do not satisfy their needs. The problem arises when the number of lost orders is unstable or exceptionally high.
The last proposed KPI is directly linked to customer satisfaction. "Average Order Fulfillment" tells us how long, on average, it took for an order to be fulfilled. A shorter fulfillment time can be expected to greatly improve the customer experience.
The current values of the KPIs, along with their monthly averages, month-over-month growth, and monthly trends, are presented in the table in the upper-left corner of the dashboard. This part of the dashboard attracts most of the user's attention, which makes it a perfect place for displaying KPIs.
Along with the main KPIs table, more detailed views and other important metrics are presented. Firstly, in the top-right corner, a line graph of the historical "Lost Order Deviation from limit" is rendered. As can be seen in the graph, OceanRecords has a big problem with the stability of that KPI; a more detailed view could shed more light on the problem and bring more attention to it. Another important problem of OceanRecords emphasized on the dashboard is "Sales". Even though it shows an upward trend in the most recent month, the historical values shown on the month-over-month growth line graph indicate that the company cannot expect that trend to be stable: sales have grown and declined in a seemingly random manner since the beginning of the historical records.
In the middle part of the dashboard, two bar charts are located. Both present values aggregated at the quarterly level: one shows total sales and the other the number of unique customers. At the bottom of the dashboard, three tables list the top products, product subtypes, and countries, ordered by sales amount.
The presented project spans many Data Engineering skills. It requires solid Python, SQL, data modeling, ETL, and data visualization skills. It is really rewarding to see how all those different skills combine to create one final output.
Nevertheless, what is even more important is that in this project one can see the whole journey the data from an operational system makes - from raw data in the source to valuable information displayed on a dashboard. Data whose original purpose was simply to keep the company running were transformed into information that is valuable from a business perspective. What is more, with a user-friendly DWH and BI tools, less technical people will be able to extract even more beneficial insights.
Furthermore, it should not be forgotten that there is a big difference between insights and impact. I truly believe that Data Engineers should not only bring insights, but also drive impact and change company processes and products for the better.