大数据时代,数据实时同步解决方案的思考—最全的数据同步总结

  • 时间:
  • 浏览:2

1、 早期关系型数据库之间的数据同步

1)、全量同步

比如从oracle数据库中同步一张表的数据到Mysql中,通常的做法假如有一天 分页查询源端的表,假如有一天通过 jdbc的batch 法律法子插入到目标表,或多或少地方不能 注意的是,分页查询时,一定要按照主键id来排序分页,出理 重复插入。

2)、基于数据文件导出和导入的全量同步,或多或少同步法律法子一般只适用于同种数据库之间的同步,将会是不同的数据库,或多或少法律法子将会会指在问題。

3)、基于触发器的增量同步

增量同步一般是做实时的同步,早期全都数据同步总要基于关系型数据库的触发器trigger来做的。

使用触发器实时同步数据的步骤:

A、 基于原表创触发器,触发器所含insert,modify,delete 有一种类型的操作,数据库的触发器分Before和After有一种状态,有一种是在insert,modify,delete 有一种类型的操作指在后来触发(比如记录日志操作,一般是Before),有一种是在insert,modify,delete 有一种类型的操作后来触发。

B、 创建增量表,增量表中的字段和原表中的字段删改一样,假如有一天不能 多一有5个 多 操作类型字段(分表代表insert,modify,delete 有一种类型的操作),假如有一天不能 一有5个 多 唯一自增ID,代表数据原表中数据操作的顺序,或多或少自增id非常重要,不然数据同步就会错乱。

C、 原表中出显insert,modify,delete 有一种类型的操作时,通过触发器自动产生增量数据,插入增量表中。

D、出理 增量表中的数据,出理 时,一定是按照自增id的顺序来出理 ,或多或少强度会非常低,没办法 律法子做批量操作,不然数据会错乱。  有人将会会说,是总要不能 把insert操作合并在共同,modify合并在共同,delete操作合并在共同,假如有一天批量出理 ,我给的答案是不行,将会数据的增删改是有顺序的,合并后,就没办法 顺序了,同第三根数据的增删改顺序一旦错了,那数据同步就肯定错了。

市面上全都数据etl数据交换产品总要基于或多或少思想来做的。

E、 或多或少思想使用kettle 很容易就不能 实现,笔者从前在本人的博客中写过 kettle的文章,https://www.cnblogs.com/laoqing/p/73500673.html

4)、基于时间戳的增量同步

A、首先亲戚亲戚朋友不能 一张临时temp表,用来存取每次读取的待同步的数据,也假如有一天把每次从原表中根据时间戳读取到数据先插入到临时表中,每次在插入前,先清空临时表的数据

B、亲戚亲戚朋友还不能 创建一有5个 多 时间戳配置表,用于存放每次读取的出理 完的数据的最后的时间戳。

C、每次从原表中读取数据时,先查询时间戳配置表,假如有一天就知道了查询原表时的后来开始了了时间戳。

D、根据时间戳读取到原表的数据,插入到临时表中,假如有一天再将临时表中的数据插入到目标表中。

E、从缓存表中读取出数据的最大时间戳,假如有一天更新到时间戳配置表中。缓存表的作用假如有一天使用sql获取每次读取到的数据的最大的时间戳,当然哪此总要删改基于sql说说在kettle中来配置,才不能 从前的一张临时表。

2、    大数据时代下的数据同步

1)、基于数据库日志(比如mysql的binlog)的同步

亲戚亲戚朋友都知道全都数据库都支持了主从自动同步,尤其是mysql,不能 支持多主多从的模式。没办法 亲戚亲戚朋友是总要不能 利用或多或少思想呢,答案当然是肯定的,mysql的主从同步的过程是从前的。

  A、master将改变记录到二进制日志(binary log)中(哪此记录叫做二进制日志事件,binary log events,不能 通过show binlog events进行查看);

  B、slave将master的binary log events拷贝到它的中继日志(relay log);

  C、slave重做中继日志中的事件,将改变反映它本人的数据。

阿里巴巴开源的canal就完美的使用或多或少法律法子,canal 伪装了一有5个 多 Slave 去喝Master进行同步。

A、 canal模拟mysql slave的交互协议,伪装本人为mysql slave,向mysql master发送dump协议

