Friday 22 July 2011

SQL Server - Identifying Control Characters


Recently a query against our Analysis Services database failed with an error about invalid XML characters and thus prompted me to start the investigation as to which members contained these. I know that in Analysis Services you can set how you want to handle invalid XML characters but as we shouldn’t receive these I'm using the defaults and I don’t want to use “replace” etc.

Interestingly though, during my investigation I came across some unexpected behaviour with regards the 0x00 control code. At that point I just needed to identify the culprit members and prevent it from entering the database in the future, so I left that peculiarity and wrote the following function:

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO

IF EXISTS (SELECT * FROM sys.objects WHERE [object_id] = OBJECT_ID(N'[dbo].[udf_string_contains_control_character]') AND [type] IN (N'FN', N'IF', N'TF', N'FS', N'FT'))
BEGIN
  DROP FUNCTION [dbo].[udf_string_contains_control_character];
END;
GO

CREATE FUNCTION [dbo].[udf_find_name_with_control_character] (@i_InputText NVARCHAR(MAX))
RETURNS INT
AS
BEGIN
  RETURN ( SELECT CASE WHEN (-- First control code set (i.e. 1 to 31)
                             PATINDEX('%[' + CHAR(0x01) + '-' + CHAR(0x1F) + ']%', @i_InputText COLLATE DATABASE_DEFAULT) +
                             -- Control character 127 (i.e. DEL)
                             PATINDEX('%[' + CHAR(0x7F) + '-' + CHAR(0x7F) + ']%', @i_InputText COLLATE DATABASE_DEFAULT) ) > 0 THEN
                    1
                  ELSE
                    0
                  END );

END;
GO

Clearly the function doesn’t handle every “dodgy” character but it handles some of the more common control characters and can be easily extended.

I then never found the time to get back to the issue with regards the 0x00 control character until now, thanks largely to a question posted on the Analysis Services forum which you can see here: http://social.msdn.microsoft.com/Forums/en/sqlanalysisservices/thread/619a0d73-455c-404f-99ad-bbebfab2cf18)
In this particular case, the error message was:
The server sent an unrecognizable response. _'.', hexadecimal value 0x00, is an invalid character. Line , position .

This prompted me to revisit why, when I used the 0x00 control character in the filter in the above function, the function didn’t behave as I initially expected. Clearly it appears that the control character 0x00 in SQL Server is a little problematic but I didn’t realise just how problematic until I started digging a little deeper.

So how hard can it be to find control characters in SQL? Answer, harder than you might think. So, in SSMS run this in grid view (i.e. CTRL-D) and then in test view (i.e. CTRL-T).

SELECT [test], LEN([test]) AS "len", DATALENGTH([test]) AS "data_length"
FROM ( SELECT 'foo' + CHAR(0x00) + 'bar' AS "test" ) a;

You should hopefully see the differences there. The grid view hides the underlying artefact, though the lengths clearly show something isn't right, but switching to text view shines a bit more light on the issue. So let’s try finding the row using LIKE…

SELECT [test], LEN([test]) AS "len", DATALENGTH([test]) AS "data_length"
FROM ( SELECT 'foo' + CHAR(0x00) + 'bar' AS "test" ) a
WHERE [test] LIKE '%' + CHAR(0x00) + '%';

Er, nope, that doesn’t work. Aha, maybe we need to escape it...

SELECT [test], LEN([test]) AS "len", DATALENGTH([test]) AS "data_length"
FROM ( SELECT 'foo' + CHAR(0x00) + 'bar' AS "test"
      UNION ALL
       SELECT 'bar' AS "test" ) a
WHERE [test] LIKE '%\' + CHAR(0x00) + '%' ESCAPE '\';

Groan, grumble, no that’s still not the right answer. Note how we get both rows. I suspect all of this is due to the fact that C Strings are “Nul” terminated (i.e. \0) so all rows match. I could go on for ages about trying to use REPLACE, PATINDEX, CHARINDEX etc. as I did, but I’ll save you the time, they don’t work.

For example, trying to use REPLACE:

SELECT REPLACE([test], CHAR(0x00), '') AS "test",
       LEN(REPLACE([test], CHAR(0x00), '')) AS "len",
       DATALENGTH(REPLACE([test], CHAR(0x00), '')) AS "data_length"
FROM ( SELECT 'foo' + CHAR(0x00) + 'bar' AS "test" ) a;

I then had one of those eureka moments whilst looking through the list of string functions in Books Online. What happens if I use the QUOTENAME function? I figured that that might do something interesting based on the fact that it is meant to delimit object names, and indeed it does do something interesting. If a column contains the 0x00 control character, wrapping that value with QUOTENAME returns NULL. So, to find rows that contain the 0x00 character in the data as is, I found the following works reliably:

SELECT [test], LEN([test]) AS "len", DATALENGTH([test]) AS "data_length"
FROM ( SELECT 'foo' + CHAR(0x00) + 'bar' AS "test"
      UNION ALL
       SELECT 'bar' AS "test" ) a
WHERE [test] IS NOT NULL AND
      QUOTENAME([test]) IS NULL;

However, this only identifies the culprits. We still haven’t discovered a way to fix the data by removing the control code. We know that we can’t use the standard string functions such as REPLACE etc but what happens if we use XML?

So, using XML we can identify the control character by looking for "�" within the xml fragment.  See what’s returned by the following code:

SELECT [test], LEN([test]) AS "len", DATALENGTH([test]) AS "data_length"
FROM ( SELECT 'foo' + CHAR(0x00) + 'bar' AS "test"
      UNION ALL
       SELECT 'bar' AS "test" ) a
FOR XML AUTO;

Okay, so now that we can identify the control character we should be able to remove it with the standard text functions. Something like the following should do it:

SELECT dta.[xmltest].value('(/a/@test)[1]', 'VARCHAR(20)') AS "test",
       LEN(dta.[xmltest].value('(/a/@test)[1]', 'VARCHAR(20)')) AS "len",
       DATALENGTH(dta.[xmltest].value('(/a/@test)[1]', 'VARCHAR(20)')) AS "data_length",
       dta.[xmltest].value('(/a/@len)[1]', 'INT') AS "original_len",
       dta.[xmltest].value('(/a/@data_length)[1]', 'INT')  AS "original_data_length"
