上面的代码中存在的一些细节:
- 心跳的超时时间,在_topCoord.prepareHeartbeatRequestV1方法中就已经设定好了
- 具体的算法就是:
- **hbTimeout=_rsConfig.getHeartbeatTimeoutPeriodMillis() - alreadyElapsed**
其中heartbeatTimeoutPeriodMillis是可配置的参数,默认是10s, 那么alreadyElapsed是指此前连续心跳失败(最多2次)累计的消耗时间,在心跳成功响应或者超过10s后alreadyElapsed会置为0。因此可以判断,随着心跳失败次数的增加,超时时间会越来越短(心跳更加密集)
心跳执行的回调,指向自身的_handleHeartbeatResponse方法,该函数实现了心跳响应成功、失败(或是超时)之后的流程处理。
ReplicationCoordinatorImpl::_handleHeartbeatResponse方法的代码片段:
- void ReplicationCoordinatorImpl::_handleHeartbeatResponse(
- const ReplicationExecutor::RemoteCommandCallbackArgs& cbData, int targetIndex) {
- LockGuard topoLock(_topoMutex);
- // remove handle from queued heartbeats
- _untrackHeartbeatHandle(cbData.myHandle);
- ...
- //响应成功后
- if (responseStatus.isOK()) {
- networkTime = cbData.response.elapsedMillis.value_or(Milliseconds{0});
- const auto& hbResponse = hbStatusResponse.getValue();
- // 只要primary 心跳响应成功,就会重新调度 electionTimeout定时器
- if (hbResponse.hasState() && hbResponse.getState().primary() &&
- hbResponse.getTerm() == _topCoord->getTerm()) {
- //取消并重新调度 electionTimeout定时器
- cancelAndRescheduleElectionTimeout();
- }
- }
- ...
- //调用topCoord的processHeartbeatResponse方法处理心跳响应状态,并返回下一步执行的Action
- HeartbeatResponseAction action = _topCoord->processHeartbeatResponse(
- now, networkTime, target, hbStatusResponse, lastApplied);
- ...
- //调度下一次心跳,时间间隔采用action提供的信息
- _scheduleHeartbeatToTarget(
- target, targetIndex, std::max(now, action.getNextHeartbeatStartDate()));
- //根据Action 执行处理
- _handleHeartbeatResponseAction(action, hbStatusResponse, false);
- }
这里省略了许多细节,但仍然可以看到,在响应心跳时会包含这些事情的处理:
对于主节点的成功响应,会重新调度 electionTimeout定时器(取消之前的调度并重新发起)
通过_topCoord对象的processHeartbeatResponse方法解析处理心跳响应,并返回下一步的Action指示
根据Action 指示中的下一次心跳时间设置下一次心跳定时任务
处理Action指示的动作
那么,心跳响应之后会等待多久继续下一次心跳呢? 在 TopologyCoordinatorImpl::processHeartbeatResponse方法中,实现逻辑为:
如果心跳响应成功,会等待heartbeatInterval,该值是一个可配参数,默认为2s;
如果心跳响应失败,则会直接发送心跳(不等待)。 (编辑:威海站长网)
【声明】本站内容均来自网络,其相关言论仅代表作者个人观点,不代表本站立场。若无意侵犯到您的权利,请及时与联系站长删除相关内容!
|