ETL using Python (Example 2)

We will perform the following steps using Python:

  • Read several CSV, JSON and XML file types.

  • Extract data from the above file types and combine it.

  • Transform data.

  • Save the transformed data in a ready-to-load format which data engineers can use to load into an RDBMS

Please download the "datasource.zip" Zip file and extract it. In the script, replace “C:\Nazmul\Blog\Python\ETL\datasource\” with the location where you extracted the Zip file. The following Python script is attached as "ETL script using Python (Example 2).py".

##Import the required modules and functions

import glob # this module helps in selecting files

import pandas as pd # this module helps in processing CSV files

import xml.etree.ElementTree as ET # this module helps in processing XML files.

from datetime import datetime

### Set Path

# File used to store all extracted data.
tmpfile = "dealership_temp.tmp"

# All event logs will be stored in this file.
logfile = "dealership_logfile.txt"

# File where the transformed data is stored.
targetfile = r"C:\Nazmul\Blog\Python\ETL\datasource\dealership_transformed_data.csv"

### CSV Extract Function

def extract_from_csv(file_to_process):
    """Read one CSV source and return its contents as a DataFrame.

    Args:
        file_to_process: Path (or file-like object) handed straight to
            ``pandas.read_csv``.

    Returns:
        The DataFrame produced by ``pandas.read_csv`` with default options.
    """
    return pd.read_csv(file_to_process)

### JSON Extract Function

def extract_from_json(file_to_process):
    """Read one line-delimited JSON source and return it as a DataFrame.

    Args:
        file_to_process: Path (or file-like object) handed straight to
            ``pandas.read_json``.

    Returns:
        The DataFrame produced by ``pandas.read_json``; ``lines=True`` means
        every line of the input is parsed as a separate JSON object.
    """
    return pd.read_json(file_to_process, lines=True)

### XML Extract Function

def extract_from_xml(file_to_process):
    """Parse one XML source into a DataFrame of car records.

    Every direct child of the XML root is treated as one record and must
    contain ``car_model``, ``year_of_manufacture``, ``price`` and ``fuel``
    child elements.

    Args:
        file_to_process: Path (or file-like object) handed to
            ``xml.etree.ElementTree.parse``.

    Returns:
        A DataFrame with the columns ``car_model``, ``year_of_manufacture``
        (int), ``price`` (float) and ``fuel``; empty, but with those columns,
        when the root has no children.

    Raises:
        AttributeError: If a record lacks one of the four child elements.
        ValueError: If the year or price text is not numeric.
    """
    columns = ['car_model', 'year_of_manufacture', 'price', 'fuel']
    root = ET.parse(file_to_process).getroot()

    # Collect plain dicts and build the frame once: DataFrame.append was
    # removed in pandas 2.0 and copied the whole frame on every row.
    rows = [
        {
            "car_model": record.find("car_model").text,
            "year_of_manufacture": int(record.find("year_of_manufacture").text),
            "price": float(record.find("price").text),
            "fuel": record.find("fuel").text,
        }
        for record in root
    ]

    # Passing `columns` keeps the schema stable even when `rows` is empty.
    return pd.DataFrame(rows, columns=columns)

### Extract Function

def extract(source_dir=r"C:\Nazmul\Blog\Python\ETL\datasource"):
    """Read every CSV, JSON and XML file in a directory into one DataFrame.

    Files are read in the order CSV, JSON, XML (sorted by name within each
    type) using ``extract_from_csv``, ``extract_from_json`` and
    ``extract_from_xml``, and their rows are stacked with a fresh index.

    NOTE(review): the default directory is also where ``targetfile`` is
    written, so the output CSV of a previous run matches ``*.csv`` and is
    read back in on the next run -- confirm whether that is intended.

    Args:
        source_dir: Directory that holds the source files. Defaults to the
            location that was previously hard-coded in this function.

    Returns:
        A DataFrame whose first columns are ``car_model``,
        ``year_of_manufacture``, ``price`` and ``fuel`` (followed by any
        extra columns found in the sources); empty when no file matches.
    """
    import os  # local import so the block does not depend on file-level imports

    columns = ['car_model', 'year_of_manufacture', 'price', 'fuel']

    def matching(pattern):
        # Escape the directory so glob metacharacters in it are taken literally;
        # sort because glob.glob returns files in arbitrary order.
        return sorted(glob.glob(os.path.join(glob.escape(source_dir), pattern)))

    # DataFrame.append was removed in pandas 2.0: collect the per-file frames
    # and concatenate them once instead of re-copying the result per file.
    frames = []
    frames.extend(extract_from_csv(path) for path in matching("*.csv"))
    frames.extend(extract_from_json(path) for path in matching("*.json"))
    frames.extend(extract_from_xml(path) for path in matching("*.xml"))

    if not frames:
        return pd.DataFrame(columns=columns)

    combined = pd.concat(frames, ignore_index=True)

    # Keep the expected columns first (created as NaN if no source had them),
    # then whatever additional columns the sources carried.
    extras = [name for name in combined.columns if name not in columns]
    return combined.reindex(columns=columns + extras)

### Transform

# Round the price column to 2 decimal places

def transform(data):
    """Round the ``price`` column of *data* to two decimal places.

    The rounding is written back into the frame that was passed in, and
    that same frame is returned.

    Args:
        data: DataFrame with a numeric ``price`` column.

    Returns:
        The input DataFrame, with ``price`` rounded.
    """
    rounded_price = data['price'].round(2)
    data['price'] = rounded_price
    return data

### Loading

def load(targetfile, data_to_load):
    """Write *data_to_load* to *targetfile* in CSV format.

    ``DataFrame.to_csv`` is called with its default options, so the frame's
    index is written as the first, unnamed column.

    Args:
        targetfile: Destination path (or writable text buffer).
        data_to_load: DataFrame to serialise.
    """
    data_to_load.to_csv(path_or_buf=targetfile)

### Logging

def log(message, log_path=None):
    """Append a timestamped entry to the event log.

    Each entry is one line of the form ``<timestamp>,<message>``.

    Args:
        message: Text to record.
        log_path: File to append to. When omitted, the module-level
            ``logfile`` setting is used (previously the literal
            "logfile.txt" was written and that setting was ignored).
    """
    if log_path is None:
        log_path = logfile

    # Year-Monthname-Day-Hour:Minute:Second. %b is the documented directive
    # for the abbreviated month name; %h is not among Python's documented
    # strftime directives.
    timestamp_format = '%Y-%b-%d-%H:%M:%S'
    timestamp = datetime.now().strftime(timestamp_format)

    with open(log_path, "a") as f:
        f.write(f"{timestamp},{message}\n")

###Running ETL Process

# Run the three ETL phases in order; every phase is bracketed by a
# "Started"/"Ended" entry in the event log.
log("ETL Job Started")

log("Extract phase Started")

# Combine all CSV, JSON and XML source files into a single DataFrame.
extracted_data = extract()

log("Extract phase Ended")

# Bare expression: displays the frame when it is the last line of a
# notebook/REPL cell (presumably left over from one); no effect in a script.
extracted_data

log("Transform phase Started")

# Round the price column; transform() returns the frame it was given.
transformed_data = transform(extracted_data)

log("Transform phase Ended")

# Bare expression, as above: only displays in a notebook/REPL.
transformed_data

log("Load phase Started")

# Write the transformed rows to the CSV file named by `targetfile`.
load(targetfile,transformed_data)

log("Load phase Ended")

log("ETL Job Ended")