# Gradle Confluent Plugin

Open-source name: RedPillAnalytics/gradle-confluent
Repository URL: https://github.com/RedPillAnalytics/gradle-confluent
Language: Groovy 100.0%

You can get this plugin from the Gradle Plugin Portal. You can also read the API documentation.

You can run the unit tests by executing:

```
./gradlew test
```

There is a series of integration tests that use the topics from the Confluent clickstream quickstart, implemented using TestContainers: one with Docker Compose using a compose file, as well as a standalone TestContainer for Kafka. The integration tests, plus the unit tests, can be run with the command below:

```
./gradlew runAllTests
```

# Motivation

This plugin was motivated by a real-world project. We were struggling to easily deploy all the pieces of our Confluent pipeline: KSQL scripts, KSQL user-defined functions (UDFs), and Kafka Streams microservices. The biggest gap we had was deploying KSQL scripts to downstream environments, so the majority of this plugin is for remedying that. Since Gradle already has functionality and plugins for compiling JARs (for UDFs) and building Java applications (for Kafka Streams microservices), this plugin addresses just a few gaps for those patterns.

# Plugin Extension

Configuration properties for the plugin are set in the `confluent` extension:

```groovy
confluent {
   enableFunctions = false
   enableStreams = false
}
```

or:

```groovy
confluent.enableFunctions = false
confluent.enableStreams = false
```

All of the extension properties and their default values are listed here.

# Confluent KSQL

Building streaming pipelines using KSQL is done with a series of SQL statements, similar to the below:

```sql
CREATE STREAM clickstream (_time bigint, time varchar, ip varchar, request varchar, status int, userid int, bytes bigint, agent varchar) WITH (kafka_topic = 'clickstream', value_format = 'json');

CREATE TABLE clickstream_codes (code int, definition varchar) WITH (key='code', kafka_topic = 'clickstream_codes', value_format = 'json');

CREATE TABLE events_per_min AS SELECT userid, count(*) AS events FROM clickstream WINDOW TUMBLING (size 60 second) GROUP BY userid;
```

The third statement above is called a *persistent query* in KSQL terminology: it selects data from a KSQL stream or table, creates or uses an underlying Kafka topic, and initializes the streaming processes that persist data to that topic.
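Statements like these are submitted to the KSQL server's REST API, the same endpoint the plugin's tasks target (`http://localhost:8088` is the default noted later in the task options). The standalone client below is an illustrative sketch of that interaction, not the plugin's own code; the `payload` helper and its hand-built JSON are simplifications introduced here.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class KsqlClient {

    // Build the JSON payload the KSQL REST API expects for a statement.
    // (Hand-built for brevity; a real client would use a JSON library
    // and properly escape the SQL text.)
    static String payload(String sql) {
        return "{\"ksql\": \"" + sql + "\", \"streamsProperties\": {}}";
    }

    public static void main(String[] args) throws Exception {
        String sql = "CREATE TABLE events_per_min AS SELECT userid, count(*) AS events "
                   + "FROM clickstream WINDOW TUMBLING (size 60 second) GROUP BY userid;";

        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create("http://localhost:8088/ksql"))  // default KSQL server REST URL
            .header("Content-Type", "application/vnd.ksql.v1+json; charset=utf-8")
            .POST(HttpRequest.BodyPublishers.ofString(payload(sql)))
            .build();

        // Requires a running KSQL server; prints the server's JSON response.
        HttpResponse<String> response = HttpClient.newHttpClient()
            .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body());
    }
}
```

Running `main` needs a live KSQL server; the payload construction itself can be exercised standalone.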
Because of this, KSQL persistent query statements are regularly dependent on the creation of other KSQL streams and tables. We wanted to eliminate the need for developers to concern themselves (much) with how to express these dependencies in their KSQL scripts. We didn't want them to have to write and test driving scripts containing DROP and TERMINATE statements, which is time-consuming and error-prone, and we wanted to make it easy for developers to tweak and rerun their individual pipelines. So we knew we wanted our approach to auto-generate DROP and TERMINATE statements as a part of the development and deployment processes.

We considered many alternatives for expressing these dependencies, and even briefly considered using the Gradle task DAG to do this. In the end, we decided on simple alphanumeric file and directory structure naming, which Gradle's built-in FileTree functionality makes very easy. You can see a sample of how this is achieved in the KSQL scripts used for testing this plugin. Notice that none of these sample test scripts have DROP statements or any scripted dependencies.
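As a rough illustration of the convention (a hypothetical standalone sketch, not the plugin's implementation, which uses Gradle's FileTree), an alphanumeric sort of the relative script paths yields the execution order:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PipelineOrder {

    // Return all .sql scripts under baseDir, sorted by relative path.
    // Alphanumeric path order is the execution order under this convention.
    public static List<String> orderedScripts(Path baseDir) throws IOException {
        try (Stream<Path> paths = Files.walk(baseDir)) {
            return paths
                .filter(p -> p.toString().endsWith(".sql"))
                .map(p -> baseDir.relativize(p).toString())
                .sorted()  // plain string sort gives the 01-, 02-, ... ordering
                .collect(Collectors.toList());
        }
    }

    public static void main(String[] args) throws IOException {
        // Build a throwaway tree mimicking the sample layout; creation order
        // is deliberately scrambled to show the sort doing the work.
        Path base = Files.createTempDirectory("pipeline");
        Files.createDirectories(base.resolve("02-clickstream-users"));
        Files.createDirectories(base.resolve("01-clickstream"));
        Files.createFile(base.resolve("01-clickstream").resolve("02-integrate.sql"));
        Files.createFile(base.resolve("01-clickstream").resolve("01-create.sql"));
        Files.createFile(base.resolve("02-clickstream-users").resolve("01-create.sql"));

        orderedScripts(base).forEach(System.out::println);
        // 01-clickstream/01-create.sql
        // 01-clickstream/02-integrate.sql
        // 02-clickstream-users/01-create.sql
    }
}
```

Numbering both the directories and the files inside them is what lets a single flat sort express cross-directory dependencies.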
Scripts and directories can use any naming standard desired, but the script order dependency is managed by a simple alphanumeric sort. So let's start preparing our build script:

```groovy
plugins {
   id 'maven-publish'
   id "com.redpillanalytics.gradle-confluent" version '1.1.11'
}
```

Now we can use the tasks the plugin provides.
# Executing KSQL Pipelines

The easiest way to use this plugin is to simply execute all of our persistent query statements, or a subset of them, from source control.
We do this using the `pipelineExecute` task:

```
==> ./gradlew pipelineExecute --console=plain -i

> Configure project :
Evaluating root project 'ksql-examples' using build file '/Users/stewartbryson/Source/ksql-examples/build.gradle'.
Selected primary task ':jar' from project :
All projects evaluated.
Selected primary task 'pipelineExecute' from project :
Tasks to be executed: [task ':pipelineSync', task ':pipelineExecute']
:pipelineSync (Thread[Execution worker for ':',5,main]) started.

> Task :pipelineSync
Deleting stale output file: /Users/stewartbryson/Source/ksql-examples/build/pipeline
Task ':pipelineSync' is not up-to-date because:
  No history is available.
Custom actions are attached to task ':pipelineSync'.
Synchronizing '/Users/stewartbryson/Source/ksql-examples/build/pipeline' from '/Users/stewartbryson/Source/ksql-examples/src/main/pipeline'.
:pipelineSync (Thread[Execution worker for ':',5,main]) completed. Took 0.018 secs.
:pipelineExecute (Thread[Execution worker for ':',5,main]) started.

> Task :pipelineExecute
Task ':pipelineExecute' is not up-to-date because:
  Task.upToDateWhen is false.
Terminating query CTAS_CLICK_USER_SESSIONS_107...
DROP TABLE IF EXISTS CLICK_USER_SESSIONS;
Terminating query CTAS_USER_IP_ACTIVITY_106...
DROP TABLE IF EXISTS USER_IP_ACTIVITY;
Terminating query CSAS_USER_CLICKSTREAM_105...
DROP STREAM IF EXISTS USER_CLICKSTREAM;
Terminating query CSAS_CUSTOMER_CLICKSTREAM_104...
DROP STREAM IF EXISTS customer_clickstream;
Terminating query CTAS_ERRORS_PER_MIN_103...
DROP table IF EXISTS ERRORS_PER_MIN;
Terminating query CTAS_ERRORS_PER_MIN_ALERT_102...
DROP TABLE IF EXISTS ERRORS_PER_MIN_ALERT;
DROP TABLE IF EXISTS WEB_USERS;
Terminating query CTAS_ENRICHED_ERROR_CODES_COUNT_101...
DROP TABLE IF EXISTS ENRICHED_ERROR_CODES_COUNT DELETE TOPIC;
Terminating query CSAS_ENRICHED_ERROR_CODES_100...
DROP STREAM IF EXISTS ENRICHED_ERROR_CODES;
Terminating query CTAS_PAGES_PER_MIN_99...
DROP TABLE IF EXISTS pages_per_min;
Terminating query CTAS_EVENTS_PER_MIN_98...
DROP table IF EXISTS events_per_min;
DROP TABLE IF EXISTS clickstream_codes;
DROP STREAM IF EXISTS clickstream;
10 queries terminated.
13 objects dropped.
CREATE STREAM clickstream (_time bigint,time varchar, ip varchar, request varchar, status int, userid int, bytes bigint, agent varchar) with (kafka_topic = 'clickstream', value_format = 'json');
CREATE TABLE clickstream_codes (code int, definition varchar) with (key='code', kafka_topic = 'clickstream_codes', value_format = 'json');
CREATE table events_per_min AS SELECT userid, count(*) AS events FROM clickstream window TUMBLING (size 60 second) GROUP BY userid;
CREATE TABLE pages_per_min AS SELECT userid, count(*) AS pages FROM clickstream WINDOW HOPPING (size 60 second, advance by 5 second) WHERE request like '%html%' GROUP BY userid;
CREATE STREAM ENRICHED_ERROR_CODES AS SELECT code, definition FROM clickstream LEFT JOIN clickstream_codes ON clickstream.status = clickstream_codes.code;
CREATE TABLE ENRICHED_ERROR_CODES_COUNT AS SELECT code, definition, COUNT(*) AS count FROM ENRICHED_ERROR_CODES WINDOW TUMBLING (size 30 second) GROUP BY code, definition HAVING COUNT(*) > 1;
CREATE TABLE WEB_USERS (user_id int, registered_At bigint, username varchar, first_name varchar, last_name varchar, city varchar, level varchar) with (key='user_id', kafka_topic = 'clickstream_users', value_format = 'json');
CREATE TABLE ERRORS_PER_MIN_ALERT AS SELECT status, count(*) AS errors FROM clickstream window HOPPING ( size 30 second, advance by 20 second) WHERE status > 400 GROUP BY status HAVING count(*) > 5 AND count(*) is not NULL;
CREATE table ERRORS_PER_MIN AS SELECT status, count(*) AS errors FROM clickstream window HOPPING ( size 60 second, advance by 5 second) WHERE status > 400 GROUP BY status;
CREATE STREAM customer_clickstream WITH (PARTITIONS=2) AS SELECT userid, u.first_name, u.last_name, u.level, time, ip, request, status, agent FROM clickstream c LEFT JOIN web_users u ON c.userid = u.user_id;
CREATE STREAM USER_CLICKSTREAM AS SELECT userid, u.username, ip, u.city, request, status, bytes FROM clickstream c LEFT JOIN web_users u ON c.userid = u.user_id;
CREATE TABLE USER_IP_ACTIVITY AS SELECT username, ip, city, COUNT(*) AS count FROM USER_CLICKSTREAM WINDOW TUMBLING (size 60 second) GROUP BY username, ip, city HAVING COUNT(*) > 1;
CREATE TABLE CLICK_USER_SESSIONS AS SELECT username, count(*) AS events FROM USER_CLICKSTREAM window SESSION (300 second) GROUP BY username;
13 objects created.
:pipelineExecute (Thread[Execution worker for ':',5,main]) completed. Took 5.798 secs.

BUILD SUCCESSFUL in 7s
2 actionable tasks: 2 executed
```
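The TERMINATE/DROP sequencing visible in this log can be sketched as follows. This toy generator is not the plugin's actual parser; it simply extracts object names from CREATE statements with a regex and emits the matching DROPs in reverse order:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DropGenerator {

    private static final Pattern CREATE =
        Pattern.compile("CREATE\\s+(TABLE|STREAM)\\s+(\\w+)", Pattern.CASE_INSENSITIVE);

    // Given CREATE statements in execution order, emit matching DROP
    // statements in reverse order: objects created last (the most dependent)
    // must be dropped first.
    public static List<String> dropStatements(List<String> creates) {
        Deque<String> drops = new ArrayDeque<>();
        for (String sql : creates) {
            Matcher m = CREATE.matcher(sql);
            if (m.find()) {
                // push() so the last CREATE becomes the first DROP
                drops.push("DROP " + m.group(1).toUpperCase()
                           + " IF EXISTS " + m.group(2) + ";");
            }
        }
        return new ArrayList<>(drops);
    }

    public static void main(String[] args) {
        List<String> creates = List.of(
            "CREATE STREAM clickstream (...) WITH (...);",
            "CREATE TABLE events_per_min AS SELECT ... FROM clickstream ...;");
        dropStatements(creates).forEach(System.out::println);
        // DROP TABLE IF EXISTS events_per_min;
        // DROP STREAM IF EXISTS clickstream;
    }
}
```

The real plugin additionally looks up running query IDs so it can issue TERMINATE statements before each DROP, which a standalone sketch cannot do without a server.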
The first thing to notice is that the plugin automatically constructs and issues the DROP statements for any applicable CREATE statement encountered: there is no need to write those yourself. It runs all the DROP statements at the beginning, and it runs them in the reverse order of the CREATE statement dependency ordering, which just makes sense if you think about it. Additionally, if a dropped table or stream has persistent queries running against it, the plugin finds the query IDs involved and issues the required TERMINATE statements. So a triad of statements is run: TERMINATE, DROP, and CREATE. This behavior can be controlled with command-line options. Here is the output from the help task:

```
==> ./gradlew help --task pipelineExecute

> Task :help
Detailed task information for pipelineExecute

Path
     :pipelineExecute

Type
     PipelineExecuteTask (com.redpillanalytics.gradle.tasks.PipelineExecuteTask)

Options
     --basic-password     The Password for Basic Authentication with the REST API URL for the KSQL Server. Default: value of 'confluent.pipelinePassword' or ''.
     --basic-username     The Username for Basic Authentication with the REST API URL for the KSQL Server. Default: value of 'confluent.pipelineUsername' or ''.
     --drop-only          When defined, only DROP and TERMINATE statements in KSQL scripts are executed. Used primarily for cleaning existing TABLES/STREAMS and terminating queries.
     --from-beginning     When defined, set 'ksql.streams.auto.offset.reset' to 'earliest'.
     --no-drop            When defined, applicable DROP statements are not auto-generated and executed.
     --no-reverse-drops   When defined, DROP statements are not processed in reverse order of the CREATE statements, which is the default.
     --no-terminate       When defined, applicable TERMINATE statements are not auto-generated and executed.
     --pipeline-dir       The base directory containing SQL scripts to execute, including recursive subdirectories. Default: value of 'confluent.pipelineSourcePath' or 'src/main/pipeline'.
     --rest-url           The REST API URL for the KSQL Server. Default: value of 'confluent.pipelineEndpoint' or 'http://localhost:8088'.
     --statement-pause    The number of seconds to pause execution after a create statement. Default: value of 'confluent.statementPause'.

Description
     Execute all KSQL pipelines from the provided source directory, in hierarchical order, proceeded by applicable DROP and TERMINATE commands.

Group
     confluent

BUILD SUCCESSFUL in 1s
1 actionable task: 1 executed
```

Having seen the command-line options, we can use `--pipeline-dir` to execute just a subset of our pipelines:

```
==> ./gradlew pipelineExecute --pipeline-dir 01-clickstream --from-beginning

> Task :pipelineExecute
8 queries terminated.
6 objects dropped.
6 objects created.

BUILD SUCCESSFUL in 3s
2 actionable tasks: 1 executed, 1 up-to-date
```

# Building Artifacts

While executing KSQL scripts from our source repository is useful for developers using KSQL, and might even suffice for some deployment pipelines, we can also build and publish a versioned artifact containing our pipeline scripts:

```groovy
plugins {
   id 'maven-publish'
   id "com.redpillanalytics.gradle-confluent" version '1.1.11'
}

publishing {
   repositories {
      mavenLocal()
   }
}

group = 'com.redpillanalytics'
version = '1.0.0'
```

Now we can build and publish the artifacts with a single Gradle statement:

```
==> ./gradlew --console=plain build publish
> Task :assemble UP-TO-DATE
> Task :check UP-TO-DATE
> Task :pipelineSync UP-TO-DATE
> Task :pipelineScript
> Task :pipelineZip
> Task :build
> Task :generatePomFileForPipelinePublication
> Task :publishPipelinePublicationToMavenLocalRepository
> Task :publish

BUILD SUCCESSFUL in 1s
5 actionable tasks: 4 executed, 1 up-to-date
```

We can now see our zip distribution file in the `build/distributions` directory:

```
==> cd build/distributions/
==> zipinfo ksql-examples-pipeline-1.0.0.zip
Archive:  ksql-examples-pipeline-1.0.0.zip
Zip file size: 3632 bytes, number of entries: 9
drwxr-xr-x  2.0 unx        0 b- defN 19-Jan-11 04:00 01-clickstream/
-rw-r--r--  2.0 unx      449 b- defN 19-Jan-11 04:00 01-clickstream/01-create.sql
-rw-r--r--  2.0 unx      633 b- defN 19-Jan-11 04:00 01-clickstream/02-integrate.sql
-rw-r--r--  2.0 unx      257 b- defN 19-Jan-11 04:00 01-clickstream/03-deliver.sql
drwxr-xr-x  2.0 unx        0 b- defN 19-Jan-11 04:00 02-clickstream-users/
-rw-r--r--  2.0 unx      248 b- defN 19-Jan-11 04:00 02-clickstream-users/01-create.sql
-rw-r--r--  2.0 unx      960 b- defN 19-Jan-11 04:00 02-clickstream-users/02-integrate.sql
-rw-r--r--  2.0 unx      473 b- defN 19-Jan-11 04:00 02-clickstream-users/03-deliver.sql
-rw-r--r--  2.0 unx     2312 b- defN 19-Jan-11 04:07 ksql-script.sql
9 files, 5332 bytes uncompressed, 2436 bytes compressed:  54.3%
```

Notice our zip file has all the source scripts, but it also has the single, normalized `ksql-script.sql` file.

# Deploying KSQL Artifacts

If we want to deploy our KSQL pipelines from Maven instead of Git (which, let's face it, should be standard), then we define a Gradle dependency on the published pipeline artifact:

```groovy
plugins {
   id 'maven-publish'
   id "com.redpillanalytics.gradle-confluent" version '1.1.11'
}

publishing {
   repositories {
      mavenLocal()
   }
}

group = 'com.redpillanalytics'
version = '1.0.0'

repositories {
   mavenLocal()
}

dependencies {
   archives 'com.redpillanalytics:ksql-examples-pipeline:+'
}
```

With our KSQL pipeline dependency added, we get a few more tasks in our confluent task group when we run `./gradlew tasks`.

Now we can execute with a simple:

```
==> ./gradlew deploy

> Task :pipelineDeploy
6 queries terminated.
13 objects dropped.
13 objects created.

BUILD SUCCESSFUL in 4s
2 actionable tasks: 2 executed
```

# KSQL Directives

Because the plugin auto-generates the DROP and TERMINATE statements, per-script customizations of that behavior are expressed as directives. Directives are signalled using specially formatted comments in the KSQL scripts.