四种优化 Apache Flink 应用程序的方法

如果下面文章格式或图片不清晰,请参见原文:https://www.iteblog.com/archives/2303.html 或直接点击下面 阅读原文 即可进入。

Flink 是一种非常复杂的框架,它提供了多种调整其执行的方法。本文将介绍四种不同的方法来提升你的 Flink 应用程序的性能。

    使用 Flink Tuples

    当你使用类似于 groupBy, join, 或者 keyBy 算子时,Flink 提供了多种用于在你的数据集上选择 key 的方法。你可以使用 key 选择函数,如下:

    你甚至可以指定 POJO 类型中一个 field 的名字:

    但是如果你现在使用的是 Flink 元组类型(tuple types)的数据,你可以简单地指定将要作为 key 的字段在元组中的位置:

    这种方法在 Flink 中将会获得最佳的性能,但是可读性方面呢?这是不是意味着你的代码看起来像下面那样:

    如果想及时了解Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号:iteblog_hadoop

    在这种情况下,提高可读性的常见方法是创建一个继承自 TupleX 的类,并且实现其中的 getters 和 setters。下面是 Flink Gelly 类库中 Edge 类的实现,其中有三个 fileds,所以它直接继承了 Tuple3 类:

    重用 Flink 对象

    另外一种可以提升 Flink 应用程序性能的方法是在用户自定义函数返回数据时使用可变对象(mutable objects),请看看下面的例子:

    正如你所看到的,在我们每次调用 apply 函数的时候,我们都会创建一个 Tuple2 类型的实例,这将会给垃圾回收造成很大的压力。解决这个问题的一种方法就是反复使用相同的实例:

    上面的代码性能会好些。虽然我们在每次调用的时候只创建了一个 Tuple2 实例,但是我们还间接地创建了 Long 类型的实例。为了解决这个问题, Flink 内部提供了一系列 value classes,比如:IntValue, LongValue, StringValue, FloatValue 等。这些类的重点是为内置类型提供了可变版本,所以我们可以在用户自定义函数中重用这些类型,下面就是如何使用的例子:

    上面这些使用习惯在 Flink 类库中被普遍使用,比如 Flink Gelly。

    使用函数注解

    另一种优化 Flink 应用程序的方法是提供一些关于用户自定义函数如何对输入数据进行处理的信息。由于 Flink 无法解析和理解你的代码,所以你提供一些关键的信息将会帮助 Flink 创建一个更加高效的执行计划。我们可以使用三种注解:

    • @ForwardedFields – 指定输入数据中哪些字段保持不变并且在输出值中使用(specifies what fields in an input value were left unchanged and are used in an output value.)。

    • @NotForwardedFields – 指定在输出中相同位置未保留的字段(specifies fields which were not preserved in the same positions in the output.)。

    • @ReadFields – 指定哪些字段在计算结果的时候用到。你只能指定那些在计算中使用的字段,而不是仅仅将数据拷贝到输出中的字段。(specifies what fields were used to compute a result value. You should only specify fields that were used in computations and not merely copied to the output.)

    我们来看看如何使用 ForwardedFields 注释:

    上面的注释意味着输入元组的第一个元素将不会改变,而且在返回元组中同样处在第一个位置。

    如果你没有改变一个元素,只不过简单地将它移到不同的位置上,你同样可以使用 ForwardedFields 注释来实现。下面例子中,我们简单地将输入元组的位置互相交换,并且直接返回:

    上面例子中提到的注释只能应用到只有一个输入参数的函数中,比如 map 或者 flatMap。如果你有两个输入参数的函数,你可以分别使用 ForwardedFieldsFirst 和 ForwardedFieldsSecond 注释来为第一和第二个参数指定一些信息。

    下面我们使用 ForwardedFieldsFirst 和 ForwardedFieldsSecond 注释来为实现 JoinFunction 接口的类指定相关的信息:

    Flink 同样提供了 NotForwardedFieldsFirst, NotForwardedFieldsSecond, ReadFieldsFirst, 和 ReadFirldsSecond 注释来实现相同的功能。

    选择 Join 类型

    如果你为 Flink 提供了一些信息,可以使你的 Join 操作更快,在讨论这个是如何工作之前,让我们先了解 Fliink 是如何运行 Join 操作的。

    当 Flink 处理批量数据时,集群中的每台机器只存储了部分的数据。为了执行 Join 操作, Apache Flink 需要找到两个数据集所有 key 相同的数据。为了做到这一点,Flink 首先必须将两个数据集拥有相同 key 的数据放在同一台机器上。这里有两种实现策略:

    • Repartition-Repartition strategy:在这种场景下,Join 的两个数据集分别对它们的 key 使用相同的分区函数进行分区,并经过网络发送数据。这就意味着如果数据集非常大,这将花费相当一部分时间将数据分发出去。

    • Broadcast-Forward strategy:这种场景下,大的数据集R不做处理,另一个比较小的数据集S将全部复制到集群中所有拥有R的一部分数据的机器上。

    如果你使用一个比较小的数据集和一个比较大的数据集进行 join 操作,你可以使用 Broadcast-Forward 策略,这个很容易实现:

    ds1.join(ds2, JoinHint.BROADCAST_HASH_FIRST)

    这种写法表示第一个数据集要比第二个数据集小的多。

    Flink 支持的其他 join 提示有以下几种:

    • BROADCAST_HASH_SECOND – 表示第二个数据集比较小

    • REPARTITION_HASH_FIRST – 表示第一个数据集比较小

    • REPARTITION_HASH_SECOND – 表示第二个数据集有点小

    • REPARTITION_SORT_MERGE – 表示重新分区两个数据集并使用排序和合并策略(sorting and merging strategy)

    • OPTIMIZER_CHOOSES – Flink 优化器将决定如何连接数据集


    本文翻译自:《Four ways to optimize your Flink applications》:https://brewing.codes/2017/10/17/flink-optimize/


    猜你喜欢

    欢迎关注本公众号:iteblog_hadoop:

    0、回复 电子书 获取 本站所有可下载的电子书

    1、SparkRDMA:使用RDMA技术提升Spark的Shuffle性能

    2、流计算框架 Flink 与 Storm 的性能对比

    3、Apache Spark 2.2.0新特性详细介绍

    4、干货 | Spark SQL:过去,现在以及未来

    5、Apache Spark 黑名单(Blacklist)机制介绍

    6、美团点评数据平台融合实践

    7、干货 | Apache Spark最佳实践

    8、NodeManager节点自身健康状态检测机制

    9、[干货]大规模数据处理的演变(2003-2017)

    10、Apache Flink 1.3.0正式发布及其新功能介绍

    11、更多大数据文章欢迎访问https://www.iteblog.com及本公众号(iteblog_hadoop)
    12、Flink中文文档: http://flink.iteblog.com
    相关文章
    相关标签/搜索