| ScalarFunction#eval()` class MySplit extends TableFunction[String] { def eval(str: String): Unit = { if (str.contains("#")){ str.split("#").foreach(collect) } }  def eval(str: String, prefix: String): Unit = { if (str.contains("#")) { str.split("#").foreach(s => collect(prefix + s)) } }} 
 b. 使用 ... val fun = new MySplit() tEnv.registerFunction("mySplit", fun) val sql = "SELECT c, s FROM MyTable, LATERAL TABLE(mySplit(c)) AS T(s)" ... 
 3. UDAF a. 定义 UDAF 要实现的接口比较多,我们以一个简单的CountAGG为例,做简单实现如下: /** The initial accumulator for count aggregate function */ class CountAccumulator extends JTuple1[Long] { f0 = 0L //count }  /** * User-defined count aggregate function */ class MyCount extends AggregateFunction[JLong, CountAccumulator] {  // process argument is optimized by Calcite. // For instance count(42) or count(*) will be optimized to count(). def accumulate(acc: CountAccumulator): Unit = { acc.f0 += 1L }  // process argument is optimized by Calcite. // For instance count(42) or count(*) will be optimized to count(). def retract(acc: CountAccumulator): Unit = { acc.f0 -= 1L }  def accumulate(acc: CountAccumulator, value: Any): Unit = { if (value != null) { acc.f0 += 1L } }  def retract(acc: CountAccumulator, value: Any): Unit = { if (value != null) { acc.f0 -= 1L } }  override def getValue(acc: CountAccumulator): JLong = { acc.f0 }  def merge(acc: CountAccumulator, its: JIterable[CountAccumulator]): Unit = { val iter = its.iterator() while (iter.hasNext) { acc.f0 += iter.next().f0 } }  override def createAccumulator(): CountAccumulator = { new CountAccumulator }  def resetAccumulator(acc: CountAccumulator): Unit = { acc.f0 = 0L }  override def getAccumulatorType: TypeInformation[CountAccumulator] = { new TupleTypeInfo(classOf[CountAccumulator], BasicTypeInfo.LONG_TYPE_INFO) }  override def getResultType: TypeInformation[JLong] = BasicTypeInfo.LONG_TYPE_INFO} 
 (编辑:威海站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |