• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java RichGroupReduceFunction类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.flink.api.common.functions.RichGroupReduceFunction的典型用法代码示例。如果您正苦于以下问题:Java RichGroupReduceFunction类的具体用法?Java RichGroupReduceFunction怎么用?Java RichGroupReduceFunction使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



RichGroupReduceFunction类属于org.apache.flink.api.common.functions包,在下文中一共展示了RichGroupReduceFunction类的15个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: testBasicType

import org.apache.flink.api.common.functions.RichGroupReduceFunction; //导入依赖的package包/类
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testBasicType() {
	// use getGroupReduceReturnTypes()
	RichGroupReduceFunction<?, ?> function = new RichGroupReduceFunction<Boolean, Boolean>() {
		private static final long serialVersionUID = 1L;

		@Override
		public void reduce(Iterable<Boolean> values, Collector<Boolean> out) throws Exception {
			// nothing to do
		}
	};

	TypeInformation<?> ti = TypeExtractor.getGroupReduceReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Boolean"));

	Assert.assertTrue(ti.isBasicType());
	Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti);
	Assert.assertEquals(Boolean.class, ti.getTypeClass());

	// use getForClass()
	Assert.assertTrue(TypeExtractor.getForClass(Boolean.class).isBasicType());
	Assert.assertEquals(ti, TypeExtractor.getForClass(Boolean.class));

	// use getForObject()
	Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, TypeExtractor.getForObject(Boolean.valueOf(true)));
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:27,代码来源:TypeExtractorTest.java


示例2: testBasicType

import org.apache.flink.api.common.functions.RichGroupReduceFunction; //导入依赖的package包/类
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void testBasicType() {
	// use getGroupReduceReturnTypes()
	RichGroupReduceFunction<?, ?> function = new RichGroupReduceFunction<Boolean, Boolean>() {
		private static final long serialVersionUID = 1L;

		@Override
		public void reduce(Iterable<Boolean> values, Collector<Boolean> out) throws Exception {
			// nothing to do
		}
	};

	TypeInformation<?> ti = TypeExtractor.getGroupReduceReturnTypes(function, (TypeInformation) TypeInfoParser.parse("Boolean"));

	Assert.assertTrue(ti.isBasicType());
	Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, ti);
	Assert.assertEquals(Boolean.class, ti.getTypeClass());

	// use getForClass()
	Assert.assertTrue(TypeExtractor.getForClass(Boolean.class).isBasicType());
	Assert.assertEquals(ti, TypeExtractor.getForClass(Boolean.class));

	// use getForObject()
	Assert.assertEquals(BasicTypeInfo.BOOLEAN_TYPE_INFO, TypeExtractor.getForObject(true));
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:27,代码来源:TypeExtractorTest.java


示例3: PlanUnwrappingReduceGroupOperator

