• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

light-spark: Spark快速开发框架,提高开发效率,主要针对离线任务场景 ...

原作者: [db:作者] 来自: 网络 收藏 邀请

开源软件名称:

light-spark

开源软件地址:

https://gitee.com/hellowzk/light-spark

开源软件介绍:

Light-Spark 开发脚架

[toc]

1. 概述

本人从 2014 年开始使用 Apache Spark,基于以往项目经验,为了降低项目开发复杂度,减少开发成本,开发了 light-spark 框架。本框架屏蔽了底层技术细节,让开发人员面向 SQL 编程,更专注于业务逻辑,减少开发过程中 Debug 工作,让 Apache Spark 更方便使用。

2. 适用场景

项目业务大部分功能可以用 SQL 实现,符合以下情况,建议使用本框架:

  • 逻辑链条较长,开发周期较长
  • 逻辑较复杂,开发时需要反复调试,从而反复修改 Debug 代码
  • 业务不稳定,功能频繁迭代
  • 开发人员对大数据技术细节不太熟悉

3. 设计思想

  • 大数据项目中整个处理流程如下:
    开始 -> 加载数据 -> 业务计算 -> 结果存储 -> 结束
    流程图
  • 面向配置编程
    核心思想是面向 Spark DataFrame(SQL) 编程,通过 DataFrame 串联起数据处理流程各个环节,使用 SparkSQL 处理业务逻辑,并定义到配置文件,以便于开发和后期维护,业务调整时避免修改代码。同时,暴露接口支持开发人员自定义硬编码,实现不能使用 SQL 处理的复杂逻辑,通过配置,把这些逻辑纳入整个处理流程。
  • 使用样例该样例背景为一个指标计算,需要加载 hive 表数据并关联 jdbc 中的表数据,经过 SQL 计算处理后,结果数据存储到 jdbc 表中,整个处理逻辑定义到一个配置文件中
    yaml1yaml2yaml3yaml4

4. 使用

4.1 maven 引用

本项目已发布到 maven 中央仓库,暂时支持 Spark 2.2.x、Spark 2.3.x、Spark 2.4.x

针对 Spark 2.2.x

<dependency>    <groupId>com.github.hellowzk</groupId>    <artifactId>light-spark-core_2.2</artifactId>    <version>1.0.4-release</version></dependency>

针对 Spark 2.3.x

<dependency>    <groupId>com.github.hellowzk</groupId>    <artifactId>light-spark-core_2.3</artifactId>    <version>1.0.4-release</version></dependency>

针对 Spark 2.4.x

<dependency>    <groupId>com.github.hellowzk</groupId>    <artifactId>light-spark-core_2.4</artifactId>    <version>1.0.4-release</version></dependency>

4.2 依赖配置

参考 pom 文件

4.3 启动脚本样例

参考 run

4.4 组件

4.4.1 debug 开发工具

开发过程中,可以通过配置,自动把指定步骤的结果持久化存储,支持 hdfs csv、hive 表

4.4.2 input - 数据加载

  1. 支持读取 hive 数据;
  2. 支持读取 hdfs 数据,配置中定义好字段信息后自动加载成表;
  3. 支持读取 classpath 数据,配置中定义好字段信息后自动加载成表;
  4. 支持数据源类型比较复杂时,支持 自定义 处理源数据为表;
  5. 支持读取 JDBC 的源数据,jdbcOpts 中参数请看 SparkSQL官方文档

4.4.3 process - 业务计算

  1. 支持 SparkSQL 处理。
  2. 处理逻辑较复杂,SQL 叫难实现时,支持硬编码实现业务需求。

4.4.4 output - 结果保存

  1. 存储到 HDFS ,格式可以选择为 txt / lzo 。
  2. 存储到 HIVE 。
  3. 存储到支持 JDBC 的数据库。

4.4.5 变量

4.4.5.1 定义及使用