B、 mysql master收到dump请求,后来开始了了推送binary log给slave(也假如有一天canal)

C、 canal解析binary log对象(原始为byte流)

另外canal 在设计时,不得劲设计了 client-server 模式,交互协议使用 protobuf 3.0 , client 端可采用不同语言实现不同的消费逻辑。

canal java 客户端: https://github.com/alibaba/canal/wiki/ClientExample

canal c# 客户端: https://github.com/dotnetcore/CanalSharp

canal go客户端: https://github.com/CanalClient/canal-go

canal php客户端: https://github.com/xingwenge/canal-php、

github的地址:https://github.com/alibaba/canal/

另外canal 1.1.1版本后来, 默认支持将canal server接收到的binlog数据直接投递到MQ   https://github.com/alibaba/canal/wiki/Canal-Kafka-RocketMQ-QuickStart

D、在使用canal时,mysql不能 开启binlog,假如有一天binlog-format不能 为row,不能 在mysql的my.cnf文件中增加如下配置

log-bin=E:/mysql5.5/bin_log/mysql-bin.log

binlog-format=ROW

server-id=123、

E、 部署canal的服务端,配置canal.properties文件,假如有一天 启动 bin/startup.sh 或bin/startup.bat

#设置要监听的mysql服务器的地址和端口

canal.instance.master.address = 127.0.0.1:35006

#设置一有5个 多 可访问mysql的用户名和密码并具有相应的权限,本示例用户名、密码都为canal

canal.instance.dbUsername = canal

canal.instance.dbPassword = canal

#连接的数据库

canal.instance.defaultDatabaseName =test

#订阅实例中所有的数据库和表

canal.instance.filter.regex = .*\\..*

#连接canal的端口

canal.port= 11111

#监听到的数据变更发送的队列

canal.destinations= example

F、 客户端开发,在maven中引入canal的依赖

   <dependency>
         <groupId>com.alibaba.otter</groupId>
          <artifactId>canal.client</artifactId>
          <version>1.0.21</version>
      </dependency>

代码示例:

package com.example;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

 
public class CanalClientExample {

