The query comes first in Apache Cassandra. It drives the design of the table and its primary key. Cassandra is a partition row store -- a type of NoSQL database -- that features high-availability and partition tolerance at the expense of consistency. Scalability is where NoSQL databases have a decided advantage over relational databases. To be scalable and offer fast reads and writes means that Cassandra must be able to locate data quickly, and uses the primary key to do so. Cassandra relies upon a well designed primary key composed of a partition key and one or more clustering columns for its performance.

The primary key in Cassandra has two principle goals:

  1. Distribute the data evenly across nodes in the cluster

  2. Minimize the number of partitions read

Running on many nodes and offering fast reads and writes means that Cassandra must be able to locate data quickly. The primary key performs this job. Its first element -- the partition key -- maps rows to the node where they are stored. Organizing data on a node is the role of the second element of the primary key -- the clustering columns. The first clustering column is the primary sort. The second is next, and so on. Together -- the partition key and clustering columns -- provide a unique identifier for rows and enables Cassandra to locate them efficiently.

Unlike relational systems, de-normalized data are the norm for NoSQL databases. Designed for fast reads and writes, joining tables across nodes is too expensive an operation to support, and that is why JOIN is not a CQL keyword. So, how can we design tables to provide the high performance that Cassandra promises? It is simple: one-table-per-query, and here is how to do it. First, create the query. Then define the table to answer it. This approach allows you to design the query to answer user's questions. Working on one table in isolation is easy because it does not need to be normalized. In taking this approach, Cassandra trades-off as disk space for performance and simplicity.

Part I. ETL Pipeline for Pre-Processing the Files

Extract data from CSV files

  • Find all the event logs generated by user's playing songs using the music app
  • Append the data from every line in every file to a list
  • Write out the data to a single CSV file for analysis: event_datafile_new.csv

Import Python packages

# Import Python packages
from cassandra.cluster import Cluster
import pandas as pd
import cassandra
import re
import os
import glob
import numpy as np
import json
import csv
import collections

Creating a list of file paths to process original event CSV data files

# get the current folder and subfolder event data
filepath = os.getcwd() + '/event_data'

# find all directories (roots) under filepath
for root, dirs, files in os.walk(filepath):

    # join the file path and roots with the subdirectories using glob
    file_path_list = glob.glob(os.path.join(root, '*'))
    print("\n".join(file_path_list))
/home/workspace/event_data/2018-11-18-events.csv
/home/workspace/event_data/2018-11-15-events.csv
/home/workspace/event_data/2018-11-16-events.csv
/home/workspace/event_data/2018-11-09-events.csv
/home/workspace/event_data/2018-11-11-events.csv
/home/workspace/event_data/2018-11-17-events.csv
/home/workspace/event_data/2018-11-14-events.csv
/home/workspace/event_data/2018-11-02-events.csv
/home/workspace/event_data/2018-11-27-events.csv
/home/workspace/event_data/2018-11-13-events.csv
/home/workspace/event_data/2018-11-26-events.csv
/home/workspace/event_data/2018-11-03-events.csv
/home/workspace/event_data/2018-11-19-events.csv
/home/workspace/event_data/2018-11-25-events.csv
/home/workspace/event_data/2018-11-04-events.csv
/home/workspace/event_data/2018-11-29-events.csv
/home/workspace/event_data/2018-11-01-events.csv
/home/workspace/event_data/2018-11-06-events.csv
/home/workspace/event_data/2018-11-24-events.csv
/home/workspace/event_data/2018-11-22-events.csv
/home/workspace/event_data/2018-11-30-events.csv
/home/workspace/event_data/2018-11-10-events.csv
/home/workspace/event_data/2018-11-28-events.csv
/home/workspace/event_data/2018-11-07-events.csv
/home/workspace/event_data/2018-11-21-events.csv
/home/workspace/event_data/2018-11-08-events.csv
/home/workspace/event_data/2018-11-05-events.csv
/home/workspace/event_data/2018-11-23-events.csv
/home/workspace/event_data/2018-11-12-events.csv
/home/workspace/event_data/2018-11-20-events.csv

Read in the data from all the CSV files

# read music app event data from csv files

# list to data read from csv files
full_data_rows_list = []

