Lab 7 - SparkSQL
Make sure that you are connected to the SaxaNet WiFi network and not the GuestNet network. SSH (TCP port 22) is blocked on GuestNet, which means that if you are on GuestNet you will not be able to connect to your cloud VMs or clone repos from GitHub via SSH. Use SaxaNet.
PySpark Introduction
The right data tool…
Sometimes you only need simple solutions to problems. Perfection is the enemy of the good. We’ll walk through a few options when considering a big data job.
Here are a few options when your data is local:
- Using a single process in R/python
- Writing an R/python function to run in parallel on a single node
- Using Python cluster tools like Ray or Dask
- Using PySpark on a single node to develop with multiple workers (see the sketch below)
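As a rough illustration of that last option, here is a minimal sketch of running PySpark on a single machine with several worker threads. The `local[4]` master string and the `local-dev` app name are placeholders, not values required by the lab.

```python
from pyspark.sql import SparkSession

# local-mode session: 'local[4]' asks Spark for 4 worker threads on this machine
# (use 'local[*]' to get one thread per available core)
spark = (SparkSession.builder
         .master('local[4]')
         .appName('local-dev')        # placeholder app name
         .getOrCreate())

print(spark.sparkContext.defaultParallelism)   # confirm the level of parallelism

spark.stop()   # release the local session before moving on
```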
Here are a few options when your data is distributed:
- Using Dask on a cluster
- Using PySpark on a cluster
Here are a few options for task-specific needs:
- Harnessing a specialized solution like GPU data science with RAPIDS. This software is developed so you can conduct your entire data science pipeline on a GPU.
- TensorFlow distributed neural network training with Spark!
- Large language model processing with Spark!
RDD vs PySparkSQL DataFrame
An RDD (resilient distributed dataset) is an immutable, distributed collection of records (rows) of data:
- great for unstructured data
- well suited to low-level transformations
PySpark DataFrame is organized into a table with rows and columns. This is just like the format when working in a relational database like Hive.
- Two dimensional table format, great for “standard” datasets
- Column format means consistent metadata and data typing
- Query optimization
- Schema is created automatically!
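To make the contrast concrete, here is a small sketch that builds the same toy records (the names and heights are made up for illustration) as both an RDD and a DataFrame. Only the DataFrame carries named, typed columns that Spark's query optimizer can use.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('rdd-vs-df').getOrCreate()
sc = spark.sparkContext

rows = [('Luke Skywalker', 172), ('C-3PO', 167)]   # toy records

# RDD: a distributed bag of Python tuples with no schema attached
rdd = sc.parallelize(rows)
print(rdd.take(2))

# DataFrame: the same records, but with named and typed columns
df = spark.createDataFrame(rows, schema=['name', 'height'])
df.printSchema()   # name: string, height: long
df.show()

spark.stop()   # stop this session so the configured session below starts cleanly
```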
Common Actions in R, Python Pandas, and PySpark
Anything you can do, I can do in parallel!
Throughout this lab we will work with the `starwars` dataset:

```r
head(starwars)
```
Reading in data
R
```r
library(arrow)
library(dplyr)

starwars <- arrow::read_parquet('starwars.parquet')
```
Pandas
```python
import os
import pandas as pd

starwars = pd.read_parquet('starwars.parquet')
```
PySpark
```python
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, lit, col
import pyspark.sql.functions as f

# configuration for workers
config = pyspark.SparkConf().setAll([('spark.executor.memory', '10g'),
                                     ('spark.executor.cores', '2'),
                                     ('spark.cores.max', '16')])

# launch cluster connection
sc = pyspark.SparkContext(conf=config)

# set up pyspark session
spark = SparkSession.builder.appName('my-test').getOrCreate()

starwars = spark.read.load('starwars.parquet')
```
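Because a parquet file carries its own metadata, you can confirm the automatically created schema right after loading. These are standard DataFrame calls, shown here only as a quick check:

```python
starwars.printSchema()     # column names and types inferred from the parquet metadata
print(starwars.dtypes)     # the same information as a list of (name, type) pairs
```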
Selecting data variables
R
```r
starwars_select <- starwars %>% select(name, height, mass)
```
Pandas
```python
starwars_select = starwars[['name','height','mass']]
```
PySparkSQL
```python
starwars_select = starwars.select(['name','height','mass'])
```
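Since Spark DataFrames can also be queried with SQL, the same selection can be written as a SparkSQL query once the DataFrame is registered as a temporary view (the view name `starwars` is just a label chosen for this sketch):

```python
# register the DataFrame as a temporary view so it can be queried with SQL
starwars.createOrReplaceTempView('starwars')

starwars_select = spark.sql("SELECT name, height, mass FROM starwars")
```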
Filtering data rows
R
```r
starwars_filter <- starwars %>% filter(height > 110,
                                       homeworld == "Tatooine")
```
Pandas
```python
starwars_filter = starwars[(starwars.height > 110) &
                           (starwars.homeworld == "Tatooine")]
```
PySpark
```python
starwars_filter = starwars[(col('height') > 110) &
                           (col('homeworld') == "Tatooine")]
```
Manipulating data
R
```r
starwars <- starwars %>%
  mutate(tatooine_dummy = if_else(homeworld == 'Tatooine',
                                  TRUE,
                                  FALSE))
```
Pandas
```python
starwars['tatooine_dummy'] = starwars.apply(
    lambda x: True if x.homeworld == 'Tatooine'
    else False,
    axis=1)
```
PySpark
```python
from pyspark.sql.types import BooleanType

# user-defined function (UDF) applied to each value of the homeworld column
@udf(returnType=BooleanType())
def dummy_tatooine(x):
    if x == 'Tatooine':
        return True
    else:
        return False

starwars = starwars.withColumn('tatooine_dummy',
                               dummy_tatooine(col('homeworld')))
```
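A Python UDF works, but every value is shipped through the Python interpreter. For a simple flag like this, Spark's built-in column expressions are usually faster; here is a sketch of the same column using `when`/`otherwise`:

```python
import pyspark.sql.functions as f

# same dummy column built from native column expressions instead of a UDF
starwars = starwars.withColumn(
    'tatooine_dummy',
    f.when(f.col('homeworld') == 'Tatooine', True).otherwise(False))
```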
View the head of the data
R
```r
starwars %>% head(5)
```
Pandas
```python
starwars.head(5)
```
PySpark
```python
starwars.take(5)   # RDD version
starwars.show(5)   # SQL version
```
Group-by mean data
R
```r
starwars %>% group_by(species) %>% summarize(mean_height = mean(height))
```
Pandas
```python
starwars.groupby('species')['height'].mean()
```
PySpark
```python
starwars.groupBy('species').mean('height').collect()
```
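If you want control over the name of the output column (to match the `mean_height` label used in the R version), the `agg` form with an alias is a common alternative; a quick sketch:

```python
import pyspark.sql.functions as f

# same group-by mean, but with an explicit name on the aggregated column
starwars.groupBy('species') \
        .agg(f.avg('height').alias('mean_height')) \
        .show()
```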
Tallest character from each species
R
```r
starwars %>% group_by(species) %>% filter(height == max(height))
```
Pandas
```python
starwars.iloc[starwars.groupby('species').height.idxmax().tolist(), :]

starwars.sort_values('height', ascending=False).groupby('species').first()
```
PySpark
```python
# option 1: compute each species' max height, then semi-join back on both keys
temp_df = starwars.groupBy('species').agg(f.max('height').alias('height'))
starwars.join(temp_df, on=['species', 'height'], how='leftsemi').show()

# option 2: window function to attach each species' max height, then filter
from pyspark.sql import Window

w = Window.partitionBy('species')
starwars.withColumn('maxheight', f.max('height').over(w))\
        .where(f.col('height') == f.col('maxheight'))\
        .drop('maxheight')\
        .show()
```
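The same "tallest per species" logic can also be expressed as a SparkSQL window query against a temporary view; the view name `starwars` and the subquery alias `t` are just labels chosen for this sketch:

```python
# register (or re-register) the DataFrame as a temporary view
starwars.createOrReplaceTempView('starwars')

spark.sql("""
    SELECT * FROM (
        SELECT *,
               MAX(height) OVER (PARTITION BY species) AS maxheight
        FROM starwars
    ) t
    WHERE height = maxheight
""").drop('maxheight').show()
```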
Collecting Data
Be extra careful when using the `.collect()` function. If you have massive amounts of data, then your Spark driver is going to have trouble. In general, always run a `.count()` to check the number of rows before running `.collect()`. Alternatively, you can run `.show(5)` or `.take(5)` to only see the first few rows of data. You never want to bring tens of millions of rows into your local session. Let the big data live in big data land.
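As a minimal sketch of that habit (the 1,000,000-row threshold is an arbitrary number for illustration, not a rule from the lab):

```python
# check the size of the result before pulling it to the driver
n_rows = starwars_filter.count()

if n_rows < 1_000_000:                         # arbitrary safety threshold
    local_rows = starwars_filter.collect()     # small enough to bring back locally
else:
    starwars_filter.show(5)                    # otherwise just peek at a few rows
```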