import org.apache.flink.api.common.functions.RichGroupReduceFunction; //导入依赖的package包/类
public PlanUnwrappingReduceGroupOperator(GroupReduceFunction<IN, OUT> udf, Keys.SelectorFunctionKeys<IN, K> key, String name,
		TypeInformation<OUT> outType, TypeInformation<Tuple2<K, IN>> typeInfoWithKey, boolean combinable)
{
	super(combinable ? new TupleUnwrappingFlatCombinableGroupReducer<IN, OUT, K>((RichGroupReduceFunction<IN, OUT>) udf) : new TupleUnwrappingNonCombinableGroupReducer<IN, OUT, K>(udf),
			new UnaryOperatorInformation<Tuple2<K, IN>, OUT>(typeInfoWithKey, outType), key.computeLogicalKeyPositions(), name);
	
	super.setCombinable(combinable);
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:9,代码来源:PlanUnwrappingReduceGroupOperator.java


示例4: testAllGroupReduceNoCombiner

import org.apache.flink.api.common.functions.RichGroupReduceFunction; //导入依赖的package包/类
@Test
public void testAllGroupReduceNoCombiner() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(8);
		
		DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source");
		
		data.reduceGroup(new RichGroupReduceFunction<Double, Double>() {
			public void reduce(Iterable<Double> values, Collector<Double> out) {}
		}).name("reducer")
		.output(new DiscardingOutputFormat<Double>()).name("sink");
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
		
		
		// the all-reduce has no combiner, when the parallelism of the input is one
		
		SourcePlanNode sourceNode = resolver.getNode("source");
		SingleInputPlanNode reduceNode = resolver.getNode("reducer");
		SinkPlanNode sinkNode = resolver.getNode("sink");
		
		// check wiring
		assertEquals(sourceNode, reduceNode.getInput().getSource());
		assertEquals(reduceNode, sinkNode.getInput().getSource());
		
		// check that reduce has the right strategy
		assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
		
		// check parallelism
		assertEquals(1, sourceNode.getParallelism());
		assertEquals(1, reduceNode.getParallelism());
		assertEquals(1, sinkNode.getParallelism());
	}
	catch (Exception e) {
		System.err.println(e.getMessage());
		e.printStackTrace();
		fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:44,代码来源:GroupReduceCompilationTest.java


示例5: testGroupedReduceWithFieldPositionKeyNonCombinable

import org.apache.flink.api.common.functions.RichGroupReduceFunction; //导入依赖的package包/类
@Test
public void testGroupedReduceWithFieldPositionKeyNonCombinable() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(8);
		
		DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
			.name("source").setParallelism(6);
		
		data
			.groupBy(1)
			.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
			public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
		}).name("reducer")
		.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
		
		// get the original nodes
		SourcePlanNode sourceNode = resolver.getNode("source");
		SingleInputPlanNode reduceNode = resolver.getNode("reducer");
		SinkPlanNode sinkNode = resolver.getNode("sink");
		
		// check wiring
		assertEquals(sourceNode, reduceNode.getInput().getSource());
		assertEquals(reduceNode, sinkNode.getInput().getSource());
		
		// check that both reduce and combiner have the same strategy
		assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
		
		// check the keys
		assertEquals(new FieldList(1), reduceNode.getKeys(0));
		assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
		
		// check parallelism
		assertEquals(6, sourceNode.getParallelism());
		assertEquals(8, reduceNode.getParallelism());
		assertEquals(8, sinkNode.getParallelism());
	}
	catch (Exception e) {
		System.err.println(e.getMessage());
		e.printStackTrace();
		fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:49,代码来源:GroupReduceCompilationTest.java


示例6: testGroupedReduceWithSelectorFunctionKeyNoncombinable

import org.apache.flink.api.common.functions.RichGroupReduceFunction; //导入依赖的package包/类
@Test
public void testGroupedReduceWithSelectorFunctionKeyNoncombinable() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setParallelism(8);
		
		DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
			.name("source").setParallelism(6);
		
		data
			.groupBy(new KeySelector<Tuple2<String,Double>, String>() { 
				public String getKey(Tuple2<String, Double> value) { return value.f0; }
			})
			.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
			public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
		}).name("reducer")
		.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
		
		// get the original nodes
		SourcePlanNode sourceNode = resolver.getNode("source");
		SingleInputPlanNode reduceNode = resolver.getNode("reducer");
		SinkPlanNode sinkNode = resolver.getNode("sink");
		
		// get the key extractors and projectors
		SingleInputPlanNode keyExtractor = (SingleInputPlanNode) reduceNode.getInput().getSource();
		SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
		
		// check wiring
		assertEquals(sourceNode, keyExtractor.getInput().getSource());
		assertEquals(keyProjector, sinkNode.getInput().getSource());
		
		// check that both reduce and combiner have the same strategy
		assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
		
		// check the keys
		assertEquals(new FieldList(0), reduceNode.getKeys(0));
		assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
		
		// check parallelism
		assertEquals(6, sourceNode.getParallelism());
		assertEquals(6, keyExtractor.getParallelism());
		
		assertEquals(8, reduceNode.getParallelism());
		assertEquals(8, keyProjector.getParallelism());
		assertEquals(8, sinkNode.getParallelism());
	}
	catch (Exception e) {
		System.err.println(e.getMessage());
		e.printStackTrace();
		fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
	}
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:58,代码来源:GroupReduceCompilationTest.java


示例7: testAllGroupReduceNoCombiner

import org.apache.flink.api.common.functions.RichGroupReduceFunction; //导入依赖的package包/类
@Test
public void testAllGroupReduceNoCombiner() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setDegreeOfParallelism(8);
		
		DataSet<Double> data = env.fromElements(0.2, 0.3, 0.4, 0.5).name("source");
		
		data.reduceGroup(new RichGroupReduceFunction<Double, Double>() {
			public void reduce(Iterable<Double> values, Collector<Double> out) {}
		}).name("reducer")
		.print().name("sink");
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
		
		
		// the all-reduce has no combiner, when the DOP of the input is one
		
		SourcePlanNode sourceNode = resolver.getNode("source");
		SingleInputPlanNode reduceNode = resolver.getNode("reducer");
		SinkPlanNode sinkNode = resolver.getNode("sink");
		
		// check wiring
		assertEquals(sourceNode, reduceNode.getInput().getSource());
		assertEquals(reduceNode, sinkNode.getInput().getSource());
		
		// check that reduce has the right strategy
		assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
		
		// check DOP
		assertEquals(1, sourceNode.getDegreeOfParallelism());
		assertEquals(1, reduceNode.getDegreeOfParallelism());
		assertEquals(1, sinkNode.getDegreeOfParallelism());
	}
	catch (Exception e) {
		System.err.println(e.getMessage());
		e.printStackTrace();
		fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
	}
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:44,代码来源:GroupReduceCompilationTest.java


