FlinkSql之TableAPI详解

乎语百科 267 0

一、FlinkSql的概念

核心概念

Flink 的 Table APISQL 是流批统一的 API。 这意味着 Table API & SQL 在无论有限的批式输入还是无限的流式输入下,都具有相同的语义。 因为传统的关系代数以及 SQL 最开始都是为了批式处理而设计的, 关系型查询在流式场景下不如在批式场景下容易理解.

动态表和连续查询

动态表(Dynamic Tables) 是 Flink 的支持流数据的 Table API 和 SQL 的核心概念。

与表示批处理数据的静态表不同,动态表是随时间变化的。可以像查询静态批处理表一样查询它们。查询动态表将生成一个连续查询(Continuous Query)。一个连续查询永远不会终止,结果会生成一个动态表。查询不断更新其(动态)结果表,以反映其(动态)输入表上的更改。

TableAPI

首先需要导入依赖

 <dependency>     <groupId>org.apache.flink</groupId>     <artifactId>flink-table-planner-blink_${scala.binary.version}</artifactId>     <version>${flink.version}</version>     <scope>provided</scope> </dependency> <dependency>     <groupId>org.apache.flink</groupId>     <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>     <version>${flink.version}</version>     <scope>provided</scope> </dependency> <dependency>     <groupId>org.apache.flink</groupId>     <artifactId>flink-csv</artifactId>     <version>${flink.version}</version> </dependency> <dependency>     <groupId>org.apache.flink</groupId>     <artifactId>flink-json</artifactId>     <version>${flink.version}</version> </dependency> ​ <!-- https://mvnrepository.com/artifact/org.apache.commons/commons-compress --> <dependency>     <groupId>org.apache.commons</groupId>     <artifactId>commons-compress</artifactId>     <version>1.21</version> </dependency>
 /**  * 使用TableAPI的基本套路:  * 1.创建表的执行环境  * 2.创建表,将流转换为动态表,表的字段名从bean的属性名自动抽取  * 3.对动态表进行查询  * 4.把动态表转换为流  */

这里需要注意的问题:

1.TableAPI 中将动态表转换为流时有两种方法

 DataStream<Row> rowDataStream = tableEnvironment.toAppendStream(result, Row.class);

toAppendStream方法只能在查询时使用,不能使用包含聚合函数等更新语句

 DataStream<Tuple2<Boolean, Row>> tuple2DataStream = tableEnvironment.toRetractStream(select, Row.class);

toRetractStream则可以使用

2.上述两种方法内传入的参数Row.class,表示将表中查询出的数据封装为行类型,也就是对每行进行封装,解决查询出的数据列少于或者多于原表。如何能够确保所查询的数据与之前封装的Bean有完全一致的结构则也可以封装为原Bean.class

