• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java Locality类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中com.datatorrent.api.DAG.Locality的典型用法代码示例。如果您正苦于以下问题:Java Locality类的具体用法?Java Locality怎么用?Java Locality使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



Locality类属于com.datatorrent.api.DAG包,在下文中一共展示了Locality类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: testParDoChaining

import com.datatorrent.api.DAG.Locality; //导入依赖的package包/类
@Test
public void testParDoChaining() throws Exception {
  Pipeline p = Pipeline.create();
  long numElements = 1000;
  PCollection<Long> input = p.apply(GenerateSequence.from(0).to(numElements));
  PAssert.thatSingleton(input.apply("Count", Count.<Long>globally())).isEqualTo(numElements);

  ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
  DAG dag = TestApexRunner.translate(p, options);

  String[] expectedThreadLocal = { "/CreateActual/FilterActuals/Window.Assign" };
  Set<String> actualThreadLocal = Sets.newHashSet();
  for (DAG.StreamMeta sm : dag.getAllStreamsMeta()) {
    DAG.OutputPortMeta opm = sm.getSource();
    if (sm.getLocality() == Locality.THREAD_LOCAL) {
       String name = opm.getOperatorMeta().getName();
       String prefix = "PAssert$";
       if (name.startsWith(prefix)) {
         // remove indeterministic prefix
         name = name.substring(prefix.length() + 1);
       }
       actualThreadLocal.add(name);
    }
  }
  Assert.assertThat(actualThreadLocal, Matchers.hasItems(expectedThreadLocal));
}
 
开发者ID:apache,项目名称:beam,代码行数:27,代码来源:ApexRunnerTest.java


示例2: validatePositiveOiO

import com.datatorrent.api.DAG.Locality; //导入依赖的package包/类
@Test
public void validatePositiveOiO()
{
  logger.info("Checking the logic for sanity checking of OiO");

  LogicalPlan plan = new LogicalPlan();
  RecoverableInputOperator inputOperator = plan.addOperator("IntegerGenerator", new RecoverableInputOperator());
  CollectorOperator outputOperator = plan.addOperator("IntegerCollector", new CollectorOperator());
  plan.addStream("PossibleOiO", inputOperator.output, outputOperator.input).setLocality(Locality.THREAD_LOCAL);

  try {
    plan.validate();
    Assert.assertTrue("OiO validation", true);
  } catch (ConstraintViolationException ex) {
    Assert.fail("OIO Single InputPort");
  }
}
 
开发者ID:apache,项目名称:apex-core,代码行数:18,代码来源:OiOStreamTest.java


示例3: validatePositiveOiOiO