    public static void main(String[] args) {
        while (true) {
            //连接canal
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example", "canal", "canal");
            connector.connect();
            //订阅 监控的 数据库.表
            connector.subscribe("demo_db.user_tab");
            //一次取10条
            Message msg = connector.getWithoutAck(10);

            long batchId = msg.getId();
            int size = msg.getEntries().size();
            if (batchId < 0 || size == 0) {
                System.out.println("没办法

消息,休眠5秒");
                try {
                    Thread.sleep(50000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            } else {
                //
                CanalEntry.RowChange row = null;
                for (CanalEntry.Entry entry : msg.getEntries()) {
                    try {
                        row = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                        List<CanalEntry.RowData> rowDatasList = row.getRowDatasList();
                        for (CanalEntry.RowData rowdata : rowDatasList) {
                            List<CanalEntry.Column> afterColumnsList = rowdata.getAfterColumnsList();
                            Map<String, Object> dataMap = transforListToMap(afterColumnsList);
                            if (row.getEventType() == CanalEntry.EventType.INSERT) {
                                //具体业务操作
                                System.out.println(dataMap);
                            } else if (row.getEventType() == CanalEntry.EventType.UPDATE) {
                                //具体业务操作
                                System.out.println(dataMap);
                            } else if (row.getEventType() == CanalEntry.EventType.DELETE) {
                                List<CanalEntry.Column> beforeColumnsList = rowdata.getBeforeColumnsList();
                                for (CanalEntry.Column column : beforeColumnsList) {
                                    if ("id".equals(column.getName())) {
                                        //具体业务操作
                                        System.out.println("删除的id:" + column.getValue());
                                    }
                                }
                            } else {
                                System.out.println("或多或少操作类型不做出理

");
                            }

                        }

                    } catch (InvalidProtocolBufferException e) {
                        e.printStackTrace();
                    }
                }
                //确认消息
                connector.ack(batchId);
            }


        }
    }

    public static Map<String, Object> transforListToMap(List<CanalEntry.Column> afterColumnsList) {
        Map map = new HashMap();
        if (afterColumnsList != null && afterColumnsList.size() > 0) {
            for (CanalEntry.Column column : afterColumnsList) {
                map.put(column.getName(), column.getValue());
            }
        }
        return map;
    }


}

2)、基于BulkLoad的数据同步,比如从hive同步数据到hbase

 

亲戚亲戚朋友有有一种法律法子不能 实现,

A、 使用spark任务,通过HQl读取数据,假如有一天再通过hbase的Api插入到hbase中。

假如有一天或多或少做法,强度很低,假如有一天大批量的数据共同插入Hbase,对Hbase的性能影响很大。

在大数据量的状态下,使用BulkLoad不能 快速导入,BulkLoad主假如有一天借用了hbase的存储设计思想,将会hbase本质是存储在hdfs上的一有5个 多 文件夹,假如有一天底层是以一有5个 多 个的Hfile指在的。HFile的形式指在。Hfile的路径格式一般是从前的:

/hbase/data/default(默认是或多或少,将会hbase的表没办法 指定命名空间说说,将会指定了,或多或少假如有一天命名空间的名字)/<tbl_name>/<region_id>/<cf>/<hfile_id>

B、 BulkLoad实现的原理假如有一天按照HFile格式存储数据到HDFS上,生成Hfile不能 使用hadoop的MapReduce来实现。将会总要hive中的数据,比如内外部的数据,没办法 亲戚亲戚朋友不能 将内外部的数据生成文件,假如有一天上传到hdfs中,组装RowKey,假如有一天将封装后的数据在回写到HDFS上,以HFile的形式存储到HDFS指定的目录中。

 

当然亲戚亲戚朋友可是能 不后来生成hfile,不能 使用spark任务直接从hive中读取数据转加进去RDD,假如有一天使用HbaseContext的自动生成Hfile文件,要素关键代码如下:

…
//将DataFrame转换bulkload不能

的RDD格式
    val rddnew = datahiveDF.rdd.map(row => {
      val rowKey = row.getAs[String](rowKeyField)
 
      fields.map(field => {
        val fieldValue = row.getAs[String](field)
        (Bytes.toBytes(rowKey), Array((Bytes.toBytes("info"), Bytes.toBytes(field), Bytes.toBytes(fieldValue))))
      })
    }).flatMap(array => {
      (array)
    })
…
//使用HBaseContext的bulkload生成HFile文件
    hbaseContext.bulkLoad[Put](rddnew.map(record => {
      val put = new Put(record._1)
      record._2.foreach((putValue) => put.addColumn(putValue._1, putValue._2, putValue._3))
      put
    }), TableName.valueOf(hBaseTempTable), (t : Put) => putForLoad(t), "/tmp/bulkload")
 
    val conn = ConnectionFactory.createConnection(hBaseConf)
    val hbTableName = TableName.valueOf(hBaseTempTable.getBytes())
    val regionLocator = new HRegionLocator(hbTableName, classOf[ClusterConnection].cast(conn))
    val realTable = conn.getTable(hbTableName)
    HFileOutputFormat2.configureIncrementalLoad(Job.getInstance(), realTable, regionLocator)
 
    // bulk load start
    val loader = new LoadIncrementalHFiles(hBaseConf)
    val admin = conn.getAdmin()
    loader.doBulkLoad(new Path("/tmp/bulkload"),admin,realTable,regionLocator)
 
    sc.stop()
  }
…
  def putForLoad(put: Put): Iterator[(KeyFamilyQualifier, Array[Byte])] = {
    val ret: mutable.MutableList[(KeyFamilyQualifier, Array[Byte])] = mutable.MutableList()
    import scala.collection.JavaConversions._
    for (cells <- put.getFamilyCellMap.entrySet().iterator()) {
      val family = cells.getKey
      for (value <- cells.getValue) {
        val kfq = new KeyFamilyQualifier(CellUtil.cloneRow(value), family, CellUtil.cloneQualifier(value))
        ret.+=((kfq, CellUtil.cloneValue(value)))
      }
    }
    ret.iterator
  }
}

…

C、pg_bulkload的使用

这是一有5个 多 支持pg库(PostgreSQL)批量导入的插件工具,它的思想也是通过内外部文件加载的法律法子,或多或少工具笔者没办法 亲自去用过,删改的介绍不能 参考:https://my.oschina.net/u/3317105/blog/852785   pg_bulkload项目的地址:http://pgfoundry.org/projects/pgbulkload/

3)、基于sqoop的全量导入

