# SETL

Project: setl
Repository: https://gitee.com/qxzzxq/setl

If you're a data scientist or data engineer, this might sound familiar from working on an ETL project: transformation logic scattered across scripts that are hard to structure, test, and maintain.
SETL (pronounced "settle") is a Scala framework powered by Apache Spark that helps you structure your Spark ETL projects, modularize your data transformation logic and speed up your development.

## Use SETL

### In a new project

You can start working by cloning this template project.

### In an existing project

Add SETL as a dependency in your pom.xml:

```xml
<dependency>
  <groupId>com.jcdecaux.setl</groupId>
  <artifactId>setl_2.12</artifactId>
  <version>1.0.0-RC1</version>
</dependency>
```

To use the SNAPSHOT version, add the Sonatype snapshot repository to your pom.xml:

```xml
<repositories>
  <repository>
    <id>ossrh-snapshots</id>
    <url>https://oss.sonatype.org/content/repositories/snapshots/</url>
  </repository>
</repositories>

<dependencies>
  <dependency>
    <groupId>com.jcdecaux.setl</groupId>
    <artifactId>setl_2.12</artifactId>
    <version>1.0.0-SNAPSHOT</version>
  </dependency>
</dependencies>
```

## Quick Start

### Basic concept

With SETL, an ETL application can be represented by a `Pipeline`. A `Pipeline` contains multiple `Stage`s, and each stage holds one or more factories.

The class `Factory[T]` is an abstraction of a data transformation that produces an object of type `T`. It defines four methods (`read`, `process`, `write` and `get`) that you implement with your own logic.

The class `SparkRepository[T]` is a data-access abstraction used to read and write a `Dataset[T]` from/to a data store.

The entry point of a SETL project is the object `com.jcdecaux.setl.Setl`, which handles the Spark session and the registration of repositories and pipelines.

### Show me some code

You can find the following tutorial code in the starter template of SETL. Go and clone it :)

Here we show a simple example of creating and saving a `Dataset[TestObject]`. The case class `TestObject` is defined as follows:

```scala
case class TestObject(partition1: Int, partition2: String, clustering1: String, value: Long)
```

### Context initialization

Suppose that we want to save our output into `src/main/resources/test_csv`. In the project's configuration file, declare the data store:

```hocon
testObjectRepository {
  storage = "CSV"
  path = "src/main/resources/test_csv"
  inferSchema = "true"
  delimiter = ";"
  header = "true"
  saveMode = "Append"
}
```

In our application entry point, build the SETL context and register this repository:

```scala
val setl: Setl = Setl.builder()
  .withDefaultConfigLoader()
  .getOrCreate()

// Register a SparkRepository to context
setl.setSparkRepository[TestObject]("testObjectRepository")
```
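Once the repository is registered, you can also fetch it from the context for a quick sanity check. The snippet below is a minimal sketch; it assumes the `getSparkRepository` and `findAll` methods behave as in SETL 1.x, so verify them against your version:

```scala
import org.apache.spark.sql.Dataset

// Minimal sketch (assumption: Setl.getSparkRepository and
// SparkRepository.findAll are available as in SETL 1.x).
val testRepo = setl.getSparkRepository[TestObject]("testObjectRepository")

// Read back whatever the repository currently holds as a typed Dataset
val existing: Dataset[TestObject] = testRepo.findAll()
existing.show()
```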
### Implementation of Factory

We will create our `MyFactory` to produce the `Dataset[TestObject]`:

```scala
class MyFactory() extends Factory[Dataset[TestObject]] with HasSparkSession {

  import spark.implicits._

  // A repository is needed for writing data. It will be delivered by the pipeline
  @Delivery
  private[this] val repo = SparkRepository[TestObject]

  private[this] var output = spark.emptyDataset[TestObject]

  override def read(): MyFactory.this.type = {
    // in our demo we don't need to read any data
    this
  }

  override def process(): MyFactory.this.type = {
    output = Seq(
      TestObject(1, "a", "A", 1L),
      TestObject(2, "b", "B", 2L)
    ).toDS()
    this
  }

  override def write(): MyFactory.this.type = {
    repo.save(output)  // use the repository to save the output
    this
  }

  override def get(): Dataset[TestObject] = output
}
```

### Define the pipeline

To execute the factory, we should add it into a pipeline. When we call `setl.newPipeline()`, a new `Pipeline` is created and the registered repositories become available for delivery to its factories:

```scala
val pipeline = setl
  .newPipeline()
  .addStage[MyFactory]()
```

### Run our pipeline

```scala
pipeline.describe().run()
```

The dataset will be saved into `src/main/resources/test_csv`.

### What's more?

As our `MyFactory` produces a `Dataset[TestObject]`, this output can be consumed by another factory of the same pipeline. Just declare it with the `@Delivery` annotation:

```scala
class AnotherFactory extends Factory[String] with HasSparkSession {

  import spark.implicits._

  @Delivery
  private[this] val outputOfMyFactory = spark.emptyDataset[TestObject]

  override def read(): AnotherFactory.this.type = this

  override def process(): AnotherFactory.this.type = this

  override def write(): AnotherFactory.this.type = {
    outputOfMyFactory.show()
    this
  }

  override def get(): String = "output"
}
```

Add this factory into the pipeline:

```scala
pipeline.addStage[AnotherFactory]()
```

### Custom Connector

You can implement your own data source connector by implementing the `ConnectorInterface`:

```scala
class CustomConnector extends ConnectorInterface with CanDrop {

  override def setConf(conf: Conf): Unit = ()

  override def read(): DataFrame = {
    import spark.implicits._
    Seq(1, 2, 3).toDF("id")
  }

  override def write(t: DataFrame, suffix: Option[String]): Unit = logDebug("Write with suffix")

  override def write(t: DataFrame): Unit = logDebug("Write")

  /**
   * Drop the entire table.
   */
  override def drop(): Unit = logDebug("drop")
}
```

To use it, just set the storage to OTHER and provide the class reference of your connector:

```hocon
myConnector {
  storage = "OTHER"
  class = "com.example.CustomConnector"  // class reference of your connector
}
```
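To try the connector on its own, outside of a pipeline, you can register it and fetch it from the context. This is a minimal sketch; the `setConnector`/`getConnector` calls are assumed to work as in SETL 1.x, so check the API of your version:

```scala
// Minimal sketch (assumption: Setl.setConnector and Setl.getConnector
// are available as in SETL 1.x).
setl.setConnector("myConnector")  // registers the connector declared in the config above

val myConnector = setl.getConnector[CustomConnector]("myConnector")
myConnector.read().show()  // prints the DataFrame built in CustomConnector.read()
```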
### Generate pipeline diagram

You can generate a Mermaid diagram by doing:

```scala
pipeline.showDiagram()
```

You will have some log like this:

```
--------- MERMAID DIAGRAM ---------
classDiagram
class MyFactory {
  <<Factory[Dataset[TestObject]]>>
  +SparkRepository[TestObject]
}

class DatasetTestObject {
  <<Dataset[TestObject]>>
  >partition1: Int
  >partition2: String
  >clustering1: String
  >value: Long
}

DatasetTestObject <|.. MyFactory : Output

class AnotherFactory {
  <<Factory[String]>>
  +Dataset[TestObject]
}

class StringFinal {
  <<String>>
}

StringFinal <|.. AnotherFactory : Output

class SparkRepositoryTestObjectExternal {
  <<SparkRepository[TestObject]>>
}

AnotherFactory <|-- DatasetTestObject : Input
MyFactory <|-- SparkRepositoryTestObjectExternal : Input
------- END OF MERMAID CODE -------
```

You can copy the previous code into a Markdown viewer that supports Mermaid, or open it directly in the live editor: https://mermaid-js.github.io/mermaid-live-editor/#/edit/eyJjb2RlIjoiY2xhc3NEaWFncmFtXG5jbGFzcyBNeUZhY3Rvcnkge1xuICA8PEZhY3RvcnlbRGF0YXNldFtUZXN0T2JqZWN0XV0-PlxuICArU3BhcmtSZXBvc2l0b3J5W1Rlc3RPYmplY3RdXG59XG5cbmNsYXNzIERhdGFzZXRUZXN0T2JqZWN0IHtcbiAgPDxEYXRhc2V0W1Rlc3RPYmplY3RdPj5cbiAgPnBhcnRpdGlvbjE6IEludFxuICA-cGFydGl0aW9uMjogU3RyaW5nXG4gID5jbHVzdGVyaW5nMTogU3RyaW5nXG4gID52YWx1ZTogTG9uZ1xufVxuXG5EYXRhc2V0VGVzdE9iamVjdCA8fC4uIE15RmFjdG9yeSA6IE91dHB1dFxuY2xhc3MgQW5vdGhlckZhY3Rvcnkge1xuICA8PEZhY3RvcnlbU3RyaW5nXT4-XG4gICtEYXRhc2V0W1Rlc3RPYmplY3RdXG59XG5cbmNsYXNzIFN0cmluZ0ZpbmFsIHtcbiAgPDxTdHJpbmc-PlxuICBcbn1cblxuU3RyaW5nRmluYWwgPHwuLiBBbm90aGVyRmFjdG9yeSA6IE91dHB1dFxuY2xhc3MgU3BhcmtSZXBvc2l0b3J5VGVzdE9iamVjdEV4dGVybmFsIHtcbiAgPDxTcGFya1JlcG9zaXRvcnlbVGVzdE9iamVjdF0-PlxuICBcbn1cblxuQW5vdGhlckZhY3RvcnkgPHwtLSBEYXRhc2V0VGVzdE9iamVjdCA6IElucHV0XG5NeUZhY3RvcnkgPHwtLSBTcGFya1JlcG9zaXRvcnlUZXN0T2JqZWN0RXh0ZXJuYWwgOiBJbnB1dFxuIiwibWVybWFpZCI6eyJ0aGVtZSI6ImRlZmF1bHQifX0=
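Putting the tutorial together, a complete entry point might look like the following sketch. It only reuses the pieces shown above; the object name `App` and the `main` wrapper are illustrative:

```scala
import com.jcdecaux.setl.Setl

// Illustrative entry point combining the steps of this tutorial
object App {
  def main(args: Array[String]): Unit = {
    // Build the SETL context from the default configuration loader
    val setl = Setl.builder()
      .withDefaultConfigLoader()
      .getOrCreate()

    // Register the repository declared in the configuration file
    setl.setSparkRepository[TestObject]("testObjectRepository")

    // Wire both factories into a pipeline, describe it, and run it
    setl.newPipeline()
      .addStage[MyFactory]()
      .addStage[AnotherFactory]()
      .describe()
      .run()
  }
}
```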