Code Development

Quick Start

Installation

The easiest way to pull in the Flink dependencies is through Maven or Gradle:

<!-- Use this dependency if you are using the DataStream API -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-streaming-java_2.10</artifactId>
  <version>1.1-SNAPSHOT</version>
</dependency>
<!-- Use this dependency if you are using the DataSet API -->
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-java</artifactId>
  <version>1.1-SNAPSHOT</version>
</dependency>
<dependency>
  <groupId>org.apache.flink</groupId>
  <artifactId>flink-clients_2.10</artifactId>
  <version>1.1-SNAPSHOT</version>
</dependency>

Note that because Scala 2.11 builds are binary-incompatible with Scala 2.10, Flink artifact IDs carry a suffix indicating the Scala version they were compiled against; pick the suffix matching the Scala version you work with.
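For Gradle, the equivalent declarations might look like the sketch below (repository configuration for the snapshot artifacts is omitted and assumed to be set up elsewhere):

```groovy
dependencies {
    // DataStream API
    compile 'org.apache.flink:flink-streaming-java_2.10:1.1-SNAPSHOT'
    // DataSet API
    compile 'org.apache.flink:flink-java:1.1-SNAPSHOT'
    compile 'org.apache.flink:flink-clients_2.10:1.1-SNAPSHOT'
}
```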

WordCount

A convenient feature of Flink is that programs can run locally without being submitted to a cluster; the test programs below can simply be started from the IDE (right-click → Run).

Batch

Here is the classic WordCount program, written against the DataSet (batch) API:

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class WordCount {

    public static void main(String[] args) throws Exception {

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // get input data
        DataSet<String> text = env.fromElements(
                "To be, or not to be,--that is the question:--",
                "Whether 'tis nobler in the mind to suffer",
                "The slings and arrows of outrageous fortune",
                "Or to take arms against a sea of troubles,"
        );

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new LineSplitter())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);

        // execute and print result
        counts.print();

    }

    public static final class LineSplitter implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
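The flatMap → groupBy(0) → sum(1) pipeline above can be mirrored in plain Java (no Flink required), which is a handy way to sanity-check the expected counts. The class and method names here are illustrative, not part of any Flink API:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java equivalent of the Flink pipeline: flatMap -> groupBy(0) -> sum(1).
public class WordCountSketch {

    // Tokenize each line and accumulate a per-word count,
    // mirroring LineSplitter followed by groupBy(0).sum(1).
    static Map<String, Integer> wordCount(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            for (String token : line.toLowerCase().split("\\W+")) {
                if (!token.isEmpty()) {
                    counts.merge(token, 1, Integer::sum); // the "sum(1)" step
                }
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Integer> counts =
                wordCount(List.of("To be, or not to be,--that is the question:--"));
        System.out.println(counts.get("to")); // prints 2
        System.out.println(counts.get("be")); // prints 2
    }
}
```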

Window Word Count

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class WindowWordCount {

    public static void main(String[] args) throws Exception {

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<Tuple2<String, Integer>> dataStream = env
                .socketTextStream("localhost", 9999)
                .flatMap(new Splitter()) // split each sentence into (word, 1) pairs
                .keyBy(0) // partition the stream by the word (tuple field 0)
                .timeWindow(Time.seconds(5))
                .sum(1); // sum the counts (tuple field 1) within each window

        dataStream.print();

        env.execute("Window WordCount");
    }

    public static class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                // emit (word, 1) for every word encountered
                out.collect(new Tuple2<String, Integer>(word, 1));
            }
        }
    }

}
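To try the program locally, start a socket source first with `nc -lk 9999` and type sentences into it. As a rough illustration of what keyBy on the word plus a tumbling 5-second timeWindow computes, the plain-Java sketch below buckets (timestamp, word) pairs into per-window, per-word counts; the class and method names are made up for illustration, not Flink APIs:

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of keyBy(word) + timeWindow(5s) + sum(1):
// counts are kept per (5-second window, word) bucket rather than globally.
public class WindowSketch {

    // Maps "windowStart:word" -> count; windowStart is the timestamp
    // rounded down to a 5-second boundary (a tumbling window).
    static Map<String, Integer> windowedCounts(long[] timestampsMillis, String[] words) {
        Map<String, Integer> counts = new HashMap<>();
        for (int i = 0; i < words.length; i++) {
            long windowStart = (timestampsMillis[i] / 5000) * 5000;
            counts.merge(windowStart + ":" + words[i], 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        long[] ts = {1000, 2000, 6000};  // first two fall in [0,5000), third in [5000,10000)
        String[] ws = {"hello", "hello", "hello"};
        Map<String, Integer> c = windowedCounts(ts, ws);
        System.out.println(c.get("0:hello"));    // prints 2
        System.out.println(c.get("5000:hello")); // prints 1
    }
}
```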

Submit Jobs

Command-Line Interface

I recommend adding the Flink commands to your PATH:

export FLINK_HOME=/Users/apple/Desktop/Tools/SDK/Flink/flink-1.0.3
export PATH=$PATH:$FLINK_HOME/bin

Typical invocations of the `flink` command are listed below:

  • Run example program with no arguments.

    ./bin/flink run ./examples/batch/WordCount.jar
    
  • Run example program with arguments for input and result files

    ./bin/flink run ./examples/batch/WordCount.jar \
                           file:///home/user/hamlet.txt file:///home/user/wordcount_out
    
  • Run example program with parallelism 16 and arguments for input and result files

    ./bin/flink run -p 16 ./examples/batch/WordCount.jar \
                            file:///home/user/hamlet.txt file:///home/user/wordcount_out
    
  • Run example program with flink log output disabled

    ./bin/flink run -q ./examples/batch/WordCount.jar
    
  • Run example program in detached mode

    ./bin/flink run -d ./examples/batch/WordCount.jar
    
  • Run example program on a specific JobManager:

    ./bin/flink run -m myJMHost:6123 \
                           ./examples/batch/WordCount.jar \
                           file:///home/user/hamlet.txt file:///home/user/wordcount_out
    
  • Run example program with a specific class as an entry point:

    ./bin/flink run -c org.apache.flink.examples.java.wordcount.WordCount \
                           ./examples/batch/WordCount.jar \
                           file:///home/user/hamlet.txt file:///home/user/wordcount_out
    
  • Run example program using a per-job YARN cluster with 2 TaskManagers:

    ./bin/flink run -m yarn-cluster -yn 2 \
                           ./examples/batch/WordCount.jar \
                           hdfs:///user/hamlet.txt hdfs:///user/wordcount_out
    
  • Display the optimized execution plan for the WordCount example program as JSON:

    ./bin/flink info ./examples/batch/WordCount.jar \
                            file:///home/user/hamlet.txt file:///home/user/wordcount_out
    
  • List scheduled and running jobs (including their JobIDs):

    ./bin/flink list
    
  • List scheduled jobs (including their JobIDs):

    ./bin/flink list -s
    
  • List running jobs (including their JobIDs):

    ./bin/flink list -r
    
  • Cancel a job:

    ./bin/flink cancel <jobID>
    
  • Stop a job (streaming jobs only):

    ./bin/flink stop <jobID>
    

Program: the snippet below appears to no longer work; the Client API has changed since it was written.

try {
    PackagedProgram program = new PackagedProgram(file, args);
    InetSocketAddress jobManagerAddress = RemoteExecutor.getInetFromHostport("localhost:6123");
    Configuration config = new Configuration();

    Client client = new Client(jobManagerAddress, config, program.getUserCodeClassLoader());

    // set the parallelism to 10 here
    client.run(program, 10, true);

} catch (ProgramInvocationException e) {
    e.printStackTrace();
}
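If the Client constructor above no longer compiles, one longstanding alternative in the DataSet API is to submit from code through a remote execution environment. A sketch, assuming a JobManager reachable at localhost:6123 and a user-code jar whose path you must substitute:

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

// Sketch: submit a program to a remote JobManager via a RemoteEnvironment.
// Host, port, and jar path are assumptions; adjust them to your cluster.
public class RemoteSubmit {
    public static void main(String[] args) throws Exception {
        // The listed jar(s) containing your user code are shipped to the cluster.
        ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
                "localhost", 6123, "/path/to/your-job.jar");
        env.setParallelism(10); // same parallelism as in the Client example above

        DataSet<String> text = env.fromElements("to be or not to be");
        text.print(); // triggers execution on the remote cluster
    }
}
```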