Sqoop 是hadoop生态中的一有5个 多 工具,专门用于内外部数据导入进入到hdfs中,内外部数据导出时,支持全都常见的关系型数据库,也是在大数据中常用的一有5个 多 数据导出导入的交换工具。

 

Sqoop从内外部导入数据的流程图如下:

Sqoop将hdfs中的数据导出的流程如下:

本质总要用了大数据的数据分布式出理 来快速的导入和导出数据。

4)、HBase中建表,假如有一天Hive中建一有5个 多 内外部表,从前当Hive中写入数据后,HBase中也会共同更新,假如有一天不能 注意

A、hbase中的空cell在hive中会补null

B、hive和hbase中不匹配的字段会补null

亲戚亲戚朋友不能 在hbase的shell 交互模式下,创建一张hbse表

create 'bokeyuan','zhangyongqing'

使用或多或少命令,亲戚亲戚朋友不能 创建一张叫bokeyuan的表,假如有一天上端有一有5个 多 列族zhangyongqing,hbase创建表时,不能 不不指定字段,假如有一天不能 指定表名以及列族

亲戚亲戚朋友不能 使用的hbase的put命令插入或多或少数据

put 'bokeyuan','001','zhangyongqing:name','robot'

put 'bokeyuan','001','zhangyongqing:age','20'

put 'bokeyuan','002','zhangyongqing:name','spring'

put 'bokeyuan','002','zhangyongqing:age','18'

不能 通过hbase的scan 全表扫描的法律法子查看亲戚亲戚朋友插入的数据

scan ' bokeyuan'

亲戚亲戚朋友继续创建一张hive内外部表

create external table bokeyuan (id int, name string, age int) 

STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 

WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,user:name,user:age") 

TBLPROPERTIES("hbase.table.name" = " bokeyuan");

内外部表创建好了后,亲戚亲戚朋友不能 使用HQL说说来查询hive中的数据了

select * from classes;

OK

1 robot 20

2 spring 18

Debezium是一有5个 多 开源项目,为捕获数据更改(change data capture,CDC)提供了一有5个 多 低延迟的流式出理 平台。让我安装假如有一天配置Debezium去监控你的数据库,假如有一天你的应用就不能 消费对数据库的每一有5个 多 行级别(row-level)的更改。没办法 已提交的更改才是可见的,全都你的应用不不担心事务(transaction)将会更改被回滚(roll back)。Debezium为所有的数据库更改事件提供了一有5个 多 统一的模型,全都你的应用不不担心每有一种数据库管理系统的错综僵化 性。另外,将会Debezium用持久化的、有副本备份的日志来记录数据库数据变化的历史,假如有一天,你的应用不能 随时停止再重启,而不不错过它停止运行时指在的事件,保证了所有的事件都能被正确地、删改指在理掉。

该项目的GitHub地址为:https://github.com/debezium/debezium   这是一有5个 多 开源的项目。

 

  从前监控数据库,假如有一天在数据变动的后来获得通知虽然一直是一件很僵化 的事情。关系型数据库的触发器不能 做到,假如有一天只对特定的数据库有效,假如有一天通常没办法 更新数据库内的状态(无法和内外部的tcp连接运行运行通信)。或多或少数据库提供了监控数据变动的API将会框架,假如有一天没办法 一有5个 多 标准,要素数据库的实现法律法子总要不同的,假如有一天不能 大量特定的知识和理解特定的代码不能运用。确保以相同的顺序查看和出理 所有更改,共同最小化影响数据库仍然非常具有挑战性。

       Debezium正好提供了模块为你做哪此僵化 的工作。或多或少模块是通用的,假如有一天不能适用多种数据库管理系统,但在功能和性能方面仍有或多或少限制。另或多或少模块是为特定的数据库管理系统定制的,全都亲戚亲戚朋友通常不能 更多地利用数据库系统有一种的形状来提供更多功能,Debezium提供了对MongoDB,mysql,pg,sqlserver的支持。

