在线时间:8:00-16:00
迪恩网络APP
随时随地掌握行业动态
扫描二维码
关注迪恩网络微信公众号
开源软件名称:flink-jobs开源软件地址:https://gitee.com/tenmg/flink-jobs开源软件介绍:flink-jobs介绍flink-jobs为基于Flink的Java应用程序提供快速集成的能力,可通过继承FlinkJobsRunner快速构建基于Java的Flink流批一体应用程序,实现异构数据库实时同步和ETL。flink-jobs提供了数据源管理模块,通过flink-jobs运行Flink SQL会变得极其简单。使用flink-jobs-clients可以实现基于Java API启动flink-jobs应用程序,还可以将flink任务实现通过XML配置文件来管理。一个典型的flink-jobs部署架构如下: 当然,如果您选择使用Flink CDC,那么以上的Debezium和Kafka就不需要了。总体而言,flink-jobs是一个集成开发框架,它能够帮助用户更好地使用Flink及Flink的周边生态(包括但不限于Flink CDC、FlinkX),尤其是Flink SQL和Flink CDC。 Flink版本flink-jobs对Flink特定版本依赖较弱,已知在1.13+环境下运行良好,用户可根据需要自行选择Flink的发行版本。 起步以Maven项目为例
<!-- https://mvnrepository.com/artifact/cn.tenmg/flink-jobs --><dependency> <groupId>cn.tenmg</groupId> <artifactId>flink-jobs</artifactId> <version>${flink-jobs.version}</version></dependency>
#Flink Table API配置#空值处理配置table.exec.sink.not-null-enforcer=drop#flink-jobs数据同步类型转换配置(将BIGINT表示的时间减去8小时得到北京时间,并转为TIMESTAMP)data.sync.columns.convert=BIGINT,TIMESTAMP:TO_TIMESTAMP(FROM_UNIXTIME(#columnName/1000 - 8*60*60, 'yyyy-MM-dd HH:mm:ss'))#FlinkSQL数据源配置#配置名称为kafka的数据源datasource.kafka.connector=kafkadatasource.kafka.properties.bootstrap.servers=192.168.100.24:9092,192.168.100.25:9092,192.168.100.26:9092datasource.kafka.properties.group.id=flink-jobsdatasource.kafka.scan.startup.mode=earliest-offsetdatasource.kafka.format=debezium-jsondatasource.kafka.debezium-json.schema-include=false#配置名称为bidb的数据源datasource.bidb.connector=jdbcdatasource.bidb.driver=com.mysql.jdbc.Driverdatasource.bidb.url=jdbc:mysql://192.168.100.66:3306/bidb?useSSL=false&serverTimezone=GMT+8&zeroDateTimeBehavior=convertToNulldatasource.bidb.username=your_namedatasource.bidb.password=your_passworddatasource.starrocks.jdbc-url=jdbc:mysql://192.168.10.140:9030datasource.starrocks.load-url=192.168.10.140:8030datasource.starrocks.connector=starrocksdatasource.starrocks.username=your_namedatasource.starrocks.password=your_passworddatasource.starrocks.database-name=your_dbdatasource.starrocks.sink.properties.column_separator=\\x01datasource.starrocks.sink.properties.row_delimiter=\\x02# the flushing time interval, range: [1000ms, 3600000ms].datasource.starrocks.sink.buffer-flush.interval-ms=10000# max retry times of the stream load request, range: [0, 10].datasource.starrocks.sink.max-retries=3
public class App { /** * 服务基础包名 */ private static final String basePackage = "cn.tenmg.flink.jobs.quickstart.service"; public static void main(String... args) throws Exception { FlinkJobsRunner runner = new FlinkJobsRunner() { @SuppressWarnings("unchecked") @Override protected StreamService getStreamService(String serviceName) {// 根据类名获取流服务实例 StreamService streamService = null; try { Class<StreamService> streamServiceClass = (Class<StreamService>) Class .forName(basePackage + "." + serviceName); streamService = streamServiceClass.newInstance(); } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) { e.printStackTrace(); } return streamService; } }; runner.run(args); }}
public class HelloWorldService implements StreamService { /** * */ private static final long serialVersionUID = -6651233695630282701L; @Override public void run(StreamExecutionEnvironment env, Arguments arguments) throws Exception { Properties kafkaProperties = new Properties(); kafkaProperties.put("bootstrap.servers", FlinkJobsContext.getProperty("datasource.kafka.properties.bootstrap.servers"));// 直接使用配置文件的配置 kafkaProperties.put("auto.offset.reset", "latest"); kafkaProperties.put("group.id", "flink-jobs"); DataStream<String> stream; if (RuntimeExecutionMode.STREAMING.equals(arguments.getRuntimeMode())) { stream = env.addSource(new FlinkKafkaConsumer<String>(Arrays.asList("topic1","topic2"), new SimpleStringSchema(), kafkaProperties)); } else { stream = env.fromElements("Hello, World!"); } stream.print(); }}
快速入门详见https://gitee.com/tenmg/flink-jobs-quickstart 运行参数(arguments)flink-jobs应用程序的运行参数通过JSON格式的字符串(注意,如果是命令行运行,JSON格式字符串前后需加上双引号或单引号,JSON格式字符串内部的双引号或单引号则需要转义)或者一个.json文件提供,结构如下: { "serviceName": "specifyName", "runtimeMode": "BATCH"/"STREAMING"/"AUTOMATIC", "params": { "key1": "value1", "key2": "value2", … }, "operates": [{ "script": "specifySQL", "type": "ExecuteSql" }, { "dataSource": "kafka", "script": "specifySQL", "type": "ExecuteSql" }, { "saveAs": "specifyTemporaryTableName", "catalog": "specifyCatalog", "script": "specifySQL", "type": "SqlQuery" }, … ]}
Bsh操作Bsh操作的作用是运行基于Beanshell的java代码,支持版本:1.1.0+,相关属性及说明如下:
Var
ExecuteSql操作ExecuteSql操作的作用是运行基于DSL的SQL代码,支持版本:1.1.0+,相关属性及说明如下:
SqlQuery操作SqlQuery操作的作用是运行基于DSL的SQL查询代码,支持版本:1.1.0+,相关属性及说明如下:
Jdbc操作Jdbc操作的作用是运行基于DSL的JDBC SQL代码,支持版本:1.1.1+,相关属性及说明如下:
目标JDBC SQL代码是在flink-jobs应用程序的main函数中运行的。 DataSync操作DataSync操作的作用是运行基于Flink SQL的流式任务实现数据同步,其原理是根据配置信息自动生成并执行Flink SQL。支持版本:1.1.2+,相关属性及说明如下:
Column
相关配置可以增加数据同步的相关配置,详见配置文件的数据同步配置。 配置文件默认的配置文件为flink-jobs.properties(注意:需在classpath下),可通过flink-jobs-context-loader.properties配置文件的 数据源配置每个数据源有一个唯一的命名,数据源配置以“datasource”为前缀,以“.”作为分隔符,格式为 #FlinkSQL数据源配置#Debezium#配置名称为kafka的数据源datasource.kafka.connector=kafkadatasource.kafka.properties.bootstrap.servers=192.168.1.101:9092,192.168.1.102:9092,192.168.1.103:9092datasource.kafka.properties.group.id=flink-jobsdatasource.kafka.scan.startup.mode=earliest-offsetdatasource.kafka.format=debezium-jsondatasource.kafka.debezium-json.schema-include=true#PostgreSQL#配置名称为bidb的数据源datasource.bidb.connector=jdbcdatasource.bidb.driver=org.postgresql.Driverdatasource.bidb.url=jdbc:postgresql://192.168.1.104:5432/bidbdatasource.bidb.username=your_namedatasource.bidb.password=your_password#引用配置文件内的另一个配置#配置名称为syndb的数据源datasource.syndb.connector=${datasource.bidb.connector}datasource.syndb.driver=${datasource.bidb.driver}datasource.syndb.url=${datasource.bidb.url}?currentSchema=syndbdatasource.syndb.username=${datasource.bidb.username}datasource.syndb.password=${datasource.bidb.password}#MySQL#配置名称为kaorder的数据源datasource.kaorder.connector=jdbcdatasource.kaorder.driver=com.mysql.cj.jdbc.Driverdatasource.kaorder.url=jdbc:mysql://192.168.1.105:3306/kaorder?useSSL=false&serverTimezone=Asia/Shanghaidatasource.kaorder.username=your_namedatasource.kaorder.password=your_password#SQLServer#配置名称为sqltool的数据源datasource.sqltool.connector=jdbcdatasource.sqltool.driver=org.postgresql.Driverdatasource.sqltool.url=jdbc:sqlserver://192.168.1.106:1433;DatabaseName=sqltool;datasource.sqltool.username=your_namedatasource.sqltool.password=your_password#Hive#配置名称为hivedb的数据源datasource.hivedb.type=hivedatasource.hivedb.default-database=defaultdatasource.hivedb.hive-conf-dir=/etc/hive/conf#StarRocks#配置名称为starrocks的数据源datasource.starrocks.jdbc-url=jdbc:mysql://192.168.10.140:9030datasource.starrocks.load-url=192.168.10.140:8030datasource.starrocks.username=your_namedatasource.starrocks.password=your_passworddatasource.starrocks.sink.properties.column_separator=\\x01datasource.starrocks.sink.properties.row_delimiter=\\x02# the flushing time interval, range: [1000ms, 3600000ms].datasource.starrocks.sink.buffer-flush.interval-ms=10000# max retry times of the stream load request, range: [0, 10].datasource.starrocks.sink.max-retries=3datasource.starrocks.connector=starrocksdatasource.starrocks.database-name=your_db Table API & SQLFlink的Table API & SQL配置除了在Flink配置文件中指定之外,也可以在flink-jobs的配置文件中指定。例如:
注意:如果是在flink-jobs的配置文件中配置这些参数,当执行自定义Java服务时,只有通过 数据同步配置data.sync.smart是否开启数据同步的智能模式,默认为 data.sync.from_table_prefix源表(Source Table)表名的前缀,默认为 data.sync.group_id_prefix数据同步时消费消息队列(Kafka)的 data.sync.metadata.getter.*用户可以根据需要实现 data.sync.metadata.getter.jdbc=cn.tenmg.flink.jobs.operator.data.sync.getter.JDBCMetaDataGetterdata.sync.metadata.getter.starrocks=cn.tenmg.flink.jobs.operator.data.sync.getter.StarrocksMetaDataGetter data.sync.columns.convert1.1.3版本开始支持 示例1: data.sync.columns.convert=BIGINT,TIMESTAMP:TO_TIMESTAMP(FROM_UNIXTIME(#columnName/1000 - 8*60*60, 'yyyy-MM-dd HH:mm:ss')) 上述配置旨在将
示例2: data.sync.columns.convert=BIGINT,TIMESTAMP:TO_TIMESTAMP(FROM_UNIXTIME(#columnName/1000 - 8*60*60, 'yyyy-MM-dd HH:mm:ss'));INT,DATE:TO_TIMESTAMP(FROM_UNIXTIME(#columnName/1000 - 8*60*60, 'yyyy-MM-dd HH:mm:ss')) data.sync.timestamp.case_sensitive1.1.4版本开始支持 data.sync.timestamp.from_type1.1.4版本开始支持 data.sync.timestamp.to_type1.1.4版本开始支持 data.sync.*.from_type1.1.4版本开始支持 data.sync.*.to_type1.1.4版本开始支持 data.sync.*.strategy1.1.4版本开始支持 data.sync.*.script1.1.4版本开始支持 类型映射可以增加数据同步的类型映射配置,详见类型映射配置。 配置示例以下是一个使用Debezium实现数据同步的典型数据同步配置示例,不仅完成了时间格式和时区的转换,还完成了时间戳的自动写入(智能模式下,时间戳是否写入取决于目标表中对应列是否存在): #数据同步类型转换配置data.sync.columns.convert=BIGINT,TIMESTAMP:TO_TIMESTAMP(FROM_UNIXTIME(#columnName/1000 - 8*60*60, 'yyyy-MM-dd HH:mm:ss'));INT,DATE:TO_TIMESTAMP(FROM_UNIXTIME(#columnName/1000 - 8*60*60, 'yyyy-MM-dd HH:mm:ss'))#数据同步自动添加时间戳列data.sync.timestamp.columns=INGESTION_TIMESTAMP,EVENT_TIMESTAMP,ETL_TIMESTAMP#数据同步自动添加EVENT_TIMESTAMP时间戳列的类型配置data.sync.EVENT_TIMESTAMP.from_type=TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUALdata.sync.EVENT_TIMESTAMP.to_type=TIMESTAMP(3)data.sync.EVENT_TIMESTAMP.to_type=TIMESTAMP(3)#ETL_TIMESTAMP列取当前时间戳,策略设置为to,仅创建目标列而不创建来源列data.sync.ETL_TIMESTAMP.strategy=todata.sync.ETL_TIMESTAMP.script=NOW()#INGESTION_TIMESTAMP列类型使用默认配置,这里无需指定 类型映射配置类型映射配置用于配置JDBC数据类型到Flink SQL数据类型的映射关系,尽管flink-jobs的默认配置可以使得Flink SQL对所有Flink SQL支持的JDBC的数据库能够正常运行。但是,我们依然留了用户自定义配置的余地,甚至可以针对不同类型的目标数据库配置不同的映射关系。 flink.sql.type.default默认类型,默认值为 flink.sql.type.with_precision含精度的Flink SQL数据类型,使用大写表示,多个类型使用“,”分隔,默认值为 flink.sql.type.with_size含长度的Flink SQL数据类型,使用大写表示,多个类型使用“,”分隔,默认值为 flink.sql.type.*.size_offset某一含长度的Flink SQL数据类型的长度偏移量,用于将JDBC获取到的 # Size offset for Convert JDBC type to Flink SQL type TIMEflink.sql.type.TIME.size_offset=9# Size offset for Convert JDBC type to Flink SQL type TIMESTAMPflink.sql.type.TIMESTAMP.size_offset=20 flink.sql.type..某一类型的JDBC目标数据库的JDBC数据类型到Flink SQL数据类型的映射关系配置。其中第一个表示某一JDBC目标数据库的类型,第二个表示某一JDBC数据类型,配置的值是对应的Flink SQL数据类型。默认值为: # Starrocks JDBC type java.sql.Types.OTHER to Flink SQL type DECIMALflink.sql.type.starrocks.java.sql.Types.OTHER=DECIMAL java.sql.Types.*某一JDBC数据类型到Flink SQL数据类型的映射关系配置。默认值为: # Specific JDBC type to Flink SQL type configurationjava.sql.Types.VARCHAR=STRINGjava.sql.Types.CHAR=STRINGjava.sql.Types.NVARCHAR=STRINGjava.sql.Types.NCHAR=STRINGjava.sql.Types.LONGNVARCHAR=STRINGjava.sql.Types.LONGVARCHAR=STRINGjava.sql.Types.BIGINT=BIGINTjava.sql.Types.BOOLEAN=BOOLEANjava.sql.Types.BIT(1)=BOOLEANjava.sql.Types.BIT=TINYINTjava.sql.Types.DECIMAL=DECIMALjava.sql.Types.DOUBLE=DOUBLEjava.sql.Types.FLOAT=FLOATjava.sql.Types.REAL=FLOATjava.sql.Types.INTEGER=INTjava.sql.Types.NUMERIC=NUMERICjava.sql.Types.SMALLINT=SMALLINTjava.sql.Types.TINYINT=TINYINTjava.sql.Types.DATE=DATEjava.sql.Types.TIME=TIMEjava.sql.Types.TIME_WITH_TIMEZONE=TIMEjava.sql.Types.TIMESTAMP=TIMESTAMPjava.sql.Types.TIMESTAMP_WITH_TIMEZONE=TIMESTAMPjava.sql.Types.BINARY=BYTESjava.sql.Types.LONGVARBINARY=BYTESjava.sql.Types.VARBINARY=BYTESjava.sql.Types.REF=REFjava.sql.Types.DATALINK=DATALINKjava.sql.Types.ARRAY=ARRAYjava.sql.Types.BLOB=BLOBjava.sql.Types.CLOB=CLOBjava.sql.Types.NCLOB=CLOBjava.sql.Types.STRUCT=STRUCT 完整的类型映射配置默认值为: # JDBC types to Flink SQL types configuration# Default Flink SQL type when unexpectedflink.sql.type.default=STRING# Flink SQL types with precisionflink.sql.type.with_precision=DECIMAL,NUMERIC# Flink SQL types with sizeflink.sql.type.with_size=TIME,TIMESTAMP# Size offset for Convert JDBC type to Flink SQL type TIMEflink.sql.type.TIME.size_offset=9# Size offset for Convert JDBC type to Flink SQL type TIMESTAMPflink.sql.type.TIMESTAMP.size_offset=20# Starrocks JDBC type java.sql.Types.OTHER to Flink SQL type DECIMALflink.sql.type.starrocks.java.sql.Types.OTHER=DECIMAL# Specific JDBC type to Flink SQL type configurationjava.sql.Types.VARCHAR=STRINGjava.sql.Types.CHAR=STRINGjava.sql.Types.NVARCHAR=STRINGjava.sql.Types.NCHAR=STRINGjava.sql.Types.LONGNVARCHAR=STRINGjava.sql.Types.LONGVARCHAR=STRINGjava.sql.Types.BIGINT=BIGINTjava.sql.Types.BOOLEAN=BOOLEANjava.sql.Types.BIT(1)=BOOLEANjava.sql.Types.BIT=TINYINTjava.sql.Types.DECIMAL=DECIMALjava.sql.Types.DOUBLE=DOUBLEjava.sql.Types.FLOAT=FLOATjava.sql.Types.REAL=FLOATjava.sql.Types.INTEGER=INTjava.sql.Types.NUMERIC=NUMERICjava.sql.Types.SMALLINT=SMALLINTjava.sql.Types.TINYINT=TINYINTjava.sql.Types.DATE=DATEjava.sql.Types.TIME=TIMEjava.sql.Types.TIME_WITH_TIMEZONE=TIMEjava.sql.Types.TIMESTAMP=TIMESTAMPjava.sql.Types.TIMESTAMP_WITH_TIMEZONE=TIMESTAMPjava.sql.Types.BINARY=BYTESjava.sql.Types.LONGVARBINARY=BYTESjava.sql.Types.VARBINARY=BYTESjava.sql.Types.REF=REFjava.sql.Types.DATALINK=DATALINKjava.sql.Types.ARRAY=ARRAYjava.sql.Types.BLOB=BLOBjava.sql.Types.CLOB=CLOBjava.sql.Types.NCLOB=CLOBjava.sql.Types.STRUCT=STRUCT 系统集成flink-jobs-clients实现了使用XML配置文件来管理flink-jobs任务,这样开发Flink SQL任务会显得非常简单;同时,用户自定义的flink-jobs服务也可以被更轻松得集成到其他系统中。XML文件具有良好的可读性,并且在IDE环境下能够对配置进行自动提示。具体使用方法详见flink-jobs-clients开发文档,以下介绍几种通过XML管理的flink-jobs任务: 运行自定义服务以下为一个自定义服务任务XML配置文件: <?xml version="1.0" encoding="UTF-8"?><flink-jobs xmlns="http://www.10mg.cn/schema/flink-jobs" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.10mg.cn/schema/flink-jobs http://www.10mg.cn/schema/flink-jobs.xsd" jar="/yourPath/yourJar.jar" serviceName="yourServiceName"></flink-jobs> 运行批处理SQL以下为一个简单订单量统计SQL批处理任务XML配置文件: <?xml version="1.0" encoding="UTF-8"?><flink-jobs xmlns="http://www.10mg.cn/schema/flink-jobs" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.10mg.cn/schema/flink-jobs http://www.10mg.cn/schema/flink-jobs.xsd" jar="/yourPath/yourJar.jar"> <!--任务运行参数 --> <params> <param name="beginDate">2021-01-01</param> <param name="endDate">2021-07-01</param> </params> <!-- 使用名为hivedb的数据源创建名为hive的catalog --> <execute-sql dataSource="hivedb"> <![CDATA[ create catalog hive ]]> </execute-sql> <!--加载hive模块 --> <execute-sql> <![CDATA[ load module hive ]]> </execute-sql> <!--使用hive,core模块 --> <execute-sql> <![CDATA[ use modules hive,core ]]> </execute-sql> <!-- 使用名为pgdb的数据源创建表order_stats_daily(如果源表名和建表语句指定的表名不一致,可以通过 WITH ('table-name' = 'actrual_table_name') 来指定) --> <execute-sql dataSource="pgdb"> <![CDATA[ CREATE TABLE order_stats_daily ( stats_date DATE, `count` BIGINT, PRIMARY KEY (stats_date) NOT ENFORCED ) WITH ('sink.buffer-flush.max-rows' = '0') ]]> </execute-sql> <!-- 使用hive catalog查询,并将结果存为临时表tmp,tmp放在默认的default_catalog中 --> <sql-query saveAs="tmp" catalog="hive"> <![CDATA[ select cast(to_date(o.business_date) as date) stats_date, count(*) `count` from odc_order_info_par o where o.business_date >= :beginDate and o.business_date < :endDate group by cast(to_date(o.business_date) as date) ]]> </sql-query> <!-- 删除原有数据order_stats_daily(FLINK SQL不支持DELETE,此处执行的是JDBC)--> <execute-sql dataSource="pgdb"> <![CDATA[ delete from order_stats_daily where stats_date >= :beginDate and stats_date < :endDate ]]> </execute-sql> <!-- 数据插入。实际上Flink最终将执行Upsert语法 --> <execute-sql> <![CDATA[ INSERT INTO order_stats_daily(stats_date,`count`) SELECT stats_date, `count` FROM tmp ]]> </execute-sql></flink-jobs> 运行流处理SQL以下为通过Debezium实现异构数据库同步任务XML配置文件: <?xml version="1.0" encoding="UTF-8"?><flink-jobs xmlns="http://www.10mg.cn/schema/flink-jobs" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.10mg.cn/schema/flink-jobs http://www.10mg.cn/schema/flink-jobs.xsd"> <!-- Flink内创建SOURCE数据库 --> <!-- <execute-sql> <![CDATA[ CREATE DATABASE SOURCE ]]> </execute-sql> --> <!-- 使用SOURCE数据库执行Flink SQL --> <!-- <execute-sql> <![CDATA[ USE SOURCE ]]> </execute-sql> --> <!-- 上述两步操作是非必须的,只是为了Flink自动生成的作业名称更容易识别 --> <!-- 定义名为kafka的数据源的订单明细表 --> <execute-sql dataSource="kafka"> <![CDATA[ CREATE TABLE KAFKA_ORDER_DETAIL ( DETAIL_ID STRING, ORDER_ID STRING, ITEM_ID STRING, ITEM_CODE STRING, ITEM_NAME STRING, ITEM_TYPE STRING, ITEM_SPEC STRING, ITEM_UNIT STRING, ITEM_PRICE DECIMAL(12, 2), ITEM_QUANTITY DECIMAL(12, 2), SALE_PRICE DECIMAL(12, 2), SALE_AMOUNT DECIMAL(12, 2), SALE_DISCOUNT DECIMAL(12, 2), SALE_MODE STRING, CURRENCY STRING, SUPPLY_TYPE STRING, SUPPLY_CODE STRING, REMARKS STRING, CREATE_BY STRING, CREATE_TIME BIGINT, UPDATE_BY STRING, UPDATE_TIME BIGINT, OIL_GUN STRING, EVENT_TIME TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, PRIMARY KEY (DETAIL_ID) NOT ENFORCED ) WITH ('topic' = 'kaorder1.kaorder.order_detail', 'properties.group.id' = 'flink-jobs_source_order_detail') ]]> </execute-sql> <!-- 定义名为source的数据源的订单明细表 --> <execute-sql dataSource="source"> <![CDATA[ CREATE TABLE ORDER_DETAIL ( DETAIL_ID STRING, ORDER_ID STRING, ITEM_ID STRING, ITEM_CODE STRING, ITEM_NAME STRING, ITEM_TYPE STRING, ITEM_SPEC STRING, ITEM_UNIT STRING, ITEM_PRICE DECIMAL(12, 2), ITEM_QUANTITY DECIMAL(12, 2), SALE_PRICE DECIMAL(12, 2), SALE_AMOUNT DECIMAL(12, 2), SALE_DISCOUNT DECIMAL(12, 2), SALE_MODE STRING, CURRENCY STRING, SUPPLY_TYPE STRING, SUPPLY_CODE STRING, REMARKS STRING, CREATE_BY STRING, CREATE_TIME TIMESTAMP(3), UPDATE_BY STRING, UPDATE_TIME TIMESTAMP(3), OIL_GUN STRING, EVENT_TIME TIMESTAMP(3), PRIMARY KEY (DETAIL_ID) NOT ENFORCED ) ]]> </execute-sql> <!-- 将kafka订单明细数据插入到source数据源的订单明细表中 --> <execute-sql> <![CDATA[ INSERT INTO ORDER_DETAIL( DETAIL_ID, ORDER_ID, ITEM_ID, ITEM_CODE, ITEM_NAME, ITEM_TYPE, ITEM_SPEC, ITEM_UNIT, ITEM_PRICE, ITEM_QUANTITY, SALE_PRICE, SALE_AMOUNT, SALE_DISCOUNT, SALE_MODE, CURRENCY, SUPPLY_TYPE, SUPPLY_CODE, REMARKS, CREATE_BY, CREATE_TIME, UPDATE_BY, UPDATE_TIME, OIL_GUN, EVENT_TIME ) SELECT DETAIL_ID, ORDER_ID, ITEM_ID, ITEM_CODE, ITEM_NAME, ITEM_TYPE, ITEM_SPEC, ITEM_UNIT, ITEM_PRICE, ITEM_QUANTITY, SALE_PRICE, SALE_AMOUNT, SALE_DISCOUNT, SALE_MODE, CURRENCY, SUPPLY_TYPE, SUPPLY_CODE, REMARKS, CREATE_BY, TO_TIMESTAMP(FROM_UNIXTIME(CREATE_TIME/1000, 'yyyy-MM-dd HH:mm:ss')) CREATE_TIME, UPDATE_BY, TO_TIMESTAMP(FROM_UNIXTIME(CREATE_TIME/1000, 'yyyy-MM-dd HH:mm:ss')) UPDATE_TIME, OIL_GUN, EVENT_TIME FROM KAFKA_ORDER_DETAIL ]]> </execute-sql></flink-jobs> 运行数据同步任务以下为通过kafka(配合Debezium、Cannal或Maxwell等)实现异构数据库同步任务XML配置文件: 全部评论
专题导读
上一篇:ipfs-cloud: 一个基于IPFS的星际文件系统,也是分布式的WebScoket服务端,基于区块链 ...发布时间:2022-03-25下一篇:zcms: 自己研发的消息中间件,根据团队需求,自定义开发,支持集群和分布式;单个实例 ...发布时间:2022-03-25热门推荐
热门话题
阅读排行榜
|
请发表评论