• 设为首页
  • 点击收藏
  • 手机版
    手机扫一扫访问
    迪恩网络手机版
  • 关注官方公众号
    微信扫一扫关注
    迪恩网络公众号

Java Serde类代码示例

原作者: [db:作者] 来自: [db:来源] 收藏 邀请

本文整理汇总了Java中org.apache.samza.serializers.Serde的典型用法代码示例。如果您正苦于以下问题:Java Serde类的具体用法?Java Serde怎么用?Java Serde使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。



Serde类属于org.apache.samza.serializers包,在下文中一共展示了Serde类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。

示例1: join

import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Override
public <K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream,
    JoinFunction<? extends K, ? super M, ? super OM, ? extends JM> joinFn,
    Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde,
    Duration ttl, String userDefinedId) {
  if (otherStream.equals(this)) throw new SamzaException("Cannot join a MessageStream with itself.");
  OperatorSpec<?, OM> otherOpSpec = ((MessageStreamImpl<OM>) otherStream).getOperatorSpec();
  String opId = this.graph.getNextOpId(OpCode.JOIN, userDefinedId);
  JoinOperatorSpec<K, M, OM, JM> joinOpSpec =
      OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec, (JoinFunction<K, M, OM, JM>) joinFn,
          keySerde, messageSerde, otherMessageSerde, ttl.toMillis(), opId);

  this.operatorSpec.registerNextOperatorSpec(joinOpSpec);
  otherOpSpec.registerNextOperatorSpec((OperatorSpec<OM, ?>) joinOpSpec);

  return new MessageStreamImpl<>(this.graph, joinOpSpec);
}
 
开发者ID:apache,项目名称:samza,代码行数:18,代码来源:MessageStreamImpl.java


示例2: getInputStream

import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Override
public <M> MessageStream<M> getInputStream(String streamId, Serde<M> serde) {
  StreamSpec streamSpec = runner.getStreamSpec(streamId);
  Preconditions.checkState(streamSpec != null, "No StreamSpec found for streamId: " + streamId);
  Preconditions.checkNotNull(serde, "serde must not be null for an input stream.");
  Preconditions.checkState(!inputOperators.containsKey(streamSpec),
      "getInputStream must not be called multiple times with the same streamId: " + streamId);

  KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
  if (outputStreams.containsKey(streamSpec)) {
    OutputStreamImpl outputStream = outputStreams.get(streamSpec);
    Serde keySerde = outputStream.getKeySerde();
    Serde valueSerde = outputStream.getValueSerde();
    Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
        String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
            + "stream level, so the same key and message Serde must be used for both.", streamId));
  }

  boolean isKeyed = serde instanceof KVSerde;
  InputOperatorSpec inputOperatorSpec =
      OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(),
          isKeyed, this.getNextOpId(OpCode.INPUT, null));
  inputOperators.put(streamSpec, inputOperatorSpec);
  return new MessageStreamImpl<>(this, inputOperators.get(streamSpec));
}
 
开发者ID:apache,项目名称:samza,代码行数:26,代码来源:StreamGraphImpl.java


示例3: getOutputStream

import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Override
public <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde) {
  StreamSpec streamSpec = runner.getStreamSpec(streamId);
  Preconditions.checkState(streamSpec != null, "No StreamSpec found for streamId: " + streamId);
  Preconditions.checkNotNull(serde, "serde must not be null for an output stream.");
  Preconditions.checkState(!outputStreams.containsKey(streamSpec),
      "getOutputStream must not be called multiple times with the same streamId: " + streamId);

  KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
  if (inputOperators.containsKey(streamSpec)) {
    InputOperatorSpec inputOperatorSpec = inputOperators.get(streamSpec);
    Serde keySerde = inputOperatorSpec.getKeySerde();
    Serde valueSerde = inputOperatorSpec.getValueSerde();
    Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
        String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at "
            + "stream level, so the same key and message Serde must be used for both.", streamId));
  }

  boolean isKeyed = serde instanceof KVSerde;
  outputStreams.put(streamSpec, new OutputStreamImpl<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
  return outputStreams.get(streamSpec);
}
 
