张子阳的博客

首页 读书 技术 店铺 关于
张子阳的博客 首页 读书 技术 关于

MSSQL 使用Change Tracking(更改跟踪)同步数据

2019-01-23 张子阳 分类: 数据库与数据访问

很多时候,系统的瓶颈都在数据库的读写方面。一般解决方案都是分库、分表,但是分库分表也有自己的问题:1、连接查询(Join操作)效率降低;2、往往需要修改代码。此时,另一个常用的方法就是读写分离,主库进行增删改,从库只进行读。这样就涉及到主从数据库同步的问题。不同的数据库有不同的解决方案,对于MS SQL Server有日志传送、Always On等方案。

但是,上面的方案都是在同质的数据存储之间进行同步,简单来说,从MSSQL同步到另一台MSSQL。如果从MSSQL同步到MySql数据库,或者从MSSQL同步到消息队列或者缓存,则需要采用其他方法。不同的数据库提供了不同的解决方案,MSSQL提供了两个方案,Change TrackingChange Data Capture。我在Kafka Connect 实时读取MSSQL数据到Kafka 中也做了简单的介绍。这篇文章将更详细介绍如何使用Change Tracking来实现数据同步。

开启Change Tracking

新建测试表 users 和 user_play

新建测试库DataSync,然后建两张表。users表示用户表,user_play表示用户在哪个房间进行游戏(一个虚拟的游戏数据库)。

USE [DataSync]
GO

CREATE TABLE [dbo].[users] (
	[user_id] [int] IDENTITY ( 1, 1 ) NOT NULL,
	[user_name] [varchar] ( 50 ) NOT NULL,
	CONSTRAINT [PK_users] PRIMARY KEY CLUSTERED ( [user_id] ASC )) 
	
CREATE TABLE [dbo].[user_play] (
	[id] [int] IDENTITY ( 1, 1 ) NOT NULL,
	[user_name] [varchar] ( 50 ) NOT NULL,
	[room] [varchar] ( 50 ) NOT NULL,
	[score] [int] NOT NULL,
[remark] [varchar] ( 500 ) NULL,
CONSTRAINT [PK_user_play] PRIMARY KEY CLUSTERED ( [user_name] ASC, [room] ASC ))

我们会先用users表测试,后面再看user_play表。

开启ChangeTracking

需要先在数据库开启Change Tracking,再在表上开启:

ALTER DATABASE DataSync  
SET CHANGE_TRACKING = ON  
(CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON)  

CHANGE_RETENTION说明了变更记录保留的时间为2天(默认值);AUTO_CLEANUP说明是否开启自动清理记录,一般选择打开。

接下来现在users表上开启:

ALTER TABLE DataSync.dbo.users
ENABLE CHANGE_TRACKING  
WITH (TRACK_COLUMNS_UPDATED = OFF) 

当开启 TRACK_COLUMNS_UPDATED 这个选项时,会记录对哪个行的哪个列进行了更新;否则只会记录对哪个行进行了更新(根据主键)。

Change Tracking的常用SQL语句

接下来我们就可以测试一下Change Tracking,它的名字(更改跟踪)已经很好的说明了它的功能:用一个long型的数字作为版本号,记录下发生变更的表的主键。如果开启了TRACK_COLUMNS_UPDATED,则会记录变更的列,否则不记录。

它只会记录哪个列发生了变更,并不会记录变更的内容。举例来说,当users表id=1的行的user_name从jimmy改为了jack,只会记录user_name,而不会记录jimmy和jack。

获取数据库中的哪些表启用了Change Tracking

我们首先要知道哪些表启用了更改跟踪,然后才能对这些启用了的表进行同步:

Select Object_Name(object_id) table_name, *
    from sys.change_tracking_tables
检查哪些表开启了Change Tracking

这里的几个列的含义如下:

获取表的最小可用版本和当前版本

