FlinkSQL之Windowing TVF

乎语百科 239 0

Windowing TVF

在Flink1.13版本之后出现的替代之前的Group window的产物,官网描述其 is more powerful and effective

FlinkSQL之Windowing TVF

 //TVF 中的tumble滚动窗口 //tumble(table sensor,descriptor(et),interval '5' second ):作为一张表存在 //特别注意!!!! //如果在sql中使用了tumble窗口,则一定需要group by,而且group by后一定有window_start,window_end两个字段

sql实现TVF的tumble窗口实现

 package net.cyan.FlinkSql.TVF; ​ import net.cyan.POJO.WaterSensor; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; ​ import java.time.Duration; ​ import static org.apache.flink.table.api.Expressions.$; ​ public class Demo1_Window_TableAPI_Tumble {     public static void main(String[] args) {         //创建执行环境         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         //创建表的运行环境         StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);         env.setParallelism(1);         DataStream<WaterSensor> waterSensorStream =                 env.fromElements(                         new WaterSensor("sensor_1", 1000L, 10),                         new WaterSensor("sensor_1", 2000L, 20),                         new WaterSensor("sensor_2", 3000L, 30),                         new WaterSensor("sensor_1", 4000L, 40),                         new WaterSensor("sensor_1", 5000L, 50),                         new WaterSensor("sensor_2", 6000L, 60))                         .assignTimestampsAndWatermarks(                                 WatermarkStrategy                                         .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))                                         .withTimestampAssigner((ws, ts) -> ws.getTs()) ​                         );         //创建table         Table table = tabEnv.fromDataStream(waterSensorStream,$("id"),$("ts"),$("vc"),$("et").rowtime());         //创建表         tabEnv.createTemporaryView("sensor",table);         //执行sql         //TVF 中的tumble滚动窗口         //tumble(table sensor,descriptor(et),interval '5' second ):作为一张表存在         //特别注意!!!!         //如果在sql中使用了tumble窗口,则一定需要group by,而且group by后一定有window_start,window_end两个字段         tabEnv.sqlQuery("select" +                 "  window_start,window_end,id," +                 "sum(vc) sum_vc" +                 " from table (tumble(table sensor,descriptor(et),interval '5' second ))" +                 " group by window_start,window_end,id ")                 .execute()                 .print(); ​     } }

sql实现TVF的滑动窗口

 //TVF 中的hop滚动窗口 //hop(table sensor,descriptor(et),interval '2' second,interval '5' second ):作为一张表存在 //first interval :滑动步长, second interval :窗口长度 //特别注意!!!! // 1.TVF 中滑动窗口的滑动步长与窗口长度必须是整数倍的关系,不然会报错 // 例如:滑动步长为2,窗口长度就不能为5,可以为6 // 2.如果在sql中使用了hop窗口,则一定需要group by,而且group by后一定有window_start,window_end两个字段
 package net.cyan.FlinkSql.TVF; ​ import net.cyan.POJO.WaterSensor; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; ​ import java.time.Duration; ​ import static org.apache.flink.table.api.Expressions.$; ​ public class Demo2_Window_TVF_Hop {     public static void main(String[] args) {         //创建执行环境         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         //创建表的运行环境         StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);         env.setParallelism(1);         DataStream<WaterSensor> waterSensorStream =                 env.fromElements(                         new WaterSensor("sensor_1", 1000L, 10),                         new WaterSensor("sensor_1", 2000L, 20),                         new WaterSensor("sensor_2", 3000L, 30),                         new WaterSensor("sensor_1", 4000L, 40),                         new WaterSensor("sensor_1", 5000L, 50),                         new WaterSensor("sensor_2", 6000L, 60))                         .assignTimestampsAndWatermarks(                                 WatermarkStrategy                                         .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))                                         .withTimestampAssigner((ws, ts) -> ws.getTs()) ​                         );         //创建table         Table table = tabEnv.fromDataStream(waterSensorStream,$("id"),$("ts"),$("vc"),$("et").rowtime());         //创建表         tabEnv.createTemporaryView("sensor",table);         //执行sql         //TVF 中的hop滚动窗口         //hop(table sensor,descriptor(et),interval '2' second,interval '5' second ):作为一张表存在         //first interval :滑动步长, second interval :窗口长度         //特别注意!!!!         // 1.TVF 中滑动窗口的滑动步长与窗口长度必须是整数倍的关系,不然会报错         // 例如:滑动步长为2,窗口长度就不能为5,可以为6         // 2.如果在sql中使用了hop窗口,则一定需要group by,而且group by后一定有window_start,window_end两个字段         tabEnv.sqlQuery("select" +                 "  window_start,window_end,id," +                 "sum(vc) sum_vc" +                 " from table (hop(table sensor,descriptor(et),interval '2' second,interval '6' second ))" +                 " group by window_start,window_end,id ")                 .execute()                 .print(); ​ ​ ​     } }

sql实现TVF的累计窗口

累计窗口的应用:

需求:每天每隔一个小时统计一次当天的pv(浏览量)

流的方式如何解决:

1、用滚动窗口, 窗口长度设为1h

2、每天的第一个窗口清除状态,后面的不清,进行状态的累加

或者

用滚动窗口,长度设置为2day

自定义触发器,每隔1小时对窗内的元素计算一次,不关闭窗口

sql的方式如何解决?

直接使用累计窗口cumulate

 //TVF 中的cumulate累计窗口 //cumulate(table tableName,descriptor(timecol),step,size):作为一张表存在 //tableName:表名 //timecol:时间属性字段 //step:累计步长,跟滑动步长类似 //size:窗口长度 //特别注意!!!! //1.累计窗口的步长与窗口长度同样是需要整数倍关系 // 2.如果在sql中使用了cumulate窗口,则一定需要group by,而且group by后一定有window_start,window_end两个字段
 package net.cyan.FlinkSql.TVF; ​ import net.cyan.POJO.WaterSensor; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; ​ import java.time.Duration; ​ import static org.apache.flink.table.api.Expressions.$; ​ public class Demo3_Window_TVF_cumulate {     public static void main(String[] args) {         //创建执行环境         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();         //创建表的运行环境         StreamTableEnvironment tabEnv = StreamTableEnvironment.create(env);         env.setParallelism(1);         DataStream<WaterSensor> waterSensorStream =                 env.fromElements(                         new WaterSensor("sensor_1", 1000L, 10),                         new WaterSensor("sensor_1", 2000L, 20),                         new WaterSensor("sensor_2", 3000L, 30),                         new WaterSensor("sensor_1", 4000L, 40),                         new WaterSensor("sensor_1", 5000L, 50),                         new WaterSensor("sensor_2", 6000L, 60))                         .assignTimestampsAndWatermarks(                                 WatermarkStrategy                                         .<WaterSensor>forBoundedOutOfOrderness(Duration.ofSeconds(3))                                         .withTimestampAssigner((ws, ts) -> ws.getTs()) ​                         );         //创建table         Table table = tabEnv.fromDataStream(waterSensorStream,$("id"),$("ts"),$("vc"),$("et").rowtime());         //创建表         tabEnv.createTemporaryView("sensor",table);         //执行sql         //TVF 中的cumulate累计窗口         //cumulate(table tableName,descriptor(timecol),step,size):作为一张表存在         //tableName:表名         //timecol:时间属性字段         //step:累计步长,跟滑动步长类似         //size:窗口长度         //特别注意!!!!         //1.累计窗口的步长与窗口长度同样是需要整数倍关系         // 2.如果在sql中使用了cumulate窗口,则一定需要group by,而且group by后一定有window_start,window_end两个字段         tabEnv.sqlQuery("select" +                 "  window_start,window_end,id," +                 " sum(vc) sum_vc" +                 " from table (cumulate(table sensor,descriptor(et),interval '2' second,interval '6' second)) " +                 "group by window_start,window_end,id")                 .execute()                 .print();     } }
 

标签:

留言评论

  • 这篇文章还没有收到评论,赶紧来抢沙发吧~