所在位置:首页 > 游戏新闻 > flink使用教程(flink视频教程)

flink使用教程(flink视频教程)

发布时间:2023-06-30 17:34:15来源:头条浏览:0

一.简介

FlinkX是基于Flink的分布式离线实时数据同步框架,由袋鼠云开放。它可以收集静态数据,如MYSQL、HDFS等。以及实时变化的数据如MYSQL BINLOG,KAFKA等。

目前,**已经支持各种异构数据源之间的高效数据同步。

二、建筑设计

FlinkX整体架构设计采用框架插件模式。不同的数据源被抽象成不同的阅读器插件和写入器插件,

用户只需要在配置文件中配置相应的插件参数就可以实现数据迁移,用户只需要实现插件扩展的Reader和Writer接口,其他框架都会支持。理论上,FlinkX可以水平伸缩,支持任何类型数据源的同步。

三、支持的数据源FlinkX目前支持十几个数据源:

四、基本特征

4.1脏数据管理异构系统进行大数据迁移时,不可避免会产生脏数据,影响同步任务的执行。写入数据时,FlinkX的Writer插件会将以下类型作为脏数据写入脏数据表:

1.类型转换错误2。空指针3。主键冲突4。其他错误。

4.2流量控制管理

大数据同步有时候在负载高的时候会给系统带来很大的压力。FlinkX使用令牌桶电流限制来限制速度。当源生成数据的速率达到某个阈值时,它将不会读取数据。

4.3断点续传部分插件通过Flink的检查点机制支持任务从失败位置恢复。数据源上断点续传的强制要求:

1.必须包含一个升序字段,如主键或日期类型的字段。在同步过程中,将使用检查点机制记录该字段的值,当任务恢复运行时,该字段将用于构造查询条件以过滤同步的数据。

如果这个字段的值不是升序,那么任务恢复时过滤的数据是错误的,最终会导致数据的丢失或重复。2.数据源必须支持数据筛选。如果没有,任务就不能从断点继续运行,这将导致数据重复。

3.目标数据源必须支持事务,如关系数据库,文件类型的数据源也可以由临时文件支持。

动词(verb的缩写)操作原理

FlinkX的底层实现依赖于Flink,数据同步任务会打包成StreamGraph,在Flink上执行。

不及物动词系统安装

6.1 flink集群安装

wget https://存档。阿帕奇。org/dist/flink/flink-1。10 .3阿帕奇-弗林克-1。10 .3 .焦油。gz。阿帕奇-弗林克-1。10 .3 .焦油。gz。ASC #目录修改flink-conf.yaml配置文件激光唱片./confrest.bind-port: 8081#启动弗林克. bin start-cluster.sh查看集群启动情况在浏览器查看集群启动情况http://本地主机:8081

6.2 安装flinkX

wget https://github。com/dt stack/flink x/archive/1.10 _ release。zip解压1.10 _ release。zipcd flinkx-1.10 _发布号源码编译mvn清洁包装-DskipTests#编译完之后在根目录生成同步插件目录,

七。配置示例

项目工程示例目录提供了各种插件的示例配置。由于我只在本地安装了mysql,所以这里我就用mysql到mysql作为演示。在同步之前,必须确保目标数据库和表已经存在,否则将会报告错误。

7.1源表数据:

7.2 配置文件配置文件mysql_mysql_batch.json:

{ 'job': { 'content': [ { 'reader': { 'name': 'mysqlreader', 'parameter': {'column': [ { 'name': 'id', 'type': 'int' }, { 'name': 'name', 'type': 'string' }, { 'name': 'age', 'type': 'int' }],'username': 'root','password': 'root','connection': [ { 'jdbcUrl': [ 'jdbc:mysql://localhost:3306/rc useSSL=false' ], 'table': [ 'my' ] }] } }, 'writer': { 'name': 'mysqlwriter', 'parameter': {'username': 'root','password': 'test','connection': [ { 'jdbcUrl': 'jdbc:mysql://localhost:3306/rc useSSL=false', 'table': [ 'my_sync' ] }],'writeMode': 'insert','column': [ { 'name': 'id', 'type': 'int' }, { 'name': 'name', 'type': 'string' }, { 'name': 'age', 'type': 'int' }] } } } ], 'setting': { 'speed': { 'channel': 1, 'bytes': 0 } } } }

7.3 local模式启动

bin/flinkx \-mode local \-job bin/mysql_mysql_batch.json \-pluginRoot syncplugins \-flinkconf flinkconf

7.4 standalone模式启动

bin/flinkx \-mode standalone \-job bin/mysql_mysql_batch.json \-pluginRoot syncplugins \-flinkconf /Users/hsw/Documents/project_install/flink/flink-1.12.2/conf

7.5 执行之后结果:

至此数据已经完美同步过来了。