智能心跳设计与简要实现

智能心跳设计与简要实现

Author: Youni

摘要

心跳多用于即时通讯系统中,用于持续的通知后台服务器,客户端依旧存活。心跳包就是双方所定义的一种用来通知对方存活的数据通讯结构。

周期性的发心跳包给服务器主要有以下几个原因:

  1. 通知服务器,客户端存活状态,一旦服务器监测到在某段时间没有收到客户端发来的心跳包,服务器就会释放曾经为此客户端分配的所有资源,例如Socket连接

  2. 定时的刷新NAT内外网IP映射表,以便防止NAT路由器移除映射表,导致客户端和服务器端的连接中断

一个简单的实现就是定期的给服务器发送心跳包,但这种实现引出一个问题,就是周期时间间隔是多少为合适?

由于不同的网络拓扑结构的实现,NAT节点会分配到不同的网路路由当中,不同的NAT有不同的策略来处理映射表,一旦NAT发现某个映射表对应IP在某段时间内没有上行或下行数据,NAT就会移除此IP映射表,导致连接被中断,对于即时通讯的客户端而言就是不能及时的收到推送的消息。

还有一个问题就是较短周期性的发送心跳包,会导致客户端尤其是移动设备的电量和流量的消耗。

基于以上的考虑我们会设计一种智能心跳,他可以根据不同的网络情况,自动调整发送心跳的周期,已解决上述所提出的问题。但是在具体深入智能心跳的设计和简要实现之前,让我们来简单了解下和心跳相关的一些具体的概念,以便帮助读者更好的理解心跳机制。

连接

通常IM系统会建立TCP连接用来保证消息能能够准确无误的传送给对方,原因是因为TCP是面向连接的,并且是按序传送,并且保证了消息的正确送达,一旦双方一方没收到对方的数据报文,TCP就会重传直至得到对方的回执。

所以连接是双方通讯的基本通道,一旦中断,并且没有心跳机制保证,双方式监测不到对方还是连通的,还会继续的保留连接资源,对客户端和服务器端来说都是一种资源的损耗。

一般情况下,一旦IM连接建立,就不会人为的断掉,除非:

  1. 网络自动切换 <-2G->3G<->4G<->WIFI->
  2. 服务器主动关掉连接。有于某种原因服务器负载过大,为了保持服务器继续运行,不得不关掉一些连接,以便释放资源达到减轻负载的目的
  3. 网络路由器强制断开连接,以清除长时间没有数据传输的连接
  4. 客户端app异常中断/或正常退出,这种情况服务器会立即收到RST数据包

Keep-alive

实际上TCP/IP在协议提供了类似一种心跳包的机制,我们称之为Keep-alive机制,简单的描述为下:

  1. 一旦在某段时间内-默认是2个小时,没有数据传输
  2. TCP就会启动探针,实际上是一个keep-alive Packet发送给对方,如果收到对方的ACK回执,TCP会重置计时器,并且重复步骤1
  3. 一旦没有收到对方的ACK,TCP会在某段时间后再次重发探针,然后重复步骤3一定的次数,如果收到,认为对方还存活,否者对方已掉线

Keep-alive time: 就是没有传输数据的空闲时间,例如我们刚才说的2个小时

Keep-alive interval:每隔都少毫秒发送一次探测包

Keep-alive probes:一共发送多少次探测包

以上几个基本的参数不同的OS会有不同的设置方式,在这里不做具体的详细解释

但是这种基于TCP Keep-alive一般都适用于服务器端,但对于客户端来说,要不移动平台不提供类似的API设置,再一个就是其不灵活性,也就是说不能容易动态调整心跳周期,所以客户端不太适用这种TCP keep-alive 的机制,一般客户端都是自定义心跳包,发送给服务器端。

可以参考此文 Linux TCP Keepalive HOWTO获取更多的详细介绍

NAT(Network Address Translation)

处于公网间的节点之间如果需要进行数据传输,就必须要知道对方的公网IP方可进行通讯,但对于那些处于内网的节点,是如何和另外一个外网的节点或者服务器进行通讯的呢?这就引入了NAT,我们称之为Network Address Translation,它的作用就是把内网的IP转换成一个外网的IP。

  1. 内网A发送数据给外网B
  2. NAT路由器转化此内网IP为外网IP,并存储此映射到映射表中,并伪装此数据包的source IP为外网IP,发送给B
  3. 当收到对端发送给内网的节点时,查询映射表找到内网IP,把数据包的destination IP改为内网IP,然后传给内网A节点

