I have an out-of-order DataStream<Event>
that I want to sort so that the events are ordered by their event time timestamps. I've simplified my use case down to where my Event class has just a single field -- the timestamp
field:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
env.setParallelism(1);
DataStream<Event> eventStream = env.addSource(new OutOfOrderEventSource())
.assignTimestampsAndWatermarks(new TimestampsAndWatermarks());
Table events = tableEnv.fromDataStream(eventStream, "timestamp.rowtime");
tableEnv.registerTable("events", events);
Table sorted = tableEnv.sqlQuery("SELECT timestamp FROM events ORDER BY eventTime ASC");
DataStream<Row> sortedEventStream = tableEnv.toAppendStream(sorted, Row.class);
sortedEventStream.print();
env.execute();
}
I'm getting this error:
Exception in thread "main"
org.apache.flink.table.api.SqlParserException: SQL parse failed.
Encountered "timestamp FROM" at line 1, column 8.
Seems like I'm not specifying the event time attribute in the correct way, but it's not clear what's wrong.
See Question&Answers more detail:
os 与恶龙缠斗过久,自身亦成为恶龙;凝视深渊过久,深渊将回以凝视…