Code Development
Quick Start
Installation
The simplest way to pull in the Flink dependencies is through Maven or Gradle:
<!-- Use this dependency if you are using the DataStream API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.10</artifactId>
    <version>1.1-SNAPSHOT</version>
</dependency>
<!-- Use this dependency if you are using the DataSet API -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.1-SNAPSHOT</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_2.10</artifactId>
    <version>1.1-SNAPSHOT</version>
</dependency>
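Note that builds for Scala 2.11 are not compatible with builds for Scala 2.10. For that reason the Flink artifacts that depend on Scala carry a suffix in their artifact ID (for example `_2.10`) indicating the Scala version they were built against; choose the suffix that matches the Scala version you use.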
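The suffix convention can be sketched in a few lines of plain Java. This is only an illustration of the naming rule described above, not part of Flink; the class `FlinkArtifacts` and the method `artifactId` are hypothetical names, and passing `null` as the Scala version is an assumption used here to model a module without a suffix (such as `flink-java` in the snippet above).

```java
public class FlinkArtifacts {

    /**
     * Builds an artifact ID following the suffix convention.
     * A null scalaVersion models a module that carries no Scala suffix.
     */
    public static String artifactId(String module, String scalaVersion) {
        return scalaVersion == null ? module : module + "_" + scalaVersion;
    }

    public static void main(String[] args) {
        // Same module, built against two different Scala versions
        System.out.println(artifactId("flink-streaming-java", "2.10"));
        System.out.println(artifactId("flink-streaming-java", "2.11"));
        // A module without a Scala suffix
        System.out.println(artifactId("flink-java", null));
    }
}
```

Switching a project from Scala 2.10 to 2.11 then amounts to changing the suffix consistently on every suffixed Flink artifact.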
WordCount
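A convenient property of Flink is that a program can run locally without being submitted to a cluster. The test programs below can be started directly from the IDE by right-clicking and choosing Run.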
Stream
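Here is the classic WordCount program. Note that this first version is written against the batch DataSet API (`ExecutionEnvironment`); the windowed version further down uses the DataStream API.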
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {

    public static void main(String[] args) throws Exception {
        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // get input data
        DataSet<String> text = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,"
        );

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new LineSplitter())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);

        // execute and print result (print() triggers execution for a DataSet program)
        counts.print();
    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");
            // emit the pairs, skipping empty tokens
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
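To check what the job above should produce without any Flink dependency, here is a minimal plain-Java sketch of the same logic: it tokenizes with the same `toLowerCase().split("\\W+")` rule as `LineSplitter` and then counts per word, which is the combined effect of `groupBy(0).sum(1)`. The class name `LocalWordCount` is hypothetical, and the `TreeMap` is used only to get a stable, sorted output; Flink itself makes no such ordering guarantee.

```java
import java.util.Map;
import java.util.TreeMap;

public class LocalWordCount {

    /** Counts words across all lines using the same tokenization as LineSplitter. */
    public static Map<String, Integer> count(String... lines) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String line : lines) {
            // normalize and split the line on runs of non-word characters
            for (String token : line.toLowerCase().split("\\W+")) {
                if (token.length() > 0) {
                    // equivalent of emitting (token, 1) and summing field 1 per key
                    counts.merge(token, 1, Integer::sum);
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts = count(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer");
        counts.forEach((word, n) -> System.out.println("(" + word + "," + n + ")"));
    }
}
```

Comparing this output with what `counts.print()` shows is a quick sanity check when changing the tokenization rule.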
Window Word Count
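import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter())     // turn each sentence into (word, 1) tuples
                .keyBy(0)                    // partition the stream by the word (tuple field 0)
                .timeWindow(Time.seconds(5)) // collect each key's tuples into 5-second windows
                .sum(1);                     // sum the counts (tuple field 1) per window

        dataStream.print();

        // a streaming job only starts running once execute() is called
        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                // emit (word, 1) for every word encountered
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }
}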
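The effect of `keyBy(0).timeWindow(Time.seconds(5)).sum(1)` can be illustrated with a small simulation in plain Java. This is a sketch under stated assumptions, not Flink's implementation: each word comes with an explicit timestamp in milliseconds, windows are assumed to be aligned to multiples of the window size, and `TumblingWindowSketch`, `windowStart` and `count` are hypothetical names.

```java
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {

    static final long WINDOW_MILLIS = 5_000L;

    /** Start of the 5-second window that contains the given timestamp. */
    public static long windowStart(long timestampMillis) {
        return timestampMillis - (timestampMillis % WINDOW_MILLIS);
    }

    /** Counts words per window; timestamps[i] is the arrival time of words[i]. */
    public static Map<Long, Map<String, Integer>> count(long[] timestamps, String[] words) {
        Map<Long, Map<String, Integer>> result = new TreeMap<>();
        for (int i = 0; i < words.length; i++) {
            result.computeIfAbsent(windowStart(timestamps[i]), k -> new TreeMap<>())
                  .merge(words[i], 1, Integer::sum);
        }
        return result;
    }

    public static void main(String[] args) {
        long[] ts = {1_000L, 2_500L, 4_999L, 5_000L, 7_200L};
        String[] words = {"to", "be", "to", "to", "be"};
        // prints {0={be=1, to=2}, 5000={be=1, to=1}}
        System.out.println(count(ts, words));
    }
}
```

The word arriving at 5000 ms lands in the second window, so the same word can be reported once per window, each time with the count for that window only.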
Submit Jobs
Command-Line Interface
I recommend adding the Flink commands to your global PATH:
export FLINK_HOME=/Users/apple/Desktop/Tools/SDK/Flink/flink-1.0.3
export PATH=$PATH:$FLINK_HOME/bin
Common invocations of the command-line client are listed below:
- Run example program with no arguments:
  ./bin/flink run ./examples/batch/WordCount.jar
- Run example program with arguments for input and result files:
  ./bin/flink run ./examples/batch/WordCount.jar \
      file:///home/user/hamlet.txt file:///home/user/wordcount_out
- Run example program with parallelism 16 and arguments for input and result files:
  ./bin/flink run -p 16 ./examples/batch/WordCount.jar \
      file:///home/user/hamlet.txt file:///home/user/wordcount_out
- Run example program with flink log output disabled:
  ./bin/flink run -q ./examples/batch/WordCount.jar
- Run example program in detached mode:
  ./bin/flink run -d ./examples/batch/WordCount.jar
- Run example program on a specific JobManager:
  ./bin/flink run -m myJMHost:6123 \
      ./examples/batch/WordCount.jar \
      file:///home/user/hamlet.txt file:///home/user/wordcount_out
- Run example program with a specific class as an entry point:
  ./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount \
      ./examples/batch/WordCount.jar \
      file:///home/user/hamlet.txt file:///home/user/wordcount_out
- Run example program using a per-job YARN cluster with 2 TaskManagers:
  ./bin/flink run -m yarn-cluster -yn 2 \
      ./examples/batch/WordCount.jar \
      hdfs:///user/hamlet.txt hdfs:///user/wordcount_out
- Display the optimized execution plan for the WordCount example program as JSON:
  ./bin/flink info ./examples/batch/WordCount.jar \
      file:///home/user/hamlet.txt file:///home/user/wordcount_out
- List scheduled and running jobs (including their JobIDs):
  ./bin/flink list
- List scheduled jobs (including their JobIDs):
  ./bin/flink list -s
- List running jobs (including their JobIDs):
  ./bin/flink list -r
- Cancel a job:
  ./bin/flink cancel <jobID>
- Stop a job (streaming jobs only):
  ./bin/flink stop <jobID>
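When jobs have to be submitted from another program, one option is to assemble the same command line in code and hand it to `ProcessBuilder`. The following is a minimal sketch, assuming only the `run` flags shown in the list above (`-p`, `-d`, `-c`); the class `FlinkRunCommand` and its `build` method are hypothetical helpers, not part of Flink.

```java
import java.util.ArrayList;
import java.util.List;

public class FlinkRunCommand {

    /**
     * Builds the argument list for "flink run".
     * parallelism <= 0 and a null entryClass mean "option not set".
     */
    public static List<String> build(String flinkBin, int parallelism, boolean detached,
                                     String entryClass, String jar, String... programArgs) {
        List<String> cmd = new ArrayList<>();
        cmd.add(flinkBin);
        cmd.add("run");
        if (parallelism > 0) {
            cmd.add("-p");
            cmd.add(String.valueOf(parallelism));
        }
        if (detached) {
            cmd.add("-d");
        }
        if (entryClass != null) {
            cmd.add("-c");
            cmd.add(entryClass);
        }
        // options must come before the JAR; everything after it goes to the program
        cmd.add(jar);
        for (String arg : programArgs) {
            cmd.add(arg);
        }
        return cmd;
    }

    public static void main(String[] args) {
        List<String> cmd = build("./bin/flink", 16, false, null,
                "./examples/batch/WordCount.jar",
                "file:///home/user/hamlet.txt", "file:///home/user/wordcount_out");
        System.out.println(String.join(" ", cmd));
        // To actually launch it (requires a Flink installation):
        // new ProcessBuilder(cmd).inheritIO().start().waitFor();
    }
}
```

Passing each argument as a separate list element avoids shell quoting problems with paths that contain spaces.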
Program: submitting a job from code. This snippet no longer seems to work, because the API has changed.
try {
    // `file` (the job JAR) and `args` (the program arguments) are defined elsewhere
    PackagedProgram program = new PackagedProgram(file, args);
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    Configuration config = new Configuration();
    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());
    // set the parallelism to 10 here
    client.run(program, 10, true);
} catch (ProgramInvocationException e) {
    e.printStackTrace();
}