
Java DefaultOutputPort Class Code Examples


This article collects typical usage examples of the Java class com.datatorrent.api.DefaultOutputPort. If you are wondering how DefaultOutputPort is used in practice, or are looking for working examples of it, the curated snippets below may help.



The DefaultOutputPort class belongs to the com.datatorrent.api package. This article presents 20 code examples of the class, sorted by popularity by default.
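For orientation, most of the snippets in this article rely on one idea: an output port forwards each emitted tuple to whatever sink is attached to it. The following is a minimal sketch of that idea only. `SketchSink` and `SketchOutputPort` are hypothetical stand-in types written for illustration; they are not the real com.datatorrent.api classes, whose behavior may differ.

```java
import java.util.ArrayList;
import java.util.List;

public class OutputPortSketch {
  /** Hypothetical stand-in for a sink; NOT the Apex Sink interface. */
  interface SketchSink<T> {
    void put(T tuple);
  }

  /** Hypothetical stand-in for an output port; NOT DefaultOutputPort. */
  static class SketchOutputPort<T> {
    // Until a sink is attached, emitted tuples are silently dropped (an assumption of this sketch).
    private SketchSink<? super T> sink = tuple -> { };

    void setSink(SketchSink<? super T> sink) {
      this.sink = sink;
    }

    void emit(T tuple) {
      sink.put(tuple);
    }
  }

  /** Emits the given words through a port and returns what the attached sink collected. */
  static List<String> emitAll(String... words) {
    SketchOutputPort<String> port = new SketchOutputPort<>();
    List<String> collected = new ArrayList<>();
    port.setSink(collected::add);
    for (String word : words) {
      port.emit(word);
    }
    return collected;
  }

  public static void main(String[] args) {
    System.out.println(emitAll("a", "b", "c"));
  }
}
```

The sink is the only thing the port knows about its consumer, which is why tests can attach a collecting sink and application code can attach a stream.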

Example 1: populateDAG

import com.datatorrent.api.DefaultOutputPort; // import the required package/class
@SuppressWarnings("unchecked")
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  RandomKeyValGenerator randGen = dag.addOperator("RandomGenerator", new RandomKeyValGenerator());
  UniqueValueCount<Integer> valCount = dag.addOperator("UniqueCounter", new UniqueValueCount<Integer>());
  ConsoleOutputOperator consOut = dag.addOperator("Console", new ConsoleOutputOperator());
  IntegerUniqueValueCountAppender uniqueUnifier = dag.addOperator("StatefulUniqueCounter", new IntegerUniqueValueCountAppender());
  dag.getOperatorMeta("StatefulUniqueCounter").getMeta(uniqueUnifier.input).getAttributes().put(Context.PortContext.STREAM_CODEC, new KeyBasedStreamCodec());

  @SuppressWarnings("rawtypes")
  DefaultOutputPort valOut = valCount.output;
  @SuppressWarnings("rawtypes")
  DefaultOutputPort uniqueOut = uniqueUnifier.output;

  dag.addStream("Events", randGen.outport, valCount.input);
  dag.addStream("Unified", valOut, uniqueUnifier.input);
  dag.addStream("Result", uniqueOut, consOut.input);
}
 
Developer: apache, Project: apex-malhar, Lines of code: 20, Source: StatefulApplication.java


Example 2: populateDAG

import com.datatorrent.api.DefaultOutputPort; // import the required package/class
@SuppressWarnings("unchecked")
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  KeyGen keyGen = dag.addOperator("KeyGenerator", new KeyGen());
  UniqueValueCount<Integer> valCount = dag.addOperator("ValueCounter", new UniqueValueCount<Integer>());
  IntegerUniqueValueCountAppender uniqueUnifier = dag.addOperator("Unique", new IntegerUniqueValueCountAppender());
  VerifyTable verifyTable = dag.addOperator("VerifyTable", new VerifyTable());

  @SuppressWarnings("rawtypes")
  DefaultOutputPort valOut = valCount.output;
  @SuppressWarnings("rawtypes")
  DefaultOutputPort uniqueOut = uniqueUnifier.output;
  dag.addStream("DataIn", keyGen.output, valCount.input);
  dag.addStream("UnifyWindows", valOut, uniqueUnifier.input);
  dag.addStream("ResultsOut", uniqueOut, verifyTable.input);
}
 
Developer: apache, Project: apex-malhar, Lines of code: 18, Source: StatefulUniqueCountTest.java


Example 3: attachOutputPortToInputPort

import com.datatorrent.api.DefaultOutputPort; // import the required package/class
/**
 * This is a utility method which is used to attach the output port of an {@link EmbeddableQueryInfoProvider} to the input port
 * of the encapsulating {@link AppData.Store}.
 * @param <T> The type of data emitted by the {@link EmbeddableQueryInfoProvider}'s output port and received by the
 * {@link AppData.Store}'s input port.
 * @param outputPort The output port of the {@link EmbeddableQueryInfoProvider} which is being used by an {@link AppData.Store}.
 * @param inputPort The input port of the {@link AppData.Store} which is using an {@link EmbeddableQueryInfoProvider}.
 */
public static <T> void attachOutputPortToInputPort(DefaultOutputPort<T> outputPort, final DefaultInputPort<T> inputPort)
{
  outputPort.setSink(new Sink<Object>()
  {
    @Override
    @SuppressWarnings("unchecked")
    public void put(Object tuple)
    {
      LOG.debug("processing tuple");
      inputPort.process((T)tuple);
    }

    @Override
    public int getCount(boolean reset)
    {
      return 0;
    }

  });
}
 
Developer: apache, Project: apex-malhar, Lines of code: 29, Source: StoreUtils.java
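The bridging technique in the snippet above (an anonymous sink that casts each tuple and hands it to an input port) can be tried without the Apex libraries. The sketch below uses hypothetical stand-ins (`BridgeSink`, `BridgeOutputPort`, `BridgeInputPort`) that only mimic the shape of the real types; the doubling inside `process` is invented so that the effect is visible.

```java
import java.util.ArrayList;
import java.util.List;

public class PortBridgeSketch {
  /** Hypothetical untyped sink, mirroring the Sink<Object> used in the snippet above. */
  interface BridgeSink {
    void put(Object tuple);
  }

  /** Hypothetical output port stand-in. */
  static class BridgeOutputPort<T> {
    private BridgeSink sink;

    void setSink(BridgeSink sink) {
      this.sink = sink;
    }

    void emit(T tuple) {
      if (sink != null) {
        sink.put(tuple);
      }
    }
  }

  /** Hypothetical input port stand-in. */
  abstract static class BridgeInputPort<T> {
    abstract void process(T tuple);
  }

  /** Wires the output port directly to the input port through an adapter sink. */
  static <T> void attach(BridgeOutputPort<T> outputPort, final BridgeInputPort<T> inputPort) {
    outputPort.setSink(new BridgeSink() {
      @Override
      @SuppressWarnings("unchecked")
      public void put(Object tuple) {
        // Safe as long as only the typed port feeds this sink.
        inputPort.process((T) tuple);
      }
    });
  }

  /** Emits 1 and 2 through the bridge; the input port records each value doubled. */
  static List<Integer> demo() {
    final List<Integer> received = new ArrayList<>();
    BridgeOutputPort<Integer> out = new BridgeOutputPort<>();
    BridgeInputPort<Integer> in = new BridgeInputPort<Integer>() {
      @Override
      void process(Integer tuple) {
        received.add(tuple * 2);
      }
    };
    attach(out, in);
    out.emit(1);
    out.emit(2);
    return received;
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

The unchecked cast is confined to the adapter, so callers of `attach` keep full generic type checking on both ports.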


Example 4: test

import com.datatorrent.api.DefaultOutputPort; // import the required package/class
@SuppressWarnings({ "rawtypes", "unchecked" })
@Test
public void test()
{
  DimensionQueryResultMergeUnifier unifier = new DimensionQueryResultMergeUnifier();
  DefaultOutputPort<String> output = unifier.output;
  SimpleSink<Object> simpleSink = new SimpleSink<Object>();
  output.setSink(simpleSink);
  
  unifier.beginWindow(1);
  for (List<String> tuples : idToTuplesMap.values()) {
    for (String tuple : tuples) {
      unifier.process(tuple);
    }
  }
  unifier.endWindow();
  
  //verify
  assertCollectionSame(simpleSink.data, (Collection)expectedIdToTuple.values());
}
 
Developer: DataTorrent, Project: Megh, Lines of code: 21, Source: DimensionQueryResultMergeUnifierTester.java
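The test above follows a common shape: attach a collecting sink to the output, drive one window by hand (beginWindow, process, endWindow), then compare what the sink collected with the expected tuples. The sketch below reproduces only that shape. `CollectingSink` and `DedupUnifier` are hypothetical, and the merge rule (dropping duplicates within a window) is invented for illustration, since the source does not show what DimensionQueryResultMergeUnifier actually does.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class WindowedUnifierTestSketch {
  /** Hypothetical sink that records every tuple it receives. */
  static class CollectingSink<T> {
    final List<T> data = new ArrayList<>();

    void put(T tuple) {
      data.add(tuple);
    }
  }

  /** Hypothetical unifier: buffers tuples during a window and emits distinct ones at the end. */
  static class DedupUnifier {
    private final Set<String> pending = new LinkedHashSet<>();
    private CollectingSink<String> sink;

    void setSink(CollectingSink<String> sink) {
      this.sink = sink;
    }

    void beginWindow(long windowId) {
      pending.clear();
    }

    void process(String tuple) {
      pending.add(tuple);
    }

    void endWindow() {
      for (String tuple : pending) {
        sink.put(tuple);
      }
    }
  }

  /** Drives a single window over the input and returns what reached the sink. */
  static List<String> runOneWindow(List<String> input) {
    DedupUnifier unifier = new DedupUnifier();
    CollectingSink<String> sink = new CollectingSink<>();
    unifier.setSink(sink);

    unifier.beginWindow(1);
    for (String tuple : input) {
      unifier.process(tuple);
    }
    unifier.endWindow();

    return sink.data;
  }

  public static void main(String[] args) {
    System.out.println(runOneWindow(Arrays.asList("a", "b", "a")));
  }
}
```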


Example 5: output

import com.datatorrent.api.DefaultOutputPort; // import the required package/class
@Override
public <T> void output(TupleTag<T> tag, WindowedValue<T> tuple) {
  DefaultOutputPort<ApexStreamTuple<?>> additionalOutputPort =
      additionalOutputPortMapping.get(tag);
  if (additionalOutputPort != null) {
    additionalOutputPort.emit(ApexStreamTuple.DataTuple.of(tuple));
  } else {
    output.emit(ApexStreamTuple.DataTuple.of(tuple));
  }
  if (traceTuples) {
    LOG.debug("\nemitting {}\n", tuple);
  }
}
 
Developer: apache, Project: beam, Lines of code: 14, Source: ApexParDoOperator.java


Example 6: outputWatermark

import com.datatorrent.api.DefaultOutputPort; // import the required package/class
private void outputWatermark(ApexStreamTuple.WatermarkTuple<?> mark) {
  if (traceTuples) {
    LOG.debug("\nemitting {}\n", mark);
  }
  output.emit(mark);
  if (!additionalOutputPortMapping.isEmpty()) {
    for (DefaultOutputPort<ApexStreamTuple<?>> additionalOutput :
        additionalOutputPortMapping.values()) {
      additionalOutput.emit(mark);
    }
  }
}
 
Developer: apache, Project: beam, Lines of code: 13, Source: ApexParDoOperator.java
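The two ApexParDoOperator snippets above combine two routing rules: a data tuple goes to the port registered for its tag, or to the main output if no port is registered, while a watermark is broadcast to the main output and to every additional port. A minimal sketch of those rules follows. It uses plain strings for tags and tuples and a hypothetical `RecordingPort`; none of these names come from Beam or Apex.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaggedRoutingSketch {
  /** Hypothetical port that records what was emitted on it. */
  static class RecordingPort {
    final List<String> emitted = new ArrayList<>();

    void emit(String tuple) {
      emitted.add(tuple);
    }
  }

  final RecordingPort mainOutput = new RecordingPort();
  final Map<String, RecordingPort> additionalPorts = new HashMap<>();

  /** Routes a data tuple to the port for its tag, falling back to the main output. */
  void output(String tag, String tuple) {
    RecordingPort port = additionalPorts.get(tag);
    if (port != null) {
      port.emit(tuple);
    } else {
      mainOutput.emit(tuple);
    }
  }

  /** Broadcasts a watermark to the main output and to every additional port. */
  void outputWatermark(String mark) {
    mainOutput.emit(mark);
    for (RecordingPort port : additionalPorts.values()) {
      port.emit(mark);
    }
  }

  /** Builds an operator with one additional port and pushes sample traffic through it. */
  static TaggedRoutingSketch demo() {
    TaggedRoutingSketch op = new TaggedRoutingSketch();
    op.additionalPorts.put("errors", new RecordingPort());
    op.output("errors", "bad-record");
    op.output("unregistered-tag", "good-record");
    op.outputWatermark("WM@100");
    return op;
  }

  public static void main(String[] args) {
    TaggedRoutingSketch op = demo();
    System.out.println("main:   " + op.mainOutput.emitted);
    System.out.println("errors: " + op.additionalPorts.get("errors").emitted);
  }
}
```

Iterating over the values of an empty map is a no-op, so the sketch omits the emptiness guard that the original wraps around its loop.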


Example 7: emitCreditCardKeyTuple

import com.datatorrent.api.DefaultOutputPort; // import the required package/class
private void emitCreditCardKeyTuple(MerchantTransaction tuple, DefaultOutputPort<KeyValPair<MerchantKey, CreditCardData>> outputPort)
{
  MerchantKey key = getMerchantKey(tuple);

  CreditCardData data = new CreditCardData();
  data.fullCcNum = tuple.fullCcNum;
  data.amount = tuple.amount;

  KeyValPair<MerchantKey, CreditCardData> keyValPair = new KeyValPair<MerchantKey, CreditCardData>(key, data);
  outputPort.emit(keyValPair);
}
 
Developer: apache, Project: apex-malhar, Lines of code: 12, Source: MerchantTransactionBucketOperator.java


Example 8: QueryManagerAsynchronous

import com.datatorrent.api.DefaultOutputPort; // import the required package/class
public QueryManagerAsynchronous(DefaultOutputPort<String> resultPort,
    QueueManager<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT> queueManager,
    QueryExecutor<QUERY_TYPE, META_QUERY, QUEUE_CONTEXT, RESULT> queryExecutor,
    MessageSerializerFactory messageSerializerFactory,
    Thread mainThread)
{
  setResultPort(resultPort);
  setQueueManager(queueManager);
  setQueryExecutor(queryExecutor);
  setMessageSerializerFactory(messageSerializerFactory);
  setMainThread(mainThread);
}
 
Developer: apache, Project: apex-malhar, Lines of code: 13, Source: QueryManagerAsynchronous.java


Example 9: populateDAG

import com.datatorrent.api.DefaultOutputPort; // import the required package/class
@Override
public void populateDAG(DAG dag, Configuration configuration)
{
  redisServer = configuration.get("dt.application.AppWithDCWithoutDe.redisServer");
  
  DefaultOutputPort<DimensionTuple> upstreamOutput = populateUpstreamDAG(dag, configuration);

  //populateHardCodedDimensionsDAG(dag, configuration, generateOperator.outputPort);
  populateDimensionsDAG(dag, configuration, upstreamOutput);
}
 
Developer: yahoo, Project: streaming-benchmarks, Lines of code: 11, Source: ApplicationWithDCWithoutDeserializer.java


Example 10: populateUpstreamDAG

import com.datatorrent.api.DefaultOutputPort; // import the required package/class
public DefaultOutputPort<DimensionTuple> populateUpstreamDAG(DAG dag, Configuration configuration)
{
  JsonGenerator eventGenerator = dag.addOperator("eventGenerator", new JsonGenerator());
  FilterTuples filterTuples = dag.addOperator("filterTuples", new FilterTuples());
  FilterFields filterFields = dag.addOperator("filterFields", new FilterFields());
  
  // Connect the Ports in the Operators
  dag.addStream("filterTuples", eventGenerator.out, filterTuples.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
  dag.addStream("filterFields", filterTuples.output, filterFields.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
  
  TupleToDimensionTupleConverter converter = dag.addOperator("converter", new TupleToDimensionTupleConverter());
  
  if(includeRedisJoin) {
    RedisJoin redisJoin = dag.addOperator("redisJoin", new RedisJoin());
    dag.addStream("redisJoin", filterFields.output, redisJoin.input).setLocality(DAG.Locality.CONTAINER_LOCAL);
    dag.addStream("converter", redisJoin.output, converter.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);

    dag.setInputPortAttribute(redisJoin.input, Context.PortContext.PARTITION_PARALLEL, true);

    setupRedis(eventGenerator.getCampaigns());
  } else {
    dag.addStream("convert", filterFields.output, converter.inputPort).setLocality(DAG.Locality.CONTAINER_LOCAL);
  }
  

  dag.setInputPortAttribute(filterTuples.input, Context.PortContext.PARTITION_PARALLEL, true);
  dag.setInputPortAttribute(filterFields.input, Context.PortContext.PARTITION_PARALLEL, true);
  dag.setInputPortAttribute(converter.inputPort, Context.PortContext.PARTITION_PARALLEL, true);

  dag.setAttribute(eventGenerator, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<EventGenerator>(PARTITION_NUM));

  return converter.outputPort;
}
 
Developer: yahoo, Project: streaming-benchmarks, Lines of code: 34, Source: ApplicationWithDCWithoutDeserializer.java


Example 11: emitMerchantKeyTuple

import com.datatorrent.api.DefaultOutputPort; // import the required package/class
private void emitMerchantKeyTuple(MerchantTransaction tuple, DefaultOutputPort<KeyValPair<MerchantKey, Long>> outputPort)
{
  MerchantKey key = getMerchantKey(tuple);
  KeyValPair<MerchantKey, Long> keyValPair = new KeyValPair<MerchantKey, Long>(key, tuple.amount);
  outputPort.emit(keyValPair);
}
 
Developer: apache, Project: apex-malhar, Lines of code: 7, Source: MerchantTransactionBucketOperator.java


Example 12: emitBankIdNumTuple

import com.datatorrent.api.DefaultOutputPort; // import the required package/class
private void emitBankIdNumTuple(MerchantTransaction tuple, DefaultOutputPort<KeyValPair<KeyValPair<MerchantKey, String>, Integer>> outputPort)
{
  MerchantKey key = getMerchantKey(tuple);
  KeyValPair<MerchantKey, String> keyValPair = new KeyValPair<MerchantKey, String>(key, tuple.bankIdNum);
  outputPort.emit(new KeyValPair<KeyValPair<MerchantKey, String>, Integer>(keyValPair, 1));
}
 
Developer: apache, Project: apex-malhar, Lines of code: 7, Source: MerchantTransactionBucketOperator.java
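The MerchantTransactionBucketOperator snippets emit key-value pairs, and the last one nests a pair inside the key so that a count of 1 is attached to each (merchant, bank id) combination. The sketch below shows why that shape is convenient for a downstream counter. It is an illustration under stated assumptions: `java.util.AbstractMap.SimpleImmutableEntry` stands in for KeyValPair, plain strings stand in for MerchantKey, and `CountingPort` is a hypothetical consumer, not part of apex-malhar.

```java
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.LinkedHashMap;
import java.util.Map;

public class KeyedEmitSketch {
  /** Hypothetical downstream stand-in: sums the Integer value of each pair per composite key. */
  static class CountingPort {
    final Map<Map.Entry<String, String>, Integer> counts = new LinkedHashMap<>();

    void emit(Map.Entry<Map.Entry<String, String>, Integer> pair) {
      counts.merge(pair.getKey(), pair.getValue(), Integer::sum);
    }
  }

  /** Emits ((merchantKey, bankIdNum), 1), mirroring the nested pair in the snippet above. */
  static void emitBankIdNum(String merchantKey, String bankIdNum, CountingPort port) {
    Map.Entry<String, String> key = new SimpleImmutableEntry<>(merchantKey, bankIdNum);
    port.emit(new SimpleImmutableEntry<>(key, 1));
  }

  /** Emits three transactions and returns the count for the ("m1", "4111") combination. */
  static int demo() {
    CountingPort port = new CountingPort();
    emitBankIdNum("m1", "4111", port);
    emitBankIdNum("m1", "4111", port);
    emitBankIdNum("m2", "4111", port);
    return port.counts.get(new SimpleImmutableEntry<>("m1", "4111"));
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Because the composite key has value-based equality, two emissions for the same merchant and bank id land in the same bucket.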


Example 13: getOutputPortOfGenerator

import com.datatorrent.api.DefaultOutputPort; // import the required package/class
@Override
protected DefaultOutputPort getOutputPortOfGenerator( StringGeneratorInputOperator generator )
{
  return generator.outputPort;
}
 
Developer: apache, Project: apex-malhar, Lines of code: 6, Source: KinesisStringOutputOperatorTest.java


Example 14: getOutputPortOfGenerator

import com.datatorrent.api.DefaultOutputPort; // import the required package/class
@Override
protected DefaultOutputPort getOutputPortOfGenerator(POJOTupleGenerateOperator generator)
{
  return generator.outputPort;
}
 
Developer: apache, Project: apex-malhar, Lines of code: 6, Source: KinesisByteArrayOutputOperatorTest.java


Example 15: setResultPort

import com.datatorrent.api.DefaultOutputPort; // import the required package/class
private void setResultPort(DefaultOutputPort<String> resultPort)
{
  this.resultPort = Preconditions.checkNotNull(resultPort);
}
 
Developer: apache, Project: apex-malhar, Lines of code: 5, Source: QueryManagerAsynchronous.java


Example 16: getOutputPort

import com.datatorrent.api.DefaultOutputPort; // import the required package/class
@Override
public DefaultOutputPort<String> getOutputPort()
{
  return outputPort;
}
 
Developer: apache, Project: apex-malhar, Lines of code: 6, Source: PubSubWebSocketAppDataQuery.java


Example 17: stressTest

import com.datatorrent.api.DefaultOutputPort; // import the required package/class
@Test
public void stressTest() throws Exception
{
  final int totalTuples = 100000;
  final int batchSize = 100;
  final double waitMillisProb = .01;

  AppDataWindowEndQueueManager<MockQuery, Void> queueManager = new AppDataWindowEndQueueManager<MockQuery, Void>();

  DefaultOutputPort<String> outputPort = new DefaultOutputPort<String>();
  CollectorTestSink<MockResult> sink = new CollectorTestSink<MockResult>();
  TestUtils.setSink(outputPort, sink);

  MessageSerializerFactory msf = new MessageSerializerFactory(new ResultFormatter());

  QueryManagerAsynchronous<MockQuery, Void, MutableLong, MockResult> queryManagerAsynch =
      new QueryManagerAsynchronous<>(outputPort, queueManager, new NOPQueryExecutor(waitMillisProb), msf,
      Thread.currentThread());

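  Thread producerThread = new Thread(new ProducerThread(queueManager, totalTuples, batchSize, waitMillisProb));
  // Name the thread before starting it so the name is visible from its first log line or thread dump.
  producerThread.setName("Producer Thread");
  producerThread.start();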

  long startTime = System.currentTimeMillis();

  queryManagerAsynch.setup(null);

  int numWindows = 0;

  for (; sink.collectedTuples.size() < totalTuples && ((System.currentTimeMillis() - startTime) < 60000);
      numWindows++) {
    queryManagerAsynch.beginWindow(numWindows);
    Thread.sleep(100);
    queryManagerAsynch.endWindow();
  }

  producerThread.stop();
  queryManagerAsynch.teardown();

  try {
    Thread.sleep(1000);
  } catch (InterruptedException e) {
    //Do Nothing
  }

  Assert.assertEquals(totalTuples, sink.collectedTuples.size());
}
 
Developer: apache, Project: apex-malhar, Lines of code: 48, Source: QueryManagerAsynchronousTest.java


Example 18: populateDimensionsDAG

import com.datatorrent.api.DefaultOutputPort; // import the required package/class
public void populateDimensionsDAG(DAG dag, Configuration conf, DefaultOutputPort<DimensionTuple> upstreamPort) 
{
  final String eventSchema = SchemaUtils.jarResourceFileToString(eventSchemaLocation);
  
  // dimension
  DimensionsComputationFlexibleSingleSchemaPOJO dimensions = dag.addOperator("DimensionsComputation",
      DimensionsComputationFlexibleSingleSchemaPOJO.class);

  // Set operator properties
  // key expression
  {
    Map<String, String> keyToExpression = Maps.newHashMap();
    keyToExpression.put("campaignId", DimensionTuple.CAMPAIGNID);
    keyToExpression.put("time", DimensionTuple.EVENTTIME);
    dimensions.setKeyToExpression(keyToExpression);
  }

  // aggregate expression
  {
    Map<String, String> valueToExpression = Maps.newHashMap();
    valueToExpression.put("clicks", DimensionTuple.CLICKS);
    valueToExpression.put("latency", DimensionTuple.LATENCY);
    
    dimensions.setAggregateToExpression(valueToExpression);
  }

  // event schema
  dimensions.setConfigurationSchemaJSON(eventSchema);
  dimensions.setUnifier(new DimensionsComputationUnifierImpl<InputEvent, Aggregate>());

  dag.setUnifierAttribute(dimensions.output, OperatorContext.MEMORY_MB, 10240);
  
  dag.setInputPortAttribute(dimensions.input, Context.PortContext.PARTITION_PARALLEL, true);
  
  // store
  AppDataSingleSchemaDimensionStoreHDHT store = createStore(dag, conf, eventSchema); 
  store.setCacheWindowDuration(10000 * 5 / STREAMING_WINDOW_SIZE_MILLIS);   //cache for 5 windows
  dag.addStream("GenerateStream", upstreamPort, dimensions.input).setLocality(Locality.CONTAINER_LOCAL);
  
  StoreStreamCodec codec = new StoreStreamCodec();
  dag.setInputPortAttribute(store.input, PortContext.STREAM_CODEC, codec);
  dag.addStream("DimensionalStream", dimensions.output, store.input);

  
  if (includeQuery) {
    createQuery(dag, conf, store);

    // wsOut
    PubSubWebSocketAppDataResult wsOut = createQueryResult(dag, conf, store);

    dag.addStream("QueryResult", store.queryResult, wsOut.input);
  } else {
    DevNull devNull = new DevNull();
    dag.addOperator("devNull", devNull);
    dag.addStream("QueryResult", store.queryResult, devNull.data);
  }
  
  dag.setAttribute(DAGContext.STREAMING_WINDOW_SIZE_MILLIS, STREAMING_WINDOW_SIZE_MILLIS);
}
 
Developer: yahoo, Project: streaming-benchmarks, Lines of code: 60, Source: ApplicationDimensionComputation.java


Example 19: getOutputPort

import com.datatorrent.api.DefaultOutputPort; // import the required package/class
/**
 * Gets the output port for queries.
 * @return The output port for queries.
 */
DefaultOutputPort<QUERY_TYPE> getOutputPort();
 
Developer: apache, Project: apex-core, Lines of code: 6, Source: AppData.java


Example 20: getOutputPortOfGenerator

import com.datatorrent.api.DefaultOutputPort; // import the required package/class
protected abstract DefaultOutputPort getOutputPortOfGenerator( G generator ); 

Developer: apache, Project: apex-malhar, Lines of code: 2, Source: KinesisOutputOperatorTest.java



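Note: The com.datatorrent.api.DefaultOutputPort examples in this article were compiled from source-code and documentation hosting platforms such as GitHub and MSDocs, and the snippets were selected from open-source projects. Copyright in the source code belongs to the original authors; for distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.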