本框架支持在配置中自定义及使用变量,支持变量嵌套使用

  • 灵活的定义位置
    自定义变量可以在 yaml 配置中,也可以定义在 class 类中,定义的变量名称必须唯一。
    定义在 class 中,例子 MyConstants

  • 变量使用

    • 在 yaml 中使用,可以直接通过 ${variable} 使用预定义的变量
    • 在类中使用,如下
      import com.hellowzk.light.spark.constants.AppConstantsAppConstants.variables("mysql_url")// 类型 variables: Map[String, String]
    所有可用变量存放在 AppConstants.variables 中
4.4.5.2 日期表达式

支持通过日期表达式定义日期,表达式格式为

DATE([yyyy-MM-dd, 20200722, yyyyMMdd][+3d][-1m][+1y]...)
  • DATE() 为日期表达式声明符
  • 第一个 [ ] 中, 有三个参数
    • 第一个为表达式最终计算结果格式
    • 第二个参数为基准时间,后续的日期计算是基于该时间的,可以省略,代表当前日期
    • 第三个参数为第二个参数的时间格式,基于该格式解析第二个参数,第二个参数没有时,该参数同样不能定义
  • 后面的 [ ] 中,代表时间计算表达式,大小写不敏感
    • ±nd 加/减 n 天
    • ±nm 加/减 n 月
    • ±ny 加/减 n 年
  • 例子
...constansMap:  today: "${DATE([yyyy-MM-dd])}" # 当前日期    today2: "${DATE([yyyy-MM, 2020-08-05, yyyy-MM-dd])}" # 2020-08-05 所在月份,2020-08  date1: "${DATE([yyyy-MM-dd, 2020-08-05][-1d])}" # 2020-08-05 减一天,2020-08-04  date2: "${DATE([yyyy-MM-dd, ${date1}][-1d])}" # 2020-08-04 减 1 天,2020-08-03  date3: "${DATE([yyyy-MM-dd, ${date2}][-1d])}" # 2020-08-03 减 1 天,2020-08-02  date4: "${DATE([yyyy-MM, ${date3}, yyyy-MM-dd][-1m])}" # 2020-08-02 减 1 月月份,2020-07  date5: "${DATE([yyyy-MM-dd, ${date4}, yyyy-MM][-1d])}" # 2020-07 月初减 1 天, 2020-06-30...

5. 配置文件

5.1 整体结构

配置项默认值是否必填说明
name定义当前 yaml 文件流程名称
constansMap否,Map 类型自定义常量,可以在当前 yaml 中通过 ${variable} 使用;在自定义代码中通过 AppConstants.variables("variable" 使用)
persistType使用 debug 组件,配置中间数据持久化的地址 hive/hdfs;开启条件,启动脚本中传入 --debug 参数启用 debug 组件
persistDir持久化到 hdfs 的位置,需要启用 debug 组件
persistHiveDb持久化到 hive 的库,需要启用 debug 组件
udaf自定义 UDAF,Map类型, key 为 udaf 命名,value 为值
inputs定义的输入数据
processes处理逻辑
outputs输出
envs环境参数,暂时只支持 Spark 相关参数
# 流程名称name: "xxx" # 自定义常量,可以在sql/文件路径中使用 ${variable},常量名称不可重复constansMap:   hive_out_db: xxx  mysql_url: xxx  mysql_username: xxx  mysql_passwd: xxx# 启用 debug 工具,SQL处理时,如果 store=true , 结果会缓存到 $baseDir/{{EVENT_DATE}}/$tableName 目录,启动脚本必须传入参数 --debug, 否则该配置不生效persistType: hive # hive hdfspersistDir: /tmp/spark/app/full-batchpersistHiveDb: "${outputDB}"# 输入定义,支持 3 种数据源inputs:  - ...    # 逻辑处理,支持SQL 与 自定义处理, 有 3 种processes:  - ...# 结果输出, 支持 3 种outputs:  - ...# 环境设置envs:  # 暂时只支持 spark 相关参数,如下  spark:    - ...

5.2 数据加载

支持类型

