🧺 Data dictionary for metadata-driven Synapse pipeline

Diagram

graph TD;
    Process[Process] --> ProcessStep[ProcessStep]
    ProcessStep --> ProcessStepElementMapping[ProcessStepElementMapping]
    ProcessStepElementMapping --> DataSet[DataSet]
    DataSet --> DataSetElement[DataSetElement]
    ProcessStepElementMapping --> Action[Action]
    Action --> ActionType[ActionType]
    Action --> ActionParameter[ActionParameter]

Derive the entity relations from the given list of entities with their properties:

Process: ProcessID, ProcessCode, ProcessName, ProcessDescription, TriggerPattern, KeyParameterTypeID, ImplementationInfo, MaxConcurrency, IsParallelProcess, EffectiveDate, ExpireDate, CreatedDatetime, CreatedBy, UpdatedDatetime, UpdatedBy, ValidFrom, ValidTo;

ProcessStep: ProcessStepID, ProcessID, ProcessStepName, ProcessStepDescription, ProcessStepTypeID, SubProcessID, ProcessStepOrder, SourceDatasetID, SourceDatasetValidationTypeID, DestinationDatasetID, ActionID, EffectiveDate, ExpireDate, CreatedDatetime, CreatedBy, UpdatedDatetime, UpdatedBy, ValidFrom, ValidTo;

DataSet: DatasetID, DatasetName, DatasetDescription, DatasetTypeID, NamePattern, LocationPattern, DatasetKey, DatasetSource, ConnectionDescription, EffectiveDate, ExpireDate, CreatedDatetime, CreatedBy, UpdatedDatetime, UpdatedBy, ValidFrom, ValidTo;

DataSetElement: DatasetElementID, DatasetID, DatasetElementName, DatasetElementDescription, DatasetElementOrder, DatasetElementType, IsRequired, EffectiveDate, ExpireDate, CreatedDatetime, CreatedBy, UpdatedDatetime, UpdatedBy, ValidFrom, ValidTo;

ProcessStepElementMapping: ProcessStepElementMappingID, ProcessStepID, DestinationElementID, ElementSourceTypeID, Source, EffectiveDate, ExpireDate, CreatedDatetime, CreatedBy, UpdatedDatetime, UpdatedBy, ValidFrom, ValidTo;

ActionType: ActionTypeID, ActionTypeDescription, CreatedDatetime, CreatedBy, UpdatedDatetime, UpdatedBy;

Action: ActionID, ActionName, ActionDescription, ActionTypeID, NamePattern, LocationPattern, ParameterExpression, ConnectionDescription, EffectiveDate, ExpireDate, CreatedDatetime, CreatedBy, UpdatedDatetime, UpdatedBy, ValidFrom, ValidTo;

ActionParameter: ActionParameterID, ActionID, ParameterName, ParameterValueTypeID, ParameterValue, ParameterOrder, IsRequired, EffectiveDate, ExpireDate, CreatedDatetime, CreatedBy, UpdatedDatetime, UpdatedBy;

The entities and their relationships are as follows:

- Process has a one-to-many relationship with ProcessStep: a Process can have multiple ProcessSteps, but a ProcessStep belongs to exactly one Process. The ProcessID field in the ProcessStep entity establishes this relationship.
- ProcessStep has a one-to-one relationship with SubProcess: a ProcessStep can belong to only one SubProcess, and a SubProcess has only one ProcessStep. The SubProcessID field in the ProcessStep entity establishes this relationship.
- ProcessStep has a many-to-one relationship with DataSet: a ProcessStep has a source and/or destination DataSet, and a DataSet can be used by multiple ProcessSteps. The SourceDatasetID and DestinationDatasetID fields in the ProcessStep entity establish this relationship.
- ProcessStep has a many-to-one relationship with Action: a ProcessStep can have an Action, and an Action can be used by multiple ProcessSteps. The ActionID field in the ProcessStep entity establishes this relationship.
- DataSet has a one-to-many relationship with DataSetElement: a DataSet can have multiple DataSetElements, but a DataSetElement belongs to exactly one DataSet. The DatasetID field in the DataSetElement entity establishes this relationship.
- ProcessStep has a many-to-many relationship with DataSetElement: a ProcessStep can have multiple DataSetElements, and a DataSetElement can be used by multiple ProcessSteps. The ProcessStepElementMapping entity establishes this relationship through its ProcessStepID and DestinationElementID fields.
- Action has a one-to-many relationship with ActionParameter: an Action can have multiple ActionParameters, but an ActionParameter belongs to exactly one Action. The ActionID field in the ActionParameter entity establishes this relationship.
- Action has a many-to-one relationship with ActionType: an Action has exactly one ActionType, and an ActionType can be used by multiple Actions. The ActionTypeID field in the Action entity establishes this relationship.
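
For reference, the same model can be sketched as Spark SQL DDL. The snippet below is a minimal sketch assuming a Delta-enabled Spark pool; table and column names come from the entity list above, but Spark does not enforce referential integrity, so the foreign keys are recorded as comments only.

Python
# Hypothetical DDL sketch: Spark SQL does not enforce foreign keys,
# so the relationships are documented in comments.
spark.sql("""
CREATE TABLE IF NOT EXISTS Process (
    ProcessID INT,            -- primary key
    ProcessCode STRING,
    ProcessName STRING
    -- remaining Process columns omitted for brevity
) USING DELTA
""")

spark.sql("""
CREATE TABLE IF NOT EXISTS ProcessStep (
    ProcessStepID INT,        -- primary key
    ProcessID INT,            -- FK -> Process.ProcessID (one-to-many)
    SourceDatasetID INT,      -- FK -> DataSet.DatasetID (many-to-one)
    DestinationDatasetID INT, -- FK -> DataSet.DatasetID (many-to-one)
    ActionID INT              -- FK -> Action.ActionID (many-to-one)
    -- remaining ProcessStep columns omitted for brevity
) USING DELTA
""")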

Schema for the entities in PySpark

Python
from pyspark.sql.types import *

# Define the schema for the Process table
process_schema = StructType([
    StructField("ProcessID", IntegerType()),
    StructField("ProcessCode", StringType()),
    StructField("ProcessName", StringType()),
    StructField("ProcessDescription", StringType()),
    StructField("TriggerPattern", StringType()),
    StructField("KeyParameterTypeID", IntegerType()),
    StructField("ImplementationInfo", StringType()),
    StructField("MaxConcurrency", IntegerType()),
    StructField("IsParallelProcess", BooleanType()),
    StructField("EffectiveDate", DateType()),
    StructField("ExpireDate", DateType()),
    StructField("CreatedDatetime", TimestampType()),
    StructField("CreatedBy", StringType()),
    StructField("UpdatedDatetime", TimestampType()),
    StructField("UpdatedBy", StringType()),
    StructField("ValidFrom", DateType()),
    StructField("ValidTo", DateType())
])

# Create the Process DataFrame
process_df = spark.createDataFrame(data, process_schema)

# Define the schema for the ProcessStep table
process_step_schema = StructType([
    StructField("ProcessStepID", IntegerType()),
    StructField("ProcessID", IntegerType()),
    StructField("ProcessStepName", StringType()),
    StructField("ProcessStepDescription", StringType()),
    StructField("ProcessStepTypeID", IntegerType()),
    StructField("SubProcessID", IntegerType()),
    StructField("ProcessStepOrder", IntegerType()),
    StructField("SourceDatasetID", IntegerType()),
    StructField("SourceDatasetValidationTypeID", IntegerType()),
    StructField("DestinationDatasetID", IntegerType()),
    StructField("ActionID", IntegerType()),
    StructField("EffectiveDate", DateType()),
    StructField("ExpireDate", DateType()),
    StructField("CreatedDatetime", TimestampType()),
    StructField("CreatedBy", StringType()),
    StructField("UpdatedDatetime", TimestampType()),
    StructField("UpdatedBy", StringType()),
    StructField("ValidFrom", DateType()),
    StructField("ValidTo", DateType())
])

# Create the ProcessStep DataFrame
process_step_df = spark.createDataFrame(data, process_step_schema)

# Define the schema for the DataSet table
dataset_schema = StructType([
    StructField("DatasetID", IntegerType()),
    StructField("DatasetName", StringType()),
    StructField("DatasetDescription", StringType()),
    StructField("DatasetTypeID", IntegerType()),
    StructField("NamePattern", StringType()),
    StructField("LocationPattern", StringType()),
    StructField("DatasetKey", StringType()),
    StructField("DatasetSource", StringType()),
    StructField("ConnectionDescription", StringType()),
    StructField("EffectiveDate

    StructField("EffectiveDate", DateType()),
    StructField("ExpireDate", DateType()),
    StructField("CreatedDatetime", TimestampType()),
    StructField("CreatedBy", StringType()),
    StructField("UpdatedDatetime", TimestampType()),
    StructField("UpdatedBy", StringType()),
    StructField("ValidFrom", DateType()),
    StructField("ValidTo", DateType())
])

# Create the DataSet DataFrame
dataset_df = spark.createDataFrame(data, dataset_schema)

# Define the schema for the DataSetElement table
dataset_element_schema = StructType([
    StructField("DatasetElementID", IntegerType()),
    StructField("DatasetID", IntegerType()),
    StructField("DatasetElementName", StringType()),
    StructField("DatasetElementDescription", StringType()),
    StructField("DatasetElementOrder", IntegerType()),
    StructField("DatasetElementType", StringType()),
    StructField("IsRequired", BooleanType()),
    StructField("EffectiveDate", DateType()),
    StructField("ExpireDate", DateType()),
    StructField("CreatedDatetime", TimestampType()),
    StructField("CreatedBy", StringType()),
    StructField("UpdatedDatetime", TimestampType()),
    StructField("UpdatedBy", StringType()),
    StructField("ValidFrom", DateType()),
    StructField("ValidTo", DateType())
])

# Create the DataSetElement DataFrame
dataset_element_df = spark.createDataFrame(data, dataset_element_schema)

# Define the schema for the ProcessStepElementMapping table
process_step_element_mapping_schema = StructType([
    StructField("ProcessStepElementMappingID", IntegerType()),
    StructField("ProcessStepID", IntegerType()),
    StructField("DestinationElementID", IntegerType()),
    StructField("ElementSourceTypeID", IntegerType()),
    StructField("Source", StringType()),
    StructField("EffectiveDate", DateType()),
    StructField("ExpireDate", DateType()),
    StructField("CreatedDatetime", TimestampType()),
    StructField("CreatedBy", StringType()),
    StructField("UpdatedDatetime", TimestampType()),
    StructField("UpdatedBy", StringType()),
    StructField("ValidFrom", DateType()),
    StructField("ValidTo", DateType())
])

# Create the ProcessStepElementMapping DataFrame
process_step_element_mapping_df = spark.createDataFrame(data, process_step_element_mapping_schema)

# Define the schema for the ActionType table
action_type_schema = StructType([
    StructField("ActionTypeID", IntegerType()),
    StructField("ActionTypeDescription", StringType()),
    StructField("CreatedDatetime", TimestampType()),
    StructField("CreatedBy", StringType()),
    StructField("UpdatedDatetime", TimestampType()),
    StructField("UpdatedBy", StringType())
])

# Create the ActionType DataFrame
action_type_df = spark.createDataFrame(data, action_type_schema)

# Define the schema for the Action table
action_schema = StructType([
    StructField("ActionID", IntegerType()),
    StructField("ActionName", StringType()),
    StructField("ActionDescription", StringType()),
    StructField("ActionTypeID", IntegerType()),
    StructField("NamePattern", StringType()),
    StructField("LocationPattern", StringType()),
    StructField("ParameterExpression", StringType()),
    StructField("ConnectionDescription", StringType()),
    StructField("EffectiveDate", DateType()),
    StructField("ExpireDate", DateType()),
    StructField("CreatedDatetime", TimestampType()),
    StructField("CreatedBy", StringType()),
    StructField("UpdatedDatetime", TimestampType()),
    StructField("UpdatedBy", StringType()),
    StructField("ValidFrom", DateType()),
    StructField("ValidTo", DateType())
])

# Create the Action DataFrame
action_df = spark.createDataFrame(data, action_schema)

# Define the schema for the ActionParameter table
action_parameter_schema = StructType([
    StructField("ActionParameterID", IntegerType()),
    StructField("ActionID", IntegerType()),
    StructField("ParameterName", StringType()),
    StructField("ParameterValueTypeID", IntegerType()),
    StructField("ParameterValue", StringType()),
    StructField("ParameterOrder", IntegerType()),
    StructField("IsRequired", BooleanType()),
    StructField("EffectiveDate", DateType()),
    StructField("ExpireDate", DateType()),
    StructField("CreatedDatetime", TimestampType()),
    StructField("CreatedBy", StringType()),
    StructField("UpdatedDatetime", TimestampType()),
    StructField("UpdatedBy", StringType())
])

# Create the ActionParameter DataFrame
action_parameter_df = spark.createDataFrame(data, action_parameter_schema)
Note that the data variable used in these examples is a placeholder for the list of rows to be loaded into each DataFrame. This is just an example; you may need to adjust field datatypes or add/remove fields for your use case.
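
For example, a data value for the Process table could be a list of plain Python tuples whose positions match process_schema; the values below are made up purely for illustration.

Python
import datetime

# Hypothetical sample row matching the field order of process_schema
data = [
    (1, "PRC001", "DailyLoad", "Loads daily files", "0 0 * * *", 1,
     "pipeline:pl_daily_load", 4, True,
     datetime.date(2023, 1, 1), datetime.date(9999, 12, 31),
     datetime.datetime(2023, 1, 1, 8, 0), "admin",
     datetime.datetime(2023, 1, 1, 8, 0), "admin",
     datetime.date(2023, 1, 1), datetime.date(9999, 12, 31))
]
process_df = spark.createDataFrame(data, process_schema)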

Here is how the relationships between the entities can be defined in PySpark:

Python
# Define the relationships between the tables
process_df.createOrReplaceTempView("process")
process_step_df.createOrReplaceTempView("process_step")
dataset_df.createOrReplaceTempView("dataset")
dataset_element_df.createOrReplaceTempView("dataset_element")
process_step_element_mapping_df.createOrReplaceTempView("process_step_element_mapping")
action_type_df.createOrReplaceTempView("action_type")
action_df.createOrReplaceTempView("action")
action_parameter_df.createOrReplaceTempView("action_parameter")

# One-to-many relationship from Process to ProcessStep
spark.sql("SELECT process.*, process_step.* FROM process JOIN process_step ON process.ProcessID = process_step.ProcessID")

# One-to-one relationship from ProcessStep to SubProcess
# (assumes a sub_process view has been registered; it is not defined above)
spark.sql("SELECT process_step.*, sub_process.* FROM process_step JOIN sub_process ON process_step.SubProcessID = sub_process.SubProcessID")

# Many-to-one relationship from ProcessStep to DataSet
spark.sql("SELECT process_step.*, dataset.* FROM process_step JOIN dataset ON process_step.SourceDatasetID = dataset.DatasetID OR process_step.DestinationDatasetID = dataset.DatasetID")

# Many-to-one relationship from ProcessStep to Action
spark.sql("SELECT process_step.*, action.* FROM process_step JOIN action ON process_step.ActionID = action.ActionID")

# One-to-many relationship from DataSet to DataSetElement
spark.sql("SELECT dataset.*, dataset_element.* FROM dataset JOIN dataset_element ON dataset.DatasetID = dataset_element.DatasetID")

# Many-to-many relationship from ProcessStep to DataSetElement
spark.sql("SELECT process_step.*, dataset_element.* FROM process_step JOIN process_step_element_mapping ON process_step.ProcessStepID = process_step_element_mapping.ProcessStepID JOIN dataset_element ON process_step_element_mapping.DestinationElementID = dataset_element.DatasetElementID")

# One-to-many relationship from Action to ActionParameter
spark.sql("SELECT action.*, action_parameter.* FROM action JOIN action_parameter ON action.ActionID = action_parameter.ActionID")

# One-to-one relationship from Action to ActionType
spark.sql("SELECT action.*, action_type.* FROM action JOIN action_type ON action.ActionTypeID = action_type.ActionTypeID")
These queries are just examples; adjust them to your requirements. They join the temporary views so the data can be loaded in the required shape.
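
The same relationships can also be expressed with the DataFrame API instead of SQL. As a sketch, here is the Process-to-ProcessStep join; joining on the column name keeps a single ProcessID column, though other same-named columns (EffectiveDate, CreatedBy, ...) remain duplicated and should be aliased before use.

Python
# DataFrame-API equivalent of the Process -> ProcessStep join
process_with_steps_df = process_df.join(
    process_step_df,
    on="ProcessID",  # equi-join on the foreign key
    how="inner"
)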

Build PySpark dataframes grouped by Process, ProcessStep, and so on

Use PySpark to group the individual dataframes by their respective entities and create new summary dataframes. Here is an example of grouping by "Process" to create a new dataframe:

Python
from pyspark.sql.functions import *

# Group the dataframe by Process
# Note: on process_df alone, count("*") yields 1 per process; for a real step
# count, join with process_step_df first (see the sketch after this section).
grouped_process_df = process_df.groupBy("ProcessID", "ProcessCode", "ProcessName").agg(
    sum("MaxConcurrency").alias("TotalMaxConcurrency"),
    count("*").alias("NumOfProcessSteps"),
    first("ImplementationInfo").alias("ImplementationInfo"),
    first("IsParallelProcess").alias("IsParallelProcess"),
    first("EffectiveDate").alias("EffectiveDate"),
    first("ExpireDate").alias("ExpireDate"),
    first("CreatedDatetime").alias("CreatedDatetime"),
    first("CreatedBy").alias("CreatedBy"),
    first("UpdatedDatetime").alias("UpdatedDatetime"),
    first("UpdatedBy").alias("UpdatedBy"),
    first("ValidFrom").alias("ValidFrom"),
    first("ValidTo").alias("ValidTo")
)

# Create a new dataframe with the grouped data
process_summary_df = grouped_process_df.select(
    "ProcessID", "ProcessCode", "ProcessName", "TotalMaxConcurrency", "NumOfProcessSteps", "ImplementationInfo", "IsParallelProcess", "EffectiveDate", "ExpireDate", "CreatedDatetime", "CreatedBy", "UpdatedDatetime", "UpdatedBy", "ValidFrom", "ValidTo"
)

You can also group other dataframes by their respective entities in a similar way.

Python
grouped_processStep_df = process_step_df.groupBy("ProcessStepID", "ProcessID", "ProcessStepName").agg(
    first("ProcessStepDescription").alias("ProcessStepDescription"),
    first("ProcessStepTypeID").alias("ProcessStepTypeID"),
    first("SubProcessID").alias("SubProcessID"),
    first("ProcessStepOrder").alias("ProcessStepOrder"),
    first("SourceDatasetID").alias("SourceDatasetID"),
    first("SourceDatasetValidationTypeID").alias("SourceDatasetValidationTypeID"),
    first("DestinationDatasetID").alias("DestinationDatasetID"),
    first("ActionID").alias("ActionID"),
    first("EffectiveDate").alias("EffectiveDate"),
    first("ExpireDate").alias("ExpireDate"),
    first("CreatedDatetime").alias("CreatedDatetime"),
    first("CreatedBy").alias("CreatedBy"),
    first("UpdatedDatetime").alias("UpdatedDatetime"),
    first("UpdatedBy").alias("UpdatedBy"),
    first("ValidFrom").alias("ValidFrom"),
    first("ValidTo").alias("ValidTo")
)

processStep_summary_df = grouped_processStep_df.select(
    "ProcessStepID", "ProcessID", "ProcessStepName", "ProcessStepDescription", "ProcessStepTypeID", "SubProcessID", "ProcessStepOrder", "SourceDatasetID", "SourceDatasetValidationTypeID", "DestinationDatasetID", "ActionID", "EffectiveDate", "ExpireDate", "CreatedDatetime", "CreatedBy", "UpdatedDatetime", "UpdatedBy", "ValidFrom", "ValidTo"
)

In this example, the groupBy method groups the dataframe by specific columns and the agg method aggregates the data. You can use different aggregation functions such as sum, count, first, and last to get the desired values for the new dataframe.

It's worth noting that this is just an example, you may need to adjust the code as per your use case.
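
In particular, a real step count per process has to come from the ProcessStep table rather than from Process itself. A minimal sketch:

Python
# Count steps per process from the child table, then attach to Process
step_counts_df = process_step_df.groupBy("ProcessID").agg(
    count("*").alias("NumOfProcessSteps")
)
process_with_counts_df = process_df.join(step_counts_df, "ProcessID", "left")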

Now let's create nested groups starting from Process

Create nested groups starting from the "Process" dataframe in PySpark. Here is an example of how you can group the dataframes by "Process" and then group the resulting dataframe by "ProcessStep":

Python
from pyspark.sql.functions import *

# Group the dataframes by Process
# Note: this assumes process_df has been joined with process_step_df first so
# that the step-level columns (ProcessStepID, ...) are available here.
grouped_process_df = process_df.groupBy("ProcessID", "ProcessCode", "ProcessName").agg(
    sum("MaxConcurrency").alias("TotalMaxConcurrency"),
    count("*").alias("NumOfProcessSteps"),
    first("ImplementationInfo").alias("ImplementationInfo"),
    first("IsParallelProcess").alias("IsParallelProcess"),
    first("EffectiveDate").alias("EffectiveDate"),
    first("ExpireDate").alias("ExpireDate"),
    first("CreatedDatetime").alias("CreatedDatetime"),
    first("CreatedBy").alias("CreatedBy"),
    first("UpdatedDatetime").alias("UpdatedDatetime"),
    first("UpdatedBy").alias("UpdatedBy"),
    first("ValidFrom").alias("ValidFrom"),
    first("ValidTo").alias("ValidTo"),
    collect_list("ProcessStepID").alias("ProcessStepIDList")
)

# Explode the step list so there is one row per ProcessStep
grouped_processStep_df = grouped_process_df.selectExpr("ProcessID", "ProcessCode", "ProcessName", "TotalMaxConcurrency", "NumOfProcessSteps", "ImplementationInfo", "IsParallelProcess", "EffectiveDate", "ExpireDate", "CreatedDatetime", "CreatedBy", "UpdatedDatetime", "UpdatedBy", "ValidFrom", "ValidTo", "explode(ProcessStepIDList) as ProcessStepID")
# Regroup by ProcessStep. Note: the step-detail columns collected below
# (ProcessStepDescription, ...) require joining back to process_step_df on
# ProcessStepID beforehand.
grouped_processStep_df = grouped_processStep_df.groupBy("ProcessID", "ProcessCode", "ProcessName", "ProcessStepID").agg(
    first("TotalMaxConcurrency").alias("TotalMaxConcurrency"),
    first("NumOfProcessSteps").alias("NumOfProcessSteps"),
    first("ImplementationInfo").alias("ImplementationInfo"),
    first("IsParallelProcess").alias("IsParallelProcess"),
    first("EffectiveDate").alias("EffectiveDate"),
    first("ExpireDate").alias("ExpireDate"),
    first("CreatedDatetime").alias("CreatedDatetime"),
    first("CreatedBy").alias("CreatedBy"),
    first("UpdatedDatetime").alias("UpdatedDatetime"),
    first("UpdatedBy").alias("UpdatedBy"),
    first("ValidFrom").alias("ValidFrom"),
    first("ValidTo").alias("ValidTo"),
    collect_list("ProcessStepDescription").alias("ProcessStepDescriptionList"),
    collect_list("ProcessStepTypeID").alias("ProcessStepTypeIDList"),
    collect_list("SubProcessID").alias("SubProcessIDList"),
    collect_list("ProcessStepOrder").alias("ProcessStepOrderList"),
    collect_list("SourceDatasetID").alias("SourceDatasetIDList"),
    collect_list("SourceDatasetValidationTypeID").alias("SourceDatasetValidationTypeIDList"),
    collect_list("DestinationDatasetIDList").alias("DestinationDatasetIDList"),
    collect_list("ActionID").alias("ActionIDList"),
    collect_list("EffectiveDate").alias("EffectiveDateList"),
    collect_list("ExpireDate").alias("ExpireDateList"),
    collect_list("CreatedDatetime").alias("CreatedDatetimeList"),
    collect_list("CreatedBy").alias("CreatedByList"),
    collect_list("UpdatedDatetime").alias("UpdatedDatetimeList"),
    collect_list("UpdatedBy").alias("UpdatedByList"),
    collect_list("ValidFrom").alias("ValidFromList"),
    collect_list("ValidTo").alias("ValidToList")
)

In this example, the collect_list function collects the values for each unique ProcessStepID, and the groupBy method groups the resulting dataframe by "ProcessID", "ProcessCode", "ProcessName", and "ProcessStepID". Once you have the nested group, you can explode the lists back into separate rows for each ProcessStep, as shown next.

It's worth noting that this is just an example, you may need to adjust the code as per your use case.

Here is an example of exploding the lists back into one row per ProcessStep. Spark allows only one generator (explode) per select clause, so the parallel lists are zipped into an array of structs with arrays_zip and exploded once:

Python
# Zip the parallel lists into one array of structs, explode once, then
# pull the step fields out of the struct.
exploded_df = grouped_processStep_df.withColumn(
    "Step",
    explode(arrays_zip(
        "ProcessStepDescriptionList", "ProcessStepTypeIDList", "SubProcessIDList",
        "ProcessStepOrderList", "SourceDatasetIDList", "SourceDatasetValidationTypeIDList",
        "DestinationDatasetIDList", "ActionIDList"
    ))
).select(
    "ProcessID", "ProcessCode", "ProcessName", "TotalMaxConcurrency",
    "NumOfProcessSteps", "ImplementationInfo", "IsParallelProcess",
    "EffectiveDate", "ExpireDate", "CreatedDatetime", "CreatedBy",
    "UpdatedDatetime", "UpdatedBy", "ValidFrom", "ValidTo", "ProcessStepID",
    col("Step.ProcessStepDescriptionList").alias("ProcessStepDescription"),
    col("Step.ProcessStepTypeIDList").alias("ProcessStepTypeID"),
    col("Step.SubProcessIDList").alias("SubProcessID"),
    col("Step.ProcessStepOrderList").alias("ProcessStepOrder"),
    col("Step.SourceDatasetIDList").alias("SourceDatasetID"),
    col("Step.SourceDatasetValidationTypeIDList").alias("SourceDatasetValidationTypeID"),
    col("Step.DestinationDatasetIDList").alias("DestinationDatasetID"),
    col("Step.ActionIDList").alias("ActionID")
)

This will create a new dataframe, exploded_df, where each row represents a unique ProcessStep. Each row will contain the corresponding ProcessStep information as well as the information from the parent Process.

You can also use selectExpr to keep only the relevant columns:

Python
step_level_df = exploded_df.selectExpr("ProcessID", "ProcessCode", "ProcessName", "ProcessStepID", "ProcessStepDescription", "ProcessStepTypeID", "SubProcessID", "ProcessStepOrder", "SourceDatasetID", "SourceDatasetValidationTypeID", "DestinationDatasetID", "ActionID", "EffectiveDate", "ExpireDate", "CreatedDatetime", "CreatedBy", "UpdatedDatetime", "UpdatedBy", "ValidFrom", "ValidTo")

This selects only the relevant columns from the exploded dataframe. Note that this approach repeats the parent process information for each ProcessStep; if you don't want that, keep only the step-level columns and use a join to bring in the parent process information where needed, as in the sketch below.
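
A minimal sketch of that join-based alternative: keep only the step-level columns after exploding, and re-attach just the parent columns you need.

Python
# Step-level rows without the repeated parent payload
steps_only_df = exploded_df.select(
    "ProcessID", "ProcessStepID", "ProcessStepDescription",
    "ProcessStepTypeID", "ProcessStepOrder"
)

# Bring back only the parent attributes that are actually needed
steps_with_parent_df = steps_only_df.join(
    process_df.select("ProcessID", "ProcessCode", "ProcessName"),
    on="ProcessID", how="left"
)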

It's also worth noting that this is just an example, you may need to adjust the code as per your use case.

Use Spark SQL to query the dataframes and find all related information for a given process. Here is an example of how you can query the dataframes and group them by entities for a given process:

Python
from pyspark.sql.functions import *

# Create a temporary view for each dataframe
process_df.createOrReplaceTempView("process")
process_step_df.createOrReplaceTempView("processStep")
dataset_df.createOrReplaceTempView("dataset")
dataset_element_df.createOrReplaceTempView("datasetElement")
process_step_element_mapping_df.createOrReplaceTempView("processStepElementMapping")
action_type_df.createOrReplaceTempView("actionType")
action_df.createOrReplaceTempView("action")
action_parameter_df.createOrReplaceTempView("actionParameter")

# Define the process ID for which you want to find the related information
process_id = "123"

# Query to find all related information for the given process ID
query = """
SELECT process.*, processStep.*, dataset.*, datasetElement.*, processStepElementMapping.*, actionType.*, action.*, actionParameter.*
FROM process
LEFT JOIN processStep ON process.ProcessID = processStep.ProcessID
LEFT JOIN dataset ON processStep.SourceDatasetID = dataset.DatasetID OR processStep.DestinationDatasetID = dataset.DatasetID
LEFT JOIN datasetElement ON dataset.DatasetID = datasetElement.DatasetID
LEFT JOIN processStepElementMapping ON processStep.ProcessStepID = processStepElementMapping.ProcessStepID
LEFT JOIN action ON processStep.ActionID = action.ActionID
LEFT JOIN actionType ON action.ActionTypeID = actionType.ActionTypeID
LEFT JOIN actionParameter ON action.ActionID = actionParameter.ActionID
WHERE process.ProcessID = '{}'
""".format(process_id)

# Execute the query and group the results by entities
result_df = spark.sql(query)

In this example, I'm using the LEFT JOIN operator to join the dataframes based on the relationships between the entities. I'm also using the WHERE clause to filter the results based on the given process ID.

It's worth noting that this is just an example, you may need to adjust the code as per your use case.
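
To avoid assembling the SQL string by hand, the same filter can be applied through the DataFrame API (Spark 3.4+ also supports parameterized spark.sql, if available in your pool). A sketch of the first join only:

Python
# Filter first, then join outward along the foreign keys
filtered_df = (
    process_df.filter(col("ProcessID") == process_id)
    .join(process_step_df, "ProcessID", "left")
)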

After executing the query, use the groupBy method to group the results by entity. Here is an example:

Python
# Group the results by Process.
# Note: result_df contains same-named columns from several entities
# (EffectiveDate, NamePattern, ConnectionDescription, IsRequired, ...).
# Alias those in the SQL query first, otherwise Spark will report ambiguous
# column references; the aggregation below assumes that has been done and
# collects each logical column once.
grouped_by_process_df = result_df.groupBy("ProcessID", "ProcessCode", "ProcessName").agg(
    # Process attributes
    first("ProcessDescription").alias("ProcessDescription"),
    first("TriggerPattern").alias("TriggerPattern"),
    first("KeyParameterTypeID").alias("KeyParameterTypeID"),
    first("ImplementationInfo").alias("ImplementationInfo"),
    first("MaxConcurrency").alias("MaxConcurrency"),
    first("IsParallelProcess").alias("IsParallelProcess"),
    first("EffectiveDate").alias("EffectiveDate"),
    first("ExpireDate").alias("ExpireDate"),
    first("CreatedDatetime").alias("CreatedDatetime"),
    first("CreatedBy").alias("CreatedBy"),
    first("UpdatedDatetime").alias("UpdatedDatetime"),
    first("UpdatedBy").alias("UpdatedBy"),
    first("ValidFrom").alias("ValidFrom"),
    first("ValidTo").alias("ValidTo"),
    # ProcessStep
    collect_list("ProcessStepID").alias("ProcessStepIDList"),
    collect_list("ProcessStepName").alias("ProcessStepNameList"),
    collect_list("ProcessStepDescription").alias("ProcessStepDescriptionList"),
    collect_list("ProcessStepTypeID").alias("ProcessStepTypeIDList"),
    collect_list("SubProcessID").alias("SubProcessIDList"),
    collect_list("ProcessStepOrder").alias("ProcessStepOrderList"),
    collect_list("SourceDatasetID").alias("SourceDatasetIDList"),
    collect_list("SourceDatasetValidationTypeID").alias("SourceDatasetValidationTypeIDList"),
    collect_list("DestinationDatasetID").alias("DestinationDatasetIDList"),
    collect_list("ActionID").alias("ActionIDList"),
    # DataSet
    collect_list("DatasetID").alias("DatasetIDList"),
    collect_list("DatasetName").alias("DatasetNameList"),
    collect_list("DatasetDescription").alias("DatasetDescriptionList"),
    collect_list("DatasetTypeID").alias("DatasetTypeIDList"),
    collect_list("NamePattern").alias("NamePatternList"),
    collect_list("LocationPattern").alias("LocationPatternList"),
    collect_list("DatasetKey").alias("DatasetKeyList"),
    collect_list("DatasetSource").alias("DatasetSourceList"),
    collect_list("ConnectionDescription").alias("ConnectionDescriptionList"),
    # DataSetElement
    collect_list("DatasetElementID").alias("DatasetElementIDList"),
    collect_list("DatasetElementName").alias("DatasetElementNameList"),
    collect_list("DatasetElementDescription").alias("DatasetElementDescriptionList"),
    collect_list("DatasetElementOrder").alias("DatasetElementOrderList"),
    collect_list("DatasetElementType").alias("DatasetElementTypeList"),
    collect_list("IsRequired").alias("IsRequiredList"),
    # ProcessStepElementMapping
    collect_list("ProcessStepElementMappingID").alias("ProcessStepElementMappingIDList"),
    collect_list("DestinationElementID").alias("DestinationElementIDList"),
    collect_list("ElementSourceTypeID").alias("ElementSourceTypeIDList"),
    collect_list("Source").alias("SourceList"),
    # ActionType and Action
    collect_list("ActionTypeID").alias("ActionTypeIDList"),
    collect_list("ActionTypeDescription").alias("ActionTypeDescriptionList"),
    collect_list("ActionName").alias("ActionNameList"),
    collect_list("ActionDescription").alias("ActionDescriptionList"),
    collect_list("ParameterExpression").alias("ParameterExpressionList"),
    # ActionParameter
    collect_list("ActionParameterID").alias("ActionParameterIDList"),
    collect_list("ParameterName").alias("ParameterNameList"),
    collect_list("ParameterValueTypeID").alias("ParameterValueTypeIDList"),
    collect_list("ParameterValue").alias("ParameterValueList"),
    collect_list("ParameterOrder").alias("ParameterOrderList")
)
In this example, the groupBy method groups the results by ProcessID, ProcessCode, and ProcessName, and the agg method with the collect_list function collects the related values for each unique process into list columns. It's worth noting that this is just an example; you may need to adjust the code for your use case.
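
An alternative to many parallel *List columns is to collect each child entity as a list of structs, which keeps all fields of one step together in a single array element. A sketch, again assuming the joined columns have been disambiguated:

Python
# One array column whose elements are complete ProcessStep records
nested_process_df = result_df.groupBy("ProcessID", "ProcessCode", "ProcessName").agg(
    collect_list(
        struct("ProcessStepID", "ProcessStepName", "ProcessStepOrder", "ActionID")
    ).alias("ProcessSteps")
)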

Use Spark SQL to query the dataframes and find all related information for a given ProcessCode. Here is an example of how you can query the dataframes and group them by entities for a given ProcessCode:

Python
from pyspark.sql.functions import *

# Create a temporary view for each dataframe
process_df.createOrReplaceTempView("process")
process_step_df.createOrReplaceTempView("processStep")
dataset_df.createOrReplaceTempView("dataset")
dataset_element_df.createOrReplaceTempView("datasetElement")
process_step_element_mapping_df.createOrReplaceTempView("processStepElementMapping")
action_type_df.createOrReplaceTempView("actionType")
action_df.createOrReplaceTempView("action")
action_parameter_df.createOrReplaceTempView("actionParameter")

# Define the process code for which you want to find the related information
process_code = "code123"

# Query to find all related information for the given process code
query = """
SELECT process.*, processStep.*, dataset.*, datasetElement.*, processStepElementMapping.*, actionType.*, action.*, actionParameter.*
FROM process
LEFT JOIN processStep ON process.ProcessID = processStep.ProcessID
LEFT JOIN dataset ON processStep.SourceDatasetID = dataset.DatasetID OR processStep.DestinationDatasetID = dataset.DatasetID
LEFT JOIN datasetElement ON dataset.DatasetID = datasetElement.DatasetID
LEFT JOIN processStepElementMapping ON processStep.ProcessStepID = processStepElementMapping.ProcessStepID
LEFT JOIN action ON processStep.ActionID = action.ActionID
LEFT JOIN actionType ON action.ActionTypeID = actionType.ActionTypeID
LEFT JOIN actionParameter ON action.ActionID = actionParameter.ActionID
WHERE process.ProcessCode = '{}'
""".format(process_code)

# Execute the query and group the results by entities
result_df = spark.sql(query)

In this example, I'm using the LEFT JOIN operator to join the dataframes based on the relationships between the entities. I'm also using the WHERE clause to filter the results based on the given ProcessCode.

It's worth noting that this is just an example; you may need to adjust the code for your use case. Once you have the result of the query, you can use the groupBy and agg methods as previously explained to group the result by entities.

Query to group them

Python
# Group the results by ProcessCode.
# As before, same-named columns from different entities must be aliased in
# the SQL query first to avoid ambiguous references; each logical column is
# collected once below.
grouped_by_process_df = result_df.groupBy("ProcessCode", "ProcessName").agg(
    # Process attributes
    first("ProcessID").alias("ProcessID"),
    first("ProcessDescription").alias("ProcessDescription"),
    first("TriggerPattern").alias("TriggerPattern"),
    first("KeyParameterTypeID").alias("KeyParameterTypeID"),
    first("ImplementationInfo").alias("ImplementationInfo"),
    first("MaxConcurrency").alias("MaxConcurrency"),
    first("IsParallelProcess").alias("IsParallelProcess"),
    first("EffectiveDate").alias("EffectiveDate"),
    first("ExpireDate").alias("ExpireDate"),
    first("CreatedDatetime").alias("CreatedDatetime"),
    first("CreatedBy").alias("CreatedBy"),
    first("UpdatedDatetime").alias("UpdatedDatetime"),
    first("UpdatedBy").alias("UpdatedBy"),
    first("ValidFrom").alias("ValidFrom"),
    first("ValidTo").alias("ValidTo"),
    # ProcessStep
    collect_list("ProcessStepID").alias("ProcessStepIDList"),
    collect_list("ProcessStepName").alias("ProcessStepNameList"),
    collect_list("ProcessStepDescription").alias("ProcessStepDescriptionList"),
    collect_list("ProcessStepTypeID").alias("ProcessStepTypeIDList"),
    collect_list("SubProcessID").alias("SubProcessIDList"),
    collect_list("ProcessStepOrder").alias("ProcessStepOrderList"),
    collect_list("SourceDatasetID").alias("SourceDatasetIDList"),
    collect_list("SourceDatasetValidationTypeID").alias("SourceDatasetValidationTypeIDList"),
    collect_list("DestinationDatasetID").alias("DestinationDatasetIDList"),
    collect_list("ActionID").alias("ActionIDList"),
    # DataSet
    collect_list("DatasetID").alias("DatasetIDList"),
    collect_list("DatasetName").alias("DatasetNameList"),
    collect_list("DatasetDescription").alias("DatasetDescriptionList"),
    collect_list("DatasetTypeID").alias("DatasetTypeIDList"),
    collect_list("NamePattern").alias("NamePatternList"),
    collect_list("LocationPattern").alias("LocationPatternList"),
    collect_list("DatasetKey").alias("DatasetKeyList"),
    collect_list("DatasetSource").alias("DatasetSourceList"),
    collect_list("ConnectionDescription").alias("ConnectionDescriptionList"),
    # DataSetElement
    collect_list("DatasetElementID").alias("DatasetElementIDList"),
    collect_list("DatasetElementName").alias("DatasetElementNameList"),
    collect_list("DatasetElementDescription").alias("DatasetElementDescriptionList"),
    collect_list("DatasetElementOrder").alias("DatasetElementOrderList"),
    collect_list("DatasetElementType").alias("DatasetElementTypeList"),
    collect_list("IsRequired").alias("IsRequiredList"),
    # ProcessStepElementMapping
    collect_list("ProcessStepElementMappingID").alias("ProcessStepElementMappingIDList"),
    collect_list("DestinationElementID").alias("DestinationElementIDList"),
    collect_list("ElementSourceTypeID").alias("ElementSourceTypeIDList"),
    collect_list("Source").alias("SourceList"),
    # ActionType and Action
    collect_list("ActionTypeID").alias("ActionTypeIDList"),
    collect_list("ActionTypeDescription").alias("ActionTypeDescriptionList"),
    collect_list("ActionName").alias("ActionNameList"),
    collect_list("ActionDescription").alias("ActionDescriptionList"),
    collect_list("ParameterExpression").alias("ParameterExpressionList"),
    # ActionParameter
    collect_list("ActionParameterID").alias("ActionParameterIDList"),
    collect_list("ParameterName").alias("ParameterNameList"),
    collect_list("ParameterValueTypeID").alias("ParameterValueTypeIDList"),
    collect_list("ParameterValue").alias("ParameterValueList"),
    collect_list("ParameterOrder").alias("ParameterOrderList")
)

In this example, the groupBy method groups the results by ProcessCode and ProcessName, and the agg method with the collect_list function collects the related values for each unique ProcessCode into list columns.
It's worth noting that this is just an example; you may need to adjust the code for your use case.
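
Once grouped, the result can be persisted for the metadata-driven pipeline to pick up, for example as JSON; the ADLS path below is a placeholder.

Python
# Write the grouped process configuration for the Synapse pipeline to read;
# the storage path is hypothetical.
grouped_by_process_df.write.mode("overwrite").json(
    "abfss://metadata@<storage-account>.dfs.core.windows.net/process-config/"
)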

Flow Diagram

flowchart TD
    subgraph Process
        ProcessID(ProcessID) --> ProcessCode(ProcessCode)
        ProcessCode --> ProcessName(ProcessName)
        ProcessName --> ProcessDescription(ProcessDescription)
        ProcessDescription --> TriggerPattern(TriggerPattern)
        TriggerPattern --> KeyParameterTypeID(KeyParameterTypeID)
        KeyParameterTypeID --> ImplementationInfo(ImplementationInfo)
        ImplementationInfo --> MaxConcurrency(MaxConcurrency)
        MaxConcurrency --> IsParallelProcess(IsParallelProcess)
        IsParallelProcess --> EffectiveDate(EffectiveDate)
        EffectiveDate --> ExpireDate(ExpireDate)
        ExpireDate --> CreatedDatetime(CreatedDatetime)
        CreatedDatetime --> CreatedBy(CreatedBy)
        CreatedBy --> UpdatedDatetime(UpdatedDatetime)
        UpdatedDatetime --> UpdatedBy(UpdatedBy)
        UpdatedBy --> ValidFrom(ValidFrom)
        ValidFrom --> ValidTo(ValidTo)
    end

    subgraph ProcessStep
        ProcessStepID(ProcessStepID) --> ProcessID(ProcessID)
        ProcessID --> ProcessStepName(ProcessStepName)
        ProcessStepName --> ProcessStepDescription(ProcessStepDescription)
        ProcessStepDescription --> ProcessStepTypeID(ProcessStepTypeID)
        ProcessStepTypeID --> SubProcessID(SubProcessID)
        SubProcessID --> ProcessStepOrder(ProcessStepOrder)
        ProcessStepOrder --> SourceDatasetID(SourceDatasetID)
        SourceDatasetID --> SourceDatasetValidationTypeID(SourceDatasetValidationTypeID)
        SourceDatasetValidationTypeID --> DestinationDatasetID(DestinationDatasetID)
        DestinationDatasetID --> ActionID(ActionID)
        ActionID --> EffectiveDate(EffectiveDate)
        EffectiveDate --> ExpireDate(ExpireDate)
        ExpireDate --> CreatedDatetime(CreatedDatetime)
        CreatedDatetime --> CreatedBy(CreatedBy)
        CreatedBy --> UpdatedDatetime(UpdatedDatetime)
        UpdatedDatetime --> UpdatedBy(UpdatedBy)
        UpdatedBy --> ValidFrom(ValidFrom)
        ValidFrom --> ValidTo(ValidTo)
    end

    subgraph DataSet
        DatasetID(DatasetID) --> DatasetName(DatasetName)
        DatasetName --> DatasetDescription(DatasetDescription)
        DatasetDescription --> DatasetTypeID(DatasetTypeID)
        DatasetTypeID --> NamePattern(NamePattern)
        NamePattern --> LocationPattern(LocationPattern)
        LocationPattern --> DatasetKey(DatasetKey)
        DatasetKey --> DatasetSource(DatasetSource)
        DatasetSource --> ConnectionDescription(ConnectionDescription)
        ConnectionDescription --> EffectiveDate(EffectiveDate)
        EffectiveDate --> ExpireDate(ExpireDate)
        ExpireDate --> CreatedDatetime(CreatedDatetime)
        CreatedDatetime --> CreatedBy(CreatedBy)
        CreatedBy --> UpdatedDatetime(UpdatedDatetime)
        UpdatedDatetime --> UpdatedBy(UpdatedBy)
        UpdatedBy --> ValidFrom(ValidFrom)
        ValidFrom --> ValidTo(ValidTo)
    end

    subgraph DataSetElement
        DatasetElementID(DatasetElementID) --> DatasetID(DatasetID)
        DatasetID --> DatasetElementName(DatasetElementName)
        DatasetElementName --> DatasetElementDescription(DatasetElementDescription)
        DatasetElementDescription --> DatasetElementOrder(DatasetElementOrder)
        DatasetElementOrder --> DatasetElementType(DatasetElementType)
        DatasetElementType --> IsRequired(IsRequired)
        IsRequired --> EffectiveDate(EffectiveDate)
        EffectiveDate --> ExpireDate(ExpireDate)
        ExpireDate --> CreatedDatetime(CreatedDatetime)
        CreatedDatetime --> CreatedBy(CreatedBy)
        CreatedBy --> UpdatedDatetime(UpdatedDatetime)
        UpdatedDatetime --> UpdatedBy(UpdatedBy)
        UpdatedBy --> ValidFrom(ValidFrom)
        ValidFrom --> ValidTo(ValidTo)
    end

    subgraph ProcessStepElementMapping
        ProcessStepElementMappingID(ProcessStepElementMappingID) --> ProcessStepID(ProcessStepID)
        ProcessStepID --> DestinationElementID(DestinationElementID)
        DestinationElementID --> ElementSourceTypeID(ElementSourceTypeID)
        ElementSourceTypeID --> Source(Source)
        Source --> EffectiveDate(EffectiveDate)
        EffectiveDate --> ExpireDate(ExpireDate)
        ExpireDate --> CreatedDatetime(CreatedDatetime)
        CreatedDatetime --> CreatedBy(CreatedBy)
        CreatedBy --> UpdatedDatetime(UpdatedDatetime)
        UpdatedDatetime --> UpdatedBy(UpdatedBy)
        UpdatedBy --> ValidFrom(ValidFrom)
        ValidFrom --> ValidTo(ValidTo)
    end

    subgraph ActionType
        ActionTypeID(ActionTypeID) --> ActionTypeDescription(ActionTypeDescription)
        ActionTypeDescription --> CreatedDatetime(CreatedDatetime)
        CreatedDatetime --> CreatedBy(CreatedBy)
        CreatedBy --> UpdatedDatetime(UpdatedDatetime)
        UpdatedDatetime --> UpdatedBy(UpdatedBy)
    end

    subgraph Action
        ActionID(ActionID) --> ActionName(ActionName)
        ActionName --> ActionDescription(ActionDescription)
        ActionDescription --> ActionTypeID(ActionTypeID)
        ActionTypeID --> NamePattern(NamePattern)
        NamePattern --> LocationPattern(LocationPattern)
        LocationPattern --> ParameterExpression(ParameterExpression)
        ParameterExpression --> ConnectionDescription(ConnectionDescription)
        ConnectionDescription --> EffectiveDate(EffectiveDate)
        EffectiveDate --> ExpireDate(ExpireDate)
        ExpireDate --> CreatedDatetime(CreatedDatetime)
        CreatedDatetime --> CreatedBy(CreatedBy)
        CreatedBy --> UpdatedDatetime(UpdatedDatetime)
        UpdatedDatetime --> UpdatedBy(UpdatedBy)
        UpdatedBy --> ValidFrom(ValidFrom)
        ValidFrom --> ValidTo(ValidTo)
    end

    subgraph ActionParameter
        ActionParameterID(ActionParameterID) --> ActionID(ActionID)
        ActionID --> ParameterName(ParameterName)
        ParameterName --> ParameterValueTypeID(ParameterValueTypeID)
        ParameterValueTypeID --> ParameterValue(ParameterValue)
        ParameterValue --> ParameterOrder(ParameterOrder)
        ParameterOrder --> IsRequired(IsRequired)
        IsRequired --> EffectiveDate(EffectiveDate)
        EffectiveDate --> ExpireDate(ExpireDate)
        ExpireDate --> CreatedDatetime(CreatedDatetime)
        CreatedDatetime --> CreatedBy(CreatedBy)
        CreatedBy --> UpdatedDatetime(UpdatedDatetime)
        UpdatedDatetime --> UpdatedBy(UpdatedBy)
    end
Text Only
Process --> ProcessStep
ProcessStep --> SubProcess
ProcessStep --> DataSet
ProcessStep --> Action
DataSet --> DataSetElement
ProcessStep --> ProcessStepElementMapping
ProcessStepElementMapping --> DataSetElement
Action --> ActionParameter
Action --> ActionType
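
Since Spark does not enforce these relationships, they can be validated with anti-joins. For example, a sketch that finds ProcessStep rows whose ProcessID has no matching parent Process:

Python
# Orphan detection: steps that reference a non-existent Process
orphan_steps_df = process_step_df.join(
    process_df.select("ProcessID"), on="ProcessID", how="left_anti"
)
orphan_steps_df.show()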