This article compiles typical usage examples of the Java class org.apache.samza.config.MapConfig. If you are wondering what the MapConfig class is for, how to use it, or what it looks like in real code, the curated examples below may help.
The MapConfig class belongs to the org.apache.samza.config package. Twenty code examples of MapConfig are shown below, sorted by popularity by default.
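Before the collected examples, here is a minimal, self-contained sketch of how a MapConfig is typically constructed and read. The keys, values, and class name in this snippet are made up for illustration; only the MapConfig constructor and the typed getters inherited from org.apache.samza.config.Config are actual Samza API.

import java.util.HashMap;
import java.util.Map;

import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;

public class MapConfigQuickStart {
  public static void main(String[] args) {
    // MapConfig wraps a plain java.util.Map<String, String> as an immutable Samza Config.
    Map<String, String> map = new HashMap<>();
    map.put("job.name", "hello-samza");   // hypothetical configuration entries
    map.put("task.window.ms", "5000");

    Config config = new MapConfig(map);

    // Typed accessors are inherited from the Config base class.
    String jobName = config.get("job.name");
    long windowMs = config.getLong("task.window.ms", -1L);
    System.out.println(jobName + " triggers window() every " + windowMs + " ms");
  }
}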
Example 1: testGetIntermediateStreamProperties

import org.apache.samza.config.MapConfig; // import the dependent package/class

@Test
public void testGetIntermediateStreamProperties() {
  Map<String, String> config = new HashMap<>();
  KafkaSystemFactory factory = new KafkaSystemFactory();
  Map<String, Properties> properties = JavaConversions.mapAsJavaMap(
      factory.getIntermediateStreamProperties(new MapConfig(config)));
  assertTrue(properties.isEmpty());
  // no properties for the stream
  config.put("streams.test.samza.intermediate", "true");
  config.put("streams.test.compression.type", "lz4"); // some random config
  properties = JavaConversions.mapAsJavaMap(
      factory.getIntermediateStreamProperties(new MapConfig(config)));
  assertTrue(properties.isEmpty());
  config.put(ApplicationConfig.APP_MODE, ApplicationConfig.ApplicationMode.BATCH.name());
  properties = JavaConversions.mapAsJavaMap(
      factory.getIntermediateStreamProperties(new MapConfig(config)));
  assertTrue(!properties.isEmpty());
  Properties prop = properties.get("test");
  assertEquals(prop.getProperty("retention.ms"), String.valueOf(KafkaConfig.DEFAULT_RETENTION_MS_FOR_BATCH()));
  assertEquals(prop.getProperty("compression.type"), "lz4");
}

Developer ID: apache, Project: samza, Code lines: 24, Source: TestKafkaSystemFactoryJava.java
Example 2: getEventHubConfig

import org.apache.samza.config.MapConfig; // import the dependent package/class

public static Config getEventHubConfig(EventHubSystemProducer.PartitioningMethod partitioningMethod) {
  HashMap<String, String> mapConfig = new HashMap<>();
  mapConfig.put(String.format(EventHubConfig.CONFIG_PRODUCER_PARTITION_METHOD, SYSTEM_NAME), partitioningMethod.toString());
  mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_LIST, SYSTEM_NAME), STREAM_NAME1 + "," + STREAM_NAME2);

  mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, SYSTEM_NAME, STREAM_NAME1), EVENTHUB_NAMESPACE);
  mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, SYSTEM_NAME, STREAM_NAME1), EVENTHUB_ENTITY1);
  mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, SYSTEM_NAME, STREAM_NAME1), EVENTHUB_KEY_NAME);
  mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, SYSTEM_NAME, STREAM_NAME1), EVENTHUB_KEY);

  mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_NAMESPACE, SYSTEM_NAME, STREAM_NAME2), EVENTHUB_NAMESPACE);
  mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_ENTITYPATH, SYSTEM_NAME, STREAM_NAME2), EVENTHUB_ENTITY2);
  mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_KEY_NAME, SYSTEM_NAME, STREAM_NAME2), EVENTHUB_KEY_NAME);
  mapConfig.put(String.format(EventHubConfig.CONFIG_STREAM_SAS_TOKEN, SYSTEM_NAME, STREAM_NAME2), EVENTHUB_KEY);

  return new MapConfig(mapConfig);
}

Developer ID: apache, Project: samza, Code lines: 18, Source: MockEventHubConfigFactory.java
Example 3: testResourceMapWithFileStatusFailure

import org.apache.samza.config.MapConfig; // import the dependent package/class

@Test
public void testResourceMapWithFileStatusFailure() {
  thrown.expect(LocalizerResourceException.class);
  thrown.expectMessage("IO Exception when accessing the resource file status from the filesystem");
  Map<String, String> configMap = new HashMap<>();
  configMap.put("yarn.resources.myResource1.path", "unknown://host1.com/readme");
  configMap.put("yarn.resources.myResource1.local.name", "readme");
  configMap.put("yarn.resources.myResource1.local.type", "file");
  configMap.put("yarn.resources.myResource1.local.visibility", "public");
  Config conf = new MapConfig(configMap);
  YarnConfiguration yarnConfiguration = new YarnConfiguration();
  yarnConfiguration.set("fs.http.impl", HttpFileSystem.class.getName());
  yarnConfiguration.set("fs.https.impl", HttpFileSystem.class.getName());
  LocalizerResourceMapper mapper = new LocalizerResourceMapper(new LocalizerResourceConfig(conf), yarnConfiguration);
}

Developer ID: apache, Project: samza, Code lines: 18, Source: TestLocalizerResourceMapper.java
Example 4: getExecutionPlan

import org.apache.samza.config.MapConfig; // import the dependent package/class

ExecutionPlan getExecutionPlan(StreamApplication app, String runId) throws Exception {
  // build the stream graph
  StreamGraphImpl streamGraph = new StreamGraphImpl(this, config);
  app.init(streamGraph, config);
  // create the physical execution plan
  Map<String, String> cfg = new HashMap<>(config);
  if (StringUtils.isNoneEmpty(runId)) {
    cfg.put(ApplicationConfig.APP_RUN_ID, runId);
  }
  Set<StreamSpec> inputStreams = new HashSet<>(streamGraph.getInputOperators().keySet());
  inputStreams.removeAll(streamGraph.getOutputStreams().keySet());
  ApplicationMode mode = inputStreams.stream().allMatch(StreamSpec::isBounded)
      ? ApplicationMode.BATCH : ApplicationMode.STREAM;
  cfg.put(ApplicationConfig.APP_MODE, mode.name());
  ExecutionPlanner planner = new ExecutionPlanner(new MapConfig(cfg), streamManager);
  return planner.plan(streamGraph);
}

Developer ID: apache, Project: samza, Code lines: 21, Source: AbstractApplicationRunner.java
Example 5: testResourceConfigIncluded

import org.apache.samza.config.MapConfig; // import the dependent package/class

@Test
public void testResourceConfigIncluded() {
  Map<String, String> configMap = new HashMap<>();
  configMap.put("yarn.resources.myResource1.path", "http://host1.com/readme");
  configMap.put("yarn.resources.myResource1.local.name", "readme");
  configMap.put("yarn.resources.myResource1.local.type", "file");
  configMap.put("yarn.resources.myResource1.local.visibility", "public");
  Config conf = new MapConfig(configMap);
  LocalizerResourceConfig manager = new LocalizerResourceConfig(conf);
  assertEquals(1, manager.getResourceNames().size());
  assertEquals("myResource1", manager.getResourceNames().get(0));
  assertEquals("readme", manager.getResourceLocalName("myResource1"));
  assertEquals(LocalResourceType.FILE, manager.getResourceLocalType("myResource1"));
  assertEquals(LocalResourceVisibility.PUBLIC, manager.getResourceLocalVisibility("myResource1"));
}

Developer ID: apache, Project: samza, Code lines: 19, Source: TestLocalizerResourceConfig.java
Example 6: testSchemeWithSubkeys

import org.apache.samza.config.MapConfig; // import the dependent package/class

@Test
public void testSchemeWithSubkeys() {
  Map<String, String> configMap = new HashMap<>();
  configMap.put("fs.http.impl", "org.apache.samza.HttpFileSystem");
  configMap.put("fs.myscheme.impl", "org.apache.samza.MySchemeFileSystem");
  configMap.put("fs.http.impl.key1", "val1");
  configMap.put("fs.http.impl.key2", "val2");
  Config conf = new MapConfig(configMap);
  FileSystemImplConfig manager = new FileSystemImplConfig(conf);
  Map<String, String> expectedFsHttpImplConfs = ImmutableMap.of( // scheme with additional subkeys
      "fs.http.impl", "org.apache.samza.HttpFileSystem",
      "fs.http.impl.key1", "val1",
      "fs.http.impl.key2", "val2"
  );
  Map<String, String> expectedFsMyschemeImplConfs = ImmutableMap.of( // scheme without subkeys
      "fs.myscheme.impl", "org.apache.samza.MySchemeFileSystem"
  );
  assertEquals(Arrays.asList("http", "myscheme"), manager.getSchemes());
  assertEquals(expectedFsHttpImplConfs, manager.getSchemeConfig("http"));
  assertEquals(expectedFsMyschemeImplConfs, manager.getSchemeConfig("myscheme"));
}

Developer ID: apache, Project: samza, Code lines: 26, Source: TestFileSystemImplConfig.java
Example 7: testDefaultPartitions

import org.apache.samza.config.MapConfig; // import the dependent package/class

@Test
public void testDefaultPartitions() {
  Map<String, String> map = new HashMap<>(config);
  map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
  Config cfg = new MapConfig(map);
  ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
  StreamGraphImpl streamGraph = createSimpleGraph();
  JobGraph jobGraph = planner.createJobGraph(streamGraph);
  planner.calculatePartitions(streamGraph, jobGraph);
  // the partition count should be the same as input1
  jobGraph.getIntermediateStreams().forEach(edge -> {
    assertTrue(edge.getPartitionCount() == DEFAULT_PARTITIONS);
  });
}

Developer ID: apache, Project: samza, Code lines: 17, Source: TestExecutionPlanner.java
Example 8: testTriggerIntervalWithInvalidWindowMs

import org.apache.samza.config.MapConfig; // import the dependent package/class

@Test
public void testTriggerIntervalWithInvalidWindowMs() throws Exception {
  Map<String, String> map = new HashMap<>(config);
  map.put(TaskConfig.WINDOW_MS(), "-1");
  map.put(JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(), String.valueOf(DEFAULT_PARTITIONS));
  Config cfg = new MapConfig(map);
  ExecutionPlanner planner = new ExecutionPlanner(cfg, streamManager);
  StreamGraphImpl streamGraph = createStreamGraphWithJoinAndWindow();
  ExecutionPlan plan = planner.plan(streamGraph);
  List<JobConfig> jobConfigs = plan.getJobConfigs();
  assertEquals(1, jobConfigs.size());
  // GCD of 8, 16, 1600 and 252 is 4
  assertEquals("4", jobConfigs.get(0).get(TaskConfig.WINDOW_MS()));
}

Developer ID: apache, Project: samza, Code lines: 17, Source: TestExecutionPlanner.java
Example 9: testStreamProcessor

import org.apache.samza.config.MapConfig; // import the dependent package/class

/**
 * Tests a basic identity stream task - reads data from a topic and writes it to another topic
 * (without any modifications).
 *
 * <p>
 * The standalone version in this test uses KafkaSystemFactory and a SingleContainerGrouperFactory. Hence,
 * no matter how many tasks are present, it will always run in a single processor instance, which simplifies testing.
 */
// TODO Fix in SAMZA-1538
// @Test
public void testStreamProcessor() {
  final String testSystem = "test-system";
  final String inputTopic = "numbers";
  final String outputTopic = "output";
  final int messageCount = 20;
  final Config configs = new MapConfig(createConfigs("1", testSystem, inputTopic, outputTopic, messageCount));
  // Note: createTopics needs to be called before creating a StreamProcessor. Otherwise it fails with a
  // TopicExistsException since StreamProcessor auto-creates them.
  createTopics(inputTopic, outputTopic);
  final TestStubs stubs = new TestStubs(configs, IdentityStreamTask::new, bootstrapServers());
  produceMessages(stubs.producer, inputTopic, messageCount);
  run(stubs.processor, stubs.shutdownLatch);
  verifyNumMessages(stubs.consumer, outputTopic, messageCount);
}

Developer ID: apache, Project: samza, Code lines: 27, Source: TestStreamProcessor.java
Example 10: testOnShutdown

import org.apache.samza.config.MapConfig; // import the dependent package/class

@Test
public void testOnShutdown() throws Exception {
  Config conf = getConfig();
  state = new SamzaApplicationState(getJobModelManagerWithoutHostAffinity(1));
  ContainerProcessManager taskManager = new ContainerProcessManager(
      new MapConfig(conf),
      state,
      new MetricsRegistryMap(),
      clusterResourceManager
  );
  taskManager.start();
  Thread allocatorThread = (Thread) getPrivateFieldFromTaskManager("allocatorThread", taskManager).get(taskManager);
  assertTrue(allocatorThread.isAlive());
  taskManager.stop();
  assertFalse(allocatorThread.isAlive());
}

Developer ID: apache, Project: samza, Code lines: 21, Source: TestContainerProcessManager.java
Example 11: getConfig

import org.apache.samza.config.MapConfig; // import the dependent package/class

private static Config getConfig() {
  Config config = new MapConfig(new HashMap<String, String>() {
    {
      put("cluster-manager.container.count", "1");
      put("cluster-manager.container.retry.count", "1");
      put("cluster-manager.container.retry.window.ms", "1999999999");
      put("cluster-manager.container.request.timeout.ms", "3");
      put("cluster-manager.allocator.sleep.ms", "1");
      put("cluster-manager.container.memory.mb", "512");
      put("yarn.package.path", "/foo");
      put("task.inputs", "test-system.test-stream");
      put("systems.test-system.samza.factory", "org.apache.samza.system.MockSystemFactory");
      put("systems.test-system.samza.key.serde", "org.apache.samza.serializers.JsonSerde");
      put("systems.test-system.samza.msg.serde", "org.apache.samza.serializers.JsonSerde");
      put("job.host-affinity.enabled", "true");
    }
  });
  Map<String, String> map = new HashMap<>();
  map.putAll(config);
  return new MapConfig(map);
}

Developer ID: apache, Project: samza, Code lines: 23, Source: TestHostAwareContainerAllocator.java
Example 12: testPartitionCountMonitorWithDurableStates

import org.apache.samza.config.MapConfig; // import the dependent package/class

@Test
public void testPartitionCountMonitorWithDurableStates()
    throws IllegalAccessException, InvocationTargetException, NoSuchMethodException {
  configMap.put("stores.mystore.changelog", "mychangelog");
  Config config = new MapConfig(configMap);
  // mimic the job runner code that writes the config to the coordinator stream
  CoordinatorStreamSystemFactory coordinatorFactory = new CoordinatorStreamSystemFactory();
  CoordinatorStreamSystemProducer producer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
  producer.writeConfig("test-job", config);
  ClusterBasedJobCoordinator clusterCoordinator = new ClusterBasedJobCoordinator(config);
  // change the input system stream metadata
  MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(1)), new ArrayList<>());
  StreamPartitionCountMonitor monitor = clusterCoordinator.getPartitionMonitor();
  monitor.updatePartitionCountMetric();
  assertEquals(clusterCoordinator.getAppStatus(), SamzaApplicationState.SamzaAppStatus.FAILED);
}

Developer ID: apache, Project: samza, Code lines: 21, Source: TestClusterBasedJobCoordinator.java
Example 13: testPartitionCountMonitorWithoutDurableStates

import org.apache.samza.config.MapConfig; // import the dependent package/class

@Test
public void testPartitionCountMonitorWithoutDurableStates() throws IllegalAccessException, InvocationTargetException {
  Config config = new MapConfig(configMap);
  // mimic the job runner code that writes the config to the coordinator stream
  CoordinatorStreamSystemFactory coordinatorFactory = new CoordinatorStreamSystemFactory();
  CoordinatorStreamSystemProducer producer = coordinatorFactory.getCoordinatorStreamSystemProducer(config, mock(MetricsRegistry.class));
  producer.writeConfig("test-job", config);
  ClusterBasedJobCoordinator clusterCoordinator = new ClusterBasedJobCoordinator(config);
  // change the input system stream metadata
  MockSystemFactory.MSG_QUEUES.put(new SystemStreamPartition("kafka", "topic1", new Partition(1)), new ArrayList<>());
  StreamPartitionCountMonitor monitor = clusterCoordinator.getPartitionMonitor();
  monitor.updatePartitionCountMetric();
  assertEquals(clusterCoordinator.getAppStatus(), SamzaApplicationState.SamzaAppStatus.UNDEFINED);
}

Developer ID: apache, Project: samza, Code lines: 19, Source: TestClusterBasedJobCoordinator.java
Example 14: testStreamProcessorWithStreamTaskFactory

import org.apache.samza.config.MapConfig; // import the dependent package/class

/**
 * Should be able to create task instances from the provided task factory.
 */
// TODO Fix in SAMZA-1538
// @Test
public void testStreamProcessorWithStreamTaskFactory() {
  final String testSystem = "test-system";
  final String inputTopic = "numbers2";
  final String outputTopic = "output2";
  final int messageCount = 20;
  final Config configs = new MapConfig(createConfigs("1", testSystem, inputTopic, outputTopic, messageCount));
  createTopics(inputTopic, outputTopic);
  final TestStubs stubs = new TestStubs(configs, IdentityStreamTask::new, bootstrapServers());
  produceMessages(stubs.producer, inputTopic, messageCount);
  run(stubs.processor, stubs.shutdownLatch);
  verifyNumMessages(stubs.consumer, outputTopic, messageCount);
}

Developer ID: apache, Project: samza, Code lines: 20, Source: TestStreamProcessor.java
Example 15: testLocalStreamGroupedCorrectlyForYarn

import org.apache.samza.config.MapConfig; // import the dependent package/class

@Test
public void testLocalStreamGroupedCorrectlyForYarn() {
  HashSet<SystemStreamPartition> allSSPs = new HashSet<>();
  HashMap<String, String> configMap = new HashMap<>();
  configMap.put("job.container.count", "2");
  configMap.put("processor.list", "0,1");
  Config config = new MapConfig(configMap);
  SystemStreamPartitionGrouper grouper = grouperFactory.getSystemStreamPartitionGrouper(config);
  Collections.addAll(allSSPs, aa0, aa1, aa2, ab0);
  Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
  Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<>();
  HashSet<SystemStreamPartition> partitions = new HashSet<>();
  partitions.add(aa0);
  partitions.add(aa1);
  partitions.add(aa2);
  partitions.add(ab0);
  expectedResult.put(new TaskName("Task-0"), partitions);
  expectedResult.put(new TaskName("Task-1"), partitions);
  assertEquals(expectedResult, result);
}

Developer ID: apache, Project: samza, Code lines: 27, Source: TestAllSspToSingleTaskGrouper.java
Example 16: testLocalStreamGroupedCorrectlyForPassthru

import org.apache.samza.config.MapConfig; // import the dependent package/class

@Test
public void testLocalStreamGroupedCorrectlyForPassthru() {
  HashSet<SystemStreamPartition> allSSPs = new HashSet<>();
  HashMap<String, String> configMap = new HashMap<>();
  configMap.put("job.coordinator.factory", "org.apache.samza.standalone.PassthroughJobCoordinatorFactory");
  configMap.put("processor.id", "1");
  configMap.put("processor.list", configMap.get("processor.id"));
  Config config = new MapConfig(configMap);
  SystemStreamPartitionGrouper grouper = grouperFactory.getSystemStreamPartitionGrouper(config);
  Collections.addAll(allSSPs, aa0, aa1, aa2, ab0);
  Map<TaskName, Set<SystemStreamPartition>> result = grouper.group(allSSPs);
  Map<TaskName, Set<SystemStreamPartition>> expectedResult = new HashMap<>();
  HashSet<SystemStreamPartition> partitions = new HashSet<>();
  partitions.add(aa0);
  partitions.add(aa1);
  partitions.add(aa2);
  partitions.add(ab0);
  expectedResult.put(new TaskName("Task-1"), partitions);
  assertEquals(expectedResult, result);
}

Developer ID: apache, Project: samza, Code lines: 27, Source: TestAllSspToSingleTaskGrouper.java
Example 17: testShouldGroupRelevantMonitorConfigTogether

import org.apache.samza.config.MapConfig; // import the dependent package/class

@Test
public void testShouldGroupRelevantMonitorConfigTogether() {
  // Test that the monitor loader groups relevant config together.
  Map<String, String> firstMonitorConfig = ImmutableMap.of("monitor.monitor1.factory.class",
      "org.apache.samza.monitor.DummyMonitor",
      "monitor.monitor1.scheduling.interval.ms",
      "100");
  Map<String, String> secondMonitorConfig = ImmutableMap.of("monitor.monitor2.factory.class",
      "org.apache.samza.monitor.DummyMonitor",
      "monitor.monitor2.scheduling.interval.ms",
      "200");
  MapConfig mapConfig = new MapConfig(ImmutableList.of(firstMonitorConfig, secondMonitorConfig));
  MonitorConfig expectedFirstConfig = new MonitorConfig(new MapConfig(firstMonitorConfig).subset("monitor.monitor1."));
  MonitorConfig expectedSecondConfig = new MonitorConfig(new MapConfig(secondMonitorConfig).subset("monitor.monitor2."));
  Map<String, MonitorConfig> expected = ImmutableMap.of("monitor1", expectedFirstConfig, "monitor2", expectedSecondConfig);
  assertEquals(expected, MonitorConfig.getMonitorConfigs(mapConfig));
}

Developer ID: apache, Project: samza, Code lines: 18, Source: TestMonitorService.java
Example 18: generateCommonStoreConfig

import org.apache.samza.config.MapConfig; // import the dependent package/class

protected Map<String, String> generateCommonStoreConfig(Map<String, String> config) {
  Map<String, String> storeConfig = new HashMap<>();
  // We assume the serde configuration has already been generated for this table,
  // so we simply carry it over to the store configuration.
  JavaTableConfig tableConfig = new JavaTableConfig(new MapConfig(config));
  String keySerde = tableConfig.getKeySerde(tableSpec.getId());
  storeConfig.put(String.format(StorageConfig.KEY_SERDE(), tableSpec.getId()), keySerde);
  String valueSerde = tableConfig.getValueSerde(tableSpec.getId());
  storeConfig.put(String.format(StorageConfig.MSG_SERDE(), tableSpec.getId()), valueSerde);
  return storeConfig;
}

Developer ID: apache, Project: samza, Code lines: 18, Source: BaseLocalStoreBackedTableProvider.java
Example 19: fullAction

import org.apache.samza.config.MapConfig; // import the dependent package/class

@Test
public void fullAction() throws Exception {
  Map<String, String> configMap = new HashMap<String, String>();
  configMap.put("out", "fortunes");
  Config testConfig = new MapConfig(configMap);
  TaskUnitTestHarness<String, GenericRecord> testProcessTask =
      new TaskUnitTestHarness<String, GenericRecord>(Class_Task_Name, false, true);
  testProcessTask.start(testConfig);
  assertTrue(testProcessTask.isStarted());
  assertEquals(0, testProcessTask.getResult().size());
  testProcessTask.inject(Dummy_Key, sampleInput());
  assertEquals(1, testProcessTask.getResult().size());
  testProcessTask.inject(Dummy_Key, sampleInput());
  assertEquals(2, testProcessTask.getResult().size());
  Fortune outMsg = (Fortune) testProcessTask.getResult().get(1).getMessage();
  assertEquals(outMsg.getAnswer().toString(),
      "The sons of Ragnar Lothbrok will be spoken of as long as men have tongues to speak.");
}

Developer ID: theduderog, Project: hello-samza-confluent, Code lines: 24, Source: SeerTaskTest.java
Example 20: SqlSystemStreamConfig

import org.apache.samza.config.MapConfig; // import the dependent package/class

public SqlSystemStreamConfig(String systemName, String streamName, List<String> sourceParts,
    Config systemConfig) {
  HashMap<String, String> streamConfigs = new HashMap<>(systemConfig);
  this.systemName = systemName;
  this.streamName = streamName;
  this.source = getSourceFromSourceParts(sourceParts);
  this.sourceParts = sourceParts;
  this.systemStream = new SystemStream(systemName, streamName);
  samzaRelConverterName = streamConfigs.get(CFG_SAMZA_REL_CONVERTER);
  Validate.notEmpty(samzaRelConverterName,
      String.format("%s is not set or empty for system %s", CFG_SAMZA_REL_CONVERTER, systemName));
  relSchemaProviderName = streamConfigs.get(CFG_REL_SCHEMA_PROVIDER);
  // Remove the Samza SQL specific configs to get the remaining Samza configs.
  streamConfigs.remove(CFG_SAMZA_REL_CONVERTER);
  streamConfigs.remove(CFG_REL_SCHEMA_PROVIDER);
  config = new MapConfig(streamConfigs);
}

Developer ID: apache, Project: samza, Code lines: 24, Source: SqlSystemStreamConfig.java
Note: the org.apache.samza.config.MapConfig examples in this article were collected from source-code and documentation hosting platforms such as GitHub and MSDocs. The snippets were selected from open-source projects contributed by various developers, and copyright of the source code remains with the original authors. Please consult the corresponding project's license before distributing or reusing the code, and do not reproduce this compilation without permission.