以上只是一个最基本的描述,其实NAT最主要的原因就是为了解决IP地址数量紧缺的问题,当大量内外节点需要和外网通讯的时候,NAT可以把这些节点通过某些策略转换成少量的公网IP地址,达到了IP共享的目的。

根据上述的描述,我们对NAT有了一个最基本的了解,其实NAT还有更复杂的种类和不同的实现,具体可以参考

但是NAT和心跳有啥联系呢?通过上面描述,我们得知公网IP的资源是多么的宝贵,一旦某一连接长时间的占用并且不发数据,这怎能对得起网络给此连接分配公网IP,这简直是对网络资源最大的浪费,所以基本上所有的NAT路由器都会定时的清除那些长时间没有数据传输的映射表项。一是回收IP资源,二是释放NAT路由器本身内存的资源,这样问题就来了,连接被从中间断开了,双发还都不晓得对方应经连通不了了,还会继续发数据,这样会有两个结果

  1. 发方会收到NAT路由器的RST包,导致发方知道连接已中断
  2. 发方没有收到任何NAT的回执,NAT只是简单的drop相应的数据包

通常我们测试得出的是第二种情况会多些,就是客户端是不知道自己应经连接断开了,所以这时候心跳就可以和NAT建立关联了,只要我们在NAT认为合理连接的时间内发送心跳数据包,这样NAT会继续keep连接的IP映射表项不被移除,达到了连接不会被中断的目的。

简单的网络结构:

  1. 互联网

    A->wifi->主干网->wifi->B

  2. 移动网

    A->SGSN->移动网->GGSN->主干网–>GGSN->移动网->SGSN->B

通过某些数据我们得知一般移动网络NAT路由器 -一般是GGSN充当,的NAT IP映射表的刷新频率是大于180秒也就是三分钟,所以通常客户端判断如果是是移动网我们就设置固定心跳周期为3分钟,但是3分钟也不能保证一直是不变的,在信令风暴时期,NAT刷新周期应该是几十秒,所以固定的心跳周期是有风险的,这就为我们引入了只能心跳的设计和实现。

智能心跳的设计

智能心跳的设计对服务器有有一个必要的要求就是,服务器必须支持某种特殊心跳包得回执,这里的特殊心跳包和正常的心跳有一点区别就是,正常心跳包发送给服务器,服务器可以不用回执。

设计原理

分以下几种情况进行讨论

  1. 服务器集群足够强悍,有能力随时对心跳包进行回执
  2. 服务器集群能力稍弱,但可以进行必要的回执

情况1:服务器集群足够强悍,有能力对心跳包进行回执

这样我们可以动态的加减心跳包的周期直至达到理想的心跳周期。

基本算法如下:

  1. 连接成功启动ping-pong心跳线程,心跳周期初始值为上一次调整的心跳周期,如果是首次,心跳周期设置为默认的周期
  2. 发送ping-pong数据包给服务器,意味着客户端发心跳ping->服务器->等待回执->收到pong回执 算是一次成功的ping-pong
  3. 如果ping-pong在规定的时间内没有回来,再连续的发几次->
    • 3.1 如果收到重复步骤2,重复几次->
      • 成功->增加心跳包周期->go to 步骤2
      • 失败->go to 3.2
    • 3.2 如果失败则减小心跳包周期->go to 步骤2
  4. 停止ping-pong,直至找到合适的心跳包=上一次成功的心跳周期
  5. 启动ping心跳包线程,意味着服务器可以不用给回执,减小服务器压力

另外还有一个优化就是,一旦我们在心跳周期内收到任何非心跳包的消息,即可打断ping-pong线程和ping线程使之重新开始心跳周期,这样会减少电量流量的损失

实际上从以上的算法可以看出,我们的目的就是找到最大可用的心跳周期,用来达到既可以保持连接不被断开,也可以减少电量,和流量的消耗。但这样也会给服务器带来些压力,如果ping-pong周期过长,意味着后台就会有资源的消耗。

情况2:服务器集群能力稍弱,但可以进行必要的心跳回执

对于这种情况的考虑是因为,考虑服务器集群的负载能力有限。

其实基本算法和上述基本差不多,只不过简单些,就是不支持递增心跳周期,只允许递减。

  1. ping-pong次数不会很多,减少对服务器的压力

但是这个算法还是可以在继续优化的,例如我们可以允许递增策略,只不过递增的步骤可以加大。

简要实现

现在给出一个简单基本实现,只是用来给读者作为参考,实现语言是C++11

Header文件:

#ifndef SmartHeatbeat_smartheartbeat_h
#define SmartHeatbeat_smartheartbeat_h

#include <map>
#include <future>

// we define heartbeat step to be 10 seconds.
const int HEARTBEAT_CHANGE_STEP    = 10;