开发者ID:apache,项目名称:samza,代码行数:23,代码来源:StreamGraphImpl.java


示例4: getIntermediateStream

import org.apache.samza.serializers.Serde; //导入依赖的package包/类
/**
 * Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph.
 * An intermediate {@link MessageStream} is both an output and an input stream.
 *
 * @param streamId the id of the stream to be created.
 * @param serde the {@link Serde} to use for the message in the intermediate stream. If null, the default serde
 *              is used.
 * @param <M> the type of messages in the intermediate {@link MessageStream}
 * @return  the intermediate {@link MessageStreamImpl}
 */
<M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde) {
  StreamSpec streamSpec = runner.getStreamSpec(streamId);

  Preconditions.checkState(!inputOperators.containsKey(streamSpec) && !outputStreams.containsKey(streamSpec),
      "getIntermediateStream must not be called multiple times with the same streamId: " + streamId);

  if (serde == null) {
    LOGGER.info("Using default serde for intermediate stream: " + streamId);
    serde = (Serde<M>) defaultSerde;
  }

  boolean isKeyed = serde instanceof KVSerde;
  KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
  InputOperatorSpec inputOperatorSpec =
      OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(),
          isKeyed, this.getNextOpId(OpCode.INPUT, null));
  inputOperators.put(streamSpec, inputOperatorSpec);
  outputStreams.put(streamSpec, new OutputStreamImpl(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
  return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamSpec), outputStreams.get(streamSpec));
}
 
开发者ID:apache,项目名称:samza,代码行数:31,代码来源:StreamGraphImpl.java


示例5: getKVSerdes

import org.apache.samza.serializers.Serde; //导入依赖的package包/类
private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) {
  Serde keySerde, valueSerde;

  if (serde instanceof KVSerde) {
    keySerde = ((KVSerde) serde).getKeySerde();
    valueSerde = ((KVSerde) serde).getValueSerde();
  } else {
    keySerde = new NoOpSerde();
    valueSerde = serde;
  }

  if (keySerde instanceof NoOpSerde) {
    LOGGER.info("Using NoOpSerde as the key serde for stream " + streamId +
        ". Keys will not be (de)serialized");
  }
  if (valueSerde instanceof NoOpSerde) {
    LOGGER.info("Using NoOpSerde as the value serde for stream " + streamId +
        ". Values will not be (de)serialized");
  }

  return KV.of(keySerde, valueSerde);
}
 
开发者ID:apache,项目名称:samza,代码行数:23,代码来源:StreamGraphImpl.java


示例6: TableManager

import org.apache.samza.serializers.Serde; //导入依赖的package包/类
/**
 * Construct a table manager instance
 * @param config the job configuration
 * @param serdes Serde instances for tables
 */
public TableManager(Config config, Map<String, Serde<Object>> serdes) {

  new JavaTableConfig(config).getTableIds().forEach(tableId -> {

      // Construct the table provider
      String tableProviderFactory = config.get(String.format(JavaTableConfig.TABLE_PROVIDER_FACTORY, tableId));

      // Construct the KVSerde
      JavaTableConfig tableConfig = new JavaTableConfig(config);
      KVSerde serde = KVSerde.of(
          serdes.get(tableConfig.getKeySerde(tableId)),
          serdes.get(tableConfig.getValueSerde(tableId)));

      TableSpec tableSpec = new TableSpec(tableId, serde, tableProviderFactory,
          config.subset(String.format(JavaTableConfig.TABLE_ID_PREFIX, tableId) + "."));

      addTable(tableSpec);

      logger.info("Added table " + tableSpec.getId());
    });
}
 
开发者ID:apache,项目名称:samza,代码行数:27,代码来源:TableManager.java


