| 主程序包括执行环境的定义,Source/Sink的注册以及统计查SQL的执行,具体如下: def main(args: Array[String]): Unit = { // Streaming 环境 val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env)  // 设置EventTime env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)  //方便我们查出输出数据 env.setParallelism(1)  val sourceTableName = "mySource" // 创建自定义source数据结构 val tableSource = new MyTableSource  val sinkTableName = "csvSink" // 创建CSV sink 数据结构 val tableSink = getCsvTableSink  // 注册source tEnv.registerTableSource(sourceTableName, tableSource) // 注册sink tEnv.registerTableSink(sinkTableName, tableSink)  val sql = "SELECT " + " region, " + " TUMBLE_START(accessTime, INTERVAL '2' MINUTE) AS winStart," + " TUMBLE_END(accessTime, INTERVAL '2' MINUTE) AS winEnd, COUNT(region) AS pv " + " FROM mySource " + " GROUP BY TUMBLE(accessTime, INTERVAL '2' MINUTE), region"  tEnv.sqlQuery(sql).insertInto(sinkTableName); env.execute() } 
 4. 执行并查看运行结果 (编辑:威海站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |