本文整理汇总了 Java 中 org.apache.beam.sdk.coders.Coder 类的典型用法代码示例。如果您正苦于以下问题：Java Coder 类的具体用法？Java Coder 怎么用？Java Coder 使用的例子？那么恭喜您，这里精选的类代码示例或许可以为您提供帮助。
Coder类属于org.apache.beam.sdk.coders包,在下文中一共展示了Coder类的20个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: testEncodeDecodeCycle
import org.apache.beam.sdk.coders.Coder; //导入依赖的package包/类
/**
 * Translates {@code testCollection} to its proto form, rebuilds a collection from that proto in a
 * new pipeline, and checks that coder, windowing strategy and boundedness match the original.
 */
@Test
public void testEncodeDecodeCycle() throws Exception {
  // Encode: register the collection with a fresh set of SDK components.
  SdkComponents components = SdkComponents.create();
  RunnerApi.PCollection encoded = PCollectionTranslation.toProto(testCollection, components);
  RehydratedComponents rehydrated =
      RehydratedComponents.forComponents(components.toComponents());

  // Decode: rebuild the collection inside a separate pipeline.
  Pipeline targetPipeline = Pipeline.create();
  PCollection<?> roundTripped =
      PCollectionTranslation.fromProto(encoded, targetPipeline, rehydrated);

  // Verify: the coder must be equal to the original one.
  assertThat(roundTripped.getCoder(), Matchers.<Coder<?>>equalTo(testCollection.getCoder()));
  // The windowing strategy is compared against the original with fixDefaults() applied.
  assertThat(
      roundTripped.getWindowingStrategy(),
      Matchers.<WindowingStrategy<?, ?>>equalTo(
          testCollection.getWindowingStrategy().fixDefaults()));
  assertThat(roundTripped.isBounded(), equalTo(testCollection.isBounded()));
}
开发者ID:apache,项目名称:beam,代码行数:23,代码来源:PCollectionTranslationTest.java
示例2: fireTimer
import org.apache.beam.sdk.coders.Coder; //导入依赖的package包/类
/**
 * Delivers the given timers for {@code key} to the pushback runner between a
 * {@code startBundle()} and a {@code finishBundle()} call.
 *
 * @param key the key that is set on the state internals and the timer internals before firing
 * @param timerDataSet the timers to fire; each one must have a {@link WindowNamespace} namespace
 */
@Override
public void fireTimer(Object key, Collection<TimerData> timerDataSet) {
  pushbackDoFnRunner.startBundle();

  // Point the per-key state and timer internals at the key these timers belong to.
  @SuppressWarnings("unchecked")
  Coder<Object> coderForKey = (Coder) currentKeyStateInternals.getKeyCoder();
  ((StateInternalsProxy) currentKeyStateInternals).setKey(key);
  Instant inputWatermark = new Instant(this.currentInputWatermark);
  Instant outputWatermark = new Instant(this.currentOutputWatermark);
  currentKeyTimerInternals.setContext(key, coderForKey, inputWatermark, outputWatermark);

  for (TimerData timer : timerDataSet) {
    StateNamespace timerNamespace = timer.getNamespace();
    // Only window-scoped timers are accepted; anything else fails the argument check.
    checkArgument(timerNamespace instanceof WindowNamespace);
    BoundedWindow timerWindow = ((WindowNamespace<?>) timerNamespace).getWindow();
    pushbackDoFnRunner.onTimer(
        timer.getTimerId(), timerWindow, timer.getTimestamp(), timer.getDomain());
  }

  pushbackDoFnRunner.finishBundle();
}
开发者ID:apache,项目名称:beam,代码行数:18,代码来源:ApexParDoOperator.java
示例3: fromByteFunctionIterable
import org.apache.beam.sdk.coders.Coder; //导入依赖的package包/类
/**
 * Builds a function that turns a pair of (encoded key, encoded values) into a pair of
 * (decoded key, decoded values), where the values are an {@link Iterable}.
 *
 * <p>The key is decoded when the function is called; the values are wrapped with
 * {@code Iterables.transform}, so each value is decoded through {@code valueCoder} as the
 * returned iterable is traversed.
 *
 * @param keyCoder Coder used to decode the key bytes.
 * @param valueCoder Coder used to decode each value's bytes.
 * @param <K> The type of the decoded key.
 * @param <V> The type of the decoded values.
 * @return A function from a byte-encoded pair to a decoded key-value pair.
 */
