• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java ChainingStrategy类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.flink.streaming.api.operators.ChainingStrategy的典型用法代码示例。如果您正苦于以下问题:Java ChainingStrategy类的具体用法?Java ChainingStrategy怎么用?Java ChainingStrategy使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



ChainingStrategy类属于org.apache.flink.streaming.api.operators包,在下文中一共展示了ChainingStrategy类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: isChainable

import org.apache.flink.streaming.api.operators.ChainingStrategy; //导入依赖的package包/类
private boolean isChainable(StreamEdge edge, boolean isChainingEnabled) {
	StreamNode upStreamVertex = edge.getSourceVertex();
	StreamNode downStreamVertex = edge.getTargetVertex();

	StreamOperator<?> headOperator = upStreamVertex.getOperator();
	StreamOperator<?> outOperator = downStreamVertex.getOperator();

	return downStreamVertex.getInEdges().size() == 1
			&& outOperator != null
			&& headOperator != null
			&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
			&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
			&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
			headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
			&& (edge.getPartitioner() instanceof ForwardPartitioner)
			&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
			&& isChainingEnabled;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:19,代码来源:StreamGraphHasherV1.java


示例2: isChainable

import org.apache.flink.streaming.api.operators.ChainingStrategy; //导入依赖的package包/类
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
	StreamNode upStreamVertex = edge.getSourceVertex();
	StreamNode downStreamVertex = edge.getTargetVertex();

	StreamOperator<?> headOperator = upStreamVertex.getOperator();
	StreamOperator<?> outOperator = downStreamVertex.getOperator();

	return downStreamVertex.getInEdges().size() == 1
			&& outOperator != null
			&& headOperator != null
			&& upStreamVertex.isSameSlotSharingGroup(downStreamVertex)
			&& outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
			&& (headOperator.getChainingStrategy() == ChainingStrategy.HEAD ||
				headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS)
			&& (edge.getPartitioner() instanceof ForwardPartitioner)
			&& upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
			&& streamGraph.isChainingEnabled();
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:19,代码来源:StreamingJobGraphGenerator.java


示例3: DoFnOperator

import org.apache.flink.streaming.api.operators.ChainingStrategy; //导入依赖的package包/类
public DoFnOperator(
    DoFn<InputT, OutputT> doFn,
    String stepName,
    Coder<WindowedValue<InputT>> inputCoder,
    TupleTag<OutputT> mainOutputTag,
    List<TupleTag<?>> additionalOutputTags,
    OutputManagerFactory<OutputT> outputManagerFactory,
    WindowingStrategy<?, ?> windowingStrategy,
    Map<Integer, PCollectionView<?>> sideInputTagMapping,
    Collection<PCollectionView<?>> sideInputs,
    PipelineOptions options,
    Coder<?> keyCoder) {
  this.doFn = doFn;
  this.stepName = stepName;
  this.inputCoder = inputCoder;
  this.mainOutputTag = mainOutputTag;
  this.additionalOutputTags = additionalOutputTags;
  this.sideInputTagMapping = sideInputTagMapping;
  this.sideInputs = sideInputs;
  this.serializedOptions = new SerializablePipelineOptions(options);
  this.windowingStrategy = windowingStrategy;
  this.outputManagerFactory = outputManagerFactory;

  setChainingStrategy(ChainingStrategy.ALWAYS);

  this.keyCoder = keyCoder;

  this.timerCoder =
      TimerInternals.TimerDataCoder.of(windowingStrategy.getWindowFn().windowCoder());

  FlinkPipelineOptions flinkOptions = options.as(FlinkPipelineOptions.class);

  this.maxBundleSize = flinkOptions.getMaxBundleSize();
  this.maxBundleTimeMills = flinkOptions.getMaxBundleTimeMills();
}
 
开发者ID:apache,项目名称:beam,代码行数:36,代码来源:DoFnOperator.java


示例4: testOperatorChainedToSource

import org.apache.flink.streaming.api.operators.ChainingStrategy; //导入依赖的package包/类
/**
 * Note: this test fails if we don't check for exceptions in the source contexts and do not
 * synchronize in the source contexts.
 */
@Test
public void testOperatorChainedToSource() throws Exception {

	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(timeCharacteristic);
	env.setParallelism(1);

	DataStream<String> source = env.addSource(new InfiniteTestSource());

	source.transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, new TimerOperator(ChainingStrategy.ALWAYS));

	boolean testSuccess = false;
	try {
		env.execute("Timer test");
	} catch (JobExecutionException e) {
		if (e.getCause() instanceof TimerException) {
			TimerException te = (TimerException) e.getCause();
			if (te.getCause() instanceof RuntimeException) {
				RuntimeException re = (RuntimeException) te.getCause();
				if (re.getMessage().equals("TEST SUCCESS")) {
					testSuccess = true;
				} else {
					throw e;
				}
			} else {
				throw e;
			}
		} else {
			throw e;
		}
	}
	Assert.assertTrue(testSuccess);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:38,代码来源:StreamTaskTimerITCase.java


示例5: testOneInputOperatorWithoutChaining

import org.apache.flink.streaming.api.operators.ChainingStrategy; //导入依赖的package包/类
/**
 * Note: this test fails if we don't check for exceptions in the source contexts and do not
 * synchronize in the source contexts.
 */
@Test
public void testOneInputOperatorWithoutChaining() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(timeCharacteristic);
	env.setParallelism(1);

	DataStream<String> source = env.addSource(new InfiniteTestSource());

	source.transform("Custom Operator", BasicTypeInfo.STRING_TYPE_INFO, new TimerOperator(ChainingStrategy.NEVER));

	boolean testSuccess = false;
	try {
		env.execute("Timer test");
	} catch (JobExecutionException e) {
		if (e.getCause() instanceof TimerException) {
			TimerException te = (TimerException) e.getCause();
			if (te.getCause() instanceof RuntimeException) {
				RuntimeException re = (RuntimeException) te.getCause();
				if (re.getMessage().equals("TEST SUCCESS")) {
					testSuccess = true;
				} else {
					throw e;
				}
			} else {
				throw e;
			}
		} else {
			throw e;
		}
	}
	Assert.assertTrue(testSuccess);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:37,代码来源:StreamTaskTimerITCase.java


示例6: testTwoInputOperatorWithoutChaining

import org.apache.flink.streaming.api.operators.ChainingStrategy; //导入依赖的package包/类
@Test
public void testTwoInputOperatorWithoutChaining() throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	env.setStreamTimeCharacteristic(timeCharacteristic);
	env.setParallelism(1);

	DataStream<String> source = env.addSource(new InfiniteTestSource());

	source.connect(source).transform(
			"Custom Operator",
			BasicTypeInfo.STRING_TYPE_INFO,
			new TwoInputTimerOperator(ChainingStrategy.NEVER));

	boolean testSuccess = false;
	try {
		env.execute("Timer test");
	} catch (JobExecutionException e) {
		if (e.getCause() instanceof TimerException) {
			TimerException te = (TimerException) e.getCause();
			if (te.getCause() instanceof RuntimeException) {
				RuntimeException re = (RuntimeException) te.getCause();
				if (re.getMessage().equals("TEST SUCCESS")) {
					testSuccess = true;
				} else {
					throw e;
				}
			} else {
				throw e;
			}
		} else {
			throw e;
		}
	}
	Assert.assertTrue(testSuccess);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:36,代码来源:StreamTaskTimerITCase.java


示例7: AsyncWaitOperator

import org.apache.flink.streaming.api.operators.ChainingStrategy; //导入依赖的package包/类
public AsyncWaitOperator(
		AsyncFunction<IN, OUT> asyncFunction,
		long timeout,
		int capacity,
		AsyncDataStream.OutputMode outputMode) {
	super(asyncFunction);
	chainingStrategy = ChainingStrategy.ALWAYS;

	Preconditions.checkArgument(capacity > 0, "The number of concurrent async operation should be greater than 0.");
	this.capacity = capacity;

	this.outputMode = Preconditions.checkNotNull(outputMode, "outputMode");

	this.timeout = timeout;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:16,代码来源:AsyncWaitOperator.java


示例8: WindowOperator

import org.apache.flink.streaming.api.operators.ChainingStrategy; //导入依赖的package包/类
/**
 * Creates a new {@code WindowOperator} based on the given policies and user functions.
 */
public WindowOperator(
		WindowAssigner<? super IN, W> windowAssigner,
		TypeSerializer<W> windowSerializer,
		KeySelector<IN, K> keySelector,
		TypeSerializer<K> keySerializer,
		StateDescriptor<? extends AppendingState<IN, ACC>, ?> windowStateDescriptor,
		InternalWindowFunction<ACC, OUT, K, W> windowFunction,
		Trigger<? super IN, ? super W> trigger,
		long allowedLateness,
		OutputTag<IN> lateDataOutputTag) {

	super(windowFunction);

	checkArgument(!(windowAssigner instanceof BaseAlignedWindowAssigner),
		"The " + windowAssigner.getClass().getSimpleName() + " cannot be used with a WindowOperator. " +
			"This assigner is only used with the AccumulatingProcessingTimeWindowOperator and " +
			"the AggregatingProcessingTimeWindowOperator");

	checkArgument(allowedLateness >= 0);

	checkArgument(windowStateDescriptor == null || windowStateDescriptor.isSerializerInitialized(),
			"window state serializer is not properly initialized");

	this.windowAssigner = checkNotNull(windowAssigner);
	this.windowSerializer = checkNotNull(windowSerializer);
	this.keySelector = checkNotNull(keySelector);
	this.keySerializer = checkNotNull(keySerializer);
	this.windowStateDescriptor = windowStateDescriptor;
	this.trigger = checkNotNull(trigger);
	this.allowedLateness = allowedLateness;
	this.lateDataOutputTag = lateDataOutputTag;

	setChainingStrategy(ChainingStrategy.ALWAYS);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:38,代码来源:WindowOperator.java


示例9: EventTimeOrderingOperator

import org.apache.flink.streaming.api.operators.ChainingStrategy; //导入依赖的package包/类
/**
 * Creates an event time-based reordering operator.
 */
public EventTimeOrderingOperator() {
    chainingStrategy = ChainingStrategy.ALWAYS;
}
 
开发者ID:pravega,项目名称:flink-connectors,代码行数:7,代码来源:EventTimeOrderingOperator.java


示例10: disableChaining

import org.apache.flink.streaming.api.operators.ChainingStrategy; //导入依赖的package包/类
@Override
public DataStreamSink<T> disableChaining() {
	this.transformation.setChainingStrategy(ChainingStrategy.NEVER);
	return this;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:6,代码来源:FlinkKafkaProducer010.java


示例11: CustomOperator

import org.apache.flink.streaming.api.operators.ChainingStrategy; //导入依赖的package包/类
public CustomOperator(boolean timestampsEnabled) {
	setChainingStrategy(ChainingStrategy.ALWAYS);
	this.timestampsEnabled = timestampsEnabled;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:5,代码来源:TimestampITCase.java


示例12: TimestampCheckingOperator

import org.apache.flink.streaming.api.operators.ChainingStrategy; //导入依赖的package包/类
public TimestampCheckingOperator() {
	setChainingStrategy(ChainingStrategy.ALWAYS);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:4,代码来源:TimestampITCase.java


示例13: TimerOperator

import org.apache.flink.streaming.api.operators.ChainingStrategy; //导入依赖的package包/类
public TimerOperator(ChainingStrategy chainingStrategy) {
	setChainingStrategy(chainingStrategy);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:4,代码来源:StreamTaskTimerITCase.java


示例14: TwoInputTimerOperator

import org.apache.flink.streaming.api.operators.ChainingStrategy; //导入依赖的package包/类
public TwoInputTimerOperator(ChainingStrategy chainingStrategy) {
	setChainingStrategy(chainingStrategy);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:4,代码来源:StreamTaskTimerITCase.java


示例15: setChainingStrategy

import org.apache.flink.streaming.api.operators.ChainingStrategy; //导入依赖的package包/类
@Override
public final void setChainingStrategy(ChainingStrategy strategy) {
	throw new UnsupportedOperationException("Cannot set chaining strategy on Split Transformation.");
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:5,代码来源:FeedbackTransformation.java


示例16: setChainingStrategy

import org.apache.flink.streaming.api.operators.ChainingStrategy; //导入依赖的package包/类
@Override
public final void setChainingStrategy(ChainingStrategy strategy) {
	operator.setChainingStrategy(strategy);
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:5,代码来源:OneInputTransformation.java


示例17: setChainingStrategy

import org.apache.flink.streaming.api.operators.ChainingStrategy; //导入依赖的package包/类
@Override
public final void setChainingStrategy(ChainingStrategy strategy) {
	throw new UnsupportedOperationException("Cannot set chaining strategy on Union Transformation.");
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:5,代码来源:PartitionTransformation.java


示例18: setChainingStrategy

import org.apache.flink.streaming.api.operators.ChainingStrategy; //导入依赖的package包/类
@Override
public final void setChainingStrategy(ChainingStrategy strategy) {
	throw new UnsupportedOperationException("Cannot set chaining strategy on SideOutput Transformation.");
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:5,代码来源:SideOutputTransformation.java


示例19: setChainingStrategy

import org.apache.flink.streaming.api.operators.ChainingStrategy; //导入依赖的package包/类
@Override
public final void setChainingStrategy(ChainingStrategy strategy) {
	throw new UnsupportedOperationException("Cannot set chaining strategy on Select Transformation.");
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:5,代码来源:SelectTransformation.java


示例20: TimestampsAndPeriodicWatermarksOperator

import org.apache.flink.streaming.api.operators.ChainingStrategy; //导入依赖的package包/类
public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) {
	super(assigner);
	this.chainingStrategy = ChainingStrategy.ALWAYS;
}
 
开发者ID:axbaretto,项目名称:flink,代码行数:5,代码来源:TimestampsAndPeriodicWatermarksOperator.java



注:本文中的org.apache.flink.streaming.api.operators.ChainingStrategy类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java ZkBaseDataAccessor类代码示例发布时间:2022-05-22
下一篇:
Java WorkflowDocumentService类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap