Java FlinkKafkaConsumer09 Class Code Examples


This article collects typical usage examples of the Java class org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09. If you are wondering what the FlinkKafkaConsumer09 class is for, how to use it, or where to find working examples of it, the curated code samples below should help.



The FlinkKafkaConsumer09 class belongs to the org.apache.flink.streaming.connectors.kafka package. Nine code examples of the class are presented below, sorted by popularity by default.
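
A minimal end-to-end sketch of the consumer in action is shown below. This is an illustrative sketch rather than one of the collected examples: it assumes a local broker at localhost:9092, a placeholder topic name, and the SimpleStringSchema bundled with the Flink 1.x Kafka connector.

import java.util.Properties;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

public class MinimalKafkaRead {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// The consumer needs at least the broker list and a consumer group id.
		Properties props = new Properties();
		props.setProperty("bootstrap.servers", "localhost:9092"); // assumed local broker
		props.setProperty("group.id", "demo");

		// Constructor signature: topic, DeserializationSchema, Properties.
		DataStream<String> stream = env.addSource(
				new FlinkKafkaConsumer09<>("my-topic", new SimpleStringSchema(), props));

		stream.print();
		env.execute("Minimal FlinkKafkaConsumer09 read");
	}
}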

Example 1: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09; // import the required package/class
public static void main(String[] args) throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	Properties properties = new Properties();
	properties.setProperty("bootstrap.servers", "localhost:9092");
	properties.setProperty("group.id", "test");

	DataStream<TemperatureEvent> inputEventStream = env.addSource(
			new FlinkKafkaConsumer09<TemperatureEvent>("test", new EventDeserializationSchema(), properties));

	Pattern<TemperatureEvent, ?> warningPattern = Pattern.<TemperatureEvent> begin("first")
			.subtype(TemperatureEvent.class).where(new FilterFunction<TemperatureEvent>() {
				private static final long serialVersionUID = 1L;

				@Override
				public boolean filter(TemperatureEvent value) {
					return value.getTemperature() >= 26.0;
				}
			}).within(Time.seconds(10));

	DataStream<Alert> patternStream = CEP.pattern(inputEventStream, warningPattern)
			.select(new PatternSelectFunction<TemperatureEvent, Alert>() {
				private static final long serialVersionUID = 1L;

				@Override
				public Alert select(Map<String, TemperatureEvent> event) throws Exception {

					return new Alert("Temperature Rise Detected:" + event.get("first").getTemperature()
							+ " on machine name:" + event.get("first").getMachineName());
				}

			});

	patternStream.print();
	env.execute("CEP on Temperature Sensor");
}
 
Developer: PacktPublishing, Project: Mastering-Apache-Flink, Lines: 38, Source: KafkaApp.java
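
Example 1 depends on an EventDeserializationSchema that is not reproduced above. Here is a minimal sketch of what such a schema might look like, assuming records arrive as UTF-8 lines of the form "machineName,temperature" and that TemperatureEvent offers a (String, double) constructor; both are assumptions, since the book's class is not shown:

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;

// Hypothetical stand-in for the EventDeserializationSchema used in Example 1.
public class EventDeserializationSchema extends AbstractDeserializationSchema<TemperatureEvent> {
	private static final long serialVersionUID = 1L;

	@Override
	public TemperatureEvent deserialize(byte[] message) throws IOException {
		// Assumed wire format: "machineName,temperature"
		String[] fields = new String(message, StandardCharsets.UTF_8).split(",");
		return new TemperatureEvent(fields[0], Double.parseDouble(fields[1]));
	}
}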


Example 2: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09; // import the required package/class
public static void main(String[] args) throws Exception {
  // create execution environment
  StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

  Properties properties = new Properties();
  properties.setProperty("bootstrap.servers", "localhost:9092");
  properties.setProperty("group.id", "flink_consumer");


  DataStream<String> stream = env
          .addSource(new FlinkKafkaConsumer09<>("flink-demo", new SimpleStringSchema(), properties));

  stream.map(new MapFunction<String, String>() {
    private static final long serialVersionUID = -6867736771747690202L;

    @Override
    public String map(String value) throws Exception {
      return "Stream Value: " + value;
    }
  }).print();

  env.execute();
}
 
Developer: tgrall, Project: kafka-flink-101, Lines: 24, Source: ReadFromKafka.java


Example 3: create

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09; // import the required package/class
/**
 * Creates a {@link FlinkKafkaConsumer09} from the provided settings
 * @return the configured consumer instance
 */
public FlinkKafkaConsumer09<T> create() {
	
	/////////////////////////////////////////////////////////////////////////
	// validate provided input
	if(StringUtils.isBlank(this.topic))
		throw new IllegalArgumentException("Missing required topic");
	if(this.properties.isEmpty())
		throw new IllegalArgumentException("Missing required properties");
	if(!this.properties.containsKey(KAFKA_PROPS_AUTO_COMMIT_ENABLE))
		throw new IllegalArgumentException("Missing value for required property '"+KAFKA_PROPS_AUTO_COMMIT_ENABLE+"'");
	if(!this.properties.containsKey(KAFKA_PROPS_BOOTSTRAP_SERVERS))
		throw new IllegalArgumentException("Missing value for required property '"+KAFKA_PROPS_BOOTSTRAP_SERVERS+"'");
	if(!this.properties.containsKey(KAFKA_PROPS_GROUP_ID))
		throw new IllegalArgumentException("Missing value for required property '"+KAFKA_PROPS_GROUP_ID+"'");
	/////////////////////////////////////////////////////////////////////////

	return new FlinkKafkaConsumer09<>(this.topic, this.deserializationSchema, this.properties);
}
 
Developer: ottogroup, Project: flink-operator-library, Lines: 24, Source: KafkaConsumerBuilder.java


Example 4: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09; // import the required package/class
public static void main(String[] args) throws Exception {
	// parse arguments
	ParameterTool params = ParameterTool.fromPropertiesFile(args[0]);

	// create streaming environment
	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	// enable event time processing
	env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

	// enable fault-tolerance
	env.enableCheckpointing(1000);

	// enable restarts
	env.setRestartStrategy(RestartStrategies.fixedDelayRestart(50, 500L));

	env.setStateBackend(new FsStateBackend("file:///home/robert/flink-workdir/flink-streaming-etl/state-backend"));

	// run each operator separately
	env.disableOperatorChaining();

	// get data from Kafka
	Properties kParams = params.getProperties();
	kParams.setProperty("group.id", UUID.randomUUID().toString());
	DataStream<ObjectNode> inputStream = env.addSource(new FlinkKafkaConsumer09<>(params.getRequired("topic"), new JSONDeserializationSchema(), kParams)).name("Kafka 0.9 Source")
		.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<ObjectNode>(Time.minutes(1L)) {
			@Override
			public long extractTimestamp(ObjectNode jsonNodes) {
				return jsonNodes.get("timestamp_ms").asLong();
			}
		}).name("Timestamp extractor");

	// filter out records without lang field
	DataStream<ObjectNode> tweetsWithLang = inputStream.filter(jsonNode -> jsonNode.has("user") && jsonNode.get("user").has("lang")).name("Filter records without 'lang' field");

	// select only lang = "en" tweets
	DataStream<ObjectNode> englishTweets = tweetsWithLang.filter(jsonNode -> jsonNode.get("user").get("lang").asText().equals("en")).name("Select 'lang'=en tweets");

	// write to file system
	RollingSink<ObjectNode> rollingSink = new RollingSink<>(params.get("sinkPath", "/home/robert/flink-workdir/flink-streaming-etl/rolling-sink"));
	rollingSink.setBucketer(new DateTimeBucketer("yyyy-MM-dd-HH-mm")); // do a bucket for each minute
	englishTweets.addSink(rollingSink).name("Rolling FileSystem Sink");

	// build aggregates (count per language) using window (10 seconds tumbling):
	DataStream<Tuple3<Long, String, Long>> languageCounts = tweetsWithLang.keyBy(jsonNode -> jsonNode.get("user").get("lang").asText())
		.timeWindow(Time.seconds(10))
		.apply(new Tuple3<>(0L, "", 0L), new JsonFoldCounter(), new CountEmitter()).name("Count per Language (10 seconds tumbling)");

	// write window aggregate to ElasticSearch
	List<InetSocketAddress> transportNodes = ImmutableList.of(new InetSocketAddress(InetAddress.getByName("localhost"), 9300));
	ElasticsearchSink<Tuple3<Long, String, Long>> elasticsearchSink = new ElasticsearchSink<>(params.toMap(), transportNodes, new ESRequest());

	languageCounts.addSink(elasticsearchSink).name("ElasticSearch2 Sink");

	// word-count on the tweet stream
	DataStream<Tuple2<Date, List<Tuple2<String, Long>>>> topWordCount = tweetsWithLang
		// get text from tweets
		.map(tweet -> tweet.get("text").asText()).name("Get text from Tweets")
		// split text into (word, 1) tuples
		.flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
			@Override
			public void flatMap(String s, Collector<Tuple2<String, Long>> collector) throws Exception {
				String[] splits = s.split(" ");
				for (String sp : splits) {
					collector.collect(new Tuple2<>(sp, 1L));
				}
			}
		}).name("Tokenize words")
		// group by word
		.keyBy(0)
		// build 1 min windows, compute every 10 seconds --> count word frequency
		.timeWindow(Time.minutes(1L), Time.seconds(10L)).apply(new WordCountingWindow()).name("Count word frequency (1 min, 10 sec sliding window)")
		// build top n every 10 seconds
		.timeWindowAll(Time.seconds(10L)).apply(new TopNWords(10)).name("TopN Window (10s)");

	// write top Ns to Kafka topic
	topWordCount.addSink(new FlinkKafkaProducer09<>(params.getRequired("wc-topic"), new ListSerSchema(), params.getProperties())).name("Write topN to Kafka");

	env.execute("Streaming ETL");

}
 
Developer: rmetzger, Project: flink-streaming-etl, Lines: 82, Source: StreamingETL.java
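
A note on the fault-tolerance settings in Example 4: once enableCheckpointing is active, FlinkKafkaConsumer09 stores its partition offsets in Flink's checkpoints instead of relying on Kafka's auto-commit, so after a failure the job restarts from the last completed checkpoint and the consumer position stays consistent with the rest of the pipeline's state.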


Example 5: createConsumer

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09; // import the required package/class
public FlinkKafkaConsumer09<String> createConsumer() {
    Properties properties = new Properties();
    properties.setProperty("bootstrap.servers", parameters.getRequired("kafka-bootstrap-servers"));
    properties.setProperty("group.id", parameters.getRequired("kafka-group-id"));

    return new FlinkKafkaConsumer09<String>(
            parameters.getRequired("kafka-topic"),
            new SimpleStringSchema(),
            properties
    );
}
 
Developer: webaio, Project: processor, Lines: 12, Source: FlinkKafkaConsumerFactory.java
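
A hypothetical wiring of this factory is sketched below. The surrounding class and its constructor are not shown in the snippet, so the assumption here is that the factory is handed the ParameterTool it reads the kafka-* settings from:

ParameterTool parameters = ParameterTool.fromArgs(args); // e.g. --kafka-bootstrap-servers localhost:9092 --kafka-group-id demo --kafka-topic events
FlinkKafkaConsumerFactory factory = new FlinkKafkaConsumerFactory(parameters); // assumed constructor

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(factory.createConsumer()).print();
env.execute("Factory-built consumer");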


Example 6: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09; // import the required package/class
public static void main(String[] args) throws Exception {

        final int popThreshold = 20; // threshold for popular places

        // set up streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.getConfig().setAutoWatermarkInterval(1000);

        // configure the Kafka consumer
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("zookeeper.connect", LOCAL_ZOOKEEPER_HOST);
        kafkaProps.setProperty("bootstrap.servers", LOCAL_KAFKA_BROKER);
        kafkaProps.setProperty("group.id", RIDE_SPEED_GROUP);
        // always read the Kafka topic from the start
        kafkaProps.setProperty("auto.offset.reset", "earliest");

        // create a Kafka consumer
        FlinkKafkaConsumer09<TaxiRide> consumer = new FlinkKafkaConsumer09<>(
                "cleansedRides",
                new TaxiRideSchema(),
                kafkaProps);
        // assign a timestamp extractor to the consumer
        consumer.assignTimestampsAndWatermarks(new TaxiRideTSExtractor());

        // create a TaxiRide data stream
        DataStream<TaxiRide> rides = env.addSource(consumer);

        // find popular places
        DataStream<Tuple5<Float, Float, Long, Boolean, Integer>> popularPlaces = rides
                // match ride to grid cell and event type (start or end)
                .map(new GridCellMatcher())
                // partition by cell id and event type
                .keyBy(0, 1)
                // build sliding window
                .timeWindow(Time.minutes(15), Time.minutes(5))
                // count ride events in window
                .apply(new RideCounter())
                // filter by popularity threshold
                .filter(new FilterFunction<Tuple4<Integer, Long, Boolean, Integer>>() {
                    @Override
                    public boolean filter(Tuple4<Integer, Long, Boolean, Integer> count) throws Exception {
                        return count.f3 >= popThreshold;
                    }
                })
                // map grid cell to coordinates
                .map(new GridToCoordinates());

        //popularPlaces.print();
        popularPlaces.writeAsText("file:///C:/Users/ht/kafka_java.txt");

        // execute the transformation pipeline
        env.execute("Popular Places from Kafka");
    }
 
Developer: thr0n, Project: clojured-taxi-rides, Lines: 55, Source: PopularPlacesFromKafka.java
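
Example 6 assigns timestamps with a TaxiRideTSExtractor that is not reproduced above. Here is a minimal sketch of such an extractor, assuming TaxiRide exposes an event-time accessor in epoch milliseconds and that 60 seconds is an acceptable out-of-orderness bound:

import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.time.Time;

// Hypothetical stand-in for the project's TaxiRideTSExtractor.
public class TaxiRideTSExtractor extends BoundedOutOfOrdernessTimestampExtractor<TaxiRide> {
	private static final long serialVersionUID = 1L;

	public TaxiRideTSExtractor() {
		super(Time.seconds(60)); // assumed out-of-orderness bound
	}

	@Override
	public long extractTimestamp(TaxiRide ride) {
		return ride.getEventTime(); // assumed accessor returning epoch millis
	}
}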


Example 7: main

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09; // import the required package/class
public static void main(String[] args) throws Exception {
	// set up the streaming execution environment
	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	// env.enableCheckpointing(5000);
	env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

	Properties properties = new Properties();
	properties.setProperty("bootstrap.servers", "localhost:9092");

	properties.setProperty("zookeeper.connect", "localhost:2181");
	properties.setProperty("group.id", "test");

	FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<>("temp", new SimpleStringSchema(),
			properties);
	myConsumer.assignTimestampsAndWatermarks(new CustomWatermarkEmitter());


	DataStream<Tuple2<String, Double>> keyedStream = env.addSource(myConsumer).flatMap(new Splitter()).keyBy(0)
			.timeWindow(Time.seconds(300))
			.apply(new WindowFunction<Tuple2<String, Double>, Tuple2<String, Double>, Tuple, TimeWindow>() {

				@Override
				public void apply(Tuple key, TimeWindow window, Iterable<Tuple2<String, Double>> input,
						Collector<Tuple2<String, Double>> out) throws Exception {
					double sum = 0.0;
					int count = 0;
					for (Tuple2<String, Double> record : input) {
						sum += record.f1;
						count++;
					}

					Tuple2<String, Double> result = input.iterator().next();
					result.f1 = (sum/count);
					out.collect(result);

				}
			});

	keyedStream.print();

	// execute program
	env.execute("Flink Streaming Java API Skeleton");
}
 
Developer: PacktPublishing, Project: Mastering-Apache-Flink, Lines: 44, Source: StreamingJob.java
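
Example 7 references a CustomWatermarkEmitter that is not reproduced above. Here is a minimal punctuated-watermark sketch, assuming each record is a CSV line whose first field is an epoch-millisecond timestamp (the actual emitter from the book may differ):

import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
import org.apache.flink.streaming.api.watermark.Watermark;

// Hypothetical stand-in for the CustomWatermarkEmitter used in Example 7.
public class CustomWatermarkEmitter implements AssignerWithPunctuatedWatermarks<String> {
	private static final long serialVersionUID = 1L;

	@Override
	public long extractTimestamp(String record, long previousElementTimestamp) {
		return Long.parseLong(record.split(",")[0].trim()); // assumed wire format
	}

	@Override
	public Watermark checkAndGetNextWatermark(String lastElement, long extractedTimestamp) {
		// Emit a watermark with every element (punctuated watermarking).
		return new Watermark(extractedTimestamp);
	}
}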


Example 8: getKafkaConsumer

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09; // import the required package/class
@Override
FlinkKafkaConsumerBase<Row> getKafkaConsumer(String topic, Properties properties, DeserializationSchema<Row> deserializationSchema) {
    return new FlinkKafkaConsumer09<>(topic, deserializationSchema, properties);
}
 
Developer: datafibers-community, Project: df_data_service, Lines: 5, Source: Kafka09AvroTableSource.java


Example 9: testFlinkSQL

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09; // import the required package/class
public static void testFlinkSQL() {

        LOG.info("Only Unit Testing Function is enabled");
        String resultFile = "/home/vagrant/test.txt";

        try {

            String jarPath = DFInitService.class.getProtectionDomain().getCodeSource().getLocation().getPath();
            StreamExecutionEnvironment env = StreamExecutionEnvironment.createRemoteEnvironment("localhost", 6123, jarPath)
                    .setParallelism(1);
            String kafkaTopic = "finance";
            String kafkaTopic_stage = "df_trans_stage_finance";
            String kafkaTopic_out = "df_trans_out_finance";



            StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
            properties.setProperty("group.id", "consumer3");

            // Internally convert JSON string to JSON - Begin
            DataStream<String> stream = env
                    .addSource(new FlinkKafkaConsumer09<>(kafkaTopic, new SimpleStringSchema(), properties));

            stream.map(new MapFunction<String, String>() {
                @Override
                public String map(String jsonString) throws Exception {
                    return jsonString.replaceAll("\\\\", "").replace("\"{", "{").replace("}\"","}");
                }
            }).addSink(new FlinkKafkaProducer09<String>("localhost:9092", kafkaTopic_stage, new SimpleStringSchema()));
            // Internally convert JSON string to JSON - End

            String[] fieldNames =  new String[] {"name"};
            Class<?>[] fieldTypes = new Class<?>[] {String.class};

            Kafka09AvroTableSource kafkaTableSource = new Kafka09AvroTableSource(
                    kafkaTopic_stage,
                    properties,
                    fieldNames,
                    fieldTypes);

            //kafkaTableSource.setFailOnMissingField(true);

            tableEnv.registerTableSource("Orders", kafkaTableSource);

            //Table result = tableEnv.sql("SELECT STREAM name FROM Orders");
            Table result = tableEnv.sql("SELECT name FROM Orders");

            Files.deleteIfExists(Paths.get(resultFile));

            // create a TableSink
            TableSink sink = new CsvTableSink(resultFile, "|");
            // write the result Table to the TableSink
            result.writeToSink(sink);

            env.execute("FlinkConsumer");

        } catch (Exception e) {
            e.printStackTrace();
        }
    }
 
Developer: datafibers-community, Project: df_data_service, Lines: 63, Source: UnitTestSuiteFlink.java



Note: The org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09 examples in this article were collected from open-source projects hosted on GitHub, MSDocs, and similar platforms. Copyright of each snippet remains with its original author; consult the corresponding project's license before redistributing or reusing the code. Please do not repost without permission.

