加入收藏 | 设为首页 | 会员中心 | 我要投稿 常州站长网 (https://www.0519zz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 运营中心 > 网站设计 > 教程 > 正文

Apache Flink 漫谈系列 - SQL概览

发布时间:2018-11-20 12:29:35 所属栏目:教程 来源:孙金城
导读:副标题#e# 一、SQL简述 SQL是Structured Query Language的缩写,最初是由美国计算机科学家Donald D. Chamberlin和Raymond F. Boyce在20世纪70年代早期从 Early History of SQL 中了解关系模型后在IBM开发的。该版本最初称为[SEQUEL: A Structured English Q

主程序包括执行环境的定义,Source/Sink的注册以及统计查SQL的执行,具体如下:

  1. def main(args: Array[String]): Unit = { 
  2. // Streaming 环境 
  3. val env = StreamExecutionEnvironment.getExecutionEnvironment 
  4. val tEnv = TableEnvironment.getTableEnvironment(env) 
  5.  
  6. // 设置EventTime 
  7. env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 
  8.  
  9. //方便我们查出输出数据 
  10. env.setParallelism(1) 
  11.  
  12. val sourceTableName = "mySource" 
  13. // 创建自定义source数据结构 
  14. val tableSource = new MyTableSource 
  15.  
  16. val sinkTableName = "csvSink" 
  17. // 创建CSV sink 数据结构 
  18. val tableSink = getCsvTableSink 
  19.  
  20. // 注册source 
  21. tEnv.registerTableSource(sourceTableName, tableSource) 
  22. // 注册sink 
  23. tEnv.registerTableSink(sinkTableName, tableSink) 
  24.  
  25. val sql = 
  26. "SELECT " + 
  27. " region, " + 
  28. " TUMBLE_START(accessTime, INTERVAL '2' MINUTE) AS winStart," + 
  29. " TUMBLE_END(accessTime, INTERVAL '2' MINUTE) AS winEnd, COUNT(region) AS pv " + 
  30. " FROM mySource " + 
  31. " GROUP BY TUMBLE(accessTime, INTERVAL '2' MINUTE), region" 
  32.  
  33. tEnv.sqlQuery(sql).insertInto(sinkTableName); 
  34. env.execute() 

4. 执行并查看运行结果

(编辑:常州站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读