Have you ever wondered how to take raw log files and transform them into a relational database? With this repository, I will show you how to do it using pandas, Postgres, and psycopg2 in Python. You will learn to read log files into a tabular pandas dataframe and use SQL to create a star schema that is well suited to aggregations and analytics in Python.

Purpose

Sparkify -- a fictitious startup -- wants to analyze the data they have been collecting on songs and user activity from their new music streaming app. Understanding what songs users are listening to is of particular interest. Their data is stored in JSON log files and needs to be analyzed to figure this out. They want a database optimized for analyzing users' listening behavior. To perform this analysis routinely, they need a database schema and an extract-transform-load (ETL) pipeline.

Design

What songs are popular with subscribers? To answer this question, I need to restructure the Sparkify log files into a relational database so the question can be answered with SQL queries. Subscriber activity is gathered by Sparkify's online transaction processing (OLTP) system, which is optimized for fast writes and produces the log files. To profit from the analysis of user data, the larger the data volume, the better. Analyzing this data is the realm of data warehouses, which ingest and restructure transactional data for analysis. Star schemas simplify analytic queries by restructuring the data into tables where each row has a unique identifier, or primary key. Tables of this kind are common in data warehouses. The idea of a star schema is simple: one central fact table is related to the surrounding dimension tables by their primary keys. Star schemas are standard in data warehouses -- a typical example of an online analytical processing (OLAP) system.
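
To make the star-schema idea concrete, here is a minimal sketch of the kind of data-definition SQL that sql_queries.py holds. The dim_song and dim_artist names follow the song-lookup query shown later in this README; the fact_songplay table and its exact column names and types are my assumptions, not the project's verbatim statements.

# Hedged sketch only -- the real statements live in sql_queries.py.
dim_song_table_create = """
    CREATE TABLE IF NOT EXISTS dim_song (
        song_id   VARCHAR PRIMARY KEY,
        title     VARCHAR,
        artist_id VARCHAR,
        year      INT,
        duration  FLOAT
    );
"""

fact_songplay_table_create = """
    CREATE TABLE IF NOT EXISTS fact_songplay (
        songplay_id SERIAL PRIMARY KEY,  -- auto-incrementing surrogate key
        start_time  TIMESTAMP,
        user_id     VARCHAR,
        song_id     VARCHAR REFERENCES dim_song (song_id),
        artist_id   VARCHAR,
        session_id  INT,
        level       VARCHAR,
        location    VARCHAR,
        user_agent  VARCHAR
    );
"""

Each dimension table (songs, artists, users, time) gets one such statement; the fact table's keys point back at the dimensions, which is what gives the schema its star shape.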

Files Descriptions

  1. data directory - Holds the song data and the log data.

  2. create_tables.py - Uses sql_queries.py to drop and re-create the database and all of its tables. After running this script, the database is ready for importing data.

  3. environment.yml - The Python packages required to run this application.

  4. etl_prototype.py - This is a prototype for the data processing pipeline that loads data from one song and one log data file.

  5. etl.ipynb - Exported from etl.py using tooling provided by the Python Plugin for Visual Studio Code.

  6. sql_queries.py - Defines the SQL statements that create, insert into, and drop the tables that implement the star schema.

  7. test.ipynb - Tests whether all the data is present in the resulting database tables.

Running

  1. Install: Download this project from GitHub (https://github.com/robOcity/song_play) by running git clone https://github.com/robOcity/song_play.

  2. Configure: Configure your Python environment by running conda env create -f environment.yml. Regrettably, there is no equivalent route for pip users, because conda does not support exporting a requirements.txt file directly.

  3. Run:

     1. Start and configure your Postgres database (not covered here)
     2. Change directories into the song_play directory
     3. Run python create_tables.py
     4. Run python etl.py

Implementation

PostgreSQL tables are managed with SQL statements executed through the Python psycopg2 package, creating the fact and dimension tables that make up the star schema. Data files are read with the pandas read_json function, which returns a dataframe. Columns and rows are selected from the dataframe and output as tuples for insertion into the database tables. Connections to the database are managed by psycopg2, as is the cursor object used to interact with the database.

ETL Pipeline Prototype

Establish the data processing workflow using a small subset of the data.

import os
import glob
import psycopg2
import pandas as pd
import numpy as np
from pathlib import Path
from sql_queries import *

Connect to Postgres Database

After connecting to the database and getting a cursor object, drop and recreate all of the tables.

conn = psycopg2.connect(
    "host=127.0.0.1 dbname=sparkifydb user=student password=student"
)
conn.set_session(autocommit=True)
cur = conn.cursor()
for sql_cmd in drop_table_queries + create_table_queries:
    cur.execute(sql_cmd)

Find data files for processing

Use os.walk to find all *.json files under the filepath directory.

# Apply the DRY principle and write a reusable function to find our
# data files.


def get_files(filepath):
    """Return the absolute paths of all JSON files under filepath as a list."""
    all_files = []
    for root, dirs, files in os.walk(filepath):
        for f in glob.glob(os.path.join(root, "*.json")):
            all_files.append(os.path.abspath(f))

    return all_files
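
Since pathlib is already imported, an equivalent pathlib-based version is shown below as an alternative sketch; the prototype itself sticks with os.walk and glob.

def get_files_pathlib(filepath):
    """Return the absolute paths of all *.json files under filepath as a list."""
    # rglob recurses through subdirectories, mirroring the os.walk version above
    return [str(p.resolve()) for p in Path(filepath).rglob("*.json")]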

#1: song Table

Extract Data for Song Table

Run these commands to process the song_data by reading in a subset of the Million Song Dataset and extracting the data from the JSON files using pandas.

song_root_dir = Path().cwd() / "data" / "song_data"
song_files = get_files(song_root_dir)
filepath = song_files[0]
df = pd.read_json(filepath, lines=True)

Insert Data into the Song Table

  • Method 1: Select the columns and return them as a tuple, knowing that there is one song per dataframe. This leaves year as type np.int64 and duration as type np.float64. Pandas is implemented on top of numpy, so numpy (abbreviated np) data types are common.
song_data = next(
    df[["song_id", "title", "artist_id", "year", "duration"]].itertuples(
        index=False, name=None
    )
)
  • Method 2: Select the columns, take the first row, get its values as a numpy array, and convert them to a list, which leaves year as type int and duration as type float. Inserting numpy numeric types into the database with psycopg2 causes errors, so I convert them to Python types first. The conversion happens because pandas.Series.tolist is built on numpy.ndarray.tolist, which returns native Python types. Mystery solved! (A third option that registers numpy adapters with psycopg2 is sketched after the code below.)
# Select and insert data into the songs table
song_df = df[["song_id", "title", "artist_id",
              "year", "duration"]]
song_df.head()
song_id title artist_id year duration
0 SONHOTT12A8C13493C Something Girls AR7G5I41187FB4CE6C 1982 233.40363
song_data = song_df.values[0].tolist()
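# Replace falsy values (e.g., a year of 0) with None so psycopg2 stores them as NULL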
song_data = [x if x else None for x in song_data]
cur.execute(song_table_insert, song_data)
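
A third option, which I did not use in this prototype, is to register adapters so psycopg2 knows how to handle numpy scalars directly. The calls below are standard psycopg2 extension points, but treat this as a sketch rather than part of the project.

import numpy as np
from psycopg2.extensions import register_adapter, AsIs

# Teach psycopg2 to adapt numpy scalars, so rows can be passed to
# cur.execute() without converting them to Python types first.
register_adapter(np.int64, lambda value: AsIs(int(value)))
register_adapter(np.float64, lambda value: AsIs(float(value)))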

#2: artists Table

Extract Data for Artist Table

Extract the data and insert it into the artist table.

artist_df = (
    df[
        [
            "artist_id",
            "artist_name",
            "artist_location",
            "artist_latitude",
            "artist_longitude",
        ]
    ]
)
artist_df.head()
artist_id artist_name artist_location artist_latitude artist_longitude
0 AR7G5I41187FB4CE6C Adam Ant London, England NaN NaN
artist_data = artist_df.values[0].tolist()
cur.execute(artist_table_insert, artist_data)

Process log_data

Now let's add the subscriber activity data to see which songs are popular.

log_data_root = Path().cwd() / "data" / "log_data"
log_files = get_files(log_data_root)
# just read first file to test functionality
filepath = log_files[0]
df = pd.read_json(filepath, lines=True)

#3: time Table

Extract and Insert Data into Time Table

Find which songs users are choosing by considering only NextSong records. Then convert the ts timestamp column to datetime and extract columns for hour, day, week of the year, month, year, and weekday (see the pandas dt accessor, which makes datetime properties easy to access).

# Convert the epoch-millisecond ts column to datetime, keep only NextSong
# events, and add a UTC-localized timestamp column.
df = df.assign(ts=pd.to_datetime(df.ts, unit="ms"))
df = df.loc[df.page.isin(["NextSong"])]
df = df.assign(timestamp=df.ts.dt.tz_localize("UTC"))
time_df = pd.DataFrame(
    {
        "timestamp": df.timestamp,
        "hour": df.timestamp.dt.hour,
        "day": df.timestamp.dt.day,
        "week_of_year": df.timestamp.dt.week,
        "month": df.timestamp.dt.month,
        "year": df.timestamp.dt.year,
        "weekday": df.timestamp.dt.weekday,
    }
)
# Here we want native pandas datatypes, so I'll use iterrows.
for i, row in time_df.iterrows():
    cur.execute(time_table_insert, list(row))
time_df.head()
timestamp hour day week_of_year month year weekday
0 2018-11-11 02:33:56.796000+00:00 2 11 45 11 2018 6
1 2018-11-11 02:36:10.796000+00:00 2 11 45 11 2018 6
2 2018-11-11 02:40:34.796000+00:00 2 11 45 11 2018 6
4 2018-11-11 04:36:13.796000+00:00 4 11 45 11 2018 6
5 2018-11-11 04:36:46.796000+00:00 4 11 45 11 2018 6
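
The row-at-a-time loop above issues one round trip per INSERT. The psycopg2 fast execution helpers listed in the references can batch the same work; here is a hedged sketch using psycopg2.extras.execute_batch, assuming time_table_insert is an ordinary INSERT with positional %s placeholders.

from psycopg2.extras import execute_batch

# Reuse the same per-row conversion as the loop above, but send the
# INSERTs to the server in batches instead of one at a time.
time_rows = [list(row) for _, row in time_df.iterrows()]
execute_batch(cur, time_table_insert, time_rows)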

#4: users Table

Extract and Insert Data into Users Table

Every time a user plays a song, they appear in the log file, so naturally, there are duplicate userId entries. Here we remove them to create a normalized user table.

user_df = df[["userId", "firstName", "lastName", "gender", "level"]]
user_df = user_df.drop_duplicates(subset="userId", keep="last")
user_df.head()
userId firstName lastName gender level
2 69 Anabelle Simpson F free
4 32 Lily Burns F free
5 75 Joseph Gutierrez M free
10 92 Ryann Smith F free
25 49 Chloe Cuevas F free
for i, row in user_df.iterrows():
    cur.execute(user_table_insert, row)
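
drop_duplicates handles repeats within one log file, but the same userId appears again in other files, so the real user_table_insert has to cope with duplicate primary keys. Here is a hedged sketch of an upsert-style statement using PostgreSQL's ON CONFLICT clause (see the upsert reference below); the dim_user table and column names are my assumptions, not the project's verbatim DDL.

# Assumed table and column names -- the project's actual statement lives in sql_queries.py.
user_table_insert = """
    INSERT INTO dim_user (user_id, first_name, last_name, gender, level)
    VALUES (%s, %s, %s, %s, %s)
    ON CONFLICT (user_id)
    DO UPDATE SET level = EXCLUDED.level;
"""

Updating level on conflict keeps a user's row current when they move between the free and paid tiers.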

#5: songplays Table

Extract and Insert Data into Songplays Table

The songplays table refers to each song and artist by the primary key that uniquely identifies it, but the log files only contain the song title and artist name. So, I need to do a reverse lookup to get the identifiers.

SELECT s.song_id, a.artist_id FROM dim_song s JOIN dim_artist a ON s.artist_id = a.artist_id WHERE s.title = %s AND a.name = %s AND s.duration = %s;

Iterating over the rows of the dataframe holding the log data, I first look up the unique identifiers. Next, I combine them with other fields from the log data to insert the user's songplay activity into the songplays table.

for index, row in df.iterrows():

    # get songid and artistid from song and artist tables
    cur.execute(song_select, (row.song, row.artist, row.length))
    results = cur.fetchone()

    if results:
        songid, artistid = results
    else:
        songid, artistid = None, None

    # insert songplay record
    songplay_data = (
        row.userId,
        songid,
        artistid,
        row.sessionId,
        row.ts,
        row.level,
        row.location,
        row.userAgent,
    )
    cur.execute(songplay_table_insert, songplay_data)

Close Connection to Sparkify Database

conn.close()
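
For the prototype I close the connection explicitly. A tidier pattern, shown below as a sketch, wraps the work in context managers; note that in psycopg2 the connection's with block commits or rolls back the enclosing transaction but does not close the connection, so close() is still required.

conn = psycopg2.connect(
    "host=127.0.0.1 dbname=sparkifydb user=student password=student"
)
try:
    # The "with conn" block commits on success and rolls back on error;
    # "with conn.cursor()" closes the cursor when the block exits.
    with conn, conn.cursor() as cur:
        cur.execute("SELECT COUNT(*) FROM dim_song;")
        print(cur.fetchone())
finally:
    conn.close()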

References

  1. Million Song Dataset - FAQ with fields and data types - Lists the fields and data-types used in the Million Song Dataset.

  2. Converting from Unix Timestamp to PostgreSQL Timestamp or Date - Explains how to go from Unix epoch time to a PostgreSQL timestamp value.

  3. PostgreSQL Keyword List - Note: USER is a reserved keyword in Postgres and cannot be used as a table name.

  4. Psycopg2 - Fast execution helpers - How to use the executemany() method to insert many rows into a table at once.

  5. Using PostgreSQL SERIAL To Create Auto-increment Column - How to create a primary key that increments automatically.

  6. How to insert current_timestamp into Postgres via python - Explains how to easily insert timestamps into PostgreSQL by converting them to datetime objects in Python and then letting psycopg2 handle the rest.

  7. Pandas convert dataframe to an array of tuples - Examples and explanation of how to convert rows of pandas dataframe into tuples for insertion into the database.

  8. Psycopg2 Extras - Fast execution helpers - Explanation and examples of how to insert many records into a table in one transaction using psycopg2's executemany() method.

  9. How to UPSERT (MERGE, INSERT … ON DUPLICATE UPDATE) in PostgreSQL? - How to handle duplicate primary keys in PostgreSQL INSERT statements, an operation informally called an upsert.