本文整理汇总了Java中com.datatorrent.api.StreamingApplication类的典型用法代码示例。如果您正苦于以下问题：Java StreamingApplication类的具体用法？Java StreamingApplication怎么用？Java StreamingApplication使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
StreamingApplication类属于com.datatorrent.api包,在下文中一共展示了StreamingApplication类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: getApexLauncher
import com.datatorrent.api.StreamingApplication; //导入依赖的package包/类
/**
 * Supplies a stub launcher for the test: instead of submitting anything it
 * populates an embedded DAG from the application, asserts the resulting
 * application name, and returns a handle that reports itself as finished.
 */
@Override
protected Launcher<?> getApexLauncher() {
  return new Launcher<AppHandle>() {
    @Override
    public AppHandle launchApp(StreamingApplication application,
        Configuration configuration, AttributeMap launchParameters)
        throws org.apache.apex.api.Launcher.LauncherException {
      // The DAG is populated with a fresh, empty configuration; the
      // configuration and launch parameters passed in are not consulted.
      EmbeddedAppLauncher<?> embedded = Launcher.getLauncher(LaunchMode.EMBEDDED);
      DAG populatedDag = embedded.getDAG();
      application.populateDAG(populatedDag, new Configuration(false));

      String configuredName = populatedDag.getValue(DAGContext.APPLICATION_NAME);
      Assert.assertEquals("DummyApp", configuredName);

      return new AppHandle() {
        @Override
        public boolean isFinished() {
          // Nothing was launched, so the handle is always finished.
          return true;
        }

        @Override
        public void shutdown(org.apache.apex.api.Launcher.ShutdownMode shutdownMode) {
          // Intentionally empty: there is nothing to shut down.
        }
      };
    }
  };
}
开发者ID:apache,项目名称:beam,代码行数:25,代码来源:ApexYarnLauncherTest.java
示例2: runLocal
import com.datatorrent.api.StreamingApplication; //导入依赖的package包/类
/**
 * Runs the application inside the current process and returns only after the
 * local cluster run has completed.
 *
 * @param appConfig factory that creates the logical plan from this launcher's properties
 * @throws Exception propagated from plan creation, jar URL conversion,
 *           dependency loading or the local cluster run
 */
public void runLocal(AppFactory appConfig) throws Exception
{
  propertiesBuilder.conf.setEnum(StreamingApplication.ENVIRONMENT, StreamingApplication.Environment.LOCAL);
  LogicalPlan plan = appConfig.createApp(propertiesBuilder);

  // Every library jar named by the plan becomes an additional launch dependency.
  String jarList = plan.getAttributes().get(Context.DAGContext.LIBRARY_JARS);
  boolean hasLibraryJars = jarList != null && !jarList.isEmpty();
  if (hasLibraryJars) {
    for (String jarLocation : jarList.split(StramClient.LIB_JARS_SEP)) {
      URL jarUrl = new File(jarLocation).toURI().toURL();
      launchDependencies.add(jarUrl);
    }
  }

  // local mode requires custom classes to be resolved through the context class loader
  loadDependencies();

  StramLocalCluster localCluster = new StramLocalCluster(plan);
  localCluster.run();
}
开发者ID:apache,项目名称:apex-core,代码行数:27,代码来源:StramAppLauncher.java
示例3: addFromProperties
import com.datatorrent.api.StreamingApplication; //导入依赖的package包/类
/**
 * Adds configuration entries from the given properties. Entries may appear in
 * any order as long as, taken together, they describe a consistent configuration.
 * Every entry is copied into this object's properties; entries whose key starts
 * with one of the platform prefixes are additionally parsed into the stram
 * configuration.
 *
 * @param props properties to read
 * @param conf configuration for variable substitution and evaluation; when
 *          {@code null} the properties are used without evaluation
 * @return this logical plan configuration
 */
