This article collects typical usage examples of the Java type com.datatorrent.api.DAG.Locality. If you are wondering how Locality is used in practice, the selected code examples below may help.
Locality is an enum nested in the com.datatorrent.api.DAG interface. Twenty code examples are shown below, sorted by popularity by default.
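Before the examples, it may help to see the locality levels side by side. The following is only a sketch: `LocalitySketch` and `LocalityNotes` are hypothetical stand-ins so the code compiles without Apex on the classpath. The constant names mirror those of `DAG.Locality`, while the one-line descriptions are an informal summary that should be checked against the Apex Javadoc.

```java
// Hypothetical stand-in for com.datatorrent.api.DAG.Locality (not the real class).
enum LocalitySketch {
    CONTAINER_LOCAL, THREAD_LOCAL, NODE_LOCAL, RACK_LOCAL
}

class LocalityNotes {
    // Informal summary of what each level asks the platform to do for a stream.
    static String describe(LocalitySketch locality) {
        switch (locality) {
            case THREAD_LOCAL:
                return "upstream and downstream operators share one thread";
            case CONTAINER_LOCAL:
                return "operators share one container (process), separate threads";
            case NODE_LOCAL:
                return "operators run on the same node, separate containers";
            case RACK_LOCAL:
                return "operators run on the same rack";
            default:
                throw new IllegalArgumentException("unknown locality: " + locality);
        }
    }

    public static void main(String[] args) {
        for (LocalitySketch l : LocalitySketch.values()) {
            System.out.println(l + ": " + describe(l));
        }
    }
}
```

Most of the examples below use only `THREAD_LOCAL` and `CONTAINER_LOCAL`, set through `setLocality(...)` on the stream returned by `addStream(...)`.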
Example 1: testParDoChaining
import com.datatorrent.api.DAG.Locality; // import the required package/class
@Test
public void testParDoChaining() throws Exception {
  Pipeline p = Pipeline.create();
  long numElements = 1000;
  PCollection<Long> input = p.apply(GenerateSequence.from(0).to(numElements));
  PAssert.thatSingleton(input.apply("Count", Count.<Long>globally())).isEqualTo(numElements);
  ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
  DAG dag = TestApexRunner.translate(p, options);
  String[] expectedThreadLocal = { "/CreateActual/FilterActuals/Window.Assign" };
  Set<String> actualThreadLocal = Sets.newHashSet();
  for (DAG.StreamMeta sm : dag.getAllStreamsMeta()) {
    DAG.OutputPortMeta opm = sm.getSource();
    if (sm.getLocality() == Locality.THREAD_LOCAL) {
      String name = opm.getOperatorMeta().getName();
      String prefix = "PAssert$";
      if (name.startsWith(prefix)) {
        // remove nondeterministic prefix
        name = name.substring(prefix.length() + 1);
      }
      actualThreadLocal.add(name);
    }
  }
  Assert.assertThat(actualThreadLocal, Matchers.hasItems(expectedThreadLocal));
}
Developer ID: apache, project: beam, lines of code: 27, source: ApexRunnerTest.java
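The prefix handling in testParDoChaining is easy to misread, so here is a minimal sketch of just that step. It assumes, hypothetically, that the operator name looks like `PAssert$0/CreateActual/...`, with a single-character counter after the `$`; the class name `ThreadLocalNames` is invented for illustration.

```java
class ThreadLocalNames {
    static final String PREFIX = "PAssert$";

    // Drops "PAssert$" plus the one character that follows it (assumed to be a
    // single-digit counter). Names without the prefix are returned unchanged.
    static String normalize(String name) {
        if (name.startsWith(PREFIX)) {
            return name.substring(PREFIX.length() + 1);
        }
        return name;
    }

    public static void main(String[] args) {
        System.out.println(normalize("PAssert$0/CreateActual/FilterActuals/Window.Assign"));
    }
}
```

Under that assumption the result matches the expected entry `/CreateActual/FilterActuals/Window.Assign`. A counter with two or more digits would leave trailing digits in the name, so the `+ 1` only holds while the counter stays below 10.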
Example 2: validatePositiveOiO
import com.datatorrent.api.DAG.Locality; // import the required package/class
@Test
public void validatePositiveOiO()
{
  logger.info("Checking the logic for sanity checking of OiO");
  LogicalPlan plan = new LogicalPlan();
  RecoverableInputOperator inputOperator = plan.addOperator("IntegerGenerator", new RecoverableInputOperator());
  CollectorOperator outputOperator = plan.addOperator("IntegerCollector", new CollectorOperator());
  plan.addStream("PossibleOiO", inputOperator.output, outputOperator.input).setLocality(Locality.THREAD_LOCAL);
  try {
    plan.validate();
    Assert.assertTrue("OiO validation", true);
  } catch (ConstraintViolationException ex) {
    Assert.fail("OIO Single InputPort");
  }
}
Developer ID: apache, project: apex-core, lines of code: 18, source: OiOStreamTest.java
Example 3: validatePositiveOiOiO
import com.datatorrent.api.DAG.Locality; // import the required package/class
@Test
public void validatePositiveOiOiO()
{
  logger.info("Checking the logic for sanity checking of OiO");
  LogicalPlan plan = new LogicalPlan();
  ThreadIdValidatingInputOperator inputOperator = plan.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator = plan.addOperator("intermediateOperator", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingOutputOperator outputOperator = plan.addOperator("outputOperator", new ThreadIdValidatingOutputOperator());
  plan.addStream("OiO1", inputOperator.output, intermediateOperator.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiO2", intermediateOperator.output, outputOperator.input).setLocality(Locality.THREAD_LOCAL);
  try {
    plan.validate();
    Assert.assertTrue("OiOiO validation", true);
  } catch (ConstraintViolationException ex) {
    Assert.fail("OiOiO validation");
  }
}
Developer ID: apache, project: apex-core, lines of code: 21, source: OiOStreamTest.java
Example 4: validatePositiveOiOiOdiamond
import com.datatorrent.api.DAG.Locality; // import the required package/class
@Test
public void validatePositiveOiOiOdiamond()
{
  logger.info("Checking the logic for sanity checking of OiO");
  LogicalPlan plan = new LogicalPlan();
  ThreadIdValidatingInputOperator inputOperator = plan.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator1 = plan.addOperator("intermediateOperator1", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator2 = plan.addOperator("intermediateOperator2", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericOperatorWithTwoInputPorts outputOperator = plan.addOperator("outputOperator", new ThreadIdValidatingGenericOperatorWithTwoInputPorts());
  plan.addStream("OiOin", inputOperator.output, intermediateOperator1.input, intermediateOperator2.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOout1", intermediateOperator1.output, outputOperator.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOout2", intermediateOperator2.output, outputOperator.input2).setLocality(Locality.THREAD_LOCAL);
  try {
    plan.validate();
    Assert.assertTrue("OiOiO diamond validation", true);
  } catch (ConstraintViolationException ex) {
    Assert.fail("OIOIO diamond validation");
  }
}
Developer ID: apache, project: apex-core, lines of code: 23, source: OiOStreamTest.java
Example 5: validatePositiveOiOiOExtendeddiamond
import com.datatorrent.api.DAG.Locality; // import the required package/class
@Test
public void validatePositiveOiOiOExtendeddiamond()
{
  logger.info("Checking the logic for sanity checking of OiO");
  LogicalPlan plan = new LogicalPlan();
  ThreadIdValidatingInputOperator inputOperator = plan.addOperator("inputOperator", new ThreadIdValidatingInputOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator1 = plan.addOperator("intermediateOperator1", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator2 = plan.addOperator("intermediateOperator2", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator3 = plan.addOperator("intermediateOperator3", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericIntermediateOperator intermediateOperator4 = plan.addOperator("intermediateOperator4", new ThreadIdValidatingGenericIntermediateOperator());
  ThreadIdValidatingGenericOperatorWithTwoInputPorts outputOperator = plan.addOperator("outputOperator", new ThreadIdValidatingGenericOperatorWithTwoInputPorts());
  plan.addStream("OiOin", inputOperator.output, intermediateOperator1.input, intermediateOperator3.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOIntermediate1", intermediateOperator1.output, intermediateOperator2.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOIntermediate2", intermediateOperator3.output, intermediateOperator4.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOout1", intermediateOperator2.output, outputOperator.input).setLocality(Locality.THREAD_LOCAL);
  plan.addStream("OiOout2", intermediateOperator4.output, outputOperator.input2).setLocality(Locality.THREAD_LOCAL);
  try {
    plan.validate();
    Assert.assertTrue("OiOiO extended diamond validation", true);
  } catch (ConstraintViolationException ex) {
    Assert.fail("OIOIO extended diamond validation");
  }
}
Developer ID: apache, project: apex-core, lines of code: 27, source: OiOStreamTest.java
Example 6: validateOiOImplementation
import com.datatorrent.api.DAG.Locality; // import the required package/class
@Test
public void validateOiOImplementation() throws Exception
{
  LogicalPlan lp = new LogicalPlan();
  String workingDir = new File("target/validateOiOImplementation").getAbsolutePath();
  lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
  TestInputOperator io = lp.addOperator("Input Operator", new TestInputOperator());
  FirstGenericOperator go = lp.addOperator("First Generic Operator", new FirstGenericOperator());
  SecondGenericOperator out = lp.addOperator("Second Generic Operator", new SecondGenericOperator());
  /*
   * This test makes sure that even if the application_window_count is different, endWindow() is called for
   * end_stream
   */
  lp.getOperatorMeta("Second Generic Operator").getAttributes().put(Context.OperatorContext.APPLICATION_WINDOW_COUNT, 2);
  StreamMeta stream = lp.addStream("Stream", io.output, go.input);
  StreamMeta stream1 = lp.addStream("Stream1", go.output, out.input);
  stream1.setLocality(Locality.THREAD_LOCAL);
  lp.validate();
  StramLocalCluster slc = new StramLocalCluster(lp);
  slc.run();
  Assert.assertEquals("End Window Count", FirstGenericOperator.endwindowCount, SecondGenericOperator.endwindowCount);
}
Developer ID: apache, project: apex-core, lines of code: 25, source: OiOEndWindowTest.java
Example 7: testLinearInlineOperatorsRecovery
import com.datatorrent.api.DAG.Locality; // import the required package/class
public void testLinearInlineOperatorsRecovery() throws Exception
{
  RecoverableInputOperator.initGenTuples();
  CollectorOperator.collection.clear();
  CollectorOperator.duplicates.clear();
  dag.getAttributes().put(LogicalPlan.CHECKPOINT_WINDOW_COUNT, 2);
  dag.getAttributes().put(LogicalPlan.STREAMING_WINDOW_SIZE_MILLIS, 300);
  dag.getAttributes().put(LogicalPlan.CONTAINERS_MAX_COUNT, 1);
  RecoverableInputOperator rip = dag.addOperator("LongGenerator", RecoverableInputOperator.class);
  rip.setMaximumTuples(maxTuples);
  rip.setSimulateFailure(true);
  CollectorOperator cm = dag.addOperator("LongCollector", CollectorOperator.class);
  cm.setSimulateFailure(true);
  dag.getMeta(cm).getAttributes().put(OperatorContext.PROCESSING_MODE, processingMode);
  dag.addStream("connection", rip.output, cm.input).setLocality(Locality.CONTAINER_LOCAL);
  StramLocalCluster lc = new StramLocalCluster(dag);
  lc.run();
}
Developer ID: apache, project: apex-core, lines of code: 23, source: ProcessingModeTests.java
Example 8: testOiOCommitted
import com.datatorrent.api.DAG.Locality; // import the required package/class
@Test
public void testOiOCommitted() throws IOException, ClassNotFoundException
{
  LogicalPlan lp = new LogicalPlan();
  String workingDir = new File("target/testCommitted").getAbsolutePath();
  lp.setAttribute(Context.OperatorContext.STORAGE_AGENT, new AsyncFSStorageAgent(workingDir, null));
  lp.setAttribute(DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
  String op1Name = "CommitAwareOperatorTestOioCommit1";
  String op2Name = "CommitAwareOperatorTestOioCommit2";
  CommitAwareOperator operator1 = lp.addOperator(op1Name, new CommitAwareOperator());
  CommitAwareOperator operator2 = lp.addOperator(op2Name, new CommitAwareOperator());
  lp.addStream("local", operator1.output, operator2.input).setLocality(Locality.THREAD_LOCAL);
  StramLocalCluster lc = new StramLocalCluster(lp);
  lc.run(5000);
  /* this is not foolproof but some insurance is better than nothing */
  Assert.assertTrue("No Committed Windows", committedWindowIds.contains(op1Name));
  Assert.assertTrue("No Committed Windows", committedWindowIds.contains(op2Name));
}
Developer ID: apache, project: apex-core, lines of code: 21, source: StreamingContainerTest.java
Example 9: testDownStreamPartition
import com.datatorrent.api.DAG.Locality; // import the required package/class
private void testDownStreamPartition(Locality locality) throws Exception
{
  TestGeneratorInputOperator o1 = dag.addOperator("o1", TestGeneratorInputOperator.class);
  GenericTestOperator o2 = dag.addOperator("o2", GenericTestOperator.class);
  dag.setOperatorAttribute(o2, OperatorContext.PARTITIONER, new StatelessPartitioner<GenericTestOperator>(2));
  dag.addStream("o1Output1", o1.outport, o2.inport1).setLocality(locality);
  int maxContainers = 5;
  dag.setAttribute(LogicalPlan.CONTAINERS_MAX_COUNT, maxContainers);
  dag.setAttribute(OperatorContext.STORAGE_AGENT, new StramTestSupport.MemoryStorageAgent());
  dag.validate();
  PhysicalPlan plan = new PhysicalPlan(dag, new TestPlanContext());
  Assert.assertEquals("number of containers", 1, plan.getContainers().size());
  PTContainer container1 = plan.getContainers().get(0);
  Assert.assertEquals("number operators " + container1, 3, container1.getOperators().size());
  StramLocalCluster slc = new StramLocalCluster(dag);
  slc.run(5000);
}
Developer ID: apache, project: apex-core, lines of code: 20, source: StreamingContainerManagerTest.java
Example 10: testLocalityValidation
import com.datatorrent.api.DAG.Locality; // import the required package/class
@Test
public void testLocalityValidation()
{
  TestGeneratorInputOperator input1 = dag.addOperator("input1", TestGeneratorInputOperator.class);
  GenericTestOperator o1 = dag.addOperator("o1", GenericTestOperator.class);
  StreamMeta s1 = dag.addStream("input1.outport", input1.outport, o1.inport1).setLocality(Locality.THREAD_LOCAL);
  dag.validate();
  TestGeneratorInputOperator input2 = dag.addOperator("input2", TestGeneratorInputOperator.class);
  dag.addStream("input2.outport", input2.outport, o1.inport2);
  try {
    dag.validate();
    Assert.fail("Exception expected for " + o1);
  } catch (ValidationException ve) {
    Assert.assertThat("", ve.getMessage(), RegexMatcher.matches("Locality THREAD_LOCAL invalid for operator .* with multiple input streams .*"));
  }
  s1.setLocality(null);
  dag.validate();
}
Developer ID: apache, project: apex-core, lines of code: 22, source: LogicalPlanTest.java
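The behaviour exercised by testLocalityValidation, together with the diamond tests earlier, suggests a rule along these lines: an operator with several input streams may not mix a THREAD_LOCAL input with inputs of another (or no) locality. The sketch below encodes only that simplified reading. `LocalityRuleSketch` and its nested enum are hypothetical stand-ins, this is not the LogicalPlan implementation, and any further checks the real validator performs are not modelled.

```java
import java.util.Arrays;
import java.util.List;

class LocalityRuleSketch {
    // Hypothetical stand-in for DAG.Locality; null means "no locality set".
    enum Locality { THREAD_LOCAL, CONTAINER_LOCAL }

    // Simplified rule inferred from the tests: with more than one input stream,
    // either none of them is THREAD_LOCAL or all of them are.
    static boolean isValid(List<Locality> inputLocalities) {
        if (inputLocalities.size() <= 1) {
            return true;
        }
        boolean anyThreadLocal = false;
        boolean allThreadLocal = true;
        for (Locality l : inputLocalities) {
            if (l == Locality.THREAD_LOCAL) {
                anyThreadLocal = true;
            } else {
                allThreadLocal = false;
            }
        }
        return !anyThreadLocal || allThreadLocal;
    }

    public static void main(String[] args) {
        // One THREAD_LOCAL input plus one unset input: rejected under this rule.
        System.out.println(isValid(Arrays.asList(Locality.THREAD_LOCAL, null)));
        // After clearing the locality, both inputs are unset: accepted.
        System.out.println(isValid(Arrays.asList((Locality)null, null)));
    }
}
```

This mirrors the three `validate()` calls in the test: a single THREAD_LOCAL input passes, adding a second input without locality fails, and resetting the first stream with `setLocality(null)` passes again.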
Example 11: populateDAG
import com.datatorrent.api.DAG.Locality; // import the required package/class
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  RandomEventGenerator rand = dag.addOperator("rand", new RandomEventGenerator());
  rand.setMaxvalue(3000);
  rand.setTuplesBlast(120);
  RandomMapOutput randMap = dag.addOperator("randMap", new RandomMapOutput());
  randMap.setKey("val");
  RubyOperator ruby = dag.addOperator("ruby", new RubyOperator());
  String setupScript = "def square(val)\n";
  setupScript += " return val*val\nend\n";
  ruby.addSetupScript(setupScript);
  ruby.setInvoke("square");
  ruby.setPassThru(true);
  ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
  dag.getMeta(console).getMeta(console.input).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
  dag.getMeta(ruby).getMeta(ruby.result).getAttributes().put(PortContext.QUEUE_CAPACITY, QUEUE_CAPACITY);
  dag.addStream("rand_randMap", rand.integer_data, randMap.input).setLocality(Locality.THREAD_LOCAL);
  dag.addStream("randMap_ruby", randMap.map_data, ruby.inBindings).setLocality(locality);
  dag.addStream("ruby_console", ruby.result, console.input).setLocality(locality);
}
Developer ID: apache, project: apex-malhar, lines of code: 26, source: RubyOperatorBenchmarkApplication.java
Example 12: populateDAG
import com.datatorrent.api.DAG.Locality; // import the required package/class
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  TestStatsListener sl = new TestStatsListener();
  sl.adjustRate = conf.getBoolean("dt.ManagedStateBenchmark.adjustRate", false);
  TestGenerator gen = dag.addOperator("Generator", new TestGenerator());
  gen.setRange(timeRange);
  dag.setAttribute(gen, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));
  storeOperator = new StoreOperator();
  storeOperator.setStore(createStore(conf));
  storeOperator.setTimeRange(timeRange);
  storeOperator = dag.addOperator("Store", storeOperator);
  dag.setAttribute(storeOperator, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));
  dag.addStream("Events", gen.data, storeOperator.input).setLocality(Locality.CONTAINER_LOCAL);
}
Developer ID: apache, project: apex-malhar, lines of code: 19, source: ManagedStateBenchmarkApp.java
Example 13: populateDAG
import com.datatorrent.api.DAG.Locality; // import the required package/class
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  TestStatsListener sl = new TestStatsListener();
  sl.adjustRate = conf.getBoolean("dt.ManagedStateBenchmark.adjustRate", false);
  G generator = createGenerator();
  dag.addOperator("Generator", generator);
  //generator.setRange(timeRange);
  dag.setAttribute(generator, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));
  O windowedOperator = createWindowedOperator(conf);
  dag.addOperator("windowedOperator", windowedOperator);
  dag.setAttribute(windowedOperator, OperatorContext.STATS_LISTENERS, Lists.newArrayList((StatsListener)sl));
  //dag.addStream("Data", generator.data, windowedOperator.input).setLocality(Locality.CONTAINER_LOCAL);
  connectGeneratorToWindowedOperator(dag, generator, windowedOperator);
  // WatermarkGenerator watermarkGenerator = new WatermarkGenerator();
  // dag.addOperator("WatermarkGenerator", watermarkGenerator);
  // dag.addStream("Control", watermarkGenerator.control, windowedOperator.controlInput)
  //     .setLocality(Locality.CONTAINER_LOCAL);
  DevNull output = dag.addOperator("output", new DevNull());
  dag.addStream("output", windowedOperator.output, output.data).setLocality(Locality.CONTAINER_LOCAL);
}
Developer ID: apache, project: apex-malhar, lines of code: 26, source: AbstractWindowedOperatorBenchmarkApp.java
Example 14: testApplication
import com.datatorrent.api.DAG.Locality; // import the required package/class
@Test
public void testApplication() throws IOException, Exception
{
  for (final Locality l : Locality.values()) {
    logger.debug("Running the app with {} locality", l);
    LocalMode.runApp(new Benchmark.AbstractApplication()
    {
      @Override
      public Locality getLocality()
      {
        return l;
      }
    }, 60000);
  }
}
Developer ID: apache, project: apex-malhar, lines of code: 17, source: BenchmarkTest.java
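The pattern in testApplication, one anonymous subclass per enum constant with the loop variable captured as `final`, can be tried without Apex. In this sketch `LocalitySweepSketch`, its nested enum, and `AppSketch` are all hypothetical stand-ins, and `run()` merely returns a string instead of launching an application.

```java
import java.util.ArrayList;
import java.util.List;

class LocalitySweepSketch {
    // Hypothetical stand-in for DAG.Locality.
    enum Locality { CONTAINER_LOCAL, THREAD_LOCAL, NODE_LOCAL, RACK_LOCAL }

    // Hypothetical stand-in for an application parameterised by locality.
    abstract static class AppSketch {
        abstract Locality getLocality();

        String run() {
            return "ran with " + getLocality();
        }
    }

    // Builds and "runs" one application per locality value.
    static List<String> runAll() {
        List<String> results = new ArrayList<>();
        for (final Locality l : Locality.values()) {
            AppSketch app = new AppSketch() {
                @Override
                Locality getLocality() {
                    return l; // captured loop variable
                }
            };
            results.add(app.run());
        }
        return results;
    }

    public static void main(String[] args) {
        for (String line : runAll()) {
            System.out.println(line);
        }
    }
}
```

Iterating over `values()` means a newly added locality constant is picked up automatically, at the cost of also running constants the platform may not fully support.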
Example 15: populateDAG
import com.datatorrent.api.DAG.Locality; // import the required package/class
@Override
public void populateDAG(DAG dag, Configuration entries)
{
  /* Generate random key-value pairs */
  RandomDataGenerator randGen = dag.addOperator("randomgen", new RandomDataGenerator());
  /* Initialize with three partitions to start with */
  UniqueCounter<KeyValPair<String, Object>> uniqCount =
      dag.addOperator("uniqevalue", new UniqueCounter<KeyValPair<String, Object>>());
  MapToKeyHashValuePairConverter<KeyValPair<String, Object>, Integer> converter = dag.addOperator("converter", new MapToKeyHashValuePairConverter());
  uniqCount.setCumulative(false);
  dag.setAttribute(randGen, Context.OperatorContext.PARTITIONER, new StatelessPartitioner<UniqueCounter<KeyValPair<String, Object>>>(3));
  ConsoleOutputOperator output = dag.addOperator("output", new ConsoleOutputOperator());
  dag.addStream("datain", randGen.outPort, uniqCount.data);
  dag.addStream("convert", uniqCount.count, converter.input).setLocality(Locality.THREAD_LOCAL);
  dag.addStream("consoutput", converter.output, output.input);
}
Developer ID: apache, project: apex-malhar, lines of code: 20, source: UniqueKeyValCountExample.java
Example 16: populateDAG
import com.datatorrent.api.DAG.Locality; // import the required package/class
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  TwitterSampleInput twitterFeed = new TwitterSampleInput();
  twitterFeed = dag.addOperator("TweetSampler", twitterFeed);
  TwitterStatusWordExtractor wordExtractor = dag.addOperator("WordExtractor", TwitterStatusWordExtractor.class);
  UniqueCounter<String> uniqueCounter = dag.addOperator("UniqueWordCounter", new UniqueCounter<String>());
  WindowedTopCounter<String> topCounts = dag.addOperator("TopCounter", new WindowedTopCounter<String>());
  topCounts.setSlidingWindowWidth(120);
  topCounts.setDagWindowWidth(1);
  dag.addStream("TweetStream", twitterFeed.text, wordExtractor.input);
  dag.addStream("TwittedWords", wordExtractor.output, uniqueCounter.data);
  dag.addStream("UniqueWordCounts", uniqueCounter.count, topCounts.input).setLocality(Locality.CONTAINER_LOCAL);
  TwitterTopCounterApplication.consoleOutput(dag, "topWords", topCounts.output, SNAPSHOT_SCHEMA, "word");
}
Developer ID: apache, project: apex-malhar, lines of code: 20, source: TwitterTopWordsApplication.java
Example 17: populateDAG
import com.datatorrent.api.DAG.Locality; // import the required package/class
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  //dag.setAttribute(DAGContext.APPLICATION_NAME, "TweetsDump");
  TwitterSampleInput twitterStream = dag.addOperator("TweetSampler", new TwitterSampleInput());
  //ConsoleOutputOperator dbWriter = dag.addOperator("DatabaseWriter", new ConsoleOutputOperator());
  Status2Database dbWriter = dag.addOperator("DatabaseWriter", new Status2Database());
  dbWriter.getStore().setDatabaseDriver("com.mysql.jdbc.Driver");
  dbWriter.getStore().setDatabaseUrl("jdbc:mysql://node6.morado.com:3306/twitter");
  dbWriter.getStore().setConnectionProperties("user:twitter");
  dag.addStream("Statuses", twitterStream.status, dbWriter.input).setLocality(Locality.CONTAINER_LOCAL);
}
Developer ID: apache, project: apex-malhar, lines of code: 17, source: TwitterDumpApplication.java
Example 18: populateDAG
import com.datatorrent.api.DAG.Locality; // import the required package/class
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  JdbcPOJOInputOperator jdbcInputOperator = dag.addOperator("JdbcInput", new JdbcPOJOInputOperator());
  /**
   * The class given below can be updated to a user-defined class based on
   * the input table schema. The addFieldInfos method needs to be updated
   * accordingly. This line can be commented out and the class can be set
   * from the properties file.
   */
  // dag.setOutputPortAttribute(jdbcInputOperator.outputPort, Context.PortContext.TUPLE_CLASS, PojoEvent.class);
  jdbcInputOperator.setFieldInfos(addFieldInfos());
  JdbcStore store = new JdbcStore();
  jdbcInputOperator.setStore(store);
  FileLineOutputOperator fileOutput = dag.addOperator("FileOutputOperator", new FileLineOutputOperator());
  dag.addStream("POJO's", jdbcInputOperator.outputPort, fileOutput.input).setLocality(Locality.CONTAINER_LOCAL);
}
Developer ID: apache, project: apex-malhar, lines of code: 22, source: JdbcHDFSApp.java
Example 19: populateDAG
import com.datatorrent.api.DAG.Locality; // import the required package/class
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  // Sample DAG with 2 operators
  // Replace this code with the DAG you want to build
  SeedEventGenerator seedGen = dag.addOperator("seedGen", SeedEventGenerator.class);
  seedGen.setSeedStart(1);
  seedGen.setSeedEnd(10);
  seedGen.addKeyData("x", 0, 10);
  seedGen.addKeyData("y", 0, 100);
  ConsoleOutputOperator cons = dag.addOperator("console", new ConsoleOutputOperator());
  cons.setStringFormat("hello: %s");
  dag.addStream("seeddata", seedGen.val_list, cons.input).setLocality(Locality.CONTAINER_LOCAL);
}
Developer ID: apache, project: apex-malhar, lines of code: 18, source: Application.java
Example 20: populateDAG
import com.datatorrent.api.DAG.Locality; // import the required package/class
@Override
public void populateDAG(DAG dag, Configuration conf)
{
  TailFsInputOperator log = dag.addOperator("log", new TailFsInputOperator());
  log.setDelimiter('\n');
  log.setFilePath("/var/log/apache2/access.log");
  ApacheLogParseMapOutputOperator parse = dag.addOperator("parse", new ApacheLogParseMapOutputOperator());
  GeoIPExtractor geoIPExtractor = new GeoIPExtractor();
  // Can't put this file in resources until licensing issue is straightened out
  geoIPExtractor.setDatabasePath("/home/david/GeoLiteCity.dat");
  parse.registerInformationExtractor("ip", geoIPExtractor);
  parse.registerInformationExtractor("agent", new UserAgentExtractor());
  TimestampExtractor timestampExtractor = new TimestampExtractor();
  timestampExtractor.setDateFormatString("dd/MMM/yyyy:HH:mm:ss Z");
  parse.registerInformationExtractor("time", timestampExtractor);
  ConsoleOutputOperator console = dag.addOperator("console", new ConsoleOutputOperator());
  dag.addStream("log-parse", log.output, parse.data);
  dag.addStream("parse-console", parse.output, console.input).setLocality(Locality.CONTAINER_LOCAL);
}
Developer ID: apache, project: apex-malhar, lines of code: 25, source: ApplicationLocalLog.java
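Note: the com.datatorrent.api.DAG.Locality examples in this article were collected from GitHub, MSDocs, and other source-code and documentation hosting platforms. The snippets were selected from open-source projects, and copyright of the source code belongs to the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.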