开源框架open-replicator原理分析_凌风郎少的博客-CSDN博客_open-replicator


本站和网页 https://blog.csdn.net/lingfenglangshao/article/details/49705001 的作者无关,不对其内容负责。快照谨为网络故障时之索引,不代表被搜索网站的即时页面。

开源框架open-replicator原理分析_凌风郎少的博客-CSDN博客_open-replicator
开源框架open-replicator原理分析
凌风郎少
于 2015-11-07 20:24:54 发布
5162
收藏
分类专栏:
databus
文章标签:
databus
mysql
io流
binlog
版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。
本文链接:https://blog.csdn.net/lingfenglangshao/article/details/49705001
版权
databus
专栏收录该内容
1 篇文章
0 订阅
订阅专栏
最近公司计划用databus升级一下数据处理逻辑,就抽时间看了一下代码,databus使用了open-replicator作为获取数据库变更事件的组件,这里就简单介绍一下这个组件的基本实现原理。
代码分析
open-replicator: 功能: 实时获取mysql的binlog日志,并封装成Event对象,然后调用已注册的listener进行处理。
实现原理: 1.首先数据库需要开始binlog,设置好serverid,binlog名字。具体设置细节可以google。 2.mysql数据库本身就支持client端使用命令mysqlbinlog来读取binlog中的内容,open-replicator也是通过这种方式实现。 先来看下主类 OpenReplicator中的部分属性:
protected int port = 3306;
protected String host;
protected String user;
protected String password;
protected int serverId = 6789;
protected String binlogFileName;
protected long binlogPosition = 4;
在启动该工程的时候需要设置这些属性,相信大家看了都很熟悉,就是关于所要连接数据库的一些信息。 接下来看一下设置好这些属性以后连接数据库的代码,该逻辑在TransportImpl类中:
this.socket = this.socketFactory.create(host, port);
this.os = new TransportOutputStreamImpl(this.socket.getOutputStream());
if(this.level2BufferSize <= 0) {
this.is = new TransportInputStreamImpl(this.socket.getInputStream(), this.level1BufferSize);
} else {
this.is = new TransportInputStreamImpl(new ActiveBufferedInputStream(this.socket.getInputStream(), this.level2BufferSize), this.level1BufferSize);
逻辑很清晰,也比较简单,是通过soket方式进行连接,当成功连接以后server端返回一个数据包,client 端会根据这个数据包的内容进行下一步处理:
final Packet packet = this.is.readPacket();
如果连接成功,则会进行身份的验证:
this.authenticator.login(this);
当成功连接数据库以后,会向mysql发送请求binlog的数据包,看一下代码:
protected void dumpBinlog() throws Exception {
//
final ComBinlogDumpPacket command = new ComBinlogDumpPacket();
command.setBinlogFlag(0);
command.setServerId(this.serverId);
command.setBinlogPosition(this.binlogPosition);
command.setBinlogFileName(StringColumn.valueOf(this.binlogFileName.getBytes(this.encoding)));
this.transport.getOutputStream().writePacket(command);
this.transport.getOutputStream().flush();
//
final Packet packet = this.transport.getInputStream().readPacket();
if(packet.getPacketBody()[0] == ErrorPacket.PACKET_MARKER) {
final ErrorPacket error = ErrorPacket.valueOf(packet);
throw new TransportException(error);
数据包的构成是按照mysql官网上提供的协议格式进行包装的。 server端收到数据包以后,会向client端返回binlog日志信息,open-replicator会针对这些原始的信息 进行封装,默认的解析器如下:
r.registgerEventParser(new StopEventParser());
r.registgerEventParser(new RotateEventParser());
r.registgerEventParser(new IntvarEventParser());
r.registgerEventParser(new XidEventParser());
r.registgerEventParser(new RandEventParser());
r.registgerEventParser(new QueryEventParser());
r.registgerEventParser(new UserVarEventParser());
r.registgerEventParser(new IncidentEventParser());
r.registgerEventParser(new TableMapEventParser());
r.registgerEventParser(new WriteRowsEventParser());
r.registgerEventParser(new UpdateRowsEventParser());
r.registgerEventParser(new DeleteRowsEventParser());
r.registgerEventParser(new WriteRowsEventV2Parser());
r.registgerEventParser(new UpdateRowsEventV2Parser());
r.registgerEventParser(new DeleteRowsEventV2Parser());
r.registgerEventParser(new FormatDescriptionEventParser());
这些解析器分别对应mysql不同的事件类型,open-replicator会根据收到的事件类型选择相应的解析器进行解析。 关于mysql的事件类型文档大家可以参照mysql官网,这里给出地址: http://dev.mysql.com/doc/internals/en/start-event-v3.html 下面看一下open-replicator对于server端返回数据的处理逻辑,这部分代码在ReplicationBasedBinlogParser类中
// Parse the event header
final BinlogEventV4HeaderImpl header = new BinlogEventV4HeaderImpl();
header.setTimestamp(is.readLong(4) * 1000L);
header.setEventType(is.readInt(1));
header.setServerId(is.readLong(4));
header.setEventLength(is.readInt(4));
header.setNextPosition(is.readLong(4));
header.setFlags(is.readInt(2));
header.setTimestampOfReceipt(System.currentTimeMillis());
我们可以看到首先就是对于事件头信息的读取和封装,接下来会根据头信息选择相应的解析器进一步处理: 对事件包装完成以后就是对相应监听器的调用了,看下代码:
public void parse(XInputStream is, BinlogEventV4Header header, BinlogParserContext context)
throws IOException {
final QueryEvent event = new QueryEvent(header);
event.setThreadId(is.readLong(4));
event.setElapsedTime(is.readLong(4));
event.setDatabaseNameLength(is.readInt(1));
event.setErrorCode(is.readInt(2));
event.setStatusVariablesLength(is.readInt(2));
event.setStatusVariables(parseStatusVariables(is.readBytes(event.getStatusVariablesLength())));
event.setDatabaseName(is.readNullTerminatedString());
event.setSql(is.readFixedLengthString(is.available()));
context.getEventListener().onEvents(event);
这里我选了一个解析器的处理逻辑,可以看到前面几步是对相应事件的进一步读取和封装,最后一句代码则是对 相应监听器的调用。 至此整个open-replicator的处理逻辑就走通了,整体来看逻辑比较清晰简单,易于理解。
下面就open-replicator关于输入流的处理做一下分析: open-replicator对于输入流的处理采用了两级缓存的方式,先来看一下初始化代码:
if(this.level2BufferSize <= 0) {
this.is = new TransportInputStreamImpl(this.socket.getInputStream(), this.level1BufferSize);
} else {
this.is = new TransportInputStreamImpl(new ActiveBufferedInputStream(this.socket.getInputStream(), this.level2BufferSize), this.level1BufferSize);
可以看出是通过参数level2BufferSize决定是否开启二级缓存的,我们这里看一下开启以后的处理逻辑, 在ActiveBufferedInputStream初始化以后会启动一个线程读取server端返回的数据,来看一下run方法的实现:
public void run() {
try {
final byte[] buffer = new byte[512 * 1024];
while(!this.closed.get()) {
//
int r = this.is.read(buffer, 0, buffer.length);
if(r < 0) throw new EOFException();
//
int offset = 0;
while(r > 0) {
final int w = write(buffer, offset, r);
r -= w;
offset += w;
} catch(IOException e) {
this.exception = e;
} catch(Exception e) {
LOGGER.error("failed to transfer data", e);
这里的逻辑是通过InputStream读取数据,放在一个byte[]数组中,重点看一下后面的write方法:
public int write(byte b[], int off, int len) throws IOException {
this.lock.lock();
try {
//
while (this.ringBuffer.isFull()) {
this.bufferNotFull.awaitUninterruptibly();
if(this.closed.get()) throw new EOFException();
//
final int w = this.ringBuffer.write(b, off, len);
this.bufferNotEmpty.signal();
return w;
} finally {
this.lock.unlock();
这里就是把数据保存起来的逻辑,抓紧来对比一下读取数据的逻辑:
public int read(byte b[], int off, int len) throws IOException {
this.lock.lock();
try {
//
while (this.ringBuffer.isEmpty()) {
if(this.exception != null) throw this.exception;
this.bufferNotEmpty.awaitUninterruptibly();
if(this.closed.get()) throw new EOFException();
//
final int r = this.ringBuffer.read(b, off, len);
this.bufferNotFull.signal();
return r;
} finally {
this.lock.unlock();
对比之下就会发现这里使用了一个生产者消费者模型来实现,看完了二级缓存的实现方式,我们再看一下 以及缓存的调用方式:
public int read() throws IOException {
if(this.readLimit > 0 && (this.readCount + 1) > this.readLimit) {
throw new ExceedLimitException();
} else {
if(this.head >= this.tail) doFill();
final int r = this.buffer[this.head++] & 0xFF;
++this.readCount;
return r;
private void doFill() throws IOException {
this.head = 0;
this.tail = this.is.read(this.buffer, 0, this.buffer.length);
if(this.tail <= 0) throw new EOFException();
上面连续列出了两个方法,主要是为了让大家清晰的看到读取过程中二级缓存的处理,这里强调一点这个输入流is的 类型是ActiveBufferedInputStream,大家看代码的时候需要对应上。 至此就看到了open-replicator中二级缓存的实现方式。
设计探讨
下面以我低微的水平揣测一下这里采用二级缓存的原因,不对之处欢迎大家留言, 我个人感觉这样做主要是为了提升数据的处理效率,我们假设这里只用一个byte[],这样的话当往里面装数据时候 读取数据的线程就需要等待,也就是说写和读两个线程同一时间必须有一个需要等待,而采用二级缓存以后,两个 线程是可以同时处理的,因为各自有一个缓存区即byte[],所以互不影响,只有需要在两个byte[]数组之间拷贝数据的 时候才需要阻塞。
个人随笔
最后再来一条分割线,说一下自己写博客的感触,虽然是第一次写,水平也很有限,但依然希望可以自成一家,既然要写,就要努力写出自己的东西,自己的理解,我一直认为给别人讲不明白问题的唯一原因就是自己也不是很清楚问题,所以大家如果对我的文章又不理解或者有错误地方还请不吝赐教,多多留言,我会尽量回复大家。
在后面的文章中,我也会尽量针对某些地方提出自己对它这么设计的原因,和上面关于二级缓存的探讨一样,希望可以和大家交流,共同进步。
凌风郎少
关注
关注
点赞
收藏
打赏
评论
开源框架open-replicator原理分析
databus组件open-replicator的实现原理分析,包括IO流的实现分析。
复制链接
扫一扫
专栏目录
OpenLogReplicator:完全用C ++编写的开源Oracle数据库CDC。直接从数据库重做日志文件和JSON或Protobuf格式的流中读取事务
03-22
OpenLogReplicator
完全用C ++编写的开源Oracle数据库CDC。直接从数据库重做日志文件和JSON或Protobuf格式的流中读取事务到:
卡夫卡
平面文件
网络流(计划TCP / IP或ZeroMQ)
请注意,该代码有2个分支:
主-具有稳定代码的分支-每月更新
每晚-当前分支不稳定,每天更新代码
更新Protobuf代码:
光盘原型
导出PATH = / opt / protobuf / bin:$ PATH
协议OraProtoBuf.proto --cpp_out =。
mv OraProtoBuf.pb.cc ../src/OraProtoBuf.pb.cpp
mv OraProtoBuf.pb.h ../src/OraProtoBuf.pb.h
调试编译:
git clone
cd OpenLogReplicator
autoreconf -f
采用OpenReplicator解析MySQL binlog
朱小厮的博客
11-07
1万+
Open Replicator是一个用Java编写的MySQL binlog分析程序。Open Replicator 首先连接到MySQL(就像一个普通的MySQL Slave一样),然后接收和分析binlog,最终将分析得出的binlog events以回调的方式通知应用。Open Replicator可以被应用到MySQL数据变化的实时推送,多Master到单Slave的数据同步等多种应用场景...
评论 4
您还未登录,请先
登录
后发表或查看评论
opentrace在mysql中使用_采用OpenReplicator解析MySQL binlog
weixin_28704675的博客
01-19
56
Open Replicator是一个用Java编写的MySQL binlog分析程序。Open Replicator 首先连接到MySQL(就像一个普通的MySQL Slave一样),然后接收和分析binlog,最终将分析得出的binlog events以回调的方式通知应用。Open Replicator可以被应用到MySQL数据变化的实时推送,多Master到单Slave的数据同步等多种应用场景...
OpenReplicator解析binlog异常java.io.EOFException: null
荣耀之路
09-13
473
OpenReplicator解析binlog异常java.io.EOFException: null
[笔记]CDC(Change Data Capture) 数据变化捕获,实现原理 方案
最新发布
:: Dotnet Fantasy ::
10-29
807
整理了下对CDC的理解,以及基于Oracle的实现方式。备案如下
CDC(Change Data Capture) 数据变化捕获
实现方式
原理
缺点
优点
典型方案
时间戳/版本号/修改状态列
根据更新时间戳列,识别数据变化。
一般是非实时。
1) 实时性
2) 不能识别多次更新
3) 不能记录删除操作
4) 要改程序
快照
通过比较源表和快照表来获得数据变化。
1)
Open Replicator
weixin_34360651的博客
03-02
126
Open Replicator ( http://code.google.com/p/open-replicator/ ) 开源了。Open Replicator是一个用Java编写的MySQL binlog分析程序。Open Replicator 首先连接到MySQL(就像一个普通的MySQL Slave一样),然后接收和分析binlog,最终将分析得出的binlog events以回调的方式通...
23 open-replicator 解析binlog失败 available: 4, event type: 19
970655147的专栏
07-10
1183
问题出现
使用 open-replicator 来解析 binLog 的时候出现了这个问题, 这个包 似乎是14年之后 就没有杂更新了
open-replicator 的版本是现在的最新的版本1.0.7, 详情请见 参考的ref
15:55:33.435 [binlog-parser-1] ERROR c.g.c.o.b.impl.AbstractBinlogParser - faile
Zookeeper使用
mary0712的博客
03-03
242
一、准备条件:
系统中安装java rpm -ivh jdk-7u45-linux-i586.rpm
如果没有java则需要安装1.6版本以上的JDK
如果安装JDK,需要在/etc/profile中增加:
JAVA_HOME=/usr/java/jdk1.7.0_45
CLASSPATH=.:$JAVA_HOME/lib/tools.jar
PATH =$JAVA_HOME/bin:$PATH
export JAVA_HOME CLASSPATH PATH
如果防火墙未关闭,则需要关闭防火.
点击事件及按钮事件。
weixin_45617334的博客
09-14
2249
事件源
事件名
事件注册
事件处理
点击事件
(1)单击事件
在这个案例中,事件源是id为“p1”的元素,事件名是单击事件注册οnclick=“fun()”,
也就是说,当单击id为“p1”的元素时,交给fun函数来处理。
(2)双击事件
(3)鼠标按下/抬起。(on mouse down / on mouse up)
(4)鼠标移入移出(on mouse enter / on mouse leave)
(5)事件综合
<!DOCTYPE html>
<html>
OpenStack快速入门
热门推荐
Jack Zhou的专栏
10-13
2万+
第一部分:OpenStack及其构成简介
一、云计算
云计算是一种计算模型,它将诸如运算能力、存储、网络和软件等资源抽象成为服务,以便让用户通过互联网远程享用,付费的形式也如同传统公共服务设施一样。因需而定、提供方便、动态改变和无限的虚拟化扩展能力是云计算的几个重要特征。
不同的“云”对应着不同的基础设施。下面是三种广义的“云”:
(1)基础设施即服务(
Openstack架构-个人理解
昆仑山论剑的博客
01-30
742
各位小伙伴们,我们今天给大家分享的是Openstack 架构,非常感谢小伙伴在技术沙龙上的讲解,令在场参加的各位受益匪浅,但独乐乐,不如众乐乐,故将此经验共享出来,愿大家都能从中收益!也欢迎大家踊跃报名参加哟!!!
...
OpenStack基础认识
一峰的技术博客
10-11
779
OpenStack
既是一个社区,也是一个项目和一个开源软件,提供了一个部署云的操作平台或工具集。用OpenStack易于构建虚拟计算或存储服务的云,既可以为公有云、私有云,也可以为大云、小云提供可扩展、灵活的云计算。
OpenStack是什么
OpenStack是一个管理计算、存储和网络资源的数据中心云计算开放平台,通过一个仪表板,为管理员提供了所有的管理控制,同时通过Web界面为其用户提供资源。2010年成立,基本上每六个月发布一个新版本。
基础架构云开源项目
作为基础设施即服务(IaaS)资源的通用前
MySQL binlog分析程序:Open Replicator
menergy
12-26
8381
第0章:简介
(1)下面是http://code.google.com中的binlog事件分析结构图:
(2)获取开源包的maven坐标
com.google.code
open-replicator
1.0.5
open replicator
术业有专攻
12-22
107
http://blog.csdn.net/menergy/article/details/17583823
[技巧.DotNet]位移运算符在枚举使用Flags关键字时的运用
:: Dotnet Fantasy ::
08-20
163
Flags关键字允许我们在使用.net 枚举变量时,使用多个组合值。
几种写法里,用位移运算符<<的是不是有点意思呢 :)
[Flags]
enum XXFlags
None = 0,
A = 1,
B = 2,
C = 4,
D = 8
[Flags]
public enum XXFlags
None = 0,
...
基于MySQL binlog分析的开源项目收录
sheungxin的博客
02-04
178
[b]canal[/b]:阿里巴巴mysql数据库binlog的增量订阅&消费组件,开源地址:[url]https://github.com/alibaba/canal[/url],论坛讨论:[url]http://www.iteye.com/topic/1129002[/url]
[b]Open Replicator[/b]:Open Replicator是一个用Java编写的MySQL ...
“相关推荐”对你有帮助么?
非常没帮助
没帮助
一般
有帮助
非常有帮助
提交
©️2022 CSDN
皮肤主题:大白
设计师:CSDN官方博客
返回首页
凌风郎少
CSDN认证博客专家
CSDN认证企业博客
码龄7年
暂无认证
14
原创
104万+
周排名
93万+
总排名
1万+
访问
等级
311
积分
粉丝
获赞
评论
收藏
私信
关注
热门文章
Log4j各级别日志重复打印的问题
7740
开源框架open-replicator原理分析
5162
也谈JVM垃圾回收
548
深入浅出AQS之共享锁模式
533
深入浅出AQS之条件队列
401
分类专栏
java
7篇
个人随想
1篇
databus
1篇
git
1篇
redis
4篇
最新评论
开源框架open-replicator原理分析
dongyuxin_:
5.7.X的mysql开启checksum后,该如何处理?
开源框架open-replicator原理分析
psychem:
你好,请问每做一个操作,都报这个错误是怎么回事呀?
can not find table name from sql:begin
深入浅出AQS之共享锁模式
bladeandmaster88:
共享锁每次只唤醒一个后继节点吧
深入浅出AQS之条件队列
sixabs:
我也觉得park和unpark是挂起和恢复,为什么很多博文说阻塞呢?挂起和阻塞概念还是要区分一下的
开源框架open-replicator原理分析
H.King:
有 项目地址么?
您愿意向朋友推荐“博客详情页”吗?
强烈不推荐
不推荐
一般般
推荐
强烈推荐
提交
最新文章
redis源码分析之有序集SortedSet
redis源码分析之事务Transaction(下)
redis源码分析之事务Transaction(上)
2017年10篇
2016年1篇
2015年3篇
目录
目录
分类专栏
java
7篇
个人随想
1篇
databus
1篇
git
1篇
redis
4篇
目录
评论 4
被折叠的 条评论
为什么被折叠?
到【灌水乐园】发言
查看更多评论
打赏作者
凌风郎少
你的鼓励将是我创作的最大动力
¥2
¥4
¥6
¥10
¥20
输入1-500的整数
余额支付
(余额:-- )
扫码支付
扫码支付:¥2
获取中
扫码支付
您的余额不足,请更换扫码支付或充值
打赏作者
实付元
使用余额支付
点击重新获取
扫码支付
钱包余额
抵扣说明:
1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。 2.余额无法直接购买下载,可以购买VIP、C币套餐、付费专栏及课程。
余额充值