本文整理汇总了Java中org.apache.flink.streaming.connectors.twitter.TwitterSource类的典型用法代码示例。如果您正苦于以下问题:Java TwitterSource类的具体用法?Java TwitterSource怎么用?Java TwitterSource使用的例子?那么恭喜您, 这里精选的类代码示例或许可以为您提供帮助。
TwitterSource类属于org.apache.flink.streaming.connectors.twitter包,在下文中一共展示了TwitterSource类的5个代码示例,这些例子默认根据受欢迎程度排序。您可以为喜欢或者感觉有用的代码点赞,您的评价将有助于我们的系统推荐出更棒的Java代码示例。
示例1: main
import org.apache.flink.streaming.connectors.twitter.TwitterSource; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
// set up the execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.setProperty(TwitterSource.CONSUMER_KEY, "...");
props.setProperty(TwitterSource.CONSUMER_SECRET, "...");
props.setProperty(TwitterSource.TOKEN, "...");
props.setProperty(TwitterSource.TOKEN_SECRET, "...");
env.addSource(new TwitterSource(props))
.flatMap(new ExtractHashTags())
.keyBy(0)
.timeWindow(Time.seconds(30))
.sum(1)
.filter(new FilterHashTags())
.timeWindowAll(Time.seconds(30))
.apply(new GetTopHashTag())
.print();
env.execute();
}
开发者ID:mushketyk,项目名称:flink-examples,代码行数:23,代码来源:TopTweet.java
示例2: main
import org.apache.flink.streaming.connectors.twitter.TwitterSource; //导入依赖的package包/类
public static void main(String args[]) throws Exception {
ParameterTool tool = ParameterTool.fromArgs(args);
Properties p = new Properties();
p.setProperty(TwitterSource.CONSUMER_KEY, tool.getRequired("consumer.key"));
p.setProperty(TwitterSource.CONSUMER_SECRET, tool.getRequired("consumer.secret"));
p.setProperty(TwitterSource.TOKEN, tool.getRequired("token"));
p.setProperty(TwitterSource.TOKEN_SECRET, tool.getRequired("token.secret"));
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
DataStream<String> streamSource = env.addSource(new TwitterSource(p));
// Every 10 minutes, most trending of last 1 hour of tweets.
SlidingEventTimeWindows windowSpec = SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(10));
streamSource.flatMap(hashTagExtraction())
.keyBy(0)
.sum(1)
.windowAll(windowSpec)
.maxBy(1)
.writeAsText("file:///Users/abij/projects/tryouts/flink-streaming-xke/hot-hashtags.log", OVERWRITE);
env.execute("Twitter Stream");
}
开发者ID:godatadriven,项目名称:flink-streaming-xke,代码行数:27,代码来源:TwitterStreamProcessing.java
示例3: main
import org.apache.flink.streaming.connectors.twitter.TwitterSource; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
// set up the streaming execution environment
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool params = ParameterTool.fromPropertiesFile(args[0]);
DataStream<String> twitterStreamString = env.addSource(new TwitterSource(params.getProperties()));
DataStream<String> filteredStream = twitterStreamString.flatMap(new ParseJson());
filteredStream.flatMap(new ThroughputLogger(5000L)).setParallelism(1);
filteredStream.addSink(new FlinkKafkaProducer09<>("twitter", new SimpleStringSchema(), params.getProperties()));
// execute program
env.execute("Ingest data from Twitter to Kafka");
}
开发者ID:rmetzger,项目名称:flink-streaming-etl,代码行数:15,代码来源:TwitterIntoKafka.java
示例4: getTextDataStream
import org.apache.flink.streaming.connectors.twitter.TwitterSource; //导入依赖的package包/类
private static DataStream<String> getTextDataStream(
StreamExecutionEnvironment env) {
if (fromFile) {
// read the text file from given input path
return env.addSource(new TwitterSource(path));
} else {
// get default test text data
return env.fromElements(TwitterStreamData.TEXTS);
}
}
开发者ID:citlab,项目名称:vs.msc.ws14,代码行数:11,代码来源:TwitterStream.java
示例5: main
import org.apache.flink.streaming.connectors.twitter.TwitterSource; //导入依赖的package包/类
public static void main(String[] args) throws Exception {
// Checking input parameters
final ParameterTool params = ParameterTool.fromArgs(args);
System.out.println("Usage: TwitterExample [--output <path>] " +
"[--twitter-source.consumerKey <key> --twitter-source.consumerSecret <secret> --twitter-source.token <token> --twitter-source.tokenSecret <tokenSecret>]");
// set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// make parameters available in the web interface
env.getConfig().setGlobalJobParameters(params);
env.setParallelism(params.getInt("parallelism", 1));
// get input data
DataStream<String> streamSource;
if (params.has(TwitterSource.CONSUMER_KEY) &&
params.has(TwitterSource.CONSUMER_SECRET) &&
params.has(TwitterSource.TOKEN) &&
params.has(TwitterSource.TOKEN_SECRET)
) {
streamSource = env.addSource(new TwitterSource(params.getProperties()));
} else {
System.out.println("Executing TwitterStream example with default props.");
System.out.println("Use --twitter-source.consumerKey <key> --twitter-source.consumerSecret <secret> " +
"--twitter-source.token <token> --twitter-source.tokenSecret <tokenSecret> specify the authentication info.");
// get default test text data
streamSource = env.fromElements(TwitterExampleData.TEXTS);
}
DataStream<Tuple2<String, Integer>> tweets = streamSource
// selecting English tweets and splitting to (word, 1)
.flatMap(new SelectEnglishAndTokenizeFlatMap())
// group by words and sum their occurrences
.keyBy(0).sum(1);
// emit result
if (params.has("output")) {
tweets.writeAsText(params.get("output"));
} else {
System.out.println("Printing result to stdout. Use --output to specify output path.");
tweets.print();
}
// execute program
env.execute("Twitter Streaming Example");
}
开发者ID:axbaretto,项目名称:flink,代码行数:49,代码来源:TwitterExample.java
注:本文中的org.apache.flink.streaming.connectors.twitter.TwitterSource类示例整理自Github/MSDocs等源码及文档管理平台,相关代码片段筛选自各路编程大神贡献的开源项目,源码版权归原作者所有,传播和使用请参考对应项目的License;未经允许,请勿转载。 |
请发表评论