public LogicalPlanConfiguration addFromProperties(Properties props, Configuration conf)
{
  if (conf != null) {
    StramClientUtils.evalProperties(props, conf);
  }

  for (final String key : props.stringPropertyNames()) {
    final String value = props.getProperty(key);
    this.properties.setProperty(key, value);

    final boolean hasPlatformPrefix = key.startsWith(StreamingApplication.DT_PREFIX)
        || key.startsWith(StreamingApplication.APEX_PREFIX);
    if (!hasPlatformPrefix) {
      continue;
    }

    // Token 0 is the prefix itself, so parsing starts at index 1.
    final String[] keyTokens = key.split(KEY_SEPARATOR_SPLIT_REGEX);
    parseStramPropertyTokens(keyTokens, 1, key, value, stramConf);
  }
  return this;
}
开发者ID:apache,项目名称:apex-core,代码行数:26,代码来源:LogicalPlanConfiguration.java
示例4: testInjectionOfOperatorName
import com.datatorrent.api.StreamingApplication; //导入依赖的package包/类
/**
 * Runs a single-operator application asynchronously in local mode and checks
 * that the operator observed the name it was registered under ("input").
 *
 * @throws Exception if preparing the DAG fails or the wait is interrupted
 */
@Test
public void testInjectionOfOperatorName() throws Exception
{
  StreamingApplication application = new StreamingApplication()
  {
    @Override
    public void populateDAG(DAG dag, Configuration conf)
    {
      dag.addOperator("input", new MockInputOperator());
    }
  };
  LocalMode lma = LocalMode.newInstance();
  lma.prepareDAG(application, new Configuration());
  LocalMode.Controller lc = lma.getController();
  lc.runAsync();
  try {
    // NOTE(review): await() has no timeout; presumably the operator counts the
    // latch down once it has recorded its name - confirm, or add a timeout.
    latch.await();
    Assert.assertEquals("operator name", "input", operatorName);
  } finally {
    // Always stop the asynchronously running cluster, even when the assertion
    // fails or the wait is interrupted, so it does not outlive the test.
    lc.shutdown();
  }
}
开发者ID:apache,项目名称:apex-core,代码行数:20,代码来源:OperatorContextTest.java
示例5: testApp
import com.datatorrent.api.StreamingApplication; //导入依赖的package包/类
/**
 * Runs the given application in local mode until the {@code endApp} flag is
 * set or the time limit elapses, then checks that the number of control tuples
 * received equals {@code controlIndex}.
 *
 * @param app application to run
 * @throws Exception propagated from DAG preparation or the local run
 */
public void testApp(StreamingApplication app) throws Exception
{
  // Exit condition handed to the local cluster: stop as soon as endApp is set.
  final Callable<Boolean> stopWhenEnded = new Callable<Boolean>()
  {
    @Override
    public Boolean call() throws Exception
    {
      return endApp;
    }
  };

  try {
    LocalMode localMode = LocalMode.newInstance();
    Configuration emptyConf = new Configuration(false);
    localMode.prepareDAG(app, emptyConf);

    LocalMode.Controller controller = localMode.getController();
    ((StramLocalCluster)controller).setExitCondition(stopWhenEnded);
    // Upper bound of 200000 ms (200 seconds); quits then if the terminating
    // condition has not been reached.
    controller.run(200000);

    LOG.info("Control Tuples received {} expected {}", numControlTuples, controlIndex);
    Assert.assertTrue("Incorrect Control Tuples", numControlTuples == controlIndex);
  } catch (ConstraintViolationException violation) {
    Assert.fail("constraint violations: " + violation.getConstraintViolations());
  }
}
开发者ID:apache,项目名称:apex-core,代码行数:25,代码来源:CustomControlTupleTest.java
示例6: testAppLevelProperties
import com.datatorrent.api.StreamingApplication; //导入依赖的package包/类
/**
 * Verifies that application-level properties are applied to the application
 * object: a property set directly under the application name, properties set
 * through the {@code prop} segment (by name and by wildcard), and a nested
 * property ({@code inncls.a}).
 */
