| 自定义Apache Flink Stream Source需要实现StreamTableSource, StreamTableSource中通过StreamExecutionEnvironment 的addSource方法获取DataStream, 所以我们需要自定义一个 SourceFunction, 并且要支持产生WaterMark,也就是要实现DefinedRowtimeAttributes接口。 (1) Source Function定义 支持接收携带EventTime的数据集合,Either的数据结构,Right表示WaterMark和Left表示数据: class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]]) extends SourceFunction[T] { override def run(ctx: SourceContext[T]): Unit = { dataWithTimestampList.foreach { case Left(t) => ctx.collectWithTimestamp(t._2, t._1) case Right(w) => ctx.emitWatermark(new Watermark(w)) } } override def cancel(): Unit = ???} 
 (2) 定义 StreamTableSource 我们自定义的Source要携带我们测试的数据,以及对应WaterMark数据,具体如下: class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {  val fieldNames = Array("accessTime", "region", "userId") val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING)) val rowType = new RowTypeInfo( Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]], fieldNames)  // 页面访问表数据 rows with timestamps and watermarks val data = Seq( Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")), Right(1510365660000L), Left(1510365660000L, Row.of(new JLong(1510365660000L), "BeiJing", "U1001")), Right(1510365660000L), Left(1510366200000L, Row.of(new JLong(1510366200000L), "BeiJing", "U2032")), Right(1510366200000L), Left(1510366260000L, Row.of(new JLong(1510366260000L), "BeiJing", "U1100")), Right(1510366260000L), Left(1510373400000L, Row.of(new JLong(1510373400000L), "ShangHai", "U0011")), Right(1510373400000L) )  override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = { Collections.singletonList(new RowtimeAttributeDescriptor( "accessTime", new ExistingField("accessTime"), PreserveWatermarks.INSTANCE)) }  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = { execEnv.addSource(new MySourceFunction[Row](data)).setParallelism(1).returns(rowType) }  override def getReturnType: TypeInformation[Row] = rowType  override def getTableSchema: TableSchema = schema  } 
 2. Sink 定义 (编辑:威海站长网) 【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容! |