示例7: testWindowWithRelaxedTypes

import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testWindowWithRelaxedTypes() throws Exception {
  StreamGraphImpl mockGraph = mock(StreamGraphImpl.class);
  OperatorSpec mockOpSpec = mock(OperatorSpec.class);
  MessageStream<TestInputMessageEnvelope> inputStream = new MessageStreamImpl<>(mockGraph, mockOpSpec);

  Function<TestMessageEnvelope, String> keyExtractor = m -> m.getKey();
  FoldLeftFunction<TestMessageEnvelope, Integer> aggregator = (m, c) -> c + 1;
  Supplier<Integer> initialValue = () -> 0;

  // should compile since TestMessageEnvelope (input for functions) is base class of TestInputMessageEnvelope (M)
  Window<TestInputMessageEnvelope, String, Integer> window =
      Windows.keyedTumblingWindow(keyExtractor, Duration.ofHours(1), initialValue, aggregator,
          null, mock(Serde.class));
  MessageStream<WindowPane<String, Integer>> windowedStream = inputStream.window(window, "w1");

  ArgumentCaptor<OperatorSpec> registeredOpCaptor = ArgumentCaptor.forClass(OperatorSpec.class);
  verify(mockOpSpec).registerNextOperatorSpec(registeredOpCaptor.capture());
  OperatorSpec<?, TestMessageEnvelope> registeredOpSpec = registeredOpCaptor.getValue();

  assertTrue(registeredOpSpec instanceof WindowOperatorSpec);
  assertEquals(OpCode.WINDOW, registeredOpSpec.getOpCode());
  assertEquals(window, ((WindowOperatorSpec) registeredOpSpec).getWindow());
}
 
开发者ID:apache,项目名称:samza,代码行数:25,代码来源:TestMessageStreamImpl.java


示例8: testTriggerIntervalWithNestedTimeTriggers

import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testTriggerIntervalWithNestedTimeTriggers() {
  Trigger defaultTrigger = Triggers.timeSinceFirstMessage(Duration.ofMillis(150));
  Trigger lateTrigger = Triggers.any(Triggers.count(6), Triggers.timeSinceFirstMessage(Duration.ofMillis(15)));
  Trigger earlyTrigger = Triggers.repeat(
      Triggers.any(Triggers.count(23),
          Triggers.timeSinceFirstMessage(Duration.ofMillis(15)),
          Triggers.any(Triggers.any(Triggers.count(6),
              Triggers.timeSinceFirstMessage(Duration.ofMillis(15)),
              Triggers.timeSinceFirstMessage(Duration.ofMillis(25)),
              Triggers.timeSinceLastMessage(Duration.ofMillis(15))))));

  WindowInternal window = new WindowInternal(defaultTrigger, null, null, null,
          null, WindowType.SESSION, null, null, mock(Serde.class));
  window.setEarlyTrigger(earlyTrigger);
  window.setLateTrigger(lateTrigger);

  WindowOperatorSpec spec = new WindowOperatorSpec(window, "0");
  Assert.assertEquals(spec.getDefaultTriggerMs(), 5);
}
 
开发者ID:apache,项目名称:samza,代码行数:21,代码来源:TestWindowOperatorSpec.java


示例9: testGetInputStreamWithValueSerde

import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testGetInputStreamWithValueSerde() {
  ApplicationRunner mockRunner = mock(ApplicationRunner.class);
  StreamSpec mockStreamSpec = mock(StreamSpec.class);
  when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
  StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));

  Serde mockValueSerde = mock(Serde.class);
  MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockValueSerde);

  InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
      (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
  assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
  assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
  assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
  assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
  assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
}
 
开发者ID:apache,项目名称:samza,代码行数:19,代码来源:TestStreamGraphImpl.java


示例10: testGetInputStreamWithKeyValueSerde