@Test
@SuppressWarnings("UnnecessaryBoxing")
public void testAppLevelProperties()
{
  final String appName = "app1";
  final String appKey = StreamingApplication.APEX_PREFIX + "application." + appName;

  Properties props = new Properties();
  props.setProperty(appKey + ".testprop1", "10");
  props.setProperty(appKey + ".prop.testprop2", "100");
  props.setProperty(StreamingApplication.APEX_PREFIX + "application.*.prop.testprop3", "1000");
  props.setProperty(appKey + ".inncls.a", "10000");

  LogicalPlanConfiguration planConfiguration = new LogicalPlanConfiguration(new Configuration(false));
  planConfiguration.addFromProperties(props, null);

  LogicalPlan plan = new LogicalPlan();
  TestApplication application = new TestApplication();
  planConfiguration.setApplicationConfiguration(plan, appName, application);

  Assert.assertEquals("", Integer.valueOf(10), application.getTestprop1());
  Assert.assertEquals("", Integer.valueOf(100), application.getTestprop2());
  Assert.assertEquals("", Integer.valueOf(1000), application.getTestprop3());
  Assert.assertEquals("", Integer.valueOf(10000), application.getInncls().getA());
}
开发者ID:apache,项目名称:apex-core,代码行数:23,代码来源:LogicalPlanConfigurationTest.java
示例7: testSetOperatorProperties
import com.datatorrent.api.StreamingApplication; //导入依赖的package包/类
/**
 * Verifies that operator properties supplied as configuration entries are
 * applied to the operators of a plan: a plain string, a comma-separated string
 * array, and map entries addressed both as {@code mapProperty.key} and as
 * {@code mapProperty(key.with.dots)}.
 */
@Test
public void testSetOperatorProperties()
{
  final String o1Prop = StreamingApplication.APEX_PREFIX + "operator.o1.prop.";
  final String o2Prop = StreamingApplication.APEX_PREFIX + "operator.o2.prop.";

  Configuration conf = new Configuration(false);
  conf.set(o1Prop + "myStringProperty", "myStringPropertyValue");
  conf.set(o2Prop + "stringArrayField", "a,b,c");
  conf.set(o2Prop + "mapProperty.key1", "key1Val");
  conf.set(o2Prop + "mapProperty(key1.dot)", "key1dotVal");
  conf.set(o2Prop + "mapProperty(key2.dot)", "key2dotVal");

  LogicalPlan plan = new LogicalPlan();
  GenericTestOperator first = plan.addOperator("o1", new GenericTestOperator());
  ValidationTestOperator second = plan.addOperator("o2", new ValidationTestOperator());

  LogicalPlanConfiguration planConfiguration = new LogicalPlanConfiguration(conf);
  planConfiguration.setOperatorProperties(plan, "testSetOperatorProperties");

  Assert.assertEquals("o1.myStringProperty", "myStringPropertyValue", first.getMyStringProperty());
  Assert.assertArrayEquals("o2.stringArrayField", new String[] {"a", "b", "c"}, second.getStringArrayField());
  Assert.assertEquals("o2.mapProperty.key1", "key1Val", second.getMapProperty().get("key1"));
  Assert.assertEquals("o2.mapProperty(key1.dot)", "key1dotVal", second.getMapProperty().get("key1.dot"));
  Assert.assertEquals("o2.mapProperty(key2.dot)", "key2dotVal", second.getMapProperty().get("key2.dot"));
}
开发者ID:apache,项目名称:apex-core,代码行数:27,代码来源:LogicalPlanConfigurationTest.java
示例8: testAppNameAttribute
import com.datatorrent.api.StreamingApplication; //导入依赖的package包/类
/**
 * Verifies that an application name set explicitly as a DAG attribute is still
 * "testApp" after the DAG has been prepared, even though the properties also
 * register the alias "TestAliasApp" for the application class.
 */