示例8: testAllReduceWithCombiner

import org.apache.flink.api.common.functions.RichGroupReduceFunction; //导入依赖的package包/类
@Test
public void testAllReduceWithCombiner() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setDegreeOfParallelism(8);
		
		DataSet<Long> data = env.generateSequence(1, 8000000).name("source");
		
		GroupReduceOperator<Long, Long> reduced = data.reduceGroup(new RichGroupReduceFunction<Long, Long>() {
			public void reduce(Iterable<Long> values, Collector<Long> out) {}
		}).name("reducer");
		
		reduced.setCombinable(true);
		reduced.print().name("sink");
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
		
		// get the original nodes
		SourcePlanNode sourceNode = resolver.getNode("source");
		SingleInputPlanNode reduceNode = resolver.getNode("reducer");
		SinkPlanNode sinkNode = resolver.getNode("sink");
		
		// get the combiner
		SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
		
		// check wiring
		assertEquals(sourceNode, combineNode.getInput().getSource());
		assertEquals(reduceNode, sinkNode.getInput().getSource());
		
		// check that both reduce and combiner have the same strategy
		assertEquals(DriverStrategy.ALL_GROUP_REDUCE, reduceNode.getDriverStrategy());
		assertEquals(DriverStrategy.ALL_GROUP_COMBINE, combineNode.getDriverStrategy());
		
		// check DOP
		assertEquals(8, sourceNode.getDegreeOfParallelism());
		assertEquals(8, combineNode.getDegreeOfParallelism());
		assertEquals(1, reduceNode.getDegreeOfParallelism());
		assertEquals(1, sinkNode.getDegreeOfParallelism());
	}
	catch (Exception e) {
		System.err.println(e.getMessage());
		e.printStackTrace();
		fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
	}
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:49,代码来源:GroupReduceCompilationTest.java


示例9: testGroupedReduceWithFieldPositionKeyNonCombinable

import org.apache.flink.api.common.functions.RichGroupReduceFunction; //导入依赖的package包/类
@Test
public void testGroupedReduceWithFieldPositionKeyNonCombinable() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setDegreeOfParallelism(8);
		
		DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
			.name("source").setParallelism(6);
		
		data
			.groupBy(1)
			.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
			public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
		}).name("reducer")
		.print().name("sink");
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
		
		// get the original nodes
		SourcePlanNode sourceNode = resolver.getNode("source");
		SingleInputPlanNode reduceNode = resolver.getNode("reducer");
		SinkPlanNode sinkNode = resolver.getNode("sink");
		
		// check wiring
		assertEquals(sourceNode, reduceNode.getInput().getSource());
		assertEquals(reduceNode, sinkNode.getInput().getSource());
		
		// check that both reduce and combiner have the same strategy
		assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
		
		// check the keys
		assertEquals(new FieldList(1), reduceNode.getKeys(0));
		assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
		
		// check DOP
		assertEquals(6, sourceNode.getDegreeOfParallelism());
		assertEquals(8, reduceNode.getDegreeOfParallelism());
		assertEquals(8, sinkNode.getDegreeOfParallelism());
	}
	catch (Exception e) {
		System.err.println(e.getMessage());
		e.printStackTrace();
		fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
	}
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:49,代码来源:GroupReduceCompilationTest.java


示例10: testGroupedReduceWithFieldPositionKeyCombinable