import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testGetInputStreamWithKeyValueSerde() {
  ApplicationRunner mockRunner = mock(ApplicationRunner.class);
  StreamSpec mockStreamSpec = mock(StreamSpec.class);
  when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
  StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));

  KVSerde mockKVSerde = mock(KVSerde.class);
  Serde mockKeySerde = mock(Serde.class);
  Serde mockValueSerde = mock(Serde.class);
  doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
  doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
  MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1", mockKVSerde);

  InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
      (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
  assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
  assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
  assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
  assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
  assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
}
 
开发者ID:apache,项目名称:samza,代码行数:23,代码来源:TestStreamGraphImpl.java


示例11: testGetInputStreamWithDefaultValueSerde

import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testGetInputStreamWithDefaultValueSerde() {
  ApplicationRunner mockRunner = mock(ApplicationRunner.class);
  StreamSpec mockStreamSpec = mock(StreamSpec.class);
  when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
  StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));

  Serde mockValueSerde = mock(Serde.class);
  graph.setDefaultSerde(mockValueSerde);
  MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1");

  InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
      (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
  assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
  assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
  assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
  assertTrue(inputOpSpec.getKeySerde() instanceof NoOpSerde);
  assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
}
 
开发者ID:apache,项目名称:samza,代码行数:20,代码来源:TestStreamGraphImpl.java


示例12: testGetInputStreamWithDefaultKeyValueSerde

import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testGetInputStreamWithDefaultKeyValueSerde() {
  ApplicationRunner mockRunner = mock(ApplicationRunner.class);
  StreamSpec mockStreamSpec = mock(StreamSpec.class);
  when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);
  StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));

  KVSerde mockKVSerde = mock(KVSerde.class);
  Serde mockKeySerde = mock(Serde.class);
  Serde mockValueSerde = mock(Serde.class);
  doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
  doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
  graph.setDefaultSerde(mockKVSerde);
  MessageStream<TestMessageEnvelope> inputStream = graph.getInputStream("test-stream-1");

  InputOperatorSpec<String, TestMessageEnvelope> inputOpSpec =
      (InputOperatorSpec) ((MessageStreamImpl<TestMessageEnvelope>) inputStream).getOperatorSpec();
  assertEquals(OpCode.INPUT, inputOpSpec.getOpCode());
  assertEquals(graph.getInputOperators().get(mockStreamSpec), inputOpSpec);
  assertEquals(mockStreamSpec, inputOpSpec.getStreamSpec());
  assertEquals(mockKeySerde, inputOpSpec.getKeySerde());
  assertEquals(mockValueSerde, inputOpSpec.getValueSerde());
}
 
开发者ID:apache,项目名称:samza,代码行数:24,代码来源:TestStreamGraphImpl.java


示例13: testGetOutputStreamWithValueSerde

import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testGetOutputStreamWithValueSerde() {
  ApplicationRunner mockRunner = mock(ApplicationRunner.class);
  StreamSpec mockStreamSpec = mock(StreamSpec.class);
  when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);

  StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));

  Serde mockValueSerde = mock(Serde.class);
  OutputStream<TestMessageEnvelope> outputStream =
      graph.getOutputStream("test-stream-1", mockValueSerde);

  OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
  assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
  assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
  assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
  assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
}
 
开发者ID:apache,项目名称:samza,代码行数:19,代码来源:TestStreamGraphImpl.java


示例14: testGetOutputStreamWithKeyValueSerde

import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testGetOutputStreamWithKeyValueSerde() {
  ApplicationRunner mockRunner = mock(ApplicationRunner.class);
  StreamSpec mockStreamSpec = mock(StreamSpec.class);
  when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);

  StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
  KVSerde mockKVSerde = mock(KVSerde.class);
  Serde mockKeySerde = mock(Serde.class);
  Serde mockValueSerde = mock(Serde.class);
  doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
  doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
  graph.setDefaultSerde(mockKVSerde);
  OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1", mockKVSerde);

  OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
  assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
  assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
  assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
  assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
}
 
