This article walks through a real-time project
scenario that data engineers commonly encounter.
In this scenario, whenever a new file is placed
in a source folder, a Kafka message is generated in JSON format. The
ingestion process reads the data from the JSON file and performs checks
to determine the file type, location, and destination (e.g., table or file). If
all the checks pass, the ingestion process loads the
data into the designated destination system.
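The checks described above could be sketched as follows. This is a minimal illustration, assuming the message arrives as a JSON string carrying the `file_name` and `file_type` keys used in this article; the function name `validate_message` and the `SUPPORTED_TYPES` set are hypothetical and not part of the original pipeline.

```python
import json

# Assumed set of supported types, based on the formats named in this article.
SUPPORTED_TYPES = {"csv", "txt", "xls", "xlsx"}


def validate_message(raw_message):
    """Parse a JSON message and return (ok, detail).

    On success, detail is the parsed dict; on failure, it is an error string.
    """
    try:
        payload = json.loads(raw_message)
    except json.JSONDecodeError as err:
        return False, f"invalid JSON: {err}"
    if not isinstance(payload, dict):
        return False, "message must be a JSON object"

    file_name = payload.get("file_name", "")
    file_type = str(payload.get("file_type", "")).lower()
    if not file_name:
        return False, "missing file_name"
    if file_type not in SUPPORTED_TYPES:
        return False, f"unsupported file_type: {file_type!r}"
    return True, payload


# Raw string, so the doubled backslashes reach the JSON parser unchanged.
message = r'{"file_name": "G:\\ETL_Automation\\data\\dept1.csv", "file_type": "csv"}'
ok, detail = validate_message(message)
print(ok, detail)
```

Returning a status instead of raising keeps the caller in control of what happens to a bad message, such as logging it or skipping it.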
The ingestion process is designed to handle different
types of files, such as CSV, TXT, and Excel files. It identifies each file type
by examining the file extension: .csv, .xls, .xlsx, or
.txt. Each file is then processed accordingly, ensuring compatibility with
the intended destination system.
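Extension-based detection can be sketched with the standard library alone. The mapping `EXTENSION_KINDS` and the function `detect_kind` below are illustrative names, not part of the original code; the lowercase comparison is an added assumption so that names such as `REPORT.XLSX` are also recognised.

```python
import os

# Illustrative mapping from file extension to a coarse file kind.
EXTENSION_KINDS = {
    ".csv": "csv",
    ".txt": "text",
    ".xls": "excel",
    ".xlsx": "excel",
}


def detect_kind(path):
    """Return the kind for a path, or None if the extension is not supported."""
    _, ext = os.path.splitext(path)
    return EXTENSION_KINDS.get(ext.lower())


for name in ["dept1.csv", "report.XLSX", "notes.txt", "image.png"]:
    print(name, "->", detect_kind(name))
```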
By implementing this real-time project scenario, data
engineers can streamline data ingestion processes and reliably transfer diverse
file types into the destination system.
JSON file format
{
    "file_name": "G:\\ETL_Automation\\data\\dept1.csv",
    "file_type": "csv"
}
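The doubled backslashes in the path are JSON escaping: each `\\` in the file stands for a single backslash in the actual Windows path. A small sketch of producing and reading back such a descriptor with the standard `json` module (the variable names are illustrative):

```python
import json

# A raw string keeps the Windows path readable in Python source.
descriptor = {
    "file_name": r"G:\ETL_Automation\data\dept1.csv",
    "file_type": "csv",
}

# json.dumps escapes each backslash, giving the doubled form shown above.
text = json.dumps(descriptor, indent=4)
print(text)

# json.loads reverses the escaping and restores the original path.
restored = json.loads(text)
print(restored["file_name"])
```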
First, check whether the JSON file is present. If it is,
read the CSV or Excel file that it points to.
Step 1: Import libraries
import pandas as pd
import json
import os
import sys
import csv
Step 2: Read the CSV file using pandas
def read_csv(file):
    # Load the CSV into a DataFrame and print it
    try:
        df = pd.read_csv(file)
        print(df)
    except FileNotFoundError:
        print("File not found")
    except Exception as err:
        print("An exception occurred:", err)
Step 3: Read the Excel file
def read_xls(file):
    # Load the sheet named 'data' into a DataFrame and print it
    try:
        df = pd.read_excel(file, sheet_name='data')
        print(df)
    except FileNotFoundError:
        print("File not found")
    except Exception as err:
        print("An exception occurred:", err)
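The introduction also lists .txt files, which the steps in this article do not cover. As a minimal sketch only, assuming the text file is tab-delimited, a reader could use the standard `csv` module imported in Step 1. The name `read_txt` and the default delimiter are assumptions, and this function is not wired into Step 4.

```python
import csv


def read_txt(file, delimiter="\t"):
    """Return the rows of a delimited text file as a list of lists.

    Returns None if the file does not exist.
    """
    try:
        # newline="" lets the csv module handle line endings itself.
        with open(file, newline="", encoding="utf-8") as handle:
            return list(csv.reader(handle, delimiter=delimiter))
    except FileNotFoundError:
        print("File not found")
        return None
```

Unlike the pandas readers, this version returns the rows instead of printing them, which makes the result easier to pass on to a later loading step.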
Step 4: Read the JSON file and its contents
# Check whether the JSON file exists; if it does, load its data
def read_json_file(json_file):
    try:
        with open(json_file, 'r') as jfile:
            data = json.load(jfile)
            print(data)
    except FileNotFoundError:
        print("JSON file not found")
        sys.exit(1)

    # Get the file name and the declared type from the JSON.
    # file_type is read here, but the check below relies on the extension.
    file_name = data.get('file_name', '')
    file_type = data.get('file_type', '')

    # Choose the reader by extension: CSV or Excel
    if file_name.endswith(".csv"):
        read_csv(file_name)
    elif file_name.endswith(('.xls', '.xlsx')):
        read_xls(file_name)
    else:
        print("Unsupported file type")

jsonfile = r'G:\ETL_Automation\data\file_json.json'
read_json_file(jsonfile)
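The function in Step 4 stops the interpreter when the JSON file is missing, which may be inconvenient in a process that keeps consuming messages. One possible variation, shown here only as a sketch, returns a status instead and looks up the reader in a dictionary. The names `load_descriptor` and `dispatch` are hypothetical, and the reader passed in the demonstration is a stand-in so the block runs without pandas.

```python
import json
import os


def load_descriptor(json_file):
    """Return the descriptor dict, or None if the file is missing or malformed."""
    try:
        with open(json_file, "r", encoding="utf-8") as jfile:
            return json.load(jfile)
    except FileNotFoundError:
        print("JSON file not found")
    except json.JSONDecodeError as err:
        print("Invalid JSON:", err)
    return None


def dispatch(descriptor, readers):
    """Call the reader registered for the file's extension.

    Returns True if a reader was called, False if the type is unsupported.
    """
    file_name = descriptor.get("file_name", "")
    ext = os.path.splitext(file_name)[1].lower()
    reader = readers.get(ext)
    if reader is None:
        print("Unsupported file type")
        return False
    reader(file_name)
    return True


# Demonstration with a stand-in reader that only records the file name.
calls = []
handled = dispatch({"file_name": "dept1.csv"}, {".csv": calls.append})
print(handled, calls)
```

With the functions from Steps 2 and 3, the mapping could be `{".csv": read_csv, ".xls": read_xls, ".xlsx": read_xls}`.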