baoshengfei
Author: baoshengfei · 2016-06-19 18:56
Software Architect · EM

DB2 Database Replication and Read/Write Splitting

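I have spent a lot of time on DB2 database replication recently, so I am writing up what I learned, both as a summary and to share with readers who have no experience with it yet.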

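There is actually not much to say about DB2 replication. Most of it is common sense, and the choices are few: essentially HADR and CDC (the counterpart of Oracle GoldenGate). Many people also choose QRep or SQL Rep, but I have not yet found a reason to pick either of those.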

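HADR is DB2's physical, whole-database replication solution. Whole-database replication is easy to understand: every object and every byte in the database is replicated, including data, indexes, stored procedures, sequences ... everything in the DB. It works by writing all log content to the standby at the same time as it is written locally; the standby then redoes the log. This redo is not the same as replay in MySQL. MySQL has no physical replication: every replicated change is translated back into SQL statements and executed on the standby, which is also how CDC works. DB2 HADR redo is the same redo that the database performs during crash recovery, that is, the data recorded in the log is re-applied to the slot of the page in the table space where it belongs.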
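To make "physical redo" concrete, here is a minimal sketch in C. Everything in it (`Page`, `RedoRecord`, `redo_apply`, the 4 KB page size) is a hypothetical illustration and not DB2's internal format. It only shows the general idea: a physical log record says "put these bytes at this position of this page", and applying it involves no SQL at all. Comparing the record's LSN with the page's LSN is a common way to make redo safe to repeat; I am assuming it here for illustration.

```c
#include <stdint.h>
#include <string.h>

#define PAGE_SIZE 4096  /* hypothetical page size for this sketch */

/* Hypothetical in-memory page: an LSN stamp plus raw bytes. */
typedef struct {
    uint64_t page_lsn;              /* LSN of the last change applied to the page */
    unsigned char data[PAGE_SIZE];
} Page;

/* Hypothetical physical log record: "write these bytes at this offset". */
typedef struct {
    uint64_t lsn;                   /* position of this record in the log */
    uint16_t offset;                /* byte offset inside the page */
    uint16_t length;                /* number of bytes to write */
    const unsigned char *bytes;     /* the bytes recorded in the log */
} RedoRecord;

/* Apply one record to a page.
   Returns 1 if applied, 0 if skipped (already on the page), -1 if malformed. */
int redo_apply(Page *page, const RedoRecord *rec)
{
    if ((size_t)rec->offset + rec->length > PAGE_SIZE)
        return -1;                  /* record would write past the page */
    if (rec->lsn <= page->page_lsn)
        return 0;                   /* page already contains this change */
    memcpy(page->data + rec->offset, rec->bytes, rec->length);
    page->page_lsn = rec->lsn;
    return 1;
}
```

Calling `redo_apply` twice with the same record changes the page only once, which is the property that lets a standby (or crash recovery) re-read a stretch of log without harm.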

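HADR replication has these advantages:

1. Whole-database replication, and simple to configure.

2. Physical replication, so synchronization is efficient.

3. Four synchronization modes (synchronous, near-synchronous, asynchronous, and super-asynchronous), which gives better reliability.

4. Up to three standbys (this really looks as if it was developed in response to the "two sites, three data centers" requirement of certain industries in China).

5. Delayed replay: how far the standby lags behind can be set with a simple configuration.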
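The four synchronization modes in item 3 differ in what must have happened to a transaction's log data before the commit is allowed to return to the application. The sketch below encodes that rule as I understand it from the DB2 documentation; the type and function names (`HadrSyncMode`, `LogShipState`, `commit_can_return`) are hypothetical and exist only for this illustration.

```c
#include <stdbool.h>

/* The four HADR synchronization modes named in the list above. */
typedef enum { HADR_SYNC, HADR_NEARSYNC, HADR_ASYNC, HADR_SUPERASYNC } HadrSyncMode;

/* Hypothetical progress flags for the log data of one commit. */
typedef struct {
    bool written_on_primary;    /* flushed to the primary's log files */
    bool sent_to_standby;       /* handed to the TCP layer on the primary */
    bool received_by_standby;   /* sitting in the standby's memory */
    bool written_on_standby;    /* flushed to the standby's log files */
} LogShipState;

/* Returns true when the commit may be acknowledged to the application. */
bool commit_can_return(HadrSyncMode mode, const LogShipState *s)
{
    if (!s->written_on_primary)
        return false;                      /* every mode needs the local write */
    switch (mode) {
    case HADR_SYNC:       return s->written_on_standby;
    case HADR_NEARSYNC:   return s->received_by_standby;
    case HADR_ASYNC:      return s->sent_to_standby;
    case HADR_SUPERASYNC: return true;     /* never waits for the standby */
    }
    return false;
}
```

Read from top to bottom, the modes trade safety for latency: SYNC waits for the most, SUPERASYNC for nothing beyond the local log write.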

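CDC (change data capture) is a component of IBM's IIDR product. Like other replication software (HADR excepted), it uses the db2ReadLog API to read the DB2 transaction log while the database is online, parses the data out of the log, and sends it to the target. The target receives the data and the operations, translates them into SQL statements, and applies them to the target database.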
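The last step of that pipeline, turning a decoded change back into SQL on the target side, can be sketched as follows. This is only an illustration under my own assumptions: the `Change` structure, the table `t(id, col1)`, and `change_to_sql` are hypothetical, and a real apply process would use parameter markers instead of building SQL text.

```c
#include <stdio.h>

typedef enum { OP_INSERT, OP_DELETE } ChangeOp;

/* Hypothetical decoded change for a table t(id INT, col1 VARCHAR(20)). */
typedef struct {
    ChangeOp op;
    int id;
    const char *col1;   /* only used for inserts */
} Change;

/* Render the change as SQL text into out.
   Returns the snprintf result, or -1 for an unknown operation. */
int change_to_sql(const Change *c, char *out, size_t outSize)
{
    switch (c->op) {
    case OP_INSERT:
        return snprintf(out, outSize,
                        "INSERT INTO t (id, col1) VALUES (%d, '%s')",
                        c->id, c->col1);
    case OP_DELETE:
        return snprintf(out, outSize,
                        "DELETE FROM t WHERE id = %d", c->id);
    }
    return -1;
}
```

Because every change becomes a statement that the target has to compile and execute, this style of replication is logical rather than physical, which is the contrast with HADR drawn earlier.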

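The advantages of CDC are:

1.    Cross-platform replication. That is, data on System z and System i can be replicated to open platforms.

2.    Flexible replication: table-level replication is supported, so you can choose exactly the data you need to replicate to the target.

As is easy to see, CDC should be a purely asynchronous form of replication.

In my tests, CDC replicated far less efficiently than I had expected. For most transactional systems, though, it should be sufficient.

One feature did exceed my expectations during the CDC tests: the user exit. By writing your own code, you can use it to get hold of the data sent from the source system and decide how to handle it, for example by putting it into Kafka or HBase.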

 

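Before talking about read/write splitting, I want to talk about the db2ReadLog API. It is a very fun interface: if you have time, you can use it to build your own CDC-like capability, and even if you do not, you can use it to learn how DB2 stores data.

Under ~sqllib/samples/c there is a sample that uses this API. I spent some time writing my own program to parse the log; here is what I got out of it: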

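1.    I finally understood what alter table ... data capture changes does. The transaction log records changed data, which can include a before image and an after image. For an update statement, for example, the before image is the content of the row before the update and the after image is the content of the row after it: not only the changed columns, but also the columns whose data did not change. AS400 has a very clear interface for choosing whether a table's before image or after image is logged (TD seems to as well). DB2 on open platforms does not: by default only the after image is recorded (on open platforms at least the after image must be recorded). Log records of this kind are called non-propagatable, and the read log API does not return them when its filter option is on. To capture the changes of a table, the log records written for that table must be propagatable, so data capture has to be enabled on the table so that both the before image and the after image are recorded.

2.    I could see a lot about how DB2 stores data. Take a table t(id int, col1 varchar(20), col2 decimal). The row (1, 'a', 4.5) is stored like this: the first column is fixed-length and holds the number 1 itself; the second column holds not the data but the data's offset and length; the third column is not stored as a floating-point value either, but as a string. In other words, DB2 puts the data of variable-length columns at the end of the row and keeps only an offset and a length in the column's own position, so lookups cannot be very efficient; and, from what I saw, DB2 stores decimal data in string form.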
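The offset-and-length layout described in item 2 can be illustrated with a toy row format. The layout below is my own simplification for the table t(id int, col1 varchar(20)) and is not the real DB2 on-disk or in-log format; `row_encode`, `row_get_id`, and `row_get_col1` are hypothetical helper names. It shows why reading a variable-length column takes two steps: fetch the descriptor, then follow it to the end of the row.

```c
#include <stdint.h>
#include <string.h>

/* Toy row layout (NOT the real DB2 format), host byte order:
     bytes 0..3  id, stored inline
     bytes 4..5  offset of col1's data from the start of the row
     bytes 6..7  length of col1's data
     bytes 8..   variable-length area */
#define FIXED_AREA_SIZE 8

/* Build a row. Returns the total row length, or 0 if it does not fit. */
size_t row_encode(unsigned char *buf, size_t bufSize, int32_t id, const char *col1)
{
    size_t len = strlen(col1);
    uint16_t off = FIXED_AREA_SIZE;
    uint16_t len16 = (uint16_t)len;

    if (len > 20 || bufSize < FIXED_AREA_SIZE + len)
        return 0;
    memcpy(buf, &id, 4);              /* fixed-length value stored inline */
    memcpy(buf + 4, &off, 2);         /* descriptor: where the data lives */
    memcpy(buf + 6, &len16, 2);       /* descriptor: how long it is */
    memcpy(buf + off, col1, len);     /* the data itself, at the end */
    return FIXED_AREA_SIZE + len;
}

/* The fixed-length column is read directly from its position. */
int32_t row_get_id(const unsigned char *row)
{
    int32_t id;
    memcpy(&id, row, 4);
    return id;
}

/* The variable-length column needs the descriptor first.
   Returns the data length, or -1 if the row or output buffer is too small. */
int row_get_col1(const unsigned char *row, size_t rowLen, char *out, size_t outSize)
{
    uint16_t off, len;

    if (rowLen < FIXED_AREA_SIZE)
        return -1;
    memcpy(&off, row + 4, 2);
    memcpy(&len, row + 6, 2);
    if ((size_t)off + len > rowLen || (size_t)len + 1 > outSize)
        return -1;
    memcpy(out, row + off, len);
    out[len] = '\0';
    return (int)len;
}
```

Encoding the row (1, 'a') with `row_encode` and reading it back with the two getters reproduces the shape described above: the integer sits at a fixed position, while the string is reached through its offset and length.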

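The program below is something I pieced together from the sample programs. Read together with the log record data structures described in the Information Center, it can be used to study the db2ReadLog API.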

#include <stdio.h>

#include <stdlib.h>

#include <sqlenv.h>

#include <sqlda.h>

#include <sqlca.h>

#include <string.h>

#include <ctype.h>

#include <sql.h>

#include <db2ApiDf.h>

#include <sqludf.h>

#define CHECKRC(x,y)                                    \
    if ((x) != 0)                                       \
    {                                                   \
       printf("Non-zero rc from function %s.\n", (y));  \
       return (x);                                      \
    }

/* support function called by DbLogRecordsForCurrentConnectionRead() */

int LogBufferDisplay( char *, sqluint32, int );

int LogRecordDisplay( char *, sqluint32, sqluint16, sqluint16 );

int SimpleLogRecordDisplay( sqluint16, sqluint16, char *, sqluint32 );

int ComplexLogRecordDisplay( sqluint16, sqluint16, char *, sqluint32,

                            sqluint8, char *, sqluint32 );

int LogSubRecordDisplay( char *, sqluint16 );

int UserDataDisplay( char *, sqluint16 );

/* The Record ID is 6 bytes in size. */

typedef struct RID

{

    char ridParts[6];

} RID;

int RidToString( RID* rid, char* buf )

{

    /* read the RID bytes as unsigned so that %X is not sign-extended */

    unsigned char *ptrBuf = (unsigned char *)rid->ridParts;

    int size = sprintf( buf, "x%2.2X%2.2X%2.2X%2.2X%2.2X%2.2X",

                        *ptrBuf, *(ptrBuf+1), *(ptrBuf+2),

                        *(ptrBuf+3), *(ptrBuf+4), *(ptrBuf+5) );

    return size;

}

EXEC SQL BEGIN DECLARE SECTION;

  char dbAlias[15];

  char user[129];

  char pswd[15];

EXEC SQL END DECLARE SECTION;

#define MAX_UID_LENGTH 18

#define MAX_PWD_LENGTH 30

/* macro for embedded SQL checking */

#define EMB_SQL_CHECK(MSG_STR)                     \
SqlInfoPrint(MSG_STR, &sqlca, __LINE__, __FILE__); \
if (sqlca.sqlcode < 0)                             \
{                                                  \
  return 1;                                        \
}

#define DB2_API_CHECK(MSG_STR)                     \
SqlInfoPrint(MSG_STR, &sqlca, __LINE__, __FILE__); \
if (sqlca.sqlcode < 0)                             \
{                                                  \
  return 1;                                        \
}

/* macro for expected error checking and message */

#define EXPECTED_ERR_CHECK(MSG_STR)                         \
printf("\n-- The following error report is expected! --");  \
SqlInfoPrint(MSG_STR, &sqlca, __LINE__, __FILE__);

/* macro for expected warning */

#define EXPECTED_WARN_CHECK(MSG_STR)                         \
printf("\n-- The following warning report is expected! --"); \
SqlInfoPrint(MSG_STR, &sqlca, __LINE__, __FILE__);

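int db2ReadLogAPICall(char *, char *, char *, char *);

/* prototypes for helpers that are called before they are defined */

void SqlInfoPrint(char *, struct sqlca *, int, char *);

int DbConn(char *, char *, char *);

int DbDisconn(char *);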

int main(int argc, char* argv[])

{

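   int rc = 0;

   char *dbAlias = NULL;

   char *user = NULL;

   char *pswd = NULL;

   char *serverWorkingPath = NULL;

   if (argc != 5)

   {

      printf("Usage: %s <dbAlias> <user> <pswd> <serverWorkingPath>\n", argv[0]);

      return 1;

   }

   dbAlias = argv[1];

   user = argv[2];

   pswd = argv[3];

   serverWorkingPath = argv[4];

   /* connect, then call the function to do asynchronous log read */

   rc = DbConn(dbAlias, user, pswd);

   CHECKRC(rc, "DbConn");

   rc = db2ReadLogAPICall(dbAlias, user, pswd, serverWorkingPath);

   CHECKRC(rc, "db2ReadLogAPICall");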

   rc = DbDisconn(dbAlias);

   CHECKRC(rc, "DbDisconn");

   return rc;

}

/* function that makes a call to the db2ReadLog API

   to read the asynchronous log records */

int db2ReadLogAPICall(char dbAlias[],

                      char user[],

                      char pswd[],

                      char serverWorkingPath[])

{

   int rc = 0;

   struct sqlca sqlca = { 0 };

   db2LSN startLSN;

   db2LSN endLSN;

   char *logBuffer = NULL;

   sqluint32 logBufferSize = 0;

   db2ReadLogInfoStruct readLogInfo = { 0 };

   db2ReadLogStruct readLogInput = { 0 };

  

   /*

   * The API db2ReadLog (Asynchronous Read Log) is used to extract

   * records from the database logs, and to query the log manager for

   * current log state information. This API can only be used on

   * recoverable databases.

   */

   /* Query the log manager for current log state information. */

   readLogInput.iCallerAction = DB2READLOG_QUERY;

   readLogInput.piStartLSN = NULL;

   readLogInput.piEndLSN = NULL;

   readLogInput.poLogBuffer = NULL;

   readLogInput.iLogBufferSize = 0;

   /* The 'iFilterOption' specifies the level of log record filtering

     to be used when reading the log records. With the iFilterOption ON,

     only log records in the given LSN range marked as propagatable

     are read */

   /* Log record contents will only be decompressed when reading logs

     through the db2ReadLog API with the iFilterOption ON.

     If the iFilterOption is OFF the log records queried may contain

     mixed compressed and uncompressed user data */

    readLogInput.iFilterOption = DB2READLOG_FILTER_ON;

    readLogInput.poReadLogInfo = &readLogInfo;

   db2ReadLog(db2Version970, &readLogInput, &sqlca);

   DB2_API_CHECK("database log info -- get");

   logBufferSize = 64 * 1024;    /* Maximum size of a log buffer */

   logBuffer = (char *)malloc(logBufferSize);

   memcpy(&startLSN, &(readLogInfo.initialLSN), sizeof(startLSN));

   memcpy(&endLSN, &(readLogInfo.nextStartLSN), sizeof(endLSN));

   /*

   * Extract a log record from the database logs, and read the first

   * log sequence asynchronously.

   */

   readLogInput.iCallerAction = DB2READLOG_READ;

   readLogInput.piStartLSN = &startLSN ;

   readLogInput.piEndLSN = &endLSN;

   readLogInput.poLogBuffer = logBuffer;

   readLogInput.iLogBufferSize = logBufferSize;

   readLogInput.iFilterOption = DB2READLOG_FILTER_ON;

   readLogInput.poReadLogInfo = &readLogInfo;

   db2ReadLog(db2Version970, &readLogInput, &sqlca);

   printf("start lsn is %x\n", startLSN);

   printf("%x\n", endLSN);

   printf("%d\n", readLogInfo.logRecsWritten);

   if (sqlca.sqlcode != SQLU_RLOG_READ_TO_CURRENT)

   {

      DB2_API_CHECK("database logs -- read");

   }

   else

   {

      if (readLogInfo.logRecsWritten == 0)

      {

         printf("\n  Database log empty.\n");

      }

   }

   /* display log buffer */

   rc = LogBufferDisplay(logBuffer, readLogInfo.logRecsWritten, 1);

   CHECKRC(rc, "LogBufferDisplay");

   while (sqlca.sqlcode != SQLU_RLOG_READ_TO_CURRENT)

   {

      /* read the next log sequence */

      memcpy(&startLSN, &(readLogInfo.nextStartLSN), sizeof(startLSN));

      /*

      * Extract a log record from the database logs, and read the

      * next log sequence asynchronously.

      */

      db2ReadLog(db2Version970, &readLogInput, &sqlca);

      if (sqlca.sqlcode != SQLU_RLOG_READ_TO_CURRENT)

      {

         DB2_API_CHECK("database logs -- read");

      }

      /* display log buffer */

      rc = LogBufferDisplay(logBuffer, readLogInfo.logRecsWritten, 1);

      CHECKRC(rc, "LogBufferDisplay");

   }

   /* free the log buffer */

   free(logBuffer);

   logBuffer = NULL;

   logBufferSize = 0;

   return 0;

} /* db2ReadLogAPICall */

void SqlInfoPrint(char *appMsg, struct sqlca *pSqlca, int line, char *file)

{

  int rc = 0;

  char sqlInfo[1024];

  char sqlInfoToken[1024];

  char sqlstateMsg[1024];

  char errorMsg[1024];

  if (pSqlca->sqlcode != 0 && pSqlca->sqlcode != 100)

  {

    strcpy(sqlInfo, "");

    if (pSqlca->sqlcode < 0)

    {

      sprintf(sqlInfoToken,

              "\n---- error report -----------------------------\n");

      strcat(sqlInfo, sqlInfoToken);

    }

    else

    {

      sprintf(sqlInfoToken,

              "\n---- warning report ---------------------------\n");

      strcat(sqlInfo, sqlInfoToken);

    } /* endif */

    sprintf(sqlInfoToken, "\napplication message = %s\n", appMsg);

    strcat(sqlInfo, sqlInfoToken);

    sprintf(sqlInfoToken, "line                = %d\n", line);

    strcat(sqlInfo, sqlInfoToken);

    sprintf(sqlInfoToken, "file                = %s\n", file);

    strcat(sqlInfo, sqlInfoToken);

    sprintf(sqlInfoToken, "SQLCODE             = %d\n\n", pSqlca->sqlcode);

    strcat(sqlInfo, sqlInfoToken);

    /* get error message */

    rc = sqlaintp(errorMsg, 1024, 80, pSqlca);

    if (rc > 0) /* return code is the length of the errorMsg string */

    {

      sprintf(sqlInfoToken, "%s\n", errorMsg);

      strcat(sqlInfo, sqlInfoToken);

    }

    /* get SQLSTATE message */

    rc = sqlogstt(sqlstateMsg, 1024, 80, pSqlca->sqlstate);

    if (rc > 0)

    {

      sprintf(sqlInfoToken, "%s\n", sqlstateMsg);

      strcat(sqlInfo, sqlInfoToken);

    }

    if (pSqlca->sqlcode < 0)

    {

      sprintf(sqlInfoToken,

              "---- end error report ------------------------\n");

      strcat(sqlInfo, sqlInfoToken);

      printf("%s", sqlInfo);

    }

    else

    {

      sprintf(sqlInfoToken,

              "---- end warning report ----------------------\n");

      strcat(sqlInfo, sqlInfoToken);

      printf("%s", sqlInfo);

    } /* endif */

  } /* endif */

} /* SqlInfoPrint */

int DbConn(char paramDbAlias[], char paramUser[], char paramPswd[])

{

  struct sqlca sqlca;

  int rc = 0;

  strcpy(dbAlias, paramDbAlias);

  strcpy(user, paramUser);

  strcpy(pswd, paramPswd);

  printf("\n  Connecting to '%s' database...\n", dbAlias);

  if (strlen(user) == 0)

  {

    EXEC SQL CONNECT TO :dbAlias;

    EMB_SQL_CHECK("CONNECT");

  }

  else

  {

    EXEC SQL CONNECT TO :dbAlias USER :user USING :pswd;

    EMB_SQL_CHECK("CONNECT");

  }

  printf("  Connected to '%s' database.\n", dbAlias);

  return 0;

} /* DbConn */

int DbDisconn(char *dbAlias)

{

  struct sqlca sqlca;

  int rc = 0;

  printf("\n  Disconnecting from '%s' database...\n", dbAlias);

  /* Commit all non-committed transactions to release database locks */

  EXEC SQL COMMIT;

  EMB_SQL_CHECK("COMMIT");

  EXEC SQL CONNECT RESET;

  EMB_SQL_CHECK("CONNECT RESET");

  printf("  Disconnected from '%s' database.\n", dbAlias);

  return 0;

} /* DbDisconn */

/***************************************************************************/

/* LogBufferDisplay                                                        */

/* Displays the log buffer                                                 */

/***************************************************************************/

int LogBufferDisplay( char *logBuffer,

                      sqluint32 numLogRecords, int conn )  

{

  int       rc = 0;

  sqluint32 logRecordNb = 0;

  sqluint32 recordSize = 0;

  sqluint16 recordType = 0;

  sqluint16 recordFlag = 0;

  char *recordBuffer = NULL;

  int headerSize = 0;                                              

  printf("1\n");

  if (logBuffer == NULL)

  {

    if (numLogRecords == 0)

    {

      /* there's nothing to do */

      return 0;

    }

    else

    {

      /* we can't display NULL log records */

      return 1;

    }

  }

  printf("2\n");

  /* If there is no connection to the database or if the iFilterOption

     is OFF, the 8-byte LSN 'db2LSN' is prefixed to the log records.

     If there is a connection to the database and the iFilterOption is

     ON, the db2ReadLogFilterData structure will be prefixed to all 

     log records returned by the db2ReadLog API ( for compressed and 

     uncompressed data ) */

  if (conn == 0)                                                  

  {

    headerSize = sizeof(db2LSN);

  }

  else

  {

    headerSize = sizeof(db2ReadLogFilterData);

  }                                                               

  recordBuffer = logBuffer;

   printf("numLogRecords: %d\n", numLogRecords);

  for (logRecordNb = 0; logRecordNb < numLogRecords; logRecordNb++)

  {

    if (conn == 1)                                             

    {

      db2ReadLogFilterData  *filterData = (db2ReadLogFilterData *)recordBuffer;

      printf("\nRLOG_FILTERDATA:\n");

      printf("    recordLSN: %lu\n", filterData->recordLSN);

      printf("    realLogRecLen: %lu\n", filterData->realLogRecLen );

      printf("    sqlcode: %d\n", filterData->sqlcode );

      

    }

    

    recordBuffer += headerSize;                                    

    recordSize = *(sqluint32 *) (recordBuffer);

    recordType = *(sqluint16 *) (recordBuffer + sizeof(sqluint32));

    recordFlag = *(sqluint16 *) (recordBuffer + sizeof(sqluint32) +

                 sizeof(sqluint16));

    printf("    recordSize: %lu\n", recordSize );

    rc = LogRecordDisplay(recordBuffer, recordSize, recordType, recordFlag);

    CHECKRC(rc, "LogRecordDisplay");

    recordBuffer += recordSize;                                  

  }

  return 0;

} /* LogBufferDisplay */

/***************************************************************************/

/* LogRecordDisplay                                                        */

/* Displays the log records                                                */

/***************************************************************************/

int LogRecordDisplay( char      *recordBuffer,

                      sqluint32 recordSize,

                      sqluint16 recordType,

                      sqluint16 recordFlag )

{

  int       rc = 0;

  sqluint32 logManagerLogRecordHeaderSize = 0;

  char      *recordDataBuffer = NULL;

  sqluint32 recordDataSize = 0;

  char      *recordHeaderBuffer = NULL;

  sqluint8  componentIdentifier = 0;

  sqluint32 recordHeaderSize = 0;

  /* Determine the log manager log record header size. */

  logManagerLogRecordHeaderSize = 24; 

  if (recordType == 0x0043)

  {            /* compensation */

    logManagerLogRecordHeaderSize += sizeof(db2LSN);              

    if (recordFlag & 0x0002)

    {          /* propagatable */

      logManagerLogRecordHeaderSize += sizeof(db2LSN);           

    }

  }

   printf("recordType: %x\n", recordType);

  switch (recordType)

  {

    case 0x008A: /* Local Pending List */

    case 0x0084: /* Normal Commit */

    case 0x0041: /* Normal Abort */

      recordDataBuffer = recordBuffer + logManagerLogRecordHeaderSize;

      recordDataSize = recordSize - logManagerLogRecordHeaderSize;

      rc = SimpleLogRecordDisplay( recordType,

                                   recordFlag,

                                   recordDataBuffer,

                                   recordDataSize );

      CHECKRC(rc, "SimpleLogRecordDisplay");

      break;

    case 0x004E: /* Normal */

    case 0x0043: /* Compensation */

      recordHeaderBuffer = recordBuffer + logManagerLogRecordHeaderSize;

      componentIdentifier = *(sqluint8 *) recordHeaderBuffer;

      printf("tablespace id is %u and table id is %u\n", *(sqluint16 *) (recordHeaderBuffer+2), *(sqluint16 *) (recordHeaderBuffer+4));

      switch (componentIdentifier)

      {

          case 1: /* Data Manager Log Record */

              recordHeaderSize = 6;

              break;

         default:

             printf("what? %d\n", componentIdentifier);

             printf( "    Unknown complex log record: %lu %c %u\n",

                     recordSize, recordType, componentIdentifier );

             return 1;

      }

      recordDataBuffer = recordBuffer +

                         logManagerLogRecordHeaderSize +

                         recordHeaderSize;

      recordDataSize = recordSize -

                       logManagerLogRecordHeaderSize -

                       recordHeaderSize;

      rc = ComplexLogRecordDisplay( recordType,

                                    recordFlag,

                                    recordHeaderBuffer,

                                    recordHeaderSize,

                                    componentIdentifier,

                                    recordDataBuffer,

                                    recordDataSize );

      CHECKRC(rc, "ComplexLogRecordDisplay");

      break;

    default:

      printf( "    Unknown log record: %lu \"%c\"\n",

              recordSize, (char)recordType );

      break;

  }

  return 0;

} /* LogRecordDisplay */

/***************************************************************************/

/* SimpleLogRecordDisplay                                                  */

/* Prints the minimum details of the log record                            */

/***************************************************************************/

int SimpleLogRecordDisplay( sqluint16 recordType,

                            sqluint16 recordFlag,

                            char      *recordDataBuffer,

                            sqluint32 recordDataSize)

{

  int       rc = 0;

  sqluint32 timeTransactionCommited = 0;

  sqluint16 authIdLen = 0;

  char      *authId = NULL;

  switch (recordType)

  {

    case 138:

      printf("\n    Record type: Local pending list\n");

      timeTransactionCommited = *(sqluint32 *) (recordDataBuffer);

      authIdLen = *(sqluint16 *) (recordDataBuffer + 2*sizeof(sqluint32));

      authId = (char *)malloc(authIdLen + 1);

      memset( authId, '\0', (authIdLen + 1 ));

      memcpy( authId, (char *)(recordDataBuffer + 2*sizeof(sqluint32) +

              sizeof(sqluint16)), authIdLen);

      authId[authIdLen] = '\0';

      printf( "      %s: %lu\n",

              "UTC transaction committed (in seconds since 70-01-01)",

              timeTransactionCommited);

      printf("      authorization ID of the application: %s\n", authId);

      free(authId);

      authId = NULL;

      authIdLen = 0;

      break;

    case 132:

      printf("\n    Record type: Normal commit\n");

      timeTransactionCommited = *(sqluint32 *) (recordDataBuffer);

      authIdLen = *(sqluint16 *) (recordDataBuffer + 2*sizeof(sqluint32));

      authId = (char *)malloc(authIdLen + 1);

      memset( authId, '\0', (authIdLen + 1 ));

      memcpy(authId, (char *)(recordDataBuffer + 2*sizeof(sqluint32) +

                              sizeof(sqluint16)), authIdLen);

      authId[authIdLen] = '\0';

      printf( "      %s: %lu\n",

              "UTC transaction committed (in seconds since 70-01-01)",

              timeTransactionCommited);

      printf("      authorization ID of the application: %s\n", authId);

      free(authId);

      authId = NULL;

      authIdLen = 0;

      break;

    case 65:

      printf("\n    Record type: Normal abort\n");

      authIdLen = *(sqluint16 *) (recordDataBuffer);

      authId = (char *)malloc(authIdLen + 1);

      memset( authId, '\0', (authIdLen + 1 ));

      memcpy(authId, (char *)(recordDataBuffer + sizeof(sqluint16)), authIdLen);

      authId[authIdLen] = '\0';

      printf("      authorization ID of the application: %s\n", authId);

      free(authId);

      authId = NULL;

      authIdLen = 0;

      break;

    default:

      printf( "    Unknown simple log record: %d %lu\n",

              recordType, recordDataSize);

      break;

  }

  return 0;

} /* SimpleLogRecordDisplay */

/***************************************************************************/

/* ComplexLogRecordDisplay                                                 */

/* Prints a detailed information of the log record                         */

/***************************************************************************/

int ComplexLogRecordDisplay( sqluint16 recordType,

                             sqluint16 recordFlag,

                             char      *recordHeaderBuffer,

                             sqluint32 recordHeaderSize,

                             sqluint8  componentIdentifier,

                             char      *recordDataBuffer,

                             sqluint32 recordDataSize)

{

  int      rc = 0;

  sqluint8 functionIdentifier = 0;

  /* for insert, delete, undo delete */

  struct RID recid = { 0 };

  sqluint16 subRecordLen = 0;

  sqluint16 subRecordOffset = 0;

  char *subRecordBuffer = NULL;

  /* for update */

  struct RID newRecid = { 0 };

  sqluint16 newSubRecordLen = 0;

  sqluint16 newSubRecordOffset = 0;

  char      *newSubRecordBuffer = NULL;

  struct RID oldRecid = { 0 };

  sqluint16 oldSubRecordLen = 0;

  sqluint16 oldSubRecordOffset = 0;

  char      *oldSubRecordBuffer = NULL;

  /* for alter table attributes */

  sqluint64 alterBitMask = 0;

  sqluint64 alterBitValues = 0;

  char ridString[14];

   printf("recordType: %x\n", recordType);

  switch( recordType )

  {

    case 0x004E:

      printf("\n    Record type: Normal\n");

      break;

    case 0x0043:

      printf("\n    Record type: Compensation\n");

      break;

    default:

      printf("\n    Unknown complex log record type: %c\n", recordType);

      break;

  }

  switch (componentIdentifier)

  {

    case 1:

      printf("      component ID: DMS log record\n");

      break;

    default:

      printf("      unknown component ID: %d\n", componentIdentifier);

      break;

  }

  functionIdentifier = *(sqluint8 *) (recordHeaderBuffer + 1);

  switch (functionIdentifier)

  {

    case 161:

      printf("      function ID: Delete Record\n");

      subRecordLen = *((sqluint16*)(recordDataBuffer + sizeof(sqluint16)));

      recid = *( (struct RID*)( recordDataBuffer +

                                3 * sizeof(sqluint16) ) );

      subRecordOffset = *( (sqluint16 *) ( recordDataBuffer +

                                         3 * sizeof(sqluint16) +

                                         sizeof(struct RID) ) );

      printf("        RID: " );

      RidToString( &recid, ridString );

      printf("%s\n", ridString );

      printf("        subrecord length: %u\n", subRecordLen);

      printf("        subrecord offset: %u\n", subRecordOffset);

      subRecordBuffer = recordDataBuffer + 3 * sizeof(sqluint16) +

                        sizeof(struct RID) + sizeof(sqluint16);

      rc = LogSubRecordDisplay(subRecordBuffer, subRecordLen);

      CHECKRC(rc, "LogSubRecordDisplay");

      break;

    case 112:                                                   

      printf("      function ID: Undo Update Record\n");

      subRecordLen = *(sqluint16 *) ( recordDataBuffer + sizeof(sqluint16) );

      recid = *(struct RID *) (recordDataBuffer + 3 * sizeof(sqluint16));

      subRecordOffset = *(sqluint16 *) ( recordDataBuffer +

                                         3 * sizeof(sqluint16) +

                                         sizeof(struct RID) );

      printf("        RID: ");

      RidToString( &recid, ridString );

      printf("%s\n", ridString );

      printf("        subrecord length: %u\n", subRecordLen);

      printf("        subrecord offset: %u\n", subRecordOffset);

      subRecordBuffer = recordDataBuffer + 3 * sizeof(sqluint16) +

                        sizeof(struct RID) + sizeof(sqluint16);

      rc = LogSubRecordDisplay(subRecordBuffer, subRecordLen);

      CHECKRC(rc, "LogSubRecordDisplay");

      break;

    case 110:

      printf("      function ID: Undo Insert Record\n");

      subRecordLen = *(sqluint16 *) ( recordDataBuffer + sizeof(sqluint16) );

      recid = *(struct RID *) (recordDataBuffer + 3 * sizeof(sqluint16));

      printf("        RID: ");

      RidToString( &recid, ridString );

      printf("%s\n", ridString );

      printf("        subrecord length: %u\n", subRecordLen);

      break;                                                   

    case 111:

      printf("      function ID: Undo Delete Record\n");

      subRecordLen = *(sqluint16 *) ( recordDataBuffer + sizeof(sqluint16) );

      recid = *(struct RID *) (recordDataBuffer + 3 * sizeof(sqluint16));

      subRecordOffset = *(sqluint16 *) ( recordDataBuffer +

                                         3 * sizeof(sqluint16) +

                                         sizeof(struct RID) );

      printf("        RID: ");

      RidToString( &recid, ridString );

      printf("%s\n", ridString );

      printf("        subrecord length: %u\n", subRecordLen);

      printf("        subrecord offset: %u\n", subRecordOffset);

      subRecordBuffer = recordDataBuffer + 3 * sizeof(sqluint16) +

                        sizeof(struct RID) + sizeof(sqluint16);

      rc = LogSubRecordDisplay(subRecordBuffer, subRecordLen);

      CHECKRC(rc, "LogSubRecordDisplay");

      break;

    case 162:

      printf("      function ID: Insert Record\n");

      subRecordLen = *(sqluint16 *) ( recordDataBuffer + sizeof(sqluint16)  );

      recid = *(struct RID *) (recordDataBuffer + 3 * sizeof(sqluint16) );

      subRecordOffset = *(sqluint16 *) ( recordDataBuffer +

                                         3 * sizeof(sqluint16) +

                                         sizeof(struct RID) );

      printf("        RID: ");

      RidToString( &recid, ridString );

      printf("%s\n", ridString );

      printf("        subrecord length: %u\n", subRecordLen);

      printf("        subrecord offset: %u\n", subRecordOffset);

      subRecordBuffer = recordDataBuffer + 3 * sizeof(sqluint16) +

                        sizeof(struct RID) + sizeof(sqluint16);

      rc = LogSubRecordDisplay(subRecordBuffer, subRecordLen);

      CHECKRC(rc, "LogSubRecordDisplay");

      break;

    case 163:

      printf("      function ID: Update Record\n");

      newSubRecordLen = *(sqluint16 *) ( recordDataBuffer  +

                                         sizeof(sqluint16)  );

      oldSubRecordLen = recordDataSize + 6 -  

                        (2 * 20) -            

                        newSubRecordLen;

      oldRecid = *(struct RID *) (recordDataBuffer + 3 * sizeof(sqluint16));

      oldSubRecordOffset = *(sqluint16 *) ( recordDataBuffer  +

                                            3 * sizeof(sqluint16) +

                                            sizeof(struct RID) );

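      /* RID of the new record: same position as oldRecid, but inside the
         second (new image) part of the update log record */

      newRecid = *(struct RID *) ( recordDataBuffer +

                                   3 * sizeof(sqluint16) +

                                   sizeof(struct RID) +

                                   sizeof(sqluint16) +

                                   oldSubRecordLen +

                                   recordHeaderSize +

                                   3 * sizeof(sqluint16) );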

      newSubRecordOffset = *(sqluint16 *) ( recordDataBuffer +

                                           3 * sizeof(sqluint16) +

                                           sizeof(struct RID) +

                                           sizeof(sqluint16) +

                                           oldSubRecordLen +

                                           recordHeaderSize +

                                           3 * sizeof(sqluint16) +

                                           sizeof(struct RID) +

                                           sizeof(sqluint16) );

      printf("        oldRID:");

      RidToString( &oldRecid, ridString );

      printf("%s\n", ridString );

      printf("        old subrecord length: %u\n", oldSubRecordLen);

      printf("        old subrecord offset: %u\n", oldSubRecordOffset);

      oldSubRecordBuffer = recordDataBuffer +

                           3 * sizeof(sqluint16) +

                           sizeof(struct RID) +

                           sizeof(sqluint16);

      rc = LogSubRecordDisplay(oldSubRecordBuffer, oldSubRecordLen);

      CHECKRC(rc, "LogSubRecordDisplay");

      printf("        newRID: " );

      RidToString( &newRecid, ridString );

      printf("%s\n", ridString );

      printf("        new subrecord length: %u\n", newSubRecordLen);

      printf("        new subrecord offset: %u\n", newSubRecordOffset);

      newSubRecordBuffer = recordDataBuffer +

                           3 * sizeof(sqluint16) +

                           sizeof(struct RID) +

                           sizeof(sqluint16) +

                           oldSubRecordLen +

                           recordHeaderSize +

                           3 * sizeof(sqluint16) +

                           sizeof(struct RID) +

                           sizeof(sqluint16);

      rc = LogSubRecordDisplay(newSubRecordBuffer, newSubRecordLen);

      CHECKRC(rc, "LogSubRecordDisplay");

      break;

    case 165:

      printf("      function ID: Insert Record to Empty Page\n");

      subRecordLen = *(sqluint16 *) ( recordDataBuffer + sizeof(sqluint16)  );

      recid = *(struct RID *) (recordDataBuffer + 3 * sizeof(sqluint16) );

      subRecordOffset = *(sqluint16 *) ( recordDataBuffer +

                                         6 * sizeof(sqluint16) +

                                         sizeof(struct RID) );

      printf("        RID: ");

      RidToString( &recid, ridString );

      printf("%s\n", ridString );

      printf("        subrecord length: %u\n", subRecordLen);

      printf("        subrecord offset: %u\n", subRecordOffset);

      subRecordBuffer = recordDataBuffer + 6 * sizeof(sqluint16) +

                        sizeof(struct RID) + sizeof(sqluint16);

      rc = LogSubRecordDisplay(subRecordBuffer, subRecordLen);

      CHECKRC(rc, "LogSubRecordDisplay");

      break;

    case 164:

      printf("      function ID: Delete Record to Empty Page\n");

      subRecordLen = *(sqluint16 *) ( recordDataBuffer + sizeof(sqluint16)  );

      recid = *(struct RID *) (recordDataBuffer + 3 * sizeof(sqluint16) );

      subRecordOffset = *(sqluint16 *) ( recordDataBuffer +

                                         6 * sizeof(sqluint16) +

                                         sizeof(struct RID) );

      printf("        RID: ");

      RidToString( &recid, ridString );

      printf("%s\n", ridString );

      printf("        subrecord length: %u\n", subRecordLen);

      printf("        subrecord offset: %u\n", subRecordOffset);

      subRecordBuffer = recordDataBuffer + 6 * sizeof(sqluint16) +

                        sizeof(struct RID) + sizeof(sqluint16);

      rc = LogSubRecordDisplay(subRecordBuffer, subRecordLen);

      CHECKRC(rc, "LogSubRecordDisplay");

      break;

    case 166:

      printf("      function ID: Rollback delete Record to Empty Page\n");

      subRecordLen = *(sqluint16 *) ( recordDataBuffer + sizeof(sqluint16)  );

      recid = *(struct RID *) (recordDataBuffer + 3 * sizeof(sqluint16) );

      subRecordOffset = *(sqluint16 *) ( recordDataBuffer +

                                         6 * sizeof(sqluint16) +

                                         sizeof(struct RID) );

      printf("        RID: ");

      RidToString( &recid, ridString );

      printf("%s\n", ridString );

      printf("        subrecord length: %u\n", subRecordLen);

      printf("        subrecord offset: %u\n", subRecordOffset);

      subRecordBuffer = recordDataBuffer + 6 * sizeof(sqluint16) +

                        sizeof(struct RID) + sizeof(sqluint16);

      rc = LogSubRecordDisplay(subRecordBuffer, subRecordLen);

      CHECKRC(rc, "LogSubRecordDisplay");

      break;

    case 124:

      printf("      function ID:  Alter Table Attribute\n");

      alterBitMask = *(sqluint64 *) (recordDataBuffer);

      alterBitValues = *(sqluint64 *) (recordDataBuffer + sizeof(sqluint64));

      if (alterBitMask & 0x00000001)

      {

          /* Alter the value of the 'propagation' attribute: */

          printf("        Propagation attribute is changed to: ");

          if (alterBitValues & 0x00000001)

          {

              printf("ON\n");

          }

          else

          {

              printf("OFF\n");

          }

      }

      if (alterBitMask & 0x00000002)

      {

          /* Alter the value of the 'pending' attribute: */

          printf("        Pending attribute is changed to: ");

          if (alterBitValues & 0x00000002)

          {

              printf("ON\n");

          }

          else

          {

              printf("OFF\n");

          }

      }

      if (alterBitMask & 0x00010000)

      {

          /* Alter the value of the 'append mode' attribute: */

          printf("        Append Mode attribute is changed to: ");

          if (alterBitValues & 0x00010000)

          {

              printf("ON\n");

          }

          else

          {

              printf("OFF\n");

          }

      }

      if (alterBitMask & 0x00200000)

      {

          /* Alter the value of the 'LF Propagation' attribute: */

          printf("        LF Propagation attribute is changed to: ");

          if (alterBitValues & 0x00200000)

          {

              printf("ON\n");

          }

          else

          {

              printf("OFF\n");

          }

      }

      if (alterBitMask & 0x00400000)

      {

          /* Alter the value of the 'LOB Propagation' attribute: */

          printf("        LOB Propagation attribute is changed to: ");

          if (alterBitValues & 0x00400000)

          {

              printf("ON\n");

          }

          else

          {

              printf("OFF\n");

          }

      }

      break;

    default:

      printf("      unknown function identifier: %u\n", functionIdentifier);

      break;

  }

  return 0;

} /* ComplexLogRecordDisplay */

/***************************************************************************/

/* LogSubRecordDisplay                                                     */

/* Prints the sub records for the log                                      */

/***************************************************************************/

int LogSubRecordDisplay( char      *recordBuffer,

                         sqluint16 recordSize )

{

  int       rc = 0;

  sqluint8  recordType = 0;

  sqluint8  updatableRecordType = 0;

  sqluint16 userDataFixedLength = 0;

  char      *userDataBuffer = NULL;

  sqluint16 userDataSize = 0;

  recordType = *(sqluint8 *) (recordBuffer);

  if ((recordType != 0) && (recordType != 4) && (recordType != 16))

  {

    printf("        Unknown subrecord type: %x\n", recordType);

  }

  else if (recordType == 4)

  {

    printf("        subrecord type: Special control\n");

  }

  else

  {

    /* recordType == 0 or recordType == 16

     * record Type 0 indicates a normal record

     * record Type 16, for the purposes of this program, should be treated

     * as type 0

     */

    printf("        subrecord type: Updatable, ");

    updatableRecordType = *(sqluint8 *) (recordBuffer + sizeof(sqluint32));

    if (updatableRecordType != 1)

    {

      printf("Internal control\n");

    }

    else

    {

      printf("Formatted user data\n");

      userDataFixedLength =

                     *(sqluint16 *) ( recordBuffer + sizeof(sqluint16) +

                                      sizeof(sqluint32));

      printf("        user data fixed length: %u\n", userDataFixedLength);

      userDataBuffer = recordBuffer + 8;

      userDataSize = recordSize - 8;

      rc = UserDataDisplay(userDataBuffer, userDataSize);

      CHECKRC(rc, "UserDataDisplay");

    }

  }

  return 0;

} /* LogSubRecordDisplay */

/***************************************************************************/

/* UserDataDisplay                                                         */

/* Displays the user data section                                          */

/***************************************************************************/

int UserDataDisplay(char *dataBuffer, sqluint16 dataSize)

{

  int       rc = 0;

  sqluint16 line = 0;

  sqluint16 col = 0;

  const int rowLength = 10;

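  printf("        user data:\n");

  /* Debug aid: print the first four bytes as an unsigned integer, */

  /* but only when the buffer actually holds four bytes.           */

  if (dataSize >= sizeof(sqluint32))

  {

    printf("%u\n", *(sqluint32 *) dataBuffer);

  }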

  for (line = 0; line * rowLength < dataSize; line++)

  {

    printf("        ");

    for (col = 0; col < rowLength; col++)

    {

      if (line * rowLength + col < dataSize)

      {

          printf("%02X ", (unsigned char)dataBuffer[line * rowLength + col]);

      }

      else

      {

          printf("   ");

      }

    }

    printf("*");

    for (col = 0; col < rowLength; col++)

    {

      if (line * rowLength + col < dataSize)

      {

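          /* Cast to unsigned char: passing a negative char to isalpha/isdigit is undefined */

          if( isalpha((unsigned char)dataBuffer[line * rowLength + col]) ||

              isdigit((unsigned char)dataBuffer[line * rowLength + col]))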

          {

              printf("%c", dataBuffer[line * rowLength + col]);

          }

          else

          {

              printf(".");

          }

      }

      else

      {

          printf(" ");

      }

    }

    printf("*");

    printf("\n");

  }

  return 0;

} /* UserDataDisplay */

I created a table: db2 "create table t6(col1 int, col2 char(10), col3 varchar(20), col4 date, col5 decimal(6,2))"

Enable before-image logging: db2 "alter table t6 data capture changes"

Insert one row: db2 "insert into t6 values(1,'a','b','2016-7-24',3.45)"

Here is the program's output:

RLOG_FILTERDATA:

    recordLSN: 83054981

    realLogRecLen: 84

    sqlcode: 0

    recordSize: 84

recordType: 4e

tablespace id is 2 and table id is 16

recordType: 4e

    Record type: Normal

      component ID: DMS log record

      function ID: Insert Record

        RID: x040000000000

        subrecord length: 40

        subrecord offset: 2828

        subrecord type: Updatable, Formatted user data

        user data fixed length: 31

        user data:

1

        01 00 00 00 00 61 20 20 20 20 *.....a....*

        20 20 20 20 20 00 1F 00 01 00 *..........*

        00 20 16 07 24 00 00 00 34 5C *........4.*

        00 62                         *.b        *

RLOG_FILTERDATA:

    recordLSN: 83055065

    realLogRecLen: 42

    sqlcode: 0

    recordSize: 42

recordType: 84

    Record type: Normal commit

      UTC transaction committed (in seconds since 70-01-01): 1469273823

      authorization ID of the application: DB2INST1

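Try matching each field against the dump yourself. The output shows that DB2 places variable-length fields at the end of the row (the fixed part holds only the offset 0x001F = 31 and the length 1 for col3, and the byte 62, 'b', follows at the very end); DECIMAL is stored digit by digit rather than as a binary number (00 00 34 5C reads as the digits 345 plus a trailing sign nibble, which looks like packed decimal rather than a character string); and the date is stored the same digit-by-digit way (20 16 07 24).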
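To make the field matching concrete, here is a minimal sketch in C that decodes the 32 bytes of user data from the dump above into the five columns of t6. It is only my reading of this one dump, not a documented row format: the byte offsets are hard-coded for this table, the single 00 byte after each column is assumed to be a null indicator (all five columns are nullable), integers are assumed little-endian, and the sign-nibble handling follows the usual packed-decimal convention. The names `t6_row`, `bcd_byte`, `packed_decimal` and `decode_t6_row` are made up for this illustration.

```c
#include <stddef.h>
#include <stdint.h>
#include <string.h>

/* Decoded form of one t6 row; field names mirror the CREATE TABLE above. */
struct t6_row {
    uint32_t col1;             /* INT */
    char     col2[11];         /* CHAR(10) plus terminating NUL */
    char     col3[21];         /* VARCHAR(20) plus terminating NUL */
    int      year, month, day; /* DATE */
    long     col5_unscaled;    /* DECIMAL(6,2) as an integer: 345 means 3.45 */
};

/* Two decimal digits packed into one byte, e.g. 0x16 -> 16. */
static int bcd_byte(unsigned char b)
{
    return (b >> 4) * 10 + (b & 0x0F);
}

/* Packed decimal: one digit per nibble, the final nibble is the sign. */
static long packed_decimal(const unsigned char *p, size_t len)
{
    long value = 0;
    size_t i;

    for (i = 0; i < len; i++) {
        value = value * 10 + (p[i] >> 4);
        if (i + 1 < len)                 /* last low nibble is the sign */
            value = value * 10 + (p[i] & 0x0F);
    }
    /* Assumed convention: sign nibble 0xD or 0xB means negative. */
    if ((p[len - 1] & 0x0F) == 0x0D || (p[len - 1] & 0x0F) == 0x0B)
        value = -value;
    return value;
}

/* Returns 0 on success, -1 if the buffer does not fit the assumed layout. */
static int decode_t6_row(const unsigned char *row, size_t size,
                         struct t6_row *out)
{
    const size_t fixed_len = 31;         /* "user data fixed length: 31" */
    size_t var_off, var_len;

    if (size < fixed_len)
        return -1;

    /* Little-endian INT at offset 0; byte 4 is the assumed null flag. */
    out->col1 = (uint32_t)row[0] | ((uint32_t)row[1] << 8) |
                ((uint32_t)row[2] << 16) | ((uint32_t)row[3] << 24);

    memcpy(out->col2, row + 5, 10);      /* blank-padded CHAR(10) */
    out->col2[10] = '\0';

    /* VARCHAR descriptor in the fixed part: 2-byte offset, 2-byte length. */
    var_off = (size_t)row[16] | ((size_t)row[17] << 8);
    var_len = (size_t)row[18] | ((size_t)row[19] << 8);
    if (var_len > 20 || var_off + var_len > size)
        return -1;
    memcpy(out->col3, row + var_off, var_len);
    out->col3[var_len] = '\0';

    /* DATE: four bytes of packed digits, yyyy mm dd. */
    out->year  = bcd_byte(row[21]) * 100 + bcd_byte(row[22]);
    out->month = bcd_byte(row[23]);
    out->day   = bcd_byte(row[24]);

    out->col5_unscaled = packed_decimal(row + 26, 4);
    return 0;
}
```

Fed the 32 bytes from the dump, `decode_t6_row` yields col1 = 1, col2 = 'a' padded with blanks, col3 = 'b', the date 2016-07-24 and an unscaled decimal of 345 (3.45 at scale 2). The fixed part adds up as 5 + 11 + 5 + 5 + 5 = 31 bytes, which matches the "user data fixed length: 31" line and the VARCHAR offset.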


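Everything above was about replication; now let's talk about read/write splitting. It sounds like an outdated technique, since RAC and pureScale already support active-active clusters. So why would we still need it?

First: read/write splitting isolates reads from writes more strongly. It can completely separate a system's transactional functions from its analytical functions. We have seen many systems where, as soon as the analytics start running, they eat up the whole system's resources and transactions suffer too. Moving that analytical workload off the primary greatly improves the overall experience. With RAC or pureScale, the nodes often still affect one another.

Second: read/write splitting raises the utilization of disaster-recovery and remote disaster-recovery resources.

Third: read/write splitting scales database performance almost linearly (though it needs application support), whereas an active-active database cannot scale linearly.

Fourth, and most important: an active-active database basically consists of three parts: database instances, a shared file system, and a cluster manager. Maintaining the database therefore means maintaining three parts, and there are more points of failure. In practice, a problem on one node often affects the whole cluster. Replication-based read/write splitting costs much less to operate.

Plain replication, with read operations then directed to the standby, is a solution that is simple in form but not simplistic. The standby will always lag, so this approach requires two things. First, the reading applications must tolerate the delay, which most can: an account query, for example, can certainly accept a delay of a few seconds, not to mention statistical analysis functions. Second, the delay must be kept as small as possible.

Every database has replication, and I also mentioned replication software such as CDC earlier. So how does the application do read/write splitting?

Here MySQL is far ahead of DB2. Informix is even stronger; look into it if you are interested. Material on MySQL read/write splitting is everywhere, and both tools and the drivers themselves provide the feature. Search Baidu for it yourself.

DB2 is more trouble, with fewer options. Modifying the framework can do it, but the method in this link is a fairly suitable one: http://www.cnblogs.com/pier2/p/spring-boot-read-write-split.html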
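Whatever framework you end up using, the routing decision itself is small. The sketch below is a hypothetical illustration in C, not the method from the linked article: `choose_target` and `is_read_only` are names I made up, the "starts with SELECT" test is deliberately naive (it would misroute things like SELECT ... FOR UPDATE), and the standby lag is assumed to be supplied by some monitoring mechanism that is not shown here.

```c
#include <ctype.h>
#include <stddef.h>

enum db_target { TARGET_PRIMARY, TARGET_STANDBY };

/* Naive read detection: the statement starts with SELECT, ignoring
 * leading whitespace and letter case. */
static int is_read_only(const char *sql)
{
    static const char keyword[] = "select";
    size_t i;

    while (*sql != '\0' && isspace((unsigned char)*sql))
        sql++;
    for (i = 0; keyword[i] != '\0'; i++) {
        /* A shorter string hits its NUL here and fails the comparison. */
        if (tolower((unsigned char)sql[i]) != keyword[i])
            return 0;
    }
    return 1;
}

/* Send a statement to the standby only when it is a read, it is not part
 * of an open transaction, and the standby lag is within what the
 * application says it can tolerate. Everything else goes to the primary. */
static enum db_target choose_target(const char *sql, int in_transaction,
                                    double standby_lag_sec,
                                    double max_lag_sec)
{
    if (in_transaction || !is_read_only(sql))
        return TARGET_PRIMARY;
    if (standby_lag_sec > max_lag_sec)
        return TARGET_PRIMARY;
    return TARGET_STANDBY;
}
```

An account query that tolerates a few seconds of delay would call `choose_target(sql, 0, lag, 5.0)`, while anything inside a transaction stays on the primary. Falling back to the primary when the lag is too high keeps reads correct, at the price of giving up the isolation for that moment.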

I'll stop here. I have been dragging this out for over a month, and I really don't have the time to be a writer.