import com.datatorrent.api.DAG.Locality; //导入依赖的package包/类
@Test
public void validatePositiveOiOiO()
{
  logger.info("Checking the logic for sanity checking of OiO");

  LogicalPlan plan = new LogicalPlan();
  ThreadIdValidatingInputOperator inputOperator = plan.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator = plan.addOperator("intermediateOperator", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingOutputOperator outputOperator = plan.addOperator("outputOperator", new ThreadIdValidatingOutputOperator());

  plan.addStream("OiO1", inputOperator.output, intermediateOperator.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiO2", intermediateOperator.output, outputOperator.input).setLocality(Locality.THREAD_LOCAL);

  try {
    plan.validate();
    Assert.assertTrue("OiOiO validation", true);
  } catch (ConstraintViolationException ex) {
    Assert.fail("OiOiO validation");
  }
}
 
开发者ID:apache,项目名称:apex-core,代码行数:21,代码来源:OiOStreamTest.java


示例4: validatePositiveOiOiOdiamond

import com.datatorrent.api.DAG.Locality; //导入依赖的package包/类
@Test
public void validatePositiveOiOiOdiamond()
{
  logger.info("Checking the logic for sanity checking of OiO");

  LogicalPlan plan = new LogicalPlan();
  ThreadIdValidatingInputOperator inputOperator = plan.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator1 = plan.addOperator("intermediateOperator1", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator2 = plan.addOperator("intermediateOperator2", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericOperatorWithTwoInputPorts outputOperator = plan.addOperator("outputOperator", new ThreadIdValidatingGenericOperatorWithTwoInputPorts());

  plan.addStream("OiOin", inputOperator.output, intermediateOperator1.input, intermediateOperator2.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOout1", intermediateOperator1.output, outputOperator.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOout2", intermediateOperator2.output, outputOperator.input2).setLocality(Locality.THREAD_LOCAL);

  try {
    plan.validate();
    Assert.assertTrue("OiOiO diamond validation", true);
  } catch (ConstraintViolationException ex) {
    Assert.fail("OIOIO diamond validation");
  }
}
 
开发者ID:apache,项目名称:apex-core,代码行数:23,代码来源:OiOStreamTest.java


示例5: validatePositiveOiOiOExtendeddiamond

import com.datatorrent.api.DAG.Locality; //导入依赖的package包/类
@Test
public void validatePositiveOiOiOExtendeddiamond()
{
  logger.info("Checking the logic for sanity checking of OiO");

  LogicalPlan plan = new LogicalPlan();
  ThreadIdValidatingInputOperator inputOperator = plan.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator1 = plan.addOperator("intermediateOperator1", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator2 = plan.addOperator("intermediateOperator2", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator3 = plan.addOperator("intermediateOperator3", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator4 = plan.addOperator("intermediateOperator4", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericOperatorWithTwoInputPorts outputOperator = plan.addOperator("outputOperator", new ThreadIdValidatingGenericOperatorWithTwoInputPorts());

  plan.addStream("OiOin", inputOperator.output, intermediateOperator1.input, intermediateOperator3.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOIntermediate1", intermediateOperator1.output, intermediateOperator2.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOIntermediate2", intermediateOperator3.output, intermediateOperator4.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOout1", intermediateOperator2.output, outputOperator.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOout2", intermediateOperator4.output, outputOperator.input2).setLocality(Locality.THREAD_LOCAL);

  try {
    plan.validate();
    Assert.assertTrue("OiOiO extended diamond validation", true);
  } catch (ConstraintViolationException ex) {
    Assert.fail("OIOIO extended diamond validation");
  }
}
 
开发者ID:apache,项目名称:apex-core,代码行数:27,代码来源:OiOStreamTest.java


示例6: validateOiOImplementation

import com.datatorrent.api.DAG.Locality; //导入依赖的package包/类
@Test
public void validateOiOImplementation() throws Exception
{
  LogicalPlan lp = new LogicalPlan();
  String workingDir = new File("target/validateOiOImplementation").getAbsolutePath();
  lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
  TestInputOperator io = lp.addOperator("Input Operator", new TestInputOperator());
  FirstGenericOperator go = lp.addOperator("First Generic Operator", new FirstGenericOperator());
  SecondGenericOperator out = lp.addOperator("Second Generic Operator", new SecondGenericOperator());

  /*
   * This tests make sure that even if the application_window_count is different the endWindow() is called for
   * end_stream
   */
  lp.getOperatorMeta("Second Generic Operator").getAttributes().put(Context.OperatorContext.APPLICATION_WINDOW_COUNT, 2);
  StreamMeta stream = lp.addStream("Stream", io.output, go.input);
  StreamMeta stream1 = lp.addStream("Stream1", go.output, out.input);

  stream1.setLocality(Locality.THREAD_LOCAL);
  lp.validate();
  StramLocalCluster slc = new StramLocalCluster(lp);
  slc.run();
  Assert.assertEquals("End Window Count", FirstGenericOperator.endwindowCount, SecondGenericOperator.endwindowCount);
}
 
开发者ID:apache,项目名称:apex-core,代码行数:25,代码来源:OiOEndWindowTest.java


示例7: testLinearInlineOperatorsRecovery

import com.datatorrent.api.DAG.Locality; //导入依赖的package包/类
public void testLinearInlineOperatorsRecovery() throws Exception
{
  RecoverableInputOperator.initGenTuples();
  CollectorOperator.collection.clear();
  CollectorOperator.duplicates.clear();

  dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
  dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
  dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
  RecoverableInputOperator rip = dag.addOperator("LongGenerator", RecoverableInputOperator.class);
  rip.setMaximumTuples(maxTuples);
  rip.setSimulateFailure(true);

  CollectorOperator cm = dag.addOperator("LongCollector", CollectorOperator.class);
  cm.setSimulateFailure(true);
  dag.getMeta(cm).getAttributes().put(OperatorContext.PROCESSING_MODE, processingMode);

  dag.addStream("connection", rip.output, cm.input).setLocality(Locality.CONTAINER_LOCAL);

  StramLocalCluster lc = new StramLocalCluster(dag);
  lc.run();
}
 
开发者ID:apache,项目名称:apex-core,代码行数:23,代码来源:ProcessingModeTests.java


示例8: testOiOCommitted

import com.datatorrent.api.DAG.Locality; //导入依赖的package包/类
@Test
public void testOiOCommitted() throws IOException, ClassNotFoundException
{
  LogicalPlan lp = new LogicalPlan();
  String workingDir = new File("target/testCommitted").getAbsolutePath();
  lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
  lp.setAttribute(DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
  String op1Name = "CommitAwareOperatorTestOioCommit1";
  String op2Name = "CommitAwareOperatorTestOioCommit2";
  CommitAwareOperator operator1 = lp.addOperator(op1Name, new CommitAwareOperator());
  CommitAwareOperator operator2 = lp.addOperator(op2Name, new CommitAwareOperator());
  lp.addStream("local", operator1.output, operator2.input).setLocality(Locality.THREAD_LOCAL);

  StramLocalCluster lc = new StramLocalCluster(lp);
  lc.run(5000);

  /* this is not foolproof but some insurance is better than nothing */
  Assert.assertTrue("No Committed Windows", committedWindowIds.contains(op1Name));
  Assert.assertTrue("No Committed Windows", committedWindowIds.contains(op2Name));
}
 
开发者ID:apache,项目名称:apex-core,代码行数:21,代码来源:StreamingContainerTest.java


示例9: testDownStreamPartition

import com.datatorrent.api.DAG.Locality; //导入依赖的package包/类
private void testDownStreamPartition(Locality locality) throws Exception
{
  TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
  GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
  dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
  dag.addStream("o1Output1", o1.outport, o2.inport1).setLocality(locality);

  int maxContainers = 5;
  dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, maxContainers);
  dag.setAttribute(OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
  dag.validate();
  PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
  Assert.assertEquals("number of containers", 1, plan.getContainers().size());

  PTContainer container1 = plan.getContainers().get(0);
  Assert.assertEquals("number operators " + container1, 3, container1.getOperators().size());
  StramLocalCluster slc = new StramLocalCluster(dag);
  slc.run(5000);
}
 
开发者ID:apache,项目名称:apex-core,代码行数:20,代码来源:StreamingContainerManagerTest.java


示例10: testLocalityValidation

import com.datatorrent.api.DAG.Locality; //导入依赖的package包/类
@Test
public void testLocalityValidation()
{
  TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
  GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
  StreamMeta s1 = dag.addStream("input1.outport", input1.outport, o1.inport1).setLocality(Locality.THREAD_LOCAL);
  dag.validate();

  TestGeneratorInputOperator input2 = dag.addOperator("input2", TestGeneratorInputOperator.class);
  dag.addStream("input2.outport", input2.outport, o1.inport2);

  try {
    dag.validate();
    Assert.fail("Exception expected for " + o1);
  } catch (ValidationException ve) {
    Assert.assertThat("", ve.getMessage(), RegexMatcher.matches("Locality THREAD_LOCAL invalid for operator .* with multiple input streams .*"));
  }

  s1.setLocality(null);
  dag.validate();
}
 
开发者ID:apache,项目名称:apex-core,代码行数:22,代码来源:LogicalPlanTest.java


示例11: populateDAG

import com.datatorrent.api.DAG.Locality; //导入依赖的package包/类
@Override
public void populateDAG(DAG dag, Configuration conf)
{

  RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
  rand.setMaxvalue(3000);
  rand.setTuplesBlast(120);

  RandomMapOutput randMap = dag.addOperator("randMap", new RandomMapOutput());
  randMap.setKey("val");

  RubyOperator ruby = dag.addOperator("ruby", new RubyOperator());
  String setupScript = "def square(val)\n";
  setupScript += "  return val*val\nend\n";
  ruby.addSetupScript(setupScript);
  ruby.setInvoke("square");
  ruby.setPassThru(true);

  ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
  dag.getMeta(console).getMeta(console.input).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
  dag.getMeta(ruby).getMeta(ruby.result).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
  dag.addStream("rand_randMap", rand.integer_data, randMap.input).setLocality(Locality.THREAD_LOCAL);
  dag.addStream("randMap_ruby", randMap.map_data, ruby.inBindings).setLocality(locality);
  dag.addStream("ruby_console", ruby.result, console.input).setLocality(locality);
}
 
开发者ID:apache,项目名称:apex-malhar,代码行数:26,代码来源:RubyOperatorBenchmarkApplication.java


示例12: populateDAG

import com.datatorrent.api.DAG.Locality; //导入依赖的package包/类
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  TestStatsListener sl = new TestStatsListener();
  sl.adjustRate = conf.getBoolean("dt.ManagedStateBenchmark.adjustRate", false);
  TestGenerator gen = dag.addOperator("Generator", new TestGenerator());
  gen.setRange(timeRange);
  dag.setAttribute(gen, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));

  storeOperator = new StoreOperator();
  storeOperator.setStore(createStore(conf));
  storeOperator.setTimeRange(timeRange);
  storeOperator = dag.addOperator("Store", storeOperator);

  dag.setAttribute(storeOperator, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));

  dag.addStream("Events", gen.data, storeOperator.input).setLocality(Locality.CONTAINER_LOCAL);
}
 
开发者ID:apache,项目名称:apex-malhar,代码行数:19,代码来源:ManagedStateBenchmarkApp.java


示例13: populateDAG

import com.datatorrent.api.DAG.Locality; //导入依赖的package包/类
@Override
  public void populateDAG(DAG dag, Configuration conf)
  {
    TestStatsListener sl = new TestStatsListener();
    sl.adjustRate = conf.getBoolean("dt.ManagedStateBenchmark.adjustRate", false);

    G generator = createGenerator();
    dag.addOperator("Generator", generator);
    //generator.setRange(timeRange);
    dag.setAttribute(generator, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));

    O windowedOperator = createWindowedOperator(conf);
    dag.addOperator("windowedOperator", windowedOperator);
    dag.setAttribute(windowedOperator, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));
    //dag.addStream("Data", generator.data, windowedOperator.input).setLocality(Locality.CONTAINER_LOCAL);
    connectGeneratorToWindowedOperator(dag, generator, windowedOperator);

//    WatermarkGenerator watermarkGenerator = new WatermarkGenerator();
//    dag.addOperator("WatermarkGenerator", watermarkGenerator);
//    dag.addStream("Control", watermarkGenerator.control, windowedOperator.controlInput)
//      .setLocality(Locality.CONTAINER_LOCAL);

    DevNull output = dag.addOperator("output", new DevNull());
    dag.addStream("output", windowedOperator.output, output.data).setLocality(Locality.CONTAINER_LOCAL);
  }
 
开发者ID:apache,项目名称:apex-malhar,代码行数:26,代码来源:AbstractWindowedOperatorBenchmarkApp.java


示例14: testApplication

import com.datatorrent.api.DAG.Locality; //导入依赖的package包/类
@Test
public void testApplication() throws IOException, Exception
{
  for (final Locality l : Locality.values()) {
    logger.debug("Running the with {} locality", l);
    LocalMode.runApp(new Benchmark.AbstractApplication()
    {
      @Override
      public Locality getLocality()
      {
        return l;
      }

    }, 60000);
  }
}
 
开发者ID:apache,项目名称:apex-malhar,代码行数:17,代码来源:BenchmarkTest.java


示例15: populateDAG

import com.datatorrent.api.DAG.Locality; //导入依赖的package包/类
@Override
public void populateDAG(DAG dag, Configuration entries)
{
      /* Generate random key-value pairs */
  RandomDataGenerator randGen = dag.addOperator("randomgen", new RandomDataGenerator());

      /* Initialize with three partition to start with */
  UniqueCounter<KeyValPair<String, Object>> uniqCount =
      dag.addOperator("uniqevalue", new UniqueCounter<KeyValPair<String, Object>>());
  MapToKeyHashValuePairConverter<KeyValPair<String, Object>, Integer> converter = dag.addOperator("converter", new MapToKeyHashValuePairConverter());
  uniqCount.setCumulative(false);
  dag.setAttribute(randGen, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<UniqueCounter<KeyValPair<String, Object>>>(3));

  ConsoleOutputOperator output = dag.addOperator("output", new ConsoleOutputOperator());

  dag.addStream("datain", randGen.outPort, uniqCount.data);
  dag.addStream("convert", uniqCount.count, converter.input).setLocality(Locality.THREAD_LOCAL);
  dag.addStream("consoutput", converter.output, output.input);
}
 
开发者ID:apache,项目名称:apex-malhar,代码行数:20,代码来源:UniqueKeyValCountExample.java


示例16: populateDAG

import com.datatorrent.api.DAG.Locality; //导入依赖的package包/类
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  TwitterSampleInput twitterFeed = new TwitterSampleInput();
  twitterFeed = dag.addOperator("TweetSampler", twitterFeed);

  TwitterStatusWordExtractor wordExtractor = dag.addOperator("WordExtractor", TwitterStatusWordExtractor.class);
  UniqueCounter<String> uniqueCounter = dag.addOperator("UniqueWordCounter", new UniqueCounter<String>());
  WindowedTopCounter<String> topCounts = dag.addOperator("TopCounter", new WindowedTopCounter<String>());

  topCounts.setSlidingWindowWidth(120);
  topCounts.setDagWindowWidth(1);

  dag.addStream("TweetStream", twitterFeed.text, wordExtractor.input);
  dag.addStream("TwittedWords", wordExtractor.output, uniqueCounter.data);
  dag.addStream("UniqueWordCounts", uniqueCounter.count, topCounts.input).setLocality(Locality.CONTAINER_LOCAL);

  TwitterTopCounterApplication.consoleOutput(dag, "topWords", topCounts.output, SNAPSHOT_SCHEMA, "word");
}
 
开发者ID:apache,项目名称:apex-malhar,代码行数:20,代码来源:TwitterTopWordsApplication.java


示例17: populateDAG

import com.datatorrent.api.DAG.Locality; //导入依赖的package包/类
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  //dag.setAttribute(DAGContext.APPLICATION_NAME, "TweetsDump");

  TwitterSampleInput twitterStream = dag.addOperator("TweetSampler", new TwitterSampleInput());

  //ConsoleOutputOperator dbWriter = dag.addOperator("DatabaseWriter", new ConsoleOutputOperator());

  Status2Database dbWriter = dag.addOperator("DatabaseWriter", new Status2Database());
  dbWriter.getStore().setDatabaseDriver("com.mysql.jdbc.Driver");
  dbWriter.getStore().setDatabaseUrl("jdbc:mysql://node6.morado.com:3306/twitter");
  dbWriter.getStore().setConnectionProperties("user:twitter");

  dag.addStream("Statuses", twitterStream.status, dbWriter.input).setLocality(Locality.CONTAINER_LOCAL);
}
 
开发者ID:apache,项目名称:apex-malhar,代码行数:17,代码来源:TwitterDumpApplication.java


示例18: populateDAG

import com.datatorrent.api.DAG.Locality; //导入依赖的package包/类
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  JdbcPOJOInputOperator jdbcInputOperator = dag.addOperator("JdbcInput", new JdbcPOJOInputOperator());
  /**
   * The class given below can be updated to the user defined class based on
   * input table schema The addField infos method needs to be updated
   * accordingly This line can be commented and class can be set from the
   * properties file
   */
 // dag.setOutputPortAttribute(jdbcInputOperator.outputPort, Context.PortContext.TUPLE_CLASS, PojoEvent.class);

  jdbcInputOperator.setFieldInfos(addFieldInfos());

  JdbcStore store = new JdbcStore();
  jdbcInputOperator.setStore(store);

  FileLineOutputOperator fileOutput = dag.addOperator("FileOutputOperator", new FileLineOutputOperator());

  dag.addStream("POJO's", jdbcInputOperator.outputPort, fileOutput.input).setLocality(Locality.CONTAINER_LOCAL);
}
 
开发者ID:apache,项目名称:apex-malhar,代码行数:22,代码来源:JdbcHDFSApp.java


示例19: populateDAG

import com.datatorrent.api.DAG.Locality; //导入依赖的package包/类
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  // Sample DAG with 2 operators
  // Replace this code with the DAG you want to build

  SeedEventGenerator seedGen = dag.addOperator("seedGen", SeedEventGenerator.class);
  seedGen.setSeedStart(1);
  seedGen.setSeedEnd(10);
  seedGen.addKeyData("x", 0, 10);
  seedGen.addKeyData("y", 0, 100);

  ConsoleOutputOperator cons = dag.addOperator("console", new ConsoleOutputOperator());
  cons.setStringFormat("hello: %s");

  dag.addStream("seeddata", seedGen.val_list, cons.input).setLocality(Locality.CONTAINER_LOCAL);
}
 