@Test
public void testAppNameAttribute()
{
  StreamingApplication application = new AnnotatedApplication();
  final String appClassName = application.getClass().getName();

  Configuration conf = new Configuration(false);
  conf.addResource(StramClientUtils.DT_SITE_XML_FILE);
  LogicalPlanConfiguration planConfiguration = new LogicalPlanConfiguration(conf);

  Properties aliasProperties = new Properties();
  aliasProperties.setProperty(StreamingApplication.APEX_PREFIX + "application.TestAliasApp.class", appClassName);
  planConfiguration.addFromProperties(aliasProperties, null);

  LogicalPlan plan = new LogicalPlan();
  final String classResourcePath = appClassName.replace('.', '/') + ".class";
  // The name is set on the DAG before preparation.
  plan.setAttribute(com.datatorrent.api.Context.DAGContext.APPLICATION_NAME, "testApp");
  planConfiguration.prepareDAG(plan, application, classResourcePath);

  Assert.assertEquals("Application name", "testApp",
      plan.getAttributes().get(com.datatorrent.api.Context.DAGContext.APPLICATION_NAME));
}
开发者ID:apache,项目名称:apex-core,代码行数:22,代码来源:LogicalPlanConfigurationTest.java
示例9: testAppAlias
import com.datatorrent.api.StreamingApplication; //导入依赖的package包/类
/**
 * Verifies that, when no application name attribute has been set on the DAG,
 * preparing the DAG yields the alias "TestAliasApp" that the properties
 * register for the application class.
 */
@Test
public void testAppAlias()
{
  StreamingApplication application = new AnnotatedApplication();
  final String appClassName = application.getClass().getName();

  Configuration conf = new Configuration(false);
  conf.addResource(StramClientUtils.DT_SITE_XML_FILE);
  LogicalPlanConfiguration planConfiguration = new LogicalPlanConfiguration(conf);

  Properties aliasProperties = new Properties();
  aliasProperties.setProperty(StreamingApplication.APEX_PREFIX + "application.TestAliasApp.class", appClassName);
  planConfiguration.addFromProperties(aliasProperties, null);

  LogicalPlan plan = new LogicalPlan();
  final String classResourcePath = appClassName.replace('.', '/') + ".class";
  planConfiguration.prepareDAG(plan, application, classResourcePath);

  Assert.assertEquals("Application name", "TestAliasApp",
      plan.getAttributes().get(com.datatorrent.api.Context.DAGContext.APPLICATION_NAME));
}
开发者ID:apache,项目名称:apex-core,代码行数:21,代码来源:LogicalPlanConfigurationTest.java
示例10: testJavaApplication
import com.datatorrent.api.StreamingApplication; //导入依赖的package包/类
/**
 * Creates a logical plan for {@code Application} through a
 * {@code StreamingAppFactory} whose {@code createApp} instantiates the
 * application class by name, then validates the properties of the result.
 */
@Test
public void testJavaApplication()
{
  Configuration conf = getConfiguration();

  StreamingAppFactory appFactory = new StreamingAppFactory(Application.class.getName(), Application.class)
  {
    @Override
    public LogicalPlan createApp(LogicalPlanConfiguration planConfig)
    {
      // Load the class by name and instantiate it before delegating to the
      // two-argument createApp of the superclass.
      String className = Application.class.getName();
      Class<? extends StreamingApplication> appClass = StramUtils.classForName(className, StreamingApplication.class);
      StreamingApplication instance = StramUtils.newInstance(appClass);
      return super.createApp(instance, planConfig);
    }
  };

  LogicalPlanConfiguration planConfiguration = new LogicalPlanConfiguration(conf);
  LogicalPlan plan = appFactory.createApp(planConfiguration);
  validateProperties(plan);
}
开发者ID:apache,项目名称:apex-core,代码行数:18,代码来源:DAGSetupPluginTests.java
示例11: testModuleProperties
import com.datatorrent.api.StreamingApplication; //导入依赖的package包/类
/**
 * Verifies that properties supplied as configuration entries are applied to the
 * modules of a plan: a plain string, a comma-separated string array, and map
 * entries addressed both as {@code mapProperty.key} and as
 * {@code mapProperty(key.with.dots)}.
 */
