Java KafkaUtils Class Code Examples


This article collects typical usage examples of the Java class org.apache.spark.streaming.kafka010.KafkaUtils. If you are wondering what the KafkaUtils class does, how to use it, or what real-world usage looks like, the curated code examples below should help.



The KafkaUtils class belongs to the org.apache.spark.streaming.kafka010 package. The sections below show 17 code examples of the class, sorted by popularity by default.
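Before the individual examples, the following minimal sketch condenses the canonical KafkaUtils.createDirectStream pattern that most of them follow. It is an orientation sketch only, not code from any of the projects below; the broker address, topic name and group id are placeholder assumptions.

import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class MinimalDirectStream {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("MinimalDirectStream").setMaster("local[2]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        // Standard consumer configuration; broker address and group id are placeholders
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "example-group");
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList("example-topic");

        // The three arguments: streaming context, location strategy, consumer strategy
        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        jssc,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

        stream.map(ConsumerRecord::value).print();

        jssc.start();
        jssc.awaitTermination();
    }
}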

Example 1: providesKafkaInputStream

import org.apache.spark.streaming.kafka010.KafkaUtils; // import the required package/class
@Provides
JavaInputDStream<ConsumerRecord<String, RawRating>> providesKafkaInputStream(JavaStreamingContext streamingContext) {
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", JsonDeserializer.class);
    kafkaParams.put("serializedClass", RawRating.class);
    kafkaParams.put("group.id", "rating_stream");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);
    Collection<String> topics = Arrays.asList("topicA", "topicB");

    return KafkaUtils.createDirectStream(
            streamingContext,
            LocationStrategies.PreferConsistent(),
            ConsumerStrategies.<String, RawRating>Subscribe(topics, kafkaParams)
    );
}
 
Author: cosminseceleanu, Project: movie-recommender, Lines of code: 19, Source file: SparkModule.java


Example 2: main

import org.apache.spark.streaming.kafka010.KafkaUtils; // import the required package/class
public static void main(String[] args) throws InterruptedException {
    Map<String, Object> kafkaParams = new HashMap<>();
    kafkaParams.put("bootstrap.servers", "localhost:9092");
    kafkaParams.put("key.deserializer", StringDeserializer.class);
    kafkaParams.put("value.deserializer", StringDeserializer.class);
    kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
    kafkaParams.put("auto.offset.reset", "latest");
    kafkaParams.put("enable.auto.commit", false);

    Collection<String> topics = Arrays.asList("data-in");

    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaSpark");
    JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(5));

    final JavaInputDStream<ConsumerRecord<String, String>> stream =
            KafkaUtils.createDirectStream(
                    streamingContext,
                    LocationStrategies.PreferConsistent(),
                    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
            );

    JavaPairDStream<String, Integer>  countOfMessageKeys = stream
            .map((ConsumerRecord<String, String> record) -> record.key())
            .mapToPair((String s) -> new Tuple2<>(s, 1))
            .reduceByKey((Integer i1, Integer i2)-> i1 + i2);

    countOfMessageKeys.print();

    // Start the computation
    streamingContext.start();
    streamingContext.awaitTermination();
}
 
Author: ebi-wp, Project: kafka-streams-api-websockets, Lines of code: 33, Source file: SparkConsume.java


Example 3: main

import org.apache.spark.streaming.kafka010.KafkaUtils; // import the required package/class
public static void main(String[] args) {
        SparkConf sc = new SparkConf()
                .setMaster("local[2]") // local mode with 2 threads
                .setAppName("RealtimeSpeedCalculator");

        JavaStreamingContext streamingContext = new JavaStreamingContext(sc, new Duration(60 * 1000L));

        // Kafka configuration
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "10.128.184.199:9121");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "0"); // group.id must be a String
        kafkaParams.put("auto.offset.reset", "latest");
        kafkaParams.put("enable.auto.commit", false);

        Collection<String> topics = Arrays.asList("topic-taxi");
        JavaInputDStream<ConsumerRecord<String, String>> stream =
                KafkaUtils.createDirectStream(
                        streamingContext,
                        LocationStrategies.PreferConsistent(),
                        ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
                );

        stream.map(record -> {
            System.out.println("#############");
            return record.value();
        }).count();

//        streamingContext.start();
    }
 
Author: wang1365, Project: spark-traffic, Lines of code: 32, Source file: StreamingApplication.java


Example 4: createDirectStream

import org.apache.spark.streaming.kafka010.KafkaUtils; // import the required package/class
/**
 * Creates a direct Kafka stream for the configured topics and consumer parameters.
 *
 * @param <K> key type of the consumer records
 * @param <V> value type of the consumer records
 * @return a direct input stream of Kafka consumer records
 */
public <K extends Object, V extends Object> JavaInputDStream<ConsumerRecord<K, V>> createDirectStream() {
  JavaInputDStream<ConsumerRecord<K, V>> directKafkaStream
      = KafkaUtils.
          createDirectStream(jsc, LocationStrategies.PreferConsistent(),
              ConsumerStrategies.Subscribe(topics, kafkaParams));
  return directKafkaStream;
}
 
Author: hopshadoop, Project: hops-util, Lines of code: 14, Source file: SparkConsumer.java


Example 5: main

import org.apache.spark.streaming.kafka010.KafkaUtils; // import the required package/class
public static void main(String[] args) {
      // Windows-specific property if Hadoop is not installed or HADOOP_HOME is not set
      System.setProperty("hadoop.home.dir", "E:\\hadoop");
      SparkConf conf = new SparkConf().setAppName("KafkaExample").setMaster("local[*]");    
      JavaSparkContext sc = new JavaSparkContext(conf);
      JavaStreamingContext streamingContext = new JavaStreamingContext(sc, Durations.minutes(2));
      streamingContext.checkpoint("E:\\hadoop\\checkpoint");
      Logger rootLogger = LogManager.getRootLogger();
 		rootLogger.setLevel(Level.WARN); 
      Map<String, Object> kafkaParams = new HashMap<>();
      kafkaParams.put("bootstrap.servers", "10.0.75.1:9092");
      kafkaParams.put("key.deserializer", StringDeserializer.class);
      kafkaParams.put("value.deserializer", StringDeserializer.class);
      kafkaParams.put("group.id", "use_a_separate_group_id_for_each_strea");
      kafkaParams.put("auto.offset.reset", "latest");
     // kafkaParams.put("enable.auto.commit", false);

      Collection<String> topics = Arrays.asList("mytopic", "anothertopic");

      final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(streamingContext,LocationStrategies.PreferConsistent(),
      				ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));

      JavaPairDStream<String, String> pairRDD = stream.mapToPair(record-> new Tuple2<>(record.key(), record.value()));
     
      pairRDD.foreachRDD(pRDD-> { pRDD.foreach(tuple-> System.out.println(new Date()+" :: Kafka msg key ::"+tuple._1() +" the val is ::"+tuple._2()));});
     
      JavaDStream<String> tweetRDD = pairRDD.map(x-> x._2()).map(new TweetText());
      
      tweetRDD.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" :: "+x)));
      
     JavaDStream<String> hashtagRDD = tweetRDD.flatMap(twt-> Arrays.stream(twt.split(" ")).filter(str-> str.contains("#")).collect(Collectors.toList()).iterator() );
 
      hashtagRDD.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(x)));
      
      JavaPairDStream<String, Long> cntByVal = hashtagRDD.countByValue();
      
      cntByVal.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The count tag is ::"+x._1() +" and the val is ::"+x._2())));
      
     /* hashtagRDD.window(Durations.seconds(60), Durations.seconds(30))
                .countByValue()
               .foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
      
     hashtagRDD.countByValueAndWindow(Durations.seconds(60), Durations.seconds(30))
               .foreachRDD(tRDD -> tRDD.foreach(x->System.out.println("The window&count tag is ::"+x._1() +" and the val is ::"+x._2())));
      */
     hashtagRDD.window(Durations.minutes(8)).countByValue()
     .foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
     hashtagRDD.window(Durations.minutes(8),Durations.minutes(2)).countByValue()
     .foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
     hashtagRDD.window(Durations.minutes(12),Durations.minutes(8)).countByValue()
     .foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
     hashtagRDD.window(Durations.minutes(2),Durations.minutes(2)).countByValue()
     .foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
     hashtagRDD.window(Durations.minutes(12),Durations.minutes(12)).countByValue()
     .foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
     
     /*hashtagRDD.window(Durations.minutes(5),Durations.minutes(2)).countByValue()
     .foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));*/
     /* hashtagRDD.window(Durations.minutes(10),Durations.minutes(1)).countByValue()
     .foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));*/
     
      streamingContext.start();
      try {
          streamingContext.awaitTermination();
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
  }
 
Author: PacktPublishing, Project: Apache-Spark-2x-for-Java-Developers, Lines of code: 72, Source file: KafkaExample.java


Example 6: createRDD

import org.apache.spark.streaming.kafka010.KafkaUtils; // import the required package/class
public <K extends Object, V extends Object> JavaRDD<ConsumerRecord<K, V>> createRDD(
    JavaSparkContext jsc,
    OffsetRange[] osr, LocationStrategy ls) {
  return KafkaUtils.createRDD(jsc, kafkaParams, osr, ls);
}
 
Author: hopshadoop, Project: hops-util, Lines of code: 6, Source file: SparkConsumer.java
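The createRDD wrapper above takes an OffsetRange array that pins exactly which slice of each topic partition to read as a batch. The sketch below shows, under assumed placeholder values (topic name, partitions, offsets, broker address are not from the hops-util project), how those ranges are typically built and passed straight to KafkaUtils.createRDD.

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;

public class BatchRead {
    public static JavaRDD<ConsumerRecord<String, String>> readSlice(JavaSparkContext jsc) {
        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092");
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "batch-read-group");

        // One range per partition: topic, partition, inclusive start offset, exclusive end offset
        OffsetRange[] ranges = {
                OffsetRange.create("example-topic", 0, 0L, 100L),
                OffsetRange.create("example-topic", 1, 0L, 100L)
        };

        return KafkaUtils.createRDD(jsc, kafkaParams, ranges, LocationStrategies.PreferConsistent());
    }
}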


Example 7: initializeLogIfNecessary

import org.apache.spark.streaming.kafka010.KafkaUtils; // import the required package/class
public void initializeLogIfNecessary(boolean init) {
  KafkaUtils.initializeLogIfNecessary(init);
}
 
Author: hopshadoop, Project: hops-util, Lines of code: 4, Source file: SparkConsumer.java


Example 8: isTraceEnabled

import org.apache.spark.streaming.kafka010.KafkaUtils; // import the required package/class
public boolean isTraceEnabled() {
  return KafkaUtils.isTraceEnabled();
}
 
Author: hopshadoop, Project: hops-util, Lines of code: 4, Source file: SparkConsumer.java


Example 9: log

import org.apache.spark.streaming.kafka010.KafkaUtils; // import the required package/class
public Logger log() {
  return KafkaUtils.log();
}
 
Author: hopshadoop, Project: hops-util, Lines of code: 4, Source file: SparkConsumer.java


Example 10: logName

import org.apache.spark.streaming.kafka010.KafkaUtils; // import the required package/class
public String logName() {
  return KafkaUtils.logName();
}
 
Author: hopshadoop, Project: hops-util, Lines of code: 4, Source file: SparkConsumer.java


Example 11: logDebug

import org.apache.spark.streaming.kafka010.KafkaUtils; // import the required package/class
public void logDebug(Function0<String> fun) {
  KafkaUtils.logDebug(fun);
}
 
Author: hopshadoop, Project: hops-util, Lines of code: 4, Source file: SparkConsumer.java


Example 12: logError

import org.apache.spark.streaming.kafka010.KafkaUtils; // import the required package/class
public void logError(Function0<String> fun) {
  KafkaUtils.logError(fun);
}
 
Author: hopshadoop, Project: hops-util, Lines of code: 4, Source file: SparkConsumer.java


Example 13: logInfo

import org.apache.spark.streaming.kafka010.KafkaUtils; // import the required package/class
public void logInfo(Function0<String> fun) {
  KafkaUtils.logInfo(fun);
}
 
Author: hopshadoop, Project: hops-util, Lines of code: 4, Source file: SparkConsumer.java


Example 14: logTrace

import org.apache.spark.streaming.kafka010.KafkaUtils; // import the required package/class
public void logTrace(Function0<String> fun) {
  KafkaUtils.logTrace(fun);
}
 
Author: hopshadoop, Project: hops-util, Lines of code: 4, Source file: SparkConsumer.java


Example 15: logWarning

import org.apache.spark.streaming.kafka010.KafkaUtils; // import the required package/class
public void logWarning(Function0<String> fun) {
  KafkaUtils.logWarning(fun);
}
 
Author: hopshadoop, Project: hops-util, Lines of code: 4, Source file: SparkConsumer.java


Example 16: processRuleUpdate

import org.apache.spark.streaming.kafka010.KafkaUtils; // import the required package/class
private static void processRuleUpdate(JavaStreamingContext jssc, String brokers, Set<String> topicsSet,
		final AnalyticsEngineManager engineManager) {
	Map<String, Object> kafkaParams = new HashMap<String, Object>();
	kafkaParams.put("metadata.broker.list", brokers);
	kafkaParams.put("bootstrap.servers", brokers);
	kafkaParams.put("spark.streaming.kafka.maxRatePerPartition", "100");
	kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
	kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
	kafkaParams.put("group.id", "MyAnalyticsEngineConsumerGroup1");
	kafkaParams.put("enable.auto.commit", false);
	kafkaParams.put("auto.offset.reset", "earliest");

	System.out.println("Initiate kafka messages for rules....");
	// Create direct kafka stream with brokers and topics
	ConsumerStrategy<String, String> consumerStrategy = ConsumerStrategies.Subscribe(topicsSet, kafkaParams);
	JavaInputDStream<ConsumerRecord<String, String>> streams = KafkaUtils.createDirectStream(jssc,
			LocationStrategies.PreferConsistent(), consumerStrategy);

	System.out.println("Waiting for kafka messages of rules....");

	// Get the data
	streams.foreachRDD(rdd -> {
		rdd.collect().forEach(consumerRecord -> {
			String key = consumerRecord.key();
			long offset = consumerRecord.offset();
			int partition = consumerRecord.partition();
			String topic = consumerRecord.topic();
			String value = consumerRecord.value();
			System.out.println("consumerRecord:" + consumerRecord.toString());
			System.out.println("[ruleupdate]key:" + key + ", value:" + value);

			engineManager.getEngine().addRule(key, value);
		});

		OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
		// some time later, after outputs have completed
		((CanCommitOffsets) streams.inputDStream()).commitAsync(offsetRanges);
	});

	System.out.println("Prepare rule validation....");

}
 
Author: osswangxining, Project: another-rule-based-analytics-on-spark, Lines of code: 43, Source file: AnalyticsEngine.java


Example 17: getDStream

import org.apache.spark.streaming.kafka010.KafkaUtils; // import the required package/class
@Override
public JavaDStream<?> getDStream() throws Exception {
  Map<String, Object> kafkaParams = Maps.newHashMap();

  String brokers = config.getString(BROKERS_CONFIG);
  kafkaParams.put("bootstrap.servers", brokers);

  topic = config.getString(TOPIC_CONFIG);
  Set<String> topicSet = Sets.newHashSet(topic);

  String encoding = config.getString(ENCODING_CONFIG);
  if (encoding.equals("string")) {
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  }
  else if (encoding.equals("bytearray")) {
    kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
  }
  else {
    throw new RuntimeException("Invalid Kafka input encoding type. Valid types are 'string' and 'bytearray'.");
  }
  
  if (config.hasPath(GROUP_ID_CONFIG)) {
    groupID = config.getString(GROUP_ID_CONFIG);
  }
  else {
    groupID = UUID.randomUUID().toString();
  }
  kafkaParams.put("group.id", groupID);
  
  kafkaParams.put("enable.auto.commit", "false");

  addCustomParams(kafkaParams);

  JavaStreamingContext jssc = Contexts.getJavaStreamingContext();
  JavaDStream<?> dStream = null;

  if (encoding.equals("string")) {
    if (doesRecordProgress() && hasLastOffsets()) {
      dStream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
          ConsumerStrategies.<String, String>Subscribe(topicSet, kafkaParams, getLastOffsets()));
    }
    else {
      dStream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
          ConsumerStrategies.<String, String>Subscribe(topicSet, kafkaParams));
    }
  }
  else if (encoding.equals("bytearray")) {
    if (doesRecordProgress() && hasLastOffsets()) {      
      dStream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
          ConsumerStrategies.<byte[], byte[]>Subscribe(topicSet, kafkaParams, getLastOffsets()));
    }
    else {
      dStream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
          ConsumerStrategies.<byte[], byte[]>Subscribe(topicSet, kafkaParams));
    }
  }
  else {
    throw new RuntimeException("Invalid Kafka input encoding type. Valid types are 'string' and 'bytearray'.");
  }

  if (config.hasPath(WINDOW_ENABLED_CONFIG) && config.getBoolean(WINDOW_ENABLED_CONFIG)) {
    int windowDuration = config.getInt(WINDOW_MILLISECONDS_CONFIG);

    dStream = dStream.window(new Duration(windowDuration));
  }

  return dStream;
}
 
Author: cloudera-labs, Project: envelope, Lines of code: 71, Source file: KafkaInput.java
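In the string and bytearray branches above, the three-argument ConsumerStrategies.Subscribe overload resumes consumption from previously recorded offsets. That third argument is a Map<TopicPartition, Long> of starting positions per partition. The sketch below shows one way to build such a map; the topic, partitions and offset values are placeholder assumptions, whereas in envelope the real values come from getLastOffsets().

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

public class StartingOffsets {
    public static Map<TopicPartition, Long> lastOffsets() {
        // Starting position per partition; in practice these come from wherever progress was recorded
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("example-topic", 0), 250L);
        offsets.put(new TopicPartition("example-topic", 1), 310L);
        return offsets;
    }
}

The resulting map would then be passed as the third argument, e.g. ConsumerStrategies.<String, String>Subscribe(topicSet, kafkaParams, StartingOffsets.lastOffsets()).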



Note: The org.apache.spark.streaming.kafka010.KafkaUtils examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The code snippets are taken from open-source projects contributed by their respective authors; copyright remains with the original authors, and redistribution and use should follow each project's license. Please do not reproduce without permission.

