This article collects and organizes typical usage examples of the Java class org.apache.spark.streaming.Time. If you are unsure what the Time class does or how to use it, the curated code examples below should help.
The Time class belongs to the org.apache.spark.streaming package. 17 code examples of the Time class are presented below, sorted by popularity by default. You can upvote the examples you like or find useful; your feedback helps the system recommend better Java code examples.
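Before the examples, here is a minimal standalone sketch of where Time usually appears: Spark Streaming passes it to foreachRDD callbacks as the batch time, and time.milliseconds() returns that time in epoch milliseconds. The class name TimeUsageSketch and the attach method are hypothetical and only for illustration; the examples that follow are the real-world usages.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;

public class TimeUsageSketch {
  public static void attach(JavaDStream<String> lines) {
    // foreachRDD hands each micro-batch RDD to the callback together with its batch Time.
    lines.foreachRDD((JavaRDD<String> rdd, Time time) -> {
      // time.milliseconds() is the batch time in epoch millis; the examples below
      // use it to name output files, tag records, or look up watermarks.
      long batchMillis = time.milliseconds();
      System.out.println("Batch " + batchMillis + " contains " + rdd.count() + " records");
    });
  }
}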
Example 1: call
import org.apache.spark.streaming.Time; // import the required package/class
@Override
public void call(JavaPairRDD<K,M> rdd, Time time) throws IOException {
if (rdd.isEmpty()) {
log.info("RDD was empty, not saving to HDFS");
} else {
String file = prefix + "-" + time.milliseconds() + "." + suffix;
Path path = new Path(file);
FileSystem fs = FileSystem.get(path.toUri(), hadoopConf);
if (fs.exists(path)) {
log.warn("Saved data already existed, possibly from a failed job. Deleting {}", path);
fs.delete(path, true);
}
log.info("Saving RDD to HDFS at {}", file);
rdd.mapToPair(
new ValueToWritableFunction<>(keyClass, messageClass, keyWritableClass, messageWritableClass)
).saveAsNewAPIHadoopFile(
file,
keyWritableClass,
messageWritableClass,
SequenceFileOutputFormat.class,
hadoopConf);
}
}
Developer: oncewang, Project: oryx2, Lines: 24, Source: SaveToHDFSFunction.java
Example 2: getFunction
import org.apache.spark.streaming.Time; // import the required package/class
@Override
protected Function2<JavaPairRDD<String, RowMutation>, Time, Void> getFunction() {
return new Function2<JavaPairRDD<String, RowMutation>, Time, Void>() {
// Blur Thrift Client
@Override
public Void call(JavaPairRDD<String, RowMutation> rdd, Time time) throws Exception {
Iface client = getBlurClient();
for (Tuple2<String, RowMutation> tuple : rdd.collect()) {
if (tuple != null) {
try {
RowMutation rm = tuple._2;
// Index using enqueue mutate call
client.enqueueMutate(rm);
} catch (Exception ex) {
LOG.error("Unknown error while trying to call enqueueMutate.", ex);
throw ex;
}
}
}
return null;
}
};
}
Developer: apache, Project: incubator-blur, Lines: 24, Source: BlurBulkLoadSparkProcessor.java
Example 3: compute
import org.apache.spark.streaming.Time; // import the required package/class
@Override
public scala.Option<RDD<WindowedValue<T>>> compute(final Time validTime) {
final long batchTime = validTime.milliseconds();
LOG.trace("BEFORE waiting for watermark sync, "
+ "LastWatermarkedBatchTime: {}, current batch time: {}",
GlobalWatermarkHolder.getLastWatermarkedBatchTime(),
batchTime);
final Stopwatch stopwatch = Stopwatch.createStarted();
awaitWatermarkSyncWith(batchTime);
stopwatch.stop();
LOG.info("Waited {} millis for watermarks to sync up with the current batch ({})",
stopwatch.elapsed(TimeUnit.MILLISECONDS),
batchTime);
LOG.info("Watermarks are now: {}", GlobalWatermarkHolder.get(batchDuration));
LOG.trace("AFTER waiting for watermark sync, "
+ "LastWatermarkedBatchTime: {}, current batch time: {}",
GlobalWatermarkHolder.getLastWatermarkedBatchTime(),
batchTime);
final RDD<WindowedValue<T>> rdd = generateRdd();
isFirst = false;
return scala.Option.apply(rdd);
}
Developer: apache, Project: beam, Lines: 31, Source: WatermarkSyncedDStream.java
Example 4: compute
import org.apache.spark.streaming.Time; // import the required package/class
@Override
public scala.Option<RDD<Tuple2<Source<T>, CheckpointMarkT>>> compute(Time validTime) {
RDD<scala.Tuple2<Source<T>, CheckpointMarkT>> rdd =
new SourceRDD.Unbounded<>(
ssc().sparkContext(),
options,
createMicrobatchSource(),
numPartitions);
return scala.Option.apply(rdd);
}
Developer: apache, Project: beam, Lines: 11, Source: SourceDStream.java
Example 5: report
import org.apache.spark.streaming.Time; // import the required package/class
private void report(Time batchTime, long count, SparkWatermarks sparkWatermark) {
// metadata - #records read and a description.
scala.collection.immutable.Map<String, Object> metadata =
new scala.collection.immutable.Map.Map1<String, Object>(
StreamInputInfo.METADATA_KEY_DESCRIPTION(),
String.format(
"Read %d records with observed watermarks %s, from %s for batch time: %s",
count,
sparkWatermark == null ? "N/A" : sparkWatermark,
sourceName,
batchTime));
StreamInputInfo streamInputInfo = new StreamInputInfo(inputDStreamId, count, metadata);
ssc().scheduler().inputInfoTracker().reportInfo(batchTime, streamInputInfo);
}
Developer: apache, Project: beam, Lines: 15, Source: SparkUnboundedSource.java
Example 6: call
import org.apache.spark.streaming.Time; // import the required package/class
@Override
public Void call(JavaRDD<T> tJavaRDD, Time time) throws Exception {
List<T> list = tJavaRDD.take(10);
// scalastyle:off println
logger.warn("-------------------------------------------");
logger.warn("Time: " + time);
logger.warn("-------------------------------------------");
for (T t : list) {
logger.warn(t.toString());
}
logger.warn("\n");
return null;
}
Developer: wangyangjun, Project: StreamBench, Lines: 15, Source: PrintFunctionImpl.java
Example 7: save
import org.apache.spark.streaming.Time; // import the required package/class
/**
* Save all RDDs in the given DStream to the given view.
* @param dstream
* @param view
*/
public static <T> void save(JavaDStream<T> dstream, final View<T> view) {
final String uri = view.getUri().toString();
dstream.foreachRDD(new Function2<JavaRDD<T>, Time, Void>() {
@Override
public Void call(JavaRDD<T> rdd, Time time) throws Exception {
save(rdd, uri);
return null;
}
});
}
Developer: rbrush, Project: kite-apps, Lines: 20, Source: SparkDatasets.java
Example 8: call
import org.apache.spark.streaming.Time; // import the required package/class
@Override
public void call(JavaPairRDD<K,M> newData, Time timestamp)
throws IOException, InterruptedException {
if (newData.isEmpty()) {
log.info("No data in current generation's RDD; nothing to do");
return;
}
log.info("Beginning update at {}", timestamp);
Configuration hadoopConf = sparkContext.hadoopConfiguration();
if (hadoopConf.getResource("core-site.xml") == null) {
log.warn("Hadoop config like core-site.xml was not found; " +
"is the Hadoop config directory on the classpath?");
}
JavaPairRDD<K,M> pastData;
Path inputPathPattern = new Path(dataDirString + "/*/part-*");
FileSystem fs = FileSystem.get(inputPathPattern.toUri(), hadoopConf);
FileStatus[] inputPathStatuses = fs.globStatus(inputPathPattern);
if (inputPathStatuses == null || inputPathStatuses.length == 0) {
log.info("No past data at path(s) {}", inputPathPattern);
pastData = null;
} else {
log.info("Found past data at path(s) like {}", inputPathStatuses[0].getPath());
Configuration updatedConf = new Configuration(hadoopConf);
updatedConf.set(FileInputFormat.INPUT_DIR, joinFSPaths(fs, inputPathStatuses));
@SuppressWarnings("unchecked")
JavaPairRDD<Writable,Writable> pastWritableData = (JavaPairRDD<Writable,Writable>)
sparkContext.newAPIHadoopRDD(updatedConf,
SequenceFileInputFormat.class,
keyWritableClass,
messageWritableClass);
pastData = pastWritableData.mapToPair(
new WritableToValueFunction<>(keyClass,
messageClass,
keyWritableClass,
messageWritableClass));
}
if (updateTopic == null || updateBroker == null) {
log.info("Not producing updates to update topic since none was configured");
updateInstance.runUpdate(sparkContext,
timestamp.milliseconds(),
newData,
pastData,
modelDirString,
null);
} else {
// This TopicProducer should not be async; sends one big model generally and
// needs to occur before other updates reliably rather than be buffered
try (TopicProducer<String,U> producer =
new TopicProducerImpl<>(updateBroker, updateTopic, false)) {
updateInstance.runUpdate(sparkContext,
timestamp.milliseconds(),
newData,
pastData,
modelDirString,
producer);
}
}
}
Developer: oncewang, Project: oryx2, Lines: 69, Source: BatchUpdateFunction.java
Example 9: main
import org.apache.spark.streaming.Time; // import the required package/class
public static void main(String[] args) throws Exception {
Logger.getLogger("org").setLevel(Level.WARN);
Logger.getLogger("akka").setLevel(Level.WARN);
final Pattern SPACE = Pattern.compile(" ");
SparkConf conf = new SparkConf().setAppName("Big Apple").setMaster("local[2]");
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));
JavaDStream<String> lines = ssc.textFileStream("src/main/resources/stream");
lines.print();
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String x) {
return Lists.newArrayList(SPACE.split(x)).iterator();
}
});
words.foreachRDD(
new VoidFunction2<JavaRDD<String>, Time>() {
@Override
public void call(JavaRDD<String> rdd, Time time) {
// Get the singleton instance of SQLContext
SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());
// Convert RDD[String] to RDD[case class] to Dataset
JavaRDD<JavaRecord> rowRDD = rdd.map(new Function<String, JavaRecord>() {
public JavaRecord call(String word) {
JavaRecord record = new JavaRecord();
record.setWord(word);
return record;
}
});
Dataset<Row> wordsDataset = sqlContext.createDataFrame(rowRDD, JavaRecord.class);
// Register as table
wordsDataset.registerTempTable("words");
// Do word count on table using SQL and print it
Dataset wordCountsDataset =
sqlContext.sql("select word, count(*) as total from words group by word");
wordCountsDataset.show();
}
}
);
ssc.start();
ssc.awaitTermination();
}
Developer: knoldus, Project: Sparkathon, Lines: 54, Source: SQLonStreams.java
Example 10: buildPairDStream
import org.apache.spark.streaming.Time; // import the required package/class
private static <K, InputT> PairDStreamFunctions<ByteArray, byte[]> buildPairDStream(
final JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> inputDStream,
final Coder<K> keyCoder,
final Coder<WindowedValue<InputT>> wvCoder) {
// we have to switch to Scala API to avoid Optional in the Java API, see: SPARK-4819.
// we also have a broader API for Scala (access to the actual key and entire iterator).
// we use coders to convert objects in the PCollection to byte arrays, so they
// can be transferred over the network for the shuffle and be in serialized form
// for checkpointing.
// for readability, we add comments with actual type next to byte[].
// to shorten line length, we use:
//---- WV: WindowedValue
//---- Iterable: Itr
//---- AccumT: A
//---- InputT: I
final DStream<Tuple2<ByteArray, byte[]>> tupleDStream =
inputDStream
.transformToPair(
new Function2<
JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>>, Time,
JavaPairRDD<ByteArray, byte[]>>() {
// we use mapPartitions with the RDD API because its the only available API
// that allows to preserve partitioning.
@Override
public JavaPairRDD<ByteArray, byte[]> call(
final JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> rdd,
final Time time)
throws Exception {
return rdd.mapPartitions(
TranslationUtils.functionToFlatMapFunction(
WindowingHelpers
.<KV<K, Iterable<WindowedValue<InputT>>>>unwindowFunction()),
true)
.mapPartitionsToPair(
TranslationUtils
.<K, Iterable<WindowedValue<InputT>>>toPairFlatMapFunction(),
true)
.mapValues(
new Function<
Iterable<WindowedValue<InputT>>,
KV<Long, Iterable<WindowedValue<InputT>>>>() {
@Override
public KV<Long, Iterable<WindowedValue<InputT>>> call(
final Iterable<WindowedValue<InputT>> values) throws Exception {
// add the batch timestamp for visibility (e.g., debugging)
return KV.of(time.milliseconds(), values);
}
})
// move to bytes representation and use coders for deserialization
// because of checkpointing.
.mapPartitionsToPair(
TranslationUtils.pairFunctionToPairFlatMapFunction(
CoderHelpers.toByteFunction(
keyCoder,
KvCoder.of(VarLongCoder.of(), IterableCoder.of(wvCoder)))),
true);
}
})
.dstream();
return DStream.toPairDStreamFunctions(
tupleDStream,
JavaSparkContext$.MODULE$.<ByteArray>fakeClassTag(),
JavaSparkContext$.MODULE$.<byte[]>fakeClassTag(),
null);
}
Developer: apache, Project: beam, Lines: 70, Source: SparkGroupAlsoByWindowViaWindowSet.java
Example 11: compute
import org.apache.spark.streaming.Time; // import the required package/class
@Override
public scala.Option<RDD<BoxedUnit>> compute(Time validTime) {
// compute parent.
scala.Option<RDD<Metadata>> parentRDDOpt = parent.getOrCompute(validTime);
final Accumulator<MetricsContainerStepMap> metricsAccum = MetricsAccumulator.getInstance();
long count = 0;
SparkWatermarks sparkWatermark = null;
Instant globalLowWatermarkForBatch = BoundedWindow.TIMESTAMP_MIN_VALUE;
Instant globalHighWatermarkForBatch = BoundedWindow.TIMESTAMP_MIN_VALUE;
long maxReadDuration = 0;
if (parentRDDOpt.isDefined()) {
JavaRDD<Metadata> parentRDD = parentRDDOpt.get().toJavaRDD();
for (Metadata metadata: parentRDD.collect()) {
count += metadata.getNumRecords();
// compute the global input watermark - advance to latest of all partitions.
Instant partitionLowWatermark = metadata.getLowWatermark();
globalLowWatermarkForBatch =
globalLowWatermarkForBatch.isBefore(partitionLowWatermark)
? partitionLowWatermark : globalLowWatermarkForBatch;
Instant partitionHighWatermark = metadata.getHighWatermark();
globalHighWatermarkForBatch =
globalHighWatermarkForBatch.isBefore(partitionHighWatermark)
? partitionHighWatermark : globalHighWatermarkForBatch;
// Update metrics reported in the read
final Gauge gauge = Metrics.gauge(NAMESPACE, READ_DURATION_MILLIS);
final MetricsContainer container = metadata.getMetricsContainers().getContainer(stepName);
try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(container)) {
final long readDurationMillis = metadata.getReadDurationMillis();
if (readDurationMillis > maxReadDuration) {
gauge.set(readDurationMillis);
}
} catch (IOException e) {
throw new RuntimeException(e);
}
metricsAccum.value().updateAll(metadata.getMetricsContainers());
}
sparkWatermark =
new SparkWatermarks(
globalLowWatermarkForBatch,
globalHighWatermarkForBatch,
new Instant(validTime.milliseconds()));
// add to watermark queue.
GlobalWatermarkHolder.add(inputDStreamId, sparkWatermark);
}
// report - for RateEstimator and visibility.
report(validTime, count, sparkWatermark);
return scala.Option.empty();
}
Developer: apache, Project: beam, Lines: 50, Source: SparkUnboundedSource.java
Example 12: main
import org.apache.spark.streaming.Time; // import the required package/class
public static void main(String[] args) {
// String inputFile = StreamKMeans.class.getClassLoader().getResource("centroids.txt").getFile();
SparkConf sparkConf = new SparkConf().setMaster("spark://master:7077").setAppName("JavaKMeans");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(1000));
HashSet<String> topicsSet = new HashSet<>();
topicsSet.add("KMeans");
HashMap<String, String> kafkaParams = new HashMap<>();
// kafkaParams.put("metadata.broker.list", "kafka1:9092,kafka2:9092,kafka3:9092");
kafkaParams.put("metadata.broker.list", "localhost:9092");
kafkaParams.put("auto.offset.reset", "largest");
kafkaParams.put("zookeeper.connect", "zoo1:2181");
kafkaParams.put("group.id", "spark");
// Create direct kafka stream with brokers and topics
JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(
jssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams,
topicsSet
);
JavaDStream<Vector> points = lines.map(new ParseKafkaString()).map(new ParsePoint());
Vector[] initCentroids = loadInitCentroids();
double[] weights = new double[96];
for (int i = 0; i < 96; i++) {
weights[i] = 1.0 / 96;
}
final StreamingKMeans model = new StreamingKMeans()
.setK(96)
.setDecayFactor(0)
.setInitialCenters(initCentroids, weights);
model.trainOn(points);
points.foreachRDD(new Function2<JavaRDD<Vector>, Time, Void>() {
@Override
public Void call(JavaRDD<Vector> vectorJavaRDD, Time time) throws Exception {
Vector[] vector = model.latestModel().clusterCenters();
for (int i = 0; i < vector.length; i++) {
logger.warn(vector[i].toArray()[0] + "\t" + vector[i].toArray()[1]);
}
return null;
}
});
jssc.addStreamingListener(new PerformanceStreamingListener());
jssc.start();
jssc.awaitTermination();
}
Developer: wangyangjun, Project: StreamBench, Lines: 58, Source: StreamKMeans.java
Example 13: main
import org.apache.spark.streaming.Time; // import the required package/class
public static void main(String[] args) throws Exception {
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("Stateful Network Word Count");
JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));
ssc.checkpoint("checkpoint");
ssc.addStreamingListener(new PerformanceStreamingListener());
JavaReceiverInputDStream<String> lines = ssc.socketTextStream("127.0.0.1", 9999);
JavaPairDStream<String, Long> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String l) throws Exception {
return Arrays.asList(l.split(" "));
}
}).mapToPair(new PairFunction<String, String, Long>() {
public Tuple2<String, Long> call(String w) throws Exception {
return new Tuple2<>(w, 1L);
}
})
.reduceByKey(new Function2<Long, Long, Long>() {
@Override
public Long call(Long aLong, Long aLong2) throws Exception {
return aLong + aLong2;
}
})
.updateStateByKey(new Function2<List<Long>, Optional<Long>, Optional<Long>>() {
public Optional<Long> call(List<Long> values, Optional<Long> state) throws Exception {
if (values == null || values.isEmpty()) {
return state;
}
long sum = 0L;
for (Long v : values) {
sum += v;
}
return Optional.of(state.or(0L) + sum);
}
});
// .updateStateByKey(new Function2<List<Iterable<Long>>, Optional<Long>, Optional<Long>>() {
// @Override
// public Optional<Long> call(List<Iterable<Long>> iterables, Optional<Long> longOptional) throws Exception {
// if (iterables == null || iterables.isEmpty()) {
// return longOptional;
// }
// long sum = 0L;
// for (Iterable<Long> iterable : iterables) {
// for(Long l : iterable)
// sum += l;
// }
// return Optional.of(longOptional.or(0L) + sum);
// }
// });
wordCounts.print();
// Empty output action; in this benchmark it appears to exist only to force each batch to be evaluated.
wordCounts.foreach(new Function2<JavaPairRDD<String, Long>, Time, Void>() {
@Override
public Void call(JavaPairRDD<String, Long> stringLongJavaPairRDD, Time time) throws Exception {
return null;
}
});
ssc.start();
ssc.awaitTermination();
}
Developer: wangyangjun, Project: StreamBench, Lines: 64, Source: StreamingWordCount.java
Example 14: main
import org.apache.spark.streaming.Time; // import the required package/class
public static void main(String[] args) {
if (args.length == 0) {
System.err
.println("Usage: SparkStreamingFromFlumeToHBaseExample {master} {host} {port} {table} {columnFamily}");
System.exit(1);
}
String master = args[0];
String host = args[1];
int port = Integer.parseInt(args[2]);
String tableName = args[3];
String columnFamily = args[4];
Duration batchInterval = new Duration(2000);
JavaStreamingContext sc = new JavaStreamingContext(master,
"FlumeEventCount", batchInterval,
System.getenv("SPARK_HOME"), "/home/cloudera/SparkOnALog.jar");
final Broadcast<String> broadcastTableName = sc.sparkContext().broadcast(tableName);
final Broadcast<String> broadcastColumnFamily = sc.sparkContext().broadcast(columnFamily);
//JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream(host, port);
JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(sc, host, port);
JavaPairDStream<String, Integer> lastCounts = flumeStream
.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() {
@Override
public Iterable<String> call(SparkFlumeEvent event)
throws Exception {
String bodyString = new String(event.event().getBody()
.array(), "UTF-8");
return Arrays.asList(bodyString.split(" "));
}
}).map(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String str)
throws Exception {
return new Tuple2(str, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer x, Integer y) throws Exception {
// TODO Auto-generated method stub
return x.intValue() + y.intValue();
}
});
lastCounts.foreach(new Function2<JavaPairRDD<String,Integer>, Time, Void>() {
@Override
public Void call(JavaPairRDD<String, Integer> values,
Time time) throws Exception {
values.foreach(new VoidFunction<Tuple2<String, Integer>> () {
@Override
public void call(Tuple2<String, Integer> tuple)
throws Exception {
HBaseCounterIncrementor incrementor =
HBaseCounterIncrementor.getInstance(broadcastTableName.value(), broadcastColumnFamily.value());
incrementor.incerment("Counter", tuple._1(), tuple._2());
System.out.println("Counter:" + tuple._1() + "," + tuple._2());
}} );
return null;
}});
sc.start();
}
Developer: tmalaska, Project: SparkOnALog, Lines: 79, Source: SparkStreamingFromFlumeToHBaseExample.java
Example 15: setTime
import org.apache.spark.streaming.Time; // import the required package/class
// Use the batch time's string form as the HBase row id for counts written in this batch.
public void setTime(Time time) {
ROW_ID = Bytes.toBytes(time.toString());
}
Developer: saintstack, Project: hbase-downstreamer, Lines: 4, Source: JavaNetworkWordCountStoreInHBase.java
Example 16: main
import org.apache.spark.streaming.Time; // import the required package/class
public static void main(String[] args) {
if (args.length < 2) {
System.err.println("Usage: JavaNetworkWordCountStoreInHBase <hostname> <port>");
System.exit(1);
}
// Create the context with a 1 second batch size
SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCountStoreInHBase");
JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));
// Copy the keytab to our executors
ssc.sparkContext().addFile(sparkConf.get("spark.yarn.keytab"));
// Create a JavaReceiverInputDStream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterable<String> call(String x) {
return Lists.newArrayList(SPACE.split(x));
}
});
JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<String, Integer>(s, 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});
final StoreCountsToHBase store = new StoreCountsToHBase(sparkConf);
wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
@Override
public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
store.setTime(time);
rdd.foreachPartition(store);
return null;
}
});
ssc.start();
ssc.awaitTermination();
}
Developer: saintstack, Project: hbase-downstreamer, Lines: 53, Source: JavaNetworkWordCountStoreInHBase.java
Example 17: getFunction
import org.apache.spark.streaming.Time; // import the required package/class
protected abstract Function2<JavaPairRDD<String, RowMutation>, Time, Void> getFunction();
Developer: apache, Project: incubator-blur, Lines: 2, Source: BlurLoadSparkProcessor.java
Note: The org.apache.spark.streaming.Time class examples in this article were collected from source-code and documentation platforms such as GitHub and MSDocs. The snippets are taken from open-source projects contributed by their authors; copyright remains with the original authors, and redistribution or use should follow the License of the corresponding project. Do not reproduce without permission.