Lab 7 - SparkSQL
Make sure that you are connected to the SaxaNet WiFi network and not the GuestNet network. SSH (TCP port 22) is blocked on GuestNet, which means that if you are on GuestNet you will not be able to connect to your cloud VMs or clone repos from GitHub via SSH. Use SaxaNet.
PySpark Introduction
The right data tool…
Sometimes you only need simple solutions to problems. Perfection is the enemy of the good. We’ll walk through a few options when considering a big data job.
Here are a few options when your data is local:
- Using a single process in R/python
- Writing an R/python function to run in parallel on a single node
- Using Python cluster tools like Ray or Dask
- Using PySpark on a single node to develop with multiple workers (see the local-mode sketch below)
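As a sketch of that last option, here is one way to spin up PySpark in local mode with multiple worker threads. The `local[4]` master string and the toy data are illustrative assumptions, not part of the lab:

```python
from pyspark.sql import SparkSession

# Local mode: the driver and 4 worker threads all run on this machine.
spark = (SparkSession.builder
         .master('local[4]')          # assumption: 4 local cores
         .appName('local-dev')
         .getOrCreate())

# A small toy DataFrame just to confirm the session works.
df = spark.createDataFrame([(1, 'a'), (2, 'b')], ['id', 'letter'])
df.show()

spark.stop()
```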
Here are a few options when your data is distributed:
- Using Dask on a cluster
- Using PySpark on a cluster
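For the PySpark-on-a-cluster option, the code itself barely changes; the main difference is the master you point the session at. A minimal sketch, assuming a standalone Spark master at a hypothetical host and port:

```python
from pyspark.sql import SparkSession

# The master URL is a placeholder; in practice it comes from your cluster admin
# (standalone shown here; YARN and Kubernetes use different master strings).
spark = (SparkSession.builder
         .master('spark://cluster-head-node:7077')  # hypothetical host:port
         .appName('distributed-job')
         .getOrCreate())
```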
Here are a few options for task-specific needs:
- Harnessing a specialized solution like GPU data science with RAPIDS, which is designed so you can run your entire data science pipeline on a GPU.
- Distributed neural network training with TensorFlow on Spark!
- Large language model processing with Spark!
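For that last item, one common pattern is to run batch inference with a pandas UDF so the model is applied in parallel across workers. This is only a sketch: `toy_model` is a stand-in I made up for whatever LLM or NLP model you would actually load, and the `text` column is an assumed input.

```python
import pandas as pd
from pyspark.sql.functions import pandas_udf

def toy_model(text: str) -> float:
    # stand-in for a real model call (e.g., an LLM scoring function)
    return float(len(text))

@pandas_udf('double')
def score_text(texts: pd.Series) -> pd.Series:
    # Spark hands each worker a batch of rows as a pandas Series,
    # so the model runs in parallel across the cluster.
    return texts.apply(toy_model)

# df = spark.read.parquet('documents.parquet')    # assumed input with a 'text' column
# df.withColumn('score', score_text('text')).show(5)
```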
RDD vs PySparkSQL DataFrame
An RDD (resilient distributed dataset) is an immutable collection of records (rows) of data. RDDs are:
- great for unstructured data
- great for low-level transformations (see the sketch after this list)
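A minimal sketch of those low-level transformations, assuming you already have a SparkContext named `sc` (one is created later in this lab) and using toy numbers rather than the lab data:

```python
# Build an RDD from a local collection, then chain low-level transformations.
rdd = sc.parallelize(range(10))

evens_squared = (rdd
                 .filter(lambda x: x % 2 == 0)   # keep the even numbers
                 .map(lambda x: x * x))          # square each one

# Transformations are lazy; actions like collect() and sum() trigger the work.
print(evens_squared.collect())   # [0, 4, 16, 36, 64]
print(evens_squared.sum())       # 120
```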
A PySpark DataFrame is organized into a table with rows and columns, just like a table in a relational database or in Hive. DataFrames give you:
- A two-dimensional table format, great for "standard" datasets
- A column format, which means consistent metadata and data typing
- Query optimization
- A schema that is created automatically (see the sketch below)!
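A quick sketch of that automatic schema: when you read a Parquet file (the file name here is the one used later in the lab), the column names and types come along for free.

```python
# Parquet stores column names and types in its metadata,
# so Spark builds the DataFrame schema for you.
starwars = spark.read.parquet('starwars.parquet')
starwars.printSchema()   # prints each column with its inferred type
```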
Common Actions in R, Python Pandas, and PySpark
Anything you can do, I can do in parallel!
The examples below all use the starwars dataset.

```r
head(starwars)
```

Reading in data
R

```r
library(arrow)
library(dplyr)

starwars <- arrow::read_parquet('starwars.parquet')
```

Pandas
```python
import os
import pandas as pd

starwars = pd.read_parquet('starwars.parquet')
```

PySpark
```python
import pyspark
from pyspark.sql.functions import udf, lit, col
from pyspark.sql import SparkSession

# configuration for workers
config = pyspark.SparkConf().setAll([('spark.executor.memory', '10g'),
                                     ('spark.executor.cores', '2'),
                                     ('spark.cores.max', '16')])

# launch cluster connection
sc = pyspark.SparkContext(conf=config)

# set up pyspark session (reuses the SparkContext created above)
spark = SparkSession.builder.appName('my-test').getOrCreate()

starwars = spark.read.load('starwars.parquet')
```

Selecting data variables
R

```r
starwars_select <- starwars %>% select(name, height, mass)
```

Pandas

```python
starwars_select = starwars[['name','height','mass']]
```

PySparkSQL

```python
starwars_select = starwars.select(['name','height','mass'])
```

Filtering data rows
R

```r
starwars_filter <- starwars %>% filter(height > 110,
                                       homeworld == "Tatooine")
```

Pandas

```python
starwars_filter = starwars[(starwars.height > 110) &
                           (starwars.homeworld == "Tatooine")]
```

PySpark

```python
starwars_filter = starwars[(col('height') > 110) &
                           (col('homeworld') == "Tatooine")]
```

Manipulating data
R

```r
starwars <- starwars %>%
  mutate(tatooine_dummy = if_else(homeworld == 'Tatooine',
                                  TRUE,
                                  FALSE))
```

Pandas

```python
starwars['tatooine_dummy'] = starwars.apply(
    lambda x: True if x.homeworld == 'Tatooine' else False,
    axis = 1)
```

PySpark

```python
from pyspark.sql.types import BooleanType

@udf(returnType=BooleanType())
def dummy_tatooine(x):
    if x == 'Tatooine':
        return True
    else:
        return False

starwars = starwars.withColumn('tatooine_dummy',
                               dummy_tatooine(col('homeworld')))
```
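As a side note that is not part of the original lab: for a simple comparison like this you do not actually need a Python UDF; built-in column expressions do the same job and let Spark optimize the query.

```python
from pyspark.sql import functions as f

# Same dummy column, built from native column expressions instead of a UDF
starwars = starwars.withColumn('tatooine_dummy',
                               f.col('homeworld') == 'Tatooine')

# Or, equivalently, with when/otherwise
starwars = starwars.withColumn('tatooine_dummy',
                               f.when(f.col('homeworld') == 'Tatooine', True)
                                .otherwise(False))
```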
View the head of the data

R

```r
starwars %>% head(5)
```

Pandas

```python
starwars.head(5)
```

PySpark

```python
starwars.take(5)  # RDD version
starwars.show(5)  # SQL version
```

Group-by mean data
R

```r
starwars %>% group_by(species) %>% summarize(mean_height = mean(height))
```

Pandas

```python
starwars.groupby('species')['height'].mean()
```

PySpark

```python
starwars.groupBy('species').mean('height').collect()
```

Tallest character from each species
R

```r
starwars %>% group_by(species) %>% filter(height == max(height))
```

Pandas

```python
starwars.loc[starwars.groupby('species').height.idxmax().tolist(), :]

starwars.sort_values('height', ascending=False).groupby('species').first()
```

PySpark

```python
from pyspark.sql import functions as f
from pyspark.sql import Window

# Option 1: compute each species' max height, then semi-join back to keep matching rows
temp_df = starwars.groupBy('species').agg(f.max('height').alias('height'))
starwars.join(temp_df, on=['species', 'height'], how='leftsemi').show()

# Option 2: a window function partitioned by species
w = Window.partitionBy('species')
starwars.withColumn('maxheight', f.max('height').over(w))\
        .where(f.col('height') == f.col('maxheight'))\
        .drop('maxheight')\
        .show()
```

Collecting Data
Be extra careful when using the .collect() function. If you have massive amounts of data, collecting it all will overwhelm your Spark driver's memory.
In general, always run a .count() to check the number of rows before running .collect(). Alternatively, run .show(5) or .take(5) to see only the first few rows. You never want to bring tens of millions of rows into your local session. Let the big data live in big data land.
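A minimal sketch of that habit, using the filtered DataFrame from earlier (the 100,000-row threshold and the output path are arbitrary illustrations):

```python
n_rows = starwars_filter.count()          # cheap: returns a single number
print(f'result has {n_rows:,} rows')

starwars_filter.show(5)                   # peek at a few rows without collecting

if n_rows < 100_000:                      # arbitrary threshold for illustration
    local_rows = starwars_filter.collect()                     # small enough to bring back
else:
    starwars_filter.write.parquet('starwars_filter.parquet')   # keep it in big data land
```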