Debezium是一有5个 多 捕获数据更改(CDC)平台,假如有一天利用Kafka和Kafka Connect实现了本人的持久性、可靠性和容错性。每一有5个 多 部署在Kafka Connect分布式的、可扩展的、容错性的服务中的connector监控一有5个 多 上游数据库服务器,捕获所有的数据库更改,假如有一天记录到一有5个 多 将会多个Kafka topic(通常一有5个 多 数据库表对应一有5个 多 kafka topic)。Kafka确保所有哪此数据更改事件都不能多副本假如有一天总体上有序(Kafka没办法 保证一有5个 多 topic的单个分区内有序),从前,更多的客户端不能 独立消费同样的数据更改事件而对上游数据库系统造成的影响降到很小(将会N个应用都直接去监控数据库更改,对数据库的压力为N,而用debezium汇报数据库更改事件到kafka,所有的应用都去消费kafka中的消息,不能 把对数据库的压力降到1)。另外,客户端不能 随时停止消费,假如有一天重启,从上次停止消费的地方接着消费。每个客户端不能 自行决定亲戚亲戚朋友与非 不能 exactly-once将会at-least-once消息交付语义保证,假如有一天所有的数据库将会表的更改事件是按照上游数据库指在的顺序被交付的。

       对于不不能 将会不你会或多或少容错级别、性能、可扩展性、可靠性的应用,亲戚亲戚朋友不能 使用内嵌的Debezium connector引擎来直接在应用内内外部运行connector。或多或少应用仍不能 消费数据库更改事件,但更希望connector直接传递给它,而总要持久化到Kafka里。

更删改的介绍不能 参考:https://www.jianshu.com/p/f86219b1ab98

bireme 的github 地址  https://github.com/HashDataInc/bireme

bireme 的介绍:https://github.com/HashDataInc/bireme/blob/master/README_zh-cn.md

另外Maxwell也是不能 实现MySQL到Kafka的消息上端件,消息格式采用Json:

Download:

https://github.com/zendesk/maxwell/releases/download/v1.22.5/maxwell-1.22.5.tar.gz 

Source:https://github.com/zendesk/maxwell 

datax 是阿里开源的etl 工具,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能,采用java+python进行开发,核心是java语言实现。

github地址:https://github.com/alibaba/DataX    

A、设计架构:

数据交换通过DataX进行中转,任何数据源假如有一天和DataX连接上即不能 和已实现的任意数据源同步

B、框架

 

核心模块介绍:

  1. DataX完成单个数据同步的作业,亲戚亲戚朋友称之为Job,DataX接受到一有5个 多 Job后来,将启动一有5个 多 tcp连接运行运行来完成整个作业同步过程。DataX Job模块是单个作业的中枢管理节点,承担了数据清理、子任务切分(将单一作业计算转化为多个子Task)、TaskGroup管理等功能。
  2. DataXJob启动后,会根据不同的源端切分策略,将Job切分成多个小的Task(子任务),以便于并发执行。Task便是DataX作业的最小单元,每一有5个 多 Task总要负责一要素数据的同步工作。
  3. 切分多个Task后来,DataX Job会调用Scheduler模块,根据配置的并发数据量,将拆分成的Task重新组合,组装成TaskGroup(任务组)。每一有5个 多 TaskGroup负责以一定的并发运行完毕分配好的所有Task,默认单个任务组的并发数量为5。
  4. 每一有5个 多 Task都由TaskGroup负责启动,Task启动后,会固定启动Reader—>Channel—>Writer的tcp连接运行来完成任务同步工作。
  5. DataX作业运行起来后来, Job监控并等待英文多个TaskGroup模块任务完成,等待英文所有TaskGroup任务完成后Job成功退出。假如有一天,异常退出,tcp连接运行运行退出值非0

DataX调度流程:

举例来说,用户提交了一有5个 多 DataX作业,假如有一天配置了20个并发,目的是将一有5个 多 5000张分表的mysql数据同步到odps上端。 DataX的调度决策思路是:

  1. DataXJob根据分库分表切分成了5000个Task。
  2. 根据20个并发,DataX计算共不能 分配有5个 多 TaskGroup。
  3. 有5个 多 TaskGroup平分切分好的5000个Task,每一有5个 多 TaskGroup负责以5个并发共计运行25个Task。

