This article collects typical usage examples of the Java class org.apache.flink.core.memory.DataOutputViewStreamWrapper. If you are wondering what the class is for, how it is used, or what real-world usages look like, the selected examples below may help.
The DataOutputViewStreamWrapper class belongs to the org.apache.flink.core.memory package. Below are 20 code examples of the class, sorted by popularity by default.
Example 1: update
import org.apache.flink.core.memory.DataOutputViewStreamWrapper; // import the required package/class
@Override
public void update(V value) throws IOException {
    if (value == null) {
        clear();
        return;
    }
    DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
    try {
        // Serialize the key and copy it out of the shared buffer.
        writeCurrentKeyWithGroupAndNamespace();
        byte[] key = keySerializationStream.toByteArray();
        // Reset the buffer and reuse it for the serialized value.
        keySerializationStream.reset();
        valueSerializer.serialize(value, out);
        backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
    } catch (Exception e) {
        throw new RuntimeException("Error while adding data to RocksDB", e);
    }
}
Author: axbaretto, project: flink, lines of code: 18, source file: RocksDBValueState.java
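The buffer handling in this example (fill one byte buffer with the key, copy it out, reset the buffer, then reuse it for the value) can be tried without Flink or RocksDB. The following is only a minimal sketch under stated assumptions: it uses the JDK's DataOutputStream, which DataOutputViewStreamWrapper extends, together with ByteArrayOutputStream. The class BufferReuseSketch and its encode method are hypothetical names for illustration and are not part of Flink.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the key/value serialization shown in the example above.
final class BufferReuseSketch {

    // One buffer shared by key and value serialization.
    private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(128);
    private final DataOutputStream out = new DataOutputStream(buffer);

    /** Returns {keyBytes, valueBytes}, both produced from the same buffer. */
    byte[][] encode(String key, long value) {
        try {
            buffer.reset();                           // start from an empty buffer
            out.writeUTF(key);
            byte[] keyBytes = buffer.toByteArray();   // copy the key out first
            buffer.reset();                           // then reuse the buffer for the value
            out.writeLong(value);
            byte[] valueBytes = buffer.toByteArray();
            return new byte[][] {keyBytes, valueBytes};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because toByteArray() returns a copy, resetting the buffer afterwards does not affect the key bytes that were already extracted. The sketch is not thread-safe, since the buffer is shared between calls.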
Example 2: add
import org.apache.flink.core.memory.DataOutputViewStreamWrapper; // import the required package/class
@Override
public void add(V value) throws IOException {
    try {
        writeCurrentKeyWithGroupAndNamespace();
        byte[] key = keySerializationStream.toByteArray();
        byte[] valueBytes = backend.db.get(columnFamily, key);
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
        if (valueBytes == null) {
            // No previous value: store the new value as-is.
            keySerializationStream.reset();
            valueSerializer.serialize(value, out);
            backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
        } else {
            // Previous value exists: deserialize, reduce, and write the result back.
            V oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStream(valueBytes)));
            V newValue = reduceFunction.reduce(oldValue, value);
            keySerializationStream.reset();
            valueSerializer.serialize(newValue, out);
            backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
        }
    } catch (Exception e) {
        throw new RuntimeException("Error while adding data to RocksDB", e);
    }
}
Author: axbaretto, project: flink, lines of code: 24, source file: RocksDBReducingState.java
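The read, reduce, and write-back cycle in this example can be illustrated in isolation. The sketch below makes several assumptions: a HashMap stands in for the RocksDB column family, a LongBinaryOperator stands in for the reduce function, and JDK data streams stand in for Flink's stream wrappers and serializers. ReducingStoreSketch is a hypothetical name, not a Flink class.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongBinaryOperator;

// Hypothetical in-memory stand-in for a reducing state backed by a byte store.
final class ReducingStoreSketch {

    private final Map<String, byte[]> store = new HashMap<>(); // stands in for RocksDB
    private final LongBinaryOperator reduce;

    ReducingStoreSketch(LongBinaryOperator reduce) {
        this.reduce = reduce;
    }

    void add(String key, long value) {
        byte[] oldBytes = store.get(key);
        // The first value for a key is stored as-is; later values are reduced into it.
        long newValue = (oldBytes == null) ? value : reduce.applyAsLong(decode(oldBytes), value);
        store.put(key, encode(newValue));
    }

    /** Assumes the key has been added before. */
    long get(String key) {
        return decode(store.get(key));
    }

    private static byte[] encode(long value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(8);
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeLong(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    private static long decode(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return in.readLong();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For instance, a store created with Long::sum accumulates a running total per key, which mirrors how a reducing state combines each incoming value with the stored one.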
Example 3: add
import org.apache.flink.core.memory.DataOutputViewStreamWrapper; // import the required package/class
@Override
public void add(T value) throws IOException {
    try {
        writeCurrentKeyWithGroupAndNamespace();
        byte[] key = keySerializationStream.toByteArray();
        byte[] valueBytes = backend.db.get(columnFamily, key);
        DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(keySerializationStream);
        if (valueBytes == null) {
            // No accumulator yet: fold the value into the descriptor's default value.
            keySerializationStream.reset();
            valueSerializer.serialize(foldFunction.fold(stateDesc.getDefaultValue(), value), out);
            backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
        } else {
            ACC oldValue = valueSerializer.deserialize(new DataInputViewStreamWrapper(new ByteArrayInputStreamWithPos(valueBytes)));
            ACC newValue = foldFunction.fold(oldValue, value);
            keySerializationStream.reset();
            valueSerializer.serialize(newValue, out);
            backend.db.put(columnFamily, writeOptions, key, keySerializationStream.toByteArray());
        }
    } catch (Exception e) {
        throw new RuntimeException("Error while adding data to RocksDB", e);
    }
}
Author: axbaretto, project: flink, lines of code: 23, source file: RocksDBFoldingState.java
Example 4: AbstractRocksDBState
import org.apache.flink.core.memory.DataOutputViewStreamWrapper; // import the required package/class
/**
 * Creates a new RocksDB backed state.
 * @param namespaceSerializer The serializer for the namespace.
 */
protected AbstractRocksDBState(
        ColumnFamilyHandle columnFamily,
        TypeSerializer<N> namespaceSerializer,
        SD stateDesc,
        RocksDBKeyedStateBackend<K> backend) {
    this.namespaceSerializer = namespaceSerializer;
    this.backend = backend;
    this.columnFamily = columnFamily;
    writeOptions = new WriteOptions();
    writeOptions.setDisableWAL(true);
    this.stateDesc = Preconditions.checkNotNull(stateDesc, "State Descriptor");
    this.keySerializationStream = new ByteArrayOutputStreamWithPos(128);
    this.keySerializationDataOutputView = new DataOutputViewStreamWrapper(keySerializationStream);
    this.ambiguousKeyPossible = (backend.getKeySerializer().getLength() < 0)
            && (namespaceSerializer.getLength() < 0);
}
Author: axbaretto, project: flink, lines of code: 25, source file: AbstractRocksDBState.java
Example 5: getSerializedValue
import org.apache.flink.core.memory.DataOutputViewStreamWrapper; // import the required package/class
@Override
@SuppressWarnings("unchecked")
public byte[] getSerializedValue(byte[] serializedKeyAndNamespace) throws Exception {
    Preconditions.checkNotNull(serializedKeyAndNamespace, "Serialized key and namespace");
    // TODO make KvStateSerializer key-group aware to save this round trip and key-group computation
    Tuple2<K, N> des = KvStateSerializer.<K, N>deserializeKeyAndNamespace(
            serializedKeyAndNamespace,
            backend.getKeySerializer(),
            namespaceSerializer);
    int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, backend.getNumberOfKeyGroups());
    // we cannot reuse the keySerializationStream member since this method
    // is called concurrently to the other ones and it may thus contain garbage
    ByteArrayOutputStreamWithPos tmpKeySerializationStream = new ByteArrayOutputStreamWithPos(128);
    DataOutputViewStreamWrapper tmpKeySerializationDataOutputView = new DataOutputViewStreamWrapper(tmpKeySerializationStream);
    writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1,
            tmpKeySerializationStream, tmpKeySerializationDataOutputView);
    return backend.db.get(columnFamily, tmpKeySerializationStream.toByteArray());
}
Author: axbaretto, project: flink, lines of code: 24, source file: AbstractRocksDBState.java
Example 6: writeSerializersAndConfigsWithResilience
import org.apache.flink.core.memory.DataOutputViewStreamWrapper; // import the required package/class
/**
 * Write a list of serializers and their corresponding config snapshots to the provided
 * data output view. This method writes in a fault tolerant way, so that when read again
 * using {@link #readSerializersAndConfigsWithResilience(DataInputView, ClassLoader)}, if
 * deserialization of the serializer fails, its configuration snapshot will remain intact.
 *
 * <p>Specifically, all written serializers and their config snapshots are indexed by their
 * offset positions within the serialized bytes. The serialization format is as follows:
 * <ul>
 * <li>1. number of serializer and configuration snapshot pairs.</li>
 * <li>2. offsets of each serializer and configuration snapshot, in order.</li>
 * <li>3. total number of bytes for the serialized serializers and the config snapshots.</li>
 * <li>4. serialized serializers and the config snapshots.</li>
 * </ul>
 *
 * @param out the data output view.
 * @param serializersAndConfigs serializer and configuration snapshot pairs
 *
 * @throws IOException if writing to the data output view fails
 */
public static void writeSerializersAndConfigsWithResilience(
        DataOutputView out,
        List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> serializersAndConfigs) throws IOException {
    try (
        ByteArrayOutputStreamWithPos bufferWithPos = new ByteArrayOutputStreamWithPos();
        DataOutputViewStreamWrapper bufferWrapper = new DataOutputViewStreamWrapper(bufferWithPos)) {
        out.writeInt(serializersAndConfigs.size());
        for (Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> serAndConfSnapshot : serializersAndConfigs) {
            // Record the offset of each element before buffering its bytes.
            out.writeInt(bufferWithPos.getPosition());
            writeSerializer(bufferWrapper, serAndConfSnapshot.f0);
            out.writeInt(bufferWithPos.getPosition());
            writeSerializerConfigSnapshot(bufferWrapper, serAndConfSnapshot.f1);
        }
        // Total payload length, followed by the buffered payload itself.
        out.writeInt(bufferWithPos.getPosition());
        out.write(bufferWithPos.getBuf(), 0, bufferWithPos.getPosition());
    }
}
Author: axbaretto, project: flink, lines of code: 42, source file: TypeSerializerSerializationUtil.java
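The offset-indexed layout described in the Javadoc above (pair count, offsets, payload length, payload) can be sketched with plain JDK streams. This is an illustration under assumptions, not Flink's implementation: pairs of strings stand in for the serializer and its config snapshot, and OffsetIndexedSketch, write, and readSecond are hypothetical names. The point it tries to show is that a reader can jump straight to the second element of a pair without parsing the first one.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration of an offset-indexed layout; not Flink's implementation.
final class OffsetIndexedSketch {

    /** Layout: pair count, two offsets per pair, payload length, payload bytes. */
    static byte[] write(String[][] pairs) {
        ByteArrayOutputStream result = new ByteArrayOutputStream();
        ByteArrayOutputStream payload = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(result);
             DataOutputStream body = new DataOutputStream(payload)) {
            out.writeInt(pairs.length);
            for (String[] pair : pairs) {
                out.writeInt(payload.size()); // offset of the first element
                body.writeUTF(pair[0]);
                out.writeInt(payload.size()); // offset of the second element
                body.writeUTF(pair[1]);
            }
            out.writeInt(payload.size());     // total payload length
            payload.writeTo(out);             // payload follows the header
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result.toByteArray();
    }

    /** Reads only the second element of pair {@code index}, using its stored offset. */
    static String readSecond(byte[] data, int index) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int count = in.readInt();
            int[] offsets = new int[2 * count];
            for (int i = 0; i < offsets.length; i++) {
                offsets[i] = in.readInt();
            }
            int payloadLength = in.readInt();
            byte[] payload = new byte[payloadLength];
            in.readFully(payload);
            int start = offsets[2 * index + 1];
            DataInputStream element = new DataInputStream(
                    new ByteArrayInputStream(payload, start, payloadLength - start));
            return element.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because every element has its own offset, a failure while decoding one element does not prevent a reader from locating the next one, which is the resilience property the Javadoc refers to.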
Example 7: createCopyWritable
import org.apache.flink.core.memory.DataOutputViewStreamWrapper; // import the required package/class
/**
 * Clones the given writable using the {@link IOReadableWritable serialization}.
 *
 * @param original Object to clone
 * @param <T> Type of the object to clone
 * @return Cloned object
 * @throws IOException Thrown if the serialization fails.
 */
public static <T extends IOReadableWritable> T createCopyWritable(T original) throws IOException {
    if (original == null) {
        return null;
    }
    // Serialize the original into an in-memory buffer.
    final ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (DataOutputViewStreamWrapper out = new DataOutputViewStreamWrapper(baos)) {
        original.write(out);
    }
    // Read the bytes back into a freshly instantiated object of the same class.
    final ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
    try (DataInputViewStreamWrapper in = new DataInputViewStreamWrapper(bais)) {
        @SuppressWarnings("unchecked")
        T copy = (T) instantiate(original.getClass());
        copy.read(in);
        return copy;
    }
}
Author: axbaretto, project: flink, lines of code: 28, source file: InstantiationUtil.java
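Copying an object by serializing it and reading the bytes into a new instance, as this example does, can be sketched without Flink. The code below is a simplified illustration with assumptions called out: WritableSketch is a hypothetical stand-in for IOReadableWritable, PointSketch is a made-up value type, and a Supplier replaces the reflective instantiate call used in the original.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.function.Supplier;

// Hypothetical stand-in for IOReadableWritable.
interface WritableSketch {
    void write(DataOutput out) throws IOException;
    void read(DataInput in) throws IOException;
}

// Hypothetical value type used only for this illustration.
final class PointSketch implements WritableSketch {
    int x;
    int y;

    @Override
    public void write(DataOutput out) throws IOException {
        out.writeInt(x);
        out.writeInt(y);
    }

    @Override
    public void read(DataInput in) throws IOException {
        x = in.readInt();
        y = in.readInt();
    }
}

final class CopySketch {

    /** Clones {@code original} by writing it to bytes and reading them into a fresh instance. */
    static <T extends WritableSketch> T copy(T original, Supplier<T> factory) {
        if (original == null) {
            return null;
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            original.write(out);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        T copy = factory.get(); // the original uses reflection here instead of a factory
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            copy.read(in);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return copy;
    }
}
```

A call such as CopySketch.copy(point, PointSketch::new) returns a distinct object with the same field values. The copy is only as deep as the write and read methods make it.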
Example 8: testReconfigureWithDifferentPojoType
import org.apache.flink.core.memory.DataOutputViewStreamWrapper; // import the required package/class
/**
 * Verifies that reconfiguring with a config snapshot of a preceding POJO serializer
 * with a different POJO type will result in INCOMPATIBLE (checked here via
 * {@code isRequiresMigration()}).
 */
@Test
public void testReconfigureWithDifferentPojoType() throws Exception {
    PojoSerializer<SubTestUserClassB> pojoSerializer1 = (PojoSerializer<SubTestUserClassB>)
            TypeExtractor.getForClass(SubTestUserClassB.class).createSerializer(new ExecutionConfig());
    // snapshot configuration and serialize to bytes
    TypeSerializerConfigSnapshot pojoSerializerConfigSnapshot = pojoSerializer1.snapshotConfiguration();
    byte[] serializedConfig;
    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
        TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), pojoSerializerConfigSnapshot);
        serializedConfig = out.toByteArray();
    }
    PojoSerializer<SubTestUserClassA> pojoSerializer2 = (PojoSerializer<SubTestUserClassA>)
            TypeExtractor.getForClass(SubTestUserClassA.class).createSerializer(new ExecutionConfig());
    // read configuration again from bytes
    try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
        pojoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
                new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
    }
    CompatibilityResult<SubTestUserClassA> compatResult = pojoSerializer2.ensureCompatibility(pojoSerializerConfigSnapshot);
    assertTrue(compatResult.isRequiresMigration());
}
Author: axbaretto, project: flink, lines of code: 30, source file: PojoSerializerTest.java
Example 9: testMigrationStrategyWithDifferentKryoType
import org.apache.flink.core.memory.DataOutputViewStreamWrapper; // import the required package/class
/**
 * Verifies that reconfiguration result is INCOMPATIBLE if data type has changed.
 */
@Test
public void testMigrationStrategyWithDifferentKryoType() throws Exception {
    KryoSerializer<TestClassA> kryoSerializerForA = new KryoSerializer<>(TestClassA.class, new ExecutionConfig());
    // snapshot configuration and serialize to bytes
    TypeSerializerConfigSnapshot kryoSerializerConfigSnapshot = kryoSerializerForA.snapshotConfiguration();
    byte[] serializedConfig;
    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
        TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(new DataOutputViewStreamWrapper(out), kryoSerializerConfigSnapshot);
        serializedConfig = out.toByteArray();
    }
    KryoSerializer<TestClassB> kryoSerializerForB = new KryoSerializer<>(TestClassB.class, new ExecutionConfig());
    // read configuration again from bytes
    try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
        kryoSerializerConfigSnapshot = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
                new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
    }
    CompatibilityResult<TestClassB> compatResult = kryoSerializerForB.ensureCompatibility(kryoSerializerConfigSnapshot);
    assertTrue(compatResult.isRequiresMigration());
}
Author: axbaretto, project: flink, lines of code: 27, source file: KryoSerializerCompatibilityTest.java
Example 10: testSnapshotConfigurationAndReconfigure
import org.apache.flink.core.memory.DataOutputViewStreamWrapper; // import the required package/class
@Test
public void testSnapshotConfigurationAndReconfigure() throws Exception {
    final TypeSerializerConfigSnapshot configSnapshot = getSerializer().snapshotConfiguration();
    byte[] serializedConfig;
    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
        TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(
                new DataOutputViewStreamWrapper(out), configSnapshot);
        serializedConfig = out.toByteArray();
    }
    TypeSerializerConfigSnapshot restoredConfig;
    try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
        restoredConfig = TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
                new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
    }
    CompatibilityResult strategy = getSerializer().ensureCompatibility(restoredConfig);
    assertFalse(strategy.isRequiresMigration());
    // also verify that the serializer's reconfigure implementation detects incompatibility
    strategy = getSerializer().ensureCompatibility(new TestIncompatibleSerializerConfigSnapshot());
    assertTrue(strategy.isRequiresMigration());
}
Author: axbaretto, project: flink, lines of code: 25, source file: SerializerTestBase.java
Example 11: testSerializeConfigurationSnapshots
import org.apache.flink.core.memory.DataOutputViewStreamWrapper; // import the required package/class
/**
 * Verifies that reading and writing configuration snapshots work correctly.
 */
@Test
public void testSerializeConfigurationSnapshots() throws Exception {
    TestConfigSnapshot configSnapshot1 = new TestConfigSnapshot(1, "foo");
    TestConfigSnapshot configSnapshot2 = new TestConfigSnapshot(2, "bar");
    byte[] serializedConfig;
    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
        TypeSerializerUtil.writeSerializerConfigSnapshots(
                new DataOutputViewStreamWrapper(out),
                configSnapshot1,
                configSnapshot2);
        serializedConfig = out.toByteArray();
    }
    TypeSerializerConfigSnapshot[] restoredConfigs;
    try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
        restoredConfigs = TypeSerializerUtil.readSerializerConfigSnapshots(
                new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
    }
    assertEquals(2, restoredConfigs.length);
    assertEquals(configSnapshot1, restoredConfigs[0]);
    assertEquals(configSnapshot2, restoredConfigs[1]);
}
Author: axbaretto, project: flink, lines of code: 29, source file: TypeSerializerConfigSnapshotTest.java
Example 12: testFailsWhenConfigurationSnapshotClassNotFound
import org.apache.flink.core.memory.DataOutputViewStreamWrapper; // import the required package/class
/**
 * Verifies that deserializing config snapshots fails if the config class could not be found.
 */
@Test
public void testFailsWhenConfigurationSnapshotClassNotFound() throws Exception {
    byte[] serializedConfig;
    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
        TypeSerializerUtil.writeSerializerConfigSnapshot(
                new DataOutputViewStreamWrapper(out), new TestConfigSnapshot(123, "foobar"));
        serializedConfig = out.toByteArray();
    }
    try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
        // read using a dummy classloader
        TypeSerializerUtil.readSerializerConfigSnapshot(
                new DataInputViewStreamWrapper(in), new URLClassLoader(new URL[0], null));
        fail("Expected a ClassNotFoundException wrapped in IOException");
    } catch (IOException expected) {
        // test passes
    }
}
Author: axbaretto, project: flink, lines of code: 22, source file: TypeSerializerConfigSnapshotTest.java
Example 13: testStateSerializerSerializationProxy
import org.apache.flink.core.memory.DataOutputViewStreamWrapper; // import the required package/class
@Test
public void testStateSerializerSerializationProxy() throws Exception {
    TypeSerializer<?> serializer = IntSerializer.INSTANCE;
    TypeSerializerSerializationProxy<?> proxy = new TypeSerializerSerializationProxy<>(serializer);
    byte[] serialized;
    try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
        proxy.write(new DataOutputViewStreamWrapper(out));
        serialized = out.toByteArray();
    }
    proxy = new TypeSerializerSerializationProxy<>(Thread.currentThread().getContextClassLoader());
    try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
        proxy.read(new DataInputViewStreamWrapper(in));
    }
    Assert.assertEquals(serializer, proxy.getTypeSerializer());
}
Author: axbaretto, project: flink, lines of code: 22, source file: TypeSerializerSerializationProxyTest.java
Example 14: testSerializerSerialization
import org.apache.flink.core.memory.DataOutputViewStreamWrapper; // import the required package/class
/**
 * Verifies that reading and writing serializers work correctly.
 */
@Test
public void testSerializerSerialization() throws Exception {
    TypeSerializer<?> serializer = IntSerializer.INSTANCE;
    byte[] serialized;
    try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
        TypeSerializerSerializationUtil.writeSerializer(new DataOutputViewStreamWrapper(out), serializer);
        serialized = out.toByteArray();
    }
    TypeSerializer<?> deserializedSerializer;
    try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
        deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer(
                new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
    }
    Assert.assertEquals(serializer, deserializedSerializer);
}
Author: axbaretto, project: flink, lines of code: 23, source file: TypeSerializerSerializationUtilTest.java
Example 15: testSerializerSerializationWithInvalidClass
import org.apache.flink.core.memory.DataOutputViewStreamWrapper; // import the required package/class
/**
 * Verifies deserialization failure cases when reading a serializer from bytes, in the
 * case of a {@link InvalidClassException}.
 */
@Test
public void testSerializerSerializationWithInvalidClass() throws Exception {
    TypeSerializer<?> serializer = IntSerializer.INSTANCE;
    byte[] serialized;
    try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
        TypeSerializerSerializationUtil.writeSerializer(new DataOutputViewStreamWrapper(out), serializer);
        serialized = out.toByteArray();
    }
    TypeSerializer<?> deserializedSerializer;
    // mock failure when deserializing serializers
    TypeSerializerSerializationUtil.TypeSerializerSerializationProxy<?> mockProxy =
            mock(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class);
    doThrow(new IOException()).when(mockProxy).read(any(DataInputViewStreamWrapper.class));
    PowerMockito.whenNew(TypeSerializerSerializationUtil.TypeSerializerSerializationProxy.class).withAnyArguments().thenReturn(mockProxy);
    try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
        deserializedSerializer = TypeSerializerSerializationUtil.tryReadSerializer(
                new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader());
    }
    // tryReadSerializer is expected to return null when the read fails
    Assert.assertNull(deserializedSerializer);
}
Author: axbaretto, project: flink, lines of code: 30, source file: TypeSerializerSerializationUtilTest.java
Example 16: testFailsWhenConfigurationSnapshotClassNotFound
import org.apache.flink.core.memory.DataOutputViewStreamWrapper; // import the required package/class
/**
 * Verifies that deserializing config snapshots fails if the config class could not be found.
 */
@Test
public void testFailsWhenConfigurationSnapshotClassNotFound() throws Exception {
    byte[] serializedConfig;
    try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
        TypeSerializerSerializationUtil.writeSerializerConfigSnapshot(
                new DataOutputViewStreamWrapper(out), new TypeSerializerSerializationUtilTest.TestConfigSnapshot(123, "foobar"));
        serializedConfig = out.toByteArray();
    }
    try (ByteArrayInputStream in = new ByteArrayInputStream(serializedConfig)) {
        // read using a dummy classloader
        TypeSerializerSerializationUtil.readSerializerConfigSnapshot(
                new DataInputViewStreamWrapper(in), new URLClassLoader(new URL[0], null));
        fail("Expected a ClassNotFoundException wrapped in IOException");
    } catch (IOException expected) {
        // test passes
    }
}
Author: axbaretto, project: flink, lines of code: 22, source file: TypeSerializerSerializationUtilTest.java
Example 17: testSerialization
import org.apache.flink.core.memory.DataOutputViewStreamWrapper; // import the required package/class
public static void testSerialization(String[] values) throws IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
    DataOutputViewStreamWrapper serializer = new DataOutputViewStreamWrapper(baos);
    // Write all values back to back into the same stream.
    for (String value : values) {
        StringValue sv = new StringValue(value);
        sv.write(serializer);
    }
    serializer.close();
    baos.close();
    ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray());
    DataInputViewStreamWrapper deserializer = new DataInputViewStreamWrapper(bais);
    // Read values until the input is exhausted and compare them in order.
    int num = 0;
    while (bais.available() > 0) {
        StringValue deser = new StringValue();
        deser.read(deserializer);
        assertEquals("DeserializedString differs from original string.", values[num], deser.getValue());
        num++;
    }
    assertEquals("Wrong number of deserialized values", values.length, num);
}
Author: axbaretto, project: flink, lines of code: 27, source file: StringValueSerializationTest.java
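The write-all-then-read-until-exhausted loop in this example can be reproduced with JDK classes alone. The sketch below is an approximation under assumptions: writeUTF and readUTF stand in for StringValue's own encoding (which is different and not shown here), and StringRoundTripSketch is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical round trip of several strings through one byte stream.
final class StringRoundTripSketch {

    static List<String> roundTrip(String[] values) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream(4096);
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            for (String value : values) {
                out.writeUTF(value); // length-prefixed, so values can be written back to back
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }

        ByteArrayInputStream source = new ByteArrayInputStream(bytes.toByteArray());
        List<String> result = new ArrayList<>();
        try (DataInputStream in = new DataInputStream(source)) {
            // available() on a ByteArrayInputStream is the exact number of unread bytes.
            while (source.available() > 0) {
                result.add(in.readUTF());
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return result;
    }
}
```

Relying on available() is reasonable here only because the source is an in-memory array; for sockets or files it reports an estimate and would not be a safe end-of-data check.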
Example 18: testReadSameVersion
import org.apache.flink.core.memory.DataOutputViewStreamWrapper; // import the required package/class
@Test
public void testReadSameVersion() throws Exception {
    String payload = "test";
    TestWriteable testWriteable = new TestWriteable(1, payload);
    byte[] serialized;
    try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
        testWriteable.write(new DataOutputViewStreamWrapper(out));
        serialized = out.toByteArray();
    }
    testWriteable = new TestWriteable(1);
    try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
        testWriteable.read(new DataInputViewStreamWrapper(in));
    }
    Assert.assertEquals(payload, testWriteable.getData());
}
Author: axbaretto, project: flink, lines of code: 20, source file: VersionedIOWriteableTest.java
Example 19: testReadCompatibleVersion
import org.apache.flink.core.memory.DataOutputViewStreamWrapper; // import the required package/class
@Test
public void testReadCompatibleVersion() throws Exception {
    String payload = "test";
    TestWriteable testWriteable = new TestWriteable(1, payload);
    byte[] serialized;
    try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
        testWriteable.write(new DataOutputViewStreamWrapper(out));
        serialized = out.toByteArray();
    }
    // The reader is at version 2 but declares version 1 as compatible.
    testWriteable = new TestWriteable(2) {
        @Override
        public int[] getCompatibleVersions() {
            return new int[] {1, 2};
        }
    };
    try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
        testWriteable.read(new DataInputViewStreamWrapper(in));
    }
    Assert.assertEquals(payload, testWriteable.getData());
}
Author: axbaretto, project: flink, lines of code: 25, source file: VersionedIOWriteableTest.java
Example 20: testReadMismatchVersion
import org.apache.flink.core.memory.DataOutputViewStreamWrapper; // import the required package/class
@Test
public void testReadMismatchVersion() throws Exception {
    String payload = "test";
    TestWriteable testWriteable = new TestWriteable(1, payload);
    byte[] serialized;
    try (ByteArrayOutputStreamWithPos out = new ByteArrayOutputStreamWithPos()) {
        testWriteable.write(new DataOutputViewStreamWrapper(out));
        serialized = out.toByteArray();
    }
    testWriteable = new TestWriteable(2);
    try (ByteArrayInputStreamWithPos in = new ByteArrayInputStreamWithPos(serialized)) {
        testWriteable.read(new DataInputViewStreamWrapper(in));
        Assert.fail("Version mismatch expected.");
    } catch (VersionMismatchException ignored) {
        // expected: version 1 data cannot be read by a version 2 reader
    }
    // The failed read must not have populated the payload.
    Assert.assertNull(testWriteable.getData());
}
Author: axbaretto, project: flink, lines of code: 23, source file: VersionedIOWriteableTest.java
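The three version tests above (same version, compatible version, mismatching version) rely on a version header that is checked before the payload is read. The following sketch shows one way such a check could look. It is an assumption-laden illustration and not Flink's implementation: VersionedPayloadSketch is a hypothetical class, the header is a single int, and an IllegalStateException stands in for VersionMismatchException.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical versioned payload; the version is written before the data.
final class VersionedPayloadSketch {

    private final int version;
    private final int[] compatibleVersions; // extra versions this reader accepts
    private String data;

    VersionedPayloadSketch(int version, String data, int... compatibleVersions) {
        this.version = version;
        this.data = data;
        this.compatibleVersions = compatibleVersions;
    }

    byte[] write() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(version); // version header first
            out.writeUTF(data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    void read(byte[] serialized) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            int found = in.readInt();
            if (!isCompatible(found)) {
                // Reject before touching the payload, so data stays unchanged.
                throw new IllegalStateException("Incompatible version: " + found);
            }
            data = in.readUTF();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private boolean isCompatible(int found) {
        if (found == version) {
            return true;
        }
        for (int v : compatibleVersions) {
            if (v == found) {
                return true;
            }
        }
        return false;
    }

    String getData() {
        return data;
    }
}
```

Checking the header before reading the payload is what allows the mismatch case to leave the reader's data untouched, which is the property the last test asserts.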
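Note: The org.apache.flink.core.memory.DataOutputViewStreamWrapper examples in this article were collected from GitHub, MSDocs, and other source-code and documentation hosting platforms. The snippets were selected from open-source projects, and copyright remains with the original authors; refer to each project's License before distributing or using the code. Do not reproduce without permission.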