Walmart is the largest retailer in the United States. Like its competitors, it has been expanding the e-commerce side of its business. By the end of 2022, e-commerce represented a roaring $80 billion in sales, which is 13% of Walmart's total sales. One of the main factors that affects their sales is public holidays, like the Super Bowl, Labor Day, Thanksgiving, and Christmas.

In this project, you have been tasked with building a data pipeline to analyze supply and demand around the holidays and to run a preliminary analysis of the data. You will be working with two data sources: grocery sales and complementary data. You have been provided with the grocery_sales table in a PostgreSQL database and the extra_data.parquet file that contains the complementary data.
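In the workspace the query result is provided automatically as a DataFrame, but outside of it you would read the table yourself. Here is a minimal sketch, assuming a SQLAlchemy connection string (the credentials and host below are placeholders, not part of the project):

import pandas as pd
from sqlalchemy import create_engine

# Hypothetical connection details; replace them with the real database credentials
engine = create_engine("postgresql://user:password@localhost:5432/walmart")
store_df = pd.read_sql("SELECT * FROM grocery_sales;", engine)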

Here is information about all the available columns in the two data files (a quick way to verify the parquet columns is sketched after the list):

  • "index" - unique ID of the row
  • "Store_ID" - the store number
  • "Date" - the week of sales
  • "Weekly_Sales" - sales for the given store
  • "IsHoliday" - Whether the week contains a public holiday - 1 if yes, 0 if no.
  • "Temperature" - Temperature on the day of sale
  • "Fuel_Price" - Cost of fuel in the region
  • "CPI" – Prevailing consumer price index
  • "Unemployment" - The prevailing unemployment rate
  • "MarkDown1", "MarkDown2", "MarkDown3", "MarkDown4" - number of promotional markdowns
  • "Dept" - Department Number in each store
  • "Size" - size of the store
  • "Type" - type of the store (depends on Size column)

You will need to merge those files for further data manipulation and store the merged data in a clean_data.csv file that should contain the following columns:

  • "Store_ID"
  • "Month"
  • "Dept"
  • "IsHoliday"
  • "Weekly_Sales"
  • "CPI"
  • ""Unemployment""

After merging and cleaning the data, you will have to analyze the monthly sales of Walmart and store the results of your analysis in the agg_data.csv file, which should look like:

Month    Weekly_Sales
1.0      33174.178494
2.0      34333.326579
...      ...

It is recommended to use pandas for this project.

The result of the SQL query below is available as the store_df DataFrame:
-- Write your SQL query here
SELECT * FROM grocery_sales;
# Import required packages
import pandas as pd
import logging
import os

# Configure logging; set the level to INFO so the info messages below are emitted
# (the default level, WARNING, would suppress them)
logging.basicConfig(format='%(process)d-%(levelname)s-%(message)s', level=logging.INFO)

# Create the extract() function with two parameters: one for the store data and the other one for the extra data
# Read the extra data from the parquet file and merge the DataFrames using "index" column
def extract(store_df, file):
    parquet_df = pd.DataFrame()
    merged_df = pd.DataFrame()
    try:
        if not store_df.empty:
            parquet_df = pd.read_parquet(file)
            merged_df = store_df.merge(parquet_df, how='inner', on='index')
            logging.info('Data extracted correctly')
        else:
            logging.warning('Check data, df is empty')
    except Exception as e:
        logging.error('An error occurred while extracting data: %s', e)
    return merged_df

# Call the extract() function and store it as the "merged_df" variable
merged_df = extract(store_df, "extra_data.parquet")
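# Optional sanity check (a sketch, not part of the required pipeline):
# confirm that the merge produced rows before transforming further
logging.info('Merged DataFrame shape: %s', merged_df.shape)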

# Create the transform() function with one parameter: "raw_data"
def transform(raw_data):
    try:
        if not raw_data.empty:
            # Fill NaNs using mean since we are dealing with numeric columns
            # Set inplace = True to do the replacing on the current DataFrame
            raw_data.fillna(
                {
                    'CPI': raw_data['CPI'].mean(),
                    'Weekly_Sales': raw_data['Weekly_Sales'].mean(),
                    'Unemployment': raw_data['Unemployment'].mean(),
                }, inplace=True
            )
            # Define the type of the "Date" column and its format
            raw_data["Date"] = pd.to_datetime(raw_data["Date"], format="%Y-%m-%d")
            # Extract the month value from the "Date" column to calculate monthly sales later on
            raw_data["Month"] = raw_data["Date"].dt.month
            
            # Filter the entire DataFrame using the "Weekly_Sales" column. Use .loc to access a group of rows
            raw_data = raw_data.loc[raw_data["Weekly_Sales"] > 10000, :]
            
            # Keep only the columns required for clean_data.csv, in the required order
            final_table_columns = ['Store_ID', 'Month', 'Dept', 'IsHoliday',
                                   'Weekly_Sales', 'CPI', 'Unemployment']
            raw_data = raw_data[final_table_columns]
            logging.info('Data transformed correctly')
        else:
            logging.warning('Check data, df is empty')
    except Exception as e:
        logging.error('An error occurred while transforming data: %s', e)
    return raw_data

# Call the transform() function and pass the merged DataFrame
clean_data = transform(merged_df)
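# Optional check (a sketch): after transform(), the imputed columns should
# contain no missing values
assert clean_data[['CPI', 'Weekly_Sales', 'Unemployment']].notna().all().all()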

# Create the avg_monthly_sales function that takes in the cleaned data from the last step
def avg_monthly_sales(clean_data):
    holidays_sales = pd.DataFrame()
    try:
        if not clean_data.empty:
            # Select the "Month" and "Weekly_Sales" columns as they are the only ones needed for this analysis
            holidays_sales = clean_data[["Month", "Weekly_Sales"]]
            
            # Create a chain operation with groupby(), agg(), reset_index(), and round() functions
            # Group by the "Month" column and calculate the average monthly sales
            # Call reset_index() to start a new index order
            # Round the results to two decimal places
            holidays_sales = (
                holidays_sales.groupby("Month")
                .agg(Weekly_Sales=("Weekly_Sales", "mean"))
                .reset_index()
                .round(2)
            )
            logging.info('Data aggregated correctly')
        else:
            logging.warning('Check data, df is empty')
    except Exception as e:
        logging.error('An error occurred while aggregating data: %s', e)
    return holidays_sales

# Call the avg_monthly_sales() function and pass the cleaned DataFrame
agg_data = avg_monthly_sales(clean_data)
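# Optional preview (a sketch): compare the first rows of the aggregation
# against the expected layout shown in the project brief above
print(agg_data.head())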

# Create the load() function that takes in the cleaned DataFrame and the aggregated one with the paths where they are going to be stored
def load(clean_data, clean_data_file_path, agg_sales, agg_sales_file_path):
    try:
        # Save only when neither DataFrame is empty
        if not clean_data.empty and not agg_sales.empty:
            clean_data.to_csv(clean_data_file_path, index=False)
            agg_sales.to_csv(agg_sales_file_path, index=False)
            logging.info('Data saved correctly')
        else:
            logging.warning('Check data, df is empty')
    except Exception as e:
        logging.error('An error occurred while saving data: %s', e)
    
# Call the load() function and pass the cleaned and aggregated DataFrames with their paths    
load(clean_data, "clean_data.csv", agg_data, "agg_data.csv")

# Create the validation() function with one parameter: file_path - to check whether the previous function was correctly executed
def validation(file_path):
    # Use the "os" package to check whether a path exists
    file_exists = os.path.exists(file_path)
    logging.info('%s exists: %s', file_path, file_exists)
    # Raise an exception if the path doesn't exist, i.e. if no file was found at the given path
    if not file_exists:
        logging.error('An error occurred while validating files')
        raise FileNotFoundError(f"There is no file at the path {file_path}")
        
# Call the validation() function and pass first, the cleaned DataFrame path, and then the aggregated DataFrame path
validation("clean_data.csv")
validation("agg_data.csv")