Canal实时解析mysql binlog数据实战

一、说明

通过canal实时监听mysql binlog日志文件的变化,并将数据解析出来

二、环境准备

1、创建maven项目并修改pom.xml配置文件

 <dependencies>         <dependency>             <groupId>com.alibaba.otter</groupId>             <artifactId>canal.client</artifactId>             <version>1.1.4</version>         </dependency>     </dependencies>

Canal实时解析mysql binlog数据实战

 

2、嗦代码

 特别说明:在解析数据时,相当于程序是客户端,客户端在连接canal服务端时是不需要用户名和密码 

import com.alibaba.fastjson.JSONObject; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.protocol.CanalEntry; import com.alibaba.otter.canal.protocol.Message; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException;  import java.net.InetSocketAddress; import java.util.List;  public class CanalClient {     public static void main(String[] args) throws InterruptedException, InvalidProtocolBufferException {      // 获取连接     CanalConnector canalConnector=CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.140.131",11111),             "example","","");      while(true)     {        // 连接         canalConnector.connect();         // 订阅数据库         canalConnector.subscribe("CanalDb.*");         // 获取数据         Message message = canalConnector.get(100);         // 获取Entry集合         List<CanalEntry.Entry> entries=message.getEntries();         // 判断集合是否为空,如果为空,则线程等待2秒再拉取数据         if (entries.size()<=0)         {             System.out.println("当次抓取没有数据,休息一会儿。。。");             Thread.sleep(2000);         }         else         {             // 遍历entries,单条解析             for (CanalEntry.Entry entry:entries)             {                // 1,获取表名                 String tableName=entry.getHeader().getTableName();                 // 2,获取类型                 CanalEntry.EntryType entryType=entry.getEntryType();                 // 3,获取序列化后的数据                 ByteString storeValue=entry.getStoreValue();                 // 4.判断当前entryType类型是否为ROWDATA                 if (CanalEntry.EntryType.ROWDATA.equals(entryType))                 {                     //5.反序列化数据                     CanalEntry.RowChange rowChange=CanalEntry.RowChange.parseFrom(storeValue);                     //6.获取当前事件的操作类型                     CanalEntry.EventType eventType=rowChange.getEventType();                     //7.获取数据集                     List<CanalEntry.RowData> rowDataList=rowChange.getRowDatasList();                     //8.遍历rowDataList并打印数据集                     for(CanalEntry.RowData rowData:rowDataList)                     {                         JSONObject beforData=new JSONObject();                         List<CanalEntry.Column> beforClountList=rowData.getBeforeColumnsList();                         for (CanalEntry.Column column:beforClountList)                         {                             beforData.put(column.getName(),column.getValue());                         }                         JSONObject afterData=new JSONObject();                         List<CanalEntry.Column> afterClountList=rowData.getAfterColumnsList();                         for (CanalEntry.Column column:afterClountList)                         {                             afterData.put(column.getName(),column.getValue());                         }                         // 打印数据                         System.out.println(""+tableName+                                 ",EventType:"+eventType+                                 ",Before:"+beforData+                                 ",After:"+afterData);                     }                  }                 else                 {                     System.out.println("当前操作类型为"+entryType);                 }             }         }     }   } }

三、项目效果

Canal实时解析mysql binlog数据实战

 

发表评论

评论已关闭。

相关文章