Wednesday, August 25, 2021

PySpark- End-to-End DataFrame With Apache Spark--1

In this post we will do some basic data exploration with the PySpark DataFrame API and try to solve the exercises mentioned in the Learning Spark book.

Steps :

  1. Read the data from a CSV file
  2. Convert it into a DataFrame
  3. Apply SQL queries to solve the problems
  4. Apply DataFrame operations to solve the same problems


# import the required packages

from pyspark.sql import SparkSession
from pyspark.sql.types import (StructType, StructField, StringType, IntegerType,
                               BooleanType, DateType, TimestampType, DoubleType)
from pyspark.sql.functions import expr

# initialize the Spark session

spark = (SparkSession
 .builder
 .appName("Learning-spark")
 .getOrCreate())

# declare the schema

Declaring the schema up front (instead of letting Spark infer it) avoids an extra pass over the data and improves read performance. The third argument to StructField (True) marks the column as nullable.

firecallschema=StructType([StructField("CallNumber",IntegerType(),True),
                     StructField("UnitID",StringType(),True),
                     StructField("IncidentNumber",IntegerType(),True),
                     StructField("CallType",StringType(),True),
                     StructField("CallDate",DateType(),True),
                     StructField("WatchDate",DateType(),True),
                     StructField("CallFinalDisposition",StringType(),True),
                     StructField("AvailableDtTm",TimestampType(),True),
                     StructField("Address",StringType(),True),
                     StructField("City",StringType(),True),
                     StructField("Zipcode",IntegerType(),True),
                     StructField("Battalion",StringType(),True),
                     StructField("StationArea",IntegerType(),True),
                     StructField("Box",IntegerType(),True),
                     StructField("OriginalPriority",StringType(),True),
                     StructField("Priority",IntegerType(),True),
                     StructField("FinalPriority",IntegerType(),True),
                     StructField("ALSUnit",BooleanType(),True),
                     StructField("CallTypeGroup",StringType(),True),
                     StructField("NumAlarms",IntegerType(),True),
                     StructField("UnitType",StringType(),True),
                     StructField("UnitSequenceInCallDispatch",IntegerType(),True),
                     StructField("FirePreventionDistrict",StringType(),True),
                     StructField("SupervisorDistrict",StringType(),True),
                     StructField("Neighborhood",StringType(),True),
                     StructField("Location",StringType(),True),
                     StructField("RowID",StringType(),True),
                     StructField("Delay",DoubleType(),True)])

# read the CSV file into a DataFrame
If we do not set the dateFormat/timestampFormat options on the DataFrame reader, the date and timestamp fields will come back as nulls.
.schema(firecallschema) applies the data type declarations above.

firecalls = (spark.read.format("csv")
             .schema(firecallschema)
             .option("header", "true")
             .option("dateFormat", "MM/dd/yyyy")
             # hh (1-12 hour) with the am/pm pattern letter 'a' so timestamps ending in AM/PM parse correctly
             .option("timestampFormat", "MM/dd/yyyy hh:mm:ss a")
             .load('/tables/sf_fire_call-1.csv'))
# view the schema of the DataFrame, similar to df.info() in pandas
firecalls.printSchema()
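
As an optional sanity check (not part of the book exercises), we can peek at a few parsed rows and count the records to confirm that the dates and timestamps were not read as nulls:

# show a handful of columns/rows without truncating values, then count the records
firecalls.select("CallNumber", "CallType", "CallDate", "AvailableDtTm").show(5, truncate=False)
print(firecalls.count())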

# register the DataFrame as a temporary view so it can be queried with SQL
firecalls.createOrReplaceTempView("firecalls_db")
# spark.sql() returns a DataFrame, so wrap it in display() (or call .show()) to see the rows
display(spark.sql("SELECT * FROM firecalls_db"))

# What were all the different types of fire calls in 2002?

# SQL version: use the year() function to extract the year from CallDate
display(spark.sql("SELECT DISTINCT CallType FROM firecalls_db WHERE year(CallDate) = 2002"))

# DataFrame version: filter on the year first, then project the distinct call types
df1 = (firecalls
       .filter(expr("year(CallDate)") == 2002)
       .select("CallType")
       .distinct())
display(df1)
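
The same query can also be written with the year() column function from pyspark.sql.functions instead of an expr() string; this is just an equivalent sketch of the query above:

from pyspark.sql.functions import year, col

# equivalent DataFrame query using the year() column function
df2 = (firecalls
       .filter(year(col("CallDate")) == 2002)
       .select("CallType")
       .distinct())
display(df2)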