开发者ID:apache,项目名称:samza,代码行数:22,代码来源:TestStreamGraphImpl.java


示例15: testGetOutputStreamWithDefaultValueSerde

import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testGetOutputStreamWithDefaultValueSerde() {
  ApplicationRunner mockRunner = mock(ApplicationRunner.class);
  StreamSpec mockStreamSpec = mock(StreamSpec.class);
  when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);

  Serde mockValueSerde = mock(Serde.class);
  StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
  graph.setDefaultSerde(mockValueSerde);
  OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1");

  OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
  assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
  assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
  assertTrue(outputStreamImpl.getKeySerde() instanceof NoOpSerde);
  assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
}
 
开发者ID:apache,项目名称:samza,代码行数:18,代码来源:TestStreamGraphImpl.java


示例16: testGetOutputStreamWithDefaultKeyValueSerde

import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testGetOutputStreamWithDefaultKeyValueSerde() {
  ApplicationRunner mockRunner = mock(ApplicationRunner.class);
  StreamSpec mockStreamSpec = mock(StreamSpec.class);
  when(mockRunner.getStreamSpec("test-stream-1")).thenReturn(mockStreamSpec);

  StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mock(Config.class));
  KVSerde mockKVSerde = mock(KVSerde.class);
  Serde mockKeySerde = mock(Serde.class);
  Serde mockValueSerde = mock(Serde.class);
  doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
  doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
  graph.setDefaultSerde(mockKVSerde);

  OutputStream<TestMessageEnvelope> outputStream = graph.getOutputStream("test-stream-1");

  OutputStreamImpl<TestMessageEnvelope> outputStreamImpl = (OutputStreamImpl) outputStream;
  assertEquals(graph.getOutputStreams().get(mockStreamSpec), outputStreamImpl);
  assertEquals(mockStreamSpec, outputStreamImpl.getStreamSpec());
  assertEquals(mockKeySerde, outputStreamImpl.getKeySerde());
  assertEquals(mockValueSerde, outputStreamImpl.getValueSerde());
}
 
开发者ID:apache,项目名称:samza,代码行数:23,代码来源:TestStreamGraphImpl.java


示例17: testGetIntermediateStreamWithValueSerde

import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testGetIntermediateStreamWithValueSerde() {
  ApplicationRunner mockRunner = mock(ApplicationRunner.class);
  Config mockConfig = mock(Config.class);
  StreamSpec mockStreamSpec = mock(StreamSpec.class);
  String mockStreamName = "mockStreamName";
  when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);

  StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);

  Serde mockValueSerde = mock(Serde.class);
  IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
      graph.getIntermediateStream(mockStreamName, mockValueSerde);

  assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
  assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
  assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
  assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
  assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
  assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
  assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
}
 
开发者ID:apache,项目名称:samza,代码行数:23,代码来源:TestStreamGraphImpl.java


示例18: testGetIntermediateStreamWithKeyValueSerde

import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testGetIntermediateStreamWithKeyValueSerde() {
  ApplicationRunner mockRunner = mock(ApplicationRunner.class);
  Config mockConfig = mock(Config.class);
  StreamSpec mockStreamSpec = mock(StreamSpec.class);
  String mockStreamName = "mockStreamName";
  when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);

  StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);

  KVSerde mockKVSerde = mock(KVSerde.class);
  Serde mockKeySerde = mock(Serde.class);
  Serde mockValueSerde = mock(Serde.class);
  doReturn(mockKeySerde).when(mockKVSerde).getKeySerde();
  doReturn(mockValueSerde).when(mockKVSerde).getValueSerde();
  IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
      graph.getIntermediateStream(mockStreamName, mockKVSerde);

  assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
  assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
  assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
  assertEquals(mockKeySerde, intermediateStreamImpl.getOutputStream().getKeySerde());
  assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
  assertEquals(mockKeySerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde());
  assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
}
 
