加入收藏 | 设为首页 | 会员中心 | 我要投稿 威海站长网 (https://www.0631zz.cn/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 运营中心 > 网站设计 > 教程 > 正文

Apache Flink 漫谈系列 - SQL概览

发布时间:2018-11-16 07:35:33 所属栏目:教程 来源:孙金城
导读:一、SQL简述 SQL是Structured Query Language的缩写,最初是由美国计算机科学家Donald D. Chamberlin和Raymond F. Boyce在20世纪70年代早期从 Early History of SQL 中了解关系模型后在IBM开发的。该版本最初称为[SEQUEL: A Structured English Query Lang

ScalarFunction#eval()`

  1. class MySplit extends TableFunction[String] { 
  2. def eval(str: String): Unit = { 
  3. if (str.contains("#")){ 
  4. str.split("#").foreach(collect) 
  5.  
  6. def eval(str: String, prefix: String): Unit = { 
  7. if (str.contains("#")) { 
  8. str.split("#").foreach(s => collect(prefix + s)) 
  9. }} 

b. 使用

  1. ... 
  2. val fun = new MySplit() 
  3. tEnv.registerFunction("mySplit", fun) 
  4. val sql = "SELECT c, s FROM MyTable, LATERAL TABLE(mySplit(c)) AS T(s)" 
  5. ... 

3. UDAF

a. 定义

UDAF 要实现的接口比较多,我们以一个简单的CountAGG为例,做简单实现如下:

  1. /** The initial accumulator for count aggregate function */ 
  2. class CountAccumulator extends JTuple1[Long] { 
  3. f0 = 0L //count 
  4.  
  5. /** 
  6. * User-defined count aggregate function 
  7. */ 
  8. class MyCount 
  9. extends AggregateFunction[JLong, CountAccumulator] { 
  10.  
  11. // process argument is optimized by Calcite. 
  12. // For instance count(42) or count(*) will be optimized to count(). 
  13. def accumulate(acc: CountAccumulator): Unit = { 
  14. acc.f0 += 1L 
  15.  
  16. // process argument is optimized by Calcite. 
  17. // For instance count(42) or count(*) will be optimized to count(). 
  18. def retract(acc: CountAccumulator): Unit = { 
  19. acc.f0 -= 1L 
  20.  
  21. def accumulate(acc: CountAccumulator, value: Any): Unit = { 
  22. if (value != null) { 
  23. acc.f0 += 1L 
  24.  
  25. def retract(acc: CountAccumulator, value: Any): Unit = { 
  26. if (value != null) { 
  27. acc.f0 -= 1L 
  28.  
  29. override def getValue(acc: CountAccumulator): JLong = { 
  30. acc.f0 
  31.  
  32. def merge(acc: CountAccumulator, its: JIterable[CountAccumulator]): Unit = { 
  33. val iter = its.iterator() 
  34. while (iter.hasNext) { 
  35. acc.f0 += iter.next().f0 
  36.  
  37. override def createAccumulator(): CountAccumulator = { 
  38. new CountAccumulator 
  39.  
  40. def resetAccumulator(acc: CountAccumulator): Unit = { 
  41. acc.f0 = 0L 
  42.  
  43. override def getAccumulatorType: TypeInformation[CountAccumulator] = { 
  44. new TupleTypeInfo(classOf[CountAccumulator], BasicTypeInfo.LONG_TYPE_INFO) 
  45.  
  46. override def getResultType: TypeInformation[JLong] = 
  47. BasicTypeInfo.LONG_TYPE_INFO} 

(编辑:威海站长网)

【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!

热点阅读