代码实现:

 package net.cyan.FlinkSql; ​ import net.cyan.POJO.WaterSensor; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; ​ import static org.apache.flink.table.api.Expressions.$; ​ /**  * 使用TableAPI的基本套路:  * 1.创建表的执行环境  * 2.创建表,将流转换为动态表,表的字段名从bean的属性名自动抽取  * 3.对动态表进行查询  * 4.把动态表转换为流  */ public class Demo1 {     public static void main(String[] args) {         Configuration configuration=new Configuration();         configuration.setInteger("rest.port",3333);         //创建执行环境         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(configuration);         env.setParallelism(1);         //模拟数据         DataStreamSource<WaterSensor> WaterSensorSource = env.fromElements(                 new WaterSensor("S1", 1000L, 10),                 new WaterSensor("S1", 1000L, 10),                 new WaterSensor("S2", 2000L, 20),                 new WaterSensor("S3", 3000L, 30),                 new WaterSensor("S4", 4000L, 40),                 new WaterSensor("S5", 5000L, 50)         );         //创建表的执行环境         StreamTableEnvironment tableEnvironment = StreamTableEnvironment.create(env);         //创建表,将流转换为动态表,表的字段名从bean的属性名自动抽取         Table table = tableEnvironment.fromDataStream(WaterSensorSource);         //对表进行查询         //1、过时的查询书写         Table result = table                 .where("id='S1'")                 .select("*");         //2、不过时的书写         Table result1 = table //                .where($("id").isEqual("S1"))                 .select($("id"), $("ts"), $("vc"));         //3.聚合函数         Table select = table                 .groupBy($("id"))                 .aggregate($("vc").sum().as("sum_vc"))                 .select($("id"), $("sum_vc"));         //把动态表转换为流,使用到了之前创建的表运行环境 ​         SingleOutputStreamOperator<Row> tuple2DataStream = tableEnvironment                 .toRetractStream(select, Row.class)                 .filter(t -> t.f0)                 .map(t -> t.f1); //        DataStream<Row> rowDataStream = tableEnvironment.toAppendStream(result, Row.class); //        DataStream<Row> rowDataStream1 = tableEnvironment.toAppendStream(result1, Row.class); //        rowDataStream.print(); //        rowDataStream1.print();         tuple2DataStream.print(); ​ ​         try {             //启动执行环境             env.execute();         } catch (Exception e) {             e.printStackTrace();         } ​ ​ ​     } }

二、TableAPI读取文件

使用TableAPI读取文件时,我们首先需要知道去哪里读取也就是文件路径、读取文件的格式、读取出来的数据的结构也就是结果表的表结构及表名

 package net.cyan.FlinkSql; ​ import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; ​ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Csv; import org.apache.flink.table.descriptors.FileSystem; import org.apache.flink.table.descriptors.Schema; import org.apache.flink.table.types.DataType; ​ import static org.apache.flink.table.api.Expressions.$; ​ public class Demo2_readWriteText {     public static void main(String[] args) {         //创建执行环境 //        Configuration configuration = new Configuration(); //        configuration.setInteger("rest.port", 3333);         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         env.setParallelism(1);         StreamTableEnvironment talEnv = StreamTableEnvironment.create(env);         //创建查询的数据结果封装类型         Schema schema = new Schema()                 .field("id", DataTypes.STRING())                 .field("ts", DataTypes.BIGINT())                 .field("vc", DataTypes.INT());         talEnv                 .connect(new FileSystem().path("input/sensor.txt"))  //读取文件路径                 .withFormat(new Csv()) //读取文件的数据格式                 .withSchema(schema) //读取出来的数据格式                 .createTemporaryTable("sensor");//定义结果表名 ​         //进行查询         Table select = talEnv.from("sensor")                 .where($("id").isEqual("sensor_1"))                 .select($("id"), $("ts"), $("vc")); ​ ​         //将查询结果写入到新文件中         //同样建立一个动态表连接         talEnv                 .connect(new FileSystem().path("input/b.txt"))  //写入路径                 .withFormat(new Csv()) //写入文件的数据格式                 .withSchema(schema) //写入的数据格式                 .createTemporaryTable("abc");//定义写入表名         //进行写入操作 ​         select.executeInsert("abc"); ​ //        try { //            //启动执行环境 //            env.execute(); //        } catch (Exception e) { //            e.printStackTrace(); //        } ​     } }

三、TableAPI 读取、写入Kakfa

基本流程

1>需要创建表的运行环境

 //创建表的运行环境 StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);

2>创建查询出的数据写出结构

 //创建表结构 Schema schema=new Schema()         .field("id",DataTypes.STRING())         .field("ts",DataTypes.BIGINT())         .field("vc",DataTypes.INT());

3> 创建kafka连接

 //创建kafka连接 tabEnv.connect(         new Kafka()         .version("universal")// 版本号         .property("bootstrap.servers","hadoop102:9092")//地址         .property("group.id","cy")//消费者组         .topic("first")//消费主题 ​  )         .withFormat(new Json())//写入的格式         .withSchema(schema)         .createTemporaryTable("a");//临时表

4> 进行查询

 //创建表 Table select = tabEnv.from("a").select("*");

5> 创建写入kafka连接

 //创建写入主题 tabEnv.connect(         new Kafka()                 .version("universal")// 版本号                 .property("bootstrap.servers","hadoop102:9092")//地址                 .topic("first1")//消费主题                 .sinkPartitionerRoundRobin()//随机分区 ​ )         .withFormat(new Json())//写入的格式         .withSchema(schema)         .createTemporaryTable("c");

6> 写入

 //写入 select.executeInsert("c");

完整代码如下

 package net.cyan.FlinkSql; ​ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.table.descriptors.Json; import org.apache.flink.table.descriptors.Kafka; import org.apache.flink.table.descriptors.Schema; ​ public class Demo5_readWriteKafka {     public static void main(String[] args) {        //创建执行环境        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         //创建表的运行环境         StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);         //创建表结构         Schema schema=new Schema()                 .field("id",DataTypes.STRING())                 .field("ts",DataTypes.BIGINT())                 .field("vc",DataTypes.INT());         //创建kafka连接         tabEnv.connect(                 new Kafka()                 .version("universal")// 版本号                 .property("bootstrap.servers","hadoop102:9092")//地址                 .property("group.id","cy")//消费者组                 .topic("first")//消费主题 ​          )                 .withFormat(new Json())//写入的格式                 .withSchema(schema)                 .createTemporaryTable("a");         //创建表         Table select = tabEnv.from("a").select("*");         //创建写入主题         tabEnv.connect(                 new Kafka()                         .version("universal")// 版本号                         .property("bootstrap.servers","hadoop102:9092")//地址                         .topic("first1")//消费主题                         .sinkPartitionerRoundRobin()//随即分区 ​         )                 .withFormat(new Json())//写入的格式                 .withSchema(schema)                 .createTemporaryTable("c"); ​         //写入         select.executeInsert("c"); ​ ​     } }
 

标签:

留言评论

  • 这篇文章还没有收到评论,赶紧来抢沙发吧~