FROM (
SELECT CAST(REPLACE([xmltest], '�', '') AS XML) AS "xmltest"
FROM ( -- Retrieve sample data with lengths etc.
       SELECT [test], LEN([test]) AS "len", DATALENGTH([test]) AS "data_length"
       FROM ( SELECT 'foo' + CHAR(0x00) + 'bar' AS "test"
             UNION ALL
              SELECT 'bar' AS "test" ) a
       -- Don't specify TYPE as it will fail due to the 0x00 character
       -- not allowed in XML. We'll convert it later after removing it.
       FOR XML AUTO ) xmled(xmltest) ) dta;

Clearly you could also write a function to iterate through each character within the string and remove the control character, but where’s the fun in that?

Tuesday 12 July 2011

Implementing Analysis Services Process Add Functionality – Part 3


Following on from my previous post, which can be found here, where I presented one solution for implementing process add functionality, I’ll try to cover off some functional code in order to implement this. Clearly this code can be enhanced and/or converted to your specific needs but hopefully this is a start.

First let’s setup a dummy set of tables to represent a customer dimension. (Apologies for the formatting, will fix once I can edit my templates again.)

Dummy Tables

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO
SET NOCOUNT ON;
GO

/**
 * Delete objects.
 */
IF EXISTS ( SELECT *
            FROM sys.objects
            WHERE [object_id] = OBJECT_ID(N'[dbo].[customer]') AND
                  [type] in (N'U') )
BEGIN
  DROP TABLE [dbo].[customer];
END;
IF EXISTS ( SELECT *
            FROM sys.objects
            WHERE [object_id] = OBJECT_ID(N'[dbo].[address]') AND
                  [type] in (N'U') )
BEGIN
  DROP TABLE [dbo].[address];
END;
IF EXISTS ( SELECT *
            FROM sys.objects
            WHERE [object_id] = OBJECT_ID(N'[dbo].[customer_group]') AND
                  [type] in (N'U') )
BEGIN
  DROP TABLE [dbo].[customer_group];
END;
IF EXISTS ( SELECT *
            FROM sys.objects
            WHERE [object_id] = OBJECT_ID(N'[dbo].[customer_class]') AND
                  [type] in (N'U') )
BEGIN
  DROP TABLE [dbo].[customer_class];
END;
GO




/**
 * CUSTOMER CLASS table.
 */
CREATE TABLE [dbo].[customer_class]
(
       [customer_class_id] INT IDENTITY(1, 1) NOT NULL PRIMARY KEY CLUSTERED,
       [customer_class_name] VARCHAR(50) NOT NULL
) ON [PRIMARY];
GO

SET IDENTITY_INSERT [dbo].[customer_class] ON;
INSERT INTO [dbo].[customer_class] ([customer_class_id], [customer_class_name])
VALUES(1, 'Class1');
SET IDENTITY_INSERT [dbo].[customer_class] OFF;
GO



/**
 * CUSTOMER GROUP table.
 */
CREATE TABLE [dbo].[customer_group]
(
       [customer_group_id] INT IDENTITY(1, 1) NOT NULL PRIMARY KEY CLUSTERED,
       [customer_group_name] VARCHAR(50) NOT NULL,
    [customer_class_id] INT NOT NULL
          CONSTRAINT [fk_customer_group_customer_class]
          REFERENCES [dbo].[customer_class] ([customer_class_id])
) ON [PRIMARY];
GO

SET IDENTITY_INSERT [dbo].[customer_group] ON;
INSERT INTO [dbo].[customer_group] ( [customer_group_id], [customer_group_name],
                                     [customer_class_id] )
VALUES(1, 'Group1', 1);
INSERT INTO [dbo].[customer_group] ( [customer_group_id], [customer_group_name],
                                     [customer_class_id] )
VALUES(2, 'Group2', 1);
SET IDENTITY_INSERT [dbo].[customer_group] OFF;
GO



/**
 * ADDRESS table.
 */
CREATE TABLE [dbo].[address]
(
       [address_id] INT IDENTITY(1, 1) NOT NULL PRIMARY KEY CLUSTERED,
       [address_line_1] VARCHAR(50) NOT NULL,
       [address_line_2] VARCHAR(50)     NULL
) ON [PRIMARY];
GO

SET IDENTITY_INSERT [dbo].[address] ON;
INSERT INTO [dbo].[address] ([address_id], [address_line_1], [address_line_2])
VALUES(-1, 'Unknown', NULL);
INSERT INTO [dbo].[address] ([address_id], [address_line_1], [address_line_2])
VALUES(1, 'House 1', 'Street 1');
INSERT INTO [dbo].[address] ([address_id], [address_line_1], [address_line_2])
VALUES(2, 'House 2', NULL);
SET IDENTITY_INSERT [dbo].[address] OFF;
GO



/**
 * CUSTOMER table.
 */

CREATE TABLE [dbo].[customer]
(
       [customer_id] INT IDENTITY(1, 1) NOT NULL PRIMARY KEY CLUSTERED,
       [customer_name] VARCHAR(50) NOT NULL,
       [address_id] INT NOT NULL
                     CONSTRAINT [fk_customer_address]
                     REFERENCES [dbo].[address]([address_id]),
       [customer_group_id] INT NOT NULL
                     CONSTRAINT [fk_customer_customer_group]
                     REFERENCES [dbo].[customer_group]([customer_group_id])
) ON [PRIMARY];
GO

SET IDENTITY_INSERT [dbo].[customer] ON;
INSERT INTO [dbo].[customer] ([customer_id], [customer_name], [address_id], [customer_group_id])
VALUES(1, 'Customer 1', 1, 1);
INSERT INTO [dbo].[customer] ([customer_id], [customer_name], [address_id], [customer_group_id])
VALUES(2, 'Customer 2', -1, 1);
INSERT INTO [dbo].[customer] ([customer_id], [customer_name], [address_id], [customer_group_id])
VALUES(3, 'Customer 3', 2, 2);
SET IDENTITY_INSERT [dbo].[customer] OFF;
GO


Now, taking each of the steps in turn…



The Seven-And-A-Half Steps to Process Add

Step 1
First we need to capture the list of dimensions we are interested in and the associated key table for that dimension. Note that by “key table”, I’m referring to the table that contains the attribute to be used as the key in the dimension. The following code will create a table that captures this information.

/**
 * LIST OF DIMENSIONS
 */
IF EXISTS ( SELECT *
            FROM sys.objects
            WHERE [object_id] = OBJECT_ID(N'[dbo].[dimension]') AND
                  [type] in (N'U') )
