Implementing IDataReader

In my last post, I talked about customized events and about setting up a trace for web applications, which usually use pooled connections. Assume you already have everything set up: the SqlCommand component has been wrapped so that it sends a customized event carrying user or session information before the actual procedure call takes place, and the trace is set up to receive both procedure calls and customized events. To simplify the scenario, the trace captures only the customized event and RPC Complete, with the columns SPID, TextData, and StartTime. To receive the trace data, you can either save it to disk and then read the trace file, or stream the trace data directly (see my post here for details). Once the trace data has been read, there will always be a customized event before each actual procedure call.

Suppose you are asked for the SQL Server response time of a particular application, for instance, "give me the average response time of procedure ABC every 30 minutes during the day for that user". You can query the table that holds the trace rows to locate all calls to procedure ABC, then use the SPID and identity values to find the preceding record and get the user information for each call, then filter, aggregate, and so on. That is fine if your table is small. But when the trace system generates more than 500MB of data every hour, such a query is very challenging no matter how you design the indexes on the trace table. You may think of reformatting the trace data into a table with the columns EventID, SPID, TextData, StartTime, and UserInfo to simplify the query. SQL Profiler cannot do that, so you will have to write an ETL for it.

Almost any programming tool can be used to implement the ETL, such as SSIS or C#. Whichever way you go, you use bulk copy to insert the converted trace data into your trace table. The first solution that comes to mind: load the trace data into a DataTable (in .NET, or a DataSet in SSIS), loop over all the records in the table, collapse each customized event entry and its RPC Complete entry into one row, output the result to another DataTable, and then send that DataTable to the SqlBulkCopy component. (This assumes you already have code for the scenario where the customized event for a session is in the previously read DataTable and the RPC entry is in the next one, which is very daunting logic.) This will work, but it consumes a lot of memory and is less efficient.
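
To make the cost of that first solution concrete, here is a minimal sketch of the DataTable-to-DataTable collapse. It is an illustration, not the code I ran: the class name DataTableCollapse and the sample rows are made up, the column names follow the trace columns used in this post, and it assumes event class 82 marks the customized event (the value the reader later in this post checks for).

```csharp
using System;
using System.Collections.Generic;
using System.Data;

public static class DataTableCollapse
{
    const int UserDefinedEvent = 82; // assumption: event class of the customized event

    // Copies every non-customized-event row into a second DataTable and adds
    // the user information last seen for the same SPID.
    public static DataTable Collapse(DataTable source)
    {
        DataTable result = source.Clone(); // same columns, no rows
        result.Columns.Add("UserInformation", typeof(string));
        Dictionary<int, string> userInfoBySpid = new Dictionary<int, string>();

        foreach (DataRow row in source.Rows)
        {
            int spid = Convert.ToInt32(row["SPID"]);
            if (Convert.ToInt32(row["EventClass"]) == UserDefinedEvent)
            {
                userInfoBySpid[spid] = Convert.ToString(row["TextData"]);
                continue; // the customized event itself is not copied
            }
            DataRow copy = result.NewRow();
            foreach (DataColumn column in source.Columns)
                copy[column.ColumnName] = row[column];
            string info;
            copy["UserInformation"] =
                userInfoBySpid.TryGetValue(spid, out info) ? (object)info : DBNull.Value;
            result.Rows.Add(copy);
        }
        return result;
    }

    public static void Main()
    {
        DataTable trace = new DataTable();
        trace.Columns.Add("EventClass", typeof(int));
        trace.Columns.Add("SPID", typeof(int));
        trace.Columns.Add("TextData", typeof(string));
        trace.Rows.Add(82, 51, "User Session1"); // customized event
        trace.Rows.Add(10, 51, "exec ABC");      // procedure call on the same SPID

        foreach (DataRow row in Collapse(trace).Rows)
            Console.WriteLine("{0} {1} {2}", row["SPID"], row["TextData"], row["UserInformation"]);
    }
}
```

Note that both the source and the result table are fully in memory at the same time, and that the dictionary is local to one call, so a customized event at the end of one batch is lost for the procedure call at the start of the next. That is the cross-batch logic mentioned above.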

After checking MSDN, I found that you can pass an IDataReader to the SqlBulkCopy component. So why not write a class that implements the IDataReader interface, processes the trace data as a stream, and is then passed to SqlBulkCopy? First, I use a SqlDataReader to read the trace file, then pass that reader to the class that implements IDataReader, and finally send the instance of the class to SqlBulkCopy. SqlBulkCopy calls the Read() method of the class through the IDataReader interface, and the class in turn calls the Read() method of the SqlDataReader. Within the class, when a record read from the SqlDataReader is a customized event entry, the entry is saved to an IDictionary object using the SPID as the key. When a record read from the SqlDataReader is not a customized event, the class looks up the IDictionary object by SPID to get the user information and outputs it together with the RPC Complete entry.
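
Before the full reader, the streaming idea can be sketched on its own with a plain iterator. This is only an illustration under assumptions: TraceRow and TraceCollapser are hypothetical names that do not appear in the real implementation, and 82 is assumed to be the event class of the customized event.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical row shape for illustration; the real reader exposes these as columns.
public class TraceRow
{
    public int EventClass;
    public int Spid;
    public string TextData;
    public string UserInfo; // filled in while streaming
}

public static class TraceCollapser
{
    const int UserDefinedEvent = 82; // assumption: event class of the customized event

    // Pulls rows one at a time: remembers the latest user info per SPID,
    // swallows the customized-event rows, and tags every other row.
    public static IEnumerable<TraceRow> Collapse(IEnumerable<TraceRow> source)
    {
        Dictionary<int, string> userInfoBySpid = new Dictionary<int, string>();
        foreach (TraceRow row in source)
        {
            if (row.EventClass == UserDefinedEvent)
            {
                userInfoBySpid[row.Spid] = row.TextData;
                continue; // never handed to the consumer
            }
            string info;
            row.UserInfo = userInfoBySpid.TryGetValue(row.Spid, out info) ? info : null;
            yield return row;
        }
    }

    public static void Main()
    {
        List<TraceRow> rows = new List<TraceRow>
        {
            new TraceRow { EventClass = 82, Spid = 51, TextData = "User Session1" },
            new TraceRow { EventClass = 10, Spid = 51, TextData = "exec ABC" }
        };
        foreach (TraceRow row in Collapse(rows))
            Console.WriteLine("{0} {1} {2}", row.Spid, row.TextData, row.UserInfo);
    }
}
```

The consumer's foreach plays the role of SqlBulkCopy here: each step pulls exactly one row, and the only state kept between rows is the dictionary, so memory use does not grow with the size of the trace.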

Now, let’s implement the IDataReader. As described above, SqlBulkCopy invokes the new IDataReader implementation, which in turn calls the Read() method of the SqlDataReader. The implementation is below.

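using System;
using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;
using System.Data.SqlTypes;

public class MyDataReader : IDataReader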
        {
            SqlDataReader sourceReader = null;
            DataTable schemaTable = null;
            Dictionary<int, string> userInfo = new Dictionary<int, string>(); // latest user/session information per SPID
            int currentSPID = -1, currentSPIDOrdinal, userInfoOrdinal, eventClassOrdinal, textDataOrdinal;
            const string UserInfoColumnName = "UserInformation";
            public MyDataReader(SqlDataReader r)
            {
                sourceReader = r;
                schemaTable = r.GetSchemaTable(); //this gets the structure of the source schema information
                userInfoOrdinal = schemaTable.Rows.Count;
                DataRow row = schemaTable.NewRow();
                row.BeginEdit();
                row["ColumnName"] = UserInfoColumnName;
                row["ColumnOrdinal"] = userInfoOrdinal;
                row["ColumnSize"] = 128;
                row["NumericPrecision"] = 128; //we don't care
                row["NumericScale"] = 128;//we don't care
                row["IsUnique"] = false;
                row["IsKey"] = false;
                row["DataType"] = typeof(string);
                row["AllowDBNull"] = true; // some procedure calls may not be linkable to any user information
                row["ProviderType"] = SqlDbType.NVarChar;
                row["IsIdentity"] = false;
                row["IsAutoIncrement"] = false;
                row["IsRowVersion"] = false;
                row["IsLong"] = false;
                row["IsReadOnly"] = false;
                row["ProviderSpecificDataType"] = typeof(SqlString);
                row["DataTypeName"] = "nvarchar";
                row["IsColumnSet"] = false;
                row.EndEdit();
                schemaTable.Rows.Add(row);

                // look up the ordinals once here so Read() does not resolve column names on every row
                currentSPIDOrdinal = sourceReader.GetOrdinal("SPID"); 
                eventClassOrdinal = sourceReader.GetOrdinal("EventClass");
                textDataOrdinal = sourceReader.GetOrdinal("TextData");
            }
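            object GetUserInfo()
            {
                if (currentSPID == -1) return DBNull.Value;
                // TryGetValue avoids throwing an exception for the ordinary "no user info for this SPID" case
                string info;
                return userInfo.TryGetValue(currentSPID, out info) ? (object)info : DBNull.Value;
            }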
            // Since a SqlDataReader is passed in, most members simply delegate to it.
            #region implementation of IDataReader
            public void Close()
            {
                // Nothing to close here: the only open resource is the SqlDataReader
                // that was passed in, and the caller closes it.
                return;
            }
            public DataTable GetSchemaTable()
            {
                return schemaTable; //this has been generated in the Constructor
            }
            // Advances to the next result set. This is important:
            // say your SQL code returns a set of records and then performs some
            // more operations. If NextResult is NOT called before the reader
            // is closed, the T-SQL statements after the one that returned the
            // set of records will NOT be executed.
            public bool NextResult()
            {
                return sourceReader.NextResult();
            }
            public bool Read()
            {
                while (sourceReader.Read())
                {
                    if (Convert.ToInt16(sourceReader[eventClassOrdinal]) == 82) // this is a user-defined (customized) event
                    {
                        userInfo[Convert.ToInt32(sourceReader[currentSPIDOrdinal])] = sourceReader.GetString(textDataOrdinal);
                        continue; // the record is a customized event, so check the next record
                    }
                    else if (sourceReader[textDataOrdinal].ToString().IndexOf("sp_trace_generateevent") >= 0)
                        continue; // skip the sp_trace_generateevent calls themselves
                    // The current record is a procedure call.
                    // Nothing else to do here: the IDataRecord members below expose it along with the user information.
                    currentSPID = Convert.ToInt32(sourceReader[currentSPIDOrdinal]);
                    return true;
                }
                return false;
            }
            // Not meaningful here and not called by SqlBulkCopy; delegate to the source reader
            public int Depth { get { return sourceReader.Depth; } }
            // Whether the reader is closed
            public bool IsClosed { get { return sourceReader.IsClosed; } }
            // Number of records affected. Not useful here: return zero or, as done here, the value from the source reader
            public int RecordsAffected { get { return sourceReader.RecordsAffected; } }
            #endregion

            #region implementation of IDataRecord
            public bool GetBoolean(int i)
            {
                return (bool)this[i];
            }
            public object this[string name]
            {
                get { return name == UserInfoColumnName ? GetUserInfo() : sourceReader[name]; }
            }

            public object this[int i]
            {
                get { return i == userInfoOrdinal ? GetUserInfo() : sourceReader[i]; }
            }

            public byte GetByte(int i){return (byte) this[i]; }

            // Copies bytes from the column into a buffer
            public long GetBytes(int i, long fieldOffset, byte[] buffer, int bufferoffset, int length)
            {
                if (i != userInfoOrdinal)
                    return sourceReader.GetBytes(i, fieldOffset, buffer, bufferoffset, length); // borrow the method of the sourceReader
                // Example code for the added column, although this will not be called.
                string info = GetUserInfo() as string;
                if (info == null)
                    return 0; // no user information for this row
                byte[] b = System.Text.Encoding.UTF8.GetBytes(info);
                if (fieldOffset >= b.Length)
                    return 0;
                // fieldOffset indexes the column data; bufferoffset indexes the destination buffer
                length = (int)Math.Min(length, b.Length - fieldOffset);
                Array.Copy(b, (int)fieldOffset, buffer, bufferoffset, length);
                return length;
            }
            public char GetChar(int i)
            {
                return (char) this[i];
            }
            public long GetChars(int i, long fieldoffset, char[] buffer, int bufferoffset, int length)
            {
                if (i != userInfoOrdinal)
                    return sourceReader.GetChars(i, fieldoffset, buffer, bufferoffset, length); // borrow the method of the sourceReader
                // Example code for the added column, although this will not be called.
                string info = GetUserInfo() as string;
                if (info == null || fieldoffset >= info.Length)
                    return 0; // no user information, or the offset is past the end
                // fieldoffset indexes the column data; bufferoffset indexes the destination buffer
                length = (int)Math.Min(length, info.Length - fieldoffset);
                info.CopyTo((int)fieldoffset, buffer, bufferoffset, length);
                return length;
            }
            // SqlDataReader does not support this anyway
            public IDataReader GetData(int i)
            {
                return sourceReader.GetData(i);
            }
            public string GetDataTypeName(int i)
            {
                return (string)schemaTable.Rows[i]["DataTypeName"];
            }
            public DateTime GetDateTime(int i)
            {
                return (DateTime)this[i];
            }
            public decimal GetDecimal(int i)
            {
                return (decimal)this[i];
            }
            public double GetDouble(int i)
            {
                return (double)this[i];
            }
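            public Type GetFieldType(int i)
            {
                // the field type comes from the schema table, not from the current row's value
                return (Type)schemaTable.Rows[i]["DataType"];
            }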
            public float GetFloat(int i)
            {
                return (float)this[i];
            }
            public Guid GetGuid(int i)
            {
                return (Guid)this[i];
            }
            public short GetInt16(int i)
            {
                return (short)this[i];
            }
            public int GetInt32(int i)
            {
                return (int)this[i];
            }
            public long GetInt64(int i)
            {
                return (long)this[i];
            }
            public string GetName(int i)
            {
                if (userInfoOrdinal == i)
                    return UserInfoColumnName;
                return sourceReader.GetName(i);
            }
            public int GetOrdinal(string name)
            {
                if (name == UserInfoColumnName)
                    return userInfoOrdinal;
                return sourceReader.GetOrdinal(name);
            }
            public string GetString(int i)
            {
                return (string)this[i];
            }
            public object GetValue(int i)
            {
                return this[i];
            }
            public int GetValues(object[] values)
            {
                int maxAllowableLength = Math.Min(values.Length, schemaTable.Rows.Count);
                for (int i = 0; i < maxAllowableLength; i++)
                    values[i] = this[i];
                return maxAllowableLength;
            }
            public bool IsDBNull(int i)
            {
                return this[i] is DBNull;
            }

            // number of fields, including the added UserInformation column
            public int FieldCount { get { return schemaTable.Rows.Count; } }

            
            #endregion
            //implementation of IDisposable
            public void Dispose()
            {
                //nothing to dispose actually.
                //let's remove the reference
                sourceReader = null;
                userInfo = null;
            }
        }

Now let’s have some fake trace data for testing.

We expect the rows with RowNumber 5, 6, 8, 9, 12, 13, 15, and 16 to be removed after the import; in their place, the UserInformation column is populated with “User Session1” or “User Session2”. Now let’s create a new table with the same definition as the source plus a UserInformation column, then run a test.

public class Test
{
    public static void Main()
    {
        SqlConnectionStringBuilder sb = new SqlConnectionStringBuilder();
        sb.DataSource = "."; //local
        sb.InitialCatalog = "mtest";
        sb.IntegratedSecurity = true;
        SqlConnection connection = new SqlConnection(sb.ToString());
        connection.Open();
        SqlCommand cmd = connection.CreateCommand();
        cmd.CommandText = @"select RowNumber, EventClass, TextData, CPU, Reads, Writes, Duration, SPID, StartTime, EndTime 
                            from TraceTable
                            where SPID is not null
                            order by RowNumber";
        SqlDataReader reader = cmd.ExecuteReader();
        MyDataReader mreader = new MyDataReader(reader);

        SqlBulkCopy bulkCopy = new SqlBulkCopy(sb.ToString());
        bulkCopy.DestinationTableName = "NewTraceTable";
        // No bulkCopy.ColumnMappings are added: without mappings, SqlBulkCopy
        // matches source and destination columns by ordinal position.
        bulkCopy.WriteToServer(mreader);
        bulkCopy.Close();
        reader.Close();
        connection.Close();
        Console.WriteLine("done");
        Console.ReadKey();
    }
}

Run the code and check NewTraceTable.

In this way, you can process trace records as a stream. It needs so little memory that you can put it in a CLR procedure. For production deployment, you will have to add some code to make it more robust. For instance, you may want to save the Dictionary to a table when the application exits, so that the previously saved dictionary can be loaded when the application is launched again.