# iterate over files
for f in file_path_list:

    # reading csv file
    with open(f, 'r', encoding='utf8', newline='') as csvfile:
        # creating a csv reader object
        csvreader = csv.reader(csvfile)
        next(csvreader)

        # append lines of file to list
        for line in csvreader:
            full_data_rows_list.append(line)

print(len(full_data_rows_list))
8056

Write out the data to one CSV file

# write the extracted to one csv file
# removing a subset of columns and removing blank lines

csv.register_dialect('myDialect', quoting=csv.QUOTE_ALL, skipinitialspace=True)

with open('event_datafile_new.csv', 'w', encoding='utf8', newline='') as f:
    writer = csv.writer(f, dialect='myDialect')
    writer.writerow(['artist', 'firstName', 'gender', 'itemInSession', 'lastName',
                     'length',                'level', 'location', 'sessionId', 'song', 'userId'])
    for row in full_data_rows_list:
        if (row[0] == ''):
            continue
        writer.writerow((row[0], row[2], row[3], row[4], row[5],
                         row[6], row[7], row[8], row[12], row[13], row[16]))
# check the number of rows in your csv file
with open('event_datafile_new.csv', 'r', encoding='utf8') as f:
    print(sum(1 for line in f))
6821

Part II. Load the data into Apache Cassandra

  • Data elements
    • artist
    • firstName of user
    • gender of user
    • item number in session
    • last name of user
    • length of the song
    • level (paid or free song)
    • location of the user
    • sessionId
    • song title
    • userId

The de-normalized data should similar to what is in event_datafile_new.csv after the code above is run:

Prepare Apache Cassandra cluster and keyspace

Creating a Cluster

# This should make a connection to a Cassandra instance your local machine
# (127.0.0.1)

cluster = Cluster(['127.0.0.1'])

# To establish connection and begin executing queries, need a session
session = cluster.connect()

Create Keyspace

try:
    session.execute("""
    CREATE KEYSPACE IF NOT EXISTS project2
    WITH REPLICATION = 
    {'class': 'SimpleStrategy', 'replication_factor': 1};
    """)
except Exception as e:
    print(e)

Set Keyspace

try:
    session.set_keyspace('project2')
except Exception as e:
    print(e)

Functions

I am applying the don't repeat yourself (DRY) principle here and keep all the fiddly code in one place.

def create_drop_query(table):
    """Create the CQL query to drop a table.

    Arguments:
    table -- table to drop
    """

    return f"DROP TABLE IF EXISTS {table};"
def perform_query(query, session=session, verbose=True):
    """Runs the query and returns results as a pandas dataframe.

    Arguments:
    session -- run query on this Cassandra session object
    query -- CQL query to run
    verbose -- diagnostics flag useful in debugging issues"""

    if verbose:
        print(f"query: {query}")

    try:
        rows = session.execute(query)
    except Exception as e:
        print(e)
        raise e

    # generators can only be consummed once
    rows = list(rows)
    df = pd.DataFrame(rows)

    if verbose:
        print("\nresults: " + 60*'-')
        print("\n".join([f"row:        {row}" for row in rows]))
        print(f"df.shape:   {df.shape}")
        print(f"df.columns: {df.columns}")
        print()

    return df
def lookup_values(keys, line):
    """
    Returns the values for the columns provided.

    Arguments:
    keys -- column names as a list or tuple
    line -- data values parsed from a line in the CSV data file
    """
    name_to_index = collections.defaultdict(None, {"artist":        line[0],                                                    "firstName":     line[1],                                                    "gender":        line[2],                                                     "itemInSession": int(line[3]),                                                    "lastName":      line[4],                                                    "length":        float(
        line[5]),                                                    "level":         line[6],                                                    "location":      line[7],                                                    "sessionId":     int(line[8]),                                                    "song":          line[9],                                                    "userId":        int(line[10])})
    return tuple(name_to_index.get(key) for key in keys)
def create_insert_query(table, columns):
    """Returns the CQL query to insert data from select columns into a table.

    Arguments:
    table -- where the data will be inserted
    columns -- columns to insert
    """
    query = f"INSERT INTO {table} "
    query += "(" + ", ".join(columns) + ") "
    # note: trailing comma after last %s is a syntax error
    query += "VALUES (" + ", ".join(["%s" for _ in range(len(columns))]) + ");"
    return query
