Have you ever wondered how to take raw log files and transform them into a relational database? In this repository, I show you how to do it using pandas, Postgres, and psycopg2 in Python. You will learn to read log files into a tabular pandas dataframe and to use SQL to create a star schema that is perfect for doing aggregations and analytics in Python.
Purpose
Sparkify -- a fictitious startup -- wants to analyze the data they have been collecting on songs and user activity from their new music streaming app. Understanding what songs users are listening to is of particular interest. Their data is stored in JSON log files and needs to be restructured to answer that question. They want a database optimized for analyzing users' listening behavior. To perform this analysis routinely, they need a database schema and an extract-transform-load (ETL) pipeline.
Design
What songs are popular with subscribers? To answer this question, I need to restructure the Sparkify log files into a relational database so that listening behavior can be quantified with SQL queries. Logs of subscriber activity are gathered by Sparkify's online transaction processing (OLTP) system, which is optimized for fast writes -- think log files. To profit from analyzing user data, the larger the data volume, the better. Analyzing this data is the realm of data warehouses, which ingest and restructure transactional data for analysis. Star schemas simplify analytic queries by restructuring and normalizing the data into tables where each row has a unique identifier, or primary key. Tables of this kind -- known as second normal form -- are common in data warehouses. The idea of a star schema is simple: one central fact table related to dimension tables by their primary keys. Star schemas are standard in data warehouses -- a typical example of an online analytical processing (OLAP) system.
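To make the star-schema idea concrete, here is a minimal sketch of one dimension table and the central fact table written as psycopg2-ready SQL strings. The table and column names are illustrative assumptions only; the statements this project actually uses live in `sql_queries.py`.

```python
# Illustrative star-schema DDL (names are assumptions, not the project's exact schema).
example_dim_song_create = """
    CREATE TABLE IF NOT EXISTS dim_song (
        song_id   TEXT PRIMARY KEY,   -- primary key of the dimension
        title     TEXT,
        artist_id TEXT,
        year      INT,
        duration  FLOAT
    );
"""

example_fact_songplay_create = """
    CREATE TABLE IF NOT EXISTS fact_songplay (
        songplay_id SERIAL PRIMARY KEY,                  -- auto-incrementing surrogate key
        start_time  TIMESTAMP,
        user_id     INT,
        song_id     TEXT REFERENCES dim_song (song_id),  -- link back to the dimension
        artist_id   TEXT,
        session_id  INT,
        level       TEXT,
        location    TEXT,
        user_agent  TEXT
    );
"""
```

Each dimension carries its own primary key, and the fact table stores those keys alongside the context of each song play, which is what makes joins for analytic queries straightforward.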
File Descriptions
- `data` directory - Holds the song data and the log data.
- `create_tables.py` - Uses `sql_queries.py` to delete and re-create the database and all of its tables. After running this script, the database is ready for importing data.
- `environment.yml` - The Python packages required to run this application.
- `etl_prototype.py` - A prototype of the data processing pipeline that loads data from one song file and one log file.
- `etl.ipynb` - Exported from `etl.py` using tooling provided by the Python plugin for Visual Studio Code.
- `sql_queries.py` - Creates, inserts into, and drops the tables that implement the star schema.
- `test.ipynb` - Tests whether all the data is present in the resulting database tables.
Running
- Install: Download this project from GitHub (https://github.com/robOcity/song_play) by running `git clone https://github.com/robOcity/song_play`.
- Configure: Configure your Python environment by running `conda env create -f environment.yml`. Regrettably, if you are using `pip`, you can't get there from here. In other words, `conda` does not support creating a `requirements.txt` file directly.
- Run:
  - Start and configure your Postgres database (not covered here; a minimal local-setup sketch follows this list)
  - Change directories into the `song_play` directory
  - Run `python create_tables.py`
  - Run `python etl.py`
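For completeness, here is a minimal sketch of the local Postgres setup the prototype's connection string expects: a `student`/`student` role and a default database to connect to before `sparkifydb` is created. The superuser name, the role, the password, and the `studentdb` database name are all assumptions; check `create_tables.py` and adjust to match your system.

```python
# One-time local setup sketch (assumes a superuser role named "postgres").
# The student role/password and the studentdb name are assumptions; check
# create_tables.py for the database it connects to before creating sparkifydb.
import psycopg2

conn = psycopg2.connect("host=127.0.0.1 dbname=postgres user=postgres")
conn.set_session(autocommit=True)  # CREATE DATABASE cannot run inside a transaction
cur = conn.cursor()
cur.execute("CREATE ROLE student WITH LOGIN CREATEDB PASSWORD 'student';")
cur.execute("CREATE DATABASE studentdb OWNER student;")
conn.close()
```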
Implementation
PostgreSQL tables are managed with SQL statements executed through the Python psycopg2 package, creating the fact and dimension tables that comprise the star schema. Data files are read with the pandas `read_json` function, which returns a dataframe. Columns and rows are selected from the dataframe and output as tuples for insertion into the database tables. Connections to the database are managed by psycopg2, as is the cursor object used to interact with the database.
ETL Pipeline Prototype
Establish the data processing workflow using a small subset of the data.
```python
import os
import glob
import psycopg2
import pandas as pd
import numpy as np
from pathlib import Path
from sql_queries import *
```
Connect to Postgres Database
After connecting to the database and getting a cursor object, drop and recreate all of the tables.
```python
conn = psycopg2.connect(
    "host=127.0.0.1 dbname=sparkifydb user=student password=student"
)
conn.set_session(autocommit=True)
cur = conn.cursor()

# Start from a clean slate: drop every table, then create it again.
for sql_cmd in drop_table_queries + create_table_queries:
    cur.execute(sql_cmd)
```
Find data files for processing
Use `os.walk` to find all `*.json` files under the `filepath` directory.
```python
# Let's apply the DRY principle and write a function to load our data.
def get_files(filepath):
    """Return the absolute paths of all JSON files under filepath as a list."""
    all_files = []
    for root, dirs, files in os.walk(filepath):
        json_files = glob.glob(os.path.join(root, "*.json"))
        for f in json_files:
            all_files.append(os.path.abspath(f))
    return all_files
```
#1: `song` Table
Extract Data for Song Table
Run these commands to process the `song_data` directory, reading in a subset of the Million Song Dataset and extracting the data from the JSON files using pandas.
```python
song_root_dir = Path().cwd() / "data" / "song_data"
song_files = get_files(song_root_dir)
filepath = song_files[0]
df = pd.read_json(filepath, lines=True)
```
Insert Data into the Song Table
- Method 1: Select the columns and return them as a tuple, knowing that there is one song per dataframe. This yields year as type np.int64 and duration as type np.float64. Pandas is implemented on top of numpy, so numpy (abbreviated np) data types are common.
```python
song_data = next(
    df[["song_id", "title", "artist_id", "year", "duration"]].itertuples(
        index=False, name=None
    )
)
```
- Method 2: Select the columns, take the first row, get its values as a numpy array, and convert them to a list. This yields year as type int and duration as type float. Inserting numpy numeric types into the database with psycopg2 causes errors, so I convert them to native Python types first. The conversion happens because numpy.ndarray.tolist, on which pandas.Series.tolist is based, returns native Python scalars. Mystery solved!
```python
# Select and insert data into the songs table
song_df = df[["song_id", "title", "artist_id", "year", "duration"]]
song_df.head()
```
|   | song_id | title | artist_id | year | duration |
|---|---------|-------|-----------|------|----------|
| 0 | SONHOTT12A8C13493C | Something Girls | AR7G5I41187FB4CE6C | 1982 | 233.40363 |
```python
song_data = song_df.values[0].tolist()
# Treat falsy values (e.g., a year of 0) as missing so they are stored as NULL.
song_data = [x if x else None for x in song_data]
cur.execute(song_table_insert, song_data)
```
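As an aside, another way to deal with the numpy-type issue is to teach psycopg2 how to adapt numpy scalars directly, so no per-row conversion is needed. This is a sketch of that alternative, not what the prototype does:

```python
# Optional alternative: register adapters so psycopg2 accepts numpy scalars
# (sketch only; the prototype converts values with .tolist() instead).
import numpy as np
from psycopg2.extensions import register_adapter, AsIs

register_adapter(np.int64, lambda value: AsIs(int(value)))
register_adapter(np.float64, lambda value: AsIs(float(value)))
```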
#2: `artists` Table
Extract Data for Artist Table
Extract the data and insert it into the artist table.
```python
artist_df = df[
    [
        "artist_id",
        "artist_name",
        "artist_location",
        "artist_latitude",
        "artist_longitude",
    ]
]
artist_df.head()
```
|   | artist_id | artist_name | artist_location | artist_latitude | artist_longitude |
|---|-----------|-------------|-----------------|-----------------|------------------|
| 0 | AR7G5I41187FB4CE6C | Adam Ant | London, England | NaN | NaN |
```python
artist_data = artist_df.values[0].tolist()
cur.execute(artist_table_insert, artist_data)
```
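Note the NaN latitude and longitude in the row above. If you prefer to store SQL NULLs rather than NaN, the same cleanup idea used for the song table can be applied before the insert; a small sketch:

```python
# Optional: map NaN coordinates to None so they are stored as SQL NULL
# (sketch; mirrors the falsy-to-None cleanup used for the song table).
artist_data = [
    None if isinstance(x, float) and np.isnan(x) else x for x in artist_data
]
```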
Process `log_data`
Now let's add the subscriber activity data to see which songs are popular.
```python
log_data_root = Path().cwd() / "data" / "log_data"
log_files = get_files(log_data_root)

# Just read the first file to test the functionality.
filepath = log_files[0]
df = pd.read_json(filepath, lines=True)
```
#3: `time` Table
Extract and Insert Data into Time Table
Find which songs users are choosing by considering only `NextSong` records. Then convert the `ts` timestamp column to datetime and extract columns for hour, day, week of the year, month, year, and weekday (see the pandas `dt` accessor, which allows datetime properties to be accessed easily).
```python
# Convert the Unix millisecond timestamps to datetimes and keep only song plays.
df = df.assign(ts=pd.to_datetime(df.ts, unit="ms"))
df = df.loc[df.page.isin(["NextSong"])]

# ts is already a datetime at this point, so this simply copies it into a
# timestamp column, which is then localized to UTC.
df = df.assign(timestamp=pd.to_datetime(df.ts, unit="ms"))
df.timestamp = df.timestamp.dt.tz_localize("UTC")

# Break the timestamp out into the columns of the time table.
time_df = pd.DataFrame(
    {
        "timestamp": df.timestamp,
        "hour": df.timestamp.dt.hour,
        "day": df.timestamp.dt.day,
        "week_of_year": df.timestamp.dt.week,  # newer pandas: .dt.isocalendar().week
        "month": df.timestamp.dt.month,
        "year": df.timestamp.dt.year,
        "weekday": df.timestamp.dt.weekday,
    }
)

# Here we want native Python data types, so I'll use iterrows.
for i, row in time_df.iterrows():
    cur.execute(time_table_insert, list(row))

time_df.head()
```
|   | timestamp | hour | day | week_of_year | month | year | weekday |
|---|-----------|------|-----|--------------|-------|------|---------|
| 0 | 2018-11-11 02:33:56.796000+00:00 | 2 | 11 | 45 | 11 | 2018 | 6 |
| 1 | 2018-11-11 02:36:10.796000+00:00 | 2 | 11 | 45 | 11 | 2018 | 6 |
| 2 | 2018-11-11 02:40:34.796000+00:00 | 2 | 11 | 45 | 11 | 2018 | 6 |
| 4 | 2018-11-11 04:36:13.796000+00:00 | 4 | 11 | 45 | 11 | 2018 | 6 |
| 5 | 2018-11-11 04:36:46.796000+00:00 | 4 | 11 | 45 | 11 | 2018 | 6 |
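The row-by-row loop above is fine for this small sample. For larger log volumes, the same inserts can be batched with psycopg2's `executemany()` (see the fast execution helpers references below); a sketch:

```python
# Batch form of the loop above: one executemany() call instead of a Python
# loop of execute() calls (sketch; behavior is equivalent).
cur.executemany(time_table_insert, [list(row) for _, row in time_df.iterrows()])
```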
#4: `users` Table
Extract and Insert Data into Users Table
Every time a user plays a song, they appear in the log file, so naturally there are duplicate `userId` entries. Here we remove the duplicates to create a normalized users table.
```python
user_df = df[["userId", "firstName", "lastName", "gender", "level"]]
user_df = user_df.drop_duplicates(subset="userId", keep="last")
user_df.head()
```
|    | userId | firstName | lastName | gender | level |
|----|--------|-----------|----------|--------|-------|
| 2  | 69 | Anabelle | Simpson | F | free |
| 4  | 32 | Lily | Burns | F | free |
| 5  | 75 | Joseph | Gutierrez | M | free |
| 10 | 92 | Ryann | Smith | F | free |
| 25 | 49 | Chloe | Cuevas | F | free |
```python
for i, row in user_df.iterrows():
    cur.execute(user_table_insert, row)
```
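Deduplicating within a single log file is not enough once many files are processed, because the same `userId` appears again in later files (and a user's `level` can change from free to paid). One way to handle that is an upsert in the insert statement itself (see the UPSERT reference below). The statement here is only an illustration with assumed table and column names; the project's actual statement lives in `sql_queries.py`.

```python
# Illustrative upsert (table/column names are assumptions, not the project's schema):
# re-inserting an existing user updates their subscription level instead of failing.
example_user_table_insert = """
    INSERT INTO dim_user (user_id, first_name, last_name, gender, level)
    VALUES (%s, %s, %s, %s, %s)
    ON CONFLICT (user_id) DO UPDATE SET level = EXCLUDED.level;
"""
```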
#5: `songplays` Table
Extract and Insert Data into Songplays Table
The songplay records need the primary keys that uniquely identify each song and artist, but the log files only contain the song title and the artist name. So, I need to do a reverse lookup to get the identifiers.
```sql
SELECT s.song_id, a.artist_id FROM dim_song s
JOIN dim_artist a ON s.artist_id = a.artist_id
WHERE s.title = %s AND a.name = %s AND s.duration = %s;
```
Now iterate over the rows of the dataframe holding the log data. First, I run this lookup to find the unique identifiers. Next, I combine them with other fields from the log data and insert the user's songplay activity into the `songplays` table.
```python
for index, row in df.iterrows():

    # get songid and artistid from song and artist tables
    cur.execute(song_select, (row.song, row.artist, row.length))
    results = cur.fetchone()

    if results:
        songid, artistid = results
    else:
        songid, artistid = None, None

    # insert songplay record
    songplay_data = (
        row.userId,
        songid,
        artistid,
        row.sessionId,
        row.ts,
        row.level,
        row.location,
        row.userAgent,
    )
    cur.execute(songplay_table_insert, songplay_data)
```
Close Connection to Sparkify Database
```python
conn.close()
```
References
- Million Song Dataset - FAQ with fields and data types - Lists the fields and data types used in the Million Song Dataset.
- Converting from Unix Timestamp to PostgreSQL Timestamp or Date - Explains how to go from Unix epoch time to a PostgreSQL timestamp value.
- PostgreSQL Keyword List - Note: USER is a reserved keyword in Postgres and cannot be used as a table name.
- Psycopg2 - Fast execution helpers - How to use the `executemany()` method to insert many rows into a table at once.
- Using PostgreSQL SERIAL To Create Auto-increment Column - How to create a primary key that increments automatically.
- How to insert current_timestamp into Postgres via python - Explains how to insert timestamps into PostgreSQL by converting them to datetime objects in Python and then letting psycopg2 handle the rest.
- Pandas convert dataframe to an array of tuples - Examples and explanation of how to convert rows of a pandas dataframe into tuples for insertion into the database.
- Psycopg2 Extras - Fast execution helpers - Explanation and examples of how to insert many records into a table in one transaction using psycopg2's `executemany()` method.
- How to UPSERT (MERGE, INSERT … ON DUPLICATE UPDATE) in PostgreSQL? - How to handle duplicate primary keys in PostgreSQL INSERT statements, informally called an upsert.