Running Flink in a local JVM with the web UI
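Calling

    StreamExecutionEnvironment executionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment();

returns a Flink execution environment. When the job runs inside a local JVM, however, that environment does not start the web UI. If you want the monitoring page in order to check how a job is running, you can create the local execution environment as follows, which also brings up the web UI: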
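    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.RestOptions;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    Configuration config = new Configuration();
    config.setInteger(RestOptions.PORT, 9998);
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(config);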
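One thing the snippet does not show: to my knowledge, `createLocalEnvironmentWithWebUI` can only serve the dashboard when Flink's web frontend module is on the classpath, and that module is not part of `flink-clients`. The fragment below is a sketch of the extra dependency, assuming the `flink.version` and `scala.binary.version` properties from the Maven section of this post; adjust the coordinates if your Flink version names the artifact differently.

```xml
<!-- Assumption: web frontend artifact for Flink 1.6.x built against Scala 2.11 -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
```

With the port set to 9998 as above, the UI should then be reachable at http://localhost:9998 for as long as the job is running.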
Getting started with local Flink execution
1. Maven dependencies
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.6.3</flink.version>
        <java.version>1.8</java.version>
        <scala.version>2.11.8</scala.version>
        <hbase.version>1.2.4</hbase.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
    </properties>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
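The streaming snippets in this post (`StreamExecutionEnvironment` and the Scala socket example) use classes from the streaming modules, which, as far as I know, `flink-clients` does not pull in on its own. A sketch of the additional entries, assuming the same version properties as above; keep only the one that matches the language you use:

```xml
<!-- Assumption: needed for the Java StreamExecutionEnvironment -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>

<!-- Assumption: needed for org.apache.flink.streaming.api.scala._ -->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
```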
2. Local execution
    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class FlinkReadTextFile {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.createLocalEnvironment();

            // Read the input file line by line.
            DataSet<String> data = env.readTextFile("file:///Users/***/Documents/test.txt");

            // Keep only the lines that start with the given prefix and write them out.
            data.filter(new FilterFunction<String>() {
                    @Override
                    public boolean filter(String value) throws Exception {
                        return value.startsWith("五芳斋美");
                    }
                })
                .writeAsText("file:///Users/***/Documents/test01.txt");

            // Sinks are lazy; execute() is what actually runs the job.
            JobExecutionResult res = env.execute();
        }
    }
3. Example
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.scala._

    object SocketWindowWordCount {

      /** Main program method */
      def main(args: Array[String]): Unit = {

        // the port to connect to
        // val port: Int = try {
        //   ParameterTool.fromArgs(args).getInt("port")
        // } catch {
        //   case e: Exception => {
        //     System.err.println("No port specified. Please run 'SocketWindowWordCount --port <port>'")
        //     return
        //   }
        // }

        // get the execution environment
        val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

        // get input data by connecting to the socket
        val text = env.socketTextStream("localhost", 9000, '\n')

        // parse the data, group it, window it, and aggregate the counts
        val windowCounts = text
          .flatMap { w => w.split("\\s") }
          .map { w => WordWithCount(w, 1) }
          .keyBy("word")
          .timeWindow(Time.seconds(5), Time.seconds(1))
          .sum("count")

        // print the results with a single thread, rather than in parallel
        windowCounts.print().setParallelism(1)

        env.execute("Socket Window WordCount")
      }

      // Data type for words with count
      case class WordWithCount(word: String, count: Long)
    }
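The call `timeWindow(Time.seconds(5), Time.seconds(1))` defines a sliding window: 5 seconds long, advancing every second, so each record is counted in several overlapping windows. The plain-Java sketch below illustrates that assignment under simplifying assumptions (non-negative millisecond timestamps, no window offset). `SlidingWindowSketch`, `lastWindowStart`, and `assignWindows` are hypothetical names for illustration; this is not Flink's own code.

```java
import java.util.ArrayList;
import java.util.List;

public class SlidingWindowSketch {

    /** Start of the latest slide-aligned window that contains the timestamp (offset 0 assumed). */
    public static long lastWindowStart(long timestampMs, long slideMs) {
        return timestampMs - (timestampMs % slideMs);
    }

    /** All [start, end) windows containing the timestamp, newest first. */
    public static List<long[]> assignWindows(long timestampMs, long sizeMs, long slideMs) {
        List<long[]> windows = new ArrayList<>();
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastWindowStart(timestampMs, slideMs);
             start > timestampMs - sizeMs;
             start -= slideMs) {
            windows.add(new long[] {start, start + sizeMs});
        }
        return windows;
    }

    public static void main(String[] args) {
        // A record arriving at t = 7.3 s with a 5 s window sliding every 1 s.
        for (long[] w : assignWindows(7_300L, 5_000L, 1_000L)) {
            System.out.println("[" + w[0] + ", " + w[1] + ")");
        }
    }
}
```

With a 5 second size and a 1 second slide, every record lands in size / slide = 5 windows, which is why the same word can show up in several consecutive outputs of the job.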
Original article: https://blog.csdn.net/Vector97/article/details/118182173