import org.apache.flink.api.common.functions.RichGroupReduceFunction; //导入依赖的package包/类
@Test
public void testGroupedReduceWithFieldPositionKeyCombinable() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setDegreeOfParallelism(8);
		
		DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
			.name("source").setParallelism(6);
		
		GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
				.groupBy(1)
				.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
			public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
		}).name("reducer");
		
		reduced.setCombinable(true);
		reduced.print().name("sink");
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
		
		// get the original nodes
		SourcePlanNode sourceNode = resolver.getNode("source");
		SingleInputPlanNode reduceNode = resolver.getNode("reducer");
		SinkPlanNode sinkNode = resolver.getNode("sink");
		
		// get the combiner
		SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
		
		// check wiring
		assertEquals(sourceNode, combineNode.getInput().getSource());
		assertEquals(reduceNode, sinkNode.getInput().getSource());
		
		// check that both reduce and combiner have the same strategy
		assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
		assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
		
		// check the keys
		assertEquals(new FieldList(1), reduceNode.getKeys(0));
		assertEquals(new FieldList(1), combineNode.getKeys(0));
		assertEquals(new FieldList(1), combineNode.getKeys(1));
		assertEquals(new FieldList(1), reduceNode.getInput().getLocalStrategyKeys());
		
		// check DOP
		assertEquals(6, sourceNode.getDegreeOfParallelism());
		assertEquals(6, combineNode.getDegreeOfParallelism());
		assertEquals(8, reduceNode.getDegreeOfParallelism());
		assertEquals(8, sinkNode.getDegreeOfParallelism());
	}
	catch (Exception e) {
		System.err.println(e.getMessage());
		e.printStackTrace();
		fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
	}
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:58,代码来源:GroupReduceCompilationTest.java


示例11: testGroupedReduceWithSelectorFunctionKeyNoncombinable

import org.apache.flink.api.common.functions.RichGroupReduceFunction; //导入依赖的package包/类
@Test
public void testGroupedReduceWithSelectorFunctionKeyNoncombinable() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setDegreeOfParallelism(8);
		
		DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
			.name("source").setParallelism(6);
		
		data
			.groupBy(new KeySelector<Tuple2<String,Double>, String>() { 
				public String getKey(Tuple2<String, Double> value) { return value.f0; }
			})
			.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
			public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
		}).name("reducer")
		.print().name("sink");
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
		
		// get the original nodes
		SourcePlanNode sourceNode = resolver.getNode("source");
		SingleInputPlanNode reduceNode = resolver.getNode("reducer");
		SinkPlanNode sinkNode = resolver.getNode("sink");
		
		// get the key extractors and projectors
		SingleInputPlanNode keyExtractor = (SingleInputPlanNode) reduceNode.getInput().getSource();
		SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
		
		// check wiring
		assertEquals(sourceNode, keyExtractor.getInput().getSource());
		assertEquals(keyProjector, sinkNode.getInput().getSource());
		
		// check that both reduce and combiner have the same strategy
		assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
		
		// check the keys
		assertEquals(new FieldList(0), reduceNode.getKeys(0));
		assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
		
		// check DOP
		assertEquals(6, sourceNode.getDegreeOfParallelism());
		assertEquals(6, keyExtractor.getDegreeOfParallelism());
		
		assertEquals(8, reduceNode.getDegreeOfParallelism());
		assertEquals(8, keyProjector.getDegreeOfParallelism());
		assertEquals(8, sinkNode.getDegreeOfParallelism());
	}
	catch (Exception e) {
		System.err.println(e.getMessage());
		e.printStackTrace();
		fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
	}
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:58,代码来源:GroupReduceCompilationTest.java


示例12: testGroupedReduceWithSelectorFunctionKeyCombinable