@Test
public void testModuleProperties()
{
  final String o1Prop = StreamingApplication.APEX_PREFIX + "operator.o1.prop.";
  final String o2Prop = StreamingApplication.APEX_PREFIX + "operator.o2.prop.";

  Configuration conf = new Configuration(false);
  conf.set(o1Prop + "myStringProperty", "myStringPropertyValue");
  conf.set(o2Prop + "stringArrayField", "a,b,c");
  conf.set(o2Prop + "mapProperty.key1", "key1Val");
  conf.set(o2Prop + "mapProperty(key1.dot)", "key1dotVal");
  conf.set(o2Prop + "mapProperty(key2.dot)", "key2dotVal");

  LogicalPlan plan = new LogicalPlan();
  TestModules.GenericModule first = plan.addModule("o1", new TestModules.GenericModule());
  TestModules.ValidationTestModule second = plan.addModule("o2", new TestModules.ValidationTestModule());

  LogicalPlanConfiguration planConfiguration = new LogicalPlanConfiguration(conf);
  // NOTE(review): the application name passed here is "testSetOperatorProperties",
  // which looks carried over from the operator test - confirm it is intentional.
  planConfiguration.setModuleProperties(plan, "testSetOperatorProperties");

  Assert.assertEquals("o1.myStringProperty", "myStringPropertyValue", first.getMyStringProperty());
  Assert.assertArrayEquals("o2.stringArrayField", new String[]{"a", "b", "c"}, second.getStringArrayField());
  Assert.assertEquals("o2.mapProperty.key1", "key1Val", second.getMapProperty().get("key1"));
  Assert.assertEquals("o2.mapProperty(key1.dot)", "key1dotVal", second.getMapProperty().get("key1.dot"));
  Assert.assertEquals("o2.mapProperty(key2.dot)", "key2dotVal", second.getMapProperty().get("key2.dot"));
}
开发者ID:apache,项目名称:apex-core,代码行数:26,代码来源:TestModuleProperties.java
示例12: testLoadFromJson
import com.datatorrent.api.StreamingApplication; //导入依赖的package包/类
/**
 * Builds a logical plan from the JSON topology on the test classpath, prepares
 * and validates it, and then checks its top-level operators, streams and
 * public methods.
 *
 * @throws Exception if the resource cannot be read or parsed, or if plan
 *           preparation or validation fails
 */
@Test
public void testLoadFromJson() throws Exception
{
  String resourcePath = "/testModuleTopology.json";
  StringWriter writer = new StringWriter();
  // try-with-resources closes the stream on every path; a null resource is
  // permitted here and is simply not closed.
  try (InputStream is = this.getClass().getResourceAsStream(resourcePath)) {
    if (is == null) {
      throw new RuntimeException("Could not load " + resourcePath);
    }
    // Decode explicitly as UTF-8 instead of relying on the platform default charset.
    IOUtils.copy(is, writer, "UTF-8");
  }
  JSONObject json = new JSONObject(writer.toString());
  Configuration conf = new Configuration(false);
  conf.set(StreamingApplication.APEX_PREFIX + "operator.operator3.prop.myStringProperty", "o3StringFromConf");
  LogicalPlanConfiguration planConf = new LogicalPlanConfiguration(conf);
  LogicalPlan dag = planConf.createFromJson(json, "testLoadFromJson");
  planConf.prepareDAG(dag, null, "testApplication");
  dag.validate();
  validateTopLevelOperators(dag);
  validateTopLevelStreams(dag);
  validatePublicMethods(dag);
}
开发者ID:apache,项目名称:apex-core,代码行数:25,代码来源:TestModuleExpansion.java
示例13: testApplication
import com.datatorrent.api.StreamingApplication; //导入依赖的package包/类
/**
 * Runs the given application in local mode with the JDBC properties from the
 * classpath until ten tuples have been counted or the time limit elapses, then
 * checks that the row count of the POJO table equals the tuple count.
 *
 * @param streamingApplication application to run
 * @throws Exception propagated from DAG preparation, the local run or the row count query
 */