BEGIN
  DROP TABLE [dbo].[dimension];
END;
GO

CREATE TABLE [dbo].[dimension](
       [dimension_id] INT IDENTITY(1,1) NOT NULL
                     CONSTRAINT [pk_dimension] PRIMARY KEY CLUSTERED,
       [olap_dimension_id] SYSNAME NOT NULL
                     CONSTRAINT [uk_dimension] UNIQUE NONCLUSTERED ,
       [dimension_name] SYSNAME NOT NULL
) ON [PRIMARY];
GO



Step 1a
For performance reasons we will create a table in which we will capture the names of all the tables used by a given. (i.e. The key table and recursively all related tables.)

/**
 * DIMENSION TABLE RELATIONSHIP
 */
IF EXISTS ( SELECT *
            FROM sys.objects
            WHERE [object_id] = OBJECT_ID(N'[dbo].[dimension_table_relationship]') AND
                  [type] in (N'U') )
BEGIN
  DROP TABLE [dbo].[dimension_table_relationship];
END;
GO

CREATE TABLE [dbo].[dimension_table_relationship](
       [dimension_table_relationship_id] INT IDENTITY(1,1) NOT NULL
              CONSTRAINT [pk_dimension_table_relationship] PRIMARY KEY CLUSTERED,
       [dimension_id] INT NOT NULL
              CONSTRAINT [fk_dimension_table_relationship_dimension_dimension]
                REFERENCES [dbo].[dimension]([dimension_id]),
       [olap_dimension_id] SYSNAME NOT NULL
              CONSTRAINT [fk_dimension_table_relationship_dimension_olap]
                REFERENCES [dbo].[dimension]([olap_dimension_id]),
       [dimension_key_table_schema] SYSNAME NOT NULL,
       [dimension_key_table_name] SYSNAME NOT NULL,
       [attribute_table_schema] SYSNAME NOT NULL,
       [attribute_table_name] SYSNAME NOT NULL,
       [first_referencing_table_schema] SYSNAME NOT NULL,
       [first_referencing_table_name] SYSNAME NOT NULL
) ON [PRIMARY];
GO



Step 2
Next we need to create a table in which we will capture the names of all the tables used by a given dimension on rows. (i.e. The key table and all related tables.) as well as the previous and current identity values for each table.

IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[dimension_process_attribute_key_range]') AND type in (N'U'))
BEGIN
  DROP TABLE [dbo].[dimension_process_attribute_key_range];
END;
GO

CREATE TABLE [dbo].[dimension_process_attribute_key_range](
       [dimension_process_attribute_key_range_key] INT IDENTITY(1,1) NOT NULL
       CONSTRAINT [pk_dimension_process_attribute_key_range] PRIMARY KEY CLUSTERED ,
       [olap_dimension_id] SYSNAME NOT NULL,
       [dimension_key_table_schema] SYSNAME NOT NULL,
       [dimension_key_table_name] SYSNAME NOT NULL,
       [attribute_table_schema] SYSNAME NOT NULL,
       [attribute_table_name] SYSNAME NOT NULL,
       [min_key_value] BIGINT NOT NULL,
       [max_key_value] BIGINT NULL,
       [new_data_flag] BIT NOT NULL,
       [process_flag] BIT NOT NULL,
       [created_date] DATETIME NOT NULL,
       [created_by] NVARCHAR(255) NOT NULL,
       [updated_date] DATETIME NULL,
       [updated_by] NVARCHAR(255) NULL
) ON [PRIMARY];
GO



Step 3
Populate the tables created in point 1 and 1a above with the dimensions for which you wish to use process add and where the underlying relational schema has been configured as described in the previous posts.

SET IDENTITY_INSERT [dbo].[dimension] ON;
INSERT INTO [dbo].[dimension] ([dimension_id], [olap_dimension_id], [dimension_name] )
VALUES(1, 'Customer', 'Customer');
SET IDENTITY_INSERT [dbo].[dimension] OFF;
GO

INSERT INTO [dbo].[dimension_table_relationship]
       ( [dimension_id], [olap_dimension_id],
         [dimension_key_table_schema], [dimension_key_table_name],
         [attribute_table_schema], [attribute_table_name],
         [first_referencing_table_schema], [first_referencing_table_name] )
VALUES(1, 'Customer', 'dbo', 'customer', 'dbo', 'customer', 'dbo', 'customer');
GO



Step 4
Create a stored procedure that accepts a given dimension name and key table name and, using the information schema views etc, identify all related tables by means of foreign key relationships. If you did not create foreign keys between all tables, you will need statically capture all tables used by a given dimension and maintain this over time as tables are added or dropped.

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO

IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[usp_reseed_dimension_table_relationship_by_dimension]') AND type in (N'P', N'PC'))
BEGIN
  DROP PROCEDURE [dbo].[usp_reseed_dimension_table_relationship_by_dimension];
END;
GO

CREATE PROCEDURE [dbo].[usp_reseed_dimension_table_relationship_by_dimension]
                                     @i_DimensionID             INT     = NULL,
                                     @i_DimensionKeyTableSchema SYSNAME = NULL,
                                     @i_DimensionKeyTableName   SYSNAME = NULL
AS
/**
 * Recursively captures all tables related to the table passed into the
 * stored procedure. Essentially it recursively captures the attribute
 * relationships from a given "dimension key" table.
 *
 * Input Parameters:
 * ----------------
 * @i_DimensionID             The relational identifier for the dimension for
 *                            which attribute relationships are required.
 * @i_DimensionKeyTableSchema The schema in which the dimension key table
 *                            resides.
 * @i_DimensionKeyTableName   The name of the table from which all related
 *                            tables should be derived.
 *
 * Ver       Author              Date         Comments
 * ---       -------------------       -----------  ------------------------------
 *   1       Philip Stephenson   11-Mar-2011  Original version.
 */
SET NOCOUNT ON;

DECLARE -- Define Error Codes.
        @ERR_MISSING_PARAM         INT,
        @ERR_GENERAL               INT,
        -- Define user variables.
        @ErrCode                   INT,
        @ErrMsg                    NVARCHAR(MAX),
        @StepNbr                   SMALLINT,
        @FQProcedureName           NVARCHAR(392),
        @RetVal                    INT,
        @NewLine                   NVARCHAR(MAX),
        @OlapDimensionID           SYSNAME,
        @RowsAffected              BIGINT;
