com.lezhi.business.dxxbs.transmission.table
import com.lezhi.common.{CommonTransmissonFunciton, SystemParams}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api._
import org.apache.flink.table.api.bridge.scala._
/**
 * Flink SQL job that copies the `user_login` table from a Kafka topic carrying
 * canal-json change events into a JDBC table of the same name.
 *
 * The job registers a Kafka-backed source table and a JDBC-backed sink table
 * with identical columns, then runs a single `INSERT INTO ... SELECT *` between
 * them. The sink declares `USER_ID` as its primary key.
 */
object user_login {

  /**
   * Renders connector options as the body of a SQL `WITH (...)` clause.
   *
   * Keys and values are wrapped in ASCII single quotes; a single quote inside a
   * value is doubled so that it cannot terminate the SQL string literal early.
   *
   * @param options connector option key/value pairs, in the order to emit them
   * @return the comma-separated `'key' = 'value'` list (without the parentheses)
   */
  private def renderOptions(options: Seq[(String, Any)]): String =
    options
      .map { case (key, value) =>
        val escaped = s"$value".replace("'", "''")
        s"'$key' = '$escaped'"
      }
      .mkString(", ")

  /**
   * Entry point: creates the source and sink tables and starts the sync job.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
    val settings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build()
    val tableEnv = StreamTableEnvironment.create(streamEnv, settings)

    val tableName = "user_login"
    val primaryKey = "USER_ID"
    val sourceTable = s"source_table_$tableName"
    val sinkTable = s"sink_table_$tableName"

    // Column list shared by the source and sink DDL so both schemas stay identical.
    val columnsDdl =
      """
        | USER_ID STRING,
        | USER_PHONE STRING,
        | USER_PWD STRING,
        | CREAT_TIME STRING,
        | UPLOAD_TIME STRING,
        | UNION_ID STRING,
        | OPEN_ID STRING
        |""".stripMargin

    val sourceOptions = Seq(
      "connector" -> "kafka",                                            // read from Kafka
      "topic" -> SystemParams.TOPIC,                                     // Kafka topic name
      "properties.bootstrap.servers" -> SystemParams.BOOTSTRAP_SERVER,   // Kafka brokers
      "scan.startup.mode" -> "earliest-offset",                          // where consumption starts
      "format" -> "canal-json",                                          // payload format
      "canal-json.ignore-parse-errors" -> "true",                        // do not fail on parse errors
      "canal-json.table.include" -> tableName                            // table filter for change events
    )
    // The options must be quoted with ASCII single quotes; typographic quotes
    // (U+2018) are not valid SQL string delimiters and make the DDL unparsable.
    val sourceDdl = s"CREATE TABLE $sourceTable ($columnsDdl) WITH (${renderOptions(sourceOptions)})"
    tableEnv.executeSql(sourceDdl)
    // Debugging aid: tableEnv.executeSql(s"SELECT * FROM $sourceTable").print()

    val sinkOptions = Seq(
      "connector" -> "jdbc",                     // write through JDBC
      "url" -> SystemParams.JDBC_URL_BYMM,       // JDBC database URL
      "table-name" -> tableName,                 // target table name
      "username" -> SystemParams.JDBC_USERNAME,  // database user
      "password" -> SystemParams.JDBC_PASSWORD   // database password
    )
    val sinkDdl =
      s"CREATE TABLE $sinkTable ($columnsDdl,PRIMARY KEY ($primaryKey) NOT ENFORCED) " +
        s"WITH (${renderOptions(sinkOptions)})"
    // The sink DDL is deliberately not printed: it embeds the JDBC password.
    tableEnv.executeSql(sinkDdl)

    // executeSql submits the INSERT job by itself, so there is no trailing
    // TableEnvironment.execute(jobName) call: with nothing left buffered that
    // call fails with "No operators defined in streaming topology".
    // NOTE(review): the job name is passed through `pipeline.name` instead;
    // confirm the deployed Flink version honours it for SQL jobs.
    tableEnv.getConfig.getConfiguration.setString("pipeline.name", tableName)
    val insertResult = tableEnv.executeSql(s"INSERT INTO $sinkTable SELECT * FROM $sourceTable")

    // Keep the client attached to the streaming job when a JobClient is available
    // (e.g. local / IDE runs), as the blocking execute() call used to do.
    val jobClient = insertResult.getJobClient
    if (jobClient.isPresent) {
      jobClient.get().getJobExecutionResult(Thread.currentThread().getContextClassLoader).get()
    }
  }
}
注意:写入(sink)的目标表必须定义主键,否则在更新数据时,旧数据和新数据会同时存在。
flink-sql解析canal-json实现实时同步
标签:cut main source error sso res mod class 格式
查看更多关于flink-sql解析canal-json实现实时同步的详细内容...