Java GroupReduceOperator Class Code Examples

This article collects typical usage examples of org.apache.flink.api.java.operators.GroupReduceOperator in Java. If you want to know what GroupReduceOperator is for and how to use it, the selected examples below may help.

GroupReduceOperator belongs to the org.apache.flink.api.java.operators package. Twenty code examples of the class are shown below, sorted by popularity.
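In short, a group-reduce hands each group of elements to a user function as an Iterable, and the function may emit any number of results. On an ungrouped DataSet, reduceGroup passes the whole data set as a single group.

The plain-Java sketch below illustrates only that contract, without Flink. GroupFn and groupReduce are hypothetical names, not Flink API.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;

/** Plain-Java illustration of group-reduce semantics (hypothetical, not Flink code). */
final class GroupReduceSketch {

    /** Hypothetical stand-in for a group-reduce function: receives a whole group, emits results. */
    interface GroupFn<IN, OUT> {
        void reduce(Iterable<IN> group, Consumer<OUT> out);
    }

    static <IN, K, OUT> List<OUT> groupReduce(List<IN> input, Function<IN, K> keyOf, GroupFn<IN, OUT> fn) {
        // Group the elements by key, keeping keys in first-seen order.
        Map<K, List<IN>> groups = new LinkedHashMap<>();
        for (IN element : input) {
            groups.computeIfAbsent(keyOf.apply(element), k -> new ArrayList<>()).add(element);
        }
        // Call the user function once per group and collect everything it emits.
        List<OUT> results = new ArrayList<>();
        for (List<IN> group : groups.values()) {
            fn.reduce(group, results::add);
        }
        return results;
    }
}
```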

Example 1: sampleWithSize

import org.apache.flink.api.java.operators.GroupReduceOperator; // import the required package/class
/**
 * Generate a sample of DataSet which contains fixed size elements.
 *
 * <p><strong>NOTE:</strong> Sampling with a fixed size is less efficient than sampling with a fraction;
 * use sampling with a fraction unless you need an exact sample size.
 *
 * @param input           The DataSet to sample from.
 * @param withReplacement Whether an element can be selected more than once.
 * @param numSamples      The expected sample size.
 * @param seed            Random number generator seed.
 * @return The sampled DataSet
 */
public static <T> DataSet<T> sampleWithSize(
	DataSet<T> input,
	final boolean withReplacement,
	final int numSamples,
	final long seed) {

	SampleInPartition<T> sampleInPartition = new SampleInPartition<>(withReplacement, numSamples, seed);
	MapPartitionOperator<T, IntermediateSampleData<T>> mapPartitionOperator = input.mapPartition(sampleInPartition);

	// There is no previous group, so the parallelism of GroupReduceOperator is always 1.
	String callLocation = Utils.getCallLocationName();
	SampleInCoordinator<T> sampleInCoordinator = new SampleInCoordinator<>(withReplacement, numSamples, seed);
	return new GroupReduceOperator<>(mapPartitionOperator, input.getType(), sampleInCoordinator, callLocation);
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 26, Source: DataSetUtils.java
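Example 1 samples in two phases. SampleInPartition draws candidates inside every partition (a mapPartition), and SampleInCoordinator merges them in a single GroupReduceOperator whose parallelism is 1.

The plain-Java sketch below shows that shape for sampling without replacement only. It assumes the random-key technique: tag each element with a uniform random weight and keep the k largest weights. This works across partitions because the global top-k is always contained in the union of the per-partition top-k sets. The class and method names are hypothetical, and this is not Flink's implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Random;

/** Two-phase sampling without replacement (illustrative sketch, not Flink code). */
final class TwoPhaseSample {

    /** An element tagged with a uniform random weight (hypothetical helper type). */
    static final class Weighted<T> {
        final double weight;
        final T value;

        Weighted(double weight, T value) {
            this.weight = weight;
            this.value = value;
        }
    }

    /** Phase 1, once per partition: keep the k elements with the largest random weights. */
    static <T> List<Weighted<T>> sampleInPartition(List<T> partition, int k, Random rnd) {
        List<Weighted<T>> kept = new ArrayList<>();
        if (k <= 0) {
            return kept;
        }
        // Min-heap on weight: the root is the weakest element currently kept.
        PriorityQueue<Weighted<T>> heap =
                new PriorityQueue<>(Comparator.comparingDouble((Weighted<T> w) -> w.weight));
        for (T value : partition) {
            Weighted<T> candidate = new Weighted<>(rnd.nextDouble(), value);
            if (heap.size() < k) {
                heap.add(candidate);
            } else if (candidate.weight > heap.peek().weight) {
                heap.poll();
                heap.add(candidate);
            }
        }
        kept.addAll(heap);
        return kept;
    }

    /** Phase 2, single coordinator: keep the k largest weights over all partitions. */
    static <T> List<T> sampleInCoordinator(List<List<Weighted<T>>> partials, int k) {
        List<Weighted<T>> all = new ArrayList<>();
        for (List<Weighted<T>> partial : partials) {
            all.addAll(partial);
        }
        all.sort(Comparator.comparingDouble((Weighted<T> w) -> w.weight).reversed());
        List<T> sample = new ArrayList<>();
        for (int i = 0; i < Math.min(k, all.size()); i++) {
            sample.add(all.get(i).value);
        }
        return sample;
    }

    /** Runs both phases; the seed makes the result reproducible. */
    static <T> List<T> sampleWithSize(List<List<T>> partitions, int k, long seed) {
        Random rnd = new Random(seed);
        List<List<Weighted<T>>> partials = new ArrayList<>();
        for (List<T> partition : partitions) {
            partials.add(sampleInPartition(partition, k, rnd));
        }
        return sampleInCoordinator(partials, k);
    }
}
```

Sampling with replacement, which Flink also supports, needs a different per-partition strategy and is not covered here.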


Example 2: first

import org.apache.flink.api.java.operators.GroupReduceOperator; // import the required package/class
/**
 * Returns a new set containing the first n elements in this {@link DataSet}.
 *
 * @param n The desired number of elements.
 * @return A GroupReduceOperator that represents the DataSet containing the elements.
 */
public GroupReduceOperator<T, T> first(int n) {
	if (n < 1) {
		throw new InvalidProgramException("Parameter n of first(n) must be at least 1.");
	}

	return reduceGroup(new FirstReducer<T>(n));
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 14, Source: DataSet.java
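first(n) is implemented as a reduceGroup with a FirstReducer. On an ungrouped DataSet the whole data set is therefore one group, and at most n elements are emitted. Which n elements come out is arbitrary unless the input is sorted.

Below is a plain-Java stand-in for that per-group logic. The class name is hypothetical, and it throws IllegalArgumentException in place of Flink's InvalidProgramException.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Per-group "first n" logic (illustrative sketch, not Flink's FirstReducer). */
final class FirstN {

    static <T> List<T> first(Iterable<T> group, int n) {
        if (n < 1) {
            throw new IllegalArgumentException("Parameter n of first(n) must be at least 1.");
        }
        // Emit elements in iteration order until n have been taken or the group is exhausted.
        List<T> out = new ArrayList<>();
        Iterator<T> it = group.iterator();
        while (out.size() < n && it.hasNext()) {
            out.add(it.next());
        }
        return out;
    }
}
```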


Example 3: testSemanticPropsWithKeySelector1

import org.apache.flink.api.java.operators.GroupReduceOperator; // import the required package/class
@Test
public void testSemanticPropsWithKeySelector1() {

	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);

	GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
			tupleDs.groupBy(new DummyTestKeySelector())
					.reduceGroup(new DummyGroupReduceFunction1());

	SemanticProperties semProps = reduceOp.getSemanticProperties();

	assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
	assertTrue(semProps.getForwardingTargetFields(0, 2).contains(4));
	assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 2);
	assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
	assertTrue(semProps.getForwardingTargetFields(0, 3).contains(3));
	assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
	assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
	assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);

	assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
	assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
	assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
	assertTrue(semProps.getForwardingSourceField(0, 3) == 3);
	assertTrue(semProps.getForwardingSourceField(0, 4) == 2);

	assertTrue(semProps.getReadFields(0).size() == 3);
	assertTrue(semProps.getReadFields(0).contains(2));
	assertTrue(semProps.getReadFields(0).contains(5));
	assertTrue(semProps.getReadFields(0).contains(6));
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 36, Source: GroupReduceOperatorTest.java


Example 4: testSemanticPropsWithKeySelector2

import org.apache.flink.api.java.operators.GroupReduceOperator; // import the required package/class
@Test
public void testSemanticPropsWithKeySelector2() {

	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);

	GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
			tupleDs.groupBy(new DummyTestKeySelector())
					.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
					.reduceGroup(new DummyGroupReduceFunction1());

	SemanticProperties semProps = reduceOp.getSemanticProperties();

	assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
	assertTrue(semProps.getForwardingTargetFields(0, 4).contains(4));
	assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 2);
	assertTrue(semProps.getForwardingTargetFields(0, 5).contains(1));
	assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
	assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
	assertTrue(semProps.getForwardingTargetFields(0, 6).contains(2));
	assertTrue(semProps.getForwardingTargetFields(0, 7).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 8).size() == 0);

	assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
	assertTrue(semProps.getForwardingSourceField(0, 1) == 5);
	assertTrue(semProps.getForwardingSourceField(0, 2) == 6);
	assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
	assertTrue(semProps.getForwardingSourceField(0, 4) == 4);

	assertTrue(semProps.getReadFields(0).size() == 3);
	assertTrue(semProps.getReadFields(0).contains(4));
	assertTrue(semProps.getReadFields(0).contains(7));
	assertTrue(semProps.getReadFields(0).contains(8));
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 39, Source: GroupReduceOperatorTest.java


Example 5: testSemanticPropsWithKeySelector3

import org.apache.flink.api.java.operators.GroupReduceOperator; // import the required package/class
@Test
public void testSemanticPropsWithKeySelector3() {

	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);

	GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
			tupleDs.groupBy(new DummyTestKeySelector())
					.reduceGroup(new DummyGroupReduceFunction2())
						.withForwardedFields("0->4;1;1->3;2");

	SemanticProperties semProps = reduceOp.getSemanticProperties();

	assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
	assertTrue(semProps.getForwardingTargetFields(0, 2).contains(4));
	assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 2);
	assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
	assertTrue(semProps.getForwardingTargetFields(0, 3).contains(3));
	assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
	assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
	assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);

	assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
	assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
	assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
	assertTrue(semProps.getForwardingSourceField(0, 3) == 3);
	assertTrue(semProps.getForwardingSourceField(0, 4) == 2);

	assertTrue(semProps.getReadFields(0).size() == 3);
	assertTrue(semProps.getReadFields(0).contains(2));
	assertTrue(semProps.getReadFields(0).contains(5));
	assertTrue(semProps.getReadFields(0).contains(6));
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 37, Source: GroupReduceOperatorTest.java
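The expected indices in Examples 3 through 6 follow one rule. Example 5 declares withForwardedFields("0->4;1;1->3;2") and expects the same properties as Example 3.

With a KeySelector grouping, the assertions are consistent with the following offsets:
- Every source position shifts by 2, the two extracted key fields in front of the record. This assumes DummyTestKeySelector returns a two-field tuple.
- Once sortGroup adds a second selector, the shift becomes 4.
- Target positions stay unchanged.

The sketch below parses a simplified form of that syntax (single integer positions only) and applies the offset. The class is hypothetical and is not Flink's own parser.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Simplified forwarded-fields handling (illustrative sketch, not Flink's parser). */
final class ForwardedFieldsSketch {

    /**
     * Parses strings such as "0->4;1;1->3;2": "a->b" forwards input field a to output
     * field b, and a bare "a" means a->a. Only single integer positions are supported.
     * Source positions are shifted by sourceOffset; target positions are not.
     */
    static Map<Integer, Set<Integer>> parse(String spec, int sourceOffset) {
        Map<Integer, Set<Integer>> targetsBySource = new TreeMap<>();
        for (String part : spec.split(";")) {
            String entry = part.trim();
            if (entry.isEmpty()) {
                continue;
            }
            String[] sides = entry.split("->");
            int source = Integer.parseInt(sides[0].trim());
            int target = sides.length == 2 ? Integer.parseInt(sides[1].trim()) : source;
            targetsBySource
                    .computeIfAbsent(source + sourceOffset, k -> new TreeSet<>())
                    .add(target);
        }
        return targetsBySource;
    }

    /** Inverse lookup: the (shifted) source field forwarded to target, or -1 if none. */
    static int sourceOf(Map<Integer, Set<Integer>> targetsBySource, int target) {
        for (Map.Entry<Integer, Set<Integer>> e : targetsBySource.entrySet()) {
            if (e.getValue().contains(target)) {
                return e.getKey();
            }
        }
        return -1;
    }
}
```

With an offset of 2, parse maps source field 3 to targets {1, 3}, and sourceOf returns 3 for target 1. Both results match the assertions in Examples 3 and 5.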


Example 6: testSemanticPropsWithKeySelector4

import org.apache.flink.api.java.operators.GroupReduceOperator; // import the required package/class
@Test
public void testSemanticPropsWithKeySelector4() {

	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);

	GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
			tupleDs.groupBy(new DummyTestKeySelector())
					.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
					.reduceGroup(new DummyGroupReduceFunction2())
						.withForwardedFields("0->4;1;1->3;2");

	SemanticProperties semProps = reduceOp.getSemanticProperties();

	assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
	assertTrue(semProps.getForwardingTargetFields(0, 4).contains(4));
	assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 2);
	assertTrue(semProps.getForwardingTargetFields(0, 5).contains(1));
	assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
	assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
	assertTrue(semProps.getForwardingTargetFields(0, 6).contains(2));
	assertTrue(semProps.getForwardingTargetFields(0, 7).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 8).size() == 0);

	assertTrue(semProps.getForwardingSourceField(0, 0) < 0);
	assertTrue(semProps.getForwardingSourceField(0, 1) == 5);
	assertTrue(semProps.getForwardingSourceField(0, 2) == 6);
	assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
	assertTrue(semProps.getForwardingSourceField(0, 4) == 4);

	assertTrue(semProps.getReadFields(0).size() == 3);
	assertTrue(semProps.getReadFields(0).contains(4));
	assertTrue(semProps.getReadFields(0).contains(7));
	assertTrue(semProps.getReadFields(0).contains(8));
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 40, Source: GroupReduceOperatorTest.java


Example 7: testSemanticPropsWithKeySelector5

import org.apache.flink.api.java.operators.GroupReduceOperator; // import the required package/class
@Test
public void testSemanticPropsWithKeySelector5() {

	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);

	GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
			tupleDs.groupBy(new DummyTestKeySelector())
					.reduceGroup(new DummyGroupReduceFunction3())
					.withForwardedFields("4->0;3;3->1;2");

	SemanticProperties semProps = reduceOp.getSemanticProperties();

	assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 1);
	assertTrue(semProps.getForwardingTargetFields(0, 4).contains(2));
	assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 2);
	assertTrue(semProps.getForwardingTargetFields(0, 5).contains(1));
	assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
	assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
	assertTrue(semProps.getForwardingTargetFields(0, 6).contains(0));

	assertTrue(semProps.getForwardingSourceField(0, 0) == 6);
	assertTrue(semProps.getForwardingSourceField(0, 1) == 5);
	assertTrue(semProps.getForwardingSourceField(0, 2) == 4);
	assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
	assertTrue(semProps.getForwardingSourceField(0, 4) < 0);

	assertTrue(semProps.getReadFields(0) == null);
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 34, Source: GroupReduceOperatorTest.java


Example 8: testSemanticPropsWithKeySelector6

import org.apache.flink.api.java.operators.GroupReduceOperator; // import the required package/class
@Test
public void testSemanticPropsWithKeySelector6() {

	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);

	GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
			tupleDs.groupBy(new DummyTestKeySelector())
					.sortGroup(new DummyTestKeySelector(), Order.ASCENDING)
					.reduceGroup(new DummyGroupReduceFunction3())
					.withForwardedFields("4->0;3;3->1;2");

	SemanticProperties semProps = reduceOp.getSemanticProperties();

	assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 1);
	assertTrue(semProps.getForwardingTargetFields(0, 6).contains(2));
	assertTrue(semProps.getForwardingTargetFields(0, 7).size() == 2);
	assertTrue(semProps.getForwardingTargetFields(0, 7).contains(1));
	assertTrue(semProps.getForwardingTargetFields(0, 7).contains(3));
	assertTrue(semProps.getForwardingTargetFields(0, 8).size() == 1);
	assertTrue(semProps.getForwardingTargetFields(0, 8).contains(0));

	assertTrue(semProps.getForwardingSourceField(0, 0) == 8);
	assertTrue(semProps.getForwardingSourceField(0, 1) == 7);
	assertTrue(semProps.getForwardingSourceField(0, 2) == 6);
	assertTrue(semProps.getForwardingSourceField(0, 3) == 7);
	assertTrue(semProps.getForwardingSourceField(0, 4) < 0);

	assertTrue(semProps.getReadFields(0) == null);
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 37, Source: GroupReduceOperatorTest.java


Example 9: testSemanticPropsWithKeySelector7

import org.apache.flink.api.java.operators.GroupReduceOperator; // import the required package/class
@Test
public void testSemanticPropsWithKeySelector7() {

	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
	DataSet<Tuple5<Integer, Long, String, Long, Integer>> tupleDs = env.fromCollection(emptyTupleData, tupleTypeInfo);

	GroupReduceOperator<Tuple5<Integer, Long, String, Long, Integer>, Tuple5<Integer, Long, String, Long, Integer>> reduceOp =
			tupleDs.groupBy(new DummyTestKeySelector())
					.reduceGroup(new DummyGroupReduceFunction4());

	SemanticProperties semProps = reduceOp.getSemanticProperties();

	assertTrue(semProps.getForwardingTargetFields(0, 0).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 1).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 2).size() == 1);
	assertTrue(semProps.getForwardingTargetFields(0, 2).contains(0));
	assertTrue(semProps.getForwardingTargetFields(0, 3).size() == 1);
	assertTrue(semProps.getForwardingTargetFields(0, 3).contains(1));
	assertTrue(semProps.getForwardingTargetFields(0, 4).size() == 0);
	assertTrue(semProps.getForwardingTargetFields(0, 5).size() == 1);
	assertTrue(semProps.getForwardingTargetFields(0, 5).contains(3));
	assertTrue(semProps.getForwardingTargetFields(0, 6).size() == 0);

	assertTrue(semProps.getForwardingSourceField(0, 0) == 2);
	assertTrue(semProps.getForwardingSourceField(0, 1) == 3);
	assertTrue(semProps.getForwardingSourceField(0, 2) < 0);
	assertTrue(semProps.getForwardingSourceField(0, 3) == 5);
	assertTrue(semProps.getForwardingSourceField(0, 4) < 0);

	assertTrue(semProps.getReadFields(0) == null);
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 32, Source: GroupReduceOperatorTest.java


Example 10: translateSink

import org.apache.flink.api.java.operators.GroupReduceOperator; // import the required package/class
private void translateSink(FlowProcess flowProcess, DataSet<Tuple> input, FlowNode node) {

	Tap tap = this.getSingle(node.getSinkTaps());
	Configuration sinkConfig = this.getNodeConfig(node);
	tap.sinkConfInit(flowProcess, sinkConfig);

	int desiredDop = tap.getScheme().getNumSinkParts();
	int inputDop = ((Operator) input).getParallelism();
	int dop;

	if (inputDop == 1) {
		// The input operator has parallelism (dop) 1, probably because it performs a non-keyed reduce or coGroup.
		dop = 1;
	}
	else {
		if (desiredDop > 0) {
			// The sink parallelism is set explicitly.
			if (input instanceof GroupReduceOperator) {
				// The input is a reduce whose sort order must be preserved,
				// so the desired parallelism must also be set on the reduce and related operators.
				adjustDopOfReduceOrCoGroup((GroupReduceOperator) input, desiredDop);
			}
			dop = desiredDop;
		}
		else {
			dop = inputDop;
		}
	}

	input
			.output(new TapOutputFormat(node))
			.name(tap.getIdentifier())
			.setParallelism(dop)
			.withParameters(FlinkConfigConverter.toFlinkConfig(sinkConfig));

}
 
Developer ID: dataArtisans, Project: cascading-flink, Lines of code: 37, Source: FlinkFlowStep.java
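The parallelism decision in translateSink can be pulled out into a pure function, which makes its three cases easy to test. This is a hypothetical refactoring sketch, not code from cascading-flink. The side effect on the upstream reduce (adjustDopOfReduceOrCoGroup) is deliberately left out.

```java
/** Sink parallelism decision extracted from translateSink (hypothetical refactoring). */
final class SinkParallelism {

    /**
     * An input parallelism of 1 always wins; otherwise a positive desired parallelism
     * wins; otherwise the sink keeps the input's parallelism.
     */
    static int choose(int inputDop, int desiredDop) {
        if (inputDop == 1) {
            return 1;
        }
        return desiredDop > 0 ? desiredDop : inputDop;
    }
}
```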


Example 11: first

import org.apache.flink.api.java.operators.GroupReduceOperator; // import the required package/class
/**
 * Returns a new set containing the first n elements in this {@link DataSet}.<br/>
 * @param n The desired number of elements.
 * @return A GroupReduceOperator that represents the DataSet containing the elements.
 */
public GroupReduceOperator<T, T> first(int n) {
	if (n < 1) {
		throw new InvalidProgramException("Parameter n of first(n) must be at least 1.");
	}
	
	return reduceGroup(new FirstReducer<T>(n));
}
 
Developer ID: citlab, Project: vs.msc.ws14, Lines of code: 13, Source: DataSet.java


Example 12: translateNode

import org.apache.flink.api.java.operators.GroupReduceOperator; // import the required package/class
@Override
public void translateNode(
    PTransform<PCollection<KV<K, InputT>>, PCollection<KV<K, Iterable<InputT>>>> transform,
    FlinkBatchTranslationContext context) {

  // For now, this is copied from the Combine.PerKey translator. Once we have the new runner API,
  // we can replace GroupByKey by a Combine.PerKey with the Concatenate CombineFn.

  DataSet<WindowedValue<KV<K, InputT>>> inputDataSet =
      context.getInputDataSet(context.getInput(transform));

  Combine.CombineFn<InputT, List<InputT>, List<InputT>> combineFn = new Concatenate<>();

  KvCoder<K, InputT> inputCoder =
      (KvCoder<K, InputT>) context.getInput(transform).getCoder();

  Coder<List<InputT>> accumulatorCoder;

  try {
    accumulatorCoder =
        combineFn.getAccumulatorCoder(
            context.getInput(transform).getPipeline().getCoderRegistry(),
            inputCoder.getValueCoder());
  } catch (CannotProvideCoderException e) {
    throw new RuntimeException(e);
  }

  WindowingStrategy<?, ?> windowingStrategy =
      context.getInput(transform).getWindowingStrategy();

  TypeInformation<WindowedValue<KV<K, List<InputT>>>> partialReduceTypeInfo =
      new CoderTypeInformation<>(
          WindowedValue.getFullCoder(
              KvCoder.of(inputCoder.getKeyCoder(), accumulatorCoder),
              windowingStrategy.getWindowFn().windowCoder()));


  Grouping<WindowedValue<KV<K, InputT>>> inputGrouping =
      inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder()));

  @SuppressWarnings("unchecked")
  WindowingStrategy<Object, BoundedWindow> boundedStrategy =
      (WindowingStrategy<Object, BoundedWindow>) windowingStrategy;

  FlinkPartialReduceFunction<K, InputT, List<InputT>, ?> partialReduceFunction =
      new FlinkPartialReduceFunction<>(
          combineFn, boundedStrategy,
          Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
          context.getPipelineOptions());

  FlinkReduceFunction<K, List<InputT>, List<InputT>, ?> reduceFunction =
      new FlinkReduceFunction<>(
          combineFn, boundedStrategy,
          Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(),
          context.getPipelineOptions());

  // Partially GroupReduce the values into the intermediate format AccumT (combine)
  GroupCombineOperator<
      WindowedValue<KV<K, InputT>>,
      WindowedValue<KV<K, List<InputT>>>> groupCombine =
      new GroupCombineOperator<>(
          inputGrouping,
          partialReduceTypeInfo,
          partialReduceFunction,
          "GroupCombine: " + transform.getName());

  Grouping<WindowedValue<KV<K, List<InputT>>>> intermediateGrouping =
      groupCombine.groupBy(new KvKeySelector<List<InputT>, K>(inputCoder.getKeyCoder()));

  // Fully reduce the values into the output format (the concatenated List<InputT>)
  GroupReduceOperator<
      WindowedValue<KV<K, List<InputT>>>, WindowedValue<KV<K, List<InputT>>>> outputDataSet =
      new GroupReduceOperator<>(
          intermediateGrouping, partialReduceTypeInfo, reduceFunction, transform.getName());

  context.setOutputDataSet(context.getOutput(transform), outputDataSet);

}
 
Developer ID: apache, Project: beam, Lines of code: 79, Source: FlinkBatchTransformTranslators.java
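Example 12 implements Beam's GroupByKey as a combine with a concatenating CombineFn:
- A GroupCombineOperator builds partial lists per key before the shuffle.
- A GroupReduceOperator merges those partial lists after regrouping by key.

The sketch below reproduces only that two-phase data flow in plain Java, ignoring windows, coders, Beam, and Flink. The helper names follow Beam's CombineFn methods (createAccumulator, addInput, mergeAccumulators), but the class itself is hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Two-phase "concatenate" combine (illustrative sketch; no Beam or Flink classes). */
final class ConcatenateCombine {

    // Accumulator operations, named after Beam's CombineFn methods.
    static <V> List<V> createAccumulator() {
        return new ArrayList<>();
    }

    static <V> List<V> addInput(List<V> accumulator, V input) {
        accumulator.add(input);
        return accumulator;
    }

    static <V> List<V> mergeAccumulators(Iterable<List<V>> accumulators) {
        List<V> merged = new ArrayList<>();
        for (List<V> accumulator : accumulators) {
            merged.addAll(accumulator);
        }
        return merged;
    }

    /** Phase 1 (GroupCombine, before the shuffle): one partial list per key and partition. */
    static <K, V> Map<K, List<V>> partialCombine(List<Map.Entry<K, V>> partition) {
        Map<K, List<V>> partial = new LinkedHashMap<>();
        for (Map.Entry<K, V> kv : partition) {
            List<V> accumulator =
                    partial.computeIfAbsent(kv.getKey(), k -> ConcatenateCombine.<V>createAccumulator());
            addInput(accumulator, kv.getValue());
        }
        return partial;
    }

    /** Phase 2 (GroupReduce, after regrouping by key): merge all partial lists of a key. */
    static <K, V> Map<K, List<V>> finalReduce(List<Map<K, List<V>>> partials) {
        Map<K, List<List<V>>> byKey = new LinkedHashMap<>();
        for (Map<K, List<V>> partial : partials) {
            partial.forEach((key, acc) -> byKey.computeIfAbsent(key, x -> new ArrayList<>()).add(acc));
        }
        Map<K, List<V>> result = new LinkedHashMap<>();
        byKey.forEach((key, accs) -> result.put(key, mergeAccumulators(accs)));
        return result;
    }
}
```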


Example 13: runInternal

import org.apache.flink.api.java.operators.GroupReduceOperator; // import the required package/class
@Override
public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
		throws Exception {
	// s, d(s), 1/log(d(s))
	DataSet<Tuple3<K, LongValue, FloatValue>> inverseLogDegree = input
		.run(new VertexDegree<K, VV, EV>()
			.setParallelism(parallelism))
		.map(new VertexInverseLogDegree<>())
			.setParallelism(parallelism)
			.name("Vertex score");

	// s, t, 1/log(d(s))
	DataSet<Tuple3<K, K, FloatValue>> sourceInverseLogDegree = input
		.getEdges()
		.join(inverseLogDegree, JoinHint.REPARTITION_HASH_SECOND)
		.where(0)
		.equalTo(0)
		.projectFirst(0, 1)
		.<Tuple3<K, K, FloatValue>>projectSecond(2)
			.setParallelism(parallelism)
			.name("Edge score");

	// group span, s, t, 1/log(d(s))
	DataSet<Tuple4<IntValue, K, K, FloatValue>> groupSpans = sourceInverseLogDegree
		.groupBy(0)
		.sortGroup(1, Order.ASCENDING)
		.reduceGroup(new GenerateGroupSpans<>())
			.setParallelism(parallelism)
			.name("Generate group spans");

	// group, s, t, 1/log(d(s))
	DataSet<Tuple4<IntValue, K, K, FloatValue>> groups = groupSpans
		.rebalance()
			.setParallelism(parallelism)
			.name("Rebalance")
		.flatMap(new GenerateGroups<>())
			.setParallelism(parallelism)
			.name("Generate groups");

	// t, u, 1/log(d(s)) where (s, t) and (s, u) are edges in graph
	DataSet<Tuple3<K, K, FloatValue>> twoPaths = groups
		.groupBy(0, 1)
		.sortGroup(2, Order.ASCENDING)
		.reduceGroup(new GenerateGroupPairs<>())
			.name("Generate group pairs");

	// t, u, adamic-adar score
	GroupReduceOperator<Tuple3<K, K, FloatValue>, Result<K>> scores = twoPaths
		.groupBy(0, 1)
		.reduceGroup(new ComputeScores<>(minimumScore, minimumRatio))
			.name("Compute scores");

	if (minimumRatio > 0.0f) {
		// total score, number of pairs of neighbors
		DataSet<Tuple2<FloatValue, LongValue>> sumOfScoresAndNumberOfNeighborPairs = inverseLogDegree
			.map(new ComputeScoreFromVertex<>())
				.setParallelism(parallelism)
				.name("Average score")
			.sum(0)
			.andSum(1);

		scores
			.withBroadcastSet(sumOfScoresAndNumberOfNeighborPairs, SUM_OF_SCORES_AND_NUMBER_OF_NEIGHBOR_PAIRS);
	}

	if (mirrorResults) {
		return scores
			.flatMap(new MirrorResult<>())
				.name("Mirror results");
	} else {
		return scores;
	}
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 74, Source: AdamicAdar.java
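Example 13 computes, for each pair of vertices (t, u), the Adamic-Adar score: the sum of 1/log(d(s)) over every shared neighbor s. In the same spirit, a single-machine sketch lets each vertex s with degree at least 2 contribute 1/log(d(s)) to every pair of its neighbors.

The sketch makes these assumptions:
- The graph is undirected.
- Scores use the natural logarithm and double rather than Flink's FloatValue.
- The minimumScore, minimumRatio, and mirroring options are omitted.

The class is hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Single-machine Adamic-Adar scores (illustrative sketch, not Flink's AdamicAdar). */
final class AdamicAdarSketch {

    /** Maps "t,u" (t < u) to the sum of 1/ln(d(s)) over all shared neighbors s. */
    static Map<String, Double> scores(Map<String, Set<String>> adjacency) {
        Map<String, Double> result = new TreeMap<>();
        for (Map.Entry<String, Set<String>> e : adjacency.entrySet()) {
            int degree = e.getValue().size();
            if (degree < 2) {
                continue; // no neighbor pairs; also avoids dividing by ln(1) = 0
            }
            double inverseLogDegree = 1.0 / Math.log(degree);
            List<String> neighbors = new ArrayList<>(new TreeSet<>(e.getValue()));
            for (int i = 0; i < neighbors.size(); i++) {
                for (int j = i + 1; j < neighbors.size(); j++) {
                    String pair = neighbors.get(i) + "," + neighbors.get(j);
                    result.merge(pair, inverseLogDegree, Double::sum);
                }
            }
        }
        return result;
    }

    /** Builds an undirected adjacency map from {source, target} edge pairs. */
    static Map<String, Set<String>> undirected(String[][] edges) {
        Map<String, Set<String>> adjacency = new HashMap<>();
        for (String[] edge : edges) {
            adjacency.computeIfAbsent(edge[0], k -> new TreeSet<>()).add(edge[1]);
            adjacency.computeIfAbsent(edge[1], k -> new TreeSet<>()).add(edge[0]);
        }
        return adjacency;
    }
}
```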


Example 14: testAllReduceWithCombiner

import org.apache.flink.api.java.operators.GroupReduceOperator; // import the required package/class
@Test
public void testAllReduceWithCombiner() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(8);
		
		DataSet<Long> data = env.generateSequence(1, 8000000).name("source");

		GroupReduceOperator<Long, Long> reduced = data.reduceGroup(new CombineReducer2()).name("reducer");

		reduced.setCombinable(true);
		reduced.output(new DiscardingOutputFormat<Long>()).name("sink");
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
		
		// get the original nodes
		SourcePlanNode sourceNode = resolver.getNode("source");
		SingleInputPlanNode reduceNode = resolver.getNode("reducer");
		SinkPlanNode sinkNode = resolver.getNode("sink");
		
		// get the combiner
		SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
		
		// check wiring
		assertEquals(sourceNode, combineNode.getInput().getSource());
		assertEquals(reduceNode, sinkNode.getInput().getSource());
		
		// check the driver strategies of the reducer and the combiner
		assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
		assertEquals(DriverStrategy.ALL_GROUP_REDUCE_COMBINE, combineNode.getDriverStrategy());
		
		// check parallelism
		assertEquals(8, sourceNode.getParallelism());
		assertEquals(8, combineNode.getParallelism());
		assertEquals(1, reduceNode.getParallelism());
		assertEquals(1, sinkNode.getParallelism());
	}
	catch (Exception e) {
		System.err.println(e.getMessage());
		e.printStackTrace();
		fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
	}
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 47, Source: GroupReduceCompilationTest.java
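In Example 14 the combiner runs at the source parallelism (8), while the final ALL_GROUP_REDUCE runs with parallelism 1. The sketch below assumes a summing reducer, since the body of CombineReducer2 is not shown.

It illustrates why marking the reducer combinable matters: the single final reducer receives one partial sum per partition instead of every element. Names are hypothetical, and no Flink classes are used.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.LongStream;

/** Combinable all-reduce: parallel partial sums, then one final reduce (illustrative sketch). */
final class CombinableAllReduce {

    /** Splits 1..n into p contiguous partitions, like a parallel sequence source. */
    static List<List<Long>> partitions(long n, int p) {
        List<List<Long>> parts = new ArrayList<>();
        for (int i = 0; i < p; i++) {
            long from = i * n / p + 1;
            long to = (i + 1) * n / p;
            parts.add(LongStream.rangeClosed(from, to).boxed().collect(Collectors.toList()));
        }
        return parts;
    }

    /** Combiner: runs once per partition (in parallel in a real engine) and emits one partial sum. */
    static long combine(List<Long> partition) {
        long sum = 0;
        for (long v : partition) {
            sum += v;
        }
        return sum;
    }

    /** Final reduce: a single instance (parallelism 1) sees only the p partial sums. */
    static long reduce(List<Long> partials) {
        long sum = 0;
        for (long v : partials) {
            sum += v;
        }
        return sum;
    }

    static long sumAll(long n, int p) {
        List<Long> partials = new ArrayList<>();
        for (List<Long> part : partitions(n, p)) {
            partials.add(combine(part));
        }
        return reduce(partials);
    }
}
```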


Example 15: testGroupedReduceWithFieldPositionKeyCombinable

import org.apache.flink.api.java.operators.GroupReduceOperator; // import the required package/class
@Test
public void testGroupedReduceWithFieldPositionKeyCombinable() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(8);
		
		DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
			.name("source").setParallelism(6);
		
		GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
				.groupBy(1)
				.reduceGroup(new CombineReducer()).name("reducer");
		
		reduced.setCombinable(true);
		reduced.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
		
		// get the original nodes
		SourcePlanNode sourceNode = resolver.getNode("source");
		SingleInputPlanNode reduceNode = resolver.getNode("reducer");
		SinkPlanNode sinkNode = resolver.getNode("sink");
		
		// get the combiner
		SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
		
		// check wiring
		assertEquals(sourceNode, combineNode.getInput().getSource());
		assertEquals(reduceNode, sinkNode.getInput().getSource());
		
		// check the driver strategies of the reducer and the combiner
		assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
		assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
		
		// check the keys
		assertEquals(new FieldList(1), reduceNode.getKeys(0));
		assertEquals(new FieldList(1), combineNode.getKeys(0));
		assertEquals(new FieldList(1), combineNode.getKeys(1));
		assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
		
		// check parallelism
		assertEquals(6, sourceNode.getParallelism());
		assertEquals(6, combineNode.getParallelism());
		assertEquals(8, reduceNode.getParallelism());
		assertEquals(8, sinkNode.getParallelism());
	}
	catch (Exception e) {
		System.err.println(e.getMessage());
		e.printStackTrace();
		fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
	}
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 56, Source: GroupReduceCompilationTest.java
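Example 15 expects SORTED_GROUP_COMBINE on the 6 source partitions and SORTED_GROUP_REDUCE on 8 reducers. The sketch below imitates that pipeline for a summing reducer over (key, value) pairs:
1. Each partition is sorted by key, and adjacent equal keys are collapsed.
2. The partial results are hash-partitioned to the reducers.
3. Each reducer sums again per key.

The sketch rests on these assumptions:
- The reducer sums values; CombineReducer's body is not shown.
- Keys are Strings for readability, whereas the test groups on the Double field 1.
- The partitioner is a simple hashCode modulo, not Flink's hash function.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Sorted group-combine, hash shuffle, and per-key reduce (illustrative sketch). */
final class SortedGroupCombine {

    /** Combiner: sort one partition by key, then collapse each run of equal keys into a partial sum. */
    static List<SimpleEntry<String, Double>> sortedCombine(List<SimpleEntry<String, Double>> partition) {
        List<SimpleEntry<String, Double>> sorted = new ArrayList<>(partition);
        sorted.sort(Comparator.comparing((SimpleEntry<String, Double> e) -> e.getKey()));
        List<SimpleEntry<String, Double>> combined = new ArrayList<>();
        for (SimpleEntry<String, Double> e : sorted) {
            SimpleEntry<String, Double> last = combined.isEmpty() ? null : combined.get(combined.size() - 1);
            if (last != null && last.getKey().equals(e.getKey())) {
                last.setValue(last.getValue() + e.getValue()); // same key as the current run: add to it
            } else {
                combined.add(new SimpleEntry<>(e.getKey(), e.getValue())); // new run; the input is not mutated
            }
        }
        return combined;
    }

    /** Shuffle and reduce: route each partial sum by key hash, then sum again per key. */
    static List<Map<String, Double>> shuffleAndReduce(List<List<SimpleEntry<String, Double>>> combined, int reducers) {
        List<Map<String, Double>> outputs = new ArrayList<>();
        for (int i = 0; i < reducers; i++) {
            outputs.add(new TreeMap<>()); // a sorted map stands in for the reducer-side sort
        }
        for (List<SimpleEntry<String, Double>> partition : combined) {
            for (SimpleEntry<String, Double> e : partition) {
                int target = Math.floorMod(e.getKey().hashCode(), reducers);
                outputs.get(target).merge(e.getKey(), e.getValue(), Double::sum);
            }
        }
        return outputs;
    }
}
```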


Example 16: testGroupedReduceWithSelectorFunctionKeyCombinable

import org.apache.flink.api.java.operators.GroupReduceOperator; // import the required package/class
@Test
public void testGroupedReduceWithSelectorFunctionKeyCombinable() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(8);
		
		DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
			.name("source").setParallelism(6);
		
		GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
			.groupBy(new KeySelector<Tuple2<String,Double>, String>() { 
				public String getKey(Tuple2<String, Double> value) { return value.f0; }
			})
			.reduceGroup(new CombineReducer()).name("reducer");
		
		reduced.setCombinable(true);
		reduced.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
		
		// get the original nodes
		SourcePlanNode sourceNode = resolver.getNode("source");
		SingleInputPlanNode reduceNode = resolver.getNode("reducer");
		SinkPlanNode sinkNode = resolver.getNode("sink");
		
		// get the combiner
		SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
		
		// get the key extractors and projectors
		SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource();
		SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
		
		// check wiring
		assertEquals(sourceNode, keyExtractor.getInput().getSource());
		assertEquals(keyProjector, sinkNode.getInput().getSource());
		
		// check the driver strategies of the reducer and the combiner
		assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
		assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
		
		// check the keys
		assertEquals(new FieldList(0), reduceNode.getKeys(0));
		assertEquals(new FieldList(0), combineNode.getKeys(0));
		assertEquals(new FieldList(0), combineNode.getKeys(1));
		assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
		
		// check parallelism
		assertEquals(6, sourceNode.getParallelism());
		assertEquals(6, keyExtractor.getParallelism());
		assertEquals(6, combineNode.getParallelism());
		
		assertEquals(8, reduceNode.getParallelism());
		assertEquals(8, keyProjector.getParallelism());
		assertEquals(8, sinkNode.getParallelism());
	}
	catch (Exception e) {
		System.err.println(e.getMessage());
		e.printStackTrace();
		fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
	}
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 65, Source: GroupReduceCompilationTest.java
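Example 16 differs from Example 15 only in how the key is defined, yet its plan contains two extra nodes. The assertions are consistent with the following flow:
1. A key extractor materializes the selector's key in front of each record, which is why the keys are asserted as FieldList(0).
2. The combine and reduce group on that position-0 field.
3. A key projector strips the key again after the reduce.

Below is a plain-Java sketch of that wrap, group-on-field-0, unwrap flow. It uses hypothetical names and a caller-supplied pairwise reduce.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;
import java.util.function.Function;

/** Grouping by a key selector via extract, group on field 0, project (illustrative sketch). */
final class KeySelectorGrouping {

    /** Key extractor: wrap each record as (key, record), so the key sits at position 0. */
    static <K, T> List<SimpleEntry<K, T>> extractKeys(List<T> records, Function<T, K> keySelector) {
        List<SimpleEntry<K, T>> keyed = new ArrayList<>();
        for (T record : records) {
            keyed.add(new SimpleEntry<>(keySelector.apply(record), record));
        }
        return keyed;
    }

    /** Groups on position 0, reduces each group pairwise, and projects the key away again. */
    static <K extends Comparable<K>, T> List<T> groupReduceByKeySelector(
            List<T> records, Function<T, K> keySelector, BinaryOperator<T> reduce) {
        List<SimpleEntry<K, T>> keyed = extractKeys(records, keySelector);
        Map<K, T> reducedByKey = new TreeMap<>(); // sorted by key, like a sort-based grouping
        for (SimpleEntry<K, T> e : keyed) {
            reducedByKey.merge(e.getKey(), e.getValue(), reduce);
        }
        // Key projector: only the reduced records leave the operator.
        return new ArrayList<>(reducedByKey.values());
    }
}
```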


Example 17: testAllReduceWithCombiner

import org.apache.flink.api.java.operators.GroupReduceOperator; // import the required package/class
@Test
public void testAllReduceWithCombiner() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setDegreeOfParallelism(8);
		
		DataSet<Long> data = env.generateSequence(1, 8000000).name("source");
		
		GroupReduceOperator<Long, Long> reduced = data.reduceGroup(new RichGroupReduceFunction<Long, Long>() {
			public void reduce(Iterable<Long> values, Collector<Long> out) {}
		}).name("reducer");
		
		reduced.setCombinable(true);
		reduced.print().name("sink");
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
		
		// get the original nodes
		SourcePlanNode sourceNode = resolver.getNode("source");
		SingleInputPlanNode reduceNode = resolver.getNode("reducer");
		SinkPlanNode sinkNode = resolver.getNode("sink");
		
		// get the combiner
		SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
		
		// check wiring
		assertEquals(sourceNode, combineNode.getInput().getSource());
		assertEquals(reduceNode, sinkNode.getInput().getSource());
		
		// check that both reduce and combiner have the same strategy
		assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
		assertEquals(DriverStrategy.ALL_GROUP_COMBINE, combineNode.getDriverStrategy());
		
		// check DOP
		assertEquals(8, sourceNode.getDegreeOfParallelism());
		assertEquals(8, combineNode.getDegreeOfParallelism());
		assertEquals(1, reduceNode.getDegreeOfParallelism());
		assertEquals(1, sinkNode.getDegreeOfParallelism());
	}
	catch (Exception e) {
		System.err.println(e.getMessage());
		e.printStackTrace();
		fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
	}
}
 
Developer ID: citlab, Project: vs.msc.ws14, Lines of code: 49, Source: GroupReduceCompilationTest.java
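The assertions above describe a two-phase plan. First, a combiner runs at parallelism 8 using `ALL_GROUP_COMBINE`. Then a reducer and sink run at parallelism 1 using `ALL_GROUP_REDUCE`.

The plain-Java sketch below imitates that shape for a sum. It assumes the reduce function can be applied to its own partial results, which is what makes marking it combinable safe. The class and method names are hypothetical and are not Flink APIs, and round-robin splitting merely stands in for the parallel source subtasks.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BinaryOperator;

// Hypothetical plain-Java sketch of a combinable all-group reduce:
// a combiner pre-aggregates each parallel partition, then a single
// (parallelism 1) reducer merges the partial results.
final class CombinableAllReduceSketch {

    /** Split the input round-robin into `parallelism` partitions (stand-in for parallel subtasks). */
    static <T> List<List<T>> partition(List<T> input, int parallelism) {
        List<List<T>> parts = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            parts.add(new ArrayList<>());
        }
        for (int i = 0; i < input.size(); i++) {
            parts.get(i % parallelism).add(input.get(i));
        }
        return parts;
    }

    /** Combine phase: each partition is reduced locally; empty partitions emit nothing. */
    static <T> List<T> combine(List<List<T>> partitions, BinaryOperator<T> fn) {
        List<T> partials = new ArrayList<>();
        for (List<T> part : partitions) {
            part.stream().reduce(fn).ifPresent(partials::add);
        }
        return partials;
    }

    /** Final reduce: a single task merges all partial results. */
    static <T> T reduceAll(List<T> partials, BinaryOperator<T> fn) {
        return partials.stream().reduce(fn).orElseThrow();
    }
}
```

Only one partial value per partition crosses to the single reducer, rather than every input element. That reduction in transferred data is the usual motivation for a combiner in front of a parallelism-1 reduce.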


Example 18: testGroupedReduceWithFieldPositionKeyCombinable

import org.apache.flink.api.java.operators.GroupReduceOperator; // import the required package/class
@Test
public void testGroupedReduceWithFieldPositionKeyCombinable() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setDegreeOfParallelism(8);
		
		DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
			.name("source").setParallelism(6);
		
		GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
				.groupBy(1)
				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
			public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
		}).name("reducer");
		
		reduced.setCombinable(true);
		reduced.print().name("sink");
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
		
		// get the original nodes
		SourcePlanNode sourceNode = resolver.getNode("source");
		SingleInputPlanNode reduceNode = resolver.getNode("reducer");
		SinkPlanNode sinkNode = resolver.getNode("sink");
		
		// get the combiner
		SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
		
		// check wiring
		assertEquals(sourceNode, combineNode.getInput().getSource());
		assertEquals(reduceNode, sinkNode.getInput().getSource());
		
		// check that both reduce and combiner have the same strategy
		assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
		assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
		
		// check the keys
		assertEquals(new FieldList(1), reduceNode.getKeys(0));
		assertEquals(new FieldList(1), combineNode.getKeys(0));
		assertEquals(new FieldList(1), combineNode.getKeys(1));
		assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
		
		// check DOP
		assertEquals(6, sourceNode.getDegreeOfParallelism());
		assertEquals(6, combineNode.getDegreeOfParallelism());
		assertEquals(8, reduceNode.getDegreeOfParallelism());
		assertEquals(8, sinkNode.getDegreeOfParallelism());
	}
	catch (Exception e) {
		System.err.println(e.getMessage());
		e.printStackTrace();
		fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
	}
}
 
Developer ID: citlab, Project: vs.msc.ws14, Lines of code: 58, Source: GroupReduceCompilationTest.java
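For the keyed case, the test expects `SORTED_GROUP_COMBINE` on the source side at parallelism 6. It then expects `SORTED_GROUP_REDUCE` on field 1 at parallelism 8. The partial results therefore have to be redistributed by key between the two phases, so that each key ends up on exactly one reducer.

A rough plain-Java approximation follows. The hash-modulo routing and the `TreeMap`-based sorting are illustrative assumptions, not a description of Flink's actual partitioner or sort implementation. `combineThenReduce` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BinaryOperator;
import java.util.function.Function;

// Hypothetical sketch of a combinable grouped reduce:
// 1) each source partition groups its records by key (sorted) and combines them locally,
// 2) each partial result is routed to reducer floorMod(hash(key), parallelism),
// 3) each reducer merges the partials for the keys it owns.
final class CombinableGroupedReduceSketch {

    static <K extends Comparable<K>, V> List<Map<K, V>> combineThenReduce(
            List<List<V>> sourcePartitions, int reducerParallelism,
            Function<V, K> key, BinaryOperator<V> fn) {
        // Step 1: local combine per source partition; TreeMap keeps groups sorted by key.
        List<Map<K, V>> combined = new ArrayList<>();
        for (List<V> part : sourcePartitions) {
            Map<K, V> local = new TreeMap<>();
            for (V v : part) {
                local.merge(key.apply(v), v, fn);
            }
            combined.add(local);
        }
        // Steps 2 and 3: ship each partial to its reducer and merge it there.
        List<Map<K, V>> reducers = new ArrayList<>();
        for (int i = 0; i < reducerParallelism; i++) {
            reducers.add(new TreeMap<>());
        }
        for (Map<K, V> local : combined) {
            for (Map.Entry<K, V> e : local.entrySet()) {
                int target = Math.floorMod(e.getKey().hashCode(), reducerParallelism);
                reducers.get(target).merge(e.getKey(), e.getValue(), fn);
            }
        }
        return reducers;
    }
}
```

As a check, summing the integers 0 to 29, spread over six source partitions and keyed by `v % 3`, should give 135, 145 and 155 for keys 0, 1 and 2. Each key should be produced by exactly one of the eight reducers.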


Example 19: testGroupedReduceWithSelectorFunctionKeyCombinable

import org.apache.flink.api.java.operators.GroupReduceOperator; // import the required package/class
@Test
public void testGroupedReduceWithSelectorFunctionKeyCombinable() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setDegreeOfParallelism(8);
		
		DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
			.name("source").setParallelism(6);
		
		GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
			.groupBy(new KeySelector<Tuple2<String,Double>, String>() { 
				public String getKey(Tuple2<String, Double> value) { return value.f0; }
			})
			.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
			public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
		}).name("reducer");
		
		reduced.setCombinable(true);
		reduced.print().name("sink");
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
		
		// get the original nodes
		SourcePlanNode sourceNode = resolver.getNode("source");
		SingleInputPlanNode reduceNode = resolver.getNode("reducer");
		SinkPlanNode sinkNode = resolver.getNode("sink");
		
		// get the combiner
		SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
		
		// get the key extractors and projectors
		SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource();
		SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
		
		// check wiring
		assertEquals(sourceNode, keyExtractor.getInput().getSource());
		assertEquals(keyProjector, sinkNode.getInput().getSource());
		
		// check that both reduce and combiner have the same strategy
		assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
		assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
		
		// check the keys
		assertEquals(new FieldList(0), reduceNode.getKeys(0));
		assertEquals(new FieldList(0), combineNode.getKeys(0));
		assertEquals(new FieldList(0), combineNode.getKeys(1));
		assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
		
		// check DOP
		assertEquals(6, sourceNode.getDegreeOfParallelism());
		assertEqual 