本文整理汇总了Java中org.apache.spark.streaming.kafka010.KafkaUtils类的典型用法代码示例。如果您正苦于以下问题:Java KafkaUtils类的具体用法?Java KafkaUtils怎么用?Java KafkaUtils使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
KafkaUtils类属于org.apache.spark.streaming.kafka010包,在下文中一共展示了KafkaUtils类的17个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: providesKafkaInputStream
import org.apache.spark.streaming.kafka010.KafkaUtils; //导入依赖的package包/类
@Provides
JavaInputDStream<ConsumerRecord<String, RawRating>> providesKafkaInputStream(JavaStreamingContext streamingContext) {
Map<String, Object> kafkaParams = new HashedMap();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", JsonDeserializer.class);
kafkaParams.put("serializedClass", RawRating.class);
kafkaParams.put("group.id", "rating_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("topicA", "topicB");
return KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, RawRating>Subscribe(topics, kafkaParams)
);
}
开发者ID:cosminseceleanu,项目名称:movie-recommender,代码行数:19,代码来源:SparkModule.java
示例2: main
import org.apache.spark.streaming.kafka010.KafkaUtils; //导入依赖的package包/类
public static void main(String[] args) throws InterruptedException {
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("data-in");
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaSpark");
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(5));
final JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
JavaPairDStream<String, Integer> countOfMessageKeys = stream
.map((ConsumerRecord<String, String> record) -> record.key())
.mapToPair((String s) -> new Tuple2<>(s, 1))
.reduceByKey((Integer i1, Integer i2)-> i1 + i2);
countOfMessageKeys.print();
// Start the computation
streamingContext.start();
streamingContext.awaitTermination();
}
开发者ID:ebi-wp,项目名称:kafka-streams-api-websockets,代码行数:33,代码来源:SparkConsume.java
示例3: main
import org.apache.spark.streaming.kafka010.KafkaUtils; //导入依赖的package包/类
public static void main(String[] args) {
SparkConf sc = new SparkConf()
.setMaster("local[2]") // local mode with 2 threads
.setAppName("RealtimeSpeedCalculator");
JavaStreamingContext streamingContext = new JavaStreamingContext(sc, new Duration(60 * 1000L));
// Kafka configuration
Map<String, Object> kafkaParams = new HashMap();
kafkaParams.put("bootstrap.servers", "10.128.184.199:9121");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", 0);
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("topic-taxi");
JavaInputDStream<ConsumerRecord<String, String>> stream =
KafkaUtils.createDirectStream(
streamingContext,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
);
stream.map(record -> {
System.out.println("#############");
return record.value();
}).count();
// streamingContext.start();
}
开发者ID:wang1365,项目名称:spark-traffic,代码行数:32,代码来源:StreamingApplication.java
示例4: createDirectStream
import org.apache.spark.streaming.kafka010.KafkaUtils; //导入依赖的package包/类
/**
*
* @param <K>
* @param <V>
* @return
*/
public <K extends Object, V extends Object> JavaInputDStream<ConsumerRecord<K, V>> createDirectStream() {
JavaInputDStream<ConsumerRecord<K, V>> directKafkaStream
= KafkaUtils.
createDirectStream(jsc, LocationStrategies.PreferConsistent(),
ConsumerStrategies.Subscribe(topics, kafkaParams));
return directKafkaStream;
}
开发者ID:hopshadoop,项目名称:hops-util,代码行数:14,代码来源:SparkConsumer.java
示例5: main
import org.apache.spark.streaming.kafka010.KafkaUtils; //导入依赖的package包/类
public static void main(String[] args) {
//Window Specific property if Hadoop is not instaalled or HADOOP_HOME is not set
System.setProperty("hadoop.home.dir", "E:\\hadoop");
//Logger rootLogger = LogManager.getRootLogger();
//rootLogger.setLevel(Level.WARN);
SparkConf conf = new SparkConf().setAppName("KafkaExample").setMaster("local[*]");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaStreamingContext streamingContext = new JavaStreamingContext(sc, Durations.minutes(2));
streamingContext.checkpoint("E:\\hadoop\\checkpoint");
Logger rootLogger = LogManager.getRootLogger();
rootLogger.setLevel(Level.WARN);
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "10.0.75.1:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_strea");
kafkaParams.put("auto.offset.reset", "latest");
// kafkaParams.put("enable.auto.commit", false);
Collection<String> topics = Arrays.asList("mytopic", "anothertopic");
final JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(streamingContext,LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
JavaPairDStream<String, String> pairRDD = stream.mapToPair(record-> new Tuple2<>(record.key(), record.value()));
pairRDD.foreachRDD(pRDD-> { pRDD.foreach(tuple-> System.out.println(new Date()+" :: Kafka msg key ::"+tuple._1() +" the val is ::"+tuple._2()));});
JavaDStream<String> tweetRDD = pairRDD.map(x-> x._2()).map(new TweetText());
tweetRDD.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" :: "+x)));
JavaDStream<String> hashtagRDD = tweetRDD.flatMap(twt-> Arrays.stream(twt.split(" ")).filter(str-> str.contains("#")).collect(Collectors.toList()).iterator() );
hashtagRDD.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(x)));
JavaPairDStream<String, Long> cntByVal = hashtagRDD.countByValue();
cntByVal.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The count tag is ::"+x._1() +" and the val is ::"+x._2())));
/* hashtagRDD.window(Durations.seconds(60), Durations.seconds(30))
.countByValue()
.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
hashtagRDD.countByValueAndWindow(Durations.seconds(60), Durations.seconds(30))
.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println("The window&count tag is ::"+x._1() +" and the val is ::"+x._2())));
*/
hashtagRDD.window(Durations.minutes(8)).countByValue()
.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
hashtagRDD.window(Durations.minutes(8),Durations.minutes(2)).countByValue()
.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
hashtagRDD.window(Durations.minutes(12),Durations.minutes(8)).countByValue()
.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
hashtagRDD.window(Durations.minutes(2),Durations.minutes(2)).countByValue()
.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
hashtagRDD.window(Durations.minutes(12),Durations.minutes(12)).countByValue()
.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));
/*hashtagRDD.window(Durations.minutes(5),Durations.minutes(2)).countByValue()
.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));*/
/* hashtagRDD.window(Durations.minutes(10),Durations.minutes(1)).countByValue()
.foreachRDD(tRDD -> tRDD.foreach(x->System.out.println(new Date()+" ::The window count tag is ::"+x._1() +" and the val is ::"+x._2())));*/
streamingContext.start();
try {
streamingContext.awaitTermination();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
开发者ID:PacktPublishing,项目名称:Apache-Spark-2x-for-Java-Developers,代码行数:72,代码来源:KafkaExample.java
示例6: createRDD
import org.apache.spark.streaming.kafka010.KafkaUtils; //导入依赖的package包/类
public <K extends Object, V extends Object> JavaRDD<ConsumerRecord<K, V>> createRDD(
JavaSparkContext jsc,
OffsetRange[] osr, LocationStrategy ls) {
return KafkaUtils.createRDD(jsc, kafkaParams, osr, ls);
}
开发者ID:hopshadoop,项目名称:hops-util,代码行数:6,代码来源:SparkConsumer.java
示例7: initializeLogIfNecessary
import org.apache.spark.streaming.kafka010.KafkaUtils; //导入依赖的package包/类
public void initializeLogIfNecessary(boolean init) {
KafkaUtils.initializeLogIfNecessary(init);
}
开发者ID:hopshadoop,项目名称:hops-util,代码行数:4,代码来源:SparkConsumer.java
示例8: isTraceEnabled
import org.apache.spark.streaming.kafka010.KafkaUtils; //导入依赖的package包/类
public boolean isTraceEnabled() {
return KafkaUtils.isTraceEnabled();
}
开发者ID:hopshadoop,项目名称:hops-util,代码行数:4,代码来源:SparkConsumer.java
示例9: log
import org.apache.spark.streaming.kafka010.KafkaUtils; //导入依赖的package包/类
public Logger log() {
return KafkaUtils.log();
}
开发者ID:hopshadoop,项目名称:hops-util,代码行数:4,代码来源:SparkConsumer.java
示例10: logName
import org.apache.spark.streaming.kafka010.KafkaUtils; //导入依赖的package包/类
public String logName() {
return KafkaUtils.logName();
}
开发者ID:hopshadoop,项目名称:hops-util,代码行数:4,代码来源:SparkConsumer.java
示例11: logDebug
import org.apache.spark.streaming.kafka010.KafkaUtils; //导入依赖的package包/类
public void logDebug(Function0<String> fun) {
KafkaUtils.logDebug(fun);
}
开发者ID:hopshadoop,项目名称:hops-util,代码行数:4,代码来源:SparkConsumer.java
示例12: logError
import org.apache.spark.streaming.kafka010.KafkaUtils; //导入依赖的package包/类
public void logError(Function0<String> fun) {
KafkaUtils.logError(fun);
}
开发者ID:hopshadoop,项目名称:hops-util,代码行数:4,代码来源:SparkConsumer.java
示例13: logInfo
import org.apache.spark.streaming.kafka010.KafkaUtils; //导入依赖的package包/类
public void logInfo(Function0<String> fun) {
KafkaUtils.logInfo(fun);
}
开发者ID:hopshadoop,项目名称:hops-util,代码行数:4,代码来源:SparkConsumer.java
示例14: logTrace
import org.apache.spark.streaming.kafka010.KafkaUtils; //导入依赖的package包/类
public void logTrace(Function0<String> fun) {
KafkaUtils.logTrace(fun);
}
开发者ID:hopshadoop,项目名称:hops-util,代码行数:4,代码来源:SparkConsumer.java
示例15: logWarning
import org.apache.spark.streaming.kafka010.KafkaUtils; //导入依赖的package包/类
public void logWarning(Function0<String> fun) {
KafkaUtils.logWarning(fun);
}
开发者ID:hopshadoop,项目名称:hops-util,代码行数:4,代码来源:SparkConsumer.java
示例16: processRuleUpdate
import org.apache.spark.streaming.kafka010.KafkaUtils; //导入依赖的package包/类
private static void processRuleUpdate(JavaStreamingContext jssc, String brokers, Set<String> topicsSet,
final AnalyticsEngineManager engineManager) {
Map<String, Object> kafkaParams = new HashMap<String, Object>();
kafkaParams.put("metadata.broker.list", brokers);
kafkaParams.put("bootstrap.servers", brokers);
kafkaParams.put("spark.streaming.kafka.maxRatePerPartition", "100");
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("group.id", "MyAnalyticsEngineConsumerGroup1");
kafkaParams.put("enable.auto.commit", false);
kafkaParams.put("auto.offset.reset", "earliest");
System.out.println("Initiate kafka messages for rules....");
// Create direct kafka stream with brokers and topics
ConsumerStrategy<String, String> consumerStrategy = ConsumerStrategies.Subscribe(topicsSet, kafkaParams);
JavaInputDStream<ConsumerRecord<String, String>> streams = KafkaUtils.createDirectStream(jssc,
LocationStrategies.PreferConsistent(), consumerStrategy);
System.out.println("Waiting for kafka messages of rules....");
// Get the data
streams.foreachRDD(rdd -> {
rdd.collect().forEach(consumerRecord -> {
String key = consumerRecord.key();
long offset = consumerRecord.offset();
int partition = consumerRecord.partition();
String topic = consumerRecord.topic();
String value = consumerRecord.value();
System.out.println("consumerRecord:" + consumerRecord.toString());
System.out.println("[ruleupdate]key:" + key + ", value:" + value);
engineManager.getEngine().addRule(key, value);
});
OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
// some time later, after outputs have completed
((CanCommitOffsets) streams.inputDStream()).commitAsync(offsetRanges);
});
System.out.println("Prepare rule validation....");
}
开发者ID:osswangxining,项目名称:another-rule-based-analytics-on-spark,代码行数:43,代码来源:AnalyticsEngine.java
示例17: getDStream
import org.apache.spark.streaming.kafka010.KafkaUtils; //导入依赖的package包/类
@Override
public JavaDStream<?> getDStream() throws Exception {
Map<String, Object> kafkaParams = Maps.newHashMap();
String brokers = config.getString(BROKERS_CONFIG);
kafkaParams.put("bootstrap.servers", brokers);
topic = config.getString(TOPIC_CONFIG);
Set<String> topicSet = Sets.newHashSet(topic);
String encoding = config.getString(ENCODING_CONFIG);
if (encoding.equals("string")) {
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
}
else if (encoding.equals("bytearray")) {
kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
}
else {
throw new RuntimeException("Invalid Kafka input encoding type. Valid types are 'string' and 'bytearray'.");
}
if (config.hasPath(GROUP_ID_CONFIG)) {
groupID = config.getString(GROUP_ID_CONFIG);
}
else {
groupID = UUID.randomUUID().toString();
}
kafkaParams.put("group.id", groupID);
kafkaParams.put("enable.auto.commit", "false");
addCustomParams(kafkaParams);
JavaStreamingContext jssc = Contexts.getJavaStreamingContext();
JavaDStream<?> dStream = null;
if (encoding.equals("string")) {
if (doesRecordProgress() && hasLastOffsets()) {
dStream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topicSet, kafkaParams, getLastOffsets()));
}
else {
dStream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topicSet, kafkaParams));
}
}
else if (encoding.equals("bytearray")) {
if (doesRecordProgress() && hasLastOffsets()) {
dStream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
ConsumerStrategies.<byte[], byte[]>Subscribe(topicSet, kafkaParams, getLastOffsets()));
}
else {
dStream = KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
ConsumerStrategies.<byte[], byte[]>Subscribe(topicSet, kafkaParams));
}
}
else {
throw new RuntimeException("Invalid Kafka input encoding type. Valid types are 'string' and 'bytearray'.");
}
if (config.hasPath(WINDOW_ENABLED_CONFIG) && config.getBoolean(WINDOW_ENABLED_CONFIG)) {
int windowDuration = config.getInt(WINDOW_MILLISECONDS_CONFIG);
dStream = dStream.window(new Duration(windowDuration));
}
return dStream;
}
开发者ID:cloudera-labs,项目名称:envelope,代码行数:71,代码来源:KafkaInput.java
注:本文中的org.apache.spark.streaming.kafka010.KafkaUtils类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论