Concepts

Transform

The Transform entity represents a data processing unit within the data platform, primarily realized through PySpark scripts. These transforms take datasets as input, apply specific data manipulation or analysis operations, and produce new datasets or insights. They are essential for data transformation, enrichment, and preparation for further analysis or reporting.

Usage

  • Data Transformation: Transforms are used to modify or enrich data, such as filtering specific records, aggregating data, or transforming data structures.

  • Pipeline Formation: Multiple transforms can be chained to form a data processing pipeline, where the output of one transform becomes the input to another.

  • Reusable Code Units: Transforms are designed to be modular and reusable across different data processing workflows.

Properties

  • id: The id of the dataset

  • run_upstream: If true, the dataset will be extracted before the transform execution

  • freshness_duration: The duration of the freshness of the dataset

  • input_mapping: This input definition is used to specify the input parameters of the transform function. It can get one of more Datasets or another Transform(s) as input.

  • output_mapping: The output definition is used to specify the output of the transform. Thus, one can specify the output dataset properties such as the id, the name, the description, the tags.

Example

The transform decorator is used on a python function to specify it as a transform.

from datazone import transform
 
@transform
def simple_transform():
    return spark.createDataFrame(
        [(1, 2, 3), (4, 5, 6), (7, 8, 9)],
        ("col1", "col2", "col3")
    )


Also one can specify the input of the transform by using the input_mapping parameter of the decorator. The input mapping is a dictionary where the key is the name of the input and the value is a Input instance.

from datazone import transform, Input, Dataset
 
@transform(input_mapping={"countries": Input(Dataset(id="64ae6d92441cd6c5842b707c"))})
def europe_countries(countries):
    return countries.filter(countries["continent"] == "Europe")


Transform Inputs

Also one can combine multiple type of inputs in the same transform.

from datazone import transform, Input, Dataset
 
@transform(input_mapping={"countries": Input(Dataset(id="64ae6d92441cd6c5842b707c"))})
def europe_countries(countries):
    return countries.filter(countries["continent"] == "Europe")
 
 
@transform(
    input_mapping={
        "europe_countries": Input(europe_countries),
        "country_capitals": Input(Dataset(id="64afbef81fe54be6c8b49251"))
    }
)
def europe_countries_with_capital(europe_countries, country_capitals):
    joined_table = europe_countries.join(
        country_capitals,
        europe_countries["name"] == country_capitals["country_name"],
        "left"
    )
    return joined_table.select("name", "population", "capital")


Output Definitions

The output definition is used to specify the output of the transform. Thus, one can specify the output dataset properties such as the id, the name, the description, the tags.


from datazone import transform, Input, Dataset
 
