Java DataStreamSource Class Code Examples

This article collects typical usage examples of the Java class org.apache.flink.streaming.api.datastream.DataStreamSource. If you are wondering how the DataStreamSource class is used in practice, the selected examples below may help.

The DataStreamSource class belongs to the org.apache.flink.streaming.api.datastream package. The 20 code examples shown below are sorted by popularity by default.

Example 1: main

import org.apache.flink.streaming.api.datastream.DataStreamSource; // import the required package/class
public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

    Properties properties = new Properties();
    // Close the stream once the properties are loaded
    try (FileInputStream in = new FileInputStream("src/main/resources/application.properties")) {
        properties.load(in);
    }

    Properties mqttProperties = new Properties();

    // client id = a:<Organization_ID>:<App_Id>
    mqttProperties.setProperty(MQTTSource.CLIENT_ID,
            String.format("a:%s:%s",
                    properties.getProperty("Org_ID"),
                    properties.getProperty("App_Id")));

    // mqtt server url = tcp://<Org_ID>.messaging.internetofthings.ibmcloud.com:1883
    mqttProperties.setProperty(MQTTSource.URL,
            String.format("tcp://%s.messaging.internetofthings.ibmcloud.com:1883",
                    properties.getProperty("Org_ID")));

    // topic = iot-2/type/<Device_Type>/id/<Device_ID>/evt/<Event_Id>/fmt/json
    mqttProperties.setProperty(MQTTSource.TOPIC,
            String.format("iot-2/type/%s/id/%s/evt/%s/fmt/json",
                    properties.getProperty("Device_Type"),
                    properties.getProperty("Device_ID"),
                    properties.getProperty("EVENT_ID")));

    mqttProperties.setProperty(MQTTSource.USERNAME, properties.getProperty("API_Key"));
    mqttProperties.setProperty(MQTTSource.PASSWORD, properties.getProperty("APP_Authentication_Token"));


    MQTTSource mqttSource = new MQTTSource(mqttProperties);
    DataStreamSource<String> temperatureDataSource = env.addSource(mqttSource);
    DataStream<String> stream = temperatureDataSource.map((MapFunction<String, String>) s -> s);
    stream.print();

    env.execute("Temperature Analysis");
}
 
Developer ID: pkhanal, Project: flink-watson-iot-connector, Lines of code: 38, Source: DeviceDataAnalysis.java
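
Example 1 builds the MQTT client ID, server URL, and topic with String.format directly from Properties.getProperty. That method returns null for a missing key, and String.format then writes the text "null" into the result, so a typo in application.properties would only show up as a failed connection. The following is a minimal sketch, not part of the original project, that builds the same three strings and rejects missing keys early. The class name WatsonIotMqttSettings and its method names are hypothetical; the property keys and format strings are copied from the listing above.

```java
import java.util.Properties;

/** Hypothetical helper (not from flink-watson-iot-connector) that builds the MQTT settings of Example 1. */
final class WatsonIotMqttSettings {
    private WatsonIotMqttSettings() {}

    /** client id = a:<Organization_ID>:<App_Id> */
    static String clientId(Properties p) {
        return String.format("a:%s:%s", require(p, "Org_ID"), require(p, "App_Id"));
    }

    /** mqtt server url = tcp://<Org_ID>.messaging.internetofthings.ibmcloud.com:1883 */
    static String serverUrl(Properties p) {
        return String.format("tcp://%s.messaging.internetofthings.ibmcloud.com:1883",
                require(p, "Org_ID"));
    }

    /** topic = iot-2/type/<Device_Type>/id/<Device_ID>/evt/<Event_Id>/fmt/json */
    static String topic(Properties p) {
        return String.format("iot-2/type/%s/id/%s/evt/%s/fmt/json",
                require(p, "Device_Type"), require(p, "Device_ID"), require(p, "EVENT_ID"));
    }

    /** Returns the property value, or throws if the key is absent or empty. */
    private static String require(Properties p, String key) {
        String value = p.getProperty(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Missing required property: " + key);
        }
        return value;
    }
}
```

With such a helper, the calls in Example 1 could become, for instance, mqttProperties.setProperty(MQTTSource.TOPIC, WatsonIotMqttSettings.topic(properties)).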


Example 2: testEventTimeOrderedWriter

import org.apache.flink.streaming.api.datastream.DataStreamSource; // import the required package/class
@Test
public void testEventTimeOrderedWriter() throws Exception {
    StreamExecutionEnvironment execEnv = StreamExecutionEnvironment.createLocalEnvironment();

    String streamName = "testEventTimeOrderedWriter";
    SETUP_UTILS.createTestStream(streamName, 1);

    DataStreamSource<Integer> dataStream = execEnv
            .addSource(new IntegerGeneratingSource(false, EVENT_COUNT_PER_SOURCE));

    FlinkPravegaWriter<Integer> pravegaSink = new FlinkPravegaWriter<>(
            SETUP_UTILS.getControllerUri(),
            SETUP_UTILS.getScope(),
            streamName,
            new IntSerializer(),
            event -> "fixedkey");

    FlinkPravegaUtils.writeToPravegaInEventTimeOrder(dataStream, pravegaSink, 1);
    Assert.assertNotNull(execEnv.getExecutionPlan());
}
 
Developer ID: pravega, Project: flink-connectors, Lines of code: 21, Source: FlinkPravegaWriterITCase.java


Example 3: redisSinkTest

import org.apache.flink.streaming.api.datastream.DataStreamSource; // import the required package/class
@Test
public void redisSinkTest() throws Exception {
    sinkThread.start();
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
        .setHost(REDIS_HOST)
        .setPort(REDIS_PORT).build();
    DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());

    RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig, new RedisTestMapper());

    source.addSink(redisSink);

    env.execute("Redis Sink Test");

    assertEquals(NUM_ELEMENTS, sourceList.size());
}
 
Developer ID: apache, Project: bahir-flink, Lines of code: 18, Source: RedisSinkPublishITCase.java


Example 4: testRedisSortedSetDataType

import org.apache.flink.streaming.api.datastream.DataStreamSource; // import the required package/class
@Test
public void testRedisSortedSetDataType() throws Exception {
    DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunctionSortedSet());
    RedisSink<Tuple2<String, String>> redisZaddSink = new RedisSink<>(jedisPoolConfig,
        new RedisAdditionalDataMapper(RedisCommand.ZADD));

    source.addSink(redisZaddSink);
    env.execute("Test ZADD");

    assertEquals(NUM_ELEMENTS, jedis.zcard(REDIS_ADDITIONAL_KEY));

    RedisSink<Tuple2<String, String>> redisZremSink = new RedisSink<>(jedisPoolConfig,
            new RedisAdditionalDataMapper(RedisCommand.ZREM));

    source.addSink(redisZremSink);
    env.execute("Test ZREM");

    assertEquals(ZERO, jedis.zcard(REDIS_ADDITIONAL_KEY));

    jedis.del(REDIS_ADDITIONAL_KEY);
}
 
Developer ID: apache, Project: bahir-flink, Lines of code: 22, Source: RedisSinkITCase.java


Example 5: createProducerTopology

import org.apache.flink.streaming.api.datastream.DataStreamSource; // import the required package/class
private void createProducerTopology(StreamExecutionEnvironment env, AMQSinkConfig<String> config) {
    DataStreamSource<String> stream = env.addSource(new SourceFunction<String>() {
        @Override
        public void run(SourceContext<String> ctx) throws Exception {
            for (int i = 0; i < MESSAGES_NUM; i++) {
                ctx.collect("amq-" + i);
            }
        }

        @Override
        public void cancel() {}
    });


    AMQSink<String> sink = new AMQSink<>(config);
    stream.addSink(sink);
}
 
Developer ID: apache, Project: bahir-flink, Lines of code: 18, Source: ActiveMQConnectorITCase.java


Example 6: main

import org.apache.flink.streaming.api.datastream.DataStreamSource; // import the required package/class
public static void main(String[] args) throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	DataStreamSource<Tuple2<String, Integer>> source = env.fromCollection(collection);

	CassandraSink.addSink(source)
		.setQuery(INSERT)
		.setClusterBuilder(new ClusterBuilder() {
			@Override
			protected Cluster buildCluster(Builder builder) {
				return builder.addContactPoint("127.0.0.1").build();
			}
		})
		.build();

	env.execute("WriteTupleIntoCassandra");
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 18, Source: CassandraTupleSinkExample.java


Example 7: main

import org.apache.flink.streaming.api.datastream.DataStreamSource; // import the required package/class
public static void main(String[] args) throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	DataStreamSource<Message> source = env.fromCollection(messages);

	CassandraSink.addSink(source)
		.setClusterBuilder(new ClusterBuilder() {
			@Override
			protected Cluster buildCluster(Builder builder) {
				return builder.addContactPoint("127.0.0.1").build();
			}
		})
		.setMapperOptions(() -> new Mapper.Option[]{Mapper.Option.saveNullFields(true)})
		.build();

	env.execute("Cassandra Sink example");
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 18, Source: CassandraPojoSinkExample.java


Example 8: redisSinkTest

import org.apache.flink.streaming.api.datastream.DataStreamSource; // import the required package/class
@Test
public void redisSinkTest() throws Exception {
	sinkThread.start();
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
	FlinkJedisPoolConfig jedisPoolConfig = new FlinkJedisPoolConfig.Builder()
		.setHost(REDIS_HOST)
		.setPort(REDIS_PORT).build();
	DataStreamSource<Tuple2<String, String>> source = env.addSource(new TestSourceFunction());

	RedisSink<Tuple2<String, String>> redisSink = new RedisSink<>(jedisPoolConfig, new RedisTestMapper());

	source.addSink(redisSink);

	env.execute("Redis Sink Test");

	assertEquals(NUM_ELEMENTS, sourceList.size());
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 18, Source: RedisSinkPublishITCase.java


Example 9: runTransportClientTest

import org.apache.flink.streaming.api.datastream.DataStreamSource; // import the required package/class
/**
 * Tests that the Elasticsearch sink works properly using a {@link TransportClient}.
 */
public void runTransportClientTest() throws Exception {
	final String index = "transport-client-test-index";

	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());

	Map<String, String> userConfig = new HashMap<>();
	// This instructs the sink to emit after every element, otherwise they would be buffered
	userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
	userConfig.put("cluster.name", CLUSTER_NAME);

	source.addSink(createElasticsearchSinkForEmbeddedNode(
		userConfig, new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));

	env.execute("Elasticsearch TransportClient Test");

	// verify the results
	Client client = embeddedNodeEnv.getClient();
	SourceSinkDataTestKit.verifyProducedSinkData(client, index);

	client.close();
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 27, Source: ElasticsearchSinkTestBase.java


Example 10: runTransportClientFailsTest

import org.apache.flink.streaming.api.datastream.DataStreamSource; // import the required package/class
/**
 * Tests whether the Elasticsearch sink fails when there is no cluster to connect to.
 */
public void runTransportClientFailsTest() throws Exception {
	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());

	Map<String, String> userConfig = new HashMap<>();
	userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
	userConfig.put("cluster.name", "my-transport-client-cluster");

	source.addSink(createElasticsearchSinkForEmbeddedNode(
		userConfig, new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")));

	try {
		env.execute("Elasticsearch Transport Client Test");
	} catch (JobExecutionException expectedException) {
		assertTrue(expectedException.getCause().getMessage().contains("not connected to any Elasticsearch nodes"));
		return;
	}

	fail();
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 25, Source: ElasticsearchSinkTestBase.java


Example 11: testKafkaConsumer

import org.apache.flink.streaming.api.datastream.DataStreamSource; // import the required package/class
@Test
@SuppressWarnings("unchecked")
public void testKafkaConsumer() {
	KafkaTableSource.Builder b = getBuilder();
	configureBuilder(b);

	// assert that correct
	KafkaTableSource observed = spy(b.build());
	StreamExecutionEnvironment env = mock(StreamExecutionEnvironment.class);
	when(env.addSource(any(SourceFunction.class))).thenReturn(mock(DataStreamSource.class));
	observed.getDataStream(env);

	verify(env).addSource(any(getFlinkKafkaConsumer()));

	verify(observed).getKafkaConsumer(
		eq(TOPIC),
		eq(PROPS),
		any(getDeserializationSchema()));
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 20, Source: KafkaTableSourceTestBase.java


Example 12: fromElements

import org.apache.flink.streaming.api.datastream.DataStreamSource; // import the required package/class
/**
 * Creates a new data stream that contains the given elements. The elements must all be of the
 * same type, for example, all of the {@link String} or {@link Integer}.
 *
 * <p>The framework will try and determine the exact type from the elements. In case of generic
 * elements, it may be necessary to manually supply the type information via
 * {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.
 *
 * <p>Note that this operation will result in a non-parallel data stream source, i.e. a data
 * stream source with a degree of parallelism one.
 *
 * @param data
 * 		The array of elements to create the data stream from.
 * @param <OUT>
 * 		The type of the returned data stream
 * @return The data stream representing the given array of elements
 */
@SafeVarargs
public final <OUT> DataStreamSource<OUT> fromElements(OUT... data) {
	if (data.length == 0) {
		throw new IllegalArgumentException("fromElements needs at least one element as argument");
	}

	TypeInformation<OUT> typeInfo;
	try {
		typeInfo = TypeExtractor.getForObject(data[0]);
	}
	catch (Exception e) {
		throw new RuntimeException("Could not create TypeInformation for type " + data[0].getClass().getName()
				+ "; please specify the TypeInformation manually via "
				+ "StreamExecutionEnvironment#fromCollection(Collection, TypeInformation)", e);
	}
	return fromCollection(Arrays.asList(data), typeInfo);
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 35, Source: StreamExecutionEnvironment.java


Example 13: fromCollection

import org.apache.flink.streaming.api.datastream.DataStreamSource; // import the required package/class
/**
 * Creates a data stream from the given non-empty collection. The type of the data stream is that of the
 * elements in the collection.
 *
 * <p>The framework will try and determine the exact type from the collection elements. In case of generic
 * elements, it may be necessary to manually supply the type information via
 * {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.
 *
 * <p>Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with
 * parallelism one.
 *
 * @param data
 * 		The collection of elements to create the data stream from.
 * @param <OUT>
 *     The generic type of the returned data stream.
 * @return
 *     The data stream representing the given collection
 */
public <OUT> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
	Preconditions.checkNotNull(data, "Collection must not be null");
	if (data.isEmpty()) {
		throw new IllegalArgumentException("Collection must not be empty");
	}

	OUT first = data.iterator().next();
	if (first == null) {
		throw new IllegalArgumentException("Collection must not contain null elements");
	}

	TypeInformation<OUT> typeInfo;
	try {
		typeInfo = TypeExtractor.getForObject(first);
	}
	catch (Exception e) {
		throw new RuntimeException("Could not create TypeInformation for type " + first.getClass()
				+ "; please specify the TypeInformation manually via "
				+ "StreamExecutionEnvironment#fromCollection(Collection, TypeInformation)", e);
	}
	return fromCollection(data, typeInfo);
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 41, Source: StreamExecutionEnvironment.java
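
The fromElements and fromCollection listings above apply the same argument checks before any stream is built: the input must be non-null, non-empty, and must not start with a null element, and the element type is then derived from the first element only. The following is a minimal JDK-only sketch of those checks, offered as an illustration rather than as Flink code. The class CollectionSourceGuards and its method names are hypothetical, and first.getClass() is a simplified stand-in for TypeExtractor.getForObject(first).

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Objects;

/** Hypothetical stand-in that mirrors only the argument checks of the listings; it does not use Flink. */
final class CollectionSourceGuards {
    private CollectionSourceGuards() {}

    /** Applies the fromCollection checks and returns the runtime class of the first element. */
    static <T> Class<?> elementTypeOfCollection(Collection<T> data) {
        Objects.requireNonNull(data, "Collection must not be null");
        if (data.isEmpty()) {
            throw new IllegalArgumentException("Collection must not be empty");
        }
        T first = data.iterator().next();
        if (first == null) {
            throw new IllegalArgumentException("Collection must not contain null elements");
        }
        // Assumption: getClass() replaces Flink's TypeExtractor.getForObject(first)
        return first.getClass();
    }

    /** Applies the fromElements check, then delegates to the collection variant. */
    @SafeVarargs
    static <T> Class<?> elementTypeOfElements(T... data) {
        if (data.length == 0) {
            throw new IllegalArgumentException("fromElements needs at least one element as argument");
        }
        return elementTypeOfCollection(Arrays.asList(data));
    }
}
```

Because only the first element is inspected, a mixed input such as elementTypeOfElements(1, "two") passes these checks and reports Integer, which is one reason the Javadoc asks for all elements to be of the same type.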


Example 14: testTransportClientFails

import org.apache.flink.streaming.api.datastream.DataStreamSource; // import the required package/class
@Test(expected = JobExecutionException.class)
public void testTransportClientFails() throws Exception{
	// this checks whether the TransportClient fails early when there is no cluster to
	// connect to. We don't have such a test for the Node Client version since that
	// one will block and wait for a cluster to come online

	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());

	Map<String, String> config = Maps.newHashMap();
	// This instructs the sink to emit after every element, otherwise they would be buffered
	config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
	config.put("cluster.name", "my-node-client-cluster");

	// connect to our local node
	config.put("node.local", "true");

	List<TransportAddress> transports = Lists.newArrayList();
	transports.add(new LocalTransportAddress("1"));

	source.addSink(new ElasticsearchSink<>(config, transports, new TestIndexRequestBuilder()));

	env.execute("Elasticsearch Node Client Test");
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 26, Source: ElasticsearchSinkITCase.java


Example 15: testTransportClientFails

import org.apache.flink.streaming.api.datastream.DataStreamSource; // import the required package/class
@Test(expected = JobExecutionException.class)
public void testTransportClientFails() throws Exception{
	// this checks whether the TransportClient fails early when there is no cluster to
	// connect to. There isn't a similar test for the Node Client version since that
	// one will block and wait for a cluster to come online

	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	DataStreamSource<Tuple2<Integer, String>> source = env.addSource(new TestSourceFunction());

	Map<String, String> config = new HashMap<>();
	// This instructs the sink to emit after every element, otherwise they would be buffered
	config.put(ElasticsearchSink.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
	config.put("cluster.name", "my-node-client-cluster");

	List<InetSocketAddress> transports = new ArrayList<>();
	transports.add(new InetSocketAddress(InetAddress.getByName("127.0.0.1"), 9300));

	source.addSink(new ElasticsearchSink<>(config, transports, new TestElasticsearchSinkFunction()));

	env.execute("Elasticsearch Node Client Test");
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 23, Source: ElasticsearchSinkITCase.java


Example 16: main

import org.apache.flink.streaming.api.datastream.DataStreamSource; // import the required package/class
public static void main(String[] args) throws Exception {
	StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

	DataStreamSource<Message> source = env.fromCollection(messages);

	CassandraSink.addSink(source)
		.setClusterBuilder(new ClusterBuilder() {
			@Override
			protected Cluster buildCluster(Builder builder) {
				return builder.addContactPoint("127.0.0.1").build();
			}
		})
		.build();

	env.execute("Cassandra Sink example");
}
 
Developer ID: axbaretto, Project: flink, Lines of code: 17, Source: CassandraPojoSinkExample.java


Example 17: fromCollectionWithTimestamp

import org.apache.flink.streaming.api.datastream.DataStreamSource; // import the required package/class
/**
 * Creates a data stream from the given non-empty collection. The type of the data stream is that of the
 * elements in the collection.
 *
 * <p>The framework will try and determine the exact type from the collection elements. In case of generic
 * elements, it may be necessary to manually supply the type information via
 * {@link #fromCollection(java.util.Collection, org.apache.flink.api.common.typeinfo.TypeInformation)}.
 *
 * <p>Note that this operation will result in a non-parallel data stream source, i.e. a data stream source with
 * parallelism one.
 *
 * @param <OUT>        The generic type of the returned data stream.
 * @param data         The collection of elements to create the data stream from.
 * @param flushWindows Specifies whether open windows should be flushed on termination.
 * @return The data stream representing the given collection
 */
public <OUT> DataStreamSource<OUT> fromCollectionWithTimestamp(Collection<StreamRecord<OUT>> data, Boolean flushWindows) {
    Preconditions.checkNotNull(data, "Collection must not be null");
    if (data.isEmpty()) {
        throw new IllegalArgumentException("Collection must not be empty");
    }

    StreamRecord<OUT> first = data.iterator().next();
    if (first == null) {
        throw new IllegalArgumentException("Collection must not contain null elements");
    }

    TypeInformation<OUT> typeInfo;
    try {
        typeInfo = TypeExtractor.getForObject(first.getValue());
    } catch (Exception e) {
        throw new RuntimeException("Could not create TypeInformation for type " + first.getClass()
                + "; please specify the TypeInformation manually via "
                + "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)", e);
    }
    return fromCollectionWithTimestamp(data, typeInfo, flushWindows);
}
 
Developer ID: ottogroup, Project: flink-spector, Lines of code: 38, Source: DataStreamTestEnvironment.java


Example 18: fromElements

import org.apache.flink.streaming.api.datastream.DataStreamSource; // import the required package/class
/**
 * Creates a new DataStream that contains the given elements. The elements
 * must all be of the same type, for example, all of the String or Integer.
 * The sequence of elements must not be empty. Furthermore, the elements
 * must be serializable (as defined in java.io.Serializable), because the
 * execution environment may ship the elements into the cluster.
 * 
 * @param data
 *            The collection of elements to create the DataStream from.
 * @param <OUT>
 *            type of the returned stream
 * @return The DataStream representing the elements.
 */
public <OUT extends Serializable> DataStreamSource<OUT> fromElements(OUT... data) {
	if (data.length == 0) {
		throw new IllegalArgumentException(
				"fromElements needs at least one element as argument");
	}

	TypeWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data[0]);
	DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements",
			outTypeWrapper);

	try {
		SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
		jobGraphBuilder.addStreamVertex(returnStream.getId(),
				new SourceInvokable<OUT>(function), null, outTypeWrapper, "source",
				SerializationUtils.serialize(function), 1);
	} catch (SerializationException e) {
		throw new RuntimeException("Cannot serialize elements");
	}
	return returnStream;
}
 
Developer ID: citlab, Project: vs.msc.ws14, Lines of code: 34, Source: StreamExecutionEnvironment.java
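
The Javadoc of Example 18 requires the elements to implement java.io.Serializable because the execution environment may ship them into the cluster, and the listing turns a serialization failure into a RuntimeException. The following is a minimal sketch of what that requirement means in practice. It uses the JDK's ObjectOutputStream and ObjectInputStream as a stand-in for the SerializationUtils.serialize call in the listing, and the class SerializableCheck and its roundTrip method are hypothetical names introduced only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical helper that checks whether a value survives Java serialization. */
final class SerializableCheck {
    private SerializableCheck() {}

    /** Serializes the value to bytes and reads it back, failing if any part is not serializable. */
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream buffer = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(buffer)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(buffer.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T copy = (T) in.readObject();
                return copy;
            }
        } catch (IOException | ClassNotFoundException e) {
            // NotSerializableException is an IOException, so it ends up here as well
            throw new IllegalArgumentException(
                    "Value cannot be serialized: " + value.getClass().getName(), e);
        }
    }
}
```

Note that implementing Serializable is not sufficient on its own: a serializable container such as ArrayList still fails at runtime when it holds an element that is not serializable, which this round trip would surface before a job is submitted.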


Example 19: fromCollection

import org.apache.flink.streaming.api.datastream.DataStreamSource; // import the required package/class
/**
 * Creates a DataStream from the given non-empty collection. The type of the
 * DataStream is that of the elements in the collection. The elements need
 * to be serializable (as defined by java.io.Serializable), because the
 * framework may move the elements into the cluster if needed.
 * 
 * @param data
 *            The collection of elements to create the DataStream from.
 * @param <OUT>
 *            type of the returned stream
 * @return The DataStream representing the elements.
 */
public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
	if (data == null) {
		throw new NullPointerException("Collection must not be null");
	}

	if (data.isEmpty()) {
		throw new IllegalArgumentException("Collection must not be empty");
	}

	TypeWrapper<OUT> outTypeWrapper = new ObjectTypeWrapper<OUT>(data.iterator().next());
	DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements",
			outTypeWrapper);

	try {
		SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);

		// Reuse the function and type wrapper created above instead of building them twice
		jobGraphBuilder.addStreamVertex(returnStream.getId(),
				new SourceInvokable<OUT>(function), null, outTypeWrapper, "source",
				SerializationUtils.serialize(function), 1);
	} catch (SerializationException e) {
		throw new RuntimeException("Cannot serialize collection");
	}

	return returnStream;
}
 
Developer ID: citlab, Project: vs.msc.ws14, Lines of code: 38, Source: StreamExecutionEnvironment.java


Example 20: addSource

import org.apache.flink.streaming.api.datastream.DataStreamSource; // import the required package/class
/**
 * Adds a data source, thus opening a {@link DataStream}.
 * 
 * @param function
 *            the user defined function
 * @param parallelism
 *            number of parallel instances of the function
 * @param <OUT>
 *            type of the returned stream
 * @return the data stream constructed
 */
public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, int parallelism) {
	TypeWrapper<OUT> outTypeWrapper = new FunctionTypeWrapper<OUT>(function,
			SourceFunction.class, 0);
	DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source",
			outTypeWrapper);

	try {
		jobGraphBuilder.addStreamVertex(returnStream.getId(),
				new SourceInvokable<OUT>(function), null, outTypeWrapper, "source",
				SerializationUtils.serialize(function), parallelism);
	} catch (SerializationException e) {
		throw new RuntimeException("Cannot serialize SourceFunction");
	}

	return returnStream;
}
 
Developer ID: citlab, Project: vs.msc.ws14, Lines of code: 28, Source: StreamExecutionEnvironment.java



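Note: The org.apache.flink.streaming.api.datastream.DataStreamSource examples in this article were collected from GitHub, MSDocs, and other source-code and documentation hosting platforms. The code snippets were selected from open-source projects, and copyright of the source code belongs to the original authors. For distribution and use, refer to the license of the corresponding project. Do not reproduce without permission.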