优势:

  • 要素插件总要本人的数据转换策略,放置数据失真;
  • 提供作业全链路的流量以及数据量运行时监控,包括作业有一种状态、数据流量、数据强度、执行进度等。
  • 将会各种由于由于传输报错的脏数据,DataX不能 实现精确的过滤、识别、分类整理、展示,为用户提太多种脏数据出理 模式;
  • 精确的强度控制
  • 健壮的容错机制,包括tcp连接运行内内外部重试、tcp连接运行级别重试;

从插件视角看框架

  • Job:是DataX用来描述从一有5个 多 源头到目的的同步作业,是DataX数据同步的最小业务单元;
  • Task:为最大化而把Job拆分得到最小的执行单元,进行并发执行;
  • TaskGroup:一组Task集合,在同一有5个 多 TaskGroupContainer执行下的Task集合称为TaskGroup;
  • JobContainer:Job执行器,负责Job全局拆分、调度、前置说说和后置说说等工作的工作单元。同类于Yarn中的JobTracker;
  • TaskGroupContainer:TaskGroup执行器,负责执行一组Task的工作单元,同类于Yarn中的TAskTacker。

    总之,Job拆分为Task,分别在框架提供的容器中执行,插件只不能 实现Job和Task两要素逻辑。

    物理执行有有一种运行模式:

  • Standalone:单tcp连接运行运行运行,没办法 内外部依赖;
  • Local:单tcp连接运行运行运行,统计信息,错误信息汇报到集中存储;
  • Distrubuted:分布式多tcp连接运行运行,依赖DataX Service服务;

    总体来说,当JobContainer和TaskGroupContainer运行在同一有5个 多 tcp连接运行运行内的后来假如有一天单机模式,在不同tcp连接运行运行执行假如有一天分布式模式。

将会不能 开发插件,不能 看zhege或多或少插件开发指南:   https://github.com/alibaba/DataX/blob/master/dataxPluginDev.md 

数据源支持状态:

类型数据源Reader(读)Writer(写)文档
RDBMS 关系型数据库 MySQL 读 、写
            Oracle         √         √     读 、写
  SQLServer 读 、写
  PostgreSQL 读 、写
  DRDS 读 、写
  通用RDBMS(支持所有关系型数据库) 读 、写
阿里云数仓数据存储 ODPS 读 、写
  ADS  
  OSS 读 、写
  OCS 读 、写
NoSQL数据存储 OTS 读 、写
  Hbase0.94 读 、写
  Hbase1.1 读 、写
  Phoenix4.x 读 、写
  Phoenix5.x 读 、写
  MongoDB 读 、写
  Hive 读 、写
无形状化数据存储 TxtFile 读 、写
  FTP 读 、写
  HDFS 读 、写
  Elasticsearch  
时间序列数据库 OpenTSDB  
  TSDB  

OGG 一般主要用于Oracle数据库。即Oracle GoldenGate是Oracle的同步工具 ,不能 实现有5个 多 Oracle数据库之间的数据的同步,可是能 实现Oracle数据同步到Kafka,相关的配置操作不能 参考如下:

https://blog.csdn.net/dkl12/article/details/500447154

https://www.jianshu.com/p/446ed2f267fa

http://blog.itpub.net/15412087/viewspace-2154644/

Databus是一有5个 多 实时的、可靠的、支持事务的、保持一致性的数据变更抓取系统。 2011年在LinkedIn正式进入生产系统,2013年开源。

Databus通过挖掘数据库日志的法律法子,将数据库变更实时、可靠的从数据库拉取出来,业务不能 通过定制化client实时获取变更。

Databus的传输层端到端延迟是微秒级的,每台服务器每秒不能 出理 数千次数据吞吐变更事件,共同还支持无限回溯能力和充沛的变更订阅功能。

github:https://github.com/linkedin/databus

