Java View class code examples

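This article collects typical usage examples of org.apache.beam.sdk.transforms.View in Java. View provides the transforms that convert a PCollection into a PCollectionView for use as a side input, such as asSingleton, asIterable, asList, asMap and asMultimap. If you are looking for concrete examples of how the View class is used, the selected examples below may help.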



The View class belongs to the org.apache.beam.sdk.transforms package. The 20 examples below are sorted by popularity.

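Example 1: filterAlreadyProcessedUrls

import org.apache.beam.sdk.transforms.View; // import the package/class this example depends on
/**
 * Removes from {@code readContent} the items whose URLs have already been processed.
 *
 * @param readContent the content read in this run
 * @param pipeline the pipeline to which the BigQuery read of processed URLs is applied
 * @param options the pipeline options used to build the processed-URLs query
 * @return the content that still needs to be processed
 */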
private static PCollection<InputContent> filterAlreadyProcessedUrls(
		PCollection<InputContent> readContent, Pipeline pipeline, 
		IndexerPipelineOptions options) {
	PCollection<InputContent> contentToProcess;
	String query = IndexerPipelineUtils.buildBigQueryProcessedUrlsQuery(options);
	PCollection<KV<String,Long>> alreadyProcessedUrls = pipeline
		.apply("Get processed URLs",BigQueryIO.read().fromQuery(query))
		.apply(ParDo.of(new GetUrlFn()));

	final PCollectionView<Map<String,Long>> alreadyProcessedUrlsSideInput =
		alreadyProcessedUrls.apply(View.<String,Long>asMap());
	  
	contentToProcess = readContent
		.apply(ParDo.of(new FilterProcessedUrls(alreadyProcessedUrlsSideInput))
			.withSideInputs(alreadyProcessedUrlsSideInput));
	return contentToProcess;
}
 
Developer ID: GoogleCloudPlatform, Project: dataflow-opinion-analysis, Lines of code: 24, Source: IndexerPipeline.java
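Example 1 turns the already-processed URLs into a Map<String, Long> side input with View.asMap() and passes it to FilterProcessedUrls, whose body is not shown. The following plain-Java sketch (no Beam dependency) approximates the per-element lookup. It assumes the filter keeps an element only when its URL is not a key of the map, and it reduces InputContent to a bare URL string; the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of filtering main-input elements against a map-shaped side input.
// Hypothetical: assumes FilterProcessedUrls drops every element whose URL is a map key.
final class ProcessedUrlFilterSketch {

  static List<String> filterUnprocessed(List<String> urls, Map<String, Long> alreadyProcessed) {
    List<String> toProcess = new ArrayList<>();
    for (String url : urls) {
      // Each element is looked up in the side-input map by its key (the URL).
      if (!alreadyProcessed.containsKey(url)) {
        toProcess.add(url);
      }
    }
    return toProcess;
  }

  public static void main(String[] args) {
    Map<String, Long> processed = Map.of("https://a.example", 1L);
    // prints [https://b.example]
    System.out.println(
        filterUnprocessed(List.of("https://a.example", "https://b.example"), processed));
  }
}
```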


Example 2: expand

import org.apache.beam.sdk.transforms.View; // import the package/class this example depends on
@Override
public PCollection<List<ResultT>> expand(PCollection<ResultT> input) {
  if (getWindowedWrites()) {
    // Reshuffle the results to make them stable against retries.
    // Use a single void key to maximize size of bundles for finalization.
    return input
        .apply("Add void key", WithKeys.<Void, ResultT>of((Void) null))
        .apply("Reshuffle", Reshuffle.<Void, ResultT>of())
        .apply("Drop key", Values.<ResultT>create())
        .apply("Gather bundles", ParDo.of(new GatherBundlesPerWindowFn<ResultT>()))
        .setCoder(ListCoder.of(resultCoder))
        // Reshuffle one more time to stabilize the contents of the bundle lists to finalize.
        .apply(Reshuffle.<List<ResultT>>viaRandomKey());
  } else {
    // Pass results via a side input rather than reshuffle, because we need to get an empty
    // iterable to finalize if there are no results.
    return input
        .getPipeline()
        .apply(
            Reify.viewInGlobalWindow(
                input.apply(View.<ResultT>asList()), ListCoder.of(resultCoder)));
  }
}
 
Developer ID: apache, Project: beam, Lines of code: 24, Source: WriteFiles.java


Example 3: expand

import org.apache.beam.sdk.transforms.View; // import the package/class this example depends on
@Override
public PCollection<Iterable<T>> expand(PCollection<T> input) {
  final PCollectionView<Iterable<T>> view = input.apply(View.<T>asIterable());
  return input
      .getPipeline()
      .apply(Create.of((Void) null).withCoder(VoidCoder.of()))
      .apply(
          ParDo.of(
                  new DoFn<Void, Iterable<T>>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                      c.output(c.sideInput(view));
                    }
                  })
              .withSideInputs(view));
}
 
Developer ID: apache, Project: beam, Lines of code: 17, Source: ReifyAsIterable.java
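Examples 2 and 3 share a trick: rather than processing input elements directly, Example 3 starts from one dummy element (Create.of((Void) null)) and reads the whole input through an iterable side input, so exactly one output is produced even for an empty input. The comment in Example 2 gives the same reason for passing results via a side input. A plain-Java model of the two behaviors, with hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model contrasting per-element processing with the
// "one dummy element + side input" pattern used by ReifyAsIterable.
final class ReifySketch {

  // Per-element style: one output per input element, so an empty input yields nothing.
  static List<List<Integer>> perElement(List<Integer> input) {
    List<List<Integer>> out = new ArrayList<>();
    for (Integer x : input) {
      out.add(List.of(x));
    }
    return out;
  }

  // Side-input style: a single dummy element drives exactly one call,
  // and that call reads the entire input (possibly empty) at once.
  static List<List<Integer>> viaSideInput(List<Integer> input) {
    List<List<Integer>> out = new ArrayList<>();
    int dummyElements = 1; // stands in for Create.of((Void) null)
    for (int i = 0; i < dummyElements; i++) {
      out.add(new ArrayList<>(input)); // stands in for c.sideInput(view)
    }
    return out;
  }

  public static void main(String[] args) {
    // prints 0 1
    System.out.println(perElement(List.of()).size() + " " + viaSideInput(List.of()).size());
  }
}
```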


Example 4: testPassThroughThenCleanup

import org.apache.beam.sdk.transforms.View; // import the package/class this example depends on
@Test
public void testPassThroughThenCleanup() throws Exception {

  PCollection<Integer> output =
      p.apply(Create.of(1, 2, 3))
          .apply(
              new PassThroughThenCleanup<Integer>(
                  new PassThroughThenCleanup.CleanupOperation() {
                    @Override
                    void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
                      // no-op
                    }
                  },
                  p.apply("Create1", Create.of("")).apply(View.<String>asSingleton())));

  PAssert.that(output).containsInAnyOrder(1, 2, 3);

  p.run();
}
 
Developer ID: apache, Project: beam, Lines of code: 20, Source: BigQueryIOReadTest.java


Example 5: testPassThroughThenCleanupExecuted

import org.apache.beam.sdk.transforms.View; // import the package/class this example depends on
@Test
public void testPassThroughThenCleanupExecuted() throws Exception {

  p.apply(Create.empty(VarIntCoder.of()))
      .apply(
          new PassThroughThenCleanup<Integer>(
              new PassThroughThenCleanup.CleanupOperation() {
                @Override
                void cleanup(PassThroughThenCleanup.ContextContainer c) throws Exception {
                  throw new RuntimeException("cleanup executed");
                }
              },
              p.apply("Create1", Create.of("")).apply(View.<String>asSingleton())));

  thrown.expect(RuntimeException.class);
  thrown.expectMessage("cleanup executed");

  p.run();
}
 
Developer ID: apache, Project: beam, Lines of code: 20, Source: BigQueryIOReadTest.java
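Examples 4 and 5 pin down the contract of PassThroughThenCleanup: elements pass through unchanged, and the cleanup operation runs afterwards, even when the input is empty (Example 5 uses Create.empty and still expects the cleanup exception). The sequential plain-Java model below shows only that contract. The interface and method names are hypothetical, and the singleton side input passed as the third constructor argument is not modeled.

```java
import java.util.ArrayList;
import java.util.List;

// Sequential model of the pass-through-then-cleanup contract (no Beam dependency).
final class PassThroughThenCleanupSketch {

  // Hypothetical stand-in for PassThroughThenCleanup.CleanupOperation.
  interface CleanupOperation {
    void cleanup();
  }

  // Emits every input element unchanged, then runs the cleanup exactly once,
  // even when the input is empty.
  static <T> List<T> passThroughThenCleanup(List<T> input, CleanupOperation op) {
    List<T> output = new ArrayList<>(input);
    op.cleanup();
    return output;
  }

  public static void main(String[] args) {
    List<Integer> out =
        passThroughThenCleanup(List.of(1, 2, 3), () -> System.out.println("cleanup ran"));
    System.out.println(out);
  }
}
```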


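Example 6: expand

import org.apache.beam.sdk.transforms.View; // import the package/class this example depends on
@Override public PCollection<KV<String, String>> expand(PCollection<KV<String, String>> input) {
  // Reparallelize the same way JdbcIO does, to break fusion: first force the input to be
  // materialized by using it as a side input of an identity ParDo, then reshuffle it
  // with random keys.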
  PCollectionView<Iterable<KV<String, String>>> empty = input
      .apply("Consume",
          Filter.by(SerializableFunctions.<KV<String, String>, Boolean>constant(false)))
      .apply(View.<KV<String, String>>asIterable());
  PCollection<KV<String, String>> materialized = input
      .apply("Identity", ParDo.of(new DoFn<KV<String, String>, KV<String, String>>() {
        @ProcessElement
        public void processElement(ProcessContext context) {
          context.output(context.element());
        }
  }).withSideInputs(empty));
  return materialized.apply(Reshuffle.<KV<String, String>>viaRandomKey());
}
 
Developer ID: apache, Project: beam, Lines of code: 17, Source: RedisIO.java
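Example 6 ends with Reshuffle.viaRandomKey(), which redistributes elements without changing what the collection contains. The plain-Java sketch below models the idea with a hypothetical fixed number of buckets: attach a random key, group by that key, then drop the key. The result is a permutation of the input; the sketch does not model fusion or how a runner actually distributes the groups.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.TreeMap;

// Plain-Java model of "reshuffle via random key": key, group, then un-key.
final class RandomKeyReshuffleSketch {

  // numBuckets is a hypothetical parameter of this model, not a Beam setting.
  static <T> List<T> reshuffleViaRandomKey(List<T> input, int numBuckets, Random random) {
    // Attach a random key to each element and group by it.
    Map<Integer, List<T>> groups = new TreeMap<>();
    for (T element : input) {
      int key = random.nextInt(numBuckets);
      groups.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
    }
    // Drop the keys and flatten the groups back into one collection.
    List<T> output = new ArrayList<>();
    for (List<T> group : groups.values()) {
      output.addAll(group);
    }
    return output;
  }

  public static void main(String[] args) {
    System.out.println(reshuffleViaRandomKey(List.of("a", "b", "c", "d"), 2, new Random(42)));
  }
}
```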


Example 7: multiMultiParDo

import org.apache.beam.sdk.transforms.View; // import the package/class this example depends on
private static AppliedPTransform<?, ?, ?> multiMultiParDo(Pipeline pipeline) {
  PCollectionView<String> view =
      pipeline.apply(Create.of("foo")).apply(View.<String>asSingleton());
  PCollection<Long> input = pipeline.apply(GenerateSequence.from(0));
  ParDo.MultiOutput<Long, KV<Long, String>> parDo =
      ParDo.of(new TestDoFn())
          .withSideInputs(view)
          .withOutputTags(
              new TupleTag<KV<Long, String>>() {},
              TupleTagList.of(new TupleTag<KV<String, Long>>() {}));
  PCollectionTuple output = input.apply(parDo);

  Map<TupleTag<?>, PValue> inputs = new HashMap<>();
  inputs.putAll(parDo.getAdditionalInputs());
  inputs.putAll(input.expand());

  return AppliedPTransform
      .<PCollection<Long>, PCollectionTuple, ParDo.MultiOutput<Long, KV<Long, String>>>of(
          "MultiParDoInAndOut", inputs, output.expand(), parDo, pipeline);
}
 
Developer ID: apache, Project: beam, Lines of code: 21, Source: PTransformTranslationTest.java


Example 8: getDefaultOverrides

import org.apache.beam.sdk.transforms.View; // import the package/class this example depends on
public static List<PTransformOverride> getDefaultOverrides(boolean streaming) {
  if (streaming) {
    return ImmutableList.<PTransformOverride>builder()
        .add(
            PTransformOverride.of(
                PTransformMatchers.splittableParDoMulti(),
                new FlinkStreamingPipelineTranslator.SplittableParDoOverrideFactory()))
        .add(
            PTransformOverride.of(
                PTransformMatchers.classEqualTo(SplittableParDo.ProcessKeyedElements.class),
                new SplittableParDoViaKeyedWorkItems.OverrideFactory()))
        .add(
            PTransformOverride.of(
                PTransformMatchers.classEqualTo(View.CreatePCollectionView.class),
                new CreateStreamingFlinkView.Factory()))
        .build();
  } else {
    return ImmutableList.of();
  }
}
 
Developer ID: apache, Project: beam, Lines of code: 21, Source: FlinkTransformOverrides.java


Example 9: visitValue

import org.apache.beam.sdk.transforms.View; // import the package/class this example depends on
@Override
public void visitValue(PValue value, TransformHierarchy.Node producer) {
  LOG.debug("Checking translation of {}", value);
  // Primitive transforms are the only ones assigned step names.
  if (producer.getTransform() instanceof CreateDataflowView
      && !hasExperiment(options, "beam_fn_api")) {
    // CreateDataflowView produces a dummy output (as it must be a primitive transform)
    // but in the Dataflow Job graph produces only the view and not the output PCollection.
    asOutputReference(
        ((CreateDataflowView) producer.getTransform()).getView(),
        producer.toAppliedPTransform(getPipeline()));
    return;
  } else if (producer.getTransform() instanceof View.CreatePCollectionView
      && hasExperiment(options, "beam_fn_api")) {
    // View.CreatePCollectionView produces a dummy output (as it must be a primitive transform)
    // but in the Dataflow Job graph produces only the view and not the output PCollection.
    asOutputReference(
        ((View.CreatePCollectionView) producer.getTransform()).getView(),
        producer.toAppliedPTransform(getPipeline()));
    return;
  }
  asOutputReference(value, producer.toAppliedPTransform(getPipeline()));
}
 
Developer ID: apache, Project: beam, Lines of code: 24, Source: DataflowPipelineTranslator.java


Example 10: getViewsReturnsViews

import org.apache.beam.sdk.transforms.View; // import the package/class this example depends on
@Test
public void getViewsReturnsViews() {
  PCollectionView<List<String>> listView =
      p.apply("listCreate", Create.of("foo", "bar"))
          .apply(
              ParDo.of(
                  new DoFn<String, String>() {
                    @ProcessElement
                    public void processElement(DoFn<String, String>.ProcessContext c)
                        throws Exception {
                      c.output(Integer.toString(c.element().length()));
                    }
                  }))
          .apply(View.<String>asList());
  PCollectionView<Object> singletonView =
      p.apply("singletonCreate", Create.<Object>of(1, 2, 3)).apply(View.<Object>asSingleton());
  p.replaceAll(
      DirectRunner.fromOptions(TestPipeline.testingPipelineOptions())
          .defaultTransformOverrides());
  p.traverseTopologically(visitor);
  assertThat(
      visitor.getGraph().getViews(),
      Matchers.<PCollectionView<?>>containsInAnyOrder(listView, singletonView));
}
 
Developer ID: apache, Project: beam, Lines of code: 25, Source: DirectGraphVisitorTest.java


Example 11: setup

import org.apache.beam.sdk.transforms.View; // import the package/class this example depends on
@Before
public void setup() {
  MockitoAnnotations.initMocks(this);

  PCollection<Integer> create =
      pipeline.apply("forBaseCollection", Create.<Integer>of(1, 2, 3, 4));

  mapView =
      create.apply("forKeyTypes", WithKeys.<String, Integer>of("foo"))
          .apply("asMapView", View.<String, Integer>asMap());

  singletonView = create.apply("forCombinedTypes", Mean.<Integer>globally().asSingletonView());
  iterableView = create.apply("asIterableView", View.<Integer>asIterable());

  container = SideInputContainer.create(
      context, ImmutableList.of(iterableView, mapView, singletonView));
}
 
Developer ID: apache, Project: beam, Lines of code: 18, Source: SideInputContainerTest.java


Example 12: writeForMultipleIdenticalElementsInSameWindowSucceeds

import org.apache.beam.sdk.transforms.View; // import the package/class this example depends on
@Test
public void writeForMultipleIdenticalElementsInSameWindowSucceeds() throws Exception {
  ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
  for (Object materializedValue : materializeValuesFor(View.asIterable(), 44, 44)) {
    valuesBuilder.add(WindowedValue.of(
        materializedValue,
        FIRST_WINDOW.maxTimestamp().minus(200L),
        FIRST_WINDOW,
        PaneInfo.ON_TIME_AND_ONLY_FIRING));
  }
  container.write(iterableView, valuesBuilder.build());

  assertThat(
      container
          .createReaderForViews(ImmutableList.<PCollectionView<?>>of(iterableView))
          .get(iterableView, FIRST_WINDOW),
      contains(44, 44));
}
 
Developer ID: apache, Project: beam, Lines of code: 19, Source: SideInputContainerTest.java
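Example 12 writes the value 44 twice into the same window and expects the iterable view to return both copies: an iterable view keeps duplicates, whereas a map view needs distinct keys. The following plain-Java model materializes values per window. The Windowed class and the string window labels are hypothetical stand-ins for WindowedValue and BoundedWindow, and the list order is an artifact of the model, since PCollection elements have no defined order.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of materializing an iterable view per window.
final class IterableViewSketch {

  // Hypothetical (window, value) pair; Beam itself uses WindowedValue and BoundedWindow.
  static final class Windowed<T> {
    final String window;
    final T value;

    Windowed(String window, T value) {
      this.window = window;
      this.value = value;
    }
  }

  // Collects the values of each window into a list; identical values are all kept.
  static <T> Map<String, List<T>> materializeIterable(List<Windowed<T>> values) {
    Map<String, List<T>> byWindow = new LinkedHashMap<>();
    for (Windowed<T> wv : values) {
      byWindow.computeIfAbsent(wv.window, w -> new ArrayList<>()).add(wv.value);
    }
    return byWindow;
  }

  public static void main(String[] args) {
    List<Windowed<Integer>> writes =
        List.of(new Windowed<Integer>("first", 44), new Windowed<Integer>("first", 44));
    System.out.println(materializeIterable(writes)); // prints {first=[44, 44]}
  }
}
```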


Example 13: writeForElementInMultipleWindowsSucceeds

import org.apache.beam.sdk.transforms.View; // import the package/class this example depends on
@Test
public void writeForElementInMultipleWindowsSucceeds() throws Exception {
  ImmutableList.Builder<WindowedValue<?>> valuesBuilder = ImmutableList.builder();
  for (Object materializedValue : materializeValuesFor(View.asSingleton(), 2.875)) {
    valuesBuilder.add(WindowedValue.of(
        materializedValue,
        FIRST_WINDOW.maxTimestamp().minus(200L),
        ImmutableList.of(FIRST_WINDOW, SECOND_WINDOW),
        PaneInfo.ON_TIME_AND_ONLY_FIRING));
  }
  container.write(singletonView, valuesBuilder.build());
  assertThat(
      container
          .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
          .get(singletonView, FIRST_WINDOW),
      equalTo(2.875));
  assertThat(
      container
          .createReaderForViews(ImmutableList.<PCollectionView<?>>of(singletonView))
          .get(singletonView, SECOND_WINDOW),
      equalTo(2.875));
}
 
Developer ID: apache, Project: beam, Lines of code: 23, Source: SideInputContainerTest.java
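Example 13 writes one value into two windows and reads the singleton view in each. The plain-Java model below shows the per-window contract a singleton side input is meant to satisfy: a window holds exactly one value, and a value assigned to several windows is visible in all of them. It is a sketch under assumptions: windows are hypothetical string labels, and throwing NoSuchElementException or IllegalArgumentException is this model's choice, not a statement of Beam's exact error types.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;

// Plain-Java model of a singleton side input that is read per window.
final class SingletonViewSketch {

  private final Map<String, List<Object>> valuesByWindow = new HashMap<>();

  // A value assigned to several windows is recorded once in each of them.
  void write(Object value, List<String> windows) {
    for (String window : windows) {
      valuesByWindow.computeIfAbsent(window, w -> new ArrayList<>()).add(value);
    }
  }

  // Returns the single value of a window; zero or several values is an error.
  Object get(String window) {
    List<Object> values = valuesByWindow.getOrDefault(window, List.of());
    if (values.isEmpty()) {
      throw new NoSuchElementException("no value in window " + window);
    }
    if (values.size() > 1) {
      throw new IllegalArgumentException("more than one value in window " + window);
    }
    return values.get(0);
  }

  public static void main(String[] args) {
    SingletonViewSketch view = new SingletonViewSketch();
    view.write(2.875, List.of("first", "second"));
    System.out.println(view.get("first") + " " + view.get("second")); // prints 2.875 2.875
  }
}
```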


Example 14: expand

import org.apache.beam.sdk.transforms.View; // import the package/class this example depends on
@Override
public PCollectionView<Integer> expand(PCollection<T> input) {
    return input
            .getPipeline()
            .apply(Create.of(0))
            .apply(
                    "FixedNumShards",
                    ParDo.of(
                            new DoFn<Integer, Integer>() {
                                @ProcessElement
                                public void outputNumShards(ProcessContext ctxt) {
                                    checkArgument(
                                            numShards.isAccessible(),
                                            "NumShards must be accessible at runtime to use constant sharding");
                                    ctxt.output(numShards.get());
                                }
                            }))
            .apply(View.<Integer>asSingleton());
}
 
Developer ID: Talend, Project: components, Lines of code: 20, Source: Write.java


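Example 15: filterAlreadyProcessedDocuments

import org.apache.beam.sdk.transforms.View; // import the package/class this example depends on
/**
 * Splits content into documents to index and documents not to index, treating exact
 * duplicates and documents whose hash was already processed as not to index.
 *
 * @param contentToIndexNotSkipped content that was not skipped earlier in the pipeline
 * @param contentNotToIndexSkipped content that was already skipped
 * @param pipeline the pipeline to which the processed-documents query is applied
 * @param options the pipeline options; when {@code getWriteTruncate()} is true, an empty
 *     map is used instead of querying for processed documents
 * @return the content to index, plus the flattened content not to index
 */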
private static ContentToIndexOrNot filterAlreadyProcessedDocuments(
		PCollection<InputContent> contentToIndexNotSkipped, PCollection<InputContent> contentNotToIndexSkipped,
		Pipeline pipeline, IndexerPipelineOptions options) {
	PCollection<KV<String,Long>> alreadyProcessedDocs = null;
	
	if (!options.getWriteTruncate()) {
		String query = IndexerPipelineUtils.buildBigQueryProcessedDocsQuery(options);
		alreadyProcessedDocs = pipeline
			.apply("Get already processed Documents",BigQueryIO.read().fromQuery(query))
			.apply(ParDo.of(new GetDocumentHashFn()));

	} else {
		Map<String, Long> map = new HashMap<String,Long>();
		alreadyProcessedDocs = pipeline
			.apply("Create empty side input of Docs",
				Create.of(map).withCoder(KvCoder.of(StringUtf8Coder.of(),VarLongCoder.of())));
	}			
	
	final PCollectionView<Map<String,Long>> alreadyProcessedDocsSideInput =  
		alreadyProcessedDocs.apply(View.<String,Long>asMap());
	
	PCollectionTuple indexOrNotBasedOnExactDupes = contentToIndexNotSkipped
		.apply("Extract DocumentHash key", ParDo.of(new GetInputContentDocumentHashFn()))
		.apply("Group by DocumentHash key", GroupByKey.<String, InputContent>create())
		.apply("Eliminate InputContent Dupes", ParDo.of(new EliminateInputContentDupes(alreadyProcessedDocsSideInput))
			.withSideInputs(alreadyProcessedDocsSideInput)
			.withOutputTags(PipelineTags.contentToIndexNotExactDupesTag, // main output collection
				TupleTagList.of(PipelineTags.contentNotToIndexExactDupesTag))); // side output collection	
	
	PCollection<InputContent> contentToIndexNotExactDupes = indexOrNotBasedOnExactDupes.get(PipelineTags.contentToIndexNotExactDupesTag);
	PCollection<InputContent> contentNotToIndexExactDupes = indexOrNotBasedOnExactDupes.get(PipelineTags.contentNotToIndexExactDupesTag);
	
	// Merge the sets of items that are dupes or skipped
	PCollectionList<InputContent> contentNotToIndexList = PCollectionList.of(contentNotToIndexExactDupes).and(contentNotToIndexSkipped);
	
	ContentToIndexOrNot content = new ContentToIndexOrNot(contentToIndexNotExactDupes, contentNotToIndexList.apply(Flatten.<InputContent>pCollections()));
	return content;
}
 
Developer ID: GoogleCloudPlatform, Project: dataflow-opinion-analysis, Lines of code: 46, Source: IndexerPipeline.java
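In Example 15, content is grouped by document hash and EliminateInputContentDupes (body not shown) splits each group into content to index and exact duplicates, consulting the already-processed hashes passed as a map side input. When getWriteTruncate() is true that map is empty, so no hash counts as processed. The plain-Java sketch below assumes the DoFn keeps one document per new hash and routes the rest, plus every document whose hash is already in the map, to the duplicates output. Documents are reduced to hypothetical id-to-hash entries, and all names are illustrative.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of exact-duplicate elimination against a map side input.
final class ExactDupeSketch {

  // Hypothetical holder mirroring the main output and the additional tagged output.
  static final class Split {
    final List<String> toIndex = new ArrayList<>();
    final List<String> dupes = new ArrayList<>();
  }

  // docs: document id -> document hash; alreadyProcessed: hash -> value (the asMap view).
  static Split split(Map<String, String> docs, Map<String, Long> alreadyProcessed) {
    // "Group by DocumentHash key"
    Map<String, List<String>> idsByHash = new LinkedHashMap<>();
    for (Map.Entry<String, String> doc : docs.entrySet()) {
      idsByHash.computeIfAbsent(doc.getValue(), h -> new ArrayList<>()).add(doc.getKey());
    }
    Split result = new Split();
    for (Map.Entry<String, List<String>> group : idsByHash.entrySet()) {
      List<String> ids = group.getValue();
      if (alreadyProcessed.containsKey(group.getKey())) {
        result.dupes.addAll(ids); // hash seen in an earlier run: nothing to index
      } else {
        result.toIndex.add(ids.get(0)); // keep one representative per new hash
        result.dupes.addAll(ids.subList(1, ids.size())); // the rest are exact dupes
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, String> docs = new LinkedHashMap<>();
    docs.put("d1", "h1");
    docs.put("d2", "h1");
    docs.put("d3", "h2");
    Split result = split(docs, Map.of());
    System.out.println(result.toIndex + " " + result.dupes); // prints [d1, d3] [d2]
  }
}
```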


Example 16: expand

import org.apache.beam.sdk.transforms.View; // import the package/class this example depends on
@Override
public PCollection<TableRow> expand(PCollection<TableRow> rows) {

  // Extract the mean_temp from each row.
  PCollection<Double> meanTemps = rows.apply(
      ParDo.of(new ExtractTempFn()));

  // Find the global mean, of all the mean_temp readings in the weather data,
  // and prepare this singleton PCollectionView for use as a side input.
  final PCollectionView<Double> globalMeanTemp =
      meanTemps.apply(Mean.<Double>globally())
          .apply(View.<Double>asSingleton());

  // Rows filtered to remove all but a single month
  PCollection<TableRow> monthFilteredRows = rows
      .apply(ParDo.of(new FilterSingleMonthDataFn(monthFilter)));

  // Then, use the global mean as a side input, to further filter the weather data.
  // By using a side input to pass in the filtering criteria, we can use a value
  // that is computed earlier in pipeline execution.
  // We'll only output readings with temperatures below this mean.
  PCollection<TableRow> filteredRows = monthFilteredRows
      .apply("ParseAndFilter", ParDo
          .of(new DoFn<TableRow, TableRow>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
              Double meanTemp = Double.parseDouble(c.element().get("mean_temp").toString());
              Double gTemp = c.sideInput(globalMeanTemp);
              if (meanTemp < gTemp) {
                c.output(c.element());
              }
            }
          }).withSideInputs(globalMeanTemp));

  return filteredRows;
}
 
Developer ID: apache, Project: beam, Lines of code: 37, Source: FilterExamples.java
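Example 16 computes a single global value first and then filters rows against it through a singleton side input. Stripped of Beam, the two passes look like the sketch below. Rows are reduced to their mean_temp values and the month filter is left out (in the example, the mean covers all rows while the comparison runs only on the month-filtered rows); the method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch of "compute a global value, then filter against it".
final class BelowGlobalMeanSketch {

  // First pass: the global mean, which the example exposes via View.asSingleton().
  // An empty list yields NaN in this sketch.
  static double globalMean(List<Double> temps) {
    double sum = 0;
    for (double t : temps) {
      sum += t;
    }
    return sum / temps.size();
  }

  // Second pass: keep readings strictly below the mean, as in meanTemp < gTemp.
  static List<Double> belowMean(List<Double> temps, double mean) {
    List<Double> out = new ArrayList<>();
    for (double t : temps) {
      if (t < mean) {
        out.add(t);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Double> temps = List.of(10.0, 20.0, 30.0);
    double mean = globalMean(temps);
    System.out.println(mean + " " + belowMean(temps, mean)); // prints 20.0 [10.0]
  }
}
```

Example 11 shows a shorter way to build the same kind of view: Mean.<Integer>globally().asSingletonView().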


Example 17: sideInputJoinHelper

import org.apache.beam.sdk.transforms.View; // import the package/class this example depends on
private PCollection<BeamRecord> sideInputJoinHelper(
    JoinRelType joinType,
    PCollection<KV<BeamRecord, BeamRecord>> leftRows,
    PCollection<KV<BeamRecord, BeamRecord>> rightRows,
    BeamRecord rightNullRow, boolean swapped) {
  final PCollectionView<Map<BeamRecord, Iterable<BeamRecord>>> rowsView = rightRows
      .apply(View.<BeamRecord, BeamRecord>asMultimap());

  PCollection<BeamRecord> ret = leftRows
      .apply(ParDo.of(new BeamJoinTransforms.SideInputJoinDoFn(
          joinType, rightNullRow, rowsView, swapped)).withSideInputs(rowsView))
      .setCoder(CalciteUtils.toBeamRowType(getRowType()).getRecordCoder());

  return ret;
}
 
Developer ID: apache, Project: beam, Lines of code: 16, Source: BeamJoinRel.java
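Example 17 builds a multimap view of the right-hand rows with View.asMultimap() and probes it once per left row inside SideInputJoinDoFn, which is not shown. Assuming that DoFn emits one joined row per matching right value and, for a left outer join, pads unmatched left rows with rightNullRow, the per-element logic reduces to the plain-Java sketch below. Rows are simplified to strings joined with "|", the swapped flag is ignored, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of a side-input (lookup) join against a multimap.
final class SideInputJoinSketch {

  // left: (key, leftValue) pairs; rightByKey: the multimap side input.
  static List<String> join(List<Map.Entry<String, String>> left,
      Map<String, List<String>> rightByKey, boolean leftOuter, String rightNullRow) {
    List<String> out = new ArrayList<>();
    for (Map.Entry<String, String> row : left) {
      List<String> matches = rightByKey.getOrDefault(row.getKey(), List.of());
      if (matches.isEmpty() && leftOuter) {
        out.add(row.getValue() + "|" + rightNullRow); // unmatched left row, padded
      }
      for (String right : matches) {
        out.add(row.getValue() + "|" + right); // one output per matching right row
      }
    }
    return out;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, String>> left = List.of(Map.entry("k1", "L1"), Map.entry("k2", "L2"));
    Map<String, List<String>> right = Map.of("k1", List.of("R1", "R2"));
    System.out.println(join(left, right, true, "NULL")); // prints [L1|R1, L1|R2, L2|NULL]
  }
}
```

Because the whole right side is materialized as a side input, this join strategy suits a right-hand input that is small enough to be held as a side input.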


Example 18: thatMultimap

import org.apache.beam.sdk.transforms.View; // import the package/class this example depends on
/**
 * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection} with the
 * specified reason.
 *
 * <p>Note that the actual value must be coded by a {@link KvCoder}, not just any
 * {@code Coder<K, V>}.
 */
public static <K, V> SingletonAssert<Map<K, Iterable<V>>> thatMultimap(
    String reason, PCollection<KV<K, V>> actual) {
  @SuppressWarnings("unchecked")
  KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder();
  return new PCollectionViewAssert<>(
      actual,
      View.<K, V>asMultimap(),
      MapCoder.of(kvCoder.getKeyCoder(), IterableCoder.of(kvCoder.getValueCoder())),
      PAssertionSite.capture(reason));
}
 
Developer ID: apache, Project: beam, Lines of code: 18, Source: PAssert.java


Example 19: thatMap

import org.apache.beam.sdk.transforms.View; // import the package/class this example depends on
/**
 * Constructs a {@link SingletonAssert} for the value of the provided {@link PCollection} with
 * the specified reason. The {@link PCollection} must have at most one value per key.
 *
 * <p>Note that the actual value must be coded by a {@link KvCoder}, not just any
 * {@code Coder<K, V>}.
 */
public static <K, V> SingletonAssert<Map<K, V>> thatMap(
    String reason, PCollection<KV<K, V>> actual) {
  @SuppressWarnings("unchecked")
  KvCoder<K, V> kvCoder = (KvCoder<K, V>) actual.getCoder();
  return new PCollectionViewAssert<>(
      actual,
      View.<K, V>asMap(),
      MapCoder.of(kvCoder.getKeyCoder(), kvCoder.getValueCoder()),
      PAssertionSite.capture(reason));
}
 
Developer ID: apache, Project: beam, Lines of code: 18, Source: PAssert.java
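Examples 18 and 19 differ mainly in the view they build: thatMap uses View.asMap(), and its Javadoc requires the PCollection to have at most one value per key, while thatMultimap uses View.asMultimap(), whose value type is Map<K, Iterable<V>>, so every value for a key is kept. The plain-Java sketch below contrasts the two shapes. Rejecting a duplicate key with IllegalArgumentException is this sketch's choice, not a description of Beam's exact behavior, and the names are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java contrast between a map-shaped and a multimap-shaped view.
final class MapViewSketch {

  // Shape of View.asMap(): one value per key; this sketch rejects a duplicate key.
  static <K, V> Map<K, V> asMap(List<Map.Entry<K, V>> kvs) {
    Map<K, V> map = new HashMap<>();
    for (Map.Entry<K, V> kv : kvs) {
      if (map.containsKey(kv.getKey())) {
        throw new IllegalArgumentException("duplicate key: " + kv.getKey());
      }
      map.put(kv.getKey(), kv.getValue());
    }
    return map;
  }

  // Shape of View.asMultimap(): every value for a key is collected.
  static <K, V> Map<K, List<V>> asMultimap(List<Map.Entry<K, V>> kvs) {
    Map<K, List<V>> multimap = new HashMap<>();
    for (Map.Entry<K, V> kv : kvs) {
      multimap.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
    }
    return multimap;
  }

  public static void main(String[] args) {
    List<Map.Entry<String, Integer>> kvs =
        List.of(Map.entry("a", 1), Map.entry("a", 2), Map.entry("b", 3));
    System.out.println(asMultimap(kvs));
  }
}
```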


Example 20: expand

import org.apache.beam.sdk.transforms.View; // import the package/class this example depends on
@Override
public PCollectionView<Integer> expand(PCollection<String> input) {
  return input
      .apply(
          ParDo.of(
              new DoFn<String, Integer>() {
                @ProcessElement
                public void toInteger(ProcessContext ctxt) {
                  ctxt.output(Integer.valueOf(ctxt.element()));
                }
              }))
      .apply(Top.<Integer>largest(1))
      .apply(Flatten.<Integer>iterables())
      .apply(View.<Integer>asSingleton());
}
 
Developer ID: apache, Project: beam, Lines of code: 16, Source: WriteFilesTest.java
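Example 20 parses each string to an Integer before Top.largest(1), so the maximum is numeric rather than lexicographic, and then unwraps the resulting one-element list with Flatten.iterables() so that View.asSingleton() sees a single Integer. The same chain in plain Java, as a sketch with a hypothetical class and method name; treating an empty input as an error is an assumption of this model, reflecting that a singleton view needs a value.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java sketch of ParDo(toInteger) -> Top.largest(1) -> Flatten.iterables() -> asSingleton.
final class LargestAsSingletonSketch {

  static int largest(List<String> input) {
    List<Integer> parsed = new ArrayList<>();
    for (String s : input) {
      parsed.add(Integer.valueOf(s)); // numeric comparison, so "11" beats "7"
    }
    // Top-1 as a list with at most one element, mirroring Top.largest(1).
    List<Integer> top = parsed.isEmpty() ? List.of() : List.of(Collections.max(parsed));
    // Flattening that list must leave exactly one value for the singleton view.
    if (top.size() != 1) {
      throw new IllegalStateException("singleton view needs exactly one value");
    }
    return top.get(0);
  }

  public static void main(String[] args) {
    System.out.println(largest(List.of("3", "11", "7"))); // prints 11
  }
}
```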



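Note: The org.apache.beam.sdk.transforms.View examples in this article were collected from source code and documentation platforms such as GitHub and MSDocs, and the snippets come from open-source projects contributed by various developers. Copyright of the source code belongs to the original authors; refer to each project's license before distributing or using it. Do not reproduce without permission.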