// wait ping_pong ack time from seirver
const int PINGPONG_WAIT_TIME       = 10;

// default heartbeat interval
const int HEARTBEAT_DEFAULT_TIME   = 3*60;

// never go below min heartbeat
const int MIN_HEARTBEAT = 20;

using namespace std;

typedef struct{
    // unique packet id
    int id;

    // the data structure for ping packet
    char* data;
} ping_packet;

class SmartHeartBeat{
public:
    enum Policy{
        // only support dynamic heartbeat increase
        increase,

        // only support dynamic heartbeat decrease
        decrease,

        // supoort both above
        both,

        // if we find one suitable heartbeat time interval, heatbeat will not be changed anymore
        // until network has been changed
        fix
    };

public:
    ~SmartHeartBeat();
private:
    SmartHeartBeat();
    SmartHeartBeat(Policy policy);

public:
    /**
     * @brief build a smart hear
     *
     * \param change_step
     * \param init_heartbeat
     * \param policy see @ref Policy
     */
    static SmartHeartBeat* build(int change_step = HEARTBEAT_CHANGE_STEP, int init_heartbeat = HEARTBEAT_DEFAULT_TIME, Policy policy = SmartHeartBeat::Policy::both, int pong_wait_time = PINGPONG_WAIT_TIME);

/**
 * @brief start the smart heartbeat
 *
 *
 */
void start();

/**
 * @brief stop the smart heartbeat
 *
 *
 */
void cancel();

/**
 * @brief tell the smart heartbeat the pong ack received from server
 *
 *
 */
void on_pong_received(int pong_id);

private:
    bool wait(int secPerLoop);
    void wait_for(int sec);
    void wait_pong(int sec, ping_packet& packet);
    void send_ping_pong_packet(ping_packet& packet);
    void send_only_ping();
    void increase_heatbeat_by(int step);
    void decrease_heatbeat_by(int step);

private:
    int heart_beat;
    int last_failed_heart_beat;
    ping_packet _packet;
    condition_variable cv;
    mutex ping_mutex;
    mutex cancel_mutex;
    bool cancelled;
    Policy policy;
    int change_step;
    int pong_wait_time;
    thread* ping_pong_thread;
    thread* ping_thread;
};

inline void SmartHeartBeat::increase_heatbeat_by(int step){
    heart_beat += step;
}

inline void SmartHeartBeat::decrease_heatbeat_by(int step){
    if(heart_beat <= MIN_HEARTBEAT){
        return;
    }

    heart_beat -= step;
}

#endif

CPP文件:

include <future>
include <thread>

include "smartheartbeat.h"

SmartHeartBeat::SmartHeartBeat(){
heart_beat = HEARTBEAT_DEFAULT_TIME;
last_failed_heart_beat = -1;
change_step = HEARTBEAT_CHANGE_STEP;

policy = Policy::both;
cancelled = false;
}

SmartHeartBeat::~SmartHeartBeat(){
    cancel();

    if(ping_pong_thread != nullptr){
        if(ping_pong_thread->joinable()){
            ping_pong_thread->join();
        }

        delete ping_pong_thread;
        ping_pong_thread = nullptr;
    }

    if (ping_thread != nullptr) {
        if(ping_thread->joinable()){
            ping_thread->join();
        }

        delete ping_thread;
        ping_thread = nullptr;
    }
}

SmartHeartBeat* SmartHeartBeat::build(int change_step, int init_heartbeat, Policy policy, int pong_wait_time){
    SmartHeartBeat* heartbeat = new SmartHeartBeat();

    heartbeat->policy = policy;
    heartbeat->change_step = change_step;
    heartbeat->heart_beat = init_heartbeat;
    heartbeat->pong_wait_time = pong_wait_time;

    return heartbeat;
}

bool SmartHeartBeat::wait(int secPerLoop){
    int loop = heart_beat / secPerLoop;
    int remaining = heart_beat % secPerLoop;

    while (loop > 0) {
        if (cancelled) {
            return false;
        }

        this_thread::sleep_for(std::chrono::seconds(secPerLoop));
        loop--;
    }

    if (remaining != 0){
        this_thread::sleep_for(std::chrono::seconds(remaining));
    }

    return true;
}

void SmartHeartBeat::wait_for(int sec){
    mutex lock;

    std::unique_lock<std::mutex> lck(lock);
    condition_variable wait;

    wait.wait_for(lck, std::chrono::seconds(sec));
}

void SmartHeartBeat::wait_pong(int sec, ping_packet& packet){
    _packet = packet;
    std::unique_lock<std::mutex> lck(ping_mutex);

    cv_status status = cv.wait_for(lck, std::chrono::seconds(sec));

    if(status == cv_status::no_timeout && !cancelled){
        packet.id = -1000;
    }
}