public static <K, V> PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>
    fromByteFunctionIterable(final Coder<K> keyCoder, final Coder<V> valueCoder) {
  return new PairFunction<Tuple2<ByteArray, Iterable<byte[]>>, K, Iterable<V>>() {
    @Override
    public Tuple2<K, Iterable<V>> call(Tuple2<ByteArray, Iterable<byte[]>> encodedPair) {
      K decodedKey = fromByteArray(encodedPair._1().getValue(), keyCoder);
      // Created on every call, exactly like the inline anonymous class it replaces.
      com.google.common.base.Function<byte[], V> decodeValue =
          new com.google.common.base.Function<byte[], V>() {
            @Override
            public V apply(byte[] encodedValue) {
              return fromByteArray(encodedValue, valueCoder);
            }
          };
      Iterable<V> decodedValues = Iterables.transform(encodedPair._2(), decodeValue);
      return new Tuple2<>(decodedKey, decodedValues);
    }
  };
}
开发者ID:apache,项目名称:beam,代码行数:26,代码来源:CoderHelpers.java
示例4: translate
import org.apache.beam.sdk.coders.Coder; //导入依赖的package包/类
/**
 * Translates a {@link GroupByKey} into a stream pipeline: window the input, group it by key,
 * key the groups by timestamp, fold them with {@code Merge}, and map the result to values.
 *
 * @param transform the transform being translated (not read by this method)
 * @param context supplies the input collection, its stream, the pipeline options and the
 *     output slot that receives the resulting stream
 */
@Override
public void translate(GroupByKey<K, V> transform, TranslationContext context) {
  PCollection<KV<K, V>> inputCollection = (PCollection<KV<K, V>>) context.getInput();
  Coder<K> keyCoder = ((KvCoder<K, V>) inputCollection.getCoder()).getKeyCoder();
  JavaStream<WindowedValue<KV<K, V>>> sourceStream = context.getInputStream(inputCollection);
  int groupParallelism = context.getPipelineOptions().getParallelism();
  TimestampCombiner combiner = inputCollection.getWindowingStrategy().getTimestampCombiner();
  WindowFn<KV<K, V>, BoundedWindow> windowing =
      (WindowFn<KV<K, V>, BoundedWindow>) inputCollection.getWindowingStrategy().getWindowFn();

  JavaStream<WindowedValue<KV<K, List<V>>>> groupedStream =
      sourceStream
          // The window function's string form is passed as the last argument of Windows.apply.
          .window(
              Windows.apply(
                  new GearpumpWindowFn(windowing.isNonMerging()),
                  EventTimeTrigger$.MODULE$,
                  Discarding$.MODULE$,
                  windowing.toString()))
          .groupBy(new GroupByFn<K, V>(keyCoder), groupParallelism, "group_by_Key_and_Window")
          .map(new KeyedByTimestamp<K, V>(windowing, combiner), "keyed_by_timestamp")
          .fold(new Merge<>(windowing, combiner), "merge")
          .map(new Values<K, V>(), "values");

  context.setOutputStream(context.getOutput(), groupedStream);
}
开发者ID:apache,项目名称:beam,代码行数:22,代码来源:GroupByKeyTranslator.java
示例5: create
import org.apache.beam.sdk.coders.Coder; //导入依赖的package包/类
/**
 * Creates a {@code BeamRecordSqlType} from parallel lists of field names and field type codes,
 * looking up the coder for each type code in {@code CODERS}.
 *
 * @param fieldNames the field names, one per field
 * @param fieldTypes the type code of each field, in the same order as {@code fieldNames}
 * @return a record type built from the names, the type codes and the looked-up coders
 * @throws IllegalStateException if the two lists differ in size
 * @throws UnsupportedOperationException if a type code has no entry in {@code CODERS}
 */