public void testApplication(StreamingApplication streamingApplication) throws Exception
{
  // Exit condition handed to the local cluster: stop once ten tuples were counted.
  final Callable<Boolean> tenTuplesSeen = new Callable<Boolean>()
  {
    @Override
    public Boolean call() throws Exception
    {
      return TupleCount == 10;
    }
  };

  try {
    LocalMode localMode = LocalMode.newInstance();
    Configuration conf = new Configuration(false);
    conf.addResource(this.getClass().getResourceAsStream("/JdbcProperties.xml"));
    localMode.prepareDAG(streamingApplication, conf);

    LocalMode.Controller controller = localMode.getController();
    controller.setHeartbeatMonitoringEnabled(false);
    ((StramLocalCluster)controller).setExitCondition(tenTuplesSeen);
    // Runs for at most 10000 ms (10 seconds) and then quits.
    controller.run(10000);

    Assert.assertEquals("rows in db", TupleCount, getNumOfRowsinTable(TABLE_POJO_NAME));
  } catch (ConstraintViolationException violation) {
    Assert.fail("constraint violations: " + violation.getConstraintViolations());
  }
}
开发者ID:apache,项目名称:apex-malhar,代码行数:25,代码来源:JdbcInputOperatorApplicationTest.java
示例14: testApplication
import com.datatorrent.api.StreamingApplication; //导入依赖的package包/类
/**
 * Launches the Twitter stats application with the embedded launcher, polls the
 * handle until it reports finished or fifteen minutes have passed, and then
 * kills the application.
 *
 * @throws Exception if launching fails or the polling wait is interrupted
 */
@Test
public void testApplication() throws Exception {
  EmbeddedAppLauncher<?> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED);
  Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
  //launchAttributes.put(EmbeddedAppLauncher.RUN_ASYNC, false);
  //launchAttributes.put(EmbeddedAppLauncher.HEARTBEAT_MONITORING, false);
  Configuration conf = new Configuration(false);
  conf.addResource(new Path(TEMPLATE_PROPERTIES_PATH));
  conf.addResource(new Path(PROPERTIES_PATH));
  //conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
  conf.setBooleanIfUnset(TwitterStatsApp.IS_TWITTER_SAMPLE_INPUT, defaultTwitterInput);
  conf.setBooleanIfUnset(TwitterStatsApp.IS_WEBSOCKET_OUTPUT, defaultWebsocketOutput);
  StreamingApplication app = /*new TestApplication();*/ new TwitterStatsApp();
  AppHandle appHandle = launcher.launchApp(app, conf, launchAttributes);
  try {
    long deadlineMillis = System.currentTimeMillis() + Duration.standardMinutes(15).getMillis();
    while (!appHandle.isFinished() && System.currentTimeMillis() < deadlineMillis) {
      Thread.sleep(500);
    }
  } finally {
    // Kill the application even when the wait loop throws (for example on
    // interruption), so the launched application does not outlive the test.
    appHandle.shutdown(ShutdownMode.KILL);
  }
}
开发者ID:tweise,项目名称:apex-samples,代码行数:22,代码来源:TwitterStatsAppTest.java
示例15: launch
import com.datatorrent.api.StreamingApplication; //导入依赖的package包/类
/**
 * Launches the application through a {@code StramAppLauncher}, using a
 * configuration assembled from JVM system properties.
 *
 * <p>The system properties {@code dt.dfsRootDirectory}, {@code fs.defaultFS},
 * {@code yarn.resourcemanager.address} and {@code dt.site.path} must all be set.
 *
 * @param app application to launch
 * @param name name handed to the launcher and the application factory
 * @param libjars value for the launcher's lib-jars configuration key; skipped when {@code null}
 * @throws IllegalArgumentException if a required system property is not set
 * @throws Exception propagated from dependency loading or the launch itself
 */
public static void launch(StreamingApplication app, String name, String libjars) throws Exception {
  Configuration conf = new Configuration(true);
  // conf.set("dt.loggers.level", "org.apache.*:DEBUG, com.datatorrent.*:DEBUG");
  conf.set("dt.dfsRootDirectory", requireSystemProperty("dt.dfsRootDirectory"));
  conf.set("fs.defaultFS", requireSystemProperty("fs.defaultFS"));
  conf.set("yarn.resourcemanager.address", requireSystemProperty("yarn.resourcemanager.address"));
  conf.addResource(new File(requireSystemProperty("dt.site.path")).toURI().toURL());
  if (libjars != null) {
    conf.set(StramAppLauncher.LIBJARS_CONF_KEY_NAME, libjars);
  }
  StramAppLauncher appLauncher = new StramAppLauncher(name, conf);
  appLauncher.loadDependencies();
  StreamingAppFactory appFactory = new StreamingAppFactory(app, name);
  appLauncher.launchApp(appFactory);
}

