
Java MapConfig Class Code Examples


This article collects typical usage examples of the Java class org.apache.samza.config.MapConfig. If you are wondering how the MapConfig class is used in practice, the selected code examples below may help.



The MapConfig class belongs to the org.apache.samza.config package. The 20 code examples below are sorted by popularity by default.
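Most of the examples below follow the same shape: settings are collected in a mutable `Map<String, String>`, and that map is then wrapped in a `MapConfig` before being handed to Samza. The sketch below imitates that shape with JDK classes only, so it runs without Samza on the classpath. `ReadOnlyConfigSketch` is a hypothetical stand-in, not the Samza class, and the idea that the wrapper takes a defensive copy of the input map is an assumption that should be checked against the `MapConfig` version in use.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a read-only config wrapper; this is NOT the Samza MapConfig class. */
public class ReadOnlyConfigSketch {
  private final Map<String, String> values;

  public ReadOnlyConfigSketch(Map<String, String> source) {
    // Assumption: copy the input so later changes to the caller's map are not visible here.
    this.values = Collections.unmodifiableMap(new HashMap<>(source));
  }

  /** Returns the value for the key, or null when the key is absent. */
  public String get(String key) {
    return values.get(key);
  }

  /** Returns the value for the key, or the supplied default when the key is absent. */
  public String get(String key, String defaultValue) {
    return values.getOrDefault(key, defaultValue);
  }

  public int size() {
    return values.size();
  }

  public static void main(String[] args) {
    Map<String, String> raw = new HashMap<>();
    raw.put("job.container.count", "2");

    ReadOnlyConfigSketch config = new ReadOnlyConfigSketch(raw);
    raw.put("processor.list", "0,1"); // added after the snapshot was taken

    System.out.println(config.get("job.container.count"));
    System.out.println(config.get("processor.list", "<unset>"));
  }
}
```

Because the wrapper in this sketch is a snapshot, a test that keeps adding keys to the same map has to build a fresh wrapper after each change, which is the pattern Example 1 follows.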

Example 1: testGetIntermediateStreamProperties

import org.apache.samza.config.MapConfig; // import the required package/class
@Test
public void testGetIntermediateStreamProperties() {
  Map<String, String> config = new HashMap<>();
  KafkaSystemFactory factory = new KafkaSystemFactory();
  Map<String, Properties> properties = JavaConversions.mapAsJavaMap(
      factory.getIntermediateStreamProperties(new MapConfig(config)));
  assertTrue(properties.isEmpty());

  // without batch mode, no properties are generated for the intermediate stream
  config.put("streams.test.samza.intermediate", "true");
  config.put("streams.test.compression.type", "lz4"); // an arbitrary stream-level setting
  properties = JavaConversions.mapAsJavaMap(
      factory.getIntermediateStreamProperties(new MapConfig(config)));
  assertTrue(properties.isEmpty());

  config.put(ApplicationConfig.APP_MODE, ApplicationConfig.ApplicationMode.BATCH.name());
  properties = JavaConversions.mapAsJavaMap(
      factory.getIntermediateStreamProperties(new MapConfig(config)));
  assertTrue(!properties.isEmpty());
  Properties prop = properties.get("test");
  // JUnit convention: expected value first, actual value second
  assertEquals(String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH()), prop.getProperty("retention.ms"));
  assertEquals("lz4", prop.getProperty("compression.type"));
}
 
Developer ID: apache, Project: samza, Lines of code: 24, Source: TestKafkaSystemFactoryJava.java


Example 2: getEventHubConfig

import org.apache.samza.config.MapConfig; // import the required package/class
public static Config getEventHubConfig(EventHubSystemProducer.PartitioningMethod partitioningMethod) {
  HashMap<String, String> mapConfig = new HashMap<>();
  mapConfig.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, SYSTEM_NAME), partitioningMethod.toString());
  mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, SYSTEM_NAME), STREAM_NAME1 + "," + STREAM_NAME2);

  mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, SYSTEM_NAME, STREAM_NAME1), EVENTHUB_NAMESPACE);
  mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, SYSTEM_NAME, STREAM_NAME1), EVENTHUB_ENTITY1);
  mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, SYSTEM_NAME, STREAM_NAME1), EVENTHUB_KEY_NAME);
  mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, SYSTEM_NAME, STREAM_NAME1), EVENTHUB_KEY);

  mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, SYSTEM_NAME, STREAM_NAME2), EVENTHUB_NAMESPACE);
  mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, SYSTEM_NAME, STREAM_NAME2), EVENTHUB_ENTITY2);
  mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, SYSTEM_NAME, STREAM_NAME2), EVENTHUB_KEY_NAME);
  mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, SYSTEM_NAME, STREAM_NAME2), EVENTHUB_KEY);

  return new MapConfig(mapConfig);
}
 
Developer ID: apache, Project: samza, Lines of code: 18, Source: MockEventHubConfigFactory.java


Example 3: testResourceMapWithFileStatusFailure

import org.apache.samza.config.MapConfig; // import the required package/class
@Test
public void testResourceMapWithFileStatusFailure() {
  thrown.expect(LocalizerResourceException.class);
  thrown.expectMessage("IO Exception when accessing the resource file status from the filesystem");

  Map<String, String> configMap = new HashMap<>();
  configMap.put("yarn.resources.myResource1.path", "unknown://host1.com/readme");
  configMap.put("yarn.resources.myResource1.local.name", "readme");
  configMap.put("yarn.resources.myResource1.local.type", "file");
  configMap.put("yarn.resources.myResource1.local.visibility", "public");
  Config conf = new MapConfig(configMap);

  YarnConfiguration yarnConfiguration = new YarnConfiguration();
  yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName());
  yarnConfiguration.set("fs.https.impl", HttpFileSystem.class.getName());
  LocalizerResourceMapper mapper = new LocalizerResourceMapper(new LocalizerResourceConfig(conf), yarnConfiguration);
}
 
Developer ID: apache, Project: samza, Lines of code: 18, Source: TestLocalizerResourceMapper.java


Example 4: getExecutionPlan

import org.apache.samza.config.MapConfig; // import the required package/class
ExecutionPlan getExecutionPlan(StreamApplication app, String runId) throws Exception {
  // build stream graph
  StreamGraphImpl streamGraph = new StreamGraphImpl(this, config);
  app.init(streamGraph, config);

  // create the physical execution plan
  Map<String, String> cfg = new HashMap<>(config);
  if (StringUtils.isNotEmpty(runId)) {
    cfg.put(ApplicationConfig.APP_RUN_ID, runId);
  }

  Set<StreamSpec> inputStreams = new HashSet<>(streamGraph.getInputOperators().keySet());
  inputStreams.removeAll(streamGraph.getOutputStreams().keySet());
  ApplicationMode mode = inputStreams.stream().allMatch(StreamSpec::isBounded)
      ? ApplicationMode.BATCH : ApplicationMode.STREAM;
  cfg.put(ApplicationConfig.APP_MODE, mode.name());

  ExecutionPlanner planner = new ExecutionPlanner(new MapConfig(cfg), streamManager);
  return planner.plan(streamGraph);
}
 
Developer ID: apache, Project: samza, Lines of code: 21, Source: AbstractApplicationRunner.java


Example 5: testResourceConfigIncluded

import org.apache.samza.config.MapConfig; // import the required package/class
@Test
public void testResourceConfigIncluded() {
  Map<String, String> configMap = new HashMap<>();

  configMap.put("yarn.resources.myResource1.path", "http://host1.com/readme");
  configMap.put("yarn.resources.myResource1.local.name", "readme");
  configMap.put("yarn.resources.myResource1.local.type", "file");
  configMap.put("yarn.resources.myResource1.local.visibility", "public");

  Config conf = new MapConfig(configMap);

  LocalizerResourceConfig manager = new LocalizerResourceConfig(conf);
  assertEquals(1, manager.getResourceNames().size());
  assertEquals("myResource1", manager.getResourceNames().get(0));
  assertEquals("readme", manager.getResourceLocalName("myResource1"));
  assertEquals(LocalResourceType.FILE, manager.getResourceLocalType("myResource1"));
  assertEquals(LocalResourceVisibility.PUBLIC, manager.getResourceLocalVisibility("myResource1"));
}
 
Developer ID: apache, Project: samza, Lines of code: 19, Source: TestLocalizerResourceConfig.java


Example 6: testSchemeWithSubkeys

import org.apache.samza.config.MapConfig; // import the required package/class
@Test
public void testSchemeWithSubkeys() {
  Map<String, String> configMap = new HashMap<>();
  configMap.put("fs.http.impl", "org.apache.samza.HttpFileSystem");
  configMap.put("fs.myscheme.impl", "org.apache.samza.MySchemeFileSystem");
  configMap.put("fs.http.impl.key1", "val1");
  configMap.put("fs.http.impl.key2", "val2");
  Config conf = new MapConfig(configMap);

  FileSystemImplConfig manager = new FileSystemImplConfig(conf);

  Map<String, String> expectedFsHttpImplConfs = ImmutableMap.of( //Scheme with additional subkeys
      "fs.http.impl", "org.apache.samza.HttpFileSystem",
      "fs.http.impl.key1", "val1",
      "fs.http.impl.key2", "val2"
  );

  Map<String, String> expectedFsMyschemeImplConfs = ImmutableMap.of( // Scheme without subkeys
      "fs.myscheme.impl", "org.apache.samza.MySchemeFileSystem"
  );

  assertEquals(Arrays.asList("http", "myscheme"), manager.getSchemes());
  assertEquals(expectedFsHttpImplConfs, manager.getSchemeConfig("http"));
  assertEquals(expectedFsMyschemeImplConfs, manager.getSchemeConfig("myscheme"));
}
 
Developer ID: apache, Project: samza, Lines of code: 26, Source: TestFileSystemImplConfig.java
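The assertions in Example 6 describe a grouping rule: each key of the form `fs.<scheme>.impl` names a scheme, and the configuration for a scheme consists of that key plus any keys nested under it. The JDK-only sketch below reproduces that rule so it can be tried without Samza. `SchemeGroupingSketch` and its methods are hypothetical and are not the `FileSystemImplConfig` implementation; returning the schemes in alphabetical order is an assumption that merely happens to match the expected list in the test.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Hypothetical illustration of grouping fs.<scheme>.impl keys; not Samza code. */
public class SchemeGroupingSketch {
  // Matches keys of the exact form fs.<scheme>.impl (no further subkeys).
  private static final Pattern IMPL_KEY = Pattern.compile("fs\\.([^.]+)\\.impl");

  /** Collects the scheme names, sorted alphabetically (an assumption of this sketch). */
  public static List<String> schemes(Map<String, String> config) {
    TreeSet<String> found = new TreeSet<>();
    for (String key : config.keySet()) {
      Matcher m = IMPL_KEY.matcher(key);
      if (m.matches()) {
        found.add(m.group(1));
      }
    }
    return new ArrayList<>(found);
  }

  /** Returns the fs.<scheme>.impl entry plus every entry nested under it. */
  public static Map<String, String> schemeConfig(Map<String, String> config, String scheme) {
    String implKey = "fs." + scheme + ".impl";
    Map<String, String> result = new TreeMap<>();
    for (Map.Entry<String, String> e : config.entrySet()) {
      String key = e.getKey();
      if (key.equals(implKey) || key.startsWith(implKey + ".")) {
        result.put(key, e.getValue());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> config = new HashMap<>();
    config.put("fs.http.impl", "org.apache.samza.HttpFileSystem");
    config.put("fs.myscheme.impl", "org.apache.samza.MySchemeFileSystem");
    config.put("fs.http.impl.key1", "val1");
    config.put("fs.http.impl.key2", "val2");

    System.out.println(schemes(config));
    System.out.println(schemeConfig(config, "http"));
  }
}
```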


Example 7: testDefaultPartitions

import org.apache.samza.config.MapConfig; // import the required package/class
@Test
public void testDefaultPartitions() {
  Map<String, String> map = new HashMap<>(config);
  map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
  Config cfg = new MapConfig(map);

  ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
  StreamGraphImpl streamGraph = createSimpleGraph();
  JobGraph jobGraph = planner.createJobGraph(streamGraph);
  planner.calculatePartitions(streamGraph, jobGraph);

  // every intermediate stream should get the configured default partition count
  jobGraph.getIntermediateStreams().forEach(edge ->
      assertTrue(edge.getPartitionCount() == DEFAULT_PARTITIONS));
}
 
Developer ID: apache, Project: samza, Lines of code: 17, Source: TestExecutionPlanner.java


Example 8: testTriggerIntervalWithInvalidWindowMs

import org.apache.samza.config.MapConfig; // import the required package/class
@Test
public void testTriggerIntervalWithInvalidWindowMs() throws Exception {
  Map<String, String> map = new HashMap<>(config);
  map.put(TaskConfig.WINDOW_MS(), "-1");
  map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
  Config cfg = new MapConfig(map);

  ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
  StreamGraphImpl streamGraph = createStreamGraphWithJoinAndWindow();
  ExecutionPlan plan = planner.plan(streamGraph);
  List<JobConfig> jobConfigs = plan.getJobConfigs();
  assertEquals(1, jobConfigs.size());

  // GCD of 8, 16, 1600 and 252 is 4
  assertEquals("4", jobConfigs.get(0).get(TaskConfig.WINDOW_MS()));
}
 
Developer ID: apache, Project: samza, Lines of code: 17, Source: TestExecutionPlanner.java
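The comment in Example 8 expects the window trigger interval to equal the greatest common divisor of 8, 16, 1600 and 252. That arithmetic can be checked with a few lines of plain Java. `GcdSketch` is a hypothetical helper written only for this check; how the planner derives the interval internally is not shown in the excerpt.

```java
import java.util.stream.LongStream;

/** Hypothetical helper that verifies the GCD quoted in the test comment. */
public class GcdSketch {
  /** Euclid's algorithm for two non-negative values. */
  public static long gcd(long a, long b) {
    while (b != 0) {
      long remainder = a % b;
      a = b;
      b = remainder;
    }
    return a;
  }

  /** Folds gcd over all values; 0 is the identity because gcd(0, x) == x. */
  public static long gcdOf(long... values) {
    return LongStream.of(values).reduce(0L, GcdSketch::gcd);
  }

  public static void main(String[] args) {
    System.out.println(gcdOf(8, 16, 1600, 252)); // prints 4
  }
}
```

The first three values share the factor 8, and 252 = 8 * 31 + 4, so the common divisor drops to 4, which matches the `"4"` asserted for `TaskConfig.WINDOW_MS()`.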


Example 9: testStreamProcessor

import org.apache.samza.config.MapConfig; // import the required package/class
/**
   * Testing a basic identity stream task - reads data from a topic and writes it to another topic
   * (without any modifications)
   *
   * <p>
   * The standalone version in this test uses KafkaSystemFactory and it uses a SingleContainerGrouperFactory. Hence,
   * no matter how many tasks are present, it will always be run in a single processor instance. This simplifies testing
   */
// TODO Fix in SAMZA-1538
//  @Test
  public void testStreamProcessor() {
    final String testSystem = "test-system";
    final String inputTopic = "numbers";
    final String outputTopic = "output";
    final int messageCount = 20;

    final Config configs = new MapConfig(createConfigs("1", testSystem, inputTopic, outputTopic, messageCount));
    // Note: createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a
    // TopicExistsException since StreamProcessor auto-creates them.
    createTopics(inputTopic, outputTopic);
    final TestStubs stubs = new TestStubs(configs, IdentityStreamTask::new, bootstrapServers());

    produceMessages(stubs.producer, inputTopic, messageCount);
    run(stubs.processor, stubs.shutdownLatch);
    verifyNumMessages(stubs.consumer, outputTopic, messageCount);
  }
 
Developer ID: apache, Project: samza, Lines of code: 27, Source: TestStreamProcessor.java


Example 10: testOnShutdown

import org.apache.samza.config.MapConfig; // import the required package/class
@Test
public void testOnShutdown() throws Exception {
  Config conf = getConfig();
  state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));

  ContainerProcessManager taskManager =  new ContainerProcessManager(
      new MapConfig(conf),
      state,
      new MetricsRegistryMap(),
      clusterResourceManager
  );
  taskManager.start();

  Thread allocatorThread = (Thread) getPrivateFieldFromTaskManager("allocatorThread", taskManager).get(taskManager);
  assertTrue(allocatorThread.isAlive());

  taskManager.stop();

  assertFalse(allocatorThread.isAlive());
}
 
Developer ID: apache, Project: samza, Lines of code: 21, Source: TestContainerProcessManager.java


Example 11: getConfig

import org.apache.samza.config.MapConfig; // import the required package/class
private static Config getConfig() {
  // Plain puts avoid the anonymous HashMap subclass created by double-brace initialization.
  Map<String, String> map = new HashMap<>();
  map.put("cluster-manager.container.count", "1");
  map.put("cluster-manager.container.retry.count", "1");
  map.put("cluster-manager.container.retry.window.ms", "1999999999");
  map.put("cluster-manager.container.request.timeout.ms", "3");
  map.put("cluster-manager.allocator.sleep.ms", "1");
  map.put("cluster-manager.container.memory.mb", "512");
  map.put("yarn.package.path", "/foo");
  map.put("task.inputs", "test-system.test-stream");
  map.put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory");
  map.put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
  map.put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
  map.put("job.host-affinity.enabled", "true");
  return new MapConfig(map);
}
 
Developer ID: apache, Project: samza, Lines of code: 23, Source: TestHostAwareContainerAllocator.java


Example 12: testPartitionCountMonitorWithDurableStates

import org.apache.samza.config.MapConfig; // import the required package/class
@Test
public void testPartitionCountMonitorWithDurableStates()
    throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
  configMap.put("stores.mystore.changelog", "mychangelog");
  Config config = new MapConfig(configMap);

  // mimic job runner code to write the config to coordinator stream
  CoordinatorStreamSystemFactory coordinatorFactory = new CoordinatorStreamSystemFactory();
  CoordinatorStreamSystemProducer producer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
  producer.writeConfig("test-job", config);

  ClusterBasedJobCoordinator clusterCoordinator = new ClusterBasedJobCoordinator(config);

  // change the input system stream metadata
  MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(1)), new ArrayList<>());

  StreamPartitionCountMonitor monitor = clusterCoordinator.getPartitionMonitor();
  monitor.updatePartitionCountMetric();
  assertEquals(SamzaApplicationState.SamzaAppStatus.FAILED, clusterCoordinator.getAppStatus());
}
 
Developer ID: apache, Project: samza, Lines of code: 21, Source: TestClusterBasedJobCoordinator.java


Example 13: testPartitionCountMonitorWithoutDurableStates

import org.apache.samza.config.MapConfig; // import the required package/class
@Test
public void testPartitionCountMonitorWithoutDurableStates() throws IllegalAccessException, InvocationTargetException {
  Config config = new MapConfig(configMap);

  // mimic job runner code to write the config to coordinator stream
  CoordinatorStreamSystemFactory coordinatorFactory = new CoordinatorStreamSystemFactory();
  CoordinatorStreamSystemProducer producer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
  producer.writeConfig("test-job", config);

  ClusterBasedJobCoordinator clusterCoordinator = new ClusterBasedJobCoordinator(config);

  // change the input system stream metadata
  MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(1)), new ArrayList<>());

  StreamPartitionCountMonitor monitor = clusterCoordinator.getPartitionMonitor();
  monitor.updatePartitionCountMetric();
  assertEquals(SamzaApplicationState.SamzaAppStatus.UNDEFINED, clusterCoordinator.getAppStatus());
}
 
Developer ID: apache, Project: samza, Lines of code: 19, Source: TestClusterBasedJobCoordinator.java


Example 14: testStreamProcessorWithStreamTaskFactory

import org.apache.samza.config.MapConfig; // import the required package/class
/**
   * Should be able to create task instances from the provided task factory.
   */
// TODO Fix in SAMZA-1538
//  @Test
  public void testStreamProcessorWithStreamTaskFactory() {
    final String testSystem = "test-system";
    final String inputTopic = "numbers2";
    final String outputTopic = "output2";
    final int messageCount = 20;

    final Config configs = new MapConfig(createConfigs("1", testSystem, inputTopic, outputTopic, messageCount));
    createTopics(inputTopic, outputTopic);
    final TestStubs stubs = new TestStubs(configs, IdentityStreamTask::new, bootstrapServers());

    produceMessages(stubs.producer, inputTopic, messageCount);
    run(stubs.processor, stubs.shutdownLatch);
    verifyNumMessages(stubs.consumer, outputTopic, messageCount);
  }
 
Developer ID: apache, Project: samza, Lines of code: 20, Source: TestStreamProcessor.java


Example 15: testLocalStreamGroupedCorrectlyForYarn

import org.apache.samza.config.MapConfig; // import the required package/class
@Test
public void testLocalStreamGroupedCorrectlyForYarn() {
  HashSet<SystemStreamPartition> allSSPs = new HashSet<>();
  HashMap<String, String> configMap = new HashMap<>();

  configMap.put("job.container.count", "2");
  configMap.put("processor.list", "0,1");

  Config config = new MapConfig(configMap);

  SystemStreamPartitionGrouper grouper = grouperFactory.getSystemStreamPartitionGrouper(config);

  Collections.addAll(allSSPs, aa0, aa1, aa2, ab0);
  Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
  Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<>();

  HashSet<SystemStreamPartition> partitions = new HashSet<>();
  partitions.add(aa0);
  partitions.add(aa1);
  partitions.add(aa2);
  partitions.add(ab0);
  expectedResult.put(new TaskName("Task-0"), partitions);
  expectedResult.put(new TaskName("Task-1"), partitions);

  assertEquals(expectedResult, result);
}
 
Developer ID: apache, Project: samza, Lines of code: 27, Source: TestAllSspToSingleTaskGrouper.java


Example 16: testLocalStreamGroupedCorrectlyForPassthru

import org.apache.samza.config.MapConfig; // import the required package/class
@Test
public void testLocalStreamGroupedCorrectlyForPassthru() {
  HashSet<SystemStreamPartition> allSSPs = new HashSet<>();
  HashMap<String, String> configMap = new HashMap<>();

  configMap.put("job.coordinator.factory", "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
  configMap.put("processor.id", "1");
  configMap.put("processor.list", configMap.get("processor.id"));

  Config config = new MapConfig(configMap);

  SystemStreamPartitionGrouper grouper = grouperFactory.getSystemStreamPartitionGrouper(config);

  Collections.addAll(allSSPs, aa0, aa1, aa2, ab0);
  Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
  Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<>();

  HashSet<SystemStreamPartition> partitions = new HashSet<>();
  partitions.add(aa0);
  partitions.add(aa1);
  partitions.add(aa2);
  partitions.add(ab0);
  expectedResult.put(new TaskName("Task-1"), partitions);

  assertEquals(expectedResult, result);
}
 
Developer ID: apache, Project: samza, Lines of code: 27, Source: TestAllSspToSingleTaskGrouper.java


Example 17: testShouldGroupRelevantMonitorConfigTogether

import org.apache.samza.config.MapConfig; // import the required package/class
@Test
public void testShouldGroupRelevantMonitorConfigTogether() {
    // Test that Monitor Loader groups relevant config together.
    Map<String, String> firstMonitorConfig = ImmutableMap.of("monitor.monitor1.factory.class",
                                                             "org.apache.samza.monitor.DummyMonitor",
                                                             "monitor.monitor1.scheduling.interval.ms",
                                                             "100");
    Map<String, String> secondMonitorConfig = ImmutableMap.of("monitor.monitor2.factory.class",
                                                              "org.apache.samza.monitor.DummyMonitor",
                                                              "monitor.monitor2.scheduling.interval.ms",
                                                              "200");
    MapConfig mapConfig = new MapConfig(ImmutableList.of(firstMonitorConfig, secondMonitorConfig));
    MonitorConfig expectedFirstConfig = new MonitorConfig(new MapConfig(firstMonitorConfig).subset("monitor.monitor1."));
    MonitorConfig expectedSecondConfig = new MonitorConfig(new MapConfig(secondMonitorConfig).subset("monitor.monitor2."));
    Map<String, MonitorConfig> expected = ImmutableMap.of("monitor1", expectedFirstConfig, "monitor2", expectedSecondConfig);
    assertEquals(expected, MonitorConfig.getMonitorConfigs(mapConfig));
}
 
Developer ID: apache, Project: samza, Lines of code: 18, Source: TestMonitorService.java
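Example 17 relies on two behaviours: a config built from a list of maps contains the entries of all of them, and `subset(prefix)` returns only the keys under that prefix. The JDK-only sketch below imitates both so the idea can be tried without Samza. `mergeAll` and `subset` are hypothetical helpers; in particular, stripping the prefix from the returned keys and letting later maps win on duplicate keys are assumptions about Samza's behaviour that should be verified against its documentation.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical illustration of merging maps and taking a prefix subset; not Samza code. */
public class ConfigSubsetSketch {
  /** Merges the maps in order; on duplicate keys the later map wins (assumption). */
  public static Map<String, String> mergeAll(List<Map<String, String>> maps) {
    Map<String, String> merged = new HashMap<>();
    for (Map<String, String> m : maps) {
      merged.putAll(m);
    }
    return merged;
  }

  /** Keeps entries whose key starts with the prefix and strips the prefix (assumption). */
  public static Map<String, String> subset(Map<String, String> config, String prefix) {
    Map<String, String> result = new TreeMap<>();
    for (Map.Entry<String, String> e : config.entrySet()) {
      if (e.getKey().startsWith(prefix)) {
        result.put(e.getKey().substring(prefix.length()), e.getValue());
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> first = new HashMap<>();
    first.put("monitor.monitor1.factory.class", "org.apache.samza.monitor.DummyMonitor");
    first.put("monitor.monitor1.scheduling.interval.ms", "100");

    Map<String, String> second = new HashMap<>();
    second.put("monitor.monitor2.factory.class", "org.apache.samza.monitor.DummyMonitor");
    second.put("monitor.monitor2.scheduling.interval.ms", "200");

    Map<String, String> merged = mergeAll(Arrays.asList(first, second));
    System.out.println(subset(merged, "monitor.monitor1."));
    System.out.println(subset(merged, "monitor.monitor2."));
  }
}
```

Note the trailing dot in the prefixes: without it, a prefix such as `monitor.monitor1` would also match keys of a monitor named `monitor10`.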


Example 18: generateCommonStoreConfig

import org.apache.samza.config.MapConfig; // import the required package/class
protected Map<String, String> generateCommonStoreConfig(Map<String, String> config) {
  Map<String, String> storeConfig = new HashMap<>();

  // We assume the serde configuration has already been generated for this table,
  // so we simply carry it over to the store configuration.
  JavaTableConfig tableConfig = new JavaTableConfig(new MapConfig(config));

  String keySerde = tableConfig.getKeySerde(tableSpec.getId());
  storeConfig.put(String.format(StorageConfig.KEY_SERDE(), tableSpec.getId()), keySerde);

  String valueSerde = tableConfig.getValueSerde(tableSpec.getId());
  storeConfig.put(String.format(StorageConfig.MSG_SERDE(), tableSpec.getId()), valueSerde);

  return storeConfig;
}
 
Developer ID: apache, Project: samza, Lines of code: 18, Source: BaseLocalStoreBackedTableProvider.java


Example 19: fullAction

import org.apache.samza.config.MapConfig; // import the required package/class
@Test
public void fullAction() throws Exception {
    Map<String, String> configMap = new HashMap<String, String>();
    configMap.put("out", "fortunes");

    Config testConfig = new MapConfig(configMap);

    TaskUnitTestHarness<String, GenericRecord> testProcessTask =
            new TaskUnitTestHarness<String, GenericRecord>(Class_Task_Name, false, true);
    testProcessTask.start(testConfig);
    assertTrue(testProcessTask.isStarted());

    assertEquals(0, testProcessTask.getResult().size());
    testProcessTask.inject(Dummy_Key, sampleInput());
    assertEquals(1, testProcessTask.getResult().size());
    testProcessTask.inject(Dummy_Key, sampleInput());
    assertEquals(2, testProcessTask.getResult().size());

    Fortune outMsg = (Fortune) testProcessTask.getResult().get(1).getMessage();

    // JUnit convention: expected value first, actual value second
    assertEquals("The sons of Ragnar Lothbrok will be spoken of as long as men have tongues to speak.",
            outMsg.getAnswer().toString());
}
 
Developer ID: theduderog, Project: hello-samza-confluent, Lines of code: 24, Source: SeerTaskTest.java


Example 20: SqlSystemStreamConfig

import org.apache.samza.config.MapConfig; // import the required package/class
public SqlSystemStreamConfig(String systemName, String streamName, List<String> sourceParts,
    Config systemConfig) {


  HashMap<String, String> streamConfigs = new HashMap<>(systemConfig);
  this.systemName = systemName;
  this.streamName = streamName;
  this.source = getSourceFromSourceParts(sourceParts);
  this.sourceParts = sourceParts;
  this.systemStream = new SystemStream(systemName, streamName);

  samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER);
  Validate.notEmpty(samzaRelConverterName,
      String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName));

  relSchemaProviderName = streamConfigs.get(CFG_REL_SCHEMA_PROVIDER);

  // Removing the Samza SQL specific configs to get the remaining Samza configs.
  streamConfigs.remove(CFG_SAMZA_REL_CONVERTER);
  streamConfigs.remove(CFG_REL_SCHEMA_PROVIDER);

  config = new MapConfig(streamConfigs);
}
 
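Developer ID: apache, Project: samza, Lines of code: 24, Source: SqlSystemStreamConfig.java



Note: The org.apache.samza.config.MapConfig examples in this article were compiled from source-code and documentation platforms such as GitHub and MSDocs. The snippets were selected from open-source projects, and copyright in the source code remains with the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.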