开发者ID:apache,项目名称:samza,代码行数:27,代码来源:TestStreamGraphImpl.java


示例19: testGetIntermediateStreamWithDefaultValueSerde

import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Test
public void testGetIntermediateStreamWithDefaultValueSerde() {
  ApplicationRunner mockRunner = mock(ApplicationRunner.class);
  Config mockConfig = mock(Config.class);
  StreamSpec mockStreamSpec = mock(StreamSpec.class);
  String mockStreamName = "mockStreamName";
  when(mockRunner.getStreamSpec(mockStreamName)).thenReturn(mockStreamSpec);

  StreamGraphImpl graph = new StreamGraphImpl(mockRunner, mockConfig);

  Serde mockValueSerde = mock(Serde.class);
  graph.setDefaultSerde(mockValueSerde);
  IntermediateMessageStreamImpl<TestMessageEnvelope> intermediateStreamImpl =
      graph.getIntermediateStream(mockStreamName, null);

  assertEquals(graph.getInputOperators().get(mockStreamSpec), intermediateStreamImpl.getOperatorSpec());
  assertEquals(graph.getOutputStreams().get(mockStreamSpec), intermediateStreamImpl.getOutputStream());
  assertEquals(mockStreamSpec, intermediateStreamImpl.getStreamSpec());
  assertTrue(intermediateStreamImpl.getOutputStream().getKeySerde() instanceof NoOpSerde);
  assertEquals(mockValueSerde, intermediateStreamImpl.getOutputStream().getValueSerde());
  assertTrue(((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getKeySerde() instanceof NoOpSerde);
  assertEquals(mockValueSerde, ((InputOperatorSpec) intermediateStreamImpl.getOperatorSpec()).getValueSerde());
}
 
开发者ID:apache,项目名称:samza,代码行数:24,代码来源:TestStreamGraphImpl.java


示例20: setup

import org.apache.samza.serializers.Serde; //导入依赖的package包/类
@Before
public void setup() throws Exception {
  config = mock(Config.class);
  when(config.get(JobConfig.JOB_NAME())).thenReturn("jobName");
  when(config.get(eq(JobConfig.JOB_ID()), anyString())).thenReturn("jobId");
  taskContext = mock(TaskContextImpl.class);
  runner = mock(ApplicationRunner.class);
  Serde storeKeySerde = new TimeSeriesKeySerde(new IntegerSerde());
  Serde storeValSerde = new IntegerEnvelopeSerde();

  when(taskContext.getSystemStreamPartitions()).thenReturn(ImmutableSet
      .of(new SystemStreamPartition("kafka", "integers", new Partition(0))));
  when(taskContext.getMetricsRegistry()).thenReturn(new MetricsRegistryMap());

  when(taskContext.getStore("jobName-jobId-window-w1"))
      .thenReturn(new TestInMemoryStore<>(storeKeySerde, storeValSerde));
  when(runner.getStreamSpec("integers")).thenReturn(new StreamSpec("integers", "integers", "kafka"));
}
 
开发者ID:apache,项目名称:samza,代码行数:19,代码来源:TestWindowOperator.java



注:本文中的org.apache.samza.serializers.Serde类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。


鲜花

握手

雷人

路过

鸡蛋
该文章已有0人参与评论

请发表评论

全部评论

专题导读
上一篇:
Java Jade4J类代码示例发布时间:2022-05-22
下一篇:
Java CDOTransaction类代码示例发布时间:2022-05-22
热门推荐
阅读排行榜

扫描微信二维码

查看手机版网站

随时了解更新最新资讯

139-2527-9053

在线客服(服务时间 9:00~18:00)

在线QQ客服
地址:深圳市南山区西丽大学城创智工业园
电邮:jeky_zhao#qq.com
移动电话:139-2527-9053

Powered by 互联科技 X3.4© 2001-2213 极客世界.|Sitemap