type 值说明
classpathFile加载 classpath 文件,通过配置字段、分隔符生成表
customClasspath加载 hdfs 文件,自定义加载逻辑,生成表
hdfsfile加载 hdfs 文件,通过配置字段、分隔符生成表
customHdfs加载 hdfs 文件,自定义加载逻辑,生成表
hive加载 hive 表
jdbc加载 jdbc 表
clickhouseBalanced加载 clickhouse 表,使用 BalancedClickhouseDataSource 负载均衡接口,单机版或通过 SLB 做负载使用 jdbc 就好
5.2.1. 加载 classpathFile 数据
配置说明
配置项默认值是否必填说明
name定义该步生成的表的名称,可以在下面处理中引用
type固定值 classpathFile
columns数据生成的 columns, 用 "," 隔开
path文件在 classpath 的路径
fs列分割符
partations0进行重新分区的数量,不配置不重分区
cachefalse是否缓存,如果下面的操作多次使用该表,建议设置为true
show0开发时 debug 使用,对表进行 show 操作,配置成需要 show 的条数
storefalse开发时 debug 使用,数据存储起来,支持存储到 hdfs/hive,请配置 persistType 参数
demo
...inputs:  - name: person    type: classpathFile    columns: name, gen, age    path: person.txt    fs: ","...
5.2.2. 加载 customClasspath 数据
配置说明

自定义加载 classpath 的数据,生成表

配置项默认值是否必填说明
name定义该步生成的表的名称,可以在下面处理中引用
type固定值 customClasspath
path文件在 classpath 的路径
clazz自定义的加载逻辑类,必须继承 com.hellowzk.light.spark.stages.custom.CustomBaseInput 实现 process 方法
partations0进行重新分区的数量,不配置不重分区
cachefalse是否缓存,如果下面的操作多次使用该表,建议设置为true
show0开发时 debug 使用,对表进行 show 操作,配置成需要 show 的条数
storefalse开发时 debug 使用,数据存储起来,支持存储到 hdfs/hive,请配置 persistType 参数
demo
...inputs:  - name: custompersonClass    type: customClasspath    path: person2.txt    clazz: com.hellowzk.light.spark.stages.Process1...
5.2.3. 加载 hdfsfile 数据
配置说明
配置项默认值是否必填说明
name定义该步生成的表的名称,可以在下面处理中引用
type固定值 hdfsfile
columns数据生成的 columns, 用 "," 隔开
path文件在 hdfs 的路径
fs列分割符
partations0进行重新分区的数量,不配置不重分区
cachefalse是否缓存,如果下面的操作多次使用该表,建议设置为true
show0开发时 debug 使用,对表进行 show 操作,配置成需要 show 的条数
storefalse开发时 debug 使用,数据存储起来,支持存储到 hdfs/hive,请配置 persistType 参数
demo
...inputs:  - name: person_hdfs    type: hdfsfile    columns: name,gen,age    path: /tmp/zhaokui/testdata/test/20200706/    fs: ","...
5.2.4. 加载 customHdfs 数据

自定义加载 customHdfs 的数据,生成表

  • 自定义加载逻辑类必须继承 com.hellowzk.light.spark.stages.custom.CustomBaseInput 特质,实现 process 方法,如 Process1
配置说明
配置项默认值是否必填说明
name定义该步生成的表的名称,可以在下面处理中引用
type固定值 customHdfs
path文件在 hdfs 的路径
clazz自定义加载逻辑类
partations0进行重新分区的数量,不配置不重分区
cachefalse是否缓存,如果下面的操作多次使用该表,建议设置为true
show0开发时 debug 使用,对表进行 show 操作,配置成需要 show 的条数
storefalse开发时 debug 使用,数据存储起来,支持存储到 hdfs/hive,请配置 persistType 参数
demo
...inputs:  - name: customPersonHdfs    type: customHdfs    path: /tmp/zhaokui/testdata/test/20200706/    clazz: com.hellowzk.light.spark.stages.Process1...
5.2.5 加载 jdbc 数据

自定义加载 customHdfs 的数据,生成表

配置说明
配置项默认值是否必填说明
name定义该步生成的表的名称,可以在下面处理中引用
type固定值 jdbc
driverjdbc driver
urljdbc url 地址
user用户名
password密码
dbtable要加载的表,为 map 结构
jdbcOpts请参看 SparkSQL官方文档
demo
...inputs:  - name: mysql1    type: jdbc    driver: "com.mysql.jdbc.Driver"    url: "${jdbcurl}" # 在 constansMap 中定义的该值    user: root    password: 123456    dbtable:      b_osp_app_event_detail: b_osp_app_event_detail      b_osp_app_area_analyze_total: b_osp_app_area_analyze_total...
5.2.6 加载 clickhouse 数据