public static BeamRecordSqlType create(List<String> fieldNames,
    List<Integer> fieldTypes) {
  if (fieldNames.size() != fieldTypes.size()) {
    // The message names the parameters actually compared (it used to say 'dataType').
    throw new IllegalStateException("the sizes of 'fieldNames' and 'fieldTypes' must match.");
  }
  List<Coder> fieldCoders = new ArrayList<>(fieldTypes.size());
  for (Integer fieldType : fieldTypes) {
    if (!CODERS.containsKey(fieldType)) {
      throw new UnsupportedOperationException(
          "Data type: " + fieldType + " not supported yet!");
    }
    fieldCoders.add(CODERS.get(fieldType));
  }
  return new BeamRecordSqlType(fieldNames, fieldTypes, fieldCoders);
}
开发者ID:apache,项目名称:beam,代码行数:22,代码来源:BeamRecordSqlType.java
示例6: BagUserState
import org.apache.beam.sdk.coders.Coder; //导入依赖的package包/类
/**
 * Creates the bag state for {@code stateId}.
 *
 * <p>Values added locally are kept in a new list that is also exposed through an unmodifiable
 * view. Previously stored values are read through a {@code LazyCachingIteratorToIterable}
 * wrapping a {@code DataStreamDecoder} over the stream produced by the state-fetching iterator.
 *
 * @param beamFnStateClient client used by the state-fetching iterator
 * @param stateId identifier of this state
 * @param coder coder handed to the decoder for the stored values
 * @param partialRequestSupplier supplies the partial request passed to the state-fetching
 *     iterator
 */
public BagUserState(
    BeamFnStateClient beamFnStateClient,
    String stateId,
    Coder<T> coder,
    Supplier<Builder> partialRequestSupplier) {
  // Plain collaborators.
  this.beamFnStateClient = beamFnStateClient;
  this.stateId = stateId;
  this.coder = coder;
  this.partialRequestSupplier = partialRequestSupplier;

  // Locally appended values and their read-only view.
  this.newValues = new ArrayList<>();
  this.unmodifiableNewValues = Collections.unmodifiableList(this.newValues);

  // Values already held in the state backend, decoded with the supplied coder.
  this.oldValues =
      new LazyCachingIteratorToIterable<>(
          new DataStreams.DataStreamDecoder(
              coder,
              DataStreams.inbound(
                  StateFetchingIterators.usingPartialRequestWithStateKey(
                      beamFnStateClient, partialRequestSupplier))));
}
开发者ID:apache,项目名称:beam,代码行数:19,代码来源:BagUserState.java
示例7: putPView
import org.apache.beam.sdk.coders.Coder; //导入依赖的package包/类
/**
 * Stores the encoded value of a view together with its coder, then discards any broadcast
 * helper that was already registered for that view.
 *
 * @param view the view whose value is being stored
 * @param value the windowed values backing the view
 * @param coder coder used to encode {@code value}; stored alongside the encoded bytes
 */
void putPView(
    PCollectionView<?> view,
    Iterable<WindowedValue<?>> value,
    Coder<Iterable<WindowedValue<?>>> coder) {
  pviews.put(view, new Tuple2<>(CoderHelpers.toByteArray(value, coder), coder));

  // Nothing to clean up when no broadcast helpers are tracked.
  if (broadcastHelperMap == null) {
    return;
  }

  // The lookup, unpersist and removal happen under the class-level lock.
  // NOTE(review): the earlier comment here said the unpersist is "unsynchronized" and could be
  // made blocking; presumably that refers to unpersist() not blocking - confirm.
  synchronized (SparkPCollectionView.class) {
    SideInputBroadcast registeredHelper = broadcastHelperMap.get(view);
    if (registeredHelper != null) {
      registeredHelper.unpersist();
      broadcastHelperMap.remove(view);
    }
  }
}
开发者ID:apache,项目名称:beam,代码行数:19,代码来源:SparkPCollectionView.java
示例8: toAndFromProto
import org.apache.beam.sdk.coders.Coder; //导入依赖的package包/类
/**
 * Round-trips {@code coder} through its proto form and checks equality; for coders listed in
 * {@code KNOWN_CODERS}, also checks that no encoded coder uses the Java-serialized URN.
 */