开发者ID:apache,项目名称:apex-malhar,代码行数:18,代码来源:Application.java


示例20: populateDAG

import com.datatorrent.api.DAG.Locality; //导入依赖的package包/类
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  TailFsInputOperator log = dag.addOperator("log", new TailFsInputOperator());
  log.setDelimiter('\n');
  log.setFilePath("/var/log/apache2/access.log");

  ApacheLogParseMapOutputOperator parse = dag.addOperator("parse", new ApacheLogParseMapOutputOperator());
  GeoIPExtractor geoIPExtractor = new GeoIPExtractor();

  // Can't put this file in resources until licensing issue is straightened out
  geoIPExtractor.setDatabasePath("/home/david/GeoLiteCity.dat");
  parse.registerInformationExtractor("ip", geoIPExtractor);
  parse.registerInformationExtractor("agent", new UserAgentExtractor());
  TimestampExtractor timestampExtractor = new TimestampExtractor();
  timestampExtractor.setDateFormatString("dd/MMM/yyyy:HH:mm:ss Z");
  parse.registerInformationExtractor("time", timestampExtractor);

  ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());

  dag.addStream("log-parse", log.output, parse.data);
  dag.addStream("parse-console", parse.output, console.input).setLocality(Locality.CONTAINER_LOCAL);

}
 
开发者ID:apache,项目名称:apex-malhar,代码行数:25,代码来源:ApplicationLocalLog.java



注:本文中的com.datatorrent.api.DAG.Locality类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java DomainFilterInterceptor类代码示例发布时间:2022-05-22
下一篇:
Java InlineRenamer类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap