本文整理汇总了Java中org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream类的典型用法代码示例。如果您正苦于以下问题:Java JavaPairReceiverInputDStream类的具体用法?Java JavaPairReceiverInputDStream怎么用?Java JavaPairReceiverInputDStream使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
JavaPairReceiverInputDStream类属于org.apache.spark.streaming.api.java包,在下文中一共展示了JavaPairReceiverInputDStream类的7个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: main
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; //导入依赖的package包/类
public static void main(String[] args) {
if (args.length < 4) {
System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
System.exit(1);
}
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
// Create the context with a 1 second batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));
int numThreads = Integer.parseInt(args[3]);
Map<String, Integer> topicMap = new HashMap<String, Integer>();
String[] topics = args[2].split(",");
for (String topic : topics) {
topicMap.put(topic, numThreads);
}
JavaPairReceiverInputDStream<String, String> messages = KafkaUtils.createStream(jssc, args[0], args[1],
topicMap);
JavaDStream<String> lines = messages.map(tuple2 -> tuple2._2());
JavaDStream<String> words = lines.flatMap(x -> Lists.newArrayList(SPACE.split(x)));
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(s -> new Tuple2<String, Integer>(s, 1)).reduceByKey(
(i1, i2) -> i1 + i2);
wordCounts.print();
jssc.start();
jssc.awaitTermination();
}
开发者ID:ogidogi,项目名称:laughing-octo-sansa,代码行数:26,代码来源:TestSparkKafkaReceiverApproach.java
示例2: main
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; //导入依赖的package包/类
public static void main(String[] args) throws InterruptedException {
SparkConf sc = new SparkConf().setAppName("POC-Kafka");
try(JavaStreamingContext jsc = new JavaStreamingContext(sc, new Duration(2000))) {
JavaPairReceiverInputDStream<String, String> stream = KafkaUtils.createStream(
jsc, ZK_HOST_PORT, "a_group_id", Collections.singletonMap(EXAMPLE_TOPIC, 1));
JavaDStream<ExampleXML> records = stream.map(t -> t._2()).map(new ParseXML());
records.foreachRDD(rdd -> System.out.printf("Amount of XMLs: %d\n", rdd.count()));
jsc.start();
jsc.awaitTermination();
}
}
开发者ID:ciandt-dev,项目名称:gcp,代码行数:16,代码来源:Spark3Kafka.java
示例3: main
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; //导入依赖的package包/类
public static void main(String[] args) {
Logger.getLogger("org").setLevel(Level.WARN);
Logger.getLogger("akka").setLevel(Level.WARN);
SparkConf sparkConf = new SparkConf().setMaster("spark://10.204.100.206:7077").setAppName("StreamingKafka101");
sparkConf.setJars(new String[] { "target\\TestProjects-1.0-SNAPSHOT.jar" });
//sparkConf.setExecutorEnv("executor-memory", "8G");
//sparkConf.setExecutorEnv("spark.executor.memory", "8G");
sparkConf.set("spark.executor.memory", "4G");
//sparkConf.set("executor-memory", "8G");
int duration = 2;
if(args.length > 0){
try{
duration = Integer.parseInt(args[0]);
System.out.println("duration changed to " + duration);
}catch(Exception e){
System.out.println("Duration reset to defaults");
}
}
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(duration));
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put("loadtest", 4);
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(ssc,"10.204.100.172:2182","kafka-group1",topicMap);
JavaDStream<String> lines = kafkaStream.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
lines.foreachRDD(new Function<JavaRDD<String>, Void>() {
@Override
public Void call(JavaRDD<String> rdd) throws Exception {
System.out.println(new Date() + " Total records read: " + rdd.count() );
return null;
}
});
ssc.start();
ssc.awaitTermination();
}
开发者ID:atulsm,项目名称:Test_Projects,代码行数:48,代码来源:StreamingKafka101.java
示例4: main
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; //导入依赖的package包/类
@SuppressWarnings("serial")
public static void main(String[] args) throws InterruptedException {
// if (args.length < 4) {
// System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
// System.exit(1);
// }
args = new String[4];
args[0]="localhost:2181";
args[1]= "1";
args[2]= "test";
args[3]= "1";
SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount").setMaster("spark://Impetus-NL163U:7077");
// Create the context with a 1 second batch size
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(20000));
int numThreads = Integer.parseInt(args[3]);
Map<String, Integer> topicMap = new HashMap<String, Integer>();
String[] topics = args[2].split(",");
for (String topic: topics) {
topicMap.put(topic, numThreads);
}
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(jssc, args[0], args[1], topicMap);
final JavaDStream<String> lines = messages.map(new Function<Tuple2<String,String>, String>() {
@Override
public String call(Tuple2<String, String> v1) throws Exception {
ObjectMapper objectMapper = new ObjectMapper();
objectMapper.configure(Feature.USE_ANNOTATIONS, false);
Map<String,String> mapValue = objectMapper.readValue(v1._2(), new TypeReference<Map<String,String>>() {
});
Collection<String> values = mapValue.values();
String finalString = "";
for (Iterator<String> iterator = values.iterator(); iterator.hasNext();) {
String value = iterator.next();
if(finalString.length()==0){
finalString = finalString +value;
}else {
finalString = finalString+","+ value;
}
}
return finalString;
}
});
lines.print();
new Thread(){
public void run() {
while(true){
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("#############################################################################"+lines.count());
}
};
}.start();
jssc.start();
jssc.awaitTermination();
}
开发者ID:PacktPublishing,项目名称:Practical-Real-time-Processing-and-Analytics,代码行数:66,代码来源:JavaKafkaWordCount.java
示例5: main
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
String zkQuorum = "localhost:2181";
String groupName = "stream";
int numThreads = 3;
String topicsName = "test1";
SparkConf sparkConf = new SparkConf().setAppName("WordCountKafkaStream");
JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, new Duration(5000));
Map<String, Integer> topicToBeUsedBySpark = new HashMap<>();
String[] topics = topicsName.split(",");
for (String topic : topics) {
topicToBeUsedBySpark.put(topic, numThreads);
}
JavaPairReceiverInputDStream<String, String> streamMessages =
KafkaUtils.createStream(javaStreamingContext, zkQuorum, groupName, topicToBeUsedBySpark);
JavaDStream<String> lines = streamMessages.map(new Function<Tuple2<String, String>, String>() {
@Override
public String call(Tuple2<String, String> tuple2) {
return tuple2._2();
}
});
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String x) {
return Arrays.asList(WORD_DELIMETER.split(x)).iterator();
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
wordCounts.print();
javaStreamingContext.start();
javaStreamingContext.awaitTermination();
}
开发者ID:PacktPublishing,项目名称:Building-Data-Streaming-Applications-with-Apache-Kafka,代码行数:50,代码来源:KafkaReceiverWordCountJava.java
示例6: main
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; //导入依赖的package包/类
public static void main(String[] args) {
if (args.length < 4) {
System.err.println("Usage: StreamingAvg <zkQuorum> <group> <topics> <numThreads>");
System.exit(1);
}
//Configure the Streaming Context
SparkConf sparkConf = new SparkConf().setAppName("StreamingAvg");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(10000));
int numThreads = Integer.parseInt(args[3]);
Map<String, Integer> topicMap = new HashMap<String, Integer>();
String[] topics = args[2].split(",");
for (String topic: topics) {
topicMap.put(topic, numThreads);
}
JavaPairReceiverInputDStream<String, String> messages =
KafkaUtils.createStream(ssc, args[0], args[1], topicMap);
System.out.println("Got my DStream! connecting to zookeeper "+ args[0] + " group " + args[1] + " topics" +
topicMap);
JavaPairDStream<Integer,Integer> nums = messages.mapToPair(new PairFunction<Tuple2<String,String>, Integer, Integer>()
{
@Override
public Tuple2<Integer,Integer> call(Tuple2<String, String> tuple2) {
return new Tuple2<Integer,Integer>(1,Integer.parseInt(tuple2._2()));
}
});
JavaDStream<Tuple2<Integer,Integer>> countAndSum = nums.reduce(new Function2<Tuple2<Integer,Integer>, Tuple2<Integer,Integer>, Tuple2<Integer,Integer>>() {
@Override
public Tuple2<Integer, Integer> call(Tuple2<Integer, Integer> a, Tuple2<Integer, Integer> b) {
return new Tuple2<Integer, Integer>(a._1() + b._1(), a._2() + b._2());
}
});
countAndSum.foreachRDD(new Function<JavaRDD<Tuple2<Integer, Integer>>, Void>() {
@Override
public Void call(JavaRDD<Tuple2<Integer, Integer>> tuple2JavaRDD) throws Exception {
if (tuple2JavaRDD.count() > 0) {
System.out.println("Current avg: " + tuple2JavaRDD.first()._2() / tuple2JavaRDD.first()._1());
} else {
System.out.println("Got no data in this window");
}
return null;
}
});
ssc.start();
ssc.awaitTermination();
}
开发者ID:gwenshap,项目名称:kafka-examples,代码行数:59,代码来源:StreamingAvg.java
示例7: handleRequest
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream; //导入依赖的package包/类
@Override
protected void handleRequest(RestRequest request, RestChannel channel, Client client)
throws Exception {
final String topic = request.param("topic", "");
final boolean schema = request.paramAsBoolean("schema", false);
final String master = request.param("masterAddress", "local");
final String hdfs = request.param("hdfs", "hdfs://localhost:50070");
final String memory = request.param("memory", "2g");
final String appName = request.param("appName", "appName-"+topic);
final int duration = request.paramAsInt("duration", 1000);
Thread exec = new Thread(new Runnable(){
@Override
public void run() {
SparkConf sparkConf = new SparkConf().setAppName(appName).setMaster(master).set("spark.executor.memory", memory);
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(duration));
Map<String, Integer> topicMap = new HashMap<String, Integer>();
topicMap.put(topic, 3);
JavaPairReceiverInputDStream<String, byte[]> kafkaStream = KafkaUtils.createStream(jssc, String.class, byte[].class,
kafka.serializer.DefaultDecoder.class, kafka.serializer.DefaultDecoder.class, null,
topicMap, StorageLevel.MEMORY_ONLY());
//JobConf confHadoop = new JobConf();
//confHadoop.set("mapred.output.compress", "true");
//confHadoop.set("mapred.output.compression.codec", "com.hadoop.compression.lzo.LzopCodec");
kafkaStream.saveAsHadoopFiles(hdfs, "seq", Text.class, BytesWritable.class, KafkaStreamSeqOutputFormat.class);
topicContextMap.put(topic, jssc);
jssc.start();
jssc.awaitTermination();
}
});
exec.start();
channel.sendResponse(new BytesRestResponse(RestStatus.OK, String.format("{\"topic\":\"%s\"}", topic)));
}
开发者ID:huangchen007,项目名称:elasticsearch-rest-command,代码行数:46,代码来源:KafkaStreamRestHandler.java
注:本文中的org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论