# iterate over csv file inserting records into a table

def perform_insert_query(table, columns, file="event_datafile_new.csv", verbose=True):
    """Insert data from columns into table after performing any needed type conversions.

    Arguments:
    table -- where to insert data into
    columns -- CSV data columns to extract, perform type conversion and insert
    verbose -- diagnostics flag useful in debugging issues
    """
    query = create_insert_query(table, columns)

    with open(file, encoding='utf8') as f:
        csvreader = csv.reader(f)
        next(csvreader)  # skip header line
        for i, line in enumerate(csvreader):

            data_to_insert = lookup_values(columns, line)

            if verbose and i < 5:
                print(f"inserting line {i} " + 20*'-')
                print(f"line: {line}")
                print(f"columns: {columns}")
                print(f"query: {query}")
                print(f"data_to_insert: {data_to_insert}")
                print()

            try:
                session.execute(query, data_to_insert)
            except Exception as e:
                print(f"Exception: {e}")
                print(f"query: {query}")
                print(f"data_to_insert: {data_to_insert}")
                print(f"line: {line}")
                raise e
def display_results(table, columns, where_clause, limit=5, verbose=True):
    """
    Returns the selected rows as a pandas dataframe.

    Arguments:
    table -- the name of the table
    columns -- a list of column names to include
    where_clause -- selects data from the table and must reference elements of the primary key in the order specified
    limit -- the number of results to request from the database
    verbose -- additional information to help understand the results
    """

    query = "SELECT " + ", ".join(columns) + \
        f" FROM {table} WHERE {where_clause}"
    if limit:
        query += f" LIMIT {limit}"
    query += ";"

    if verbose:
        print(f"query:      {query}")

    try:
        rows = session.execute(query)
    except Exception as e:
        print(e)
        raise e

    # generators can only be consummed once
    rows = list(rows)
    df = pd.DataFrame(rows)

    if verbose:
        print("\nresults: " + 60*'-')
        print("\n".join([f"row:        {row}" for row in rows]))
        print(f"df.shape:   {df.shape}")
        print(f"df.columns: {df.columns}")
        print()

    return df

Perform Query 1 -- Session Item

Query: Find the artist, song title, and song's length listened to during sessionId = 338, and itemInSession = 4. In designing this query, it would be helpful to understand better the data stored by the app. For example, do sessionId_s repeat? Can multiple users have the same _sessionId. Here I am assuming they are unique across users.

Table columns: Requested information: artist, song and length. Primary key supporting where clause: sessionId and itemInSession.

Primary key: The primary key contains both a simple partition key (sessionId) and a single clustering column (itemInSession) that together uniquely identify rows.

Where clause: "sessionId = 338 AND itemInSession = 4" restricts the results to a single song

table = "session_item_table"

perform_query(create_drop_query(table))
query: DROP TABLE IF EXISTS session_item_table;

results: ------------------------------------------------------------

df.shape:   (0, 0)
df.columns: Index([], dtype='object')
# assume that combination of sessionId and itemInSession are unique across users
create_session_item_table_query = f"""
CREATE TABLE IF NOT EXISTS {table} (sessionId int, itemInSession int, artist text, song text, length float,
    PRIMARY KEY(sessionId, itemInSession)
);"""

perform_query(create_session_item_table_query)
query: 
CREATE TABLE IF NOT EXISTS session_item_table (sessionId int, itemInSession int, artist text, song text, length float,
    PRIMARY KEY(sessionId, itemInSession)
);

results: ------------------------------------------------------------

df.shape:   (0, 0)
df.columns: Index([], dtype='object')
columns = ("sessionId", "itemInSession", "artist", "song", "length")

perform_insert_query(table, columns,)
inserting line 0 --------------------
line: ['Rokia TraorÃ\x83©', 'Stefany', 'F', '0', 'White', '274.88608', 'free', 'Lubbock, TX', '693', 'Zen', '83']
columns: ('sessionId', 'itemInSession', 'artist', 'song', 'length')
query: INSERT INTO session_item_table (sessionId, itemInSession, artist, song, length) VALUES (%s, %s, %s, %s, %s);
data_to_insert: (693, 0, 'Rokia TraorÃ\x83©', 'Zen', 274.88608)

