Thursday, November 18, 2021

PySpark- End-to-End DataFrame With Apache Spark--2

We will see basic exploration of data using a PySpark DataFrame.

Scenario :

We need to remove string values from an integer field: if a value is a string, replace it with null; otherwise, keep the original value.

Steps :

  1. Read data from CSV files
  2. Convert the data into DataFrames
  3. Create the Python function
  4. Register the Python function in Spark
  5. Call the function on a Spark DataFrame


# import the required packages

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

# initiate the spark session

spark = (SparkSession
 .builder
 .appName("Cleaning data")
 .getOrCreate())


# read the data into a dataframe

cleaning = (spark.read.format("csv")
            .option("header", "true")
            .load('/FileStore/tables/cleaning.csv'))

# to view the schema of the dataframe, similar to df.info() in pandas
cleaning.printSchema()






# declare the python function

def cleaning_int(string):
    # return the integer value if the input can be cast, otherwise None
    try:
        x = int(string)
    except ValueError:
        x = None
    return x
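
A quick check of the helper in plain Python (before registering it in Spark) shows the intended behavior:

# quick local check of the helper function
print(cleaning_int("100"))   # prints 100
print(cleaning_int("abc"))   # prints None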

# register the function in the Spark layer

spark.udf.register("cleaning_data", cleaning_int, IntegerType())

# register the dataframe as a temp view so the UDF can also be used from SQL
cleaning.createOrReplaceTempView("cleaning_db")
spark.sql("select * from cleaning_db")

# call the registered function on the Salary column

cleaning.withColumn('Salary', expr("cleaning_data(Salary)")).show()
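
As a side note, the same cleanup can usually be done without a UDF, because casting a string column to int in Spark returns null for values that cannot be converted (a minimal sketch, assuming Salary was read as a string column):

# alternative sketch without a UDF: a failed cast to int becomes null
cleaning.withColumn('Salary', col('Salary').cast('int')).show()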




Wednesday, August 25, 2021

PySpark- End-to-End DataFrame With Apache Spark--1

We will see basic exploration of data using a PySpark DataFrame and try to solve the exercises mentioned in the Learning Spark book.

Steps :

  1. Read data from CSV files
  2. Convert the data into DataFrames
  3. Apply SQL queries to solve the problems
  4. Use the DataFrame API to solve the same problems


# import the required packages

from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

# initiate the spark session

spark = (SparkSession
 .builder
 .appName("Learning-spark")
 .getOrCreate())

# declare the schema

Declaring the schema when we read data from any source in Spark avoids schema inference and improves read performance. The third argument (True) indicates whether the column is nullable.

firecallschema=StructType([StructField("CallNumber",IntegerType(),True),
                     StructField("UnitID",StringType(),True),
                     StructField("IncidentNumber",IntegerType(),True),
                     StructField("CallType",StringType(),True),
                     StructField("CallDate",DateType(),True),
                     StructField("WatchDate",DateType(),True),
                     StructField("CallFinalDisposition",StringType(),True),
                     StructField("AvailableDtTm",TimestampType(),True),
                     StructField("Address",StringType(),True),
                     StructField("City",StringType(),True),
                     StructField("Zipcode",IntegerType(),True),
                     StructField("Battalion",StringType(),True),
                     StructField("StationArea",IntegerType(),True),
                     StructField("Box",IntegerType(),True),
                     StructField("OriginalPriority",StringType(),True),
                     StructField("Priority",IntegerType(),True),
                     StructField("FinalPriority",IntegerType(),True),
                     StructField("ALSUnit",BooleanType(),True),
                     StructField("CallTypeGroup",StringType(),True),
                     StructField("NumAlarms",IntegerType(),True),
                     StructField("UnitType",StringType(),True),
                     StructField("UnitSequenceInCallDispatch",IntegerType(),True),
                     StructField("FirePreventionDistrict",StringType(),True),
                     StructField("SupervisorDistrict",StringType(),True),
                     StructField("Neighborhood",StringType(),True),
                     StructField("Location",StringType(),True),
                     StructField("RowID",StringType(),True),
                     StructField("Delay",DoubleType(),True)])

# read the data into a dataframe
If we do not declare the dateFormat in the DataFrame reader, the date fields will come back as null.
.schema(firecallschema) declares the column data types.

firecalls = (spark.read.format("csv")
             .schema(firecallschema)
             .option("header", "true")
             .option("dateFormat", "MM/dd/yyyy")
             .option("timestampFormat", "MM/dd/yyyy hh:mm:ss a")
             .load('/tables/sf_fire_call-1.csv'))
# to view the schema of the dataframe, similar to df.info() in pandas
firecalls.printSchema()

# register the dataframe as a temp view
firecalls.createOrReplaceTempView("firecalls_db")
spark.sql("Select * from firecalls_db")

# What were all the different types of fire calls in 2002?

# SQL version: use the year() function to extract the year from CallDate
spark.sql("select distinct CallType from firecalls_db where year(CallDate) = 2002").show()

# DataFrame version of the same query
df1 = firecalls.filter(expr("year(CallDate)") == 2002).select('CallType').distinct()
display(df1)
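
Going one step further in the same spirit (a sketch of additional exploration, not one of the book's exercises), the DataFrame API can also rank the call types by how often they occur:

# count the fire calls per CallType, most frequent first
(firecalls.groupBy('CallType')
          .count()
          .orderBy('count', ascending=False)
          .show(10, truncate=False))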

Thursday, August 19, 2021

Hierarchy queries using recursive function in Python

There is no direct function in Python/pandas to resolve hierarchy levels the way Oracle's CONNECT BY clause does, so we created a recursive function to answer such hierarchy queries.


# import the pandas package
# read the employees data from csv
import pandas as pd
df = pd.read_csv(r'Downloads\table.csv')
print(df.head(10))

# declare the variables
# write the recursive function to walk the hierarchy level by level
final_list = []
dfs = []

def get_parent_child_dtls(manager_id, level, sub_manager_list=None):
    emp_list2 = []
    if level == 1:
        # level 1: direct reports of the top manager
        df1 = df.query('MANAGER_ID == @manager_id')[['EMPLOYEE_ID', 'FIRST_NAME', 'MANAGER_ID']]
        dfs.append(df1)
        emp_list = list(df1['EMPLOYEE_ID'])
        get_parent_child_dtls(manager_id, level + 1, emp_list)
    elif level > 1:
        # deeper levels: direct reports of every employee found at the previous level
        for emp in sub_manager_list:
            sub_manager = str(emp)
            dfs.append(df.query('MANAGER_ID == @sub_manager')[['EMPLOYEE_ID', 'FIRST_NAME', 'MANAGER_ID']])
            employee_ids = list(df.query('MANAGER_ID == @sub_manager')['EMPLOYEE_ID'].values)
            for child_emp in employee_ids:
                emp_list2.append(child_emp)
        if len(emp_list2) > 0:
            # recurse until a level with no direct reports is reached
            get_parent_child_dtls(manager_id, level + 1, emp_list2)
        else:
            return pd.concat(dfs)

# call the function and concatenate the collected levels
get_parent_child_dtls('100', 1)
result = pd.concat(dfs,ignore_index=True)
print(result)
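
Oracle's CONNECT BY also exposes a LEVEL pseudo-column. As a rough sketch of the same idea (assuming the same EMPLOYEE_ID, FIRST_NAME and MANAGER_ID columns), an iterative loop can collect the same rows and tag each one with its depth:

# iterative sketch: walk the hierarchy level by level and record the depth of each row
def get_hierarchy_with_level(df, top_manager_id):
    levels = []
    current_managers = [top_manager_id]
    level = 1
    while current_managers:
        # direct reports of every manager found at the previous level
        level_df = df[df['MANAGER_ID'].isin(current_managers)][['EMPLOYEE_ID', 'FIRST_NAME', 'MANAGER_ID']].copy()
        if level_df.empty:
            break
        level_df['LEVEL'] = level  # mimic Oracle's LEVEL pseudo-column
        levels.append(level_df)
        current_managers = list(level_df['EMPLOYEE_ID'])
        level += 1
    return pd.concat(levels, ignore_index=True)

# example call; the manager id must match the dtype of the MANAGER_ID column
# print(get_hierarchy_with_level(df, 100))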

Saturday, August 14, 2021

Migrate File from On premise file server to Azure cloud using Python

Azure provides the azure.storage.blob Python package to move files from an on-premise file server to Azure Blob Storage, and it is an easy and efficient package for that purpose.

# Required packages

import configparser
import os
import glob
from azure.storage.blob import ContainerClient
import logging

configparser        : to read the Azure/file properties from the config file
os                  : to manipulate files and directories
logging             : to capture the logs
azure.storage.blob  : Azure utility to move the files from on premise to Azure Blob Storage

# to read the properties from config files

config = configparser.ConfigParser()
configfile = r'\PycharmProjects\Python\lib\Azure_config.ini'
properties = config.read(configfile)
log_file=r"\PycharmProjects\Python\lib\app.log"
# configure basic logging
logging.basicConfig(filename=log_file, filemode='w',format='%(asctime)s - %(message)s', level=logging.INFO)
logging.info('Admin logged in')

# define the function to capture the azure properties 


def load_azure_config():
    credentials = config['AZURE']['credentials']
    container = config['AZURE']['container']
    inbound_folder = config['AZURE']['inbound_folder']
    return credentials,container,inbound_folder
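
For reference, the script assumes Azure_config.ini contains an [AZURE] section with exactly these three keys (the values below are placeholders, not real credentials):

[AZURE]
credentials = <storage-account-connection-string>
container = <target-blob-container>
inbound_folder = <folder-with-files-to-upload>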


# get the input files

def get_input_files(inbound_folder):
    # yield regular files from the inbound folder, skipping hidden files
    with os.scandir(inbound_folder) as files:
        for file in files:
            if file.is_file() and not file.name.startswith('.'):
                yield file


# upload the files

def file_upload(files, connections_string, container_name):
    # create a client for the target container from the connection string
    container_client = ContainerClient.from_connection_string(connections_string, container_name)
    for file in files:
        logging.info('copying the files to Azure cloud in progress')
        blob_client = container_client.get_blob_client(file.name)
        logging.info('file name: ' + file.name)
        with open(file.path, "rb") as data:
            blob_client.upload_blob(data)
    return 0

# main functions

def main():
    credentials, container, inbound_folder = load_azure_config()
    # avoid writing the connection string itself to the log
    logging.info('credentials loaded from config')
    logging.info('container: ' + container)
    logging.info('inbound_folder: ' + inbound_folder)
    list_of_files = get_input_files(inbound_folder)
    file_upload(list_of_files, credentials, container)
    logging.info('copying the files to Azure cloud completed')
    return True

# call the main function

if __name__ == "__main__":
    print(main())

