• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java ProcessorContext类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中storm.trident.planner.ProcessorContext的典型用法代码示例。如果您正苦于以下问题:Java ProcessorContext类的具体用法?Java ProcessorContext怎么用?Java ProcessorContext使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



ProcessorContext类属于storm.trident.planner包,在下文中一共展示了ProcessorContext类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: finishBatch

import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void finishBatch(ProcessorContext processorContext) {
    BatchState state = (BatchState) processorContext.state[_context.getStateIndex()];
    if(!state.tuples.isEmpty()) {
        List<Object> results = _function.batchRetrieve(_state, state.args);
        if(results.size()!=state.tuples.size()) {
            throw new RuntimeException("Results size is different than argument size: " + results.size() + " vs " + state.tuples.size());
        }
        for(int i=0; i<state.tuples.size(); i++) {
            TridentTuple tuple = state.tuples.get(i);
            Object result = results.get(i);
            _collector.setContext(processorContext, tuple);
            _function.execute(_projection.create(tuple), result, _collector);            
        }
    }
}
 
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:17,代码来源:StateQueryProcessor.java


示例2: finishBatch

import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void finishBatch(ProcessorContext processorContext) {
    _collector.setContext(processorContext);
    Object batchId = processorContext.batchId;
    // since this processor type is a committer, this occurs in the commit phase
    List<TridentTuple> buffer = (List) processorContext.state[_context.getStateIndex()];
    
    // don't update unless there are tuples
    // this helps out with things like global partition persist, where multiple tasks may still
    // exist for this processor. Only want the global one to do anything
    // this is also a helpful optimization that state implementations don't need to manually do
    if(buffer.size() > 0) {
        Long txid = null;
        // this is to support things like persisting off of drpc stream, which is inherently unreliable
        // and won't have a tx attempt
        if(batchId instanceof TransactionAttempt) {
            txid = ((TransactionAttempt) batchId).getTransactionId();
        }
        _state.beginCommit(txid);
        _updater.updateState(_state, buffer, _collector);
        _state.commit(txid);
    }
}
 
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:24,代码来源:PartitionPersistProcessor.java


示例3: finishBatch

import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void finishBatch(ProcessorContext processorContext) {
    BatchState state = (BatchState) processorContext.state[_context.getStateIndex()];
    if (!state.tuples.isEmpty()) {
        List<Object> results = _function.batchRetrieve(_state, state.args);
        if (results.size() != state.tuples.size()) {
            throw new RuntimeException("Results size is different than argument size: " + results.size() + " vs " + state.tuples.size());
        }
        for (int i = 0; i < state.tuples.size(); i++) {
            TridentTuple tuple = state.tuples.get(i);
            Object result = results.get(i);
            _collector.setContext(processorContext, tuple);
            _function.execute(_projection.create(tuple), result, _collector);
        }
    }
}
 
开发者ID:kkllwww007,项目名称:jstrom,代码行数:17,代码来源:StateQueryProcessor.java


示例4: finishBatch

import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void finishBatch(ProcessorContext processorContext) {
    _collector.setContext(processorContext);
    Object batchId = processorContext.batchId;
    // since this processor type is a committer, this occurs in the commit phase
    List<TridentTuple> buffer = (List) processorContext.state[_context.getStateIndex()];

    // don't update unless there are tuples
    // this helps out with things like global partition persist, where multiple tasks may still
    // exist for this processor. Only want the global one to do anything
    // this is also a helpful optimization that state implementations don't need to manually do
    if (buffer.size() > 0) {
        Long txid = null;
        // this is to support things like persisting off of drpc stream, which is inherently unreliable
        // and won't have a tx attempt
        if (batchId instanceof TransactionAttempt) {
            txid = ((TransactionAttempt) batchId).getTransactionId();
        }
        _state.beginCommit(txid);
        _updater.updateState(_state, buffer, _collector);
        _state.commit(txid);
    }
}
 
开发者ID:kkllwww007,项目名称:jstrom,代码行数:24,代码来源:PartitionPersistProcessor.java


示例5: finishBatch

import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void finishBatch(ProcessorContext processorContext) {
    BatchState state = (BatchState) processorContext.state[_context.getStateIndex()];
    if(!state.tuples.isEmpty()) {
        List<Object> results = _function.batchRetrieve(_state, Collections.unmodifiableList(state.args));
        if(results.size()!=state.tuples.size()) {
            throw new RuntimeException("Results size is different than argument size: " + results.size() + " vs " + state.tuples.size());
        }
        for (int i = 0; i < state.tuples.size(); i++) {
            TridentTuple tuple = state.tuples.get(i);
            Object result = results.get(i);
            _collector.setContext(processorContext, tuple);
            _function.execute(state.args.get(i), result, _collector);
        }
    }
}
 
