Reference

PySpark Examples in Transforms

Welcome to this focused guide on using PySpark within the Datazone platform. It illustrates how PySpark can be integrated into Datazone Transforms to enable efficient data processing and transformation.

Scenario

Using three key datasets - orders, SKUs, and customers - as our foundation, we will explore various PySpark operations. Each example is designed to demonstrate practical applications in data transformation, showcasing the versatility and power of PySpark in a Datazone environment.

From basic operations like projection and filtering to more advanced techniques such as joins, unions, and aggregations, each section of this guide offers concise yet comprehensive insights into PySpark's capabilities. Whether you're a beginner or an experienced user, these examples provide a clear pathway to enhance your data processing workflows in Datazone using PySpark.

In the upcoming examples, the terms dataset and DataFrame are used interchangeably; both refer to the same structured collection of data in this context.

Please note that Dataset(id="dataset_orders_id") in the code examples is only an illustrative placeholder. Replace dataset_orders_id (and the other placeholder identifiers) with the actual identifiers of your datasets so the code references and interacts with the correct data in your environment.

Projection

Projection in PySpark is used to select specific columns from a DataFrame. This operation is similar to the SELECT statement in SQL. It is useful when you want to work with only a subset of columns in your dataset.

Example Use Case: Selecting only the order_id and order_date columns from an orders DataFrame.

from datazone import transform, Input, Dataset

@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def select_orders(orders):
    # Keep only the order_id and order_date columns
    return orders.select("order_id", "order_date")
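
If you need to rename or derive columns while projecting, select also accepts column expressions such as col(...).alias(...). The sketch below is illustrative; the function name and the placed_at alias are not part of the original examples.

from datazone import transform, Input, Dataset
from pyspark.sql.functions import col

@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def select_orders_with_alias(orders):
    # Project order_id as-is and expose order_date under an illustrative new name
    return orders.select(col("order_id"), col("order_date").alias("placed_at"))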

Filter

The filter operation is used to retrieve rows from a DataFrame that meet a specific condition. This is akin to the WHERE clause in SQL. It allows for both simple and complex filtering criteria.

Example Use Case: Fetching orders made after a certain date.

Filtering records based on a condition.

from datazone import transform, Input, Dataset

@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def filter_orders(orders):
    # Keep only orders placed after January 1, 2023
    return orders.filter(orders.order_date > "2023-01-01")

Filtering using conditions on multiple columns.

from datazone import transform, Input, Dataset

@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def filter_orders_multi(orders):
    # Combine conditions with & (and), | (or), ~ (not); wrap each condition in parentheses
    return orders.filter((orders.order_date > "2023-01-01") & (orders.customer_id == 102))
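
For more involved predicates, helpers from pyspark.sql.functions such as col, between, and isin can keep conditions readable. The sketch below assumes the same orders columns; the date range and customer ids are illustrative.

from datazone import transform, Input, Dataset
from pyspark.sql.functions import col

@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def filter_orders_expressive(orders):
    # Orders placed in Q1 2023 (between is inclusive) for an illustrative set of customers
    return orders.filter(
        col("order_date").between("2023-01-01", "2023-03-31")
        & col("customer_id").isin(101, 102)
    )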

Column Rename

Column renaming is used to change the name of a column in a DataFrame. This is particularly useful for improving readability or when column names need to conform to certain naming conventions.

Example Use Case: Renaming order_date to date_of_order for clarity.

from datazone import transform, Input, Dataset

@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def rename_order_column(orders):
    # withColumnRenamed returns a new DataFrame with the column renamed
    return orders.withColumnRenamed("order_date", "date_of_order")
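
To rename several columns at once, withColumnRenamed calls can be chained (Spark 3.4+ also offers withColumnsRenamed, which takes a mapping). The second rename below, customer_ref, is purely illustrative.

from datazone import transform, Input, Dataset

@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def rename_order_columns(orders):
    # Each call returns a new DataFrame with one column renamed
    return (
        orders
        .withColumnRenamed("order_date", "date_of_order")
        .withColumnRenamed("customer_id", "customer_ref")
    )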

On-the-fly Columns

Creating on-the-fly columns involves adding new columns to a DataFrame, often with calculated or static values. This is useful for adding derived metrics or flags to your data.

Example Use Case: Adding a new column status to an orders DataFrame to indicate processing status.

from datazone import transform, Input, Dataset
from pyspark.sql.functions import lit

@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def add_new_column(orders):
    # lit() wraps a constant value so it can be used as a column expression
    return orders.withColumn("status", lit("processed"))
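
Beyond static values, withColumn can also derive a value from existing columns, for example a conditional flag built with when/otherwise. The sketch below assumes orders has a total_amount column, which is not part of the original examples; the threshold is arbitrary.

from datazone import transform, Input, Dataset
from pyspark.sql.functions import when, col

@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def flag_large_orders(orders):
    # Assumes a total_amount column; flags orders above an arbitrary 1000 threshold
    return orders.withColumn(
        "is_large_order",
        when(col("total_amount") > 1000, True).otherwise(False)
    )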

Sorting

Sorting refers to arranging data in a specified order. In PySpark, the orderBy function is used to sort the DataFrame based on one or more columns, either in ascending or descending order.

Detailed Usage:

  • orderBy("column"): Sorts the DataFrame in ascending order based on the specified column.

  • orderBy("column", ascending=False): Sorts in descending order.

from datazone import transform, Input, Dataset

@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def sort_orders_by_date(orders):
    # Ascending sort by order_date (the default direction)
    return orders.orderBy("order_date")
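
Sorting by multiple columns or in descending order follows the same pattern; desc() (or ascending=False) controls the direction. A minimal sketch:

from datazone import transform, Input, Dataset
from pyspark.sql.functions import desc

@transform(input_mapping={"orders": Input(Dataset(id="dataset_orders_id"))})
def sort_orders_recent_first(orders):
    # Most recent orders first; ties broken by order_id in ascending order
    return orders.orderBy(desc("order_date"), "order_id")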

Joins

Join operations are used to combine two DataFrames based on a common key or condition. This is similar to joins in SQL and is essential for merging related datasets.

Types of Joins:

  • Inner Join: Returns rows that have matching values in both DataFrames.

  • Left/Right Outer Join: Returns all rows from the left/right DataFrame and matched rows from the other DataFrame.

  • Full Outer Join: Returns all rows from both DataFrames, filling in nulls where there is no match.

  • Anti Join: Returns rows from the left DataFrame that do not have matching keys in the right DataFrame.

Example Use Case:

  • Inner Join: Find customers who have placed orders (common in both datasets).

  • Left Outer Join: Find all customers and their order details, if any.

  • Right Outer Join: Find all orders and their customer details, if any.

  • Anti Join: Find customers who have not placed any orders.

from datazone import transform, Input, Dataset
 
@transform(input_mapping={"customers": Input(Dataset(id="dataset_customers_id")),
                          "orders": Input(Dataset(id="dataset_orders_id"))})
def perform_various_joins(customers, orders):
    # Inner Join: Customers with orders
    inner_join_df = customers.join(orders, customers.customer_id == orders.customer_id, "inner")
 
    # Left Outer Join: All customers, with order details if available
    left_outer_join_df = customers.join(orders, customers.customer_id == orders.customer_id, "left_outer")
 
    # Right Outer Join: All orders, with customer details if available
    right_outer_join_df = customers.join(orders, customers.customer_id == orders.customer_id, "right_outer")
 
    # Anti Join: Customers without orders
    anti_join_df = customers.join(orders, customers.customer_id == orders.customer_id, "left_anti")
 
    return inner_join_df, left_outer_join_df, right_outer_join_df, anti_join_df
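
Note that joining on an expression such as customers.customer_id == orders.customer_id keeps both customer_id columns in the result. If a single key column is preferred, the column name (or a list of names) can be passed as the join condition instead; a minimal sketch:

from datazone import transform, Input, Dataset

@transform(input_mapping={"customers": Input(Dataset(id="dataset_customers_id")),
                          "orders": Input(Dataset(id="dataset_orders_id"))})
def join_customers_orders(customers, orders):
    # Joining on the column name keeps a single customer_id column in the output
    return customers.join(orders, on="customer_id", how="inner")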