@Test
public void toAndFromProto() throws Exception {
  SdkComponents sdkComponents = SdkComponents.create();
  RunnerApi.Coder protoCoder = CoderTranslation.toProto(coder, sdkComponents);
  Components components = sdkComponents.toComponents();

  Coder<?> roundTripped =
      CoderTranslation.fromProto(protoCoder, RehydratedComponents.forComponents(components));
  assertThat(roundTripped, Matchers.<Coder<?>>equalTo(coder));

  // The URN check applies only to known coders.
  if (!KNOWN_CODERS.contains(coder)) {
    return;
  }
  for (RunnerApi.Coder registered : components.getCodersMap().values()) {
    assertThat(
        registered.getSpec().getSpec().getUrn(),
        not(equalTo(CoderTranslation.JAVA_SERIALIZED_CODER_URN)));
  }
}
开发者ID:apache,项目名称:beam,代码行数:20,代码来源:CoderTranslationTest.java
示例9: getValues
import org.apache.beam.sdk.coders.Coder; //导入依赖的package包/类
/**
 * Returns the windowed values of this dataset, computing and caching them on first use.
 *
 * <p>On the first call the RDD elements are encoded to bytes, collected, and wrapped in an
 * iterable that decodes each element with the same windowed-value coder.
 *
 * @param pcollection supplies the element coder and the window function used to pick the
 *     windowed-value coder
 * @return the cached iterable of windowed values
 */
Iterable<WindowedValue<T>> getValues(PCollection<T> pcollection) {
  // Already materialized by an earlier call.
  if (windowedValues != null) {
    return windowedValues;
  }

  WindowFn<?, ?> windowFn = pcollection.getWindowingStrategy().getWindowFn();
  Coder<? extends BoundedWindow> windowCoder = windowFn.windowCoder();
  Coder<T> elementCoder = pcollection.getCoder();

  // Global windows use the value-only coder; everything else uses the full coder.
  final WindowedValue.WindowedValueCoder<T> windowedValueCoder;
  if (windowFn instanceof GlobalWindows) {
    windowedValueCoder = WindowedValue.ValueOnlyWindowedValueCoder.of(elementCoder);
  } else {
    windowedValueCoder = WindowedValue.FullWindowedValueCoder.of(elementCoder, windowCoder);
  }

  JavaRDDLike<byte[], ?> encodedRdd = rdd.map(CoderHelpers.toByteFunction(windowedValueCoder));
  List<byte[]> collectedBytes = encodedRdd.collect();
  windowedValues =
      Iterables.transform(
          collectedBytes,
          new Function<byte[], WindowedValue<T>>() {
            @Override
            public WindowedValue<T> apply(byte[] encoded) {
              return CoderHelpers.fromByteArray(encoded, windowedValueCoder);
            }
          });
  return windowedValues;
}
开发者ID:apache,项目名称:beam,代码行数:27,代码来源:BoundedDataset.java
示例10: expand
import org.apache.beam.sdk.coders.Coder; //导入依赖的package包/类
/**
 * Keys every input element by an integer hash with a (key, window) pair and the windowed
 * value as payload, sets the matching coder, then groups by the hash key with
 * {@code GroupByKeyAndSortValuesOnly}.
 *
 * @param input the key-value collection to regroup; its coder must be a {@link KvCoder}
 * @return the collection grouped by hash
 */
@Override
public PCollection<KV<Integer, Iterable<KV<KV<K, W>, WindowedValue<V>>>>>
    expand(PCollection<KV<K, V>> input) {
  @SuppressWarnings("unchecked")
  Coder<W> windowCoder = (Coder<W>) input.getWindowingStrategy().getWindowFn().windowCoder();
  @SuppressWarnings("unchecked")
  KvCoder<K, V> kvCoder = (KvCoder<K, V>) input.getCoder();

  PCollection<KV<Integer, KV<KV<K, W>, WindowedValue<V>>>> hashed =
      input.apply(ParDo.of(new GroupByKeyHashAndSortByKeyAndWindowDoFn<K, V, W>(coder)));

  // Coder layout: varint hash -> ((key, window), windowed value).
  hashed.setCoder(
      KvCoder.of(
          VarIntCoder.of(),
          KvCoder.of(
              KvCoder.of(kvCoder.getKeyCoder(), windowCoder),
              FullWindowedValueCoder.of(kvCoder.getValueCoder(), windowCoder))));

  return hashed.apply(new GroupByKeyAndSortValuesOnly<Integer, KV<K, W>, WindowedValue<V>>());
}
开发者ID:apache,项目名称:beam,代码行数:23,代码来源:BatchViewOverrides.java
示例11: expand
import org.apache.beam.sdk.coders.Coder; //导入依赖的package包/类
/**
 * Wraps every element in a {@link ValueInSingleWindow} carrying its timestamp, window and
 * pane, and emits it at the element's own timestamp.
 *
 * @param input the collection whose elements are reified
 * @return the reified collection, with a coder built from the input's element coder and
 *     window coder
 */