inserting line 1 --------------------
line: ['Camila', 'Tucker', 'M', '1', 'Garrison', '230.81751', 'free', 'Oxnard-Thousand Oaks-Ventura, CA', '555', 'Abrazame  (Version Acustica)', '40']
columns: ('sessionId', 'itemInSession', 'artist', 'song', 'length')
query: INSERT INTO session_item_table (sessionId, itemInSession, artist, song, length) VALUES (%s, %s, %s, %s, %s);
data_to_insert: (555, 1, 'Camila', 'Abrazame  (Version Acustica)', 230.81751)

inserting line 2 --------------------
line: ['Carl Thomas', 'Tucker', 'M', '0', 'Garrison', '196.67546', 'free', 'Oxnard-Thousand Oaks-Ventura, CA', '698', "You Ain't Right (Album Version)", '40']
columns: ('sessionId', 'itemInSession', 'artist', 'song', 'length')
query: INSERT INTO session_item_table (sessionId, itemInSession, artist, song, length) VALUES (%s, %s, %s, %s, %s);
data_to_insert: (698, 0, 'Carl Thomas', "You Ain't Right (Album Version)", 196.67546)

inserting line 3 --------------------
line: ['N.E.R.D.', 'James', 'M', '0', 'Martin', '242.99057', 'free', 'Dallas-Fort Worth-Arlington, TX', '78', 'Provider (Remix Radio Edit)', '79']
columns: ('sessionId', 'itemInSession', 'artist', 'song', 'length')
query: INSERT INTO session_item_table (sessionId, itemInSession, artist, song, length) VALUES (%s, %s, %s, %s, %s);
data_to_insert: (78, 0, 'N.E.R.D.', 'Provider (Remix Radio Edit)', 242.99057)

inserting line 4 --------------------
line: ['Lil Jon / The East Side Boyz / DJ Flexx', 'Jacqueline', 'F', '3', 'Lynch', '285.30893', 'paid', 'Atlanta-Sandy Springs-Roswell, GA', '589', 'Aww Skeet Skeet', '29']
columns: ('sessionId', 'itemInSession', 'artist', 'song', 'length')
query: INSERT INTO session_item_table (sessionId, itemInSession, artist, song, length) VALUES (%s, %s, %s, %s, %s);
data_to_insert: (589, 3, 'Lil Jon / The East Side Boyz / DJ Flexx', 'Aww Skeet Skeet', 285.30893)

Results of the query

where_clause = "sessionId = 338 AND itemInSession = 4"

display_results(table, ("artist", "song", "length"), where_clause)
query:      SELECT artist, song, length FROM session_item_table WHERE sessionId = 338 AND itemInSession = 4 LIMIT 5;

results: ------------------------------------------------------------
row:        Row(artist='Faithless', song='Music Matters (Mark Knight Dub)', length=495.30731201171875)
df.shape:   (1, 3)
df.columns: Index(['artist', 'song', 'length'], dtype='object')
artist song length
0 Faithless Music Matters (Mark Knight Dub) 495.307312

Perform Query 2 -- User / Session

Query: Find the name of artist, song (sorted by itemInSession) and user (firstName and lastName) for userid = 10, sessionid = 182.

Primary key: The primary key contains both a compound partition key (userId, sessionId) and a single clustering column (itemInSession). Using both userId and sessionId for the partition key assures that only one node will be visited to answer this query.

Table columns: Requested information: artist, song, firstName and lastName. Primary key supporing where clause: userId, sessionId and itemInSession (added to support sorting)

Where clause: "userId = 10 AND sessionId = 182"

table = "session_user_table"

perform_query(create_drop_query(table))
query: DROP TABLE IF EXISTS session_user_table;

results: ------------------------------------------------------------

df.shape:   (0, 0)
df.columns: Index([], dtype='object')
create_user_session_table_query = f"""
CREATE TABLE IF NOT EXISTS {table} (
    userId int,
    sessionId int,
    itemInSession int,
    artist text,
    song text, 
    firstName text, 
    lastName text, 
PRIMARY KEY((userId, sessionId), itemInSession)) WITH CLUSTERING ORDER BY (itemInSession ASC);"""