databus分类整理:

  • 来源独立:Databus支持多种数据来源的变更抓取,包括Oracle和MySQL。
  • 可扩展、淬硬层 可用:Databus能扩展到支持数千消费者和事务数据来源,共同保持淬硬层 可用性。
  • 事务按序提交:Databus能保持来源数据库中的事务删改性,并按照事务分组和来源的提交顺寻交付变更事件。
  • 低延迟、支持多种订阅机制:数据源变更完成后,Databus能在微秒级内将事务提交给消费者。共同,消费者使用Databus中的服务器端过滤功能,不能 只获取本人不能 的特定数据。
  • 无限回溯:这是Databus最具创新性的组件之一,对消费者支持无限回溯能力。当消费者不能 产生数据的删改拷贝时(比如新的搜索索引),它不不对数据库产生任何额外负担,就不能 达成目的。当消费者的数据大大落后于来源数据库时,可是能 使用该功能。
    • Databus Relay中继的功能主要包括:
    1. 从Databus来源读取变更行,并在内存缓存内将其序列化为Databus变更事件
    2. 监听来自Databus客户端(包括Bootstrap Producer)的请求,并传输新的Databus数据变更事件
    • Databus客户端的功能主要包括:
    1. 检查Relay上新的数据变更事件,并执行特定业务逻辑的回调
    2. 将会落后Relay太多,向Bootstrap Server发起查询
    3. 新Databus客户端会向Bootstrap Server发起bootstrap启动查询,假如有一天切换到向中继发起查询,以完成最新的数据变更事件
    4. 单一客户端不能 出理 整个Databus数据流,将会不能 成为消费者集群的一要素,其中每个消费者只出理 一要素流数据
    • Databus Bootstrap Producer的功能有:
    1. 检查中继上的新数据变更事件
    2. 将变更存储在MySQL数据库中
    3. MySQL数据库供Bootstrap和客户端使用
    • Databus Bootstrap Server的主要功能,监听来自Databus客户端的请求,并返回长期回溯数据变更事件。
    • 更多不能 参考 databus社区wiki主页:https://github.com/linkedin/Databus/wiki
    • Databus和canal的功能对比:

支持的数据库

mysql, oracle

mysql(据说内内外部版本支持oracle)

Databus目前支持的数据源更多

业务开发

业务只不能 实现事件出理 接口

事件出理 外,不能 出理 ack/rollback,

反序列化异常等

Databus开发接口用户友好度更高

服务模型

 relay

relay不能 共同服务多个client

一有5个 多 server instance没办法 服务一有5个 多 client

(受限于server端保存拉取位点)

Databus服务模式更灵活

client

client不能 拉取多个relay的变更,

访问的relay不能 指定拉取或多或少表或多或少分片的变更

client没办法 从一有5个 多 server拉取变更,

假如有一天没办法 是拉取全量的变更

可扩展性

client不能 线性扩展,出理 能力不能线性扩展

(Databus可识别pk,自动做数据分片)

client无法扩展

Databus扩展性更好

可用性

client ha

client支持cluster模式,每个client出理 一要素数据,

某个client挂掉,或多或少client自动接管对应分片数据

主备client模式,主client消费,

将会主client挂掉,备client可自动接管

Databus实时热备方案更心智心智心智成熟图片 是什么期期的句子 的句子

relay/server ha

多个relay可连接到同一有5个 多 数据库,

client不能 配置多个relay,relay故障启动切换

主备relay模式,relay通过zk进行failover

canal主备模式对数据库影响更小

故障对上游

数据库的影响

client故障,bootstrap会继续拉取变更,

client恢复后直接从bootstrap拉取历史变更

client故障会阻塞server拉取变更,

client恢复会由于server瞬时从数据库拉取大量变更

Databus有一种的故障对数据库影响几乎为0

系统状态监控

tcp连接运行通过http接口将运行状态暴露给内外部

暂无

Databustcp连接运行可监控性更好

开发语言

java,核心代码16w,测试代码6w

java,4.2w核心代码,6k测试代码

Databus项目更心智心智心智成熟图片 是什么期期的句子 的句子,当然学习成本也更大

总结:

1、databus活跃度不高,datax和canal 相对比较活跃。

2、datax 一般比较适合于全量数据同步,对全量数据同步强度很高(任务不能 拆分,并发同步,全都强度高),对于增量数据同步支持的不太好(不能 依靠时间戳+定时调度来实现,假如有一天没办法 做到实时,延迟较大)。

3、canal 、databus 等将会是通过日志抓取的法律法子进行同步,全都对增量同步支持的比较好。

4、以上哪此工具都缺少一有5个 多 监控和任务配置调度管理的平台来进行支撑。