We will walk through basic data exploration using PySpark DataFrames and try to solve the exercises mentioned in the Learning Spark book.
Steps:
- read the data from CSV files
- convert it into DataFrames
- apply SQL queries to solve the problems
- apply the DataFrame API to solve the same problems
# import the required packages
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
# initialize the Spark session
spark = (SparkSession
.builder
.appName("Learning-spark")
.getOrCreate())
# declare the schema
Declaring the schema explicitly when reading data from any source lets Spark skip schema inference and improves read performance. The third argument to each StructField (True) marks the column as nullable.
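For comparison, here is a minimal sketch of what reading the same file with schema inference would look like (using the file path assumed later in this notebook); Spark has to make an extra pass over the data to guess the types, which is why the explicit schema below is preferred:

# schema inference: Spark scans the file to guess column types (slower, and the
# date/timestamp columns usually come back as plain strings)
inferred = (spark.read.format("csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .load('/tables/sf_fire_call-1.csv'))
inferred.printSchema()   # compare against the explicit schema declared below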
firecallschema = StructType([
    StructField("CallNumber", IntegerType(), True),
    StructField("UnitID", StringType(), True),
    StructField("IncidentNumber", IntegerType(), True),
    StructField("CallType", StringType(), True),
    StructField("CallDate", DateType(), True),
    StructField("WatchDate", DateType(), True),
    StructField("CallFinalDisposition", StringType(), True),
    StructField("AvailableDtTm", TimestampType(), True),
    StructField("Address", StringType(), True),
    StructField("City", StringType(), True),
    StructField("Zipcode", IntegerType(), True),
    StructField("Battalion", StringType(), True),
    StructField("StationArea", IntegerType(), True),
    StructField("Box", IntegerType(), True),
    StructField("OriginalPriority", StringType(), True),
    StructField("Priority", IntegerType(), True),
    StructField("FinalPriority", IntegerType(), True),
    StructField("ALSUnit", BooleanType(), True),
    StructField("CallTypeGroup", StringType(), True),
    StructField("NumAlarms", IntegerType(), True),
    StructField("UnitType", StringType(), True),
    StructField("UnitSequenceInCallDispatch", IntegerType(), True),
    StructField("FirePreventionDistrict", StringType(), True),
    StructField("SupervisorDistrict", StringType(), True),
    StructField("Neighborhood", StringType(), True),
    StructField("Location", StringType(), True),
    StructField("RowID", StringType(), True),
    StructField("Delay", DoubleType(), True)])
# read the CSV file into a DataFrame
If we do not set the dateFormat and timestampFormat options on the DataFrame reader, the date and timestamp fields are parsed as null.
.schema(firecallschema) -- applies the data type declarations above
firecalls = (spark.read.format("csv")
             .schema(firecallschema)
             .option("header", "true")
             .option("dateFormat", "MM/dd/yyyy")
             # 'hh' is the 12-hour clock and 'a' the AM/PM marker in Spark's datetime patterns
             .option("timestampFormat", "MM/dd/yyyy hh:mm:ss a")
             .load('/tables/sf_fire_call-1.csv'))
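Optionally, to confirm the load worked and avoid re-reading the CSV for every query below, we can cache the DataFrame and count the rows (a quick sketch; the count depends on your copy of the file):

# cache keeps the parsed DataFrame in memory for the repeated queries below
firecalls.cache()
print(firecalls.count())   # number of fire-call records in the file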
# view the schema of the DataFrame, similar to df.info() in pandas
firecalls.printSchema()
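To verify that the dateFormat and timestampFormat options took effect (rather than producing nulls, as noted above), we can peek at the parsed columns:

# non-null CallDate / AvailableDtTm values confirm the formats were parsed correctly
firecalls.select("CallDate", "AvailableDtTm").show(5, truncate=False)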
# register the DataFrame as a temporary view so we can query it with SQL
firecalls.createOrReplaceTempView("firecalls_db")
display(spark.sql("SELECT * FROM firecalls_db"))
# What were all the different types of fire calls in 2002?
# use the year() function to extract the year from CallDate -- SQL version first
SELECT DISTINCT CallType FROM firecalls_db WHERE year(CallDate) = 2002;
# DataFrame version: filter on the year first, then select the distinct call types
df1 = firecalls.filter(year("CallDate") == 2002).select("CallType").distinct()
display(df1)
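The filter is applied before the select so that CallDate is still available when the predicate runs; Spark then prunes the unused columns anyway. As a quick sanity check (a sketch), the SQL and DataFrame versions should return the same set of distinct call types:

# run the SQL version and compare it with the DataFrame result df1
sql_result = spark.sql(
    "SELECT DISTINCT CallType FROM firecalls_db WHERE year(CallDate) = 2002")
# exceptAll returns the rows of one result that are missing from the other;
# empty results in both directions mean the two queries agree
assert sql_result.exceptAll(df1).count() == 0
assert df1.exceptAll(sql_result).count() == 0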