/** Returns the named system property, failing with a message that names it when it is unset. */
private static String requireSystemProperty(String key) {
  String value = System.getProperty(key);
  if (value == null) {
    throw new IllegalArgumentException("Required system property '" + key + "' is not set");
  }
  return value;
}
开发者ID:apache,项目名称:incubator-samoa,代码行数:17,代码来源:ApexDoTask.java
示例16: main
import com.datatorrent.api.StreamingApplication; //导入依赖的package包/类
/**
 * Runs the log level application in local mode, configured from the
 * properties file on the classpath, with a run time of 140000 ms.
 *
 * @param args unused
 * @throws Exception propagated from the local run
 */
public static void main(String[] args) throws Exception {
  final String propertiesResource = "/META-INF/properties-LogLevelCount.xml";
  final int runMillis = 140000;

  StreamingApplication application = new LogLevelApplication();
  Configuration configuration = new Configuration(false);
  configuration.addResource(application.getClass().getResourceAsStream(propertiesResource));

  LocalMode.runApp(application, configuration, runMillis);
}
开发者ID:bbende,项目名称:nifi-streaming-examples,代码行数:10,代码来源:LogLevelApplicationRunner.java
示例17: launchApp
import com.datatorrent.api.StreamingApplication; //导入依赖的package包/类
/**
 * Populates an embedded DAG from the application and launches it through the
 * {@code hadoop} command, shipping the YARN deploy dependencies. Dependencies
 * that are directories are first packed into temporary jar files.
 *
 * @param app application whose DAG is populated
 * @param configProperties properties passed on in the launch parameters
 * @return handle returned by the launch
 * @throws IOException if a temporary jar cannot be created or the launch fails
 */
public AppHandle launchApp(StreamingApplication app, Properties configProperties)
    throws IOException {
  List<File> jarsToShip = getYarnDeployDependencies();
  // The same paths are joined twice: ':'-separated for HADOOP_CLASSPATH and
  // ','-separated for LIB_JARS. Building both directly avoids rewriting ':'
  // characters that are part of a path.
  StringBuilder classpath = new StringBuilder();
  StringBuilder libJars = new StringBuilder();
  for (File path : jarsToShip) {
    if (path.isDirectory()) {
      File tmpJar = File.createTempFile("beam-runners-apex-", ".jar");
      // Register for deletion first so the file is cleaned up even if createJar fails.
      tmpJar.deleteOnExit();
      createJar(path, tmpJar);
      path = tmpJar;
    }
    if (classpath.length() != 0) {
      classpath.append(':');
      libJars.append(',');
    }
    String absolutePath = path.getAbsolutePath();
    classpath.append(absolutePath);
    libJars.append(absolutePath);
  }
  EmbeddedAppLauncher<?> embeddedLauncher = Launcher.getLauncher(LaunchMode.EMBEDDED);
  DAG dag = embeddedLauncher.getDAG();
  app.populateDAG(dag, new Configuration(false));
  Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
  launchAttributes.put(YarnAppLauncher.LIB_JARS, libJars.toString());
  LaunchParams lp = new LaunchParams(dag, launchAttributes, configProperties);
  lp.cmd = "hadoop " + ApexYarnLauncher.class.getName();
  HashMap<String, String> env = new HashMap<>();
  env.put("HADOOP_USER_CLASSPATH_FIRST", "1");
  env.put("HADOOP_CLASSPATH", classpath.toString());
  lp.env = env;
  return launchApp(lp);
}
开发者ID:apache,项目名称:beam,代码行数:33,代码来源:ApexYarnLauncher.java
示例18: runApp
import com.datatorrent.api.StreamingApplication; //导入依赖的package包/类
/**
 * Shortcut to run an application with the modified configuration.
 *
 * @param app - Application to be run
 * @param configuration - Configuration
 * @param runMillis - The time after which the application will be shutdown; pass 0 to run indefinitely.
 */