import org.apache.flink.api.common.functions.RichGroupReduceFunction; //导入依赖的package包/类
@Test
public void testGroupedReduceWithSelectorFunctionKeyCombinable() {
	try {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
		env.setDegreeOfParallelism(8);
		
		DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
			.name("source").setParallelism(6);
		
		GroupReduceOperator<Tuple2<String, Double>, Tuple2<String, Double>> reduced = data
			.groupBy(new KeySelector<Tuple2<String,Double>, String>() { 
				public String getKey(Tuple2<String, Double> value) { return value.f0; }
			})
			.reduceGroup(new RichGroupReduceFunction<Tuple2<String, Double>, Tuple2<String, Double>>() {
			public void reduce(Iterable<Tuple2<String, Double>> values, Collector<Tuple2<String, Double>> out) {}
		}).name("reducer");
		
		reduced.setCombinable(true);
		reduced.print().name("sink");
		
		Plan p = env.createProgramPlan();
		OptimizedPlan op = compileNoStats(p);
		
		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
		
		// get the original nodes
		SourcePlanNode sourceNode = resolver.getNode("source");
		SingleInputPlanNode reduceNode = resolver.getNode("reducer");
		SinkPlanNode sinkNode = resolver.getNode("sink");
		
		// get the combiner
		SingleInputPlanNode combineNode = (SingleInputPlanNode) reduceNode.getInput().getSource();
		
		// get the key extractors and projectors
		SingleInputPlanNode keyExtractor = (SingleInputPlanNode) combineNode.getInput().getSource();
		SingleInputPlanNode keyProjector = (SingleInputPlanNode) sinkNode.getInput().getSource();
		
		// check wiring
		assertEquals(sourceNode, keyExtractor.getInput().getSource());
		assertEquals(keyProjector, sinkNode.getInput().getSource());
		
		// check that both reduce and combiner have the same strategy
		assertEquals(DriverStrategy.SORTED_GROUP_REDUCE, reduceNode.getDriverStrategy());
		assertEquals(DriverStrategy.SORTED_GROUP_COMBINE, combineNode.getDriverStrategy());
		
		// check the keys
		assertEquals(new FieldList(0), reduceNode.getKeys(0));
		assertEquals(new FieldList(0), combineNode.getKeys(0));
		assertEquals(new FieldList(0), combineNode.getKeys(1));
		assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
		
		// check DOP
		assertEquals(6, sourceNode.getDegreeOfParallelism());
		assertEquals(6, keyExtractor.getDegreeOfParallelism());
		assertEquals(6, combineNode.getDegreeOfParallelism());
		
		assertEquals(8, reduceNode.getDegreeOfParallelism());
		assertEquals(8, keyProjector.getDegreeOfParallelism());
		assertEquals(8, sinkNode.getDegreeOfParallelism());
	}
	catch (Exception e) {
		System.err.println(e.getMessage());
		e.printStackTrace();
		fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
	}
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:67,代码来源:GroupReduceCompilationTest.java


示例13: checkCombinability

import org.apache.flink.api.common.functions.RichGroupReduceFunction; //导入依赖的package包/类
private void checkCombinability() {
	if (function instanceof FlatCombineFunction &&
			function.getClass().getAnnotation(RichGroupReduceFunction.Combinable.class) != null) {
		this.combinable = true;
	}
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:7,代码来源:GroupReduceOperator.java


示例14: translateSelectorFunctionDistinct

import org.apache.flink.api.common.functions.RichGroupReduceFunction; //导入依赖的package包/类
private static <IN, OUT, K> PlanUnwrappingReduceGroupOperator<IN, OUT, K> translateSelectorFunctionDistinct(
		Keys.SelectorFunctionKeys<IN, ?> rawKeys, RichGroupReduceFunction<IN, OUT> function,
		TypeInformation<IN> inputType, TypeInformation<OUT> outputType, String name, Operator<IN> input)
{
	@SuppressWarnings("unchecked")
	final Keys.SelectorFunctionKeys<IN, K> keys = (Keys.SelectorFunctionKeys<IN, K>) rawKeys;
	
	TypeInformation<Tuple2<K, IN>> typeInfoWithKey = new TupleTypeInfo<Tuple2<K, IN>>(keys.getKeyType(), inputType);
	
	KeyExtractingMapper<IN, K> extractor = new KeyExtractingMapper<IN, K>(keys.getKeyExtractor());


	PlanUnwrappingReduceGroupOperator<IN, OUT, K> reducer =
			new PlanUnwrappingReduceGroupOperator<IN, OUT, K>(function, keys, name, outputType, typeInfoWithKey, true);
	
	MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>> mapper = new MapOperatorBase<IN, Tuple2<K, IN>, MapFunction<IN, Tuple2<K, IN>>>(extractor, new UnaryOperatorInformation<IN, Tuple2<K, IN>>(inputType, typeInfoWithKey), "Key Extractor");

	reducer.setInput(mapper);
	mapper.setInput(input);
	
	// set the mapper's parallelism to the input parallelism to make sure it is chained
	mapper.setDegreeOfParallelism(input.getDegreeOfParallelism());
	
	return reducer;
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:26,代码来源:DistinctOperator.java


示例15: TupleUnwrappingFlatCombinableGroupReducer

import org.apache.flink.api.common.functions.RichGroupReduceFunction; //导入依赖的package包/类
private TupleUnwrappingFlatCombinableGroupReducer(RichGroupReduceFunction<IN, OUT> wrapped) {
	super(wrapped);
	this.iter = new TupleUnwrappingIterator<IN, K>();
	this.coll = new TupleWrappingCollector<IN, K>(this.iter);
}
 
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:6,代码来源:PlanUnwrappingReduceGroupOperator.java



注:本文中的org.apache.flink.api.common.functions.RichGroupReduceFunction类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java ICommonsMap类代码示例发布时间:2022-05-22
下一篇:
Java PropertyPath类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap