本文整理汇总了Java中org.apache.samza.serializers.Serde类的典型用法代码示例。如果您正苦于以下问题:Java Serde类的具体用法?Java Serde怎么用?Java Serde使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
Serde类属于org.apache.samza.serializers包,在下文中一共展示了Serde类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: join
import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Override
public <K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream,
JoinFunction<? extends K, ? super M, ? super OM, ? extends JM> joinFn,
Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde,
Duration ttl, String userDefinedId) {
if (otherStream.equals(this)) throw new SamzaException("Cannot join a MessageStream with itself.");
OperatorSpec<?, OM> otherOpSpec = ((MessageStreamImpl<OM>) otherStream).getOperatorSpec();
String opId = this.graph.getNextOpId(OpCode.JOIN, userDefinedId);
JoinOperatorSpec<K, M, OM, JM> joinOpSpec =
OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec, (JoinFunction<K, M, OM, JM>) joinFn,
keySerde, messageSerde, otherMessageSerde, ttl.toMillis(), opId);
this.operatorSpec.registerNextOperatorSpec(joinOpSpec);
otherOpSpec.registerNextOperatorSpec((OperatorSpec<OM, ?>) joinOpSpec);
return new MessageStreamImpl<>(this.graph, joinOpSpec);
}
开发者ID:apache,项目名称:samza,代码行数:18,代码来源:MessageStreamImpl.java
示例2: getInputStream
import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Override
public <M> MessageStream<M> getInputStream(String streamId, Serde<M> serde) {
StreamSpec streamSpec = runner.getStreamSpec(streamId);
Preconditions.checkState(streamSpec != null, "No StreamSpec found for streamId: " + streamId);
Preconditions.checkNotNull(serde, "serde must not be null for an input stream.");
Preconditions.checkState(!inputOperators.containsKey(streamSpec),
"getInputStream must not be called multiple times with the same streamId: " + streamId);
KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
if (outputStreams.containsKey(streamSpec)) {
OutputStreamImpl outputStream = outputStreams.get(streamSpec);
Serde keySerde = outputStream.getKeySerde();
Serde valueSerde = outputStream.getValueSerde();
Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
+ "stream level, so the same key and message Serde must be used for both.", streamId));
}
boolean isKeyed = serde instanceof KVSerde;
InputOperatorSpec inputOperatorSpec =
OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(),
isKeyed, this.getNextOpId(OpCode.INPUT, null));
inputOperators.put(streamSpec, inputOperatorSpec);
return new MessageStreamImpl<>(this, inputOperators.get(streamSpec));
}
开发者ID:apache,项目名称:samza,代码行数:26,代码来源:StreamGraphImpl.java
示例3: getOutputStream
import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Override
public <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde) {
StreamSpec streamSpec = runner.getStreamSpec(streamId);
Preconditions.checkState(streamSpec != null, "No StreamSpec found for streamId: " + streamId);
Preconditions.checkNotNull(serde, "serde must not be null for an output stream.");
Preconditions.checkState(!outputStreams.containsKey(streamSpec),
"getOutputStream must not be called multiple times with the same streamId: " + streamId);
KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
if (inputOperators.containsKey(streamSpec)) {
InputOperatorSpec inputOperatorSpec = inputOperators.get(streamSpec);
Serde keySerde = inputOperatorSpec.getKeySerde();
Serde valueSerde = inputOperatorSpec.getValueSerde();
Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
+ "stream level, so the same key and message Serde must be used for both.", streamId));
}
boolean isKeyed = serde instanceof KVSerde;
outputStreams.put(streamSpec, new OutputStreamImpl<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
return outputStreams.get(streamSpec);
}
开发者ID:apache,项目名称:samza,代码行数:23,代码来源:StreamGraphImpl.java
示例4: getIntermediateStream
import org.apache.samza.serializers.Serde; //导入依赖的package包/类
/**
* Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph.
* An intermediate {@link MessageStream} is both an output and an input stream.
*
* @param streamId the id of the stream to be created.
* @param serde the {@link Serde} to use for the message in the intermediate stream. If null, the default serde
* is used.
* @param <M> the type of messages in the intermediate {@link MessageStream}
* @return the intermediate {@link MessageStreamImpl}
*/
<M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde) {
StreamSpec streamSpec = runner.getStreamSpec(streamId);
Preconditions.checkState(!inputOperators.containsKey(streamSpec) && !outputStreams.containsKey(streamSpec),
"getIntermediateStream must not be called multiple times with the same streamId: " + streamId);
if (serde == null) {
LOGGER.info("Using default serde for intermediate stream: " + streamId);
serde = (Serde<M>) defaultSerde;
}
boolean isKeyed = serde instanceof KVSerde;
KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
InputOperatorSpec inputOperatorSpec =
OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(),
isKeyed, this.getNextOpId(OpCode.INPUT, null));
inputOperators.put(streamSpec, inputOperatorSpec);
outputStreams.put(streamSpec, new OutputStreamImpl(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamSpec), outputStreams.get(streamSpec));
}
开发者ID:apache,项目名称:samza,代码行数:31,代码来源:StreamGraphImpl.java
示例5: getKVSerdes
import org.apache.samza.serializers.Serde; //导入依赖的package包/类
private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) {
Serde keySerde, valueSerde;
if (serde instanceof KVSerde) {
keySerde = ((KVSerde) serde).getKeySerde();
valueSerde = ((KVSerde) serde).getValueSerde();
} else {
keySerde = new NoOpSerde();
valueSerde = serde;
}
if (keySerde instanceof NoOpSerde) {
LOGGER.info("Using NoOpSerde as the key serde for stream " + streamId +
". Keys will not be (de)serialized");
}
if (valueSerde instanceof NoOpSerde) {
LOGGER.info("Using NoOpSerde as the value serde for stream " + streamId +
". Values will not be (de)serialized");
}
return KV.of(keySerde, valueSerde);
}
开发者ID:apache,项目名称:samza,代码行数:23,代码来源:StreamGraphImpl.java
示例6: TableManager
import org.apache.samza.serializers.Serde; //导入依赖的package包/类
/**
* Construct a table manager instance
* @param config the job configuration
* @param serdes Serde instances for tables
*/
public TableManager(Config config, Map<String, Serde<Object>> serdes) {
new JavaTableConfig(config).getTableIds().forEach(tableId -> {
// Construct the table provider
String tableProviderFactory = config.get(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, tableId));
// Construct the KVSerde
JavaTableConfig tableConfig = new JavaTableConfig(config);
KVSerde serde = KVSerde.of(
serdes.get(tableConfig.getKeySerde(tableId)),
serdes.get(tableConfig.getValueSerde(tableId)));
TableSpec tableSpec = new TableSpec(tableId, serde, tableProviderFactory,
config.subset(String.format(JavaTableConfig.TABLE_ID_PREFIX, tableId) + "."));
addTable(tableSpec);
logger.info("Added table " + tableSpec.getId());
});
}
开发者ID:apache,项目名称:samza,代码行数:27,代码来源:TableManager.java
示例7: testWindowWithRelaxedTypes
import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testWindowWithRelaxedTypes() throws Exception {
StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
OperatorSpec mockOpSpec = mock(OperatorSpec.class);
MessageStream<TestInputMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);
Function<TestMessageEnvelope, String> keyExtractor = m -> m.getKey();
FoldLeftFunction<TestMessageEnvelope, Integer> aggregator = (m, c) -> c + 1;
Supplier<Integer> initialValue = () -> 0;
// should compile since TestMessageEnvelope (input for functions) is base class of TestInputMessageEnvelope (M)
Window<TestInputMessageEnvelope, String, Integer> window =
Windows.keyedTumblingWindow(keyExtractor, Duration.ofHours(1), initialValue, aggregator,
null, mock(Serde.class));
MessageStream<WindowPane<String, Integer>> windowedStream = inputStream.window(window, "w1");
ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();
assertTrue(registeredOpSpec instanceof WindowOperatorSpec);
assertEquals(OpCode.WINDOW, registeredOpSpec.getOpCode());
assertEquals(window, ((WindowOperatorSpec) registeredOpSpec).getWindow());
}
开发者ID:apache,项目名称:samza,代码行数:25,代码来源:TestMessageStreamImpl.java
示例8: testTriggerIntervalWithNestedTimeTriggers
import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testTriggerIntervalWithNestedTimeTriggers() {
Trigger defaultTrigger = Triggers.timeSinceFirstMessage(Duration.ofMillis(150));
Trigger lateTrigger = Triggers.any(Triggers.count(6), Triggers.timeSinceFirstMessage(Duration.ofMillis(15)));
Trigger earlyTrigger = Triggers.repeat(
Triggers.any(Triggers.count(23),
Triggers.timeSinceFirstMessage(Duration.ofMillis(15)),
Triggers.any(Triggers.any(Triggers.count(6),
Triggers.timeSinceFirstMessage(Duration.ofMillis(15)),
Triggers.timeSinceFirstMessage(Duration.ofMillis(25)),
Triggers.timeSinceLastMessage(Duration.ofMillis(15))))));
WindowInternal window = new WindowInternal(defaultTrigger, null, null, null,
null, WindowType.SESSION, null, null, mock(Serde.class));
window.setEarlyTrigger(earlyTrigger);
window.setLateTrigger(lateTrigger);
WindowOperatorSpec spec = new WindowOperatorSpec(window, "0");
Assert.assertEquals(spec.getDefaultTriggerMs(), 5);
}
开发者ID:apache,项目名称:samza,代码行数:21,代码来源:TestWindowOperatorSpec.java
示例9: testGetInputStreamWithValueSerde
import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testGetInputStreamWithValueSerde() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
StreamSpec mockStreamSpec = mock(StreamSpec.class);
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
Serde mockValueSerde = mock(Serde.class);
MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockValueSerde);
InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
(InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
}
开发者ID:apache,项目名称:samza,代码行数:19,代码来源:TestStreamGraphImpl.java
示例10: testGetInputStreamWithKeyValueSerde
import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testGetInputStreamWithKeyValueSerde() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
StreamSpec mockStreamSpec = mock(StreamSpec.class);
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
KVSerde mockKVSerde = mock(KVSerde.class);
Serde mockKeySerde = mock(Serde.class);
Serde mockValueSerde = mock(Serde.class);
doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockKVSerde);
InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
(InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
}
开发者ID:apache,项目名称:samza,代码行数:23,代码来源:TestStreamGraphImpl.java
示例11: testGetInputStreamWithDefaultValueSerde
import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testGetInputStreamWithDefaultValueSerde() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
StreamSpec mockStreamSpec = mock(StreamSpec.class);
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
Serde mockValueSerde = mock(Serde.class);
graph.setDefaultSerde(mockValueSerde);
MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1");
InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
(InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
}
开发者ID:apache,项目名称:samza,代码行数:20,代码来源:TestStreamGraphImpl.java
示例12: testGetInputStreamWithDefaultKeyValueSerde
import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testGetInputStreamWithDefaultKeyValueSerde() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
StreamSpec mockStreamSpec = mock(StreamSpec.class);
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
KVSerde mockKVSerde = mock(KVSerde.class);
Serde mockKeySerde = mock(Serde.class);
Serde mockValueSerde = mock(Serde.class);
doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
graph.setDefaultSerde(mockKVSerde);
MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1");
InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
(InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
}
开发者ID:apache,项目名称:samza,代码行数:24,代码来源:TestStreamGraphImpl.java
示例13: testGetOutputStreamWithValueSerde
import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testGetOutputStreamWithValueSerde() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
StreamSpec mockStreamSpec = mock(StreamSpec.class);
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
Serde mockValueSerde = mock(Serde.class);
OutputStream<TestMessageEnvelope> outputStream =
graph.getOutputStream("test-stream-1", mockValueSerde);
OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
}
开发者ID:apache,项目名称:samza,代码行数:19,代码来源:TestStreamGraphImpl.java
示例14: testGetOutputStreamWithKeyValueSerde
import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testGetOutputStreamWithKeyValueSerde() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
StreamSpec mockStreamSpec = mock(StreamSpec.class);
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
KVSerde mockKVSerde = mock(KVSerde.class);
Serde mockKeySerde = mock(Serde.class);
Serde mockValueSerde = mock(Serde.class);
doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
graph.setDefaultSerde(mockKVSerde);
OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1", mockKVSerde);
OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
}
开发者ID:apache,项目名称:samza,代码行数:22,代码来源:TestStreamGraphImpl.java
示例15: testGetOutputStreamWithDefaultValueSerde
import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testGetOutputStreamWithDefaultValueSerde() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
StreamSpec mockStreamSpec = mock(StreamSpec.class);
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
Serde mockValueSerde = mock(Serde.class);
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
graph.setDefaultSerde(mockValueSerde);
OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1");
OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
}
开发者ID:apache,项目名称:samza,代码行数:18,代码来源:TestStreamGraphImpl.java
示例16: testGetOutputStreamWithDefaultKeyValueSerde
import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testGetOutputStreamWithDefaultKeyValueSerde() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
StreamSpec mockStreamSpec = mock(StreamSpec.class);
when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
KVSerde mockKVSerde = mock(KVSerde.class);
Serde mockKeySerde = mock(Serde.class);
Serde mockValueSerde = mock(Serde.class);
doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
graph.setDefaultSerde(mockKVSerde);
OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1");
OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
}
开发者ID:apache,项目名称:samza,代码行数:23,代码来源:TestStreamGraphImpl.java
示例17: testGetIntermediateStreamWithValueSerde
import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testGetIntermediateStreamWithValueSerde() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
Config mockConfig = mock(Config.class);
StreamSpec mockStreamSpec = mock(StreamSpec.class);
String mockStreamName = "mockStreamName";
when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
Serde mockValueSerde = mock(Serde.class);
IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
graph.getIntermediateStream(mockStreamName, mockValueSerde);
assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
}
开发者ID:apache,项目名称:samza,代码行数:23,代码来源:TestStreamGraphImpl.java
示例18: testGetIntermediateStreamWithKeyValueSerde
import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testGetIntermediateStreamWithKeyValueSerde() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
Config mockConfig = mock(Config.class);
StreamSpec mockStreamSpec = mock(StreamSpec.class);
String mockStreamName = "mockStreamName";
when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
KVSerde mockKVSerde = mock(KVSerde.class);
Serde mockKeySerde = mock(Serde.class);
Serde mockValueSerde = mock(Serde.class);
doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
graph.getIntermediateStream(mockStreamName, mockKVSerde);
assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde());
assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
assertEquals(mockKeySerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde());
assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
}
开发者ID:apache,项目名称:samza,代码行数:27,代码来源:TestStreamGraphImpl.java
示例19: testGetIntermediateStreamWithDefaultValueSerde
import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testGetIntermediateStreamWithDefaultValueSerde() {
ApplicationRunner mockRunner = mock(ApplicationRunner.class);
Config mockConfig = mock(Config.class);
StreamSpec mockStreamSpec = mock(StreamSpec.class);
String mockStreamName = "mockStreamName";
when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);
StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);
Serde mockValueSerde = mock(Serde.class);
graph.setDefaultSerde(mockValueSerde);
IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
graph.getIntermediateStream(mockStreamName, null);
assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
}
开发者ID:apache,项目名称:samza,代码行数:24,代码来源:TestStreamGraphImpl.java
示例20: setup
import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Before
public void setup() throws Exception {
config = mock(Config.class);
when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName");
when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
taskContext = mock(TaskContextImpl.class);
runner = mock(ApplicationRunner.class);
Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde());
Serde storeValSerde = new IntegerEnvelopeSerde();
when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
.of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());
when(taskContext.getStore("jobName-jobId-window-w1"))
.thenReturn(new TestInMemoryStore<>(storeKeySerde, storeValSerde));
when(runner.getStreamSpec("integers")).thenReturn(new StreamSpec("integers", "integers", "kafka"));
}
开发者ID:apache,项目名称:samza,代码行数:19,代码来源:TestWindowOperator.java
注:本文中的org.apache.samza.serializers.Serde类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论