Java Time Class Code Examples


This article collects typical usage examples of the Java class org.apache.spark.streaming.Time. If you are wondering what the Time class does, how to use it, or what working examples look like, the curated code examples below should help.



The Time class belongs to the org.apache.spark.streaming package. The 17 code examples below show typical uses of the class, sorted by popularity by default.
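
Before the examples, here is a minimal orientation sketch (not taken from any of the projects below; the method and variable names are illustrative): Spark Streaming hands a Time object, the batch timestamp, to foreachRDD callbacks, and time.milliseconds() converts it to epoch milliseconds. Most of the examples below rely on this pattern for per-batch file names, table row keys, and log messages.

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction2;
import org.apache.spark.streaming.Time;
import org.apache.spark.streaming.api.java.JavaDStream;

// Minimal sketch: tag each micro-batch with its batch time.
// Assumes "lines" is an existing JavaDStream<String>.
static void tagBatches(JavaDStream<String> lines) {
  lines.foreachRDD(new VoidFunction2<JavaRDD<String>, Time>() {
    @Override
    public void call(JavaRDD<String> rdd, Time time) {
      // Time wraps the batch timestamp; milliseconds() returns epoch millis.
      long batchMillis = time.milliseconds();
      System.out.println("Batch " + batchMillis + " contains " + rdd.count() + " records");
    }
  });
}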

Example 1: call

import org.apache.spark.streaming.Time; // import the required package/class
@Override
public void call(JavaPairRDD<K,M> rdd, Time time) throws IOException {
  if (rdd.isEmpty()) {
    log.info("RDD was empty, not saving to HDFS");
  } else {
    String file = prefix + "-" + time.milliseconds() + "." + suffix;
    Path path = new Path(file);
    FileSystem fs = FileSystem.get(path.toUri(), hadoopConf);
    if (fs.exists(path)) {
      log.warn("Saved data already existed, possibly from a failed job. Deleting {}", path);
      fs.delete(path, true);
    }
    log.info("Saving RDD to HDFS at {}", file);
    rdd.mapToPair(
        new ValueToWritableFunction<>(keyClass, messageClass, keyWritableClass, messageWritableClass)
    ).saveAsNewAPIHadoopFile(
        file,
        keyWritableClass,
        messageWritableClass,
        SequenceFileOutputFormat.class,
        hadoopConf);
  }
}
 
Developer: oncewang, Project: oryx2, Lines of code: 24, Source: SaveToHDFSFunction.java


Example 2: getFunction

import org.apache.spark.streaming.Time; // import the required package/class
@Override
protected Function2<JavaPairRDD<String, RowMutation>, Time, Void> getFunction() {
  return new Function2<JavaPairRDD<String, RowMutation>, Time, Void>() {
    // Blur Thrift Client
    @Override
    public Void call(JavaPairRDD<String, RowMutation> rdd, Time time) throws Exception {
      Iface client = getBlurClient();
      for (Tuple2<String, RowMutation> tuple : rdd.collect()) {
        if (tuple != null) {
          try {
            RowMutation rm = tuple._2;
            // Index using enqueue mutate call
            client.enqueueMutate(rm);
          } catch (Exception ex) {
            LOG.error("Unknown error while trying to call enqueueMutate.", ex);
            throw ex;
          }
        }
      }
      return null;
    }
  };
}
 
Developer: apache, Project: incubator-blur, Lines of code: 24, Source: BlurBulkLoadSparkProcessor.java


Example 3: compute

import org.apache.spark.streaming.Time; // import the required package/class
@Override
public scala.Option<RDD<WindowedValue<T>>> compute(final Time validTime) {
  final long batchTime = validTime.milliseconds();

  LOG.trace("BEFORE waiting for watermark sync, "
                + "LastWatermarkedBatchTime: {}, current batch time: {}",
            GlobalWatermarkHolder.getLastWatermarkedBatchTime(),
            batchTime);

  final Stopwatch stopwatch = Stopwatch.createStarted();

  awaitWatermarkSyncWith(batchTime);

  stopwatch.stop();

  LOG.info("Waited {} millis for watermarks to sync up with the current batch ({})",
           stopwatch.elapsed(TimeUnit.MILLISECONDS),
           batchTime);

  LOG.info("Watermarks are now: {}", GlobalWatermarkHolder.get(batchDuration));

  LOG.trace("AFTER waiting for watermark sync, "
                + "LastWatermarkedBatchTime: {}, current batch time: {}",
            GlobalWatermarkHolder.getLastWatermarkedBatchTime(),
            batchTime);

  final RDD<WindowedValue<T>> rdd = generateRdd();
  isFirst = false;
  return scala.Option.apply(rdd);
}
 
Developer: apache, Project: beam, Lines of code: 31, Source: WatermarkSyncedDStream.java


Example 4: compute

import org.apache.spark.streaming.Time; // import the required package/class
@Override
public scala.Option<RDD<Tuple2<Source<T>, CheckpointMarkT>>> compute(Time validTime) {
  RDD<scala.Tuple2<Source<T>, CheckpointMarkT>> rdd =
      new SourceRDD.Unbounded<>(
          ssc().sparkContext(),
          options,
          createMicrobatchSource(),
          numPartitions);
  return scala.Option.apply(rdd);
}
 
Developer: apache, Project: beam, Lines of code: 11, Source: SourceDStream.java


Example 5: report

import org.apache.spark.streaming.Time; // import the required package/class
private void report(Time batchTime, long count, SparkWatermarks sparkWatermark) {
  // metadata - #records read and a description.
  scala.collection.immutable.Map<String, Object> metadata =
      new scala.collection.immutable.Map.Map1<String, Object>(
          StreamInputInfo.METADATA_KEY_DESCRIPTION(),
          String.format(
              "Read %d records with observed watermarks %s, from %s for batch time: %s",
              count,
              sparkWatermark == null ? "N/A" : sparkWatermark,
              sourceName,
              batchTime));
  StreamInputInfo streamInputInfo = new StreamInputInfo(inputDStreamId, count, metadata);
  ssc().scheduler().inputInfoTracker().reportInfo(batchTime, streamInputInfo);
}
 
Developer: apache, Project: beam, Lines of code: 15, Source: SparkUnboundedSource.java


Example 6: call

import org.apache.spark.streaming.Time; // import the required package/class
@Override
public Void call(JavaRDD<T> tJavaRDD, Time time) throws Exception {

    List<T> list = tJavaRDD.take(10);
    // scalastyle:off println
    logger.warn("-------------------------------------------");
    logger.warn("Time: " + time);
    logger.warn("-------------------------------------------");
    for (T t : list) {
        logger.warn(t.toString());
    }
    logger.warn("\n");
    return null;
}
 
Developer: wangyangjun, Project: StreamBench, Lines of code: 15, Source: PrintFunctionImpl.java


Example 7: save

import org.apache.spark.streaming.Time; // import the required package/class
/**
 * Save all RDDs in the given DStream to the given view.
 * @param dstream
 * @param view
 */
public static <T> void save(JavaDStream<T> dstream, final View<T> view) {

  final String uri = view.getUri().toString();

  dstream.foreachRDD(new Function2<JavaRDD<T>, Time, Void>() {
    @Override
    public Void call(JavaRDD<T> rdd, Time time) throws Exception {

      save(rdd, uri);

      return null;
    }
  });
}
 
Developer: rbrush, Project: kite-apps, Lines of code: 20, Source: SparkDatasets.java


Example 8: call

import org.apache.spark.streaming.Time; // import the required package/class
@Override
public void call(JavaPairRDD<K,M> newData, Time timestamp)
    throws IOException, InterruptedException {

  if (newData.isEmpty()) {
    log.info("No data in current generation's RDD; nothing to do");
    return;
  }

  log.info("Beginning update at {}", timestamp);

  Configuration hadoopConf = sparkContext.hadoopConfiguration();
  if (hadoopConf.getResource("core-site.xml") == null) {
    log.warn("Hadoop config like core-site.xml was not found; " +
             "is the Hadoop config directory on the classpath?");
  }

  JavaPairRDD<K,M> pastData;
  Path inputPathPattern = new Path(dataDirString + "/*/part-*");
  FileSystem fs = FileSystem.get(inputPathPattern.toUri(), hadoopConf);
  FileStatus[] inputPathStatuses = fs.globStatus(inputPathPattern);
  if (inputPathStatuses == null || inputPathStatuses.length == 0) {

    log.info("No past data at path(s) {}", inputPathPattern);
    pastData = null;

  } else {

    log.info("Found past data at path(s) like {}", inputPathStatuses[0].getPath());
    Configuration updatedConf = new Configuration(hadoopConf);
    updatedConf.set(FileInputFormat.INPUT_DIR, joinFSPaths(fs, inputPathStatuses));

    @SuppressWarnings("unchecked")
    JavaPairRDD<Writable,Writable> pastWritableData = (JavaPairRDD<Writable,Writable>)
        sparkContext.newAPIHadoopRDD(updatedConf,
                                     SequenceFileInputFormat.class,
                                     keyWritableClass,
                                     messageWritableClass);

    pastData = pastWritableData.mapToPair(
        new WritableToValueFunction<>(keyClass,
                                      messageClass,
                                      keyWritableClass,
                                      messageWritableClass));
  }

  if (updateTopic == null || updateBroker == null) {
    log.info("Not producing updates to update topic since none was configured");
    updateInstance.runUpdate(sparkContext,
                             timestamp.milliseconds(),
                             newData,
                             pastData,
                             modelDirString,
                             null);
  } else {
    // This TopicProducer should not be async; sends one big model generally and
    // needs to occur before other updates reliably rather than be buffered
    try (TopicProducer<String,U> producer =
             new TopicProducerImpl<>(updateBroker, updateTopic, false)) {
      updateInstance.runUpdate(sparkContext,
                               timestamp.milliseconds(),
                               newData,
                               pastData,
                               modelDirString,
                               producer);
    }
  }
}
 
Developer: oncewang, Project: oryx2, Lines of code: 69, Source: BatchUpdateFunction.java


Example 9: main

import org.apache.spark.streaming.Time; // import the required package/class
public static void main(String[] args) throws Exception {
    Logger.getLogger("org").setLevel(Level.WARN);
    Logger.getLogger("akka").setLevel(Level.WARN);

    final Pattern SPACE = Pattern.compile(" ");

    SparkConf conf = new SparkConf().setAppName("Big Apple").setMaster("local[2]");
    JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));

    JavaDStream<String> lines = ssc.textFileStream("src/main/resources/stream");
    lines.print();

    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterator<String> call(String x) {
            return Lists.newArrayList(SPACE.split(x)).iterator();
        }
    });

    words.foreachRDD(
            new VoidFunction2<JavaRDD<String>, Time>() {
                @Override
                public void call(JavaRDD<String> rdd, Time time) {

                    // Get the singleton instance of SQLContext
                    SQLContext sqlContext = SQLContext.getOrCreate(rdd.context());

                    // Convert RDD[String] to RDD[case class] to Dataset
                    JavaRDD<JavaRecord> rowRDD = rdd.map(new Function<String, JavaRecord>() {
                        public JavaRecord call(String word) {
                            JavaRecord record = new JavaRecord();
                            record.setWord(word);
                            return record;
                        }
                    });
                    Dataset<Row> wordsDataset = sqlContext.createDataFrame(rowRDD, JavaRecord.class);

                    // Register as table
                    wordsDataset.registerTempTable("words");

                    // Do word count on table using SQL and print it
                    Dataset wordCountsDataset =
                            sqlContext.sql("select word, count(*) as total from words group by word");
                    wordCountsDataset.show();
                }
            }
    );


    ssc.start();
    ssc.awaitTermination();

}
 
Developer: knoldus, Project: Sparkathon, Lines of code: 54, Source: SQLonStreams.java


Example 10: buildPairDStream

import org.apache.spark.streaming.Time; // import the required package/class
private static <K, InputT> PairDStreamFunctions<ByteArray, byte[]> buildPairDStream(
    final JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> inputDStream,
    final Coder<K> keyCoder,
    final Coder<WindowedValue<InputT>> wvCoder) {

  // we have to switch to Scala API to avoid Optional in the Java API, see: SPARK-4819.
  // we also have a broader API for Scala (access to the actual key and entire iterator).
  // we use coders to convert objects in the PCollection to byte arrays, so they
  // can be transferred over the network for the shuffle and be in serialized form
  // for checkpointing.
  // for readability, we add comments with actual type next to byte[].
  // to shorten line length, we use:
  //---- WV: WindowedValue
  //---- Iterable: Itr
  //---- AccumT: A
  //---- InputT: I
  final DStream<Tuple2<ByteArray, byte[]>> tupleDStream =
      inputDStream
          .transformToPair(
              new Function2<
                  JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>>, Time,
                  JavaPairRDD<ByteArray, byte[]>>() {

                // we use mapPartitions with the RDD API because it's the only available API
                // that allows preserving partitioning.
                @Override
                public JavaPairRDD<ByteArray, byte[]> call(
                    final JavaRDD<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> rdd,
                    final Time time)
                    throws Exception {
                  return rdd.mapPartitions(
                          TranslationUtils.functionToFlatMapFunction(
                              WindowingHelpers
                                  .<KV<K, Iterable<WindowedValue<InputT>>>>unwindowFunction()),
                          true)
                      .mapPartitionsToPair(
                          TranslationUtils
                              .<K, Iterable<WindowedValue<InputT>>>toPairFlatMapFunction(),
                          true)
                      .mapValues(
                          new Function<
                              Iterable<WindowedValue<InputT>>,
                              KV<Long, Iterable<WindowedValue<InputT>>>>() {

                            @Override
                            public KV<Long, Iterable<WindowedValue<InputT>>> call(
                                final Iterable<WindowedValue<InputT>> values) throws Exception {
                              // add the batch timestamp for visibility (e.g., debugging)
                              return KV.of(time.milliseconds(), values);
                            }
                          })
                      // move to bytes representation and use coders for deserialization
                      // because of checkpointing.
                      .mapPartitionsToPair(
                          TranslationUtils.pairFunctionToPairFlatMapFunction(
                              CoderHelpers.toByteFunction(
                                  keyCoder,
                                  KvCoder.of(VarLongCoder.of(), IterableCoder.of(wvCoder)))),
                          true);
                }
              })
          .dstream();

  return DStream.toPairDStreamFunctions(
      tupleDStream,
      JavaSparkContext$.MODULE$.<ByteArray>fakeClassTag(),
      JavaSparkContext$.MODULE$.<byte[]>fakeClassTag(),
      null);
}
 
Developer: apache, Project: beam, Lines of code: 70, Source: SparkGroupAlsoByWindowViaWindowSet.java


Example 11: compute

import org.apache.spark.streaming.Time; // import the required package/class
@Override
public scala.Option<RDD<BoxedUnit>> compute(Time validTime) {
  // compute parent.
  scala.Option<RDD<Metadata>> parentRDDOpt = parent.getOrCompute(validTime);
  final Accumulator<MetricsContainerStepMap> metricsAccum = MetricsAccumulator.getInstance();
  long count = 0;
  SparkWatermarks sparkWatermark = null;
  Instant globalLowWatermarkForBatch = BoundedWindow.TIMESTAMP_MIN_VALUE;
  Instant globalHighWatermarkForBatch = BoundedWindow.TIMESTAMP_MIN_VALUE;
  long maxReadDuration = 0;
  if (parentRDDOpt.isDefined()) {
    JavaRDD<Metadata> parentRDD = parentRDDOpt.get().toJavaRDD();
    for (Metadata metadata: parentRDD.collect()) {
      count += metadata.getNumRecords();
      // compute the global input watermark - advance to latest of all partitions.
      Instant partitionLowWatermark = metadata.getLowWatermark();
      globalLowWatermarkForBatch =
          globalLowWatermarkForBatch.isBefore(partitionLowWatermark)
              ? partitionLowWatermark : globalLowWatermarkForBatch;
      Instant partitionHighWatermark = metadata.getHighWatermark();
      globalHighWatermarkForBatch =
          globalHighWatermarkForBatch.isBefore(partitionHighWatermark)
              ? partitionHighWatermark : globalHighWatermarkForBatch;
      // Update metrics reported in the read
      final Gauge gauge = Metrics.gauge(NAMESPACE, READ_DURATION_MILLIS);
      final MetricsContainer container = metadata.getMetricsContainers().getContainer(stepName);
      try (Closeable ignored = MetricsEnvironment.scopedMetricsContainer(container)) {
        final long readDurationMillis = metadata.getReadDurationMillis();
        if (readDurationMillis > maxReadDuration) {
          gauge.set(readDurationMillis);
        }
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      metricsAccum.value().updateAll(metadata.getMetricsContainers());
    }

    sparkWatermark =
        new SparkWatermarks(
            globalLowWatermarkForBatch,
            globalHighWatermarkForBatch,
            new Instant(validTime.milliseconds()));
    // add to watermark queue.
    GlobalWatermarkHolder.add(inputDStreamId, sparkWatermark);
  }
  // report - for RateEstimator and visibility.
  report(validTime, count, sparkWatermark);
  return scala.Option.empty();
}
 
Developer: apache, Project: beam, Lines of code: 50, Source: SparkUnboundedSource.java


Example 12: main

import org.apache.spark.streaming.Time; // import the required package/class
public static void main(String[] args) {

//        String inputFile = StreamKMeans.class.getClassLoader().getResource("centroids.txt").getFile();
        SparkConf sparkConf = new SparkConf().setMaster("spark://master:7077").setAppName("JavaKMeans");

        JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.milliseconds(1000));

        HashSet<String> topicsSet = new HashSet<>();
        topicsSet.add("KMeans");
        HashMap<String, String> kafkaParams = new HashMap<>();
//        kafkaParams.put("metadata.broker.list", "kafka1:9092,kafka2:9092,kafka3:9092");
        kafkaParams.put("metadata.broker.list", "localhost:9092");
        kafkaParams.put("auto.offset.reset", "largest");
        kafkaParams.put("zookeeper.connect", "zoo1:2181");
        kafkaParams.put("group.id", "spark");

        // Create direct kafka stream with brokers and topics
        JavaPairInputDStream<String, String> lines = KafkaUtils.createDirectStream(
                jssc,
                String.class,
                String.class,
                StringDecoder.class,
                StringDecoder.class,
                kafkaParams,
                topicsSet
        );

        JavaDStream<Vector> points = lines.map(new ParseKafkaString()).map(new ParsePoint());

        Vector[] initCentroids = loadInitCentroids();
        double[] weights = new double[96];
        for (int i = 0; i < 96; i++) {
            weights[i] = 1.0 / 96;
        }

        final StreamingKMeans model = new StreamingKMeans()
                .setK(96)
                .setDecayFactor(0)
                .setInitialCenters(initCentroids, weights);

        model.trainOn(points);

        points.foreachRDD(new Function2<JavaRDD<Vector>, Time, Void>() {
            @Override
            public Void call(JavaRDD<Vector> vectorJavaRDD, Time time) throws Exception {
                Vector[] vector = model.latestModel().clusterCenters();
                for (int i = 0; i < vector.length; i++) {
                    logger.warn(vector[i].toArray()[0] + "\t" + vector[i].toArray()[1]);
                }
                return null;
            }
        });

        jssc.addStreamingListener(new PerformanceStreamingListener());
        jssc.start();
        jssc.awaitTermination();
    }
 
Developer: wangyangjun, Project: StreamBench, Lines of code: 58, Source: StreamKMeans.java


Example 13: main

import org.apache.spark.streaming.Time; // import the required package/class
public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("Stateful Network Word Count");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, Durations.seconds(1));
        ssc.checkpoint("checkpoint");

        ssc.addStreamingListener(new PerformanceStreamingListener());


        JavaReceiverInputDStream<String> lines = ssc.socketTextStream("127.0.0.1", 9999);

        JavaPairDStream<String, Long> wordCounts = lines.flatMap(new FlatMapFunction<String, String>() {
            public Iterable<String> call(String l) throws Exception {
                return Arrays.asList(l.split(" "));
            }
        }).mapToPair(new PairFunction<String, String, Long>() {
            public Tuple2<String, Long> call(String w) throws Exception {
                return new Tuple2<>(w, 1L);
            }
        })
                .reduceByKey(new Function2<Long, Long, Long>() {
                    @Override
                    public Long call(Long aLong, Long aLong2) throws Exception {
                        return aLong + aLong2;
                    }
                })
                .updateStateByKey(new Function2<List<Long>, Optional<Long>, Optional<Long>>() {
                    public Optional<Long> call(List<Long> values, Optional<Long> state) throws Exception {
                        if (values == null || values.isEmpty()) {
                            return state;
                        }
                        long sum = 0L;
                        for (Long v : values) {
                            sum += v;
                        }

                        return Optional.of(state.or(0L) + sum);
                    }
                });
//                .updateStateByKey(new Function2<List<Iterable<Long>>, Optional<Long>, Optional<Long>>() {
//                    @Override
//                    public Optional<Long> call(List<Iterable<Long>> iterables, Optional<Long> longOptional) throws Exception {
//                        if (iterables == null || iterables.isEmpty()) {
//                            return longOptional;
//                        }
//                        long sum = 0L;
//                        for (Iterable<Long> iterable : iterables) {
//                            for(Long l : iterable)
//                                sum += l;
//                        }
//                        return Optional.of(longOptional.or(0L) + sum);
//                    }
//                });

        wordCounts.print();
        wordCounts.foreach(new Function2<JavaPairRDD<String, Long>, Time, Void>() {
            @Override
            public Void call(JavaPairRDD<String, Long> stringLongJavaPairRDD, Time time) throws Exception {
                return null;
            }
        });
        ssc.start();
        ssc.awaitTermination();
    }
 
Developer: wangyangjun, Project: StreamBench, Lines of code: 64, Source: StreamingWordCount.java


Example 14: main

import org.apache.spark.streaming.Time; // import the required package/class
public static void main(String[] args) {
	if (args.length == 0) {
		System.err
				.println("Usage: SparkStreamingFromFlumeToHBaseExample {master} {host} {port} {table} {columnFamily}");
		System.exit(1);
	}

	String master = args[0];
	String host = args[1];
	int port = Integer.parseInt(args[2]);
	String tableName = args[3];
	String columnFamily = args[4];
	
	Duration batchInterval = new Duration(2000);

	JavaStreamingContext sc = new JavaStreamingContext(master,
			"FlumeEventCount", batchInterval,
			System.getenv("SPARK_HOME"), "/home/cloudera/SparkOnALog.jar");
	
	final Broadcast<String> broadcastTableName = sc.sparkContext().broadcast(tableName);
	final Broadcast<String> broadcastColumnFamily = sc.sparkContext().broadcast(columnFamily);
	
	//JavaDStream<SparkFlumeEvent> flumeStream = sc.flumeStream(host, port);
	
	JavaDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createStream(sc, host, port);
	
	JavaPairDStream<String, Integer> lastCounts = flumeStream
			.flatMap(new FlatMapFunction<SparkFlumeEvent, String>() {

				@Override
				public Iterable<String> call(SparkFlumeEvent event)
						throws Exception {
					String bodyString = new String(event.event().getBody()
							.array(), "UTF-8");
					return Arrays.asList(bodyString.split(" "));
				}
			}).map(new PairFunction<String, String, Integer>() {
				@Override
				public Tuple2<String, Integer> call(String str)
						throws Exception {
					return new Tuple2(str, 1);
				}
			}).reduceByKey(new Function2<Integer, Integer, Integer>() {

				@Override
				public Integer call(Integer x, Integer y) throws Exception {
					// TODO Auto-generated method stub
					return x.intValue() + y.intValue();
				}
			});
			
			
			lastCounts.foreach(new Function2<JavaPairRDD<String,Integer>, Time, Void>() {

				@Override
				public Void call(JavaPairRDD<String, Integer> values,
						Time time) throws Exception {
					
					values.foreach(new VoidFunction<Tuple2<String, Integer>> () {

						@Override
						public void call(Tuple2<String, Integer> tuple)
								throws Exception {
							HBaseCounterIncrementor incrementor = 
									HBaseCounterIncrementor.getInstance(broadcastTableName.value(), broadcastColumnFamily.value());
							incrementor.incerment("Counter", tuple._1(), tuple._2());
							System.out.println("Counter:" + tuple._1() + "," + tuple._2());
							
						}} );
					
					return null;
				}});
	
	

	sc.start();

}
 
Developer: tmalaska, Project: SparkOnALog, Lines of code: 79, Source: SparkStreamingFromFlumeToHBaseExample.java


Example 15: setTime

import org.apache.spark.streaming.Time; // import the required package/class
public void setTime(Time time) {
  ROW_ID = Bytes.toBytes(time.toString());
}
 
Developer: saintstack, Project: hbase-downstreamer, Lines of code: 4, Source: JavaNetworkWordCountStoreInHBase.java


Example 16: main

import org.apache.spark.streaming.Time; // import the required package/class
public static void main(String[] args) {
  if (args.length < 2) {
    System.err.println("Usage: JavaNetworkWordCountStoreInHBase <hostname> <port>");
    System.exit(1);
  }

  // Create the context with a 1 second batch size
  SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCountStoreInHBase");
  JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(1));

  // Copy the keytab to our executors
  ssc.sparkContext().addFile(sparkConf.get("spark.yarn.keytab"));

  // Create a JavaReceiverInputDStream on target ip:port and count the
  // words in input stream of \n delimited text (eg. generated by 'nc')
  // Note that no duplication in storage level only for running locally.
  // Replication necessary in distributed scenario for fault tolerance.
  JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
          args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
  JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
    @Override
    public Iterable<String> call(String x) {
      return Lists.newArrayList(SPACE.split(x));
    }
  });
  JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
    new PairFunction<String, String, Integer>() {
      @Override
      public Tuple2<String, Integer> call(String s) {
        return new Tuple2<String, Integer>(s, 1);
      }
    }).reduceByKey(new Function2<Integer, Integer, Integer>() {
      @Override
      public Integer call(Integer i1, Integer i2) {
        return i1 + i2;
      }
    });

  final StoreCountsToHBase store = new StoreCountsToHBase(sparkConf);

  wordCounts.foreachRDD(new Function2<JavaPairRDD<String, Integer>, Time, Void>() {
    @Override
    public Void call(JavaPairRDD<String, Integer> rdd, Time time) throws IOException {
      store.setTime(time);
      rdd.foreachPartition(store);
      return null;
    }
  });

  ssc.start();
  ssc.awaitTermination();
}
 
Developer: saintstack, Project: hbase-downstreamer, Lines of code: 53, Source: JavaNetworkWordCountStoreInHBase.java


Example 17: getFunction

import org.apache.spark.streaming.Time; // import the required package/class
protected abstract Function2<JavaPairRDD<String, RowMutation>, Time, Void> getFunction(); 
Developer: apache, Project: incubator-blur, Lines of code: 2, Source: BlurLoadSparkProcessor.java



Note: The org.apache.spark.streaming.Time examples in this article were collected from source code and documentation platforms such as GitHub and MSDocs. The code snippets are drawn from open-source projects contributed by their respective authors; copyright remains with the original authors, and redistribution and use should follow each project's license. Please do not reproduce without permission.