@Override
public PCollection<ValueInSingleWindow<T>> expand(PCollection<T> input) {
  DoFn<T, ValueInSingleWindow<T>> reifyFn =
      new DoFn<T, ValueInSingleWindow<T>>() {
        @ProcessElement
        public void processElement(ProcessContext c, BoundedWindow window) {
          c.outputWithTimestamp(
              ValueInSingleWindow.of(c.element(), c.timestamp(), window, c.pane()),
              c.timestamp());
        }
      };

  PCollection<ValueInSingleWindow<T>> reified = input.apply(ParDo.of(reifyFn));
  return reified.setCoder(
      ValueInSingleWindow.Coder.of(
          input.getCoder(), input.getWindowingStrategy().getWindowFn().windowCoder()));
}
开发者ID:apache,项目名称:beam,代码行数:18,代码来源:Reify.java
示例12: getDestinationCoderWithDefault
import org.apache.beam.sdk.coders.Coder; //导入依赖的package包/类
/**
 * Returns the coder from {@code getDestinationCoder()} when it is non-null; otherwise asks the
 * registry for a coder matching the {@code DestinationT} type argument of this instance.
 *
 * @param registry the registry consulted when no explicit coder is available
 * @return the destination coder
 * @throws CannotProvideCoderException if the registry cannot provide a coder for the extracted
 *     type; the registry's exception is attached as the cause
 */
final Coder<DestinationT> getDestinationCoderWithDefault(CoderRegistry registry)
    throws CannotProvideCoderException {
  Coder<DestinationT> explicitCoder = getDestinationCoder();
  if (explicitCoder != null) {
    return explicitCoder;
  }

  // No explicit coder: derive the destination type from this instance's type parameters.
  @Nullable
  TypeDescriptor<DestinationT> destinationType =
      extractFromTypeParameters(
          this,
          DynamicDestinations.class,
          new TypeVariableExtractor<
              DynamicDestinations<UserT, DestinationT, OutputT>, DestinationT>() {});

  try {
    return registry.getCoder(destinationType);
  } catch (CannotProvideCoderException cause) {
    String message =
        "Failed to infer coder for DestinationT from type "
            + destinationType
            + ", please provide it explicitly by overriding getDestinationCoder()";
    throw new CannotProvideCoderException(message, cause);
  }
}
开发者ID:apache,项目名称:beam,代码行数:24,代码来源:FileBasedSink.java
示例13: create
import org.apache.beam.sdk.coders.Coder; //导入依赖的package包/类
/**
 * Returns the evaluator for {@code Create.Values}: it registers the transform's elements,
 * together with the output collection's coder, as a bounded dataset in the context.
 */
private static <T> TransformEvaluator<Create.Values<T>> create() {
  return new TransformEvaluator<Create.Values<T>>() {
    @Override
    public String toNativeString() {
      return "sparkContext.parallelize(Arrays.asList(...))";
    }

    @Override
    public void evaluate(Create.Values<T> transform, EvaluationContext context) {
      Iterable<T> values = transform.getElements();
      // The coder converts the elements to byte arrays so they can be sent over the network.
      Coder<T> elementCoder = context.getOutput(transform).getCoder();
      context.putBoundedDatasetFromValues(transform, values, elementCoder);
    }
  };
}
开发者ID:apache,项目名称:beam,代码行数:18,代码来源:TransformTranslator.java
示例14: applyForSingleton
import org.apache.beam.sdk.coders.Coder; //导入依赖的package包/类
/**
 * Builds the singleton-view collection: groups the input with
 * {@code GroupByWindowHashAsKeyAndWindowAsSortKey}, runs {@code doFn} over it, sets the ISM
 * record coder, registers the result with the runner as requiring the indexed format, and
 * applies the batch {@code CreateDataflowView} for {@code view}.
 *
 * @param runner receives the resulting collection via
 *     {@code addPCollectionRequiringIndexedFormat}
 * @param input the collection backing the view
 * @param doFn turns the grouped elements into ISM records
 * @param defaultValueCoder value coder passed to {@code coderForSingleton}
 * @param view the view created from the ISM records
 * @return the collection of ISM records
 */