// this function should be called after we got pong from server
void SmartHeartBeat::on_pong_received(int id){
    if(_packet.id == id){
        cv.notify_all();
    }
}

void SmartHeartBeat::cancel(){
    cancelled = true;
    cv.notify_all();
}

void SmartHeartBeat::send_ping_pong_packet(ping_packet &packet){
    // try to send the ping pong trough the socket
    // we just ignore there
}

void SmartHeartBeat::send_only_ping(){
    if(ping_thread != nullptr){
        if (ping_thread->joinable()) {
            ping_thread->join();
        }

        delete ping_thread;

        ping_thread = nullptr;
    }

    ping_thread = new thread([=]{
        while (1) {
            if (cancelled) {
                return;
            }

            if(wait(3)){
                return;
            }

            ping_packet packet;
            packet.id = 1000;

            send_ping_pong_packet(packet);
        }
    });

}

void SmartHeartBeat::start(){
    // we preventing the pingpong forever
    if(heart_beat + change_step == last_failed_heart_beat){
        return;
    }

    if (ping_pong_thread != nullptr) {

        if (ping_pong_thread->joinable()) {
            cancelled = true;
            ping_pong_thread->join();
        }
        delete ping_pong_thread;
        ping_pong_thread = nullptr;
    }

    cancelled = false;
    ping_pong_thread = new thread([this]{
        while (1) {
            if(cancelled){
                return;
            }

            int ping_pong_times = 2;

            // test if we can receive ping-pong 3 times successively
            // if we can, which mean, the heatbeat is suitable one
            while(ping_pong_times > 0){
                if(cancelled){
                    return;
                }

                if(!wait(2)){
                    return;
                }

                int retries = 3;

                bool received = false;

                // try 3 times to receive the pong ack
                while(retries > 0){
                    if(cancelled){
                        return;
                    }

                    ping_packet packet;

                    packet.id = 1000;

                    send_ping_pong_packet(packet);

                    wait_pong(pong_wait_time,packet);

                    received = (packet.id == -1000);

                    if(received){
                        break;
                    }

                    retries--;
                }

                if(!received){
                    last_failed_heart_beat = heart_beat;

                    int old_one = heart_beat;

                    decrease_heatbeat_by(change_step);

                    if (heart_beat == old_one) {
                        return;
                    }

                    if (!cancelled) {
                        start();
                    }

                    return;

                }

                ping_pong_times--;
            }

            if(policy == Policy::fix){
                send_only_ping();
                return;
            }

            if(policy != Policy::decrease){
                if(heart_beat + change_step < last_failed_heart_beat){
                    // increase one one heartbeat change step

                    increase_heatbeat_by(change_step);

                    start();

                    return;
                }
            }
        }
    });
}

简单调用示例:

// initialize smart heartbeat, change step, init heartbeat interval, and heartbeat strategy
SmartHeartBeat* heart_beat = SmartHeartBeat::build(10, 5*60, SmartHeartBeat::Policy::fix);

// start ping_pong thread

heart_beat->start();

/**
 * wait pong ack and mark the pong ack is received
 *
 **/    
// notify pong ack was received
heart_beat->on_pong_received(1000);

总结

通过上诉描述,读者应该会对心跳有个更深入的了解,为何要发心跳包,它的作用是怎样的,如何智能的调整心跳,为什么我们一定需要ping-pong机制,为什们要有不同的心跳调整策略。

其实上述的心跳调整算法只是最基本的实现,还可以继续优化,例如如何定义心跳增量/减量,需要一个固定的增量,还是根据某种算法可以动态的调整增减量,这些都是需要探索的技术。

我们最终想达到的目的就是

  • 找到最大可用的心跳包,并且已达到对客户端省电,省流量的目的

我们再延伸的考虑一下心跳包,现在类似即时通许的app很多,集成推送的app也很多,其实它们都在不停地发心跳包,对于app说就是想保留长连接不被断开,能够实时的收到消息,但是对于网落本事来说是代价巨大的,网络会不停地转发类似的心跳包,浪费了巨大的资源,微信的信令风暴就说明了这点。

还有些学术性文章一种基于深度报文检测的心跳包问题解决方案提出如何解决移动网心跳包导致的空中接口资源浪费的问题,但个人感觉不是太容易做到的。

总而言之,希望此篇文章能够帮助大家深入了解心跳机制,谢谢。

参考

  1. TCP协议的KeepAlive机制与HeartBeat心跳包
  2. TCP保活
相关文章

相关标签/搜索