We will walk through basic data exploration using PySpark DataFrames, and try to solve the exercises from the Learning Spark book.
Steps:
- read data from CSV files
- convert it into DataFrames
- apply SQL queries to solve the problems
- apply the DataFrame API to solve the same problems
```python
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
```
```python
# initiate the Spark session
spark = (SparkSession
         .builder
         .appName("Learning-spark")
         .getOrCreate())
```
Declaring the schema up front when reading data from any source spares Spark a schema-inference pass over the file, which improves read performance. The third argument to each `StructField` (`True`) marks the column as nullable.

```python
# declare the schema; True means the column is nullable
firecallschema = StructType([
    StructField("CallNumber", IntegerType(), True),
    StructField("UnitID", StringType(), True),
    StructField("IncidentNumber", IntegerType(), True),
    StructField("CallType", StringType(), True),
    StructField("CallDate", DateType(), True),
    StructField("WatchDate", DateType(), True),
    StructField("CallFinalDisposition", StringType(), True),
    StructField("AvailableDtTm", TimestampType(), True),
    StructField("Address", StringType(), True),
    StructField("City", StringType(), True),
    StructField("Zipcode", IntegerType(), True),
    StructField("Battalion", StringType(), True),
    StructField("StationArea", IntegerType(), True),
    StructField("Box", IntegerType(), True),
    StructField("OriginalPriority", StringType(), True),
    StructField("Priority", IntegerType(), True),
    StructField("FinalPriority", IntegerType(), True),
    StructField("ALSUnit", BooleanType(), True),
    StructField("CallTypeGroup", StringType(), True),
    StructField("NumAlarms", IntegerType(), True),
    StructField("UnitType", StringType(), True),
    StructField("UnitSequenceInCallDispatch", IntegerType(), True),
    StructField("FirePreventionDistrict", StringType(), True),
    StructField("SupervisorDistrict", StringType(), True),
    StructField("Neighborhood", StringType(), True),
    StructField("Location", StringType(), True),
    StructField("RowID", StringType(), True),
    StructField("Delay", DoubleType(), True)
])
```

If we do not declare the date and timestamp formats in the DataFrame reader, the date fields come back as null. So `dateFormat` and `timestampFormat` are set explicitly; note the timestamp pattern uses `hh:mm:ss a` (12-hour clock with an AM/PM marker), not a literal `AM`.

```python
firecalls = (spark.read.format("csv")
             .schema(firecallschema)  # datatype declarations
             .option("header", "true")
             .option("dateFormat", "MM/dd/yyyy")
             .option("timestampFormat", "MM/dd/yyyy hh:mm:ss a")
             .load("/tables/sf_fire_call-1.csv"))

# view the schema of the DataFrame, similar to df.info() in pandas
firecalls.printSchema()
```
```python
# register the DataFrame as a temp view so we can query it with SQL
firecalls.createOrReplaceTempView("firecalls_db")
spark.sql("SELECT * FROM firecalls_db")
```

What were all the different types of fire calls in 2002?
We need the `year` function to extract the year from `CallDate`. In the DataFrame version, filter first, while `CallDate` is still in scope; selecting `CallType` before filtering would drop the date column and make the filter fail.

```python
# SQL version
spark.sql("SELECT DISTINCT CallType FROM firecalls_db WHERE year(CallDate) = 2002")

# DataFrame version: filter, then project the distinct call types
df1 = firecalls.filter(year("CallDate") == 2002).select("CallType").distinct()
display(df1)
```