public static void runApp(StreamingApplication app, Configuration configuration, int runMillis)
{
  // Wildcard instead of a raw type: the handle type is irrelevant here because
  // the launch result is discarded.
  EmbeddedAppLauncher<?> launcher = newInstance();
  Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
  launchAttributes.put(RUN_MILLIS, (long)runMillis);
  launcher.launchApp(app, configuration, launchAttributes);
}
开发者ID:apache,项目名称:apex-core,代码行数:15,代码来源:EmbeddedAppLauncher.java
示例19: prepareDAG
import com.datatorrent.api.StreamingApplication; //导入依赖的package包/类
/**
 * Prepares this launcher's logical plan from the application and/or the
 * configuration and returns it.
 *
 * @param app application to populate the plan from; may be {@code null} when a configuration is given
 * @param conf configuration; when {@code null} an empty configuration is used
 * @return the prepared logical plan
 * @throws IllegalArgumentException if both {@code app} and {@code conf} are {@code null}
 * @throws Exception propagated from plan preparation
 */
@Override
public DAG prepareDAG(StreamingApplication app, Configuration conf) throws Exception
{
  final boolean hasApp = app != null;
  if (!hasApp && conf == null) {
    throw new IllegalArgumentException("Require app or configuration to populate logical plan.");
  }

  final Configuration effectiveConf = (conf != null) ? conf : new Configuration(false);
  final LogicalPlanConfiguration planConfiguration = new LogicalPlanConfiguration(effectiveConf);

  // Without an application there is no class name to use as the application name.
  final String appName;
  if (hasApp) {
    appName = app.getClass().getName();
  } else {
    appName = "unknown";
  }

  planConfiguration.prepareDAG(lp, app, appName);
  return lp;
}
开发者ID:apache,项目名称:apex-core,代码行数:15,代码来源:EmbeddedAppLauncherImpl.java
示例20: launchApp
import com.datatorrent.api.StreamingApplication; //导入依赖的package包/类
/**
 * Launches the application through a {@code StramAppLauncher} and returns a
 * handle for the resulting YARN application.
 *
 * <p>Launch parameters that have an entry in {@code propMapping} are first
 * copied into the configuration under the mapped property name.
 *
 * @param app application to launch
 * @param conf configuration used for the launch; updated with mapped launch parameters
 * @param launchParameters optional launch attributes; may be {@code null}
 * @return handle for the launched application
 * @throws LauncherException wrapping any exception raised while launching
 */
@Override
public YarnAppHandleImpl launchApp(final StreamingApplication app, Configuration conf, Attribute.AttributeMap launchParameters) throws LauncherException
{
  if (launchParameters != null) {
    for (Map.Entry<Attribute<?>, Object> entry : launchParameters.entrySet()) {
      String property = propMapping.get(entry.getKey());
      if (property != null) {
        setConfiguration(conf, property, entry.getValue());
      }
    }
  }
  try {
    String name = app.getClass().getName();
    StramAppLauncher appLauncher = new StramAppLauncher(name, conf);
    appLauncher.loadDependencies();
    ApplicationId appId;
    try {
      StreamingAppFactory appFactory = new StreamingAppFactory(name, app.getClass())
      {
        @Override
        public LogicalPlan createApp(LogicalPlanConfiguration planConfig)
        {
          return super.createApp(app, planConfig);
        }
      };
      appId = appLauncher.launchApp(appFactory);
    } finally {
      // Reset the context class loader on failure as well, so a failed launch
      // does not leave the calling thread with the launcher's class loader
      // (presumably installed by loadDependencies - confirm).
      appLauncher.resetContextClassLoader();
    }
    return new YarnAppHandleImpl(appId, conf);
  } catch (Exception ex) {
    throw new LauncherException(ex);
  }
}
开发者ID:apache,项目名称:apex-core,代码行数:31,代码来源:YarnAppLauncherImpl.java
注：本文中的com.datatorrent.api.StreamingApplication类示例整理自Github/MSDocs等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的License；未经允许，请勿转载。
请发表评论