
Java InstantiationUtil Class Code Examples


This article collects typical usage examples of org.apache.flink.util.InstantiationUtil in Java. If you are wondering how the InstantiationUtil class is used in practice, or are looking for working examples of it, the selected code examples below may help.



The InstantiationUtil class belongs to the org.apache.flink.util package. 20 code examples of the class are shown below, sorted by popularity by default.

Example 1: snapshotKeyGroupState

import org.apache.flink.util.InstantiationUtil; // import the required package/class
/**
 * Snapshots the state {@code (stateName -> (valueCoder && (namespace -> value)))} for a given
 * {@code keyGroupIdx}.
 *
 * @param keyGroupIdx the id of the key-group to be put in the snapshot.
 * @param out the stream to write to.
 */
public void snapshotKeyGroupState(int keyGroupIdx, DataOutputStream out) throws Exception {
  int localIdx = getIndexForKeyGroup(keyGroupIdx);
  Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable = stateTables[localIdx];
  Preconditions.checkState(stateTable.size() <= Short.MAX_VALUE,
      "Too many States: " + stateTable.size() + ". Currently at most "
          + Short.MAX_VALUE + " states are supported");
  out.writeShort(stateTable.size());
  for (Map.Entry<String, Tuple2<Coder<?>, Map<String, ?>>> entry : stateTable.entrySet()) {
    out.writeUTF(entry.getKey());
    Coder coder = entry.getValue().f0;
    InstantiationUtil.serializeObject(out, coder);
    Map<String, ?> map = entry.getValue().f1;
    out.writeInt(map.size());
    for (Map.Entry<String, ?> entry1 : map.entrySet()) {
      StringUtf8Coder.of().encode(entry1.getKey(), out);
      coder.encode(entry1.getValue(), out);
    }
  }
}
 
Developer: apache, Project: beam, Lines of code: 27, Source: FlinkKeyGroupStateInternals.java


Example 2: restoreKeyGroupState

import org.apache.flink.util.InstantiationUtil; // import the required package/class
/**
 * Restore the state {@code (stateName -> (valueCoder && (namespace -> value)))}
 * for a given {@code keyGroupIdx}.
 *
 * @param keyGroupIdx the id of the key-group to be restored.
 * @param in the stream to read from.
 * @param userCodeClassLoader the class loader that will be used to deserialize
 *                            the valueCoder.
 */
public void restoreKeyGroupState(int keyGroupIdx, DataInputStream in,
                                 ClassLoader userCodeClassLoader) throws Exception {
  int localIdx = getIndexForKeyGroup(keyGroupIdx);
  Map<String, Tuple2<Coder<?>, Map<String, ?>>> stateTable = stateTables[localIdx];
  int numStates = in.readShort();
  for (int i = 0; i < numStates; ++i) {
    String stateName = in.readUTF();
    Coder coder = InstantiationUtil.deserializeObject(in, userCodeClassLoader);
    Tuple2<Coder<?>, Map<String, ?>> tuple2 = stateTable.get(stateName);
    if (tuple2 == null) {
      tuple2 = new Tuple2<>();
      tuple2.f0 = coder;
      tuple2.f1 = new HashMap<>();
      stateTable.put(stateName, tuple2);
    }
    Map<String, Object> map = (Map<String, Object>) tuple2.f1;
    int mapSize = in.readInt();
    for (int j = 0; j < mapSize; j++) {
      String namespace = StringUtf8Coder.of().decode(in);
      Object value = coder.decode(in);
      map.put(namespace, value);
    }
  }
}
 
Developer: apache, Project: beam, Lines of code: 34, Source: FlinkKeyGroupStateInternals.java
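
Examples 1 and 2 write and read the same layout: a short state count, then for each state a UTF name, a serialized coder, an int entry count, and the namespace/value pairs. The following is a minimal plain-JDK sketch of that count-prefixed layout only. It leaves out the coder object and stores values as UTF strings, and the names `StateTableFormat`, `write`, and `read` are hypothetical, not Beam or Flink API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical illustration of a count-prefixed state layout; not Beam or Flink code.
final class StateTableFormat {

    // Layout: short stateCount, then per state:
    // UTF name, int entryCount, (UTF namespace, UTF value) repeated entryCount times.
    static byte[] write(Map<String, Map<String, String>> table) throws IOException {
        if (table.size() > Short.MAX_VALUE) {
            throw new IllegalStateException("Too many states: " + table.size());
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeShort(table.size());
            for (Map.Entry<String, Map<String, String>> state : table.entrySet()) {
                out.writeUTF(state.getKey());
                out.writeInt(state.getValue().size());
                for (Map.Entry<String, String> entry : state.getValue().entrySet()) {
                    out.writeUTF(entry.getKey());
                    out.writeUTF(entry.getValue());
                }
            }
        }
        return bytes.toByteArray();
    }

    // Reads the layout back in exactly the order it was written.
    static Map<String, Map<String, String>> read(byte[] data) throws IOException {
        Map<String, Map<String, String>> table = new LinkedHashMap<>();
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            int numStates = in.readShort();
            for (int i = 0; i < numStates; i++) {
                String name = in.readUTF();
                Map<String, String> entries =
                        table.computeIfAbsent(name, k -> new LinkedHashMap<>());
                int numEntries = in.readInt();
                for (int j = 0; j < numEntries; j++) {
                    String namespace = in.readUTF();
                    String value = in.readUTF();
                    entries.put(namespace, value);
                }
            }
        }
        return table;
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> counts = new LinkedHashMap<>();
        counts.put("window-1", "42");
        Map<String, Map<String, String>> table = new LinkedHashMap<>();
        table.put("counts", counts);
        // Prints whether the restored table equals the original one.
        System.out.println(read(write(table)).equals(table));
    }
}
```

The reader depends on the writer's field order, which is why the snapshot and restore methods above mirror each other call for call.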


Example 3: createAndStartSimpleConsumerThread

import org.apache.flink.util.InstantiationUtil; // import the required package/class
private SimpleConsumerThread<T> createAndStartSimpleConsumerThread(
		List<KafkaTopicPartitionState<TopicAndPartition>> seedPartitions,
		Node leader,
		ExceptionProxy errorHandler) throws IOException, ClassNotFoundException {
	// each thread needs its own copy of the deserializer, because the deserializer is
	// not necessarily thread safe
	final KeyedDeserializationSchema<T> clonedDeserializer =
			InstantiationUtil.clone(deserializer, runtimeContext.getUserCodeClassLoader());

	// seed thread with list of fetch partitions (otherwise it would shut down immediately again)
	SimpleConsumerThread<T> brokerThread = new SimpleConsumerThread<>(
			this, errorHandler, kafkaConfig, leader, seedPartitions, unassignedPartitionsQueue,
			clonedDeserializer, invalidOffsetBehavior);

	brokerThread.setName(String.format("SimpleConsumer - %s - broker-%s (%s:%d)",
			runtimeContext.getTaskName(), leader.id(), leader.host(), leader.port()));
	brokerThread.setDaemon(true);
	brokerThread.start();

	LOG.info("Starting thread {}", brokerThread.getName());
	return brokerThread;
}
 
Developer: axbaretto, Project: flink, Lines of code: 23, Source: Kafka08Fetcher.java
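
Example 3 gives each thread its own deserializer by cloning it with a user-code class loader. One common way to build such a clone is a Java serialization round trip in which classes are resolved against the supplied loader. The sketch below shows that technique with the JDK only. It is an assumption about the general approach, not Flink's implementation, and `SerializationClone`, `LoaderAwareInputStream`, and `deepCopy` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.ObjectStreamClass;
import java.io.Serializable;
import java.util.ArrayList;

// Hypothetical sketch of cloning through serialization; not Flink code.
final class SerializationClone {

    // ObjectInputStream that resolves classes against a caller-supplied class loader.
    private static final class LoaderAwareInputStream extends ObjectInputStream {
        private final ClassLoader loader;

        LoaderAwareInputStream(InputStream in, ClassLoader loader) throws IOException {
            super(in);
            this.loader = loader;
        }

        @Override
        protected Class<?> resolveClass(ObjectStreamClass desc)
                throws IOException, ClassNotFoundException {
            try {
                return Class.forName(desc.getName(), false, loader);
            } catch (ClassNotFoundException e) {
                // Fall back to the default lookup, which also handles primitive type names.
                return super.resolveClass(desc);
            }
        }
    }

    // Serializes the object to a byte array and reads an independent copy back.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T deepCopy(T original, ClassLoader loader)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(original);
        }
        try (ObjectInputStream in = new LoaderAwareInputStream(
                new ByteArrayInputStream(bytes.toByteArray()), loader)) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        ArrayList<String> original = new ArrayList<>();
        original.add("a");
        ArrayList<String> copy = deepCopy(original, SerializationClone.class.getClassLoader());
        copy.add("b"); // mutating the copy leaves the original untouched
        System.out.println(original.size() + " " + copy.size());
    }
}
```

Because the copy shares no mutable state with the original, it can be handed to another thread even when the original type is not thread safe.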


Example 4: RecordPairComparator

import org.apache.flink.util.InstantiationUtil; // import the required package/class
public RecordPairComparator(int[] keyFieldsReference, int[] keyFieldsCandidate, Class<? extends Value>[] keyTypes) {
	if (keyFieldsReference.length != keyFieldsCandidate.length || keyFieldsCandidate.length != keyTypes.length) {
		throw new IllegalArgumentException(
			"The arrays describing the key positions and types must be of the same length.");
	}
	this.keyFields1 = keyFieldsReference;
	this.keyFields2 = keyFieldsCandidate;
	
	// instantiate fields to extract keys into
	this.keyHolders1 = new Value[keyTypes.length];
	this.keyHolders2 = new Value[keyTypes.length];
	
	for (int i = 0; i < keyTypes.length; i++) {
		if (keyTypes[i] == null) {
			throw new NullPointerException("Key type " + i + " is null.");
		}
		this.keyHolders1[i] = InstantiationUtil.instantiate(keyTypes[i], Value.class);
		this.keyHolders2[i] = InstantiationUtil.instantiate(keyTypes[i], Value.class);
	}
}
 
Developer: axbaretto, Project: flink, Lines of code: 21, Source: RecordPairComparator.java
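
Example 4 passes both the concrete class and an expected supertype (`Value.class`) to `InstantiationUtil.instantiate`. The sketch below shows, with plain reflection, what a type-checked instantiation of this kind can look like. It is an illustration under the assumption that the class has a public no-argument constructor, and `ReflectiveFactory` is a hypothetical name rather than Flink API.

```java
// Hypothetical sketch of type-checked reflective instantiation; not Flink code.
final class ReflectiveFactory {

    // Verifies that clazz is a subtype of castTo, then calls its public
    // no-argument constructor and returns the instance as the expected type.
    static <T> T instantiate(Class<?> clazz, Class<T> castTo) {
        if (clazz == null || castTo == null) {
            throw new NullPointerException("clazz and castTo must not be null");
        }
        if (!castTo.isAssignableFrom(clazz)) {
            throw new IllegalArgumentException(
                    clazz.getName() + " is not a subtype of " + castTo.getName());
        }
        try {
            return castTo.cast(clazz.getConstructor().newInstance());
        } catch (ReflectiveOperationException e) {
            // Covers a missing constructor, an abstract class, access problems,
            // and exceptions thrown by the constructor itself.
            throw new IllegalStateException("Cannot instantiate " + clazz.getName(), e);
        }
    }

    public static void main(String[] args) {
        CharSequence empty = instantiate(StringBuilder.class, CharSequence.class);
        System.out.println(empty.length());
    }
}
```

Checking the supertype before construction turns a late `ClassCastException` at the call site into an immediate, descriptive failure.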


Example 5: nextRecord

import org.apache.flink.util.InstantiationUtil; // import the required package/class
@Override
public E nextRecord(E reuseValue) throws IOException {
	if (reachedEnd()) {
		return null;
	}

	// if we start a new block, then register the event, and
	// restart the counter.
	if (dataFileReader.previousSync() != lastSync) {
		lastSync = dataFileReader.previousSync();
		recordsReadSinceLastSync = 0;
	}
	recordsReadSinceLastSync++;

	if (reuseAvroValue) {
		return dataFileReader.next(reuseValue);
	} else {
		if (GenericRecord.class == avroValueType) {
			return dataFileReader.next();
		} else {
			return dataFileReader.next(InstantiationUtil.instantiate(avroValueType, Object.class));
		}
	}
}
 
Developer: axbaretto, Project: flink, Lines of code: 25, Source: AvroInputFormat.java


Example 6: testSerializability

import org.apache.flink.util.InstantiationUtil; // import the required package/class
@Test
public void testSerializability() throws IOException, ClassNotFoundException {
	final Tuple3<Class<? extends SpecificRecord>, SpecificRecord, Row> testData = AvroTestUtils.getComplexTestData();

	final AvroRowSerializationSchema serOrig = new AvroRowSerializationSchema(testData.f0);
	final AvroRowDeserializationSchema deserOrig = new AvroRowDeserializationSchema(testData.f0);

	byte[] serBytes = InstantiationUtil.serializeObject(serOrig);
	byte[] deserBytes = InstantiationUtil.serializeObject(deserOrig);

	AvroRowSerializationSchema serCopy =
		InstantiationUtil.deserializeObject(serBytes, Thread.currentThread().getContextClassLoader());
	AvroRowDeserializationSchema deserCopy =
		InstantiationUtil.deserializeObject(deserBytes, Thread.currentThread().getContextClassLoader());

	final byte[] bytes = serCopy.serialize(testData.f2);
	deserCopy.deserialize(bytes);
	deserCopy.deserialize(bytes);
	final Row actual = deserCopy.deserialize(bytes);

	assertEquals(testData.f2, actual);
}
 
Developer: axbaretto, Project: flink, Lines of code: 23, Source: AvroRowDeSerializationSchemaTest.java


Example 7: getPairComparatorFactory

import org.apache.flink.util.InstantiationUtil; // import the required package/class
public <T1, T2> TypePairComparatorFactory<T1, T2> getPairComparatorFactory(ClassLoader cl) {
	final String className = this.config.getString(DRIVER_PAIR_COMPARATOR_FACTORY, null);
	if (className == null) {
		return null;
	}
	
	@SuppressWarnings("unchecked")
	final Class<TypePairComparatorFactory<T1, T2>> superClass = (Class<TypePairComparatorFactory<T1, T2>>) (Class<?>) TypePairComparatorFactory.class;
	try {
		final Class<? extends TypePairComparatorFactory<T1, T2>> clazz = Class.forName(className, true, cl).asSubclass(superClass);
		return InstantiationUtil.instantiate(clazz, superClass);
	}
	catch (ClassNotFoundException cnfex) {
		throw new RuntimeException("The class '" + className + "', noted in the configuration as " +
			"pair comparator factory, could not be found. It is not part of the user code's class loader resources.");
	}
	catch (ClassCastException ccex) {
		throw new CorruptConfigurationException("The class noted in the configuration as the pair comparator factory " +
			"is no subclass of TypePairComparatorFactory.");
	}
}
 
Developer: axbaretto, Project: flink, Lines of code: 22, Source: TaskConfig.java


Example 8: getTypeInfoFactory

import org.apache.flink.util.InstantiationUtil; // import the required package/class
/**
 * Returns the type information factory for a type using the factory registry or annotations.
 */
@Internal
public static <OUT> TypeInfoFactory<OUT> getTypeInfoFactory(Type t) {
	final Class<?> factoryClass;
	if (registeredTypeInfoFactories.containsKey(t)) {
		factoryClass = registeredTypeInfoFactories.get(t);
	}
	else {
		if (!isClassType(t) || !typeToClass(t).isAnnotationPresent(TypeInfo.class)) {
			return null;
		}
		final TypeInfo typeInfoAnnotation = typeToClass(t).getAnnotation(TypeInfo.class);
		factoryClass = typeInfoAnnotation.value();
		// check for valid factory class
		if (!TypeInfoFactory.class.isAssignableFrom(factoryClass)) {
			throw new InvalidTypesException("TypeInfo annotation does not specify a valid TypeInfoFactory.");
		}
	}

	// instantiate
	return (TypeInfoFactory<OUT>) InstantiationUtil.instantiate(factoryClass);
}
 
Developer: axbaretto, Project: flink, Lines of code: 25, Source: TypeExtractor.java


Example 9: restoreKeysForKeyGroup

import org.apache.flink.util.InstantiationUtil; // import the required package/class
public void restoreKeysForKeyGroup(DataInputViewStreamWrapper stream, int keyGroupIdx,
								ClassLoader userCodeClassLoader) throws IOException, ClassNotFoundException {

	checkArgument(localKeyGroupRange.contains(keyGroupIdx),
		"Key Group " + keyGroupIdx + " does not belong to the local range.");

	int numKeys = stream.readInt();
	if (numKeys > 0) {

		TypeSerializer<K> tmpKeyDeserializer = InstantiationUtil.deserializeObject(stream, userCodeClassLoader);

		if (keySerializer != null && !keySerializer.equals(tmpKeyDeserializer)) {
			throw new IllegalArgumentException("Tried to restore keys " +
				"for the watermark callback service with mismatching serializers.");
		}

		this.keySerializer = tmpKeyDeserializer;

		Set<K> keys = getRegisteredKeysForKeyGroup(keyGroupIdx);
		for (int i = 0; i < numKeys; i++) {
			keys.add(keySerializer.deserialize(stream));
		}
	}
}
 
Developer: axbaretto, Project: flink, Lines of code: 25, Source: InternalWatermarkCallbackService.java


Example 10: PojoComparator

import org.apache.flink.util.InstantiationUtil; // import the required package/class
@SuppressWarnings("unchecked")
private PojoComparator(PojoComparator<T> toClone) {
	this.keyFields = toClone.keyFields;
	this.comparators = new TypeComparator[toClone.comparators.length];

	for (int i = 0; i < toClone.comparators.length; i++) {
		this.comparators[i] = toClone.comparators[i].duplicate();
	}

	this.normalizedKeyLengths = toClone.normalizedKeyLengths;
	this.numLeadingNormalizableKeys = toClone.numLeadingNormalizableKeys;
	this.normalizableKeyPrefixLen = toClone.normalizableKeyPrefixLen;
	this.invertNormKey = toClone.invertNormKey;

	this.type = toClone.type;

	try {
		this.serializer = (TypeSerializer<T>) InstantiationUtil.deserializeObject(
				InstantiationUtil.serializeObject(toClone.serializer), Thread.currentThread().getContextClassLoader());
	} catch (IOException | ClassNotFoundException e) {
		throw new RuntimeException("Cannot copy serializer", e);
	}
}
 
Developer: axbaretto, Project: flink, Lines of code: 24, Source: PojoComparator.java


Example 11: testIntegerTaskEvent

import org.apache.flink.util.InstantiationUtil; // import the required package/class
/**
 * This test checks the serialization/deserialization of {@link IntegerTaskEvent} objects.
 */
@Test
public void testIntegerTaskEvent() {

	try {
		final IntegerTaskEvent orig = new IntegerTaskEvent(11);
		final IntegerTaskEvent copy = InstantiationUtil.createCopyWritable(orig);

		assertEquals(orig.getInteger(), copy.getInteger());
		assertEquals(orig.hashCode(), copy.hashCode());
		assertTrue(orig.equals(copy));

	} catch (IOException ioe) {
		fail(ioe.getMessage());
	}
}
 
Developer: axbaretto, Project: flink, Lines of code: 19, Source: TaskEventTest.java


Example 12: copy

import org.apache.flink.util.InstantiationUtil; // import the required package/class
/**
 * Tries to copy the given record from using the provided Kryo instance. If this fails, then
 * the record from is copied by serializing it into a byte buffer and deserializing it from
 * there.
 *
 * @param from Element to copy
 * @param reuse Reuse element for the deserialization
 * @param kryo Kryo instance to use
 * @param serializer TypeSerializer which is used in case of a Kryo failure
 * @param <T> Type of the element to be copied
 * @return Copied element
 */
public static <T> T copy(T from, T reuse, Kryo kryo, TypeSerializer<T> serializer) {
	try {
		return kryo.copy(from);
	} catch (KryoException ke) {
		// Kryo could not copy the object --> try to serialize/deserialize the object
		try {
			byte[] byteArray = InstantiationUtil.serializeToByteArray(serializer, from);

			return InstantiationUtil.deserializeFromByteArray(serializer, reuse, byteArray);
		} catch (IOException ioe) {
			throw new RuntimeException("Could not copy object by serializing/deserializing" +
				" it.", ioe);
		}
	}
}
 
Developer: axbaretto, Project: flink, Lines of code: 28, Source: KryoUtils.java


Example 13: read

import org.apache.flink.util.InstantiationUtil; // import the required package/class
@SuppressWarnings("unchecked")
@Override
public void read(DataInputView in) throws IOException {
	super.read(in);

	String serializerConfigClassname = in.readUTF();
	Class<? extends TypeSerializerConfigSnapshot> serializerConfigSnapshotClass;
	try {
		serializerConfigSnapshotClass = (Class<? extends TypeSerializerConfigSnapshot>)
			Class.forName(serializerConfigClassname, true, userCodeClassLoader);
	} catch (ClassNotFoundException e) {
		throw new IOException(
			"Could not find requested TypeSerializerConfigSnapshot class "
				+ serializerConfigClassname +  " in classpath.", e);
	}

	serializerConfigSnapshot = InstantiationUtil.instantiate(serializerConfigSnapshotClass);
	serializerConfigSnapshot.setUserCodeClassLoader(userCodeClassLoader);
	serializerConfigSnapshot.read(in);
}
 
Developer: axbaretto, Project: flink, Lines of code: 21, Source: TypeSerializerUtil.java


Example 14: write

import org.apache.flink.util.InstantiationUtil; // import the required package/class
@Override
public void write(DataOutputView out) throws IOException {
	super.write(out);

	if (typeSerializer instanceof UnloadableDummyTypeSerializer) {
		UnloadableDummyTypeSerializer<T> dummyTypeSerializer =
			(UnloadableDummyTypeSerializer<T>) this.typeSerializer;

		byte[] serializerBytes = dummyTypeSerializer.getActualBytes();
		// write the length as a 4-byte int, consistent with the else branch below
		out.writeInt(serializerBytes.length);
		out.write(serializerBytes);
	} else {
		// write in a way that allows the stream to recover from exceptions
		try (ByteArrayOutputStreamWithPos streamWithPos = new ByteArrayOutputStreamWithPos()) {
			InstantiationUtil.serializeObject(streamWithPos, typeSerializer);
			out.writeInt(streamWithPos.getPosition());
			out.write(streamWithPos.getBuf(), 0, streamWithPos.getPosition());
		}
	}
}
 
Developer: axbaretto, Project: flink, Lines of code: 21, Source: TypeSerializerSerializationUtil.java
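
The else branch of Example 14 buffers the serialized object and writes its size before the bytes, so a reader can skip or recover a payload it cannot deserialize. A detail worth knowing for such length prefixes: in `java.io.DataOutput`, `write(int)` emits only the low-order eight bits of its argument, while `writeInt(int)` emits all four bytes and pairs with `readInt()`. The sketch below demonstrates both with the JDK only; `LengthPrefixedBytes`, `frame`, and `unframe` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical sketch of a length-prefixed byte payload; not Flink code.
final class LengthPrefixedBytes {

    // Writes a 4-byte big-endian length followed by the payload bytes.
    static byte[] frame(byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(payload.length);
            out.write(payload);
        }
        return bytes.toByteArray();
    }

    // Reads the 4-byte length, then exactly that many payload bytes.
    static byte[] unframe(byte[] framed) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(framed))) {
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            return payload;
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] framed = frame(new byte[300]);
        System.out.println("framed size: " + framed.length);
        System.out.println("payload size: " + unframe(framed).length);

        // write(int) stores a single byte, so a length of 300 would not survive.
        ByteArrayOutputStream single = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(single)) {
            out.write(300);
        }
        System.out.println("bytes emitted by write(300): " + single.size());
    }
}
```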


Example 15: deserializeError

import org.apache.flink.util.InstantiationUtil; // import the required package/class
public Throwable deserializeError(ClassLoader classloader) {
	if (serializedException == null) {
		// failed to serialize the original exception
		// return this SerializedThrowable as a stand in
		return this;
	}

	Throwable cached = cachedException == null ? null : cachedException.get();
	if (cached == null) {
		try {
			cached = InstantiationUtil.deserializeObject(serializedException, classloader);
			cachedException = new WeakReference<Throwable>(cached);
		}
		catch (Throwable t) {
			// something went wrong
			// return this SerializedThrowable as a stand in
			return this;
		}
	}
	return cached;
}
 
Developer: axbaretto, Project: flink, Lines of code: 22, Source: SerializedThrowable.java


Example 16: snapshotKeysForKeyGroup

import org.apache.flink.util.InstantiationUtil; // import the required package/class
public void snapshotKeysForKeyGroup(DataOutputViewStreamWrapper stream, int keyGroupIdx) throws Exception {

	// we cleanup also here to avoid checkpointing the deletion set
	cleanupRegisteredKeys();

	Set<K> keySet = getRegisteredKeysForKeyGroup(keyGroupIdx);
	if (keySet != null) {
		stream.writeInt(keySet.size());

		InstantiationUtil.serializeObject(stream, keySerializer);
		for (K key : keySet) {
			keySerializer.serialize(key, stream);
		}
	} else {
		stream.writeInt(0);
	}
}
 
Developer: axbaretto, Project: flink, Lines of code: 18, Source: InternalWatermarkCallbackService.java


Example 17: iterator

import org.apache.flink.util.InstantiationUtil; // import the required package/class
@Override
public Iterator<T> iterator() {
	return new Iterator<T>() {
		private int index;

		@Override
		public boolean hasNext() {
			return index < classes.size();
		}

		@Override
		public T next() {
			return InstantiationUtil.instantiate(classes.get(index++));
		}

		@Override
		public void remove() {
			throw new UnsupportedOperationException();
		}
	};
}
 
Developer: axbaretto, Project: flink, Lines of code: 22, Source: Runner.java


Example 18: reinstantiateDriver

import org.apache.flink.util.InstantiationUtil; // import the required package/class
private void reinstantiateDriver() throws Exception {
	if (this.driver instanceof ResettableDriver) {
		final ResettableDriver<?, ?> resDriver = (ResettableDriver<?, ?>) this.driver;
		resDriver.reset();
	} else {
		Class<? extends Driver<S, OT>> driverClass = this.config.getDriver();
		this.driver = InstantiationUtil.instantiate(driverClass, Driver.class);

		try {
			this.driver.setup(this);
		}
		catch (Throwable t) {
			throw new Exception("The pact driver setup for '" + this.getEnvironment().getTaskInfo().getTaskName() +
					"' , caused an error: " + t.getMessage(), t);
		}
	}
}
 
Developer: axbaretto, Project: flink, Lines of code: 18, Source: AbstractIterativeTask.java


Example 19: getNextInputSplit

import org.apache.flink.util.InstantiationUtil; // import the required package/class
@Override
public InputSplit getNextInputSplit(ClassLoader userCodeClassLoader) throws InputSplitProviderException {
	Preconditions.checkNotNull(userCodeClassLoader);

	CompletableFuture<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit(
		jobVertexID,
		executionAttemptID);

	try {
		SerializedInputSplit serializedInputSplit = futureInputSplit.get(timeout.getSize(), timeout.getUnit());

		if (serializedInputSplit.isEmpty()) {
			return null;
		} else {
			return InstantiationUtil.deserializeObject(serializedInputSplit.getInputSplitData(), userCodeClassLoader);
		}
	} catch (Exception e) {
		throw new InputSplitProviderException("Requesting the next input split failed.", e);
	}
}
 
Developer: axbaretto, Project: flink, Lines of code: 21, Source: RpcInputSplitProvider.java


Example 20: instantiateFileSystem

import org.apache.flink.util.InstantiationUtil; // import the required package/class
private org.apache.hadoop.fs.FileSystem instantiateFileSystem(Class<? extends org.apache.hadoop.fs.FileSystem> fsClass)
	throws IOException {
	try {
		return fsClass.newInstance();
	}
	catch (ExceptionInInitializerError e) {
		throw new IOException("The filesystem class '" + fsClass.getName() + "' threw an exception upon initialization.", e.getException());
	}
	catch (Throwable t) {
		String errorMessage = InstantiationUtil.checkForInstantiationError(fsClass);
		if (errorMessage != null) {
			throw new IOException("The filesystem class '" + fsClass.getName() + "' cannot be instantiated: " + errorMessage);
		} else {
			throw new IOException("An error occurred while instantiating the filesystem class '" +
					fsClass.getName() + "'.", t);
		}
	}
}
 
Developer: axbaretto, Project: flink, Lines of code: 19, Source: HadoopFileSystem.java
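
Example 20 calls `InstantiationUtil.checkForInstantiationError` after a failed instantiation to turn a generic reflection failure into a readable message. The sketch below shows the kind of checks such a diagnostic helper might perform using only `java.lang.reflect`. The exact checks and messages in Flink may differ, and `InstantiationDiagnostics` and `describeProblem` are hypothetical names.

```java
import java.lang.reflect.Modifier;

// Hypothetical sketch of an instantiation diagnostic; not Flink code.
final class InstantiationDiagnostics {

    // Returns a human-readable reason why the class cannot be created through a
    // public no-argument constructor, or null if no obvious obstacle is found.
    static String describeProblem(Class<?> clazz) {
        int modifiers = clazz.getModifiers();
        if (clazz.isPrimitive() || clazz.isArray()) {
            return "The class is a primitive or array type.";
        }
        if (!Modifier.isPublic(modifiers)) {
            return "The class is not public.";
        }
        if (clazz.isInterface()) {
            return "The class is an interface.";
        }
        if (Modifier.isAbstract(modifiers)) {
            return "The class is abstract.";
        }
        if (clazz.getEnclosingClass() != null && !Modifier.isStatic(modifiers)) {
            return "The class is a non-static inner class.";
        }
        try {
            clazz.getConstructor();
        } catch (NoSuchMethodException e) {
            return "The class has no public no-argument constructor.";
        }
        return null;
    }

    public static void main(String[] args) {
        System.out.println(describeProblem(java.util.ArrayList.class));
        System.out.println(describeProblem(java.util.List.class));
        System.out.println(describeProblem(java.util.AbstractList.class));
        System.out.println(describeProblem(Integer.class));
    }
}
```

Running the diagnosis only in the failure path, as Example 20 does, keeps the successful case free of extra reflection work.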



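Note: The org.apache.flink.util.InstantiationUtil examples in this article were collected from GitHub, MSDocs, and other source-code and documentation hosting platforms. The snippets were selected from open-source projects, and copyright remains with the original authors. For distribution and use, refer to the License of the corresponding project. Do not reproduce without permission.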

