Lab 7 - SparkSQL

Important

Make sure that you are connected to the Saxanet WiFi network and not the GuestNet network. SSH (TCP port 22) is blocked on GuesNet which means if you are on GuesNet you wil not be able to connect to your cloud VMs or clone repos from GitHub via SSH. Use SaxaNet. #

PySpark Introduction

The right data tool…

Sometimes you only need simple solutions to problems. Perfection is the enemy of the good. We’ll walk through a few options when considering a big data job.

Here are a few options when your data is local:

  • Using a single process in R/python
  • Writing an R/python function to run in parallel on a single node
  • Using Python cluster tools like Ray or Dask
  • Using PySpark on a single node to development with multiple workers

Here are a few options when your data is distributed

  • Using Dask on a cluster
  • Using PySpark on a cluster

Here are a few options for task-specific needs:

  • Harnessing a specialized solution like GPU data science with RAPIDS. This software is developed so you can conduct your entire data science pipeline on a GPU.
  • TensorFlow distributed netural network training with Spark!
  • Large language model processing with Spark!

RDD vs PySparkSQL DataFrame

RDD (resilient distributed dataset) is an immutable collection of records (rows) of data

  • great for unstructured data
  • doing low-level transformations

PySpark DataFrame is organized into a table with rows and columns. This is just like the format when working in a relational database like Hive.

  • Two dimensional table format, great for “standard” datasets
  • Column format means consistent metadata and data typing
  • Query optimization
  • Schema is created automatically!

Common Actions in R, Python Pandas, and PySpark

Anything you can do, I can do in parallel!

TDS big data options

head(starwars)

Reading in data

R

library(arrow)
library(dplyr)

starwars <- arrow::read_parquet('starwars.parquet')

Pandas

import os
import pandas as pd
starwars = pd.read_parquet('starwars.parquet')

PySpark

import pyspark
from pyspark.sql.functions import udf, lit, col
from pyspark.sql import SparkSession

# configuration for workers
config = pyspark.SparkConf().setAll([('spark.executor.memory', '10g'),
                             ('spark.executor.cores', '2'),
                             ('spark.cores.max', '16'),])
# launch cluster connection
sc = pyspark.SparkContext(conf = config)

# set up pyspark session
spark = pyspark.SparkSession.builder.appName('my-test').getOrCreate()

starwars = spark.read.load('starwars.parquet')

Selecting data variables

R

starwars_select <- starwars %>% select(name, height, mass)

Pandas

starwars_select = starwars[['name','height','mass']]

PySparkSQL

starwars_select = starwars.select(['name','height','mass'])

Filtering data rows

R

starwars_filter <- starwars %>% filter(height > 110, 
                                       homeworld == "Tatooine")

Pandas

starwars_filter = starwars[(starwars.height > 110) & 
                        (starwars.homeworld == "Tatooine")]

PySpark

starwars_filter = starwars[(col('height') > 110) &
                    (col('homeworld') == "Tatooine")]

Manipulating data

R

starwars <- starwars %>% 
    mutate(tatooine_dummy = if_else(homeworld == 'Tatooine',
                                    TRUE,
                                    FALSE))

Pandas

starwars['tatooine_dummy'] = starwars.apply(
                                  lambda x: True if x.homeworld == 'Tatooine' 
                                        else False, 
                                                axis = 1)

PySpark

from pyspark.sql.types import BooleanType
@udf(returnType=BooleanType())
def dummy_tatooine(x):
    if x == 'Tatooine':
        return True
    else:
        return False

starwars = starwars.withColumn('tatooine_dummy', 
                dummy_tatooine(col('homeworld')))

View the head of the data

R

starwars %>% head(5)

Pandas

starwars.head(5)

PySpark

starwars.take(5) # RDD version

starwars.show(5) # SQL version

Group-by mean data

R

starwars %>% group_by(species) %>% summarize(mean_height = mean(height))

Pandas

starwars.groupby('species')['height'].mean()

PySpark

starwars.groupBy('species').mean('height').collect()

Tallest character from each species

R

starwars %>% group_by(species) %>% filter(height = max(height))

Pandas

starwars.iloc[starwars.groupby('species').height.idxmax().tolist(),:]

starwars.sort_values('height',ascending=False).groupby('species').first()

PySpark

temp_df = starwars.groupBy('species').agg(f.max('height').alias('height'))
starwars.groupBy.join(temp_df,on='height',how='leftsemi').show()

from pyspark.sql import Window
w = Window.partitionBy('species')
starwars.withColumn('maxheight', f.max('height').over(w))\
    .where(f.col('height') == f.col('maxheight'))\
    .drop('maxheight')\
    .show()

Collecting Data

Be extra careful when using the .collect() function. If you have massive amounts of data, then your spark driver is going to have trouble.

In general, always run a .count() function to check the number of rows before running .collect(). Alternatively, you can run the command .show(5) or .take(5) to only see the first few rows of data. You never want to bring 10s of millions of rows to your local session. Let the big data live in big data land.

GitHub Classroom

GitHub Classroom Link