
Java SchemaAndValue Class Code Examples


This article collects typical usage examples of org.apache.kafka.connect.data.SchemaAndValue in Java. If you are wondering how the SchemaAndValue class is used in practice, the selected examples below may help.



The SchemaAndValue class belongs to the org.apache.kafka.connect.data package. The 20 code examples below are sorted by popularity by default.
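Before reading the examples, it may help to see the shape they all rely on: a pair holding a schema and a value, plus a NULL constant for "no schema, no value". The sketch below is a simplified stand-in written for illustration only. The names SchemaAndValueSketch, Pair, and toConnectData are hypothetical, and the schema is represented by a plain String rather than a Kafka Connect Schema object so that the code runs without the Kafka dependency.

```java
import java.nio.charset.StandardCharsets;
import java.util.Objects;

// Simplified stand-in for illustration; NOT the real
// org.apache.kafka.connect.data.SchemaAndValue class.
public class SchemaAndValueSketch {

    public static final class Pair {
        // Mirrors the idea of a NULL constant: no schema and no value.
        public static final Pair NULL = new Pair(null, null);

        private final String schema; // assumption: a schema name instead of a Schema object
        private final Object value;

        public Pair(String schema, Object value) {
            this.schema = schema;
            this.value = value;
        }

        public String schema() { return schema; }

        public Object value() { return value; }

        // Value-based equality, so tests can compare expected and actual pairs.
        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (!(o instanceof Pair)) return false;
            Pair other = (Pair) o;
            return Objects.equals(schema, other.schema) && Objects.equals(value, other.value);
        }

        @Override
        public int hashCode() { return Objects.hash(schema, value); }
    }

    // Hypothetical converter: null bytes map to the NULL pair,
    // anything else is decoded as a UTF-8 string.
    public static Pair toConnectData(byte[] bytes) {
        if (bytes == null) {
            return Pair.NULL;
        }
        return new Pair("optional-string", new String(bytes, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        Pair p = toConnectData("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(p.schema() + " -> " + p.value());
        System.out.println(toConnectData(null).equals(Pair.NULL));
    }
}
```

In the real examples that follow, converters return the pair from toConnectData and callers unpack it with schema() and value(); the stand-in above only imitates that calling pattern.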

Example 1: toSourceRecord

import org.apache.kafka.connect.data.SchemaAndValue; // import the required package/class
/**
 * Convert a message into a Kafka Connect SourceRecord.
 * 
 * @param context            the JMS context to use for building messages
 * @param topic              the Kafka topic
 * @param messageBodyJms     whether to interpret MQ messages as JMS messages
 * @param message            the message
 * 
 * @return the Kafka Connect SourceRecord
 * 
 * @throws JMSException      Message could not be converted
 */
@Override public SourceRecord toSourceRecord(JMSContext context, String topic, boolean messageBodyJms, Message message) throws JMSException {
    byte[] payload;
    if (message instanceof BytesMessage) {
        payload = message.getBody(byte[].class);
    }
    else if (message instanceof TextMessage) {
        String s = message.getBody(String.class);
        payload = s.getBytes(UTF_8);
    }
    else {
        log.error("Unsupported JMS message type {}", message.getClass());
        throw new ConnectException("Unsupported JMS message type");
    }

    SchemaAndValue sv = converter.toConnectData(topic, payload);
    return new SourceRecord(null, null, topic, sv.schema(), sv.value());
}
 
Developer: ibm-messaging, Project: kafka-connect-mq-source, Lines of code: 30, Source: JsonRecordBuilder.java


Example 2: toConnectData

import org.apache.kafka.connect.data.SchemaAndValue; // import the required package/class
@Override
public SchemaAndValue toConnectData(String topic, byte[] value) {
    JsonNode jsonValue;
    try {
        jsonValue = deserializer.deserialize(topic, value);
    } catch (SerializationException e) {
        throw new DataException("Converting byte[] to Kafka Connect data failed due to serialization error: ", e);
    }

    if (enableSchemas && (jsonValue == null || !jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has("schema") || !jsonValue.has("payload")))
        throw new DataException("JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields." +
                " If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.");

    // The deserialized data should either be an envelope object containing the schema and the payload or the schema
    // was stripped during serialization and we need to fill in an all-encompassing schema.
    if (!enableSchemas) {
        ObjectNode envelope = JsonNodeFactory.instance.objectNode();
        envelope.set("schema", null);
        envelope.set("payload", jsonValue);
        jsonValue = envelope;
    }

    return jsonToConnect(jsonValue);
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 25, Source: JsonConverter.java


Example 3: convertMessages

import org.apache.kafka.connect.data.SchemaAndValue; // import the required package/class
private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) {
    for (ConsumerRecord<byte[], byte[]> msg : msgs) {
        log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
        SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key());
        SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value());
        SinkRecord record = new SinkRecord(msg.topic(), msg.partition(),
                keyAndSchema.schema(), keyAndSchema.value(),
                valueAndSchema.schema(), valueAndSchema.value(),
                msg.offset(),
                ConnectUtils.checkAndConvertTimestamp(msg.timestamp()),
                msg.timestampType());
        record = transformationChain.apply(record);
        if (record != null) {
            messageBatch.add(record);
        }
    }
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 18, Source: WorkerSinkTask.java


Example 4: parseConnectorStatus

import org.apache.kafka.connect.data.SchemaAndValue; // import the required package/class
private ConnectorStatus parseConnectorStatus(String connector, byte[] data) {
    try {
        SchemaAndValue schemaAndValue = converter.toConnectData(topic, data);
        if (!(schemaAndValue.value() instanceof Map)) {
            log.error("Invalid connector status type {}", schemaAndValue.value().getClass());
            return null;
        }

        @SuppressWarnings("unchecked")
        Map<String, Object> statusMap = (Map<String, Object>) schemaAndValue.value();
        TaskStatus.State state = TaskStatus.State.valueOf((String) statusMap.get(STATE_KEY_NAME));
        String trace = (String) statusMap.get(TRACE_KEY_NAME);
        String workerUrl = (String) statusMap.get(WORKER_ID_KEY_NAME);
        int generation = ((Long) statusMap.get(GENERATION_KEY_NAME)).intValue();
        return new ConnectorStatus(connector, state, trace, workerUrl, generation);
    } catch (Exception e) {
        log.error("Failed to deserialize connector status", e);
        return null;
    }
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 21, Source: KafkaStatusBackingStore.java


Example 5: parseTaskStatus

import org.apache.kafka.connect.data.SchemaAndValue; // import the required package/class
private TaskStatus parseTaskStatus(ConnectorTaskId taskId, byte[] data) {
    try {
        SchemaAndValue schemaAndValue = converter.toConnectData(topic, data);
        if (!(schemaAndValue.value() instanceof Map)) {
            log.error("Invalid task status type {}", schemaAndValue.value().getClass());
            return null;
        }
        @SuppressWarnings("unchecked")
        Map<String, Object> statusMap = (Map<String, Object>) schemaAndValue.value();
        TaskStatus.State state = TaskStatus.State.valueOf((String) statusMap.get(STATE_KEY_NAME));
        String trace = (String) statusMap.get(TRACE_KEY_NAME);
        String workerUrl = (String) statusMap.get(WORKER_ID_KEY_NAME);
        int generation = ((Long) statusMap.get(GENERATION_KEY_NAME)).intValue();
        return new TaskStatus(taskId, state, workerUrl, generation, trace);
    } catch (Exception e) {
        log.error("Failed to deserialize task status", e);
        return null;
    }
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 20, Source: KafkaStatusBackingStore.java


Example 6: expectOnePoll

import org.apache.kafka.connect.data.SchemaAndValue; // import the required package/class
@SuppressWarnings("unchecked")
private IExpectationSetters<Object> expectOnePoll() {
    // Currently the SinkTask's put() method will not be invoked unless we provide some data, so instead of
    // returning empty data, we return one record. The expectation is that the data will be ignored by the
    // response behavior specified using the return value of this method.
    EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
            new IAnswer<ConsumerRecords<byte[], byte[]>>() {
                @Override
                public ConsumerRecords<byte[], byte[]> answer() throws Throwable {
                    // "Sleep" so time will progress
                    time.sleep(1L);
                    ConsumerRecords<byte[], byte[]> records = new ConsumerRecords<>(
                            Collections.singletonMap(
                                    new TopicPartition(TOPIC, PARTITION),
                                    Arrays.asList(
                                            new ConsumerRecord<>(TOPIC, PARTITION, FIRST_OFFSET + recordsReturned, TIMESTAMP, TIMESTAMP_TYPE, 0L, 0, 0, RAW_KEY, RAW_VALUE)
                                    )));
                    recordsReturned++;
                    return records;
                }
            });
    EasyMock.expect(keyConverter.toConnectData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY));
    EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE));
    sinkTask.put(EasyMock.anyObject(Collection.class));
    return EasyMock.expectLastCall();
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 27, Source: WorkerSinkTaskThreadedTest.java


Example 7: readConnectorState

import org.apache.kafka.connect.data.SchemaAndValue; // import the required package/class
@Test
public void readConnectorState() {
    byte[] value = new byte[0];

    KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
    Converter converter = mock(Converter.class);
    KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);

    Map<String, Object> statusMap = new HashMap<>();
    statusMap.put("worker_id", WORKER_ID);
    statusMap.put("state", "RUNNING");
    statusMap.put("generation", 0L);

    expect(converter.toConnectData(STATUS_TOPIC, value))
            .andReturn(new SchemaAndValue(null, statusMap));

    replayAll();

    store.read(consumerRecord(0, "status-connector-conn", value));

    ConnectorStatus status = new ConnectorStatus(CONNECTOR, ConnectorStatus.State.RUNNING, WORKER_ID, 0);
    assertEquals(status, store.get(CONNECTOR));

    verifyAll();
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 26, Source: KafkaStatusBackingStoreTest.java


Example 8: readTaskState

import org.apache.kafka.connect.data.SchemaAndValue; // import the required package/class
@Test
public void readTaskState() {
    byte[] value = new byte[0];

    KafkaBasedLog<String, byte[]> kafkaBasedLog = mock(KafkaBasedLog.class);
    Converter converter = mock(Converter.class);
    KafkaStatusBackingStore store = new KafkaStatusBackingStore(new MockTime(), converter, STATUS_TOPIC, kafkaBasedLog);

    Map<String, Object> statusMap = new HashMap<>();
    statusMap.put("worker_id", WORKER_ID);
    statusMap.put("state", "RUNNING");
    statusMap.put("generation", 0L);

    expect(converter.toConnectData(STATUS_TOPIC, value))
            .andReturn(new SchemaAndValue(null, statusMap));

    replayAll();

    store.read(consumerRecord(0, "status-task-conn-0", value));

    TaskStatus status = new TaskStatus(TASK, TaskStatus.State.RUNNING, WORKER_ID, 0);
    assertEquals(status, store.get(TASK));

    verifyAll();
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 26, Source: KafkaStatusBackingStoreTest.java


Example 9: expectStart

import org.apache.kafka.connect.data.SchemaAndValue; // import the required package/class
private void expectStart(final List<ConsumerRecord<String, byte[]>> preexistingRecords,
                         final Map<byte[], Struct> deserializations) throws Exception {
    storeLog.start();
    PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
        @Override
        public Object answer() throws Throwable {
            for (ConsumerRecord<String, byte[]> rec : preexistingRecords)
                capturedConsumedCallback.getValue().onCompletion(null, rec);
            return null;
        }
    });
    for (Map.Entry<byte[], Struct> deserializationEntry : deserializations.entrySet()) {
        // Note null schema because default settings for internal serialization are schema-less
        EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(deserializationEntry.getKey())))
                .andReturn(new SchemaAndValue(null, structToMap(deserializationEntry.getValue())));
    }
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 18, Source: KafkaConfigBackingStoreTest.java


Example 10: expectConvertWriteRead

import org.apache.kafka.connect.data.SchemaAndValue; // import the required package/class
private void expectConvertWriteRead(final String configKey, final Schema valueSchema, final byte[] serialized,
                                    final String dataFieldName, final Object dataFieldValue) {
    final Capture<Struct> capturedRecord = EasyMock.newCapture();
    if (serialized != null)
        EasyMock.expect(converter.fromConnectData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
                .andReturn(serialized);
    storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
    PowerMock.expectLastCall();
    EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(serialized)))
            .andAnswer(new IAnswer<SchemaAndValue>() {
                @Override
                public SchemaAndValue answer() throws Throwable {
                    if (dataFieldName != null)
                        assertEquals(dataFieldValue, capturedRecord.getValue().get(dataFieldName));
                    // Note null schema because default settings for internal serialization are schema-less
                    return new SchemaAndValue(null, serialized == null ? null : structToMap(capturedRecord.getValue()));
                }
            });
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 20, Source: KafkaConfigBackingStoreTest.java


Example 11: deserializeImpl

import org.apache.kafka.connect.data.SchemaAndValue; // import the required package/class
protected Struct deserializeImpl(String topic, byte[] payload) {
	/*
	 * if (isKey){ return deserializeKey(topic, op); }else{ return
	 * deserializeVal(topic, op); }
	 */
	if (!isKey && payload == null){
		// A delete mutation... 
		return null;
	}
	try {
		SchemaAndValue res = converter.toConnectData(topic, payload);
		Schema schema = res.schema();
		Object val = res.value();
		Struct struct = (Struct) val;
		// Schema will be null for delete mutation vals
		if (schema != struct.schema()) {
			throw new SerializationException(
					"Object schema doesn't match given schema");
		}
		return struct;
	} catch (RuntimeException e) {
		throw new SerializationException(
				"Error deserializing Avro message  ", e);
	}

}
 
Developer: rogers, Project: change-data-capture, Lines of code: 27, Source: SpecificAvroMutationDeserializer.java


Example 12: toConnectData

import org.apache.kafka.connect.data.SchemaAndValue; // import the required package/class
/** {@inheritDoc} */
@Override public SchemaAndValue toConnectData(String topic, byte[] bytes) {
    CacheEvent evt;

    try {
        evt = deserializer.deserialize(topic, bytes);
    }
    catch (SerializationException e) {
        throw new DataException("Failed to convert to Kafka Connect data due to a serialization error", e);
    }

    if (evt == null) {
        return SchemaAndValue.NULL;
    }
    return new SchemaAndValue(null, evt);
}
 
Developer: apache, Project: ignite, Lines of code: 17, Source: CacheEventConverter.java


Example 13: apply

import org.apache.kafka.connect.data.SchemaAndValue; // import the required package/class
@Override
public R apply(R r) {
  final SchemaAndValue headerValue = extractHeader(r);

  return r.newRecord(
      r.topic(),
      r.kafkaPartition(),
      headerValue.schema(),
      headerValue.value(),
      r.valueSchema(),
      r.value(),
      r.timestamp()
  );
}
 
Developer: jcustenborder, Project: kafka-connect-rabbitmq, Lines of code: 15, Source: ExtractHeader.java


Example 14: toConnectData

import org.apache.kafka.connect.data.SchemaAndValue; // import the required package/class
@Override
public SchemaAndValue toConnectData(String topic, byte[] value) {
    try {
        return new SchemaAndValue(Schema.OPTIONAL_STRING_SCHEMA, deserializer.deserialize(topic, value));
    } catch (SerializationException e) {
        throw new DataException("Failed to deserialize string: ", e);
    }
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 9, Source: StringConverter.java


Example 15: testBytesToStringNonUtf8Encoding

import org.apache.kafka.connect.data.SchemaAndValue; // import the required package/class
@Test
public void testBytesToStringNonUtf8Encoding() throws UnsupportedEncodingException {
    converter.configure(Collections.singletonMap("converter.encoding", "UTF-16"), true);
    SchemaAndValue data = converter.toConnectData(TOPIC, SAMPLE_STRING.getBytes("UTF-16"));
    assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema());
    assertEquals(SAMPLE_STRING, data.value());
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 8, Source: StringConverterTest.java


Example 16: jsonToConnect

import org.apache.kafka.connect.data.SchemaAndValue; // import the required package/class
private SchemaAndValue jsonToConnect(JsonNode jsonValue) {
    if (jsonValue == null)
        return SchemaAndValue.NULL;

    if (!jsonValue.isObject() || jsonValue.size() != 2 || !jsonValue.has(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME) || !jsonValue.has(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME))
        throw new DataException("JSON value converted to Kafka Connect must be in envelope containing schema");

    Schema schema = asConnectSchema(jsonValue.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
    return new SchemaAndValue(schema, convertToConnect(schema, jsonValue.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME)));
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 11, Source: JsonConverter.java


Example 17: testConnectSchemaMetadataTranslation

import org.apache.kafka.connect.data.SchemaAndValue; // import the required package/class
@Test
public void testConnectSchemaMetadataTranslation() {
    // this validates the non-type fields are translated and handled properly
    assertEquals(new SchemaAndValue(Schema.BOOLEAN_SCHEMA, true), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"boolean\" }, \"payload\": true }".getBytes()));
    assertEquals(new SchemaAndValue(Schema.OPTIONAL_BOOLEAN_SCHEMA, null), converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": true }, \"payload\": null }".getBytes()));
    assertEquals(new SchemaAndValue(SchemaBuilder.bool().defaultValue(true).build(), true),
            converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"default\": true }, \"payload\": null }".getBytes()));
    assertEquals(new SchemaAndValue(SchemaBuilder.bool().required().name("bool").version(2).doc("the documentation").parameter("foo", "bar").build(), true),
            converter.toConnectData(TOPIC, "{ \"schema\": { \"type\": \"boolean\", \"optional\": false, \"name\": \"bool\", \"version\": 2, \"doc\": \"the documentation\", \"parameters\": { \"foo\": \"bar\" }}, \"payload\": true }".getBytes()));
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 11, Source: JsonConverterTest.java


Example 18: bytesToConnect

import org.apache.kafka.connect.data.SchemaAndValue; // import the required package/class
@Test
public void bytesToConnect() throws UnsupportedEncodingException {
    ByteBuffer reference = ByteBuffer.wrap("test-string".getBytes("UTF-8"));
    String msg = "{ \"schema\": { \"type\": \"bytes\" }, \"payload\": \"dGVzdC1zdHJpbmc=\" }";
    SchemaAndValue schemaAndValue = converter.toConnectData(TOPIC, msg.getBytes());
    ByteBuffer converted = ByteBuffer.wrap((byte[]) schemaAndValue.value());
    assertEquals(reference, converted);
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 9, Source: JsonConverterTest.java


Example 19: mapToConnectStringKeys

import org.apache.kafka.connect.data.SchemaAndValue; // import the required package/class
@Test
public void mapToConnectStringKeys() {
    byte[] mapJson = "{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"string\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": { \"key1\": 12, \"key2\": 15} }".getBytes();
    Map<String, Integer> expected = new HashMap<>();
    expected.put("key1", 12);
    expected.put("key2", 15);
    assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toConnectData(TOPIC, mapJson));
}
 
Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 9, Source: JsonConverterTest.java


Example 20: mapToConnectNonStringKeys

import org.apache.kafka.connect.data.SchemaAndValue; // import the required package/class
@Test
public void mapToConnectNonStringKeys() {
    byte[] mapJson = "{ \"schema\": { \"type\": \"map\", \"keys\": { \"type\" : \"int32\" }, \"values\": { \"type\" : \"int32\" } }, \"payload\": [ [1, 12], [2, 15] ] }".getBytes();
    Map<Integer, Integer> expected = new HashMap<>();
    expected.put(1, 12);
    expected.put(2, 15);
    assertEquals(new SchemaAndValue(SchemaBuilder.map(Schema.INT32_SCHEMA, Schema.INT32_SCHEMA).build(), expected), converter.toConnectData(TOPIC, mapJson));
}
 
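Developer: YMCoding, Project: kafka-0.11.0.0-src-with-comment, Lines of code: 9, Source: JsonConverterTest.java



Note: The org.apache.kafka.connect.data.SchemaAndValue examples in this article were collected from GitHub, MSDocs, and other source-code and documentation hosting platforms. The code snippets were selected from open-source projects, and copyright remains with the original authors; consult each project's license before distributing or using the code. Do not reproduce without permission.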