开发者ID:alibaba,项目名称:jstorm,代码行数:17,代码来源:StateQueryProcessor.java


示例6: execute

import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
    TridentTuple toEmit = _factory.create(tuple);
    for(TupleReceiver r: _context.getReceivers()) {
        r.execute(processorContext, _context.getOutStreamId(), toEmit);
    }
}
 
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:8,代码来源:ProjectedProcessor.java


示例7: execute

import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
    TridentTuple toEmit = _factory.create(tuple);
    for (TupleReceiver r : _context.getReceivers()) {
        r.execute(processorContext, _context.getOutStreamId(), toEmit);
    }
}
 
开发者ID:kkllwww007,项目名称:jstrom,代码行数:8,代码来源:ProjectedProcessor.java


示例8: startBatch

import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void startBatch(ProcessorContext processorContext) {
}
 
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:4,代码来源:ProjectedProcessor.java


示例9: finishBatch

import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void finishBatch(ProcessorContext processorContext) {
}
 
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:4,代码来源:ProjectedProcessor.java


示例10: startBatch

import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void startBatch(ProcessorContext processorContext) {
    _collector.setContext(processorContext);
    processorContext.state[_context.getStateIndex()] = _reducer.init(_collector);
}
 
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:6,代码来源:MultiReducerProcessor.java


示例11: execute

import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
    _collector.setContext(processorContext);
    int i = _streamToIndex.get(streamId);
    _reducer.execute(processorContext.state[_context.getStateIndex()], i, _projectionFactories[i].create(tuple), _collector);
}
 
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:7,代码来源:MultiReducerProcessor.java


示例12: finishBatch

import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void finishBatch(ProcessorContext processorContext) {
    _collector.setContext(processorContext);
    _reducer.complete(processorContext.state[_context.getStateIndex()], _collector);
}
 
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:6,代码来源:MultiReducerProcessor.java


示例13: setContext

import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
public void setContext(ProcessorContext pc) {
    this.context = pc;
}
 
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:4,代码来源:FreshCollector.java


示例14: startBatch

import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void startBatch(ProcessorContext processorContext) {
    _collector.setContext(processorContext);
    processorContext.state[_context.getStateIndex()] = _agg.init(processorContext.batchId, _collector);
}
 
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:6,代码来源:AggregateProcessor.java


示例15: execute

import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
    _collector.setContext(processorContext);
    _agg.aggregate(processorContext.state[_context.getStateIndex()], _projection.create(tuple), _collector);
}
 
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:6,代码来源:AggregateProcessor.java


示例16: finishBatch

import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void finishBatch(ProcessorContext processorContext) {
    _collector.setContext(processorContext);
    _agg.complete(processorContext.state[_context.getStateIndex()], _collector);
}
 
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:6,代码来源:AggregateProcessor.java


示例17: startBatch

import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void startBatch(ProcessorContext processorContext) {
    processorContext.state[_context.getStateIndex()] =  new BatchState();
}
 
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:5,代码来源:StateQueryProcessor.java


示例18: execute

import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
    BatchState state = (BatchState) processorContext.state[_context.getStateIndex()];
    state.tuples.add(tuple);
    state.args.add(_projection.create(tuple));
}
 
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:7,代码来源:StateQueryProcessor.java


示例19: startBatch

import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void startBatch(ProcessorContext processorContext) {
    processorContext.state[_context.getStateIndex()] = new ArrayList<TridentTuple>();        
}
 
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:5,代码来源:PartitionPersistProcessor.java


示例20: execute

import storm.trident.planner.ProcessorContext; //导入依赖的package包/类
@Override
public void execute(ProcessorContext processorContext, String streamId, TridentTuple tuple) {
    ((List) processorContext.state[_context.getStateIndex()]).add(_projection.create(tuple));
}
 
开发者ID:zhangjunfang,项目名称:jstorm-0.9.6.3-,代码行数:5,代码来源:PartitionPersistProcessor.java



注:本文中的storm.trident.planner.ProcessorContext类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java Exchange类代码示例发布时间:2022-05-22
下一篇:
Java TestWith类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap