How to use mapping data flow to build generic, schema-less transformations
There are probably many roads to Rome, but here’s my approach, designed with transforming data from a staging area into a 3NF-modelled data warehouse in mind. This involves handling surrogate keys, an unknown number of foreign key constraints, update/insert/ignore logic and SCD handling (type 1 as of now, with a type 2 example coming soon).
To give a decent presentation of my approach, I first need to establish some context around the orchestration and the metadata repository supporting the generic mapping data flow. So, in the first part I’ll cover the metadata repository and the orchestration in ADF. Then, in the second part, I’ll cover the actual data flow in detail.
Prerequisites and support systems necessary for this approach
- Metadata repository delivering .. well .. the necessary metadata – I’ll detail how I designed a first version below
- Source tables which can be queried with SQL
- Destination tables that have only one surrogate identity primary key column (the identity part could easily be substituted with logic in the mapping data flow)
First, let’s start off with the metadata repository supporting this generic mapping data flow. The demo data specified below is used throughout these posts.
Repository | Tables
Holds all the high-level information about a given table, both in the source and the destination. Some of the columns are used to support a set of metadata-driven ingest pipelines in ADF which I’ll cover in another blog post.
The main columns relevant for this post are TableID, ProjectName, SchemaName, TableName, IntegrationStage and isActive. The two columns that might not be self-explanatory are ProjectName and IntegrationStage.
- ProjectName distinguishes between the mentioned Ingest process and the Data warehouse transformation process.
- IntegrationStage controls the order in which tables are loaded through a set of orchestration pipelines in ADF.
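To make this concrete, here’s a minimal sketch of what a couple of rows in Tables could look like, and how the integration stages drive the load order. The values are made up for illustration – they’re not the actual repository rows.

```python
# Illustrative rows from the Tables metadata table (values are made up).
tables = [
    {"TableID": 1, "ProjectName": "DWH", "SchemaName": "dwh",
     "TableName": "CustomerGroup", "IntegrationStage": 1, "isActive": True},
    {"TableID": 2, "ProjectName": "DWH", "SchemaName": "dwh",
     "TableName": "Customer", "IntegrationStage": 2, "isActive": True},
]

# The orchestration loads stages in ascending order, so a reference
# table in stage 1 is loaded before a table in stage 2 that references it.
stages = sorted({t["IntegrationStage"] for t in tables if t["isActive"]})
```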
Repository | Columns
You might have guessed it – Columns holds the details about the columns in a given table in the data warehouse transformation process.
- TableID, SourceTableID and ReferenceTableID are foreign keys to Tables
- ColumnDataTypeID references a ColumnDataType table holding a set of predefined common data types, in an effort to ensure consistency in the data warehouse
- ChangeTypeID references a ChangeType table holding supported SCD types (for now just SCD 1, SCD 2 coming soon)
- ColumnName = ColumnName in destination
- SourceColumnName = ColumnName in the table referenced in Tables using SourceTableID
- SourceTransformation is supposed to hold simple transformation logic to be applied to the column (yet to be implemented)
- SourceWhereClause holds a where clause relevant to the source column
- isPrimaryKey, isBusinessKey, isReferenceKey are used to flag Key columns – more about that later.
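Sketched the same way, a couple of Columns rows for TableID 1 could look like this. Again, the values are illustrative; the point is that the flag columns let the stored procedures single out, say, the business key columns.

```python
# Illustrative rows from the Columns metadata table for TableID 1
# (values are made up; SourceTableID points back to Tables).
columns = [
    {"TableID": 1, "ColumnName": "CompanyKey", "SourceColumnName": "CTCONO",
     "SourceTableID": 10, "ReferenceTableID": None, "ChangeTypeID": 1,
     "isPrimaryKey": False, "isBusinessKey": True, "isReferenceKey": False},
    {"TableID": 1, "ColumnName": "Code", "SourceColumnName": "CTSTKY",
     "SourceTableID": 10, "ReferenceTableID": None, "ChangeTypeID": 1,
     "isPrimaryKey": False, "isBusinessKey": True, "isReferenceKey": False},
]

# The flags make it trivial to pick out, for example, the business key.
business_key = [c["ColumnName"] for c in columns if c["isBusinessKey"]]
```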
The pipeline orchestration in ADF behind this solution consists of 4 layers
This is a really minimal representation without any utilities etc., but it makes it easier for me to walk through the stages without too much clutter.
Layer 1 | Orchestration
The first orchestration layer handles the different projects in the metadata repository; as of now, this is done manually.
Layer 2 | Resource Management & Integration stages
The second layer, specific to the Data warehouse project, handles resource scaling and the requirement to load tables in the right order, as defined through the different integration stages in the metadata repository. A lookup against a stored procedure in the metadata repository supplies the integration stages for a given project, passed along as a parameter from the orchestration layer.
Layer 3 is executed sequentially through an Execute Pipeline activity inside the ForEach IntegrationStage loop, which passes along the given integration stage value as a parameter.
Layer 3 | Control logic
First I call a stored procedure in the metadata repository passing an integration stage.
The stored procedure returns all the table names in the given integration stage, which are used by the ForEach loop. Inside the ForEach loop we do two things. First, we create the destination table in Synapse if it doesn’t already exist, based on the information in the metadata repository. This is done through a separate utility pipeline which gets passed the table name as a parameter. Secondly, we trigger the next layer of orchestration, passing the table name as a parameter.
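As a sketch, the control logic in this layer boils down to a simple loop. The callables here are stand-ins for the stored-procedure lookup, the utility pipeline and the Execute Pipeline activity – none of these names exist in the actual solution.

```python
def run_integration_stage(stage, get_table_names,
                          ensure_table_exists, run_execution_layer):
    # get_table_names stands in for the stored-procedure lookup returning
    # the table names belonging to the given integration stage.
    for table_name in get_table_names(stage):
        ensure_table_exists(table_name)   # utility pipeline: create if missing
        run_execution_layer(table_name)   # layer 4, passing the table name
```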
Layer 4 | Execution
For readability I’ve split the lookups against the metadata repository into three separate activities each calling a stored procedure passing in the table name passed down from the layer above. This could easily be combined in a single lookup.
Gets the information in the repository about the source table and its columns. The stored procedure returns (with example values based on TableID 1 in the picture above):
- Destination TableID
- Destination TableName
- list of Source TableIDs
- list of schema identified table names
- list of schema and table identified column names “as” destination column name
- SQL Where clause with schema and table identified column names
SourceColumnQuery: ERP.CSYTAB.CTCONO as CompanyKey, ERP.CSYTAB.CTSTKY as Code, …….
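The SourceColumnQuery value could be assembled from the Columns metadata roughly like this. This is just a sketch of what the stored procedure produces; the function name and row shape are my own.

```python
def build_source_column_query(rows):
    # Produce "Schema.Table.SourceColumn as DestinationColumn, ..." from
    # the Columns metadata joined to the source table's schema and name.
    return ", ".join(
        f"{r['SchemaName']}.{r['TableName']}.{r['SourceColumnName']}"
        f" as {r['ColumnName']}"
        for r in rows
    )

rows = [
    {"SchemaName": "ERP", "TableName": "CSYTAB",
     "SourceColumnName": "CTCONO", "ColumnName": "CompanyKey"},
    {"SchemaName": "ERP", "TableName": "CSYTAB",
     "SourceColumnName": "CTSTKY", "ColumnName": "Code"},
]
# build_source_column_query(rows)
# → "ERP.CSYTAB.CTCONO as CompanyKey, ERP.CSYTAB.CTSTKY as Code"
```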
Gets information from the repository about the existing/destination table. The stored procedure returns (with examples based on TableID 1 described earlier):
- TableID for the destination table
- Primary Key column name for the destination table
- The business key or natural key
- List of columns that triggers update of existing rows
ChangeColumnNames: Name, Description, Created, Modified, ModifiedBy
Gets the information from the metadata repository about the reference tables connected to the destination table. This time the examples are taken from TableID 2 described earlier in order to show how several references are handled, which will be further elaborated upon in the next post about the generic mapping data flow. The stored procedure returns:
- TableID of the destination table
- Table name of the destination table
- List of table names for the reference tables
- Primary key columns of the reference tables
- Primary key columns of the reference tables identified with the table name
- A comma separated list of Business key or natural key columns of the reference tables. Formatted as [Column name]_[Table name] to ensure that we don’t run into problems with several tables having the same column name as business key (for example several tables with a “Code” column as the natural key).
- A comma separated list of Table-identified business key column names formatted as [Table name].[Column name] as [Column name]_[Table name].
BusinessKeyIdentifiedColumnNames: Company.Number as Number_Company, CustomerGroup.Code as Code_CustomerGroup
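The [Table name].[Column name] as [Column name]_[Table name] naming rule is simple enough to sketch. The stored procedure does this in SQL; this is just an illustration of the rule, with names of my own choosing.

```python
def business_key_identified_columns(rows):
    # Format each reference table's business key as
    # "Table.Column as Column_Table" so that two reference tables sharing
    # a column name (e.g. "Code") can't collide in the data flow.
    return ", ".join(
        f"{r['TableName']}.{r['ColumnName']}"
        f" as {r['ColumnName']}_{r['TableName']}"
        for r in rows
    )

rows = [{"TableName": "Company", "ColumnName": "Number"},
        {"TableName": "CustomerGroup", "ColumnName": "Code"}]
# → "Company.Number as Number_Company, CustomerGroup.Code as Code_CustomerGroup"
```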
We need to split the logic into two separate data flows, as mapping data flow as of now doesn’t handle sources that don’t return anything. This is done using an If Condition activity checking whether or not the GetMetadata ReferenceInfo lookup returns anything.
Calling the generic mapping data flow
So, finally .. the last step in the orchestration part is calling the generic mapping flow and passing in the necessary parameters based on the information from the three lookup activities.
The only extra logic here is for the SourceWhereQuery, where we need to escape any apostrophes in the returned value. The replace function is a bit confusing to look at, but here’s what’s happening:
@replace(activity('GetMetadataSourceInfo').output.firstRow.SourceWhereQuery, '''', '\''')
We’re replacing a single ' with an escaped ' character in a format that mapping data flow can interpret. So, '''' (that’s 4 of them): the outer two define the string we’re replacing and the second one escapes the third, so the string we’re replacing equals a single '.
For the “replacing with” part, '\''': the outer two quotes define the string, the \ is the escape character supported in mapping data flow, and ' number 2 escapes ' number 3. So the “replace with” string interprets to \'. Hopefully this attempt at explaining it makes sense.
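In plain Python terms, the expression boils down to a simple string replace. This is a sketch of the effect, not ADF code, and the function name is my own.

```python
def escape_for_data_flow(where_clause):
    # Replace every single quote with a backslash-escaped quote so the
    # mapping data flow can interpret the where-clause string safely.
    return where_clause.replace("'", "\\'")

# A where clause like Status = 'Active' becomes Status = \'Active\'
escaped = escape_for_data_flow("Status = 'Active'")
```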
Conclusion and what’s coming in part 2
I hope this was an interesting read. Please drop me a comment or reach out in another way if you have questions, suggestions on how to improve this or just want to have a chat about it.
Part 2 is under way and will cover the generic mapping data flow all this feeds into in detail. So check back soon 🙂