perform_query(create_user_session_table_query)
query: 
CREATE TABLE IF NOT EXISTS session_user_table (
    userId int,
    sessionId int,
    itemInSession int,
    artist text,
    song text, 
    firstName text, 
    lastName text, 
PRIMARY KEY((userId, sessionId), itemInSession)) WITH CLUSTERING ORDER BY (itemInSession ASC);

results: ------------------------------------------------------------

df.shape:   (0, 0)
df.columns: Index([], dtype='object')
perform_insert_query(table, ("userId", "sessionId",
                             "itemInSession", "artist", "song", "firstName", "lastName"))
inserting line 0 --------------------
line: ['Rokia TraorÃ\x83©', 'Stefany', 'F', '0', 'White', '274.88608', 'free', 'Lubbock, TX', '693', 'Zen', '83']
columns: ('userId', 'sessionId', 'itemInSession', 'artist', 'song', 'firstName', 'lastName')
query: INSERT INTO session_user_table (userId, sessionId, itemInSession, artist, song, firstName, lastName) VALUES (%s, %s, %s, %s, %s, %s, %s);
data_to_insert: (83, 693, 0, 'Rokia TraorÃ\x83©', 'Zen', 'Stefany', 'White')

inserting line 1 --------------------
line: ['Camila', 'Tucker', 'M', '1', 'Garrison', '230.81751', 'free', 'Oxnard-Thousand Oaks-Ventura, CA', '555', 'Abrazame  (Version Acustica)', '40']
columns: ('userId', 'sessionId', 'itemInSession', 'artist', 'song', 'firstName', 'lastName')
query: INSERT INTO session_user_table (userId, sessionId, itemInSession, artist, song, firstName, lastName) VALUES (%s, %s, %s, %s, %s, %s, %s);
data_to_insert: (40, 555, 1, 'Camila', 'Abrazame  (Version Acustica)', 'Tucker', 'Garrison')

inserting line 2 --------------------
line: ['Carl Thomas', 'Tucker', 'M', '0', 'Garrison', '196.67546', 'free', 'Oxnard-Thousand Oaks-Ventura, CA', '698', "You Ain't Right (Album Version)", '40']
columns: ('userId', 'sessionId', 'itemInSession', 'artist', 'song', 'firstName', 'lastName')
query: INSERT INTO session_user_table (userId, sessionId, itemInSession, artist, song, firstName, lastName) VALUES (%s, %s, %s, %s, %s, %s, %s);
data_to_insert: (40, 698, 0, 'Carl Thomas', "You Ain't Right (Album Version)", 'Tucker', 'Garrison')

inserting line 3 --------------------
line: ['N.E.R.D.', 'James', 'M', '0', 'Martin', '242.99057', 'free', 'Dallas-Fort Worth-Arlington, TX', '78', 'Provider (Remix Radio Edit)', '79']
columns: ('userId', 'sessionId', 'itemInSession', 'artist', 'song', 'firstName', 'lastName')
query: INSERT INTO session_user_table (userId, sessionId, itemInSession, artist, song, firstName, lastName) VALUES (%s, %s, %s, %s, %s, %s, %s);
data_to_insert: (79, 78, 0, 'N.E.R.D.', 'Provider (Remix Radio Edit)', 'James', 'Martin')

inserting line 4 --------------------
line: ['Lil Jon / The East Side Boyz / DJ Flexx', 'Jacqueline', 'F', '3', 'Lynch', '285.30893', 'paid', 'Atlanta-Sandy Springs-Roswell, GA', '589', 'Aww Skeet Skeet', '29']
columns: ('userId', 'sessionId', 'itemInSession', 'artist', 'song', 'firstName', 'lastName')
query: INSERT INTO session_user_table (userId, sessionId, itemInSession, artist, song, firstName, lastName) VALUES (%s, %s, %s, %s, %s, %s, %s);
data_to_insert: (29, 589, 3, 'Lil Jon / The East Side Boyz / DJ Flexx', 'Aww Skeet Skeet', 'Jacqueline', 'Lynch')

Results of the query

display_results(table, ("artist", "song", "firstName",
                        "lastName"), "userId = 10 AND sessionId = 182")
query:      SELECT artist, song, firstName, lastName FROM session_user_table WHERE userId = 10 AND sessionId = 182 LIMIT 5;

