Java CoGroupFunction Class Code Examples


This article collects typical usage examples of org.apache.flink.api.common.functions.CoGroupFunction in Java. If you are wondering how the CoGroupFunction class is used in practice, or are looking for examples of it in real code, the selected examples below may help.



The CoGroupFunction class belongs to the org.apache.flink.api.common.functions package. The 20 code examples of the class shown below are sorted by popularity by default.
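Before the project excerpts, a minimal sketch of what a co-group does may help. For every key present in either input, the user function receives all elements of the first input with that key and all elements of the second input with that key (either group may be empty), and it may emit any number of results. The sketch below is plain Java with no Flink dependency: `CoGroupSketch`, `LocalCoGroupFunction`, and the map-based inputs are hypothetical stand-ins, and `java.util.function.Consumer` takes the place of Flink's `Collector`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;
import java.util.function.Consumer;

public class CoGroupSketch {

    /** Hypothetical stand-in shaped like Flink's CoGroupFunction; Consumer replaces Collector. */
    @FunctionalInterface
    public interface LocalCoGroupFunction<IN1, IN2, OUT> {
        void coGroup(Iterable<IN1> first, Iterable<IN2> second, Consumer<OUT> out) throws Exception;
    }

    /** Calls fn once per key found in either input, passing an empty group for a missing side. */
    public static <OUT> List<OUT> coGroup(
            Map<String, List<Integer>> left,
            Map<String, List<Integer>> right,
            LocalCoGroupFunction<Integer, Integer, OUT> fn) {
        List<OUT> result = new ArrayList<>();
        // A sorted key set is used only to make the output order deterministic.
        TreeSet<String> keys = new TreeSet<>(left.keySet());
        keys.addAll(right.keySet());
        for (String key : keys) {
            List<Integer> first = left.getOrDefault(key, Collections.<Integer>emptyList());
            List<Integer> second = right.getOrDefault(key, Collections.<Integer>emptyList());
            try {
                fn.coGroup(first, second, result::add);
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }
        return result;
    }

    /** Sums every value that shares a key, across both inputs. */
    public static List<Integer> demo() {
        Map<String, List<Integer>> left = new HashMap<>();
        left.put("a", Arrays.asList(1, 2));
        left.put("b", Arrays.asList(3));
        Map<String, List<Integer>> right = new HashMap<>();
        right.put("a", Arrays.asList(10));
        right.put("c", Arrays.asList(20));
        return coGroup(left, right, (first, second, out) -> {
            int sum = 0;
            for (int v : first) sum += v;
            for (int v : second) sum += v;
            out.accept(sum);
        });
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [13, 3, 20]
    }
}
```

Keys "b" and "c" each appear in only one input, yet the function is still called for them with an empty group on the other side. This is what separates a co-group from an inner join.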

Example 1: translateSelectorFunctionCoGroup

import org.apache.flink.api.common.functions.CoGroupFunction; // import the required package/class
private static <I1, I2, K, OUT> PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K> translateSelectorFunctionCoGroup(
		SelectorFunctionKeys<I1, ?> rawKeys1, SelectorFunctionKeys<I2, ?> rawKeys2,
		CoGroupFunction<I1, I2, OUT> function,
		TypeInformation<OUT> outputType, String name,
		Operator<I1> input1, Operator<I2> input2) {
	@SuppressWarnings("unchecked")
	final SelectorFunctionKeys<I1, K> keys1 = (SelectorFunctionKeys<I1, K>) rawKeys1;
	@SuppressWarnings("unchecked")
	final SelectorFunctionKeys<I2, K> keys2 = (SelectorFunctionKeys<I2, K>) rawKeys2;

	final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = KeyFunctions.createTypeWithKey(keys1);
	final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = KeyFunctions.createTypeWithKey(keys2);

	final Operator<Tuple2<K, I1>> keyedInput1 = KeyFunctions.appendKeyExtractor(input1, keys1);
	final Operator<Tuple2<K, I2>> keyedInput2 = KeyFunctions.appendKeyExtractor(input2, keys2);

	final PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup =
		new PlanBothUnwrappingCoGroupOperator<>(function, keys1, keys2, name, outputType, typeInfoWithKey1, typeInfoWithKey2);

	cogroup.setFirstInput(keyedInput1);
	cogroup.setSecondInput(keyedInput2);

	return cogroup;
}
 
Developer ID: axbaretto, project: flink, lines of code: 25, source: CoGroupOperator.java


Example 2: CoGroupRawOperator

import org.apache.flink.api.common.functions.CoGroupFunction; // import the required package/class
public CoGroupRawOperator(DataSet<I1> input1, DataSet<I2> input2,
		Keys<I1> keys1, Keys<I2> keys2,
		CoGroupFunction<I1, I2, OUT> function,
		TypeInformation<OUT> returnType,
		String defaultName) {
	super(input1, input2, returnType);
	this.function = function;
	this.defaultName = defaultName;
	this.name = defaultName;

	if (keys1 == null || keys2 == null) {
		throw new NullPointerException();
	}

	this.keys1 = keys1;
	this.keys2 = keys2;

	extractSemanticAnnotationsFromUdf(function.getClass());
}
 
Developer ID: axbaretto, project: flink, lines of code: 20, source: CoGroupRawOperator.java


Example 3: PlanLeftUnwrappingCoGroupOperator

import org.apache.flink.api.common.functions.CoGroupFunction; // import the required package/class
public PlanLeftUnwrappingCoGroupOperator(
		CoGroupFunction<I1, I2, OUT> udf,
		Keys.SelectorFunctionKeys<I1, K> key1,
		int[] key2,
		String name,
		TypeInformation<OUT> resultType,
		TypeInformation<Tuple2<K, I1>> typeInfoWithKey1,
		TypeInformation<I2> typeInfo2) {

	super(
			new TupleLeftUnwrappingCoGrouper<I1, I2, OUT, K>(udf),
			new BinaryOperatorInformation<Tuple2<K, I1>, I2, OUT>(
					typeInfoWithKey1,
					typeInfo2,
					resultType),
			key1.computeLogicalKeyPositions(),
			key2,
			name);
}
 
Developer ID: axbaretto, project: flink, lines of code: 20, source: PlanLeftUnwrappingCoGroupOperator.java


Example 4: PlanRightUnwrappingCoGroupOperator

import org.apache.flink.api.common.functions.CoGroupFunction; // import the required package/class
public PlanRightUnwrappingCoGroupOperator(
		CoGroupFunction<I1, I2, OUT> udf,
		int[] key1,
		Keys.SelectorFunctionKeys<I2, K> key2,
		String name,
		TypeInformation<OUT> resultType,
		TypeInformation<I1> typeInfo1,
		TypeInformation<Tuple2<K, I2>> typeInfoWithKey2) {

	super(
			new TupleRightUnwrappingCoGrouper<I1, I2, OUT, K>(udf),
			new BinaryOperatorInformation<I1, Tuple2<K, I2>, OUT>(
					typeInfo1,
					typeInfoWithKey2,
					resultType),
			key1,
			key2.computeLogicalKeyPositions(),
			name);
}
 
Developer ID: axbaretto, project: flink, lines of code: 20, source: PlanRightUnwrappingCoGroupOperator.java


Example 5: PlanBothUnwrappingCoGroupOperator

import org.apache.flink.api.common.functions.CoGroupFunction; // import the required package/class
public PlanBothUnwrappingCoGroupOperator(
		CoGroupFunction<I1, I2, OUT> udf,
		Keys.SelectorFunctionKeys<I1, K> key1,
		Keys.SelectorFunctionKeys<I2, K> key2,
		String name,
		TypeInformation<OUT> type,
		TypeInformation<Tuple2<K, I1>> typeInfoWithKey1,
		TypeInformation<Tuple2<K, I2>> typeInfoWithKey2) {

	super(
			new TupleBothUnwrappingCoGrouper<I1, I2, OUT, K>(udf),
			new BinaryOperatorInformation<Tuple2<K, I1>, Tuple2<K, I2>, OUT>(
					typeInfoWithKey1,
					typeInfoWithKey2,
					type),
			key1.computeLogicalKeyPositions(),
			key2.computeLogicalKeyPositions(),
			name);
}
 
Developer ID: axbaretto, project: flink, lines of code: 20, source: PlanBothUnwrappingCoGroupOperator.java


Example 6: getCoGroupOperator

import org.apache.flink.api.common.functions.CoGroupFunction; // import the required package/class
private CoGroupOperatorBase<Tuple2<String, Integer>, Tuple2<String, Integer>,
		Tuple2<String, Integer>, CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>,
		Tuple2<String, Integer>>> getCoGroupOperator(
		RichCoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> udf) {

	return new CoGroupOperatorBase<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>,
			CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>>(
			udf,
			new BinaryOperatorInformation<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>(
					TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"),
					TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"),
					TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>")
			),
			new int[]{0},
			new int[]{0},
			"coGroup on Collections"
	);
}
 
Developer ID: axbaretto, project: flink, lines of code: 19, source: CoGroupOperatorCollectionTest.java


Example 7: getCoGroupReturnTypes

import org.apache.flink.api.common.functions.CoGroupFunction; // import the required package/class
@PublicEvolving
public static <IN1, IN2, OUT> TypeInformation<OUT> getCoGroupReturnTypes(CoGroupFunction<IN1, IN2, OUT> coGroupInterface,
		TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
{
	return getBinaryOperatorReturnType(
		(Function) coGroupInterface,
		CoGroupFunction.class,
		0,
		1,
		2,
		new int[]{0, 0},
		new int[]{1, 0},
		new int[]{2, 0},
		in1Type,
		in2Type,
		functionName,
		allowMissing);
}
 
Developer ID: axbaretto, project: flink, lines of code: 19, source: TypeExtractor.java


Example 8: testCoGroupOperatorWithCheckpoint

import org.apache.flink.api.common.functions.CoGroupFunction; // import the required package/class
/**
 * Verifies that pipelines including {@link CoGroupedStreams} can be checkpointed properly,
 * which includes snapshotting configurations of any involved serializers.
 *
 * @see <a href="https://issues.apache.org/jira/browse/FLINK-6808">FLINK-6808</a>
 */
@Test
public void testCoGroupOperatorWithCheckpoint() throws Exception {

	// generate an operator for the co-group operation
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
	env.setParallelism(1);

	DataStream<Tuple2<String, Integer>> source1 = env.fromElements(Tuple2.of("a", 0), Tuple2.of("b", 3));
	DataStream<Tuple2<String, Integer>> source2 = env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 6));

	DataStream<String> coGroupWindow = source1.coGroup(source2)
		.where(new Tuple2KeyExtractor())
		.equalTo(new Tuple2KeyExtractor())
		.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
		.apply(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
			@Override
			public void coGroup(Iterable<Tuple2<String, Integer>> first,
								Iterable<Tuple2<String, Integer>> second,
								Collector<String> out) throws Exception {
				out.collect(first + ":" + second);
			}
		});

	OneInputTransformation<Tuple2<String, Integer>, String> transform = (OneInputTransformation<Tuple2<String, Integer>, String>) coGroupWindow.getTransformation();
	OneInputStreamOperator<Tuple2<String, Integer>, String> operator = transform.getOperator();

	// wrap the operator in the test harness, and perform a snapshot
	OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, String> testHarness =
		new KeyedOneInputStreamOperatorTestHarness<>(operator, new Tuple2KeyExtractor(), BasicTypeInfo.STRING_TYPE_INFO);

	testHarness.open();
	testHarness.snapshot(0L, 0L);
}
 
Developer ID: axbaretto, project: flink, lines of code: 41, source: CoGroupJoinITCase.java


Example 9: testCoGroupLambda

import org.apache.flink.api.common.functions.CoGroupFunction; // import the required package/class
@Test
public void testCoGroupLambda() {
	CoGroupFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};

	TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
	if (!(ti instanceof MissingTypeInfo)) {
		Assert.assertTrue(ti.isTupleType());
		Assert.assertEquals(2, ti.getArity());
		Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
		Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
	}
}
 
Developer ID: axbaretto, project: flink, lines of code: 13, source: LambdaExtractionTest.java


Example 10: translateSelectorFunctionCoGroupRight

import org.apache.flink.api.common.functions.CoGroupFunction; // import the required package/class
private static <I1, I2, K, OUT> PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K> translateSelectorFunctionCoGroupRight(
		int[] logicalKeyPositions1, SelectorFunctionKeys<I2, ?> rawKeys2,
		CoGroupFunction<I1, I2, OUT> function,
		TypeInformation<I1> inputType1, TypeInformation<OUT> outputType, String name,
		Operator<I1> input1, Operator<I2> input2) {
	if (!inputType1.isTupleType()) {
		throw new InvalidParameterException("Should not happen.");
	}

	@SuppressWarnings("unchecked")
	final SelectorFunctionKeys<I2, K> keys2 = (SelectorFunctionKeys<I2, K>) rawKeys2;
	final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = KeyFunctions.createTypeWithKey(keys2);
	final Operator<Tuple2<K, I2>> keyedInput2 = KeyFunctions.appendKeyExtractor(input2, keys2);

	final PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup =
			new PlanRightUnwrappingCoGroupOperator<>(
					function,
					logicalKeyPositions1,
					keys2,
					name,
					outputType,
					inputType1,
					typeInfoWithKey2);

	cogroup.setFirstInput(input1);
	cogroup.setSecondInput(keyedInput2);

	return cogroup;
}
 
Developer ID: axbaretto, project: flink, lines of code: 30, source: CoGroupOperator.java


Example 11: translateSelectorFunctionCoGroupLeft

import org.apache.flink.api.common.functions.CoGroupFunction; // import the required package/class
private static <I1, I2, K, OUT> PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K> translateSelectorFunctionCoGroupLeft(
		SelectorFunctionKeys<I1, ?> rawKeys1, int[] logicalKeyPositions2,
		CoGroupFunction<I1, I2, OUT> function,
		TypeInformation<I2> inputType2, TypeInformation<OUT> outputType, String name,
		Operator<I1> input1, Operator<I2> input2) {
	if (!inputType2.isTupleType()) {
		throw new InvalidParameterException("Should not happen.");
	}

	@SuppressWarnings("unchecked")
	final SelectorFunctionKeys<I1, K> keys1 = (SelectorFunctionKeys<I1, K>) rawKeys1;
	final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = KeyFunctions.createTypeWithKey(keys1);
	final Operator<Tuple2<K, I1>> keyedInput1 = KeyFunctions.appendKeyExtractor(input1, keys1);

	final PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup =
			new PlanLeftUnwrappingCoGroupOperator<>(
					function,
					keys1,
					logicalKeyPositions2,
					name,
					outputType,
					typeInfoWithKey1,
					inputType2);

	cogroup.setFirstInput(keyedInput1);
	cogroup.setSecondInput(input2);

	return cogroup;
}
 
Developer ID: axbaretto, project: flink, lines of code: 30, source: CoGroupOperator.java


Example 12: with

import org.apache.flink.api.common.functions.CoGroupFunction; // import the required package/class
/**
 * Finalizes a CoGroup transformation by applying a {@link org.apache.flink.api.common.functions.RichCoGroupFunction} to groups of elements with identical keys.
 *
 * <p>Each CoGroupFunction call returns an arbitrary number of keys.
 *
 * @param function The CoGroupFunction that is called for all groups of elements with identical keys.
 * @return A CoGroupOperator that represents the co-grouped result DataSet.
 *
 * @see org.apache.flink.api.common.functions.RichCoGroupFunction
 * @see DataSet
 */
public <R> CoGroupOperator<I1, I2, R> with(CoGroupFunction<I1, I2, R> function) {
	if (function == null) {
		throw new NullPointerException("CoGroup function must not be null.");
	}
	TypeInformation<R> returnType = TypeExtractor.getCoGroupReturnTypes(function, input1.getType(), input2.getType(),
			Utils.getCallLocationName(), true);

	return new CoGroupOperator<>(input1, input2, keys1, keys2, input1.clean(function), returnType,
			groupSortKeyOrderFirst, groupSortKeyOrderSecond,
			customPartitioner, Utils.getCallLocationName());
}
 
Developer ID: axbaretto, project: flink, lines of code: 23, source: CoGroupOperator.java


Example 13: testWebLogAnalysisExamplesAntiJoinVisits

import org.apache.flink.api.common.functions.CoGroupFunction; // import the required package/class
@Test
public void testWebLogAnalysisExamplesAntiJoinVisits() {
	compareAnalyzerResultWithAnnotationsDualInputWithKeys(CoGroupFunction.class, AntiJoinVisits.class,
			"Tuple3<Integer, String, Integer>",
			"Tuple1<String>",
			"Tuple3<Integer, String, Integer>",
			new String[] { "1" }, new String[] { "0" });
}
 
Developer ID: axbaretto, project: flink, lines of code: 9, source: UdfAnalyzerExamplesTest.java
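The `AntiJoinVisits` class named in this test is not reproduced in this article. As a rough, hypothetical illustration of the anti-join pattern its name suggests, a co-group function can forward the records of the first input only when the second input's group for the same key is empty. The sketch below is plain Java without Flink; `AntiJoinSketch`, `antiJoin`, and the sample data are assumptions for illustration, not the class analyzed by the test.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class AntiJoinSketch {

    /** Co-group body: forward the first group only if the second group has no elements. */
    static <T, U> void antiJoinGroup(Iterable<T> first, Iterable<U> second, List<T> out) {
        if (!second.iterator().hasNext()) {
            for (T record : first) {
                out.add(record);
            }
        }
    }

    /**
     * Applies antiJoinGroup per key. Keys that occur only in the second input
     * are skipped, because their first group is empty and nothing would be emitted.
     */
    public static <T, U> List<T> antiJoin(Map<String, List<T>> first, Map<String, List<U>> second) {
        List<T> out = new ArrayList<>();
        for (Map.Entry<String, List<T>> entry : first.entrySet()) {
            List<U> matches = second.getOrDefault(entry.getKey(), Collections.<U>emptyList());
            antiJoinGroup(entry.getValue(), matches, out);
        }
        return out;
    }

    /** Keeps only the rank records whose URL has no visit record. */
    public static List<String> demo() {
        Map<String, List<String>> ranks = new LinkedHashMap<>();
        ranks.put("url1", Arrays.asList("rank-of-url1"));
        ranks.put("url2", Arrays.asList("rank-of-url2"));
        Map<String, List<Integer>> visits = new LinkedHashMap<>();
        visits.put("url1", Arrays.asList(2007));
        return antiJoin(ranks, visits);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [rank-of-url2]
    }
}
```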


Example 14: run

import org.apache.flink.api.common.functions.CoGroupFunction; // import the required package/class
@Override
public void run() throws Exception
{
	final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();

	final CoGroupFunction<IT1, IT2, OT> coGroupStub = this.taskContext.getStub();
	final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
	final CoGroupTaskIterator<IT1, IT2> coGroupIterator = this.coGroupIterator;
	
	while (this.running && coGroupIterator.next()) {
		coGroupStub.coGroup(coGroupIterator.getValues1(), coGroupIterator.getValues2(), collector);
	}
}
 
Developer ID: axbaretto, project: flink, lines of code: 14, source: CoGroupDriver.java
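The loop above calls the user function once for each step of `coGroupIterator`, handing it the two value groups for the current key. How that iterator is built is not shown in this excerpt. One common way to produce such groups is to walk two key-sorted inputs in lockstep. The plain-Java sketch below assumes that approach; `SortMergeCoGroupSketch` and its helpers are hypothetical and are not Flink's implementation.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class SortMergeCoGroupSketch {

    /** Small helper to build a key/value pair. */
    static Map.Entry<Integer, String> kv(int key, String value) {
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    /**
     * Both inputs must already be sorted by key.
     * Emits one string per key in the form "key:[first group]|[second group]".
     */
    public static List<String> coGroupSorted(
            List<Map.Entry<Integer, String>> left,
            List<Map.Entry<Integer, String>> right) {
        List<String> out = new ArrayList<>();
        int i = 0;
        int j = 0;
        while (i < left.size() || j < right.size()) {
            // The current key is the smallest key not yet consumed on either side.
            int key;
            if (j >= right.size()) {
                key = left.get(i).getKey();
            } else if (i >= left.size()) {
                key = right.get(j).getKey();
            } else {
                key = Math.min(left.get(i).getKey(), right.get(j).getKey());
            }
            // Collect the run of elements with that key from each input (possibly empty).
            List<String> values1 = new ArrayList<>();
            while (i < left.size() && left.get(i).getKey() == key) {
                values1.add(left.get(i++).getValue());
            }
            List<String> values2 = new ArrayList<>();
            while (j < right.size() && right.get(j).getKey() == key) {
                values2.add(right.get(j++).getValue());
            }
            // A real driver would call the user's coGroup function here.
            out.add(key + ":" + values1 + "|" + values2);
        }
        return out;
    }

    public static List<String> demo() {
        List<Map.Entry<Integer, String>> left = Arrays.asList(kv(1, "a"), kv(1, "b"), kv(3, "c"));
        List<Map.Entry<Integer, String>> right = Arrays.asList(kv(1, "x"), kv(2, "y"));
        return coGroupSorted(left, right);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1:[a, b]|[x], 2:[]|[y], 3:[c]|[]]
    }
}
```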


Example 15: run

import org.apache.flink.api.common.functions.CoGroupFunction; // import the required package/class
@Override
public void run() throws Exception {
	final CoGroupFunction<IT1, IT2, OT> coGroupStub = this.taskContext.getStub();
	final Collector<OT> collector = this.taskContext.getOutputCollector();
	final SimpleIterable<IT1> i1 = this.coGroupIterator1;
	final SimpleIterable<IT2> i2 = this.coGroupIterator2;

	coGroupStub.coGroup(i1, i2, collector);
}
 
Developer ID: axbaretto, project: flink, lines of code: 10, source: CoGroupRawDriver.java


Example 16: apply

import org.apache.flink.api.common.functions.CoGroupFunction; // import the required package/class
/**
 * Completes the co-group operation with the user function that is executed
 * for windowed groups.
 *
 * <p>Note: This method's return type does not support setting an operator-specific parallelism.
 * Due to binary backwards compatibility, this cannot be altered. Use the {@link #with(CoGroupFunction)}
 * method to set an operator-specific parallelism.
 */
public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {

	TypeInformation<T> resultType = TypeExtractor.getCoGroupReturnTypes(
		function,
		input1.getType(),
		input2.getType(),
		"CoGroup",
		false);

	return apply(function, resultType);
}
 
Developer ID: axbaretto, project: flink, lines of code: 20, source: CoGroupedStreams.java


Example 17: apply

import org.apache.flink.api.common.functions.CoGroupFunction; // import the required package/class
public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
    //clean the closure
    function = input1.getExecutionEnvironment().clean(function);
    boolean setProcessingTime = input1.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;

    CoGroupOperator<KEY, T1, T2, T> coGroupOperator =
            new CoGroupOperator<>(function,
                    windowAssigner1, windowAssigner2,
                    keySelector1, keySelector2,
                    new HeapWindowBuffer.Factory<T1>(),
                    new HeapWindowBuffer.Factory<T2>(),
                    input1.getType().createSerializer(getExecutionEnvironment().getConfig()),
                    input2.getType().createSerializer(getExecutionEnvironment().getConfig()),
                    TypeExtractor.getKeySelectorTypes(keySelector1, input1.getType()).createSerializer(getExecutionEnvironment().getConfig()),
                    windowAssigner1.getDefaultTrigger(getExecutionEnvironment()),
                    windowAssigner2.getDefaultTrigger(getExecutionEnvironment()),
                    windowAssigner1.getWindowSerializer(getExecutionEnvironment().getConfig())
            ).enableSetProcessingTime(setProcessingTime);

    TwoInputTransformation<T1, T2, T>
            twoInputTransformation = new TwoInputTransformation<>(
            input1.getTransformation(),
            input2.getTransformation(),
            "Join",
            coGroupOperator,
            resultType,
            parallelism
    );

    return new DataStream<>(getExecutionEnvironment(), twoInputTransformation);
}
 
Developer ID: wangyangjun, project: StreamBench, lines of code: 32, source: MultiWindowsJoinedStreams.java


Example 18: CoGroupOperator

import org.apache.flink.api.common.functions.CoGroupFunction; // import the required package/class
public CoGroupOperator(CoGroupFunction<IN1, IN2, OUT> userFunction,
                       WindowAssigner<? super IN1, TimeWindow> windowAssigner1,
                       WindowAssigner<? super IN2, TimeWindow> windowAssigner2,
                       KeySelector<IN1, K> keySelector1,
                       KeySelector<IN2, K> keySelector2,
                       WindowBufferFactory<? super IN1, ? extends WindowBuffer<IN1>> windowBufferFactory1,
                       WindowBufferFactory<? super IN2, ? extends WindowBuffer<IN2>> windowBufferFactory2,
                       TypeSerializer<IN1> inputSerializer1,
                       TypeSerializer<IN2> inputSerializer2,
                       TypeSerializer<K> keySerializer,
                       Trigger<? super IN1, TimeWindow> trigger1,
                       Trigger<? super IN2, TimeWindow> trigger2,
                       TypeSerializer<TimeWindow> windowSerializer) {
    super(userFunction);
    this.trigger1 = trigger1;
    this.trigger2 = trigger2;
    this.windowAssigner1 = requireNonNull(windowAssigner1);
    this.windowAssigner2 = requireNonNull(windowAssigner2);
    this.keySelector1 = requireNonNull(keySelector1);
    this.keySelector2 = requireNonNull(keySelector2);
    this.windowBufferFactory1 = requireNonNull(windowBufferFactory1);
    this.windowBufferFactory2 = requireNonNull(windowBufferFactory2);
    this.inputSerializer1 = requireNonNull(inputSerializer1);
    this.inputSerializer2 = requireNonNull(inputSerializer2);
    this.keySerializer = requireNonNull(keySerializer);
    this.windowSerializer = windowSerializer;
}
 
Developer ID: wangyangjun, project: StreamBench, lines of code: 28, source: CoGroupOperator.java


Example 19: main

import org.apache.flink.api.common.functions.CoGroupFunction; // import the required package/class
public static void main(final String[] args) throws Exception {
	// set up the execution environment
	final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

	DataSet<AvroLineitem> lineItemFromAvro = env.createInput(
			new AvroInputFormat<AvroLineitem>(new Path(args[0]), AvroLineitem.class));
	DataSet<AvroLineitem> lineItemFromCsv = env.readTextFile(args[1]).map(new Prepare.AvroLineItemMapper());
	DataSet<String> empty = lineItemFromAvro
		.coGroup(lineItemFromCsv).where("orderKey", "partKey", "supplierKey", "lineNumber").equalTo("orderKey", "partKey", "supplierKey", "lineNumber").with(new CoGroupFunction<AvroLineitem, AvroLineitem, String>() {
			@Override
			public void coGroup(Iterable<AvroLineitem> avro, Iterable<AvroLineitem> csv, Collector<String> collector) throws Exception {
				Iterator<AvroLineitem> aIt = avro.iterator();
				if(!aIt.hasNext()) {
					throw new RuntimeException("Expected item from Avro input");
				}
				AvroLineitem left = aIt.next();
				if(aIt.hasNext()) {
					throw new RuntimeException("Unexpectedly received two avro records on this side. left="+left+" next="+aIt.next());
				}

				Iterator<AvroLineitem> cIt = csv.iterator();
				if(!cIt.hasNext()) {
					throw new RuntimeException("Expected item from CSV input");
				}
				AvroLineitem right = cIt.next();
				if(cIt.hasNext()) {
					throw new RuntimeException("Unexpectedly received two CSV records on this side");
				}
				if(!right.equals(left)) {
					throw new RuntimeException("Records are not equal");
				}
			}
		});
	empty.output(new DiscardingOutputFormat<String>());
	env.execute("Compare Job");
}
 
Developer ID: project-flink, project: flink-perf, lines of code: 37, source: CompareJob.java


Example 20: testCoGroupLambda

import org.apache.flink.api.common.functions.CoGroupFunction; // import the required package/class
@Test
public void testCoGroupLambda() {
	CoGroupFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};
	
	TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
	Assert.assertTrue(ti.isTupleType());
	Assert.assertEquals(2, ti.getArity());
	Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
	Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
}
 
Developer ID: citlab, project: vs.msc.ws14, lines of code: 11, source: LambdaExtractionTest.java



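Note: The org.apache.flink.api.common.functions.CoGroupFunction examples in this article were collected from GitHub, MSDocs, and other source-code and documentation hosting platforms. The snippets were selected from open-source projects, and copyright in the source code belongs to the original authors. For distribution and use, refer to the License of the corresponding project. Do not reproduce without permission.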