这里最重要的一点就是注意到:版本号是个自增字段,并且是在多个表之间共同的。我们会在后面回顾这一点。

select CHANGE_TRACKING_MIN_VALID_VERSION(OBJECT_ID('users')) as MinValidVersion,
CHANGE_TRACKING_CURRENT_VERSION() as CurrentVersion
检查某个表的最小可用版本和当前版本
可以看到,在初始状态时,当前的版本为0。而最小可用版本为1(这里有点奇怪,是容易出BUG的地方)。

测试Change Tracking

插入数据

insert into users(user_name) values('子阳')

此时执行检查版本的语句,可以看到最小可用版本由0变为了1:

user表当前的最小可用版本和当前版本

接下来要查看更改的记录,执行下面的SQL语句:

select * from CHANGETABLE(CHANGES [users], 0) c
查看user表变更

每次想要获取变更信息,都需要调用CHANGETABLE这个函数。它的第一个参数是要查询的表名,这里是users,第二个参数是版本号。这个版本号的取值非常重要,因为CHANGETABLE函数将返回该版本与当前版本的差异。就此处而言,因为当前版本是1,所以会返回从版本0到版本1之间的差异。

注意这里的几个字段:

SYS_CHANGE_VERSION 可以用于解决一些不一致问题,在 MSSQL ChangeTracking的不一致问题中介绍。

如果修改下上面的语句,将version传入为1,因为当前版本也是1,两个版本相同,返回空行:

version传入1时为空

注意到当前可用版本为1,因此我们在同步时,如果上一次同步的结果小于MinValidVersion,则从 MinValidVersion-1开始同步(如果从MinValidVersion开始,那么会少了最小可用版本的数据),C#伪代码如下:

if(lastVersion < minValidVersion){
    lastVersion = minValidVersion - 1;
}

// 接下来
// 1. 调用 select CHANGE_TRACKING_CURRENT_VERSION(), 获取当前版本currentVersion,并保存之;同时作为下一次同步时的 lastVersion
// 2. 调用 select * from CHANGETABLE(CHANGES [table_name], lastVersion) c 获取从lastVersion到currentVersion的变更数据

在做同步时,我们不仅需要知道哪些行更新了,也需要知道更新的内容。虽然上面只返回了主键,但这个可以很容易地通过join连接查询来实现:

select t.*, c.SYS_CHANGE_OPERATION from
CHANGETABLE(CHANGES [users], 0) c left join
users t 
on c.user_id = t.user_id

更改数据

了解了这个机制以后,后面的就很容易了,调用下面的语句将user_name从“子阳”改为“jimmy”:

update users set user_name='jimmy' where user_id=1

先查看当前的版本:

查看users当前版本

此时,可以看到当前的版本变为了2。接下来再查看哪些行进行了变更,为了进行对比,分别调用CHANGETABLE(CHANGES [users], 0)和CHANGETABLE(CHANGES [users], 1):

CHANGETABLE(CHANGES [users], 0)

注意上图的SYS_CHANGE_OPERATION为I,因为版本0和版本2的差异是新增了一条数据。

CHANGETABLE(CHANGES [users], 1)

这里的SYS_CHANGE_OPERATION为U,因为版本1和版本2的差异是更新了一条数据。

这个机制可以用于同步多个数据池,可能每个数据池同步的进度不一致,那么就需要传入不同的version,然后根据返回的参数进行操作。如果是I,则插入数据;如果是U则更新数据。

如果输入CHANGETABLE(CHANGES [users], 2),返回空行,因为当前的版本也是2.

删除数据

接下来执行删除操作,删除user_id为1的数据:

delete users where user_id=1

删除操作比较有意思,因为不论哪个版本,和当前版本(现在已经为3了)对比,都是少了一条数据,因此CHANGETABLE(CHANGES [users], 0)、CHANGETABLE(CHANGES [users], 1)、CHANGETABLE(CHANGES [users], 2) 的SYS_CHANGE_OPERATION均为D。

其他注意事项

当有多个表时,当前版本是共用的

换句话来说,就是:当前版本是针对数据库的,不是针对表的。

还记得我们上面创建了两张表吧?先往user_play表中插入一条数据

insert into user_play(user_name, room, score) values('子阳', '标准场', 100)

接着开启user_play表的Change Tracking:

ALTER TABLE DataSync.dbo.user_play
ENABLE CHANGE_TRACKING  
WITH (TRACK_COLUMNS_UPDATED = ON) 

然后查询一下两张表的最小可用版本:

查看最小可用版本

因为user_play表是在当前版本为3时才开启Change Tracking功能,因此它的最小可用版本为3。而CurrentVersion对于user_play和users表都是一样的。

此时,对于user_play而言,不论 CHANGETABLE(CHANGES [user_play], version) 中的版本传入几,都会返回空行,因为开启Chnage Tracking之前的记录是无法获得的。因此前面插入的记录是无法同步的,只能同步开启Change Tracking后的记录。

如果此时向user_play再插入一条记录,其最小可用版本不变,但CurrentVersion变为了4。因此,不论是从MinValidVersion(3)开始,还是从MinValidVersion-1(2)开始,都可以获得正确结果。

通过TRACK_COLUMNS_UPDATED获取更新的列

在开启user_play表的Change Tracking时,我们将TRACK_COLUMNS_UPDATED选项设置为了NO,它可以用来记录当执行Update操作时,更新了哪些列。先执行下面的语句更新一下user_play表:

update user_play set score=150 where id=1

查看一下变更:

查看update变更
这里最后的两列user_name和room是user_play表的主键(对于users表来说是user_id),这里是动态的,总是所查询表的主键。当主键为复合主键时,就会列出主键包含的所有列。

看到SYS_CHANGE_COLUMNS列不再为NULL,但是一个二进制数据,为了得到它的值,需要调用CHANGE_TRACKING_IS_COLUMN_IN_MASK函数。

Select CHANGE_TRACKING_IS_COLUMN_IN_MASK( 
        COLUMNPROPERTY(OBJECT_ID('user_play'), 'score', 'ColumnId'),
        c.sys_change_columns
    ) score_changed,
    CHANGE_TRACKING_IS_COLUMN_IN_MASK( 
        COLUMNPROPERTY(OBJECT_ID('user_play'), 'id', 'ColumnId'),
        c.sys_change_columns
    ) id_changed, *
from CHANGETABLE(CHANGES [user_play], 4) c
查看update变更的列

通常来说,是不对主键和自增列进行更新操作的,只会新增和删除。在上面我们查询id列的变化只是作为一个示范。在上表中,并没有返回变化的值(100 --> 150),而只是返回了是否有变化:1表示有变化,0表示没有变化。

因此,后续的同步操作通常是:对于发生变化的列,执行update操作。但是这样通常显得比较繁琐,更简单的办法,和上面的insert时是一样的:不去查询哪个列发生了变化,通过join操作,查询出所有列(包含变化和没变化的),然后对所有非主键、非自增列进行update操作。

Change Tracking不记录中间的变化过程,只记录最终变化的是什么。这句话的意思就是:如果你连续执行很多个update,再调用CHANGETABLE语句(假设当前版本从4递增到了9),那么不轮version传入的是4、5、6、7、8,都返回的是同样的结果:“某一行数据发生了update变化,主键是什么,仅此而已”。但对于我们而言,这也就足够了。

总结

这篇文章演示了如何使用MSSQL的Change Tracking功能来获取变更数据,进而可以通过编写代码来实现数据同步的功能。Change Tracking是一个轻量级的方案,在实际应用中主从服务器的开销并不大。它适合用来将MSSQL数据库中的数据同步到其他异质的数据源中,例如MySQL、Redis、消息队列、HDFS中。

感谢阅读,希望这篇文章能给你带来帮助!