使用 BalancedClickhouseDataSource 加载 clickhouse 数据,生成 spark 表,如果 clickhouse 为单机版或集群做了 SLB,请使用使用 jdbc 方式

配置说明
配置项默认值是否必填说明
name定义该步生成的表的名称,可以在下面处理中引用
type固定值 clickhouseBalance
urlclickhouse url 地址
user用户名
password密码
dbtable要加载的表,为 map 结构
jdbcOpts请参看 SparkSQL官方文档
demo
...inputs:  - name: clickhouse1    type: clickhouseBalance    url: "${jdbcurl}" # 在 constansMap 中定义的该值    user: root    password: 123456    dbtable:      preson_all: all_data...
5.2.7 加载 hive 数据
配置说明
配置项默认值是否必填说明
name定义该步生成的表的名称,可以在下面处理中引用
type固定值 hive
databasehive 库名
dbtable要加载的表,为 map 结构
demo
...inputs:  - name: hive1    type: hive    database: "${inputDB}"    dbtable:      dm_road_trackset_di: dm_road_trackset_di      dm_trackset_di: dm_trackset_di...

5.3 数据处理

支持类型

type 值说明
sql使用 SQL 处理数据
custom硬编码处理数据
5.3.1 使用 SQL 处理
配置说明
配置项默认值是否必填说明
name定义该步生成的表的名称,可以在下面处理中引用
sqlSQL 语句
dimKey用于计算多维度数据
allPlaceholderall多维度数据 select 占位符
partations0进行重新分区的数量,不配置不重分区
cachefalse是否缓存,如果下面的操作多次使用该表,建议设置为true
show0开发时 debug 使用,对表进行 show 操作,配置成需要 show 的条数
storefalse开发时 debug 使用,数据存储起来,支持存储到 hdfs/hive,请配置 persistType 参数
demo
...processes:  - name: table1    sql: "select name,gen,age, '${DATE([yyyy-MM-dd, 2020-08-05][-3d])}' from person"    cache: true # 出发action操作,缓存执行SQL后的表    store: true # 是否保存到本地,debug用,保存目录 $baseDir/{{EVENT_DATE}}/table    show: 20 # 在控制台打印表中数据,不打印则删除该节点,df.show(20)    partations: 3 # reparation 个数,不reparation则删除节点  # 多维度数据处理  - name: table1_1    dimKey: "gen, age" # 两个维度    allPlaceholder: "ALL"    sql: |-     #  |- :yaml 语法,支持字符串换行,适合格式化写 SQL      select name, ${DIM_FIELDS}, count(1) as count from person group by name ${DIM_GROUP}    cache: true # 出发action操作,缓存执行SQL后的表    store: true # 是否保存到本地,debug用,保存目录 $baseDir/{{EVENT_DATE}}/table    show: 20 # 在控制台打印表中数据,不打印则删除该节点,df.show(20)    partations: 3 # reparation 个数,多维度数据处理,建议使用该功能,否则 task 数量会根据维度个数爆炸膨胀,严重降低后续计算过程...
5.3.2 自定义编码处理

自定义处理类,必须继承 com.hellowzk.light.spark.stages.custom.CustomBaseTransform 类,实现 doProcess 方法,如 Process2

配置说明
配置项默认值是否必填说明
name定义该步生成的表的名称,可以在下面处理中引用
clazz自定义处理类
partations0进行重新分区的数量,不配置不重分区
cachefalse是否缓存,如果下面的操作多次使用该表,建议设置为true
show0开发时 debug 使用,对表进行 show 操作,配置成需要 show 的条数
storefalse开发时 debug 使用,数据存储起来,支持存储到 hdfs/hive,请配置 persistType 参数
demo
...processes:  - name: customProcessShow    clazz: "com.hellowzk.light.spark.stages.Process2"    cache: true    store: true    show: 20    partations: 1...