static <T, FinalT, ViewT, W extends BoundedWindow> PCollection<?>
    applyForSingleton(
        DataflowRunner runner,
        PCollection<T> input,
        DoFn<KV<Integer, Iterable<KV<W, WindowedValue<T>>>>,
            IsmRecord<WindowedValue<FinalT>>> doFn,
        Coder<FinalT> defaultValueCoder,
        PCollectionView<ViewT> view) {
  @SuppressWarnings("unchecked")
  Coder<W> windowCoder = (Coder<W>) input.getWindowingStrategy().getWindowFn().windowCoder();
  IsmRecordCoder<WindowedValue<FinalT>> recordCoder =
      coderForSingleton(windowCoder, defaultValueCoder);

  PCollection<IsmRecord<WindowedValue<FinalT>>> ismRecords =
      input
          .apply(new GroupByWindowHashAsKeyAndWindowAsSortKey<T, W>(recordCoder))
          .apply(ParDo.of(doFn));
  ismRecords.setCoder(recordCoder);

  runner.addPCollectionRequiringIndexedFormat(ismRecords);
  ismRecords.apply(CreateDataflowView.<IsmRecord<WindowedValue<FinalT>>, ViewT>forBatch(view));
  return ismRecords;
}
开发者ID:apache,项目名称:beam,代码行数:27,代码来源:BatchViewOverrides.java
示例15: fullWindowedValue
import org.apache.beam.sdk.coders.Coder; //导入依赖的package包/类
/**
 * Returns the translator for {@link FullWindowedValueCoder}: its components are the value
 * coder followed by the window coder, and it is rebuilt from components in that same order.
 */
static CoderTranslator<FullWindowedValueCoder<?>> fullWindowedValue() {
  return new CoderTranslator<FullWindowedValueCoder<?>>() {
    @Override
    public List<? extends Coder<?>> getComponents(FullWindowedValueCoder<?> from) {
      Coder<?> valueCoder = from.getValueCoder();
      Coder<?> windowCoder = from.getWindowCoder();
      return ImmutableList.of(valueCoder, windowCoder);
    }

    @Override
    public FullWindowedValueCoder<?> fromComponents(List<Coder<?>> components) {
      // Index 0 is the value coder, index 1 the window coder (mirrors getComponents).
      Coder<?> valueCoder = components.get(0);
      Coder<BoundedWindow> windowCoder = (Coder<BoundedWindow>) components.get(1);
      return WindowedValue.getFullCoder(valueCoder, windowCoder);
    }
  };
}
开发者ID:apache,项目名称:beam,代码行数:15,代码来源:CoderTranslators.java
示例16: testStreamFromProtoPayload
import org.apache.beam.sdk.coders.Coder; //导入依赖的package包/类
/**
 * Rebuilds a {@link TestStream} from its proto payload: looks up the coder by the payload's
 * coder id and converts every proto event with {@code eventFromProto}.
 *
 * @param testStreamPayload the proto payload holding the coder id and the events
 * @param components used to resolve the coder id
 * @return a test stream created from the coder and the converted events
 * @throws IOException declared by this method; NOTE(review): presumably raised while resolving
 *     the coder or decoding events - confirm against the callees
 */
private static TestStream<?> testStreamFromProtoPayload(
    RunnerApi.TestStreamPayload testStreamPayload, RehydratedComponents components)
    throws IOException {
  Coder<Object> elementCoder =
      (Coder<Object>) components.getCoder(testStreamPayload.getCoderId());

  List<TestStream.Event<Object>> decodedEvents = new ArrayList<>();
  for (RunnerApi.TestStreamPayload.Event protoEvent : testStreamPayload.getEventsList()) {
    decodedEvents.add(eventFromProto(protoEvent, elementCoder));
  }

  return TestStream.fromRawEvents(elementCoder, decodedEvents);
}
开发者ID:apache,项目名称:beam,代码行数:14,代码来源:TestStreamTranslation.java
示例17: coderForMapLike
import org.apache.beam.sdk.coders.Coder; //导入依赖的package包/类
/**
 * Builds the ISM record coder for map-like views. The key components are the metadata-wrapped
 * key coder, the window coder and a big-endian long coder; the value is a full windowed value.
 *
 * @param windowCoder coder for the window key component and for the windowed value
 * @param keyCoder coder for the user key, wrapped in {@code MetadataKeyCoder}
 * @param valueCoder coder for the user value
 * @return the ISM record coder
 */
