目录
1.创建源表和结果表。
创建及注册表名分别为 source 和 sink 的表
使用 TableEnvironment.execute_sql() 方法,通过 DDL 语句来注册源表和结果表
2. 创建一个作业
3. 提交作业Submitting PyFlink Jobs
1.创建源表和结果表。
创建及注册表名分别为 source 和 sink 的表
其中,源表 source 有一列: word,该表代表了从 input_path 所指定的输入文件中读取的单词;
结果表 sink 有两列: word 和 count,该表的结果会输出到 output_path 所指定的输出文件中。
source表
t_env.create_temporary_table(
'source',
TableDescriptor.for_connector('filesystem')
.schema(Schema.new_builder()
.column('word', DataTypes.STRING())
.build())
.option('path', input_path)
.format('csv')
.build())
tab = t_env.from_path('source')
sink表
t_env.create_temporary_table(
'sink',
TableDescriptor.for_connector('filesystem')
.schema(Schema.new_builder()
.column('word', DataTypes.STRING())
.column('count', DataTypes.BIGINT())
.build())
.option('path', output_path)
.format(FormatDescriptor.for_format('canal-json')
.build())
.build())
使用 TableEnvironment.execute_sql() 方法,通过 DDL 语句来注册源表和结果表
my_source_ddl = """
create table source (
word STRING
) with (
'connector' = 'filesystem',
'format' = 'csv',
'path' = '{}'
)
""".format(input_path)
my_sink_ddl = """
create table sink (
word STRING,
`count` BIGINT
) with (
'connector' = 'filesystem',
'format' = 'canal-json',
'path' = '{}'
)
""".format(output_path)
t_env.execute_sql(my_source_ddl)
t_env.execute_sql(my_sink_ddl)
2. 创建一个作业
该作业读取表 source 中的数据,进行一些变换,然后将结果写入表 sink。
最后,需要做的就是启动 Flink Python Table API 作业。
上面所有的操作,比如创建源表,进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当 execute_insert(sink_name) 被调用的时候, 作业才会被真正提交到集群或者本地进行执行。
@udtf(result_types=[DataTypes.STRING()])
def split(line: Row):
for s in line[0].split():
yield Row(s)
# 计算 word count
tab.flat_map(split).alias('word') \
.group_by(col('word')) \
.select(col('word'), lit(1).count) \
.execute_insert('sink') \
.wait()
3. 提交作业Submitting PyFlink Jobs
构建 Python Table API 程序,并在本地 mini cluster 中运行。
python word_count.py
将作业提交到远端集群执行
flink run --python examples/python/table/word_count.py
当在 mini cluster 环境执行作业(比如,在IDE中执行作业)且在作业中使用了如下API(比如 Python Table API 的 TableEnvironment.execute_sql, Statementset.execute 和 Python DataStream API 的 StreamExecutionEnvironment.execute_async) 的时候,因为这些API是异步的,请记得显式地等待作业执行结束。
否则程序会在已提交的作业执行结束之前退出,以致无法观测到已提交作业的执行结果。
注意: 当往远程集群提交作业时,无需显式地等待作业执行结束,所以当往远程集群提交作业之前,请记得移除这些等待作业执行结束的代码逻辑。
异步执行 SQL / Table API 作业文章来源:https://www.toymoban.com/news/detail-547393.html
t_result = table_env.execute_sql(...)
t_result.wait()
异步执行 DataStream 作业文章来源地址https://www.toymoban.com/news/detail-547393.html
job_client = stream_execution_env.execute_async('My DataStream Job')
job_client.get_job_execution_result().result()
到了这里,关于Flink流批一体计算(12):PyFlink Tabel API之构建作业的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!