@transform(
    input_mapping={
        "countries": Input(Dataset(id="64ae6d92441cd6c5842b707c")),
    },
    output_mapping={
        "europe_country_table": Output(Dataset(tags=["geographical"])
    },
)
def europe_countries(countries):
    return countries.filter(countries["continent"] == "Europe")

It creates a dataset with the named europe_country_table and has the tags geographical.

Note: Only first time the transform is executed, the dataset gets that name. If you change the name of the output dataset, it will not change the name of the dataset in the catalog. You can change the name of the dataset on the UI.

Best Practices

  • Descriptive Naming: Use clear and descriptive names for transforms to easily identify their purpose.

  • Modularity: Design transforms to be self-contained and focused on a specific task or data manipulation operation.

  • Error Handling/Logging: Implement robust error handling and logging within transforms to manage exceptions and ensure data integrity.

Usage

  • Data Transformation: Transforms are used to modify or enrich data, such as filtering specific records, aggregating data, or transforming data structures.

  • Pipeline Formation: Multiple transforms can be chained to form a data processing pipeline, where the output of one transform becomes the input to another.

  • Reusable Code Units: Transforms are designed to be modular and reusable across different data processing workflows.

Properties

  • id: The id of the dataset

  • run_upstream: If true, the dataset will be extracted before the transform execution

  • freshness_duration: The duration of the freshness of the dataset

  • input_mapping: This input definition is used to specify the input parameters of the transform function. It can get one of more Datasets or another Transform(s) as input.

  • output_mapping: The output definition is used to specify the output of the transform. Thus, one can specify the output dataset properties such as the id, the name, the description, the tags.

Example

The transform decorator is used on a python function to specify it as a transform.

from datazone import transform
 
@transform
def simple_transform():
    return spark.createDataFrame(
        [(1, 2, 3), (4, 5, 6), (7, 8, 9)],
        ("col1", "col2", "col3")
    )


Also one can specify the input of the transform by using the input_mapping parameter of the decorator. The input mapping is a dictionary where the key is the name of the input and the value is a Input instance.

from datazone import transform, Input, Dataset
 
@transform(input_mapping={"countries": Input(Dataset(id="64ae6d92441cd6c5842b707c"))})
def europe_countries(countries):
    return countries.filter(countries["continent"] == "Europe")


Transform Inputs

Also one can combine multiple type of inputs in the same transform.

from datazone import transform, Input, Dataset
 
@transform(input_mapping={"countries": Input(Dataset(id="64ae6d92441cd6c5842b707c"))})
def europe_countries(countries):
    return countries.filter(countries["continent"] == "Europe")
 
 
@transform(
    input_mapping={
        "europe_countries": Input(europe_countries),
        "country_capitals": Input(Dataset(id="64afbef81fe54be6c8b49251"))
    }
)
def europe_countries_with_capital(europe_countries, country_capitals):
    joined_table = europe_countries.join(
        country_capitals,
        europe_countries["name"] == country_capitals["country_name"],
        "left"
    )
    return joined_table.select("name", "population", "capital")


Output Definitions

The output definition is used to specify the output of the transform. Thus, one can specify the output dataset properties such as the id, the name, the description, the tags.


from datazone import transform, Input, Dataset
 
@transform(
    input_mapping={
        "countries": Input(Dataset(id="64ae6d92441cd6c5842b707c")),
    },
    output_mapping={
        "europe_country_table": Output(Dataset(tags=["geographical"])
    },
)
def europe_countries(countries):
    return countries.filter(countries["continent"] == "Europe")

It creates a dataset with the named europe_country_table and has the tags geographical.

Note: Only first time the transform is executed, the dataset gets that name. If you change the name of the output dataset, it will not change the name of the dataset in the catalog. You can change the name of the dataset on the UI.

Best Practices

  • Descriptive Naming: Use clear and descriptive names for transforms to easily identify their purpose.

  • Modularity: Design transforms to be self-contained and focused on a specific task or data manipulation operation.

  • Error Handling/Logging: Implement robust error handling and logging within transforms to manage exceptions and ensure data integrity.

Usage

  • Data Transformation: Transforms are used to modify or enrich data, such as filtering specific records, aggregating data, or transforming data structures.

  • Pipeline Formation: Multiple transforms can be chained to form a data processing pipeline, where the output of one transform becomes the input to another.

  • Reusable Code Units: Transforms are designed to be modular and reusable across different data processing workflows.

Properties

  • id: The id of the dataset

  • run_upstream: If true, the dataset will be extracted before the transform execution

  • freshness_duration: The duration of the freshness of the dataset

  • input_mapping: This input definition is used to specify the input parameters of the transform function. It can get one of more Datasets or another Transform(s) as input.

  • output_mapping: The output definition is used to specify the output of the transform. Thus, one can specify the output dataset properties such as the id, the name, the description, the tags.

Example

The transform decorator is used on a python function to specify it as a transform.

from datazone import transform
 
@transform
def simple_transform():
    return spark.createDataFrame(
        [(1, 2, 3), (4, 5, 6), (7, 8, 9)],
        ("col1", "col2", "col3")
    )


Also one can specify the input of the transform by using the input_mapping parameter of the decorator. The input mapping is a dictionary where the key is the name of the input and the value is a Input instance.

from datazone import transform, Input, Dataset
 
@transform(input_mapping={"countries": Input(Dataset(id="64ae6d92441cd6c5842b707c"))})
def europe_countries(countries):
    return countries.filter(countries["continent"] == "Europe")


Transform Inputs

Also one can combine multiple type of inputs in the same transform.

from datazone import transform, Input, Dataset
 
@transform(input_mapping={"countries": Input(Dataset(id="64ae6d92441cd6c5842b707c"))})
def europe_countries(countries):
    return countries.filter(countries["continent"] == "Europe")
 
 
@transform(
    input_mapping={
        "europe_countries": Input(europe_countries),
        "country_capitals": Input(Dataset(id="64afbef81fe54be6c8b49251"))
    }
)
def europe_countries_with_capital(europe_countries, country_capitals):
    joined_table = europe_countries.join(
        country_capitals,
        europe_countries["name"] == country_capitals["country_name"],
        "left"
    )
    return joined_table.select("name", "population", "capital")


Output Definitions

The output definition is used to specify the output of the transform. Thus, one can specify the output dataset properties such as the id, the name, the description, the tags.


from datazone import transform, Input, Dataset
 
@transform(
    input_mapping={
        "countries": Input(Dataset(id="64ae6d92441cd6c5842b707c")),
    },
    output_mapping={
        "europe_country_table": Output(Dataset(tags=["geographical"])
    },
)
def europe_countries(countries):
    return countries.filter(countries["continent"] == "Europe")

It creates a dataset with the named europe_country_table and has the tags geographical.

Note: Only first time the transform is executed, the dataset gets that name. If you change the name of the output dataset, it will not change the name of the dataset in the catalog. You can change the name of the dataset on the UI.

Best Practices

  • Descriptive Naming: Use clear and descriptive names for transforms to easily identify their purpose.

  • Modularity: Design transforms to be self-contained and focused on a specific task or data manipulation operation.

  • Error Handling/Logging: Implement robust error handling and logging within transforms to manage exceptions and ensure data integrity.

Usage

  • Data Transformation: Transforms are used to modify or enrich data, such as filtering specific records, aggregating data, or transforming data structures.

  • Pipeline Formation: Multiple transforms can be chained to form a data processing pipeline, where the output of one transform becomes the input to another.

  • Reusable Code Units: Transforms are designed to be modular and reusable across different data processing workflows.

Properties

  • id: The id of the dataset

  • run_upstream: If true, the dataset will be extracted before the transform execution

  • freshness_duration: The duration of the freshness of the dataset

  • input_mapping: This input definition is used to specify the input parameters of the transform function. It can get one of more Datasets or another Transform(s) as input.

  • output_mapping: The output definition is used to specify the output of the transform. Thus, one can specify the output dataset properties such as the id, the name, the description, the tags.

Example

The transform decorator is used on a python function to specify it as a transform.

from datazone import transform
 
@transform
def simple_transform():
    return spark.createDataFrame(
        [(1, 2, 3), (4, 5, 6), (7, 8, 9)],
        ("col1", "col2", "col3")
    )


Also one can specify the input of the transform by using the input_mapping parameter of the decorator. The input mapping is a dictionary where the key is the name of the input and the value is a Input instance.

from datazone import transform, Input, Dataset
 
@transform(input_mapping={"countries": Input(Dataset(id="64ae6d92441cd6c5842b707c"))})
def europe_countries(countries):
    return countries.filter(countries["continent"] == "Europe")


Transform Inputs

Also one can combine multiple type of inputs in the same transform.

from datazone import transform, Input, Dataset
 
@transform(input_mapping={"countries": Input(Dataset(id="64ae6d92441cd6c5842b707c"))})
def europe_countries(countries):
    return countries.filter(countries["continent"] == "Europe")
 
 
@transform(
    input_mapping={
        "europe_countries": Input(europe_countries),
        "country_capitals": Input(Dataset(id="64afbef81fe54be6c8b49251"))
    }
)
def europe_countries_with_capital(europe_countries, country_capitals):
    joined_table = europe_countries.join(
        country_capitals,
        europe_countries["name"] == country_capitals["country_name"],
        "left"
    )
    return joined_table.select("name", "population", "capital")


Output Definitions

The output definition is used to specify the output of the transform. Thus, one can specify the output dataset properties such as the id, the name, the description, the tags.


from datazone import transform, Input, Dataset
 
@transform(
    input_mapping={
        "countries": Input(Dataset(id="64ae6d92441cd6c5842b707c")),
    },
    output_mapping={
        "europe_country_table": Output(Dataset(tags=["geographical"])
    },
)
def europe_countries(countries):
    return countries.filter(countries["continent"] == "Europe")

It creates a dataset with the named europe_country_table and has the tags geographical.

Note: Only first time the transform is executed, the dataset gets that name. If you change the name of the output dataset, it will not change the name of the dataset in the catalog. You can change the name of the dataset on the UI.

Best Practices

  • Descriptive Naming: Use clear and descriptive names for transforms to easily identify their purpose.

  • Modularity: Design transforms to be self-contained and focused on a specific task or data manipulation operation.

  • Error Handling/Logging: Implement robust error handling and logging within transforms to manage exceptions and ensure data integrity.

Usage

  • Data Transformation: Transforms are used to modify or enrich data, such as filtering specific records, aggregating data, or transforming data structures.

  • Pipeline Formation: Multiple transforms can be chained to form a data processing pipeline, where the output of one transform becomes the input to another.

  • Reusable Code Units: Transforms are designed to be modular and reusable across different data processing workflows.

Properties

  • id: The id of the dataset

  • run_upstream: If true, the dataset will be extracted before the transform execution

  • freshness_duration: The duration of the freshness of the dataset

  • input_mapping: This input definition is used to specify the input parameters of the transform function. It can get one of more Datasets or another Transform(s) as input.

  • output_mapping: The output definition is used to specify the output of the transform. Thus, one can specify the output dataset properties such as the id, the name, the description, the tags.

Example

The transform decorator is used on a python function to specify it as a transform.

from datazone import transform
 
@transform
def simple_transform():
    return spark.createDataFrame(
        [(1, 2, 3), (4, 5, 6), (7, 8, 9)],
        ("col1", "col2", "col3")
    )


Also one can specify the input of the transform by using the input_mapping parameter of the decorator. The input mapping is a dictionary where the key is the name of the input and the value is a Input instance.

from datazone import transform, Input, Dataset
 
@transform(input_mapping={"countries": Input(Dataset(id="64ae6d92441cd6c5842b707c"))})
def europe_countries(countries):
    return countries.filter(countries["continent"] == "Europe")


Transform Inputs

Also one can combine multiple type of inputs in the same transform.

from datazone import transform, Input, Dataset
 
@transform(input_mapping={"countries": Input(Dataset(id="64ae6d92441cd6c5842b707c"))})
def europe_countries(countries):
    return countries.filter(countries["continent"] == "Europe")
 
 
@transform(
    input_mapping={
        "europe_countries": Input(europe_countries),
        "country_capitals": Input(Dataset(id="64afbef81fe54be6c8b49251"))
    }
)
def europe_countries_with_capital(europe_countries, country_capitals):
    joined_table = europe_countries.join(
        country_capitals,
        europe_countries["name"] == country_capitals["country_name"],
        "left"
    )
    return joined_table.select("name", "population", "capital")


Output Definitions

The output definition is used to specify the output of the transform. Thus, one can specify the output dataset properties such as the id, the name, the description, the tags.


from datazone import transform, Input, Dataset
 
@transform(
    input_mapping={
        "countries": Input(Dataset(id="64ae6d92441cd6c5842b707c")),
    },
    output_mapping={
        "europe_country_table": Output(Dataset(tags=["geographical"])
    },
)
def europe_countries(countries):
    return countries.filter(countries["continent"] == "Europe")

It creates a dataset with the named europe_country_table and has the tags geographical.

Note: Only first time the transform is executed, the dataset gets that name. If you change the name of the output dataset, it will not change the name of the dataset in the catalog. You can change the name of the dataset on the UI.

Best Practices

  • Descriptive Naming: Use clear and descriptive names for transforms to easily identify their purpose.

  • Modularity: Design transforms to be self-contained and focused on a specific task or data manipulation operation.

  • Error Handling/Logging: Implement robust error handling and logging within transforms to manage exceptions and ensure data integrity.

Pyspark Examples in Transforms

Pipeline

© Copyright 2024. All rights reserved.

Concepts

Transform

The Transform entity represents a data processing unit within the data platform, primarily realized through PySpark scripts. These transforms take datasets as input, apply specific data manipulation or analysis operations, and produce new datasets or insights. They are essential for data transformation, enrichment, and preparation for further analysis or reporting.

Usage

  • Data Transformation: Transforms are used to modify or enrich data, such as filtering specific records, aggregating data, or transforming data structures.

  • Pipeline Formation: Multiple transforms can be chained to form a data processing pipeline, where the output of one transform becomes the input to another.

  • Reusable Code Units: Transforms are designed to be modular and reusable across different data processing workflows.

Properties

  • id: The id of the dataset

  • run_upstream: If true, the dataset will be extracted before the transform execution

  • freshness_duration: The duration of the freshness of the dataset

  • input_mapping: This input definition is used to specify the input parameters of the transform function. It can get one of more Datasets or another Transform(s) as input.

  • output_mapping: The output definition is used to specify the output of the transform. Thus, one can specify the output dataset properties such as the id, the name, the description, the tags.

Example

The transform decorator is used on a python function to specify it as a transform.

from datazone import transform
 
@transform
def simple_transform():
    return spark.createDataFrame(
        [(1, 2, 3), (4, 5, 6), (7, 8, 9)],
        ("col1", "col2", "col3")
    )


Also one can specify the input of the transform by using the input_mapping parameter of the decorator. The input mapping is a dictionary where the key is the name of the input and the value is a Input instance.

from datazone import transform, Input, Dataset
 
@transform(input_mapping={"countries": Input(Dataset(id="64ae6d92441cd6c5842b707c"))})
def europe_countries(countries):
    return countries.filter(countries["continent"] == "Europe")


Transform Inputs

Also one can combine multiple type of inputs in the same transform.

from datazone import transform, Input, Dataset
 
@transform(input_mapping={"countries": Input(Dataset(id="64ae6d92441cd6c5842b707c"))})
def europe_countries(countries):
    return countries.filter(countries["continent"] == "Europe")
 
 
@transform(
    input_mapping={
        "europe_countries": Input(europe_countries),
        "country_capitals": Input(Dataset(id="64afbef81fe54be6c8b49251"))
    }
)
def europe_countries_with_capital(europe_countries, country_capitals):
    joined_table = europe_countries.join(
        country_capitals,
        europe_countries["name"] == country_capitals["country_name"],
        "left"
    )
    return joined_table.select("name", "population", "capital")


Output Definitions

The output definition is used to specify the output of the transform. Thus, one can specify the output dataset properties such as the id, the name, the description, the tags.


from datazone import transform, Input, Dataset
 
@transform(
    input_mapping={
        "countries": Input(Dataset(id="64ae6d92441cd6c5842b707c")),
    },
    output_mapping={
        "europe_country_table": Output(Dataset(tags=["geographical"])
    },
)
def europe_countries(countries):
    return countries.filter(countries["continent"] == "Europe")

It creates a dataset with the named europe_country_table and has the tags geographical.

Note: Only first time the transform is executed, the dataset gets that name. If you change the name of the output dataset, it will not change the name of the dataset in the catalog. You can change the name of the dataset on the UI.

Best Practices

  • Descriptive Naming: Use clear and descriptive names for transforms to easily identify their purpose.

  • Modularity: Design transforms to be self-contained and focused on a specific task or data manipulation operation.

  • Error Handling/Logging: Implement robust error handling and logging within transforms to manage exceptions and ensure data integrity.

Pyspark Examples in Transforms

Pipeline

© Copyright 2024. All rights reserved.