
复制import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.bridge.java.StreamTableEnvironment; import org.apache.flink.types.Row; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); DataStream<String> dataStream = env.fromElements("Alice",知其转转转 "Bob", "John"); // 1. 使用 StreamTableEnvironment::fromDataStream API 将 DataStream 转为 Table Table inputTable = tableEnv.fromDataStream(dataStream); // 将 Table 注册为一个临时表 tableEnv.createTemporaryView("InputTable", inputTable); // 然后就可以在这个临时表上做一些自定义的查询了 Table resultTable = tableEnv.sqlQuery("SELECT UPPER(f0) FROM InputTable"); // 2. 也可以使用 StreamTableEnvironment::toDataStream 将 Table 转为 DataStream // 注意:这里只能转为 DataStream<Row>,
云南idc服务商其中的
亿华云知其转转转
数据类型只能为 Row DataStream<Row> resultStream = tableEnv.toDataStream(resultTable); // 将 DataStream 结果打印到控制台 resultStream.print(); env.execute(); // prints: // +I[Alice] // +I[Bob] // +I[John] 1.2.3.4.5.6.7.8.9.10.11.12.13.14.15.16.17.18.19.20.21.22.23.24.25.26.27.28.29.30.31.32.
源码库(责任编辑:IT科技类资讯)