results: ------------------------------------------------------------
row:        Row(artist='Down To The Bone', song="Keep On Keepin' On", firstname='Sylvie', lastname='Cruz')
row:        Row(artist='Three Drives', song='Greece 2000', firstname='Sylvie', lastname='Cruz')
row:        Row(artist='Sebastien Tellier', song='Kilometer', firstname='Sylvie', lastname='Cruz')
row:        Row(artist='Lonnie Gordon', song='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', firstname='Sylvie', lastname='Cruz')
df.shape:   (4, 4)
df.columns: Index(['artist', 'song', 'firstname', 'lastname'], dtype='object')
artist song firstname lastname
0 Down To The Bone Keep On Keepin' On Sylvie Cruz
1 Three Drives Greece 2000 Sylvie Cruz
2 Sebastien Tellier Kilometer Sylvie Cruz
3 Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio... Sylvie Cruz

Verify the results of the query showing sort order

display_results(table, ["artist", "song", "firstName", "lastName",
                        "userId", "itemInSession"], "userId = 10 AND sessionId = 182")
query:      SELECT artist, song, firstName, lastName, userId, itemInSession FROM session_user_table WHERE userId = 10 AND sessionId = 182 LIMIT 5;

results: ------------------------------------------------------------
row:        Row(artist='Down To The Bone', song="Keep On Keepin' On", firstname='Sylvie', lastname='Cruz', userid=10, iteminsession=0)
row:        Row(artist='Three Drives', song='Greece 2000', firstname='Sylvie', lastname='Cruz', userid=10, iteminsession=1)
row:        Row(artist='Sebastien Tellier', song='Kilometer', firstname='Sylvie', lastname='Cruz', userid=10, iteminsession=2)
row:        Row(artist='Lonnie Gordon', song='Catch You Baby (Steve Pitron & Max Sanna Radio Edit)', firstname='Sylvie', lastname='Cruz', userid=10, iteminsession=3)
df.shape:   (4, 6)
df.columns: Index(['artist', 'song', 'firstname', 'lastname', 'userid', 'iteminsession'], dtype='object')
artist song firstname lastname userid iteminsession
0 Down To The Bone Keep On Keepin' On Sylvie Cruz 10 0
1 Three Drives Greece 2000 Sylvie Cruz 10 1
2 Sebastien Tellier Kilometer Sylvie Cruz 10 2
3 Lonnie Gordon Catch You Baby (Steve Pitron & Max Sanna Radio... Sylvie Cruz 10 3

Perform Query 3 -- All Hands Listener's

Query: Find every user name (first and last) in the music app history who listened to the song 'All Hands Against His Own'

Primary key: The primary key contains both a simple partition key (song) and one clustering column (userId ). The song title is the focus of this query and the partition key too. UserId is unique across all users, where the combination of first and last names is probably not.

Table columns: Requested information: song, firstName and lastName. Primary key supporting where clause: song and userId.

Where clause: "song = 'All Hands Against His Own'"

table = "all_hands_listeners"

perform_query(create_drop_query(table))
query: DROP TABLE IF EXISTS all_hands_listeners;

results: ------------------------------------------------------------

df.shape:   (0, 0)
df.columns: Index([], dtype='object')
all_hands_users_query = f"""
CREATE TABLE IF NOT EXISTS {table} (
    song text,
    userId int,
    lastName text,
    firstName text,
    PRIMARY KEY(song, userId)
);"""

perform_query(all_hands_users_query)
query: 
CREATE TABLE IF NOT EXISTS all_hands_listeners (
    song text,
    userId int,
    lastName text,
    firstName text,
    PRIMARY KEY(song, userId)
);

results: ------------------------------------------------------------

df.shape:   (0, 0)
df.columns: Index([], dtype='object')
perform_insert_query(table, ("song", "userId", "lastName", "firstName"))
inserting line 0 --------------------
line: ['Rokia TraorÃ\x83©', 'Stefany', 'F', '0', 'White', '274.88608', 'free', 'Lubbock, TX', '693', 'Zen', '83']
columns: ('song', 'userId', 'lastName', 'firstName')
query: INSERT INTO all_hands_listeners (song, userId, lastName, firstName) VALUES (%s, %s, %s, %s);
data_to_insert: ('Zen', 83, 'White', 'Stefany')

inserting line 1 --------------------
line: ['Camila', 'Tucker', 'M', '1', 'Garrison', '230.81751', 'free', 'Oxnard-Thousand Oaks-Ventura, CA', '555', 'Abrazame  (Version Acustica)', '40']
columns: ('song', 'userId', 'lastName', 'firstName')
query: INSERT INTO all_hands_listeners (song, userId, lastName, firstName) VALUES (%s, %s, %s, %s);
data_to_insert: ('Abrazame  (Version Acustica)', 40, 'Garrison', 'Tucker')

inserting line 2 --------------------
line: ['Carl Thomas', 'Tucker', 'M', '0', 'Garrison', '196.67546', 'free', 'Oxnard-Thousand Oaks-Ventura, CA', '698', "You Ain't Right (Album Version)", '40']
columns: ('song', 'userId', 'lastName', 'firstName')
query: INSERT INTO all_hands_listeners (song, userId, lastName, firstName) VALUES (%s, %s, %s, %s);
data_to_insert: ("You Ain't Right (Album Version)", 40, 'Garrison', 'Tucker')

inserting line 3 --------------------
line: ['N.E.R.D.', 'James', 'M', '0', 'Martin', '242.99057', 'free', 'Dallas-Fort Worth-Arlington, TX', '78', 'Provider (Remix Radio Edit)', '79']
columns: ('song', 'userId', 'lastName', 'firstName')
query: INSERT INTO all_hands_listeners (song, userId, lastName, firstName) VALUES (%s, %s, %s, %s);
data_to_insert: ('Provider (Remix Radio Edit)', 79, 'Martin', 'James')

inserting line 4 --------------------
line: ['Lil Jon / The East Side Boyz / DJ Flexx', 'Jacqueline', 'F', '3', 'Lynch', '285.30893', 'paid', 'Atlanta-Sandy Springs-Roswell, GA', '589', 'Aww Skeet Skeet', '29']
columns: ('song', 'userId', 'lastName', 'firstName')
query: INSERT INTO all_hands_listeners (song, userId, lastName, firstName) VALUES (%s, %s, %s, %s);
data_to_insert: ('Aww Skeet Skeet', 29, 'Lynch', 'Jacqueline')

Results of the query

display_results(table, ["firstName", "lastName"],
                "song = 'All Hands Against His Own'")
query:      SELECT firstName, lastName FROM all_hands_listeners WHERE song = 'All Hands Against His Own' LIMIT 5;

results: ------------------------------------------------------------
row:        Row(firstname='Jacqueline', lastname='Lynch')
row:        Row(firstname='Tegan', lastname='Levine')
row:        Row(firstname='Sara', lastname='Johnson')
df.shape:   (3, 2)
df.columns: Index(['firstname', 'lastname'], dtype='object')
firstname lastname
0 Jacqueline Lynch
1 Tegan Levine
2 Sara Johnson

Verify the results of the query showing song title

display_results(table, ["userId", "firstName", "lastName", "song"],
                "song = 'All Hands Against His Own'")
query:      SELECT userId, firstName, lastName, song FROM all_hands_listeners WHERE song = 'All Hands Against His Own' LIMIT 5;

results: ------------------------------------------------------------
row:        Row(userid=29, firstname='Jacqueline', lastname='Lynch', song='All Hands Against His Own')
row:        Row(userid=80, firstname='Tegan', lastname='Levine', song='All Hands Against His Own')
row:        Row(userid=95, firstname='Sara', lastname='Johnson', song='All Hands Against His Own')
df.shape:   (3, 4)
df.columns: Index(['userid', 'firstname', 'lastname', 'song'], dtype='object')
userid firstname lastname song
0 29 Jacqueline Lynch All Hands Against His Own
1 80 Tegan Levine All Hands Against His Own
2 95 Sara Johnson All Hands Against His Own

Drop the tables before closing out the sessions

perform_query(create_drop_query("session_user_table"), verbose=False)
perform_query(create_drop_query("session_user_table"), verbose=False)
perform_query(create_drop_query("all_hands_users"), verbose=False)

Close the session and cluster connection¶

session.shutdown()
cluster.shutdown()

References