static <V> IsmRecordCoder<WindowedValue<V>> coderForMapLike(
    Coder<? extends BoundedWindow> windowCoder, Coder<?> keyCoder, Coder<V> valueCoder) {
  // TODO: swap to use a variable length long coder which has values which compare
  // the same as their byte representation compare lexicographically within the key coder

  // We use only the key for hashing when producing value records.
  final int shardKeyCoderCount = 1;
  // Since the key is not present, we add the window to the hash when producing
  // metadata records.
  final int metadataShardKeyCoderCount = 2;

  ImmutableList<Coder<?>> keyComponentCoders =
      ImmutableList.<Coder<?>>of(
          MetadataKeyCoder.of(keyCoder), windowCoder, BigEndianLongCoder.of());

  return IsmRecordCoder.of(
      shardKeyCoderCount,
      metadataShardKeyCoderCount,
      keyComponentCoders,
      FullWindowedValueCoder.of(valueCoder, windowCoder));
}
开发者ID:apache,项目名称:beam,代码行数:15,代码来源:BatchViewOverrides.java
示例18: addOutput
import org.apache.beam.sdk.coders.Coder; //导入依赖的package包/类
/**
 * Records the current transform as the producer of {@code value} and registers the output
 * with a full windowed-value coder built from the element coder and the window coder.
 *
 * @param value the output collection
 * @return the result of the two-argument {@code addOutput} overload
 */
@Override
public long addOutput(PCollection<?> value) {
  translator.producers.put(value, translator.currentTransform);

  // Wrap the PCollection element coder inside a windowed-value coder.
  Coder<?> elementCoder = value.getCoder();
  Coder<?> windowedCoder =
      WindowedValue.getFullCoder(
          elementCoder, value.getWindowingStrategy().getWindowFn().windowCoder());

  return addOutput(value, windowedCoder);
}
开发者ID:apache,项目名称:beam,代码行数:10,代码来源:DataflowPipelineTranslator.java
示例19: TestUnboundedSource
import org.apache.beam.sdk.coders.Coder; //导入依赖的package包/类
/**
 * Creates a test source over {@code elems} and resets both reader counters to zero.
 *
 * @param coder coder stored for the elements
 * @param throwOnClose flag stored on the source; NOTE(review): presumably makes readers throw
 *     when closed - confirm against the reader implementation
 * @param elems the elements stored on the source
 */
private TestUnboundedSource(Coder<T> coder, boolean throwOnClose, List<T> elems) {
  // Per-source configuration.
  this.coder = coder;
  this.elems = elems;
  this.throwOnClose = throwOnClose;

  // Start every new source with zeroed reader counters.
  readerAdvancedCount = 0;
  readerClosedCount = 0;
}
开发者ID:apache,项目名称:beam,代码行数:8,代码来源:UnboundedReadEvaluatorFactoryTest.java
示例20: encode
import org.apache.beam.sdk.coders.Coder; //导入依赖的package包/类
/**
 * Writes the grouping key to {@code outStream} as eight UTF-8 encoded strings in a fixed
 * order: start date, end date, product account key, usage grouping key, description, unit
 * price (via {@code String.valueOf}), unit price currency and PO number.
 *
 * @param value the key to encode
 * @param outStream the stream that receives the encoded fields
 * @throws IOException if the underlying string coder fails to write
 */
@Override
public void encode(InvoiceGroupingKey value, OutputStream outStream) throws IOException {
  Coder<String> utf8 = StringUtf8Coder.of();

  // Billing period.
  utf8.encode(value.startDate(), outStream);
  utf8.encode(value.endDate(), outStream);

  // Account and grouping identifiers.
  utf8.encode(value.productAccountKey(), outStream);
  utf8.encode(value.usageGroupingKey(), outStream);
  utf8.encode(value.description(), outStream);

  // The unit price is written in its string form, followed by its currency.
  utf8.encode(String.valueOf(value.unitPrice()), outStream);
  utf8.encode(value.unitPriceCurrency(), outStream);

  utf8.encode(value.poNumber(), outStream);
}
开发者ID:google,项目名称:nomulus,代码行数:13,代码来源:BillingEvent.java
注：本文中的 org.apache.beam.sdk.coders.Coder 类示例整理自 Github/MSDocs 等源码及文档管理平台，相关代码片段筛选自各路编程大神贡献的开源项目，源码版权归原作者所有，传播和使用请参考对应项目的 License；未经允许，请勿转载。
请发表评论