BEGIN
 /**
  * STEP 1
  * Set variables.
  */
  SET @StepNbr = @StepNbr + 1;
  SET @ERR_MISSING_PARAM = 1;
  SET @ERR_GENERAL = 128;
  SET @ErrCode = 0;
  SET @FQProcedureName = QUOTENAME(DB_NAME()) + '.' + QUOTENAME(OBJECT_SCHEMA_NAME(@@PROCID)) + '.' + QUOTENAME(OBJECT_NAME(@@PROCID));
  SET @ErrMsg = N'ERROR - Stored procedure ' + @FQProcedureName + N' ';
  SET @NewLine = NCHAR(13) + NCHAR(10);
  -- Capture the OLAP dimension ID for the dimension ID provided.
  SELECT @OlapDimensionID = [olap_dimension_id]
  FROM [dbo].[dimension]
  WHERE [dimension_id] = @i_DimensionID;
  -- Set the default rows affected count.
  SET @RowsAffected = -1;
 
 
 
 /**
  * STEP 2
  * Ensure that the dimension ID parameter is not NULL and that a non-null olap
  * dimension ID was found.
  */
  SET @StepNbr = @StepNbr + 1;
  IF (@i_DimensionID IS NULL) OR (@OlapDimensionID IS NULL)
  BEGIN
    SET @ErrMsg = @ErrMsg + N'identified that either the given @i_DimensionID parameter (i.e. ' + ISNULL(@i_DimensionID, '{null}') + N') was NULL or the associated [olap_dimension_id] was NULL. This is not allowed!';
    SET @ErrCode = @ERR_MISSING_PARAM;
    GOTO FAIL;
  END;
 
 
 
 /**
  * STEP 3
  * Ensure that the dimension key table schema parameter is not NULL.
  */
  SET @StepNbr = @StepNbr + 1;
  IF (@i_DimensionKeyTableSchema IS NULL)
  BEGIN
    SET @ErrMsg = @ErrMsg + N'identified that the @i_DimensionKeyTableSchema parameter was NULL. This is not allowed!';
    SET @ErrCode = @ERR_MISSING_PARAM;
    GOTO FAIL;
  END;
 
 
 
 /**
  * STEP 4
  * Ensure that the dimension key table name parameter is not NULL.
  */
  SET @StepNbr = @StepNbr + 1;
  IF (@i_DimensionKeyTableName IS NULL)
  BEGIN
    SET @ErrMsg = @ErrMsg + N'identified that the @i_DimensionKeyTableName parameter was NULL. This is not allowed!';
    SET @ErrCode = @ERR_MISSING_PARAM;
    GOTO FAIL;
  END;
 
 
 
  BEGIN TRY
   /**
    * STEP 5
    * Create temporary table, else truncate it if it already exists.
    */
    SET @StepNbr = @StepNbr + 1;
    IF (OBJECT_ID('[tempdb].[dbo].[#foreign_key_tree]') IS NULL)
     BEGIN
       CREATE TABLE [#foreign_key_tree]
       (
         [base_table_schema]        SYSNAME COLLATE DATABASE_DEFAULT NOT NULL,
         [base_table_name]          SYSNAME COLLATE DATABASE_DEFAULT NOT NULL,
         [referenced_table_schema]  SYSNAME COLLATE DATABASE_DEFAULT NOT NULL,
         [referenced_table_name]    SYSNAME COLLATE DATABASE_DEFAULT NOT NULL,
         [referencing_table_schema] SYSNAME COLLATE DATABASE_DEFAULT NOT NULL,
         [referencing_table_name]   SYSNAME COLLATE DATABASE_DEFAULT NOT NULL
       )
     END;
    ELSE
     BEGIN
       TRUNCATE TABLE [#foreign_key_tree];
     END;
   
   
   
   /**
    * STEP 6
    * Capture the starting point from which to recursively derive all related
    * tables.
    */
    SET @StepNbr = @StepNbr + 1;
    INSERT INTO [#foreign_key_tree] ( [base_table_schema], [base_table_name],
                                      [referenced_table_schema],
                                      [referenced_table_name],
                                      [referencing_table_schema],
                                      [referencing_table_name] )
    VALUES(@i_DimensionKeyTableSchema, @i_DimensionKeyTableName,
           @i_DimensionKeyTableSchema, @i_DimensionKeyTableName,
           @i_DimensionKeyTableSchema, @i_DimensionKeyTableName);
   
   
   
   /**
    * STEP 7
    * Recursively capture all tables related to the set of tables already in
    * the temporary table. The idea here is to obtain the set of all tables
    * that could be used by a dimension in Analysis Services.
    */
    SET @StepNbr = @StepNbr + 1;
    WHILE (@RowsAffected <> 0)
    BEGIN
      INSERT INTO [#foreign_key_tree] ( [base_table_schema], [base_table_name],
                                        [referenced_table_schema],
                                        [referenced_table_name],
                                        [referencing_table_schema],
                                        [referencing_table_name] )
      SELECT inline.[base_table_schema]       AS "base_table_schema",
             inline.[base_table_name]         AS "base_table_name",
             inline.[TABLE_SCHEMA]            AS "referenced_table_schema",
             inline.[TABLE_NAME]              AS "referenced_table_name",
             inline.[referenced_table_schema] AS "referencing_table_schema",
             inline.[referenced_table_name]   AS "referencing_table_name"
      FROM
      (
       SELECT DISTINCT tmp.[base_table_schema],
                       tmp.[base_table_name],
                       reftab.[TABLE_SCHEMA],
                       reftab.[TABLE_NAME],
                       tmp.[referenced_table_schema],
                       tmp.[referenced_table_name]
       FROM INFORMATION_SCHEMA.TABLES tab
          INNER JOIN INFORMATION_SCHEMA.TABLE_CONSTRAINTS con
          ON tab.[TABLE_SCHEMA] = con.[TABLE_SCHEMA] AND
             tab.[TABLE_NAME] = con.[TABLE_NAME]
          INNER JOIN INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE fctcol
          ON tab.[TABLE_SCHEMA] = fctcol.[TABLE_SCHEMA] AND
             tab.[TABLE_NAME] = fctcol.[TABLE_NAME] AND
             con.[CONSTRAINT_NAME] = fctcol.[CONSTRAINT_NAME]
         INNER JOIN INFORMATION_SCHEMA.REFERENTIAL_CONSTRAINTS rfc
         ON con.[CONSTRAINT_SCHEMA] = rfc.[CONSTRAINT_SCHEMA] AND
            con.[CONSTRAINT_NAME] = rfc.[CONSTRAINT_NAME]
         INNER JOIN ( SELECT [CONSTRAINT_SCHEMA],
                             [CONSTRAINT_NAME],
                             [TABLE_SCHEMA],
                             [TABLE_NAME],
                             [COLUMN_NAME]
                      FROM INFORMATION_SCHEMA.CONSTRAINT_COLUMN_USAGE
                     UNION
                      -- Include unique indexes
                      SELECT SCHEMA_NAME(obj.[schema_id]) AS "CONSTRAINT_SCHEMA",
                             idx.[name]                   AS "CONSTRAINT_NAME",
                             SCHEMA_NAME(obj.[schema_id]) AS "TABLE_SCHEMA",
                             obj.[name]                   AS "TABLE_NAME",
                             col.[name]                   AS "COLUMN_NAME"
                      FROM sys.index_columns icl
                                             INNER JOIN sys.indexes idx
                                             ON icl.[index_id] = idx.[index_id]
                                             INNER JOIN sys.objects obj
                                             ON idx.[object_id] = obj.[object_id] AND
                                                icl.[object_id] = obj.[object_id]
                                             INNER JOIN sys.columns col
                                             ON obj.[object_id] = col.[object_id] AND
                                                icl.[column_id] = col.[column_id]
                      WHERE idx.[is_unique] = 1 AND
                            idx.[is_primary_key] = 0 AND
                            obj.[is_ms_shipped] = 0 ) refcol
         ON rfc.[UNIQUE_CONSTRAINT_SCHEMA] = refcol.[CONSTRAINT_SCHEMA] AND
            rfc.[UNIQUE_CONSTRAINT_NAME] = refcol.[CONSTRAINT_NAME]
                                           INNER JOIN INFORMATION_SCHEMA.TABLES reftab
                                           ON refcol.[TABLE_SCHEMA] = reftab.[TABLE_SCHEMA] AND
                                              refcol.[TABLE_NAME] = reftab.[TABLE_NAME]
                                           INNER JOIN [#foreign_key_tree] tmp
                                           ON tab.[TABLE_SCHEMA] = tmp.[referenced_table_schema] AND
                                              tab.[TABLE_NAME] = tmp.[referenced_table_name]
       WHERE tab.[TABLE_TYPE] = 'BASE TABLE' AND
             con.[CONSTRAINT_TYPE] = 'FOREIGN KEY'
       ) inline LEFT OUTER JOIN [#foreign_key_tree] nxt
                ON inline.[TABLE_SCHEMA] = nxt.[referenced_table_schema] AND
                   inline.[TABLE_NAME] = nxt.[referenced_table_name]
      WHERE nxt.[base_table_name] IS NULL;
     
      -- Capture the number of rows inserted.
      SET @RowsAffected = @@ROWCOUNT;
    END;
   
   
   
   /**
    * STEP 9
    * Update current information in the static table.
    */
    SET @StepNbr = @StepNbr + 1;
    UPDATE tgt
    SET [first_referencing_table_schema] = src.[referencing_table_schema],
        [first_referencing_table_name] = src.[referencing_table_name]
    FROM [dbo].[dimension_table_relationship] tgt
                INNER JOIN [#foreign_key_tree] src
                ON tgt.[dimension_id] = @i_DimensionID AND
                   tgt.[olap_dimension_id] = @OlapDimensionID AND
                   tgt.[dimension_key_table_schema] = src.[base_table_schema] AND
                   tgt.[dimension_key_table_name] = src.[base_table_name] AND
                   tgt.[attribute_table_schema] = src.[referenced_table_schema] AND
                  tgt.[attribute_table_name] = src.[referenced_table_name]
    WHERE tgt.[first_referencing_table_schema] <> src.[referencing_table_schema] OR
             tgt.[first_referencing_table_name] <> src.[referencing_table_name];
   
   
   /**
    * STEP 9
    * Insert new information into the static table.
    */
    SET @StepNbr = @StepNbr + 1;
    INSERT INTO [dbo].[dimension_table_relationship]
                  ( [dimension_id], [olap_dimension_id],
                    [dimension_key_table_schema], [dimension_key_table_name],
                    [attribute_table_schema], [attribute_table_name],
                    [first_referencing_table_schema], [first_referencing_table_name] )
      SELECT @i_DimensionID             AS "dimension_id",
             @OlapDimensionID           AS "olap_dimension_id",
             [base_table_schema],
             [base_table_name],
             [referenced_table_schema],
             [referenced_table_name],
             [referencing_table_schema],
             [referencing_table_name]
      FROM [#foreign_key_tree] src
               LEFT OUTER JOIN [dbo].[dimension_table_relationship] tgt
               ON tgt.[dimension_id] = @i_DimensionID AND
                  tgt.[olap_dimension_id] = @OlapDimensionID AND
                  tgt.[dimension_key_table_schema] = src.[base_table_schema] AND
                  tgt.[dimension_key_table_name] = src.[base_table_name] AND
                  tgt.[attribute_table_schema] = src.[referenced_table_schema] AND
                  tgt.[attribute_table_name] = src.[referenced_table_name]
      WHERE tgt.[dimension_table_relationship_id] IS NULL;
   
   
   
   /**
    * STEP 11
    * Clean up.
    */
    SET @StepNbr = @StepNbr + 1;
    IF (OBJECT_ID('[tempdb].[dbo].[#foreign_key_tree]') IS NOT NULL)
    BEGIN
      DROP TABLE [#foreign_key_tree];
    END;
   
  END TRY
  BEGIN CATCH
    SET @ErrCode = @ERR_GENERAL;
    SET @ErrMsg = @ErrMsg + N'raised error (' + CAST(ERROR_NUMBER() AS NVARCHAR) + N') in step number (' + CAST(@StepNbr AS NVARCHAR) + N') at line number (' + CAST(ERROR_LINE() AS NVARCHAR) + N') with message: ' + ERROR_MESSAGE();
    GOTO FAIL;
  END CATCH;
 
 
  GOTO SUCCESS;
 
FAIL:
  -- Cater for max exception string length.
  SET @ErrMsg = CASE WHEN (LEN(@ErrMsg) > 2044) THEN N'...' + RIGHT(@ErrMsg, 2041) ELSE RIGHT(@ErrMsg, 2044) END;
  RAISERROR(@ErrMsg, 16, 1);
  IF (OBJECT_ID('[tempdb].[dbo].[#foreign_key_tree]') IS NOT NULL)
  BEGIN
    DROP TABLE [#foreign_key_tree];
  END;
  RETURN @ErrCode;

SUCCESS: 
  RETURN 0;
END;
GO



Step 5
Create a stored procedure that uses the information captured by the SP created in step 4 above, to now populate the table created in point 2 above with an arbitrary minimum value and, using the IDENT_CURRENT(‘’) function, populate the max value with the current identity value for the table. Note that you must ensure that you retrieve the values for the key table before any of the related tables. This is what provides protection against the race condition discussed in my previous post.

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO

IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[usp_derive_new_attribute_keys_for_processing]') AND type in (N'P', N'PC'))
BEGIN
  DROP PROCEDURE [dbo].[usp_derive_new_attribute_keys_for_processing];
END;
GO

CREATE PROCEDURE [dbo].[usp_derive_new_attribute_keys_for_processing]
AS
/**
 * For each dimension configured to be processed using "process add", capture the
 * set of new attribute keys for every attribute in the dimension.
 *
 * Ver       Author              Date         Comments
 * ---       -------------------       -----------  ------------------------------
 *   1       Philip Stephenson   01-Jul-2011  Original version.
 */
SET NOCOUNT ON;

DECLARE -- Define Error Codes.
        @ERR_GENERAL               INT,
        @ERR_LOOKUP_FAIL           INT,
        -- Define user variables.
        @ErrCode                   INT ,--= 0,
        @ErrMsg                    NVARCHAR(MAX),
        @StepNbr                   SMALLINT,-- = 0,
        @FQProcedureName           NVARCHAR(392),
        @RetVal                    INT,
        @AbsoluteMinKeyValue       BIGINT,
        @AbsoluteMaxKeyValue       BIGINT,
        @MinKeyValue               BIGINT,
        @RowsAffected              BIGINT,
        @ProcessAddFlag            BIT;
BEGIN
 /**
  * STEP 1
  * Set variables.
  */
  SET @StepNbr = @StepNbr + 1;
  SET @ERR_GENERAL     = 128
  SET @ERR_LOOKUP_FAIL = 512;
  SET @FQProcedureName = QUOTENAME(DB_NAME()) + '.' + QUOTENAME(OBJECT_SCHEMA_NAME(@@PROCID)) + '.' + QUOTENAME(OBJECT_NAME(@@PROCID));
  SET @ErrMsg = N'ERROR - Stored procedure ' + @FQProcedureName + N' ';
  SET @AbsoluteMinKeyValue = -32768;
  SET @AbsoluteMaxKeyValue = ((POWER(CAST(2 AS BIGINT), 62) - 1) * 2) + 1;
  SET @MinKeyValue = @AbsoluteMinKeyValue;
  SET @RowsAffected = -1;
 
 
 
  BEGIN TRY
   /**
    * STEP 2
    * Create temporary table, else truncate it if it already exists.
    */
    SET @StepNbr = @StepNbr + 1;
    IF (OBJECT_ID('[tempdb].[dbo].[#foreign_key_tree]') IS NULL)
     BEGIN
       CREATE TABLE [#foreign_key_tree]
       (
         [olap_dimension_id]       SYSNAME COLLATE DATABASE_DEFAULT NOT NULL,
         [base_table_schema]       SYSNAME COLLATE DATABASE_DEFAULT NOT NULL,
         [base_table_name]         SYSNAME COLLATE DATABASE_DEFAULT NOT NULL,
         [referenced_table_schema] SYSNAME COLLATE DATABASE_DEFAULT NULL,
         [referenced_table_name]   SYSNAME COLLATE DATABASE_DEFAULT NULL,
         [new_min_key_value]       BIGINT  NULL,
         [min_key_value]           BIGINT  NULL,
         [max_key_value]           BIGINT  NULL,
         [process_add_flag]        BIT     NOT NULL
       )
     END
    ELSE
     BEGIN
       TRUNCATE TABLE [#foreign_key_tree];
     END;
   
   
   
   /**
    * STEP 3
    * Capture all tables (and their current identity column value) for all
    * dimensions set to be processed. The idea here is to obtain the set of
    * all tables that could be used by a dimension in Analysis Services and
    * to capture the available keys.
    */
    SET @StepNbr = @StepNbr + 1;
    INSERT INTO [#foreign_key_tree] ( [olap_dimension_id], [base_table_schema],
                                      [base_table_name], [referenced_table_schema],
                                      [referenced_table_name], [new_min_key_value],
                                      [min_key_value], [max_key_value],
                                      [process_add_flag] )
      SELECT rel.[olap_dimension_id],
             rel.[dimension_key_table_schema] AS "base_table_schema",
             rel.[dimension_key_table_name]   AS "base_table_name",
             rel.[attribute_table_schema]     AS "referenced_table_schema",
             rel.[attribute_table_name]       AS "referenced_table_name",
             atr.[max_key_value] + 1          AS "new_min_key_value",
             @AbsoluteMinKeyValue             AS "min_key_value",
             IDENT_CURRENT(QUOTENAME(rel.[attribute_table_schema]) + N'.' + QUOTENAME(rel.[attribute_table_name])) AS "max_key_value",
             1                                AS "process_add_flag"
      FROM [dbo].[dimension_table_relationship] rel
            LEFT OUTER JOIN [dbo].[dimension_process_attribute_key_range] atr
            ON rel.[olap_dimension_id] = atr.[olap_dimension_id] AND
               rel.[dimension_key_table_schema] = atr.[dimension_key_table_schema] AND
               rel.[dimension_key_table_name] = atr.[dimension_key_table_name] AND
               rel.[attribute_table_schema] = atr.[attribute_table_schema] AND
               rel.[attribute_table_name] = atr.[attribute_table_name] AND
               atr.[process_flag] = 1;
   
   
   
   /**
    * STEP 4
    * Invert current (process flag = 1) and non-current (process flag = 0) rows,
    * updating related attributes appropriately.
    */
    SET @StepNbr = @StepNbr + 1;
    UPDATE tgt
    SET [process_flag]  = CASE WHEN tgt.[process_flag] = 1 THEN
                            0
                          ELSE
                            1
                          END,
        [min_key_value] = CASE
                            -- Update soon to be non-current record for ProcessAdd.
                            WHEN (tmp.[process_add_flag] = 1) AND (tgt.[process_flag] = 1) THEN tmp.[max_key_value] + 1
                            -- Update soon to be current record for ProcessAdd when flipped from "non process add".
                            WHEN (tmp.[process_add_flag] = 1) AND (tgt.[process_flag] = 0) AND (tgt.[min_key_value] = @AbsoluteMinKeyValue) THEN non.[max_key_value] + 1
                            -- Update soon to be current record for ProcessUpdate/ProcessFull
                            WHEN (tmp.[process_add_flag] = 0) AND (tgt.[process_flag] = 0) THEN @AbsoluteMinKeyValue
                          ELSE
                            tgt.[min_key_value]
                          END,
        [max_key_value] = tmp.[max_key_value],
        [new_data_flag] = CASE
                            WHEN ( (tmp.[process_add_flag] = 1) AND (tgt.[process_flag] = 1) AND ((tmp.[max_key_value] + 1) <= ISNULL(tmp.[max_key_value], @AbsoluteMaxKeyValue)) ) THEN 1
                            WHEN ( (tmp.[process_add_flag] = 0) AND (tgt.[process_flag] = 0) AND (@AbsoluteMinKeyValue <= ISNULL(tmp.[max_key_value], @AbsoluteMaxKeyValue)) ) THEN 1
                            WHEN ( (tmp.[process_add_flag] = 1) AND (tgt.[process_flag] = 0) AND (tgt.[min_key_value] = @AbsoluteMinKeyValue) AND ((non.[max_key_value] + 1) <= ISNULL(tmp.[max_key_value], @AbsoluteMaxKeyValue)) ) THEN 1
                            WHEN ( (tmp.[process_add_flag] = 1) AND (tgt.[process_flag] = 0) AND (tgt.[min_key_value] <> @AbsoluteMinKeyValue) AND (tgt.[min_key_value] <= ISNULL(tmp.[max_key_value], @AbsoluteMaxKeyValue)) ) THEN 1
                            WHEN ( (tmp.[process_add_flag] = 0) AND (tgt.[process_flag] = 1) AND (tgt.[min_key_value] <= ISNULL(tmp.[max_key_value], @AbsoluteMaxKeyValue)) ) THEN 1
                          ELSE
                            0
                          END,
        [updated_date]  = GETDATE(),
           [updated_by]    = ORIGINAL_LOGIN()
    FROM [dbo].[dimension_process_attribute_key_range] tgt
            LEFT OUTER JOIN [dbo].[dimension_process_attribute_key_range] non
            ON tgt.[olap_dimension_id] = non.[olap_dimension_id] AND
               tgt.[dimension_key_table_schema] = non.[dimension_key_table_schema] AND
               tgt.[dimension_key_table_name] = non.[dimension_key_table_name] AND
               tgt.[attribute_table_schema] = non.[attribute_table_schema] AND
               tgt.[attribute_table_name] = non.[attribute_table_name] AND
               tgt.[process_flag] <> non.[process_flag]
            INNER JOIN [#foreign_key_tree] tmp
            ON tgt.[dimension_key_table_schema] = tmp.[base_table_schema] AND
               tgt.[dimension_key_table_name] = tmp.[base_table_name] AND
               tgt.[attribute_table_schema] = tmp.[referenced_table_schema] AND
               tgt.[attribute_table_name] = tmp.[referenced_table_name]
    WHERE tgt.[olap_dimension_id] = tmp.[olap_dimension_id] AND
          ( (non.[process_flag] IS NULL) OR ((non.[process_flag] IS NOT NULL) AND
            (tgt.[process_flag] <> non.[process_flag])) );
   
   
   
   /**
    * STEP 5
    * Capture information for use by OLAP views during dimension processing.
    */
    SET @StepNbr = @StepNbr + 1;
    INSERT INTO [dbo].[dimension_process_attribute_key_range]
                            ( [olap_dimension_id], [dimension_key_table_schema],
                              [dimension_key_table_name], [attribute_table_schema],
                              [attribute_table_name], [min_key_value],
                              [max_key_value], [new_data_flag], [process_flag],
                              [created_date], [created_by] )
      SELECT tmp.[olap_dimension_id]                     AS "olap_dimension_id",
             tmp.[base_table_schema],
             tmp.[base_table_name],
             tmp.[referenced_table_schema],
             tmp.[referenced_table_name],
             -- In the case of ProcessAdd, set the Min Key value appropriately else it must be the AbsoluteMinKeyValue.
             CASE WHEN (tmp.[process_add_flag] = 1) THEN
               -- Note that the prev min key value was bumped by the update statement in the prior step so we can just carry this forward.
               COALESCE(tmp.[new_min_key_value], @AbsoluteMinKeyValue)
             ELSE
               @AbsoluteMinKeyValue
             END                                         AS "min_key_value",
                tmp.[max_key_value],
                CASE WHEN ISNULL(tmp.[max_key_value], @AbsoluteMaxKeyValue) >= COALESCE(tmp.[new_min_key_value], @AbsoluteMinKeyValue) THEN
                  1
                ELSE
                  0
                END                                      AS "new_data_flag" ,
                1                                        AS "process_flag",
                GETDATE()                                AS "created_date",
                ORIGINAL_LOGIN()                         AS "created_by"
      FROM [#foreign_key_tree] tmp
                   LEFT OUTER JOIN [dbo].[dimension_process_attribute_key_range] tgt
                   ON tmp.[olap_dimension_id] = tgt.[olap_dimension_id] AND
                      tmp.[base_table_schema] = tgt.[dimension_key_table_schema] AND
                      tmp.[base_table_name] = tgt.[dimension_key_table_name] AND
                      tmp.[referenced_table_schema] = tgt.[attribute_table_schema] AND
                      tmp.[referenced_table_name] = tgt.[attribute_table_name] AND
                      tgt.[process_flag] = 1
      WHERE tgt.[dimension_process_attribute_key_range_key] IS NULL;
   
  END TRY
  BEGIN CATCH
    SET @ErrCode = @ERR_GENERAL;
    SET @ErrMsg = @ErrMsg + N'raised error (' + CAST(ERROR_NUMBER() AS NVARCHAR) + N') in step number (' + CAST(@StepNbr AS NVARCHAR) + N') at line number (' + CAST(ERROR_LINE() AS NVARCHAR) + N') with message: ' + ERROR_MESSAGE();
    GOTO FAIL;
  END CATCH;
 
 
  GOTO SUCCESS;
 
FAIL:
  -- Cater for max exception string length.
  SET @ErrMsg = CASE WHEN (LEN(@ErrMsg) > 2044) THEN N'...' + RIGHT(@ErrMsg, 2041) ELSE RIGHT(@ErrMsg, 2044) END;
  RAISERROR(@ErrMsg, 16, 1);
  RETURN @ErrCode;

SUCCESS: 
  RETURN 0;
END;
GO



Step 6
Create a user defined table function that accepts a dimension name, schema name and table name and returns the minimum and maximum values from the table created in point 2 above for a given dimension, schema and table.

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO

IF  EXISTS ( SELECT *
             FROM sys.objects
             WHERE [object_id] = OBJECT_ID(N'[dbo].[udf_attribute_keys_for_process_add]') AND
                   [type] IN (N'FN', N'IF', N'TF', N'FS', N'FT') )
BEGIN
  DROP FUNCTION [dbo].[udf_attribute_keys_for_process_add];
END;
GO

CREATE FUNCTION [dbo].[udf_attribute_keys_for_process_add] (
                                                 @i_OlapDimensionID       SYSNAME,
                                                 @i_DimensionTableSchema  SYSNAME,
                                                 @i_DimensionTableName    SYSNAME,
                                                 @i_AttributeTableSchema  SYSNAME,
                                                 @i_AttributeTableName    SYSNAME )
RETURNS TABLE
AS

/*
 * Returns the minimum and maximum keys for a given dimension's attribute
 * table that need to be processed by Analysis Services.
 *
 *
 * Input Parameters:
 * ----------------
 * @i_OlapDimensionID       The Analysis Services identifier for the dimension
 *                          for which attribute key ranges are required.
 * @i_DimensionTableSchema  The schema for the table that contains the key
 *                          attribute for the dimension.
 * @i_DimensionTableName    The name of the table that contains the key
 *                          attribute for the dimension.
 * @i_AttributeTableSchema  The schema for the table for the attribute to be
 *                          processed.
 * @i_AttributeTableName    The name of the table for the attribute to be
 *                          processed.
 *
 * Ver       Author              Date         Comments
 * ---       -------------------       -----------  ---------------------------------
 *   1       Philip Stephenson   01-Jul-2011  Original version.
 */

  -- Return results.
  RETURN SELECT [min_key_value], [max_key_value]
         FROM [dbo].[dimension_process_attribute_key_range]
         WHERE [olap_dimension_id] = @i_OlapDimensionID AND
               [dimension_key_table_schema] = @i_DimensionTableSchema AND
               [dimension_key_table_name] = @i_DimensionTableName AND
               [attribute_table_schema] = @i_AttributeTableSchema AND
               [attribute_table_name] = @i_AttributeTableName AND
               [process_flag] = 1 AND
               [new_data_flag] = 1;

GO



Step 7
Update the current views used to present data to Analysis Services to join to the function created in point 6 above, limiting the output to be where the surrogate key value is between the minimum and maximum values.

SET ANSI_NULLS ON
GO
SET QUOTED_IDENTIFIER ON
GO

IF EXISTS (SELECT * FROM sys.views WHERE [object_id] = OBJECT_ID(N'[dbo].[v_process_add_customer]'))
BEGIN
  DROP VIEW [dbo].[v_process_add_customer];
END;
GO

CREATE VIEW [dbo].[v_process_add_customer]
AS
  SELECT [customer_id],
         [customer_name],
         [address_id],
         [customer_group_id]
  FROM [dbo].[customer] att
             INNER JOIN [dbo].[udf_attribute_keys_for_process_add]('Customer',
                                                                   'dbo',
                                                                   'customer',
                                                                   'dbo',
                                                                   'customer') prc
             ON att.[customer_id] BETWEEN prc.[min_key_value] AND prc.[max_key_value];
GO



Test Drive
Now that we have everything setup, let’s give it a whirl. First of all we need to identify all of the relationships for a given dimension, customer in this case, starting from the key table.

EXECUTE [dbo].[usp_reseed_dimension_table_relationship_by_dimension]
  @i_DimensionID             = 1,
  @i_DimensionKeyTableSchema = 'dbo',
  @i_DimensionKeyTableName   = 'customer';

We can see what this did be executing the following:

SELECT * FROM [dbo].[dimension_table_relationship];

Next we need to identify the minimum and maximum surrogate key values in each of the tables, which we do as follows:

EXECUTE [dbo].[usp_derive_new_attribute_keys_for_processing];

We can see what this did by executing the following:

SELECT * FROM [dbo].[dimension_process_attribute_key_range];

All we need to do now is select from the view to see all new entries:

SELECT *
FROM [dbo].[v_process_add_customer];

To see how this works, we can select from the function we created earlier as follows:

SELECT *
FROM [dbo].[udf_attribute_keys_for_process_add]('Customer',
                                                'dbo',
                                                'customer',
                                                'dbo',
                                                'customer');

To actually test the concept of “process add”, let’s now add a new member:

INSERT INTO [dbo].[customer] ([customer_name], [address_id], [customer_group_id])
VALUES('Customer 4', 2, 2);

In order to capture the metadata around new key values etc. we need to execute:

EXECUTE [dbo].[usp_derive_new_attribute_keys_for_processing];

Which, once again, has captured the metadata as follows:

SELECT * FROM [dbo].[dimension_process_attribute_key_range];

To see what Analysis Services would see, run the following:

SELECT *
FROM [dbo].[v_process_add_customer];



Conclusion
Now I will admit that I haven’t fully test driven all of this myself to see if it all works in all situations etc. so please let me know if anything is broken and I’ll aim to fix it. Either way I hope the concept is clear, such that you can make any modifications as required.

Hopefully you will have noted that in the stored procedure that determines the min and max key values, that it can be easily updated to accommodate ProcessUpdate and ProcessAdd situations by not hard-coding the value for the “process_add_flag” but I’ll leave that as an exercise for the reader.

From the above code and sample, the thing to note is that immediately before processing dimensions the [dbo].[dimension_process_attribute_key_range] stored procedure must be called in order to “snapshot” all surrogate key values. Clearly you need to be careful not to get out of sync else things will not work. If that happens you will need to revert back to a “ProcessUpdate” and then continue as per normal.