本文整理汇总了Java中org.apache.flink.api.common.io.RichInputFormat类的典型用法代码示例。如果您正苦于以下问题:Java RichInputFormat类的具体用法?Java RichInputFormat怎么用?Java RichInputFormat使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
RichInputFormat类属于org.apache.flink.api.common.io包,在下文中一共展示了RichInputFormat类的5个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: executeDataSource
import org.apache.flink.api.common.io.RichInputFormat; //导入依赖的package包/类
private <OUT> List<OUT> executeDataSource(GenericDataSourceBase<?, ?> source, int superStep)
throws Exception {
@SuppressWarnings("unchecked")
GenericDataSourceBase<OUT, ?> typedSource = (GenericDataSourceBase<OUT, ?>) source;
// build the runtime context and compute broadcast variables, if necessary
TaskInfo taskInfo = new TaskInfo(typedSource.getName(), 1, 0, 1, 0);
RuntimeUDFContext ctx;
MetricGroup metrics = new UnregisteredMetricsGroup();
if (RichInputFormat.class.isAssignableFrom(typedSource.getUserCodeWrapper().getUserCodeClass())) {
ctx = superStep == 0 ? new RuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics) :
new IterationRuntimeUDFContext(taskInfo, classLoader, executionConfig, cachedFiles, accumulators, metrics);
} else {
ctx = null;
}
return typedSource.executeOnCollections(ctx, executionConfig);
}
开发者ID:axbaretto,项目名称:flink,代码行数:19,代码来源:CollectionExecutor.java
示例2: executeOnCollections
import org.apache.flink.api.common.io.RichInputFormat; //导入依赖的package包/类
protected List<OUT> executeOnCollections(RuntimeContext ctx, ExecutionConfig executionConfig) throws Exception {
@SuppressWarnings("unchecked")
InputFormat<OUT, InputSplit> inputFormat = (InputFormat<OUT, InputSplit>) this.formatWrapper.getUserCodeObject();
//configure the input format
inputFormat.configure(this.parameters);
//open the input format
if (inputFormat instanceof RichInputFormat) {
((RichInputFormat) inputFormat).setRuntimeContext(ctx);
((RichInputFormat) inputFormat).openInputFormat();
}
List<OUT> result = new ArrayList<OUT>();
// splits
InputSplit[] splits = inputFormat.createInputSplits(1);
TypeSerializer<OUT> serializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
for (InputSplit split : splits) {
inputFormat.open(split);
while (!inputFormat.reachedEnd()) {
OUT next = inputFormat.nextRecord(serializer.createInstance());
if (next != null) {
result.add(serializer.copy(next));
}
}
inputFormat.close();
}
//close the input format
if (inputFormat instanceof RichInputFormat) {
((RichInputFormat) inputFormat).closeInputFormat();
}
return result;
}
开发者ID:axbaretto,项目名称:flink,代码行数:39,代码来源:GenericDataSourceBase.java
示例3: open
import org.apache.flink.api.common.io.RichInputFormat; //导入依赖的package包/类
@Override
@SuppressWarnings("unchecked")
public void open(Configuration parameters) throws Exception {
StreamingRuntimeContext context = (StreamingRuntimeContext) getRuntimeContext();
if (format instanceof RichInputFormat) {
((RichInputFormat) format).setRuntimeContext(context);
}
format.configure(parameters);
provider = context.getInputSplitProvider();
serializer = typeInfo.createSerializer(getRuntimeContext().getExecutionConfig());
splitIterator = getInputSplits();
isRunning = splitIterator.hasNext();
}
开发者ID:axbaretto,项目名称:flink,代码行数:16,代码来源:InputFormatSourceFunction.java
示例4: run
import org.apache.flink.api.common.io.RichInputFormat; //导入依赖的package包/类
@Override
public void run(SourceContext<OUT> ctx) throws Exception {
try {
Counter completedSplitsCounter = getRuntimeContext().getMetricGroup().counter("numSplitsProcessed");
if (isRunning && format instanceof RichInputFormat) {
((RichInputFormat) format).openInputFormat();
}
OUT nextElement = serializer.createInstance();
while (isRunning) {
format.open(splitIterator.next());
// for each element we also check if cancel
// was called by checking the isRunning flag
while (isRunning && !format.reachedEnd()) {
nextElement = format.nextRecord(nextElement);
if (nextElement != null) {
ctx.collect(nextElement);
} else {
break;
}
}
format.close();
completedSplitsCounter.inc();
if (isRunning) {
isRunning = splitIterator.hasNext();
}
}
} finally {
format.close();
if (format instanceof RichInputFormat) {
((RichInputFormat) format).closeInputFormat();
}
isRunning = false;
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:40,代码来源:InputFormatSourceFunction.java
示例5: close
import org.apache.flink.api.common.io.RichInputFormat; //导入依赖的package包/类
@Override
public void close() throws Exception {
format.close();
if (format instanceof RichInputFormat) {
((RichInputFormat) format).closeInputFormat();
}
}
开发者ID:axbaretto,项目名称:flink,代码行数:8,代码来源:InputFormatSourceFunction.java
注:本文中的org.apache.flink.api.common.io.RichInputFormat类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论