The main program defines the execution environment, registers the Source and Sink, and runs the aggregation SQL query, as follows:
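```scala
// Imports for the older Flink 1.x Scala Table API used below
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.TableEnvironment

def main(args: Array[String]): Unit = {
  // Streaming execution environment and its table environment
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val tEnv = TableEnvironment.getTableEnvironment(env)

  // Use event time (the window is defined on the accessTime attribute)
  env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

  // Parallelism 1 makes the output easier to inspect
  env.setParallelism(1)

  val sourceTableName = "mySource"
  // Custom table source (MyTableSource is defined elsewhere in this article)
  val tableSource = new MyTableSource

  val sinkTableName = "csvSink"
  // CSV table sink (getCsvTableSink is defined elsewhere in this article)
  val tableSink = getCsvTableSink

  // Register the source
  tEnv.registerTableSource(sourceTableName, tableSource)
  // Register the sink
  tEnv.registerTableSink(sinkTableName, tableSink)

  // Page views (pv) per region in 2-minute tumbling event-time windows
  val sql =
    "SELECT " +
    "  region, " +
    "  TUMBLE_START(accessTime, INTERVAL '2' MINUTE) AS winStart, " +
    "  TUMBLE_END(accessTime, INTERVAL '2' MINUTE) AS winEnd, " +
    "  COUNT(region) AS pv " +
    "FROM mySource " +
    "GROUP BY TUMBLE(accessTime, INTERVAL '2' MINUTE), region"

  // Write the query result into the registered CSV sink and start the job
  tEnv.sqlQuery(sql).insertInto(sinkTableName)
  env.execute()
}
```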
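To make the semantics of `GROUP BY TUMBLE(accessTime, INTERVAL '2' MINUTE), region` concrete, here is a minimal plain-Scala sketch with no Flink dependency. Each event is assigned to the epoch-aligned 2-minute window that contains its timestamp, and rows are counted per (window, region), giving the same columns as the query (region, winStart, winEnd, pv). The `Access` and `WindowCount` case classes, the `TumbleSketch` object and the sample regions are hypothetical; the sketch assumes `accessTime` is epoch milliseconds and ignores watermarks and late data, which the real Flink job handles.

```scala
// Hypothetical access record: region plus event time in epoch milliseconds.
final case class Access(region: String, accessTime: Long)

// One aggregated row, mirroring the SQL columns (region, winStart, winEnd, pv).
final case class WindowCount(region: String, winStart: Long, winEnd: Long, pv: Long)

object TumbleSketch {
  // INTERVAL '2' MINUTE expressed in milliseconds
  val WindowSizeMs: Long = 2 * 60 * 1000L

  // Start of the tumbling window containing ts, aligned to the epoch (offset 0).
  // floorMod keeps the result correct for negative timestamps as well.
  def windowStart(ts: Long, size: Long): Long = ts - Math.floorMod(ts, size)

  // Equivalent of COUNT(region) ... GROUP BY TUMBLE(accessTime, size), region
  def countPerWindow(events: Seq[Access], size: Long = WindowSizeMs): Seq[WindowCount] =
    events
      .groupBy(e => (windowStart(e.accessTime, size), e.region))
      .map { case ((start, region), group) =>
        WindowCount(region, start, start + size, group.size.toLong)
      }
      .toSeq
      .sortBy(w => (w.winStart, w.region))

  def main(args: Array[String]): Unit = {
    val events = Seq(
      Access("north", 0L),
      Access("south", 30000L),
      Access("north", 119999L), // still inside [0, 120000)
      Access("north", 120000L)  // first event of the next window
    )
    countPerWindow(events).foreach(println)
  }
}
```

Window boundaries are half-open: an event at exactly 120000 ms belongs to the window starting at 120000, not the one ending there. In the streaming job, by default Flink emits a window's row only after the watermark passes `winEnd`, whereas this sketch simply aggregates a finite batch.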
4. Run the job and check the results