5.4 结果输出

支持类型

type 值说明
jdbc保存数据到 jdbc
clickhouseBalance使用 BalancedClickhouseDataSource 负载均衡接口,保存数据到 clickhouse,如果 clickhouse 为单机版或集群做了 SLB,请使用使用 jdbc
hive保存数据到 hive
hdfsfile保存数据到 hdfs 目录
kafkaField指定分隔符,数据发送到 kafka
kafkaJsonJSON 格式,数据发送到 kafka
5.4.1 输出到 jdbc

通过此配置把数据保存到支持 jdbc 的持久化工具

配置说明
配置项默认值是否必填说明
name节点名称,用于打印日志,定位问题
type固定值 jdbc
driver数据库 driver
url连接数据库的 url
user用户名
password密码
tables要输出的表,为 map 类型
modeappend保存模式,默认为 append
optsjdbc 操作的其他参数
preSQL执行保存操作前,要对目标数据库执行的 SQL语句
demo
...outputs:  - name: mysql1    type: jdbc    driver: "com.mysql.jdbc.Driver"    url: "${jdbcurl}"    user: root    password: 123456    opts:      isolationLevel: NONE # 关闭事务      numPartitions: 1 # 设置并发      batchsize: 50000    preSQL:      - delete from b_osp_app_event_detail      - delete from b_osp_app_area_analyze_total    tables:      table3: b_osp_app_event_detail      b_osp_app_area_analyze_total: b_osp_app_area_analyze_total...
5.4.2 输出到 clickhouse 表

如果 clickhouse 为单机版或集群做了 SLB,请使用使用 jdbc

配置说明
配置项默认值是否必填说明
name节点名称,用于打印日志,定位问题
type固定值 clickhouseBalance
url连接数据库的 url
user用户名
password密码
tables要输出的表,为 map 类型
modeappend保存模式,默认为 append
optsjdbc 操作的其他参数
preSQL执行保存操作前,要对目标数据库执行的 SQL语句,为 clickhouse 分布式表操作语句
demo
...outputs:  - name: clickhouse1    type: clickhouseBalance    url: "${jdbcurl}"    user: root    password: 123456    opts:      isolationLevel: NONE # 关闭事务      numPartitions: 1 # 设置并发      batchsize: 50000    # clickhouse 分布式表的操作语句    preSQL:      - alter table b_osp_app_event_detail on cluster test_cluster delete where dt = '${event_date}'    tables:      table3: b_osp_app_event_detail...
5.4.3 输出到 HIVE
配置说明
配置项默认值是否必填说明
name节点名称,用于打印日志,定位问题
type固定值 hive
database要保存到的 hive 库
tables要输出的表,为 map 类型
modeoverwrite保存模式,默认为 overwrite
preSQL执行保存操作前,要对目标数据库执行的 SQL语句
demo
...outputs:  - name: mysql1    type: hive    database: test    tables:      table3: b_osp_app_event_detail      b_osp_app_area_analyze_total: b_osp_app_area_analyze_total...
5.4.4 输出到 HDFS
配置说明
配置项默认值是否必填说明
name节点名称,用于打印日志,定位问题
type固定值 hdfsfile
srcName要保存的数据,必须使用上面定义的表名
format要输出的文件类型,暂时支持 csv, txt, lzo, json, parquet 这些类型
path数据要保存的目录
fs\u0001保存成文件时的列分隔符,只针对 txt lzo 类型输出生效
demo
...outputs:  - name: out1    type: hdfsfile    srcName: table1    format: csv # 保存为 csv    path: /user/osp/data/out1/${EVENT_DATE}  - name: out2     type: hdfsfile    srcName: table2    format: txt # 保存为 txt,字段用逗号分隔    fs: ","    path: /user/osp/data/out2/${EVENT_DATE}    - name: out3    type: hdfsfile    srcName: table3    format: lzo # 保存为 lzo,字段用逗号分隔    fs: ","    path: /user/osp/data/out3/${EVENT_DATE}    - name: out4    type: hdfsfile    srcName: table4
                      

鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
热门推荐
热门话题
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap