
Running a Flink Program in a Local JVM with the Web UI

Running Flink in a local JVM with the web UI

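The usual call

StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

returns a Flink execution environment, but when the job runs in a local JVM that environment does not start the web UI. For monitoring purposes it is sometimes useful to watch the job's status on the dashboard. The following creates a local Flink execution environment that also serves the web UI: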


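import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

Configuration config = new Configuration();
config.setInteger(RestOptions.PORT, 9998); // port the local web UI listens on
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);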
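
One thing the snippet above depends on: as far as I know, the local web UI is only served when the flink-runtime-web module is on the classpath. A hedged sketch of the extra Maven dependency, assuming the flink.version and scala.binary.version properties defined in the Maven section of this article:

```xml
<!-- Assumption: needed in addition to flink-clients so that
     createLocalEnvironmentWithWebUI can actually serve the dashboard. -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
```

With that in place and the job running, the dashboard should be reachable at http://localhost:9998, the port set through RestOptions.PORT.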

Getting Started with Local Flink Execution

1. Maven dependencies


<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <flink.version>1.6.3</flink.version>
    <java.version>1.8</java.version>
    <scala.version>2.11.8</scala.version>
    <hbase.version>1.2.4</hbase.version>
    <scala.binary.version>2.11</scala.binary.version>
    <maven.compiler.source>${java.version}</maven.compiler.source>
    <maven.compiler.target>${java.version}</maven.compiler.target>
</properties>

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

2. Local execution


import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

public class FlinkReadTextFile {

    public static void main(String[] args) throws Exception {
        // Create an environment that runs the job inside the current JVM.
        ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

        // Read the input file line by line.
        DataSet<String> data = env.readTextFile("file:///Users/***/Documents/test.txt");

        // Keep only the lines that start with the given prefix and write them out.
        data.filter(new FilterFunction<String>() {
                @Override
                public boolean filter(String value) throws Exception {
                    return value.startsWith("五芳斋美");
                }
            })
            .writeAsText("file:///Users/***/Documents/test01.txt");

        // Sinks are lazy: nothing runs until execute() is called.
        JobExecutionResult res = env.execute();
    }
}

3. Example


import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time

object SocketWindowWordCount {

  /** Main program method */
  def main(args: Array[String]): Unit = {
    // The port to connect to is hard-coded to 9000 below. To read it from the
    // command line instead, restore this block (it also needs
    // import org.apache.flink.api.java.utils.ParameterTool):
    // val port: Int = try {
    //   ParameterTool.fromArgs(args).getInt("port")
    // } catch {
    //   case e: Exception => {
    //     System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
    //     return
    //   }
    // }

    // get the execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    // get input data by connecting to the socket
    val text = env.socketTextStream("localhost", 9000, '\n')

    // parse the data, group it, window it, and aggregate the counts
    val windowCounts = text
      .flatMap { w => w.split("\\s") }
      .map { w => WordWithCount(w, 1) }
      .keyBy("word")
      .timeWindow(Time.seconds(5), Time.seconds(1))
      .sum("count")

    // print the results with a single thread, rather than in parallel
    windowCounts.print().setParallelism(1)

    env.execute("Socket Window WordCount")
  }

  // Data type for words with count
  case class WordWithCount(word: String, count: Long)
}
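
The timeWindow(Time.seconds(5), Time.seconds(1)) call above defines a sliding window: 5 seconds long, advancing every second, so each word is counted in five overlapping windows. The following plain-Java sketch lists the start times of the windows that would contain a given timestamp. It has no Flink dependency, the class and method names are made up for illustration, and it assumes non-negative millisecond timestamps with no window offset. It illustrates the idea and is not Flink's own code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Flink: illustrates sliding-window assignment.
class SlidingWindowSketch {

    // Returns the start times (ms) of every window [start, start + size) that
    // contains the timestamp, newest window first.
    // Assumes timestamp >= 0, size > 0, slide > 0, and no window offset.
    static List<Long> windowStarts(long timestamp, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        // Start of the most recent window that began at or before the timestamp.
        long lastStart = timestamp - (timestamp % slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // An element at t = 7.5 s with a 5 s window sliding every 1 s.
        System.out.println(windowStarts(7500L, 5000L, 1000L));
    }
}
```

Running main prints [7000, 6000, 5000, 4000, 3000]: the element at 7.5 s belongs to the five windows starting at 3 s through 7 s, which is why the word counts printed by the job overlap from one second to the next.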

The above is based on personal experience; I hope it serves as a useful reference.

Original article: https://blog.csdn.net/Vector97/article/details/118182173
