本文整理汇总了Java中org.apache.flink.api.common.functions.CoGroupFunction类的典型用法代码示例。如果您正苦于以下问题:Java CoGroupFunction类的具体用法?Java CoGroupFunction怎么用?Java CoGroupFunction使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
CoGroupFunction类属于org.apache.flink.api.common.functions包,在下文中一共展示了CoGroupFunction类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: translateSelectorFunctionCoGroup
import org.apache.flink.api.common.functions.CoGroupFunction; //导入依赖的package包/类
private static <I1, I2, K, OUT> PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K> translateSelectorFunctionCoGroup(
SelectorFunctionKeys<I1, ?> rawKeys1, SelectorFunctionKeys<I2, ?> rawKeys2,
CoGroupFunction<I1, I2, OUT> function,
TypeInformation<OUT> outputType, String name,
Operator<I1> input1, Operator<I2> input2) {
@SuppressWarnings("unchecked")
final SelectorFunctionKeys<I1, K> keys1 = (SelectorFunctionKeys<I1, K>) rawKeys1;
@SuppressWarnings("unchecked")
final SelectorFunctionKeys<I2, K> keys2 = (SelectorFunctionKeys<I2, K>) rawKeys2;
final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = KeyFunctions.createTypeWithKey(keys1);
final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = KeyFunctions.createTypeWithKey(keys2);
final Operator<Tuple2<K, I1>> keyedInput1 = KeyFunctions.appendKeyExtractor(input1, keys1);
final Operator<Tuple2<K, I2>> keyedInput2 = KeyFunctions.appendKeyExtractor(input2, keys2);
final PlanBothUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup =
new PlanBothUnwrappingCoGroupOperator<>(function, keys1, keys2, name, outputType, typeInfoWithKey1, typeInfoWithKey2);
cogroup.setFirstInput(keyedInput1);
cogroup.setSecondInput(keyedInput2);
return cogroup;
}
开发者ID:axbaretto,项目名称:flink,代码行数:25,代码来源:CoGroupOperator.java
示例2: CoGroupRawOperator
import org.apache.flink.api.common.functions.CoGroupFunction; //导入依赖的package包/类
public CoGroupRawOperator(DataSet<I1> input1, DataSet<I2> input2,
Keys<I1> keys1, Keys<I2> keys2,
CoGroupFunction<I1, I2, OUT> function,
TypeInformation<OUT> returnType,
String defaultName) {
super(input1, input2, returnType);
this.function = function;
this.defaultName = defaultName;
this.name = defaultName;
if (keys1 == null || keys2 == null) {
throw new NullPointerException();
}
this.keys1 = keys1;
this.keys2 = keys2;
extractSemanticAnnotationsFromUdf(function.getClass());
}
开发者ID:axbaretto,项目名称:flink,代码行数:20,代码来源:CoGroupRawOperator.java
示例3: PlanLeftUnwrappingCoGroupOperator
import org.apache.flink.api.common.functions.CoGroupFunction; //导入依赖的package包/类
public PlanLeftUnwrappingCoGroupOperator(
CoGroupFunction<I1, I2, OUT> udf,
Keys.SelectorFunctionKeys<I1, K> key1,
int[] key2,
String name,
TypeInformation<OUT> resultType,
TypeInformation<Tuple2<K, I1>> typeInfoWithKey1,
TypeInformation<I2> typeInfo2) {
super(
new TupleLeftUnwrappingCoGrouper<I1, I2, OUT, K>(udf),
new BinaryOperatorInformation<Tuple2<K, I1>, I2, OUT>(
typeInfoWithKey1,
typeInfo2,
resultType),
key1.computeLogicalKeyPositions(),
key2,
name);
}
开发者ID:axbaretto,项目名称:flink,代码行数:20,代码来源:PlanLeftUnwrappingCoGroupOperator.java
示例4: PlanRightUnwrappingCoGroupOperator
import org.apache.flink.api.common.functions.CoGroupFunction; //导入依赖的package包/类
public PlanRightUnwrappingCoGroupOperator(
CoGroupFunction<I1, I2, OUT> udf,
int[] key1,
Keys.SelectorFunctionKeys<I2, K> key2,
String name,
TypeInformation<OUT> resultType,
TypeInformation<I1> typeInfo1,
TypeInformation<Tuple2<K, I2>> typeInfoWithKey2) {
super(
new TupleRightUnwrappingCoGrouper<I1, I2, OUT, K>(udf),
new BinaryOperatorInformation<I1, Tuple2<K, I2>, OUT>(
typeInfo1,
typeInfoWithKey2,
resultType),
key1,
key2.computeLogicalKeyPositions(),
name);
}
开发者ID:axbaretto,项目名称:flink,代码行数:20,代码来源:PlanRightUnwrappingCoGroupOperator.java
示例5: PlanBothUnwrappingCoGroupOperator
import org.apache.flink.api.common.functions.CoGroupFunction; //导入依赖的package包/类
public PlanBothUnwrappingCoGroupOperator(
CoGroupFunction<I1, I2, OUT> udf,
Keys.SelectorFunctionKeys<I1, K> key1,
Keys.SelectorFunctionKeys<I2, K> key2,
String name,
TypeInformation<OUT> type,
TypeInformation<Tuple2<K, I1>> typeInfoWithKey1,
TypeInformation<Tuple2<K, I2>> typeInfoWithKey2) {
super(
new TupleBothUnwrappingCoGrouper<I1, I2, OUT, K>(udf),
new BinaryOperatorInformation<Tuple2<K, I1>, Tuple2<K, I2>, OUT>(
typeInfoWithKey1,
typeInfoWithKey2,
type),
key1.computeLogicalKeyPositions(),
key2.computeLogicalKeyPositions(),
name);
}
开发者ID:axbaretto,项目名称:flink,代码行数:20,代码来源:PlanBothUnwrappingCoGroupOperator.java
示例6: getCoGroupOperator
import org.apache.flink.api.common.functions.CoGroupFunction; //导入依赖的package包/类
private CoGroupOperatorBase<Tuple2<String, Integer>, Tuple2<String, Integer>,
Tuple2<String, Integer>, CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>,
Tuple2<String, Integer>>> getCoGroupOperator(
RichCoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>> udf) {
return new CoGroupOperatorBase<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>,
CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>>(
udf,
new BinaryOperatorInformation<Tuple2<String, Integer>, Tuple2<String, Integer>, Tuple2<String, Integer>>(
TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"),
TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>"),
TypeInfoParser.<Tuple2<String, Integer>>parse("Tuple2<String, Integer>")
),
new int[]{0},
new int[]{0},
"coGroup on Collections"
);
}
开发者ID:axbaretto,项目名称:flink,代码行数:19,代码来源:CoGroupOperatorCollectionTest.java
示例7: getCoGroupReturnTypes
import org.apache.flink.api.common.functions.CoGroupFunction; //导入依赖的package包/类
@PublicEvolving
public static <IN1, IN2, OUT> TypeInformation<OUT> getCoGroupReturnTypes(CoGroupFunction<IN1, IN2, OUT> coGroupInterface,
TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type, String functionName, boolean allowMissing)
{
return getBinaryOperatorReturnType(
(Function) coGroupInterface,
CoGroupFunction.class,
0,
1,
2,
new int[]{0, 0},
new int[]{1, 0},
new int[]{2, 0},
in1Type,
in2Type,
functionName,
allowMissing);
}
开发者ID:axbaretto,项目名称:flink,代码行数:19,代码来源:TypeExtractor.java
示例8: testCoGroupOperatorWithCheckpoint
import org.apache.flink.api.common.functions.CoGroupFunction; //导入依赖的package包/类
/**
* Verifies that pipelines including {@link CoGroupedStreams} can be checkpointed properly,
* which includes snapshotting configurations of any involved serializers.
*
* @see <a href="https://issues.apache.org/jira/browse/FLINK-6808">FLINK-6808</a>
*/
@Test
public void testCoGroupOperatorWithCheckpoint() throws Exception {
// generate an operator for the co-group operation
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStream<Tuple2<String, Integer>> source1 = env.fromElements(Tuple2.of("a", 0), Tuple2.of("b", 3));
DataStream<Tuple2<String, Integer>> source2 = env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 6));
DataStream<String> coGroupWindow = source1.coGroup(source2)
.where(new Tuple2KeyExtractor())
.equalTo(new Tuple2KeyExtractor())
.window(TumblingEventTimeWindows.of(Time.of(3, TimeUnit.MILLISECONDS)))
.apply(new CoGroupFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
@Override
public void coGroup(Iterable<Tuple2<String, Integer>> first,
Iterable<Tuple2<String, Integer>> second,
Collector<String> out) throws Exception {
out.collect(first + ":" + second);
}
});
OneInputTransformation<Tuple2<String, Integer>, String> transform = (OneInputTransformation<Tuple2<String, Integer>, String>) coGroupWindow.getTransformation();
OneInputStreamOperator<Tuple2<String, Integer>, String> operator = transform.getOperator();
// wrap the operator in the test harness, and perform a snapshot
OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, String> testHarness =
new KeyedOneInputStreamOperatorTestHarness<>(operator, new Tuple2KeyExtractor(), BasicTypeInfo.STRING_TYPE_INFO);
testHarness.open();
testHarness.snapshot(0L, 0L);
}
开发者ID:axbaretto,项目名称:flink,代码行数:41,代码来源:CoGroupJoinITCase.java
示例9: testCoGroupLambda
import org.apache.flink.api.common.functions.CoGroupFunction; //导入依赖的package包/类
@Test
public void testCoGroupLambda() {
CoGroupFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};
TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
if (!(ti instanceof MissingTypeInfo)) {
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(2, ti.getArity());
Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:13,代码来源:LambdaExtractionTest.java
示例10: translateSelectorFunctionCoGroupRight
import org.apache.flink.api.common.functions.CoGroupFunction; //导入依赖的package包/类
private static <I1, I2, K, OUT> PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K> translateSelectorFunctionCoGroupRight(
int[] logicalKeyPositions1, SelectorFunctionKeys<I2, ?> rawKeys2,
CoGroupFunction<I1, I2, OUT> function,
TypeInformation<I1> inputType1, TypeInformation<OUT> outputType, String name,
Operator<I1> input1, Operator<I2> input2) {
if (!inputType1.isTupleType()) {
throw new InvalidParameterException("Should not happen.");
}
@SuppressWarnings("unchecked")
final SelectorFunctionKeys<I2, K> keys2 = (SelectorFunctionKeys<I2, K>) rawKeys2;
final TypeInformation<Tuple2<K, I2>> typeInfoWithKey2 = KeyFunctions.createTypeWithKey(keys2);
final Operator<Tuple2<K, I2>> keyedInput2 = KeyFunctions.appendKeyExtractor(input2, keys2);
final PlanRightUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup =
new PlanRightUnwrappingCoGroupOperator<>(
function,
logicalKeyPositions1,
keys2,
name,
outputType,
inputType1,
typeInfoWithKey2);
cogroup.setFirstInput(input1);
cogroup.setSecondInput(keyedInput2);
return cogroup;
}
开发者ID:axbaretto,项目名称:flink,代码行数:30,代码来源:CoGroupOperator.java
示例11: translateSelectorFunctionCoGroupLeft
import org.apache.flink.api.common.functions.CoGroupFunction; //导入依赖的package包/类
private static <I1, I2, K, OUT> PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K> translateSelectorFunctionCoGroupLeft(
SelectorFunctionKeys<I1, ?> rawKeys1, int[] logicalKeyPositions2,
CoGroupFunction<I1, I2, OUT> function,
TypeInformation<I2> inputType2, TypeInformation<OUT> outputType, String name,
Operator<I1> input1, Operator<I2> input2) {
if (!inputType2.isTupleType()) {
throw new InvalidParameterException("Should not happen.");
}
@SuppressWarnings("unchecked")
final SelectorFunctionKeys<I1, K> keys1 = (SelectorFunctionKeys<I1, K>) rawKeys1;
final TypeInformation<Tuple2<K, I1>> typeInfoWithKey1 = KeyFunctions.createTypeWithKey(keys1);
final Operator<Tuple2<K, I1>> keyedInput1 = KeyFunctions.appendKeyExtractor(input1, keys1);
final PlanLeftUnwrappingCoGroupOperator<I1, I2, OUT, K> cogroup =
new PlanLeftUnwrappingCoGroupOperator<>(
function,
keys1,
logicalKeyPositions2,
name,
outputType,
typeInfoWithKey1,
inputType2);
cogroup.setFirstInput(keyedInput1);
cogroup.setSecondInput(input2);
return cogroup;
}
开发者ID:axbaretto,项目名称:flink,代码行数:30,代码来源:CoGroupOperator.java
示例12: with
import org.apache.flink.api.common.functions.CoGroupFunction; //导入依赖的package包/类
/**
* Finalizes a CoGroup transformation by applying a {@link org.apache.flink.api.common.functions.RichCoGroupFunction} to groups of elements with identical keys.
*
* <p>Each CoGroupFunction call returns an arbitrary number of keys.
*
* @param function The CoGroupFunction that is called for all groups of elements with identical keys.
* @return An CoGroupOperator that represents the co-grouped result DataSet.
*
* @see org.apache.flink.api.common.functions.RichCoGroupFunction
* @see DataSet
*/
public <R> CoGroupOperator<I1, I2, R> with(CoGroupFunction<I1, I2, R> function) {
if (function == null) {
throw new NullPointerException("CoGroup function must not be null.");
}
TypeInformation<R> returnType = TypeExtractor.getCoGroupReturnTypes(function, input1.getType(), input2.getType(),
Utils.getCallLocationName(), true);
return new CoGroupOperator<>(input1, input2, keys1, keys2, input1.clean(function), returnType,
groupSortKeyOrderFirst, groupSortKeyOrderSecond,
customPartitioner, Utils.getCallLocationName());
}
开发者ID:axbaretto,项目名称:flink,代码行数:23,代码来源:CoGroupOperator.java
示例13: testWebLogAnalysisExamplesAntiJoinVisits
import org.apache.flink.api.common.functions.CoGroupFunction; //导入依赖的package包/类
@Test
public void testWebLogAnalysisExamplesAntiJoinVisits() {
compareAnalyzerResultWithAnnotationsDualInputWithKeys(CoGroupFunction.class, AntiJoinVisits.class,
"Tuple3<Integer, String, Integer>",
"Tuple1<String>",
"Tuple3<Integer, String, Integer>",
new String[] { "1" }, new String[] { "0" });
}
开发者ID:axbaretto,项目名称:flink,代码行数:9,代码来源:UdfAnalyzerExamplesTest.java
示例14: run
import org.apache.flink.api.common.functions.CoGroupFunction; //导入依赖的package包/类
@Override
public void run() throws Exception
{
final Counter numRecordsOut = this.taskContext.getMetricGroup().getIOMetricGroup().getNumRecordsOutCounter();
final CoGroupFunction<IT1, IT2, OT> coGroupStub = this.taskContext.getStub();
final Collector<OT> collector = new CountingCollector<>(this.taskContext.getOutputCollector(), numRecordsOut);
final CoGroupTaskIterator<IT1, IT2> coGroupIterator = this.coGroupIterator;
while (this.running && coGroupIterator.next()) {
coGroupStub.coGroup(coGroupIterator.getValues1(), coGroupIterator.getValues2(), collector);
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:14,代码来源:CoGroupDriver.java
示例15: run
import org.apache.flink.api.common.functions.CoGroupFunction; //导入依赖的package包/类
@Override
public void run() throws Exception {
final CoGroupFunction<IT1, IT2, OT> coGroupStub = this.taskContext.getStub();
final Collector<OT> collector = this.taskContext.getOutputCollector();
final SimpleIterable<IT1> i1 = this.coGroupIterator1;
final SimpleIterable<IT2> i2 = this.coGroupIterator2;
coGroupStub.coGroup(i1, i2, collector);
}
开发者ID:axbaretto,项目名称:flink,代码行数:10,代码来源:CoGroupRawDriver.java
示例16: apply
import org.apache.flink.api.common.functions.CoGroupFunction; //导入依赖的package包/类
/**
* Completes the co-group operation with the user function that is executed
* for windowed groups.
*
* <p>Note: This method's return type does not support setting an operator-specific parallelism.
* Due to binary backwards compatibility, this cannot be altered. Use the {@link #with(CoGroupFunction)}
* method to set an operator-specific parallelism.
*/
public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function) {
TypeInformation<T> resultType = TypeExtractor.getCoGroupReturnTypes(
function,
input1.getType(),
input2.getType(),
"CoGroup",
false);
return apply(function, resultType);
}
开发者ID:axbaretto,项目名称:flink,代码行数:20,代码来源:CoGroupedStreams.java
示例17: apply
import org.apache.flink.api.common.functions.CoGroupFunction; //导入依赖的package包/类
public <T> DataStream<T> apply(CoGroupFunction<T1, T2, T> function, TypeInformation<T> resultType) {
//clean the closure
function = input1.getExecutionEnvironment().clean(function);
boolean setProcessingTime = input1.getExecutionEnvironment().getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime;
CoGroupOperator<KEY, T1, T2, T> coGroupOperator =
new CoGroupOperator<>(function,
windowAssigner1, windowAssigner2,
keySelector1, keySelector2,
new HeapWindowBuffer.Factory<T1>(),
new HeapWindowBuffer.Factory<T2>(),
input1.getType().createSerializer(getExecutionEnvironment().getConfig()),
input2.getType().createSerializer(getExecutionEnvironment().getConfig()),
TypeExtractor.getKeySelectorTypes(keySelector1, input1.getType()).createSerializer(getExecutionEnvironment().getConfig()),
windowAssigner1.getDefaultTrigger(getExecutionEnvironment()),
windowAssigner2.getDefaultTrigger(getExecutionEnvironment()),
windowAssigner1.getWindowSerializer(getExecutionEnvironment().getConfig())
).enableSetProcessingTime(setProcessingTime);
TwoInputTransformation<T1, T2, T>
twoInputTransformation = new TwoInputTransformation<>(
input1.getTransformation(),
input2.getTransformation(),
"Join",
coGroupOperator,
resultType,
parallelism
);
return new DataStream<>(getExecutionEnvironment(), twoInputTransformation);
}
开发者ID:wangyangjun,项目名称:StreamBench,代码行数:32,代码来源:MultiWindowsJoinedStreams.java
示例18: CoGroupOperator
import org.apache.flink.api.common.functions.CoGroupFunction; //导入依赖的package包/类
public CoGroupOperator(CoGroupFunction<IN1, IN2, OUT> userFunction,
WindowAssigner<? super IN1, TimeWindow> windowAssigner1,
WindowAssigner<? super IN2, TimeWindow> windowAssigner2,
KeySelector<IN1, K> keySelector1,
KeySelector<IN2, K> keySelector2,
WindowBufferFactory<? super IN1, ? extends WindowBuffer<IN1>> windowBufferFactory1,
WindowBufferFactory<? super IN2, ? extends WindowBuffer<IN2>> windowBufferFactory2,
TypeSerializer<IN1> inputSerializer1,
TypeSerializer<IN2> inputSerializer2,
TypeSerializer<K> keySerializer,
Trigger<? super IN1, TimeWindow> trigger1,
Trigger<? super IN2, TimeWindow> trigger2,
TypeSerializer<TimeWindow> windowSerializer) {
super(userFunction);
this.trigger1 = trigger1;
this.trigger2 = trigger2;
this.windowAssigner1 = requireNonNull(windowAssigner1);
this.windowAssigner2 = requireNonNull(windowAssigner2);
this.keySelector1 = requireNonNull(keySelector1);
this.keySelector2 = requireNonNull(keySelector2);
this.windowBufferFactory1 = requireNonNull(windowBufferFactory1);
this.windowBufferFactory2 = requireNonNull(windowBufferFactory2);
this.inputSerializer1 = requireNonNull(inputSerializer1);
this.inputSerializer2 = requireNonNull(inputSerializer2);
this.keySerializer = requireNonNull(keySerializer);
this.windowSerializer = windowSerializer;
}
开发者ID:wangyangjun,项目名称:StreamBench,代码行数:28,代码来源:CoGroupOperator.java
示例19: main
import org.apache.flink.api.common.functions.CoGroupFunction; //导入依赖的package包/类
public static void main(final String[] args) throws Exception {
// set up the execution environment
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<AvroLineitem> lineItemFromAvro = env.createInput(
new AvroInputFormat<AvroLineitem>(new Path(args[0]), AvroLineitem.class));
DataSet<AvroLineitem> lineItemFromCsv = env.readTextFile(args[1]).map(new Prepare.AvroLineItemMapper());
DataSet<String> empty = lineItemFromAvro
.coGroup(lineItemFromCsv).where("orderKey", "partKey", "supplierKey", "lineNumber").equalTo("orderKey", "partKey", "supplierKey", "lineNumber").with(new CoGroupFunction<AvroLineitem, AvroLineitem, String>() {
@Override
public void coGroup(Iterable<AvroLineitem> avro, Iterable<AvroLineitem> csv, Collector<String> collector) throws Exception {
Iterator<AvroLineitem> aIt = avro.iterator();
if(!aIt.hasNext()) {
throw new RuntimeException("Expected item from Avro input");
}
AvroLineitem left = aIt.next();
if(aIt.hasNext()) {
throw new RuntimeException("Unexpectedly received two avro records on this side. left="+left+" next="+aIt.next());
}
Iterator<AvroLineitem> cIt = csv.iterator();
if(!cIt.hasNext()) {
throw new RuntimeException("Expected item from CSV input");
}
AvroLineitem right = cIt.next();
if(cIt.hasNext()) {
throw new RuntimeException("Unexpectedly received two CSV records on this side");
}
if(!right.equals(left)) {
throw new RuntimeException("Records are not equal");
}
}
});
empty.output(new DiscardingOutputFormat<String>());
env.execute("Compare Job");
}
开发者ID:project-flink,项目名称:flink-perf,代码行数:37,代码来源:CompareJob.java
示例20: testCoGroupLambda
import org.apache.flink.api.common.functions.CoGroupFunction; //导入依赖的package包/类
@Test
public void testCoGroupLambda() {
CoGroupFunction<Tuple2<Tuple1<Integer>, Boolean>, Tuple2<Tuple1<Integer>, Double>, Tuple2<Tuple1<Integer>, String>> f = (i1, i2, o) -> {};
TypeInformation<?> ti = TypeExtractor.getCoGroupReturnTypes(f, TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Boolean>"), TypeInfoParser.parse("Tuple2<Tuple1<Integer>, Double>"));
Assert.assertTrue(ti.isTupleType());
Assert.assertEquals(2, ti.getArity());
Assert.assertTrue(((TupleTypeInfo<?>) ti).getTypeAt(0).isTupleType());
Assert.assertEquals(((TupleTypeInfo<?>) ti).getTypeAt(1), BasicTypeInfo.STRING_TYPE_INFO);
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:11,代码来源:LambdaExtractionTest.java
注:本文中的org.apache.flink.api.common.functions.CoGroupFunction类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论