/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.cluster;

import io.aeron.Aeron;
import io.aeron.ChannelUri;
import io.aeron.ChannelUriStringBuilder;
import io.aeron.Counter;
import io.aeron.ExclusivePublication;
import io.aeron.Image;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.codecs.SourceLocation;
import io.aeron.archive.status.RecordingPos;
import io.aeron.cluster.ClusterControl;
import io.aeron.cluster.ClusterMember;
import io.aeron.cluster.ClusterSession;
import io.aeron.cluster.ClusterSessionProxy;
import io.aeron.cluster.ClusterTermination;
import io.aeron.cluster.ConsensusModule;
import io.aeron.cluster.ConsensusModuleAdapter;
import io.aeron.cluster.ConsensusModuleSnapshotLoader;
import io.aeron.cluster.ConsensusModuleSnapshotTaker;
import io.aeron.cluster.DynamicJoin;
import io.aeron.cluster.EgressPublisher;
import io.aeron.cluster.Election;
import io.aeron.cluster.IngressAdapter;
import io.aeron.cluster.LogAdapter;
import io.aeron.cluster.LogPublisher;
import io.aeron.cluster.LogReplay;
import io.aeron.cluster.MemberStatusAdapter;
import io.aeron.cluster.MemberStatusListener;
import io.aeron.cluster.MemberStatusPublisher;
import io.aeron.cluster.RecordingExtent;
import io.aeron.cluster.RecordingLog;
import io.aeron.cluster.ServiceAck;
import io.aeron.cluster.ServiceProxy;
import io.aeron.cluster.TimerService;
import io.aeron.cluster.client.ClusterException;
import io.aeron.cluster.codecs.ChangeType;
import io.aeron.cluster.codecs.CloseReason;
import io.aeron.cluster.codecs.ClusterAction;
import io.aeron.cluster.codecs.EventCode;
import io.aeron.cluster.codecs.SnapshotRecordingsDecoder;
import io.aeron.cluster.service.Cluster;
import io.aeron.cluster.service.ClusterMarkFile;
import io.aeron.cluster.service.RecoveryState;
import io.aeron.exceptions.TimeoutException;
import io.aeron.logbuffer.ControlledFragmentHandler;
import io.aeron.logbuffer.Header;
import io.aeron.security.Authenticator;
import io.aeron.status.ReadableCounter;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import org.agrona.BitUtil;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.ArrayListUtil;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.collections.LongHashSet;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.AgentInvoker;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.status.CountersReader;

class ConsensusModuleAgent
implements Agent,
MemberStatusListener {
    private final long sessionTimeoutMs;
    private final long leaderHeartbeatIntervalMs;
    private final long leaderHeartbeatTimeoutMs;
    private final long serviceHeartbeatTimeoutMs;
    private long nextSessionId = 1L;
    private long leadershipTermId = -1L;
    private long expectedAckPosition = 0L;
    private long serviceAckId = 0L;
    private long lastAppendedPosition = 0L;
    private long followerCommitPosition = 0L;
    private long terminationPosition = -1L;
    private long timeOfLastLogUpdateMs = 0L;
    private long timeOfLastAppendPosition = 0L;
    private long cachedTimeMs;
    private long clusterTimeMs = -1L;
    private long lastRecordingId = -1L;
    private int logInitialTermId = -1;
    private int logTermBufferLength = -1;
    private int logMtuLength = -1;
    private int memberId;
    private int highMemberId;
    private int pendingMemberRemovals = 0;
    private ReadableCounter appendedPosition;
    private final Counter commitPosition;
    private ConsensusModule.State state = ConsensusModule.State.INIT;
    private Cluster.Role role;
    private ClusterMember[] clusterMembers;
    private ClusterMember[] passiveMembers = ClusterMember.EMPTY_CLUSTER_MEMBER_ARRAY;
    private ClusterMember leaderMember;
    private ClusterMember thisMember;
    private long[] rankedPositions;
    private final ServiceAck[] serviceAcks;
    private final Counter clusterRoleCounter;
    private final ClusterMarkFile markFile;
    private final AgentInvoker aeronClientInvoker;
    private final EpochClock epochClock;
    private final Counter moduleState;
    private final Counter controlToggle;
    private final TimerService timerService;
    private final ConsensusModuleAdapter consensusModuleAdapter;
    private final ServiceProxy serviceProxy;
    private final IngressAdapter ingressAdapter;
    private final EgressPublisher egressPublisher;
    private final LogPublisher logPublisher;
    private LogAdapter logAdapter;
    private final MemberStatusAdapter memberStatusAdapter;
    private final MemberStatusPublisher memberStatusPublisher = new MemberStatusPublisher();
    private final Long2ObjectHashMap<ClusterSession> sessionByIdMap = new Long2ObjectHashMap();
    private final ArrayList<ClusterSession> pendingSessions = new ArrayList();
    private final ArrayList<ClusterSession> rejectedSessions = new ArrayList();
    private final ArrayList<ClusterSession> redirectSessions = new ArrayList();
    private final Int2ObjectHashMap<ClusterMember> clusterMemberByIdMap = new Int2ObjectHashMap();
    private final LongHashSet missedTimersSet = new LongHashSet();
    private final Authenticator authenticator;
    private final ClusterSessionProxy sessionProxy;
    private final Aeron aeron;
    private AeronArchive archive;
    private final ConsensusModule.Context ctx;
    private final MutableDirectBuffer tempBuffer;
    private final Counter[] serviceHeartbeats;
    private final IdleStrategy idleStrategy;
    private final RecordingLog recordingLog;
    private final ArrayList<RecordingLog.Snapshot> dynamicJoinSnapshots = new ArrayList();
    private RecordingLog.RecoveryPlan recoveryPlan;
    private Election election;
    private DynamicJoin dynamicJoin;
    private ClusterTermination clusterTermination;
    private String logRecordingChannel;
    private String liveLogDestination;
    private String replayLogDestination;
    private String clientFacingEndpoints;

    ConsensusModuleAgent(ConsensusModule.Context ctx) {
        this.ctx = ctx;
        this.aeron = ctx.aeron();
        this.epochClock = ctx.epochClock();
        this.sessionTimeoutMs = TimeUnit.NANOSECONDS.toMillis(ctx.sessionTimeoutNs());
        this.leaderHeartbeatIntervalMs = TimeUnit.NANOSECONDS.toMillis(ctx.leaderHeartbeatIntervalNs());
        this.leaderHeartbeatTimeoutMs = TimeUnit.NANOSECONDS.toMillis(ctx.leaderHeartbeatTimeoutNs());
        this.serviceHeartbeatTimeoutMs = TimeUnit.NANOSECONDS.toMillis(ctx.serviceHeartbeatTimeoutNs());
        this.egressPublisher = ctx.egressPublisher();
        this.moduleState = ctx.moduleStateCounter();
        this.commitPosition = ctx.commitPositionCounter();
        this.controlToggle = ctx.controlToggleCounter();
        this.logPublisher = ctx.logPublisher();
        this.idleStrategy = ctx.idleStrategy();
        this.timerService = new TimerService(this);
        this.clusterMembers = ClusterMember.parse(ctx.clusterMembers());
        this.sessionProxy = new ClusterSessionProxy(this.egressPublisher);
        this.memberId = ctx.clusterMemberId();
        this.clusterRoleCounter = ctx.clusterNodeCounter();
        this.markFile = ctx.clusterMarkFile();
        this.recordingLog = ctx.recordingLog();
        this.tempBuffer = ctx.tempBuffer();
        this.serviceHeartbeats = ctx.serviceHeartbeatCounters();
        this.serviceAcks = ServiceAck.newArray(ctx.serviceCount());
        this.highMemberId = ClusterMember.highMemberId(this.clusterMembers);
        this.aeronClientInvoker = this.aeron.conductorAgentInvoker();
        this.aeronClientInvoker.invoke();
        this.rankedPositions = new long[ClusterMember.quorumThreshold(this.clusterMembers.length)];
        this.role(Cluster.Role.FOLLOWER);
        ClusterMember.addClusterMemberIds(this.clusterMembers, this.clusterMemberByIdMap);
        this.leaderMember = this.thisMember = this.determineMemberAndCheckEndpoints(this.clusterMembers);
        ChannelUri memberStatusUri = ChannelUri.parse(ctx.memberStatusChannel());
        memberStatusUri.put("endpoint", this.thisMember.memberFacingEndpoint());
        int statusStreamId = ctx.memberStatusStreamId();
        this.memberStatusAdapter = new MemberStatusAdapter(this.aeron.addSubscription(memberStatusUri.toString(), statusStreamId), this);
        ClusterMember.addMemberStatusPublications(this.clusterMembers, this.thisMember, memberStatusUri, statusStreamId, this.aeron);
        this.ingressAdapter = new IngressAdapter(this, ctx.invalidRequestCounter());
        this.consensusModuleAdapter = new ConsensusModuleAdapter(this.aeron.addSubscription(ctx.serviceControlChannel(), ctx.consensusModuleStreamId()), this);
        this.serviceProxy = new ServiceProxy(this.aeron.addPublication(ctx.serviceControlChannel(), ctx.serviceStreamId()));
        this.authenticator = (Authenticator)ctx.authenticatorSupplier().get();
    }

    @Override
    public void onClose() {
        if (!this.ctx.ownsAeronClient()) {
            for (ClusterSession session : this.sessionByIdMap.values()) {
                session.close();
            }
            CloseHelper.close(this.memberStatusAdapter);
            ClusterMember.closeMemberPublications(this.clusterMembers);
            this.logPublisher.disconnect();
            CloseHelper.close(this.ingressAdapter);
            CloseHelper.close(this.serviceProxy);
            CloseHelper.close(this.consensusModuleAdapter);
        }
        CloseHelper.close(this.archive);
        this.ctx.close();
    }

    @Override
    public void onStart() {
        ChannelUri archiveUri = ChannelUri.parse(this.ctx.archiveContext().controlRequestChannel());
        ClusterMember.checkArchiveEndpoint(this.thisMember, archiveUri);
        archiveUri.put("endpoint", this.thisMember.archiveEndpoint());
        this.ctx.archiveContext().controlRequestChannel(archiveUri.toString());
        this.archive = AeronArchive.connect(this.ctx.archiveContext().clone());
        this.recoveryPlan = this.recordingLog.createRecoveryPlan(this.archive, this.ctx.serviceCount());
        this.dynamicJoin = this.requiresDynamicJoin();
        if (null == this.dynamicJoin) {
            try (Counter ignore = this.addRecoveryStateCounter(this.recoveryPlan);){
                if (!this.recoveryPlan.snapshots.isEmpty()) {
                    this.recoverFromSnapshot(this.recoveryPlan.snapshots.get(0), this.archive);
                }
                this.awaitServiceAcks(this.expectedAckPosition);
            }
            if (ConsensusModule.State.SUSPENDED != this.state) {
                this.state(ConsensusModule.State.ACTIVE);
            }
            this.timeOfLastLogUpdateMs = this.cachedTimeMs = this.epochClock.time();
            this.timeOfLastAppendPosition = this.cachedTimeMs;
            this.leadershipTermId = this.recoveryPlan.lastLeadershipTermId;
            this.election = new Election(true, this.leadershipTermId, this.recoveryPlan.appendedLogPosition, this.clusterMembers, this.clusterMemberByIdMap, this.thisMember, this.memberStatusAdapter, this.memberStatusPublisher, this.ctx, this);
        }
    }

    @Override
    public int doWork() {
        int workCount = 0;
        long nowMs = this.epochClock.time();
        if (this.cachedTimeMs != nowMs) {
            this.cachedTimeMs = nowMs;
            if (Cluster.Role.LEADER == this.role) {
                this.clusterTimeMs(nowMs);
            }
            workCount += this.slowTickWork(nowMs);
        }
        workCount = null != this.dynamicJoin ? (workCount += this.dynamicJoin.doWork(nowMs)) : (null != this.election ? (workCount += this.election.doWork(nowMs)) : (workCount += this.consensusWork(nowMs)));
        return workCount;
    }

    @Override
    public String roleName() {
        return "consensus-module";
    }

    public void onSessionConnect(long correlationId, int responseStreamId, String responseChannel, byte[] encodedCredentials) {
        if (Cluster.Role.LEADER != this.role) {
            ClusterSession session = new ClusterSession(-1L, responseStreamId, responseChannel);
            session.lastActivity(this.cachedTimeMs, correlationId);
            session.connect(this.aeron);
            this.redirectSessions.add(session);
        } else {
            ClusterSession session = new ClusterSession(this.nextSessionId++, responseStreamId, responseChannel);
            session.lastActivity(this.clusterTimeMs, correlationId);
            session.connect(this.aeron);
            if (this.pendingSessions.size() + this.sessionByIdMap.size() < this.ctx.maxConcurrentSessions()) {
                this.authenticator.onConnectRequest(session.id(), encodedCredentials, this.clusterTimeMs);
                this.pendingSessions.add(session);
            } else {
                this.rejectedSessions.add(session);
            }
        }
    }

    public void onSessionClose(long clusterSessionId) {
        ClusterSession session = this.sessionByIdMap.get(clusterSessionId);
        if (null != session && Cluster.Role.LEADER == this.role) {
            session.close(CloseReason.CLIENT_ACTION);
            if (this.logPublisher.appendSessionClose(session, this.leadershipTermId, this.clusterTimeMs)) {
                this.sessionByIdMap.remove(clusterSessionId);
            }
        }
    }

    public ControlledFragmentHandler.Action onIngressMessage(long leadershipTermId, long clusterSessionId, DirectBuffer buffer, int offset, int length) {
        if (leadershipTermId != this.leadershipTermId || Cluster.Role.LEADER != this.role) {
            return ControlledFragmentHandler.Action.CONTINUE;
        }
        ClusterSession session = this.sessionByIdMap.get(clusterSessionId);
        if (null == session || session.state() == ClusterSession.State.CLOSED) {
            return ControlledFragmentHandler.Action.CONTINUE;
        }
        if (session.state() == ClusterSession.State.OPEN && this.logPublisher.appendMessage(leadershipTermId, clusterSessionId, this.clusterTimeMs, buffer, offset, length)) {
            session.timeOfLastActivityMs(this.clusterTimeMs);
            return ControlledFragmentHandler.Action.CONTINUE;
        }
        return ControlledFragmentHandler.Action.ABORT;
    }

    public void onSessionKeepAlive(long clusterSessionId, long leadershipTermId) {
        ClusterSession session;
        if (Cluster.Role.LEADER == this.role && leadershipTermId == this.leadershipTermId && null != (session = this.sessionByIdMap.get(clusterSessionId)) && session.state() == ClusterSession.State.OPEN) {
            session.timeOfLastActivityMs(this.clusterTimeMs);
        }
    }

    public void onChallengeResponse(long correlationId, long clusterSessionId, byte[] encodedCredentials) {
        if (Cluster.Role.LEADER == this.role) {
            int lastIndex;
            for (int i = lastIndex = this.pendingSessions.size() - 1; i >= 0; --i) {
                ClusterSession session = this.pendingSessions.get(i);
                if (session.id() != clusterSessionId || session.state() != ClusterSession.State.CHALLENGED) continue;
                session.lastActivity(this.clusterTimeMs, correlationId);
                this.authenticator.onChallengeResponse(clusterSessionId, encodedCredentials, this.clusterTimeMs);
                break;
            }
        }
    }

    public boolean onTimerEvent(long correlationId, long nowMs) {
        return Cluster.Role.LEADER != this.role || this.logPublisher.appendTimer(correlationId, this.leadershipTermId, nowMs);
    }

    @Override
    public void onCanvassPosition(long logLeadershipTermId, long logPosition, int followerMemberId) {
        ClusterMember follower;
        if (null != this.election) {
            this.election.onCanvassPosition(logLeadershipTermId, logPosition, followerMemberId);
        } else if (Cluster.Role.LEADER == this.role && null != (follower = this.clusterMemberByIdMap.get(followerMemberId))) {
            long position = logLeadershipTermId == this.leadershipTermId ? this.logPosition() : this.recordingLog.getTermEntry((long)this.leadershipTermId).termBaseLogPosition;
            this.memberStatusPublisher.newLeadershipTerm(follower.publication(), this.leadershipTermId, position, this.leadershipTermId, this.logPosition(), this.thisMember.id(), this.logPublisher.sessionId());
        }
    }

    @Override
    public void onRequestVote(long logLeadershipTermId, long logPosition, long candidateTermId, int candidateId) {
        if (null != this.election) {
            this.election.onRequestVote(logLeadershipTermId, logPosition, candidateTermId, candidateId);
        } else if (candidateTermId > this.leadershipTermId) {
            this.ctx.countedErrorHandler().onError(new ClusterException("unexpected vote request"));
            this.enterElection(this.cachedTimeMs);
            this.election.onRequestVote(logLeadershipTermId, logPosition, candidateTermId, candidateId);
        }
    }

    @Override
    public void onVote(long candidateTermId, long logLeadershipTermId, long logPosition, int candidateMemberId, int followerMemberId, boolean vote) {
        if (null != this.election) {
            this.election.onVote(candidateTermId, logLeadershipTermId, logPosition, candidateMemberId, followerMemberId, vote);
        }
    }

    @Override
    public void onNewLeadershipTerm(long logLeadershipTermId, long logPosition, long leadershipTermId, long maxLogPosition, int leaderMemberId, int logSessionId) {
        if (null != this.election) {
            this.election.onNewLeadershipTerm(logLeadershipTermId, logPosition, leadershipTermId, maxLogPosition, leaderMemberId, logSessionId);
        } else if (leadershipTermId > this.leadershipTermId) {
            this.ctx.countedErrorHandler().onError(new ClusterException("unexpected new leadership term"));
            this.enterElection(this.cachedTimeMs);
        }
    }

    @Override
    public void onAppendedPosition(long leadershipTermId, long logPosition, int followerMemberId) {
        ClusterMember follower;
        if (null != this.election) {
            this.election.onAppendedPosition(leadershipTermId, logPosition, followerMemberId);
        } else if (Cluster.Role.LEADER == this.role && leadershipTermId == this.leadershipTermId && null != (follower = this.clusterMemberByIdMap.get(followerMemberId))) {
            follower.logPosition(logPosition).timeOfLastAppendPositionMs(this.cachedTimeMs);
            this.checkCatchupStop(follower);
        }
    }

    @Override
    public void onCommitPosition(long leadershipTermId, long logPosition, int leaderMemberId) {
        if (null != this.election) {
            this.election.onCommitPosition(leadershipTermId, logPosition, leaderMemberId);
        } else if (Cluster.Role.FOLLOWER == this.role && leadershipTermId == this.leadershipTermId) {
            this.timeOfLastLogUpdateMs = this.cachedTimeMs;
            this.followerCommitPosition = logPosition;
        } else if (leadershipTermId > this.leadershipTermId) {
            this.ctx.countedErrorHandler().onError(new ClusterException("unexpected commit position from new leader"));
            this.enterElection(this.cachedTimeMs);
        }
    }

    @Override
    public void onCatchupPosition(long leadershipTermId, long logPosition, int followerMemberId) {
        ClusterMember follower;
        if (Cluster.Role.LEADER == this.role && leadershipTermId == this.leadershipTermId && null != (follower = this.clusterMemberByIdMap.get(followerMemberId))) {
            String replayChannel = new ChannelUriStringBuilder().media("udp").endpoint(follower.transferEndpoint()).isSessionIdTagged(true).sessionId(2).build();
            if (follower.catchupReplaySessionId() == -1L) {
                follower.catchupReplaySessionId(this.archive.startReplay(this.logRecordingId(), logPosition, Long.MAX_VALUE, replayChannel, this.ctx.logStreamId()));
            }
        }
    }

    @Override
    public void onStopCatchup(long leadershipTermId, long logPosition, int followerMemberId) {
        if (null != this.logAdapter && null != this.replayLogDestination && followerMemberId == this.memberId) {
            this.logAdapter.removeDestination(this.replayLogDestination);
            this.replayLogDestination = null;
        }
    }

    @Override
    public void onAddPassiveMember(long correlationId, String memberEndpoints) {
        if (null == this.election && Cluster.Role.LEADER == this.role) {
            if (ClusterMember.isNotDuplicateEndpoints(this.passiveMembers, memberEndpoints)) {
                ClusterMember newMember = ClusterMember.parseEndpoints(++this.highMemberId, memberEndpoints);
                newMember.correlationId(correlationId);
                this.passiveMembers = ClusterMember.addMember(this.passiveMembers, newMember);
                this.clusterMemberByIdMap.put(newMember.id(), newMember);
                ChannelUri memberStatusUri = ChannelUri.parse(this.ctx.memberStatusChannel());
                ClusterMember.addMemberStatusPublication(newMember, memberStatusUri, this.ctx.memberStatusStreamId(), this.aeron);
                this.logPublisher.addPassiveFollower(newMember.logEndpoint());
            }
        } else if (null == this.election && Cluster.Role.FOLLOWER == this.role) {
            this.memberStatusPublisher.addPassiveMember(this.leaderMember.publication(), correlationId, memberEndpoints);
        }
    }

    @Override
    public void onClusterMembersChange(long correlationId, int leaderMemberId, String activeMembers, String passiveMembers) {
        if (null != this.dynamicJoin) {
            this.dynamicJoin.onClusterMembersChange(correlationId, leaderMemberId, activeMembers, passiveMembers);
        }
    }

    @Override
    public void onSnapshotRecordingQuery(long correlationId, int requestMemberId) {
        ClusterMember requester;
        if (null == this.election && Cluster.Role.LEADER == this.role && null != (requester = this.clusterMemberByIdMap.get(requestMemberId))) {
            RecordingLog.RecoveryPlan currentRecoveryPlan = this.recordingLog.createRecoveryPlan(this.archive, this.ctx.serviceCount());
            this.memberStatusPublisher.snapshotRecording(requester.publication(), correlationId, currentRecoveryPlan, ClusterMember.encodeAsString(this.clusterMembers));
        }
    }

    @Override
    public void onSnapshotRecordings(long correlationId, SnapshotRecordingsDecoder decoder) {
        if (null != this.dynamicJoin) {
            this.dynamicJoin.onSnapshotRecordings(correlationId, decoder);
        }
    }

    @Override
    public void onJoinCluster(long leadershipTermId, int memberId) {
        ClusterMember member = this.clusterMemberByIdMap.get(memberId);
        if (null == this.election && Cluster.Role.LEADER == this.role && null != member && !member.hasRequestedJoin()) {
            if (null == member.publication()) {
                ChannelUri memberStatusUri = ChannelUri.parse(this.ctx.memberStatusChannel());
                ClusterMember.addMemberStatusPublication(member, memberStatusUri, this.ctx.memberStatusStreamId(), this.aeron);
                this.logPublisher.addPassiveFollower(member.logEndpoint());
            }
            member.hasRequestedJoin(true);
        }
    }

    @Override
    public void onTerminationPosition(long logPosition) {
        if (Cluster.Role.FOLLOWER == this.role) {
            this.terminationPosition = logPosition;
        }
    }

    @Override
    public void onTerminationAck(long logPosition, int memberId) {
        ClusterMember member;
        if (Cluster.Role.LEADER == this.role && logPosition == this.terminationPosition && null != (member = this.clusterMemberByIdMap.get(memberId))) {
            member.hasSentTerminationAck(true);
            if (this.clusterTermination.canTerminate(this.clusterMembers, this.terminationPosition, this.cachedTimeMs)) {
                this.recordingLog.commitLogPosition(this.leadershipTermId, logPosition);
                this.state(ConsensusModule.State.CLOSED);
                this.ctx.terminationHook().run();
            }
        }
    }

    void state(ConsensusModule.State state) {
        this.state = state;
        this.moduleState.set(state.code());
    }

    void role(Cluster.Role role) {
        this.role = role;
        this.clusterRoleCounter.setOrdered(role.code());
    }

    Cluster.Role role() {
        return this.role;
    }

    void prepareForNewLeadership(long logPosition) {
        long stopPosition;
        long recordingId = RecordingPos.getRecordingId(this.aeron.countersReader(), this.appendedPosition.counterId());
        if (-1L == recordingId) {
            recordingId = this.recordingLog.getTermEntry((long)this.leadershipTermId).recordingId;
        }
        this.stopLogRecording();
        this.idleStrategy.reset();
        while (-1L == (stopPosition = this.archive.getStopPosition(recordingId))) {
            this.idle();
        }
        if (stopPosition > logPosition) {
            this.archive.truncateRecording(recordingId, logPosition);
        }
        if (-1 == this.logInitialTermId) {
            RecordingExtent recordingExtent = new RecordingExtent();
            if (0 == this.archive.listRecording(recordingId, recordingExtent)) {
                throw new ClusterException("recording not found id=" + recordingId);
            }
            this.logInitialTermId = recordingExtent.initialTermId;
            this.logTermBufferLength = recordingExtent.termBufferLength;
            this.logMtuLength = recordingExtent.mtuLength;
        }
        this.lastAppendedPosition = logPosition;
        this.followerCommitPosition = logPosition;
        this.lastRecordingId = recordingId;
        this.commitPosition.setOrdered(logPosition);
        this.clearSessionsAfter(logPosition);
    }

    void stopLogRecording() {
        if (null != this.logRecordingChannel) {
            this.archive.stopRecording(this.logRecordingChannel, this.ctx.logStreamId());
            this.logRecordingChannel = null;
        }
        if (null != this.logAdapter && null != this.replayLogDestination) {
            this.logAdapter.removeDestination(this.replayLogDestination);
            this.replayLogDestination = null;
        }
        if (null != this.logAdapter && null != this.liveLogDestination) {
            this.logAdapter.removeDestination(this.liveLogDestination);
            this.liveLogDestination = null;
        }
    }

    void appendedPositionCounter(ReadableCounter appendedPositionCounter) {
        this.appendedPosition = appendedPositionCounter;
    }

    void clearSessionsAfter(long logPosition) {
        Long2ObjectHashMap.ValueIterator i = this.sessionByIdMap.values().iterator();
        while (i.hasNext()) {
            ClusterSession session = (ClusterSession)i.next();
            if (session.openedLogPosition() < logPosition) continue;
            i.remove();
            session.close();
        }
        for (ClusterSession session : this.pendingSessions) {
            session.close();
        }
        this.pendingSessions.clear();
    }

    void onServiceCloseSession(long clusterSessionId) {
        ClusterSession session = this.sessionByIdMap.get(clusterSessionId);
        if (null != session) {
            if (session.isResponsePublicationConnected()) {
                this.egressPublisher.sendEvent(session, this.leadershipTermId, this.leaderMember.id(), EventCode.ERROR, "Session terminated");
            }
            session.close(CloseReason.SERVICE_ACTION);
            if (Cluster.Role.LEADER == this.role && this.logPublisher.appendSessionClose(session, this.leadershipTermId, this.clusterTimeMs)) {
                this.sessionByIdMap.remove(clusterSessionId);
            }
        }
    }

    void onScheduleTimer(long correlationId, long deadlineMs) {
        this.timerService.scheduleTimer(correlationId, deadlineMs);
    }

    void onCancelTimer(long correlationId) {
        this.timerService.cancelTimer(correlationId);
    }

    void onServiceAck(long logPosition, long ackId, long relevantId, int serviceId) {
        this.validateServiceAck(logPosition, ackId, serviceId);
        this.serviceAcks[serviceId].logPosition(logPosition).ackId(ackId).relevantId(relevantId);
        if (ServiceAck.hasReachedPosition(logPosition, this.serviceAckId, this.serviceAcks)) {
            switch (this.state) {
                case SNAPSHOT: {
                    ++this.serviceAckId;
                    this.takeSnapshot(this.clusterTimeMs, logPosition);
                    if (-1L == this.terminationPosition) {
                        this.state(ConsensusModule.State.ACTIVE);
                        ClusterControl.ToggleState.reset(this.controlToggle);
                        for (ClusterSession session : this.sessionByIdMap.values()) {
                            session.timeOfLastActivityMs(this.clusterTimeMs);
                        }
                        break;
                    }
                    this.serviceProxy.terminationPosition(this.terminationPosition);
                    if (null != this.clusterTermination) {
                        this.clusterTermination.deadlineMs(this.cachedTimeMs + TimeUnit.NANOSECONDS.toMillis(this.ctx.terminationTimeoutNs()));
                    }
                    this.state(ConsensusModule.State.TERMINATING);
                    break;
                }
                case LEAVING: {
                    this.recordingLog.commitLogPosition(this.leadershipTermId, logPosition);
                    this.state(ConsensusModule.State.CLOSED);
                    this.ctx.terminationHook().run();
                    break;
                }
                case TERMINATING: {
                    boolean canTerminate;
                    if (null == this.clusterTermination) {
                        this.memberStatusPublisher.terminationAck(this.leaderMember.publication(), logPosition, this.memberId);
                        canTerminate = true;
                    } else {
                        this.clusterTermination.hasServiceTerminated(true);
                        canTerminate = this.clusterTermination.canTerminate(this.clusterMembers, this.terminationPosition, this.cachedTimeMs);
                    }
                    if (!canTerminate) break;
                    this.recordingLog.commitLogPosition(this.leadershipTermId, logPosition);
                    this.state(ConsensusModule.State.CLOSED);
                    this.ctx.terminationHook().run();
                }
            }
        }
    }

    public void onClusterMembersQuery(long correlationId) {
        this.serviceProxy.clusterMembersResponse(correlationId, this.leaderMember.id(), ClusterMember.encodeAsString(this.clusterMembers), ClusterMember.encodeAsString(this.passiveMembers));
    }

    public void onRemoveMember(long correlationId, int memberId, boolean isPassive) {
        ClusterMember member = this.clusterMemberByIdMap.get(memberId);
        if (null == this.election && Cluster.Role.LEADER == this.role && null != member) {
            if (isPassive) {
                this.passiveMembers = ClusterMember.removeMember(this.passiveMembers, memberId);
                member.publication().close();
                member.publication(null);
                this.logPublisher.removePassiveFollower(member.logEndpoint());
                this.clusterMemberByIdMap.remove(memberId);
                this.clusterMemberByIdMap.compact();
            } else {
                ClusterMember[] newClusterMembers = ClusterMember.removeMember(this.clusterMembers, memberId);
                String newClusterMembersString = ClusterMember.encodeAsString(newClusterMembers);
                long position = this.logPublisher.calculatePositionForMembershipChangeEvent(newClusterMembersString);
                if (this.logPublisher.appendMembershipChangeEvent(this.leadershipTermId, position, this.clusterTimeMs, this.thisMember.id(), this.clusterMembers.length, ChangeType.QUIT, memberId, newClusterMembersString)) {
                    this.timeOfLastLogUpdateMs = this.cachedTimeMs - this.leaderHeartbeatIntervalMs;
                    member.removalPosition(this.logPublisher.position());
                    ++this.pendingMemberRemovals;
                }
            }
        }
    }

    void onReplaySessionMessage(long clusterSessionId, long timestamp, DirectBuffer buffer, int offset, int length, Header header) {
        this.clusterTimeMs(timestamp);
        this.sessionByIdMap.get(clusterSessionId).timeOfLastActivityMs(timestamp);
    }

    void onReplayTimerEvent(long correlationId, long timestamp) {
        this.clusterTimeMs(timestamp);
        if (!this.timerService.cancelTimer(correlationId)) {
            this.missedTimersSet.add(correlationId);
        }
    }

    void onReplaySessionOpen(long logPosition, long correlationId, long clusterSessionId, long timestamp, int responseStreamId, String responseChannel) {
        this.clusterTimeMs(timestamp);
        ClusterSession session = new ClusterSession(clusterSessionId, responseStreamId, responseChannel);
        session.open(logPosition);
        session.lastActivity(timestamp, correlationId);
        this.sessionByIdMap.put(clusterSessionId, session);
        if (clusterSessionId >= this.nextSessionId) {
            this.nextSessionId = clusterSessionId + 1L;
        }
    }

    void onLoadSession(long clusterSessionId, long correlationId, long openedPosition, long timeOfLastActivity, CloseReason closeReason, int responseStreamId, String responseChannel) {
        this.sessionByIdMap.put(clusterSessionId, new ClusterSession(clusterSessionId, correlationId, openedPosition, timeOfLastActivity, responseStreamId, responseChannel, closeReason));
        if (clusterSessionId >= this.nextSessionId) {
            this.nextSessionId = clusterSessionId + 1L;
        }
    }

    void onReplaySessionClose(long clusterSessionId, long timestamp, CloseReason closeReason) {
        this.clusterTimeMs(timestamp);
        this.sessionByIdMap.remove(clusterSessionId).close(closeReason);
    }

    void onReplayClusterAction(long leadershipTermId, long logPosition, long timestamp, ClusterAction action) {
        this.clusterTimeMs(timestamp);
        switch (action) {
            case SUSPEND: {
                this.state(ConsensusModule.State.SUSPENDED);
                break;
            }
            case RESUME: {
                this.state(ConsensusModule.State.ACTIVE);
                break;
            }
            case SNAPSHOT: {
                this.replayClusterAction(leadershipTermId, logPosition, ConsensusModule.State.SNAPSHOT);
            }
        }
    }

    void onReplayNewLeadershipTermEvent(long leadershipTermId, long logPosition, long timestamp, int leaderMemberId, int logSessionId) {
        this.clusterTimeMs(timestamp);
        this.leadershipTermId = leadershipTermId;
        if (null != this.election && null != this.appendedPosition) {
            long recordingId = RecordingPos.getRecordingId(this.aeron.countersReader(), this.appendedPosition.counterId());
            this.election.onReplayNewLeadershipTermEvent(recordingId, leadershipTermId, logPosition, this.cachedTimeMs);
        }
    }

    void onMembershipClusterChange(long leadershipTermId, long logPosition, long timestamp, int leaderMemberId, int clusterSize, ChangeType changeType, int memberId, String clusterMembers) {
        this.clusterTimeMs(timestamp);
        this.leadershipTermId = leadershipTermId;
        ClusterMember[] newMembers = ClusterMember.parse(clusterMembers);
        if (ChangeType.JOIN == changeType) {
            if (memberId == this.memberId) {
                this.clusterMembers = newMembers;
                this.clusterMemberByIdMap.clear();
                this.clusterMemberByIdMap.compact();
                ClusterMember.addClusterMemberIds(newMembers, this.clusterMemberByIdMap);
                this.thisMember = ClusterMember.findMember(this.clusterMembers, memberId);
                this.leaderMember = ClusterMember.findMember(this.clusterMembers, leaderMemberId);
                ClusterMember.addMemberStatusPublications(newMembers, this.thisMember, ChannelUri.parse(this.ctx.memberStatusChannel()), this.ctx.memberStatusStreamId(), this.aeron);
            } else {
                this.clusterMemberJoined(memberId, newMembers);
            }
        } else if (ChangeType.QUIT == changeType) {
            if (memberId == this.memberId) {
                this.expectedAckPosition = logPosition;
                this.state(ConsensusModule.State.LEAVING);
            } else {
                boolean hasCurrentLeaderSteppedDown = leaderMemberId == memberId;
                this.clusterMemberQuit(memberId);
                if (hasCurrentLeaderSteppedDown) {
                    this.enterElection(this.cachedTimeMs);
                }
            }
        }
    }

    void onReloadState(long nextSessionId) {
        this.nextSessionId = nextSessionId;
    }

    void onReloadClusterMembers(int memberId, int highMemberId, String members) {
        if (this.ctx.clusterMembersIgnoreSnapshot() || null != this.dynamicJoin) {
            return;
        }
        ClusterMember[] snapshotClusterMembers = ClusterMember.parse(members);
        if (-1 == this.memberId) {
            this.memberId = memberId;
            this.ctx.clusterMarkFile().memberId(memberId);
        }
        if (ClusterMember.EMPTY_CLUSTER_MEMBER_ARRAY == this.clusterMembers) {
            this.clusterMembers = snapshotClusterMembers;
            this.highMemberId = Math.max(ClusterMember.highMemberId(this.clusterMembers), highMemberId);
            this.rankedPositions = new long[ClusterMember.quorumThreshold(this.clusterMembers.length)];
            this.thisMember = this.clusterMemberByIdMap.get(this.memberId);
            ChannelUri memberStatusUri = ChannelUri.parse(this.ctx.memberStatusChannel());
            memberStatusUri.put("endpoint", this.thisMember.memberFacingEndpoint());
            ClusterMember.addMemberStatusPublications(this.clusterMembers, this.thisMember, memberStatusUri, this.ctx.memberStatusStreamId(), this.aeron);
        }
    }

    Publication addNewLogPublication() {
        this.closeExistingLog();
        ChannelUri channelUri = ChannelUri.parse(this.ctx.logChannel());
        Publication publication = this.createLogPublication(channelUri, this.recoveryPlan, this.election.logPosition());
        this.logPublisher.connect(publication);
        return publication;
    }

    void becomeLeader(long leadershipTermId, long logPosition, int logSessionId) {
        this.leadershipTermId = leadershipTermId;
        ChannelUri channelUri = ChannelUri.parse(this.ctx.logChannel());
        channelUri.put("session-id", Integer.toString(logSessionId));
        this.startLogRecording(channelUri.toString(), SourceLocation.LOCAL);
        this.createAppendPosition(logSessionId);
        this.awaitServicesReady(channelUri, logSessionId, logPosition);
        for (ClusterSession session : this.sessionByIdMap.values()) {
            if (session.state() == ClusterSession.State.CLOSED) continue;
            session.connect(this.aeron);
        }
        long nowMs = this.epochClock.time();
        for (ClusterSession session : this.sessionByIdMap.values()) {
            if (session.state() == ClusterSession.State.CLOSED) continue;
            session.timeOfLastActivityMs(nowMs);
            session.hasNewLeaderEventPending(true);
        }
    }

    void followerCommitPosition(long position) {
        this.followerCommitPosition = position;
    }

    void updateMemberDetails(Election election) {
        this.leaderMember = election.leader();
        this.sessionProxy.leaderMemberId(this.leaderMember.id()).leadershipTermId(this.leadershipTermId);
        for (ClusterMember clusterMember : this.clusterMembers) {
            clusterMember.isLeader(clusterMember.id() == this.leaderMember.id());
        }
        this.updateClientFacingEndpoints(this.clusterMembers);
    }

    void liveLogDestination(String liveLogDestination) {
        this.liveLogDestination = liveLogDestination;
    }

    void replayLogDestination(String replayLogDestination) {
        this.replayLogDestination = replayLogDestination;
    }

    Subscription createAndRecordLogSubscriptionAsFollower(String logChannel) {
        this.closeExistingLog();
        Subscription subscription = this.aeron.addSubscription(logChannel, this.ctx.logStreamId());
        this.startLogRecording(logChannel, SourceLocation.REMOTE);
        return subscription;
    }

    void appendDynamicJoinTermAndSnapshots() {
        if (!this.dynamicJoinSnapshots.isEmpty()) {
            long logRecordingId = this.logRecordingId();
            RecordingLog.Snapshot lastSnapshot = this.dynamicJoinSnapshots.get(this.dynamicJoinSnapshots.size() - 1);
            this.recordingLog.appendTerm(logRecordingId, lastSnapshot.leadershipTermId, lastSnapshot.termBaseLogPosition, lastSnapshot.timestamp);
            for (int i = this.dynamicJoinSnapshots.size() - 1; i >= 0; --i) {
                RecordingLog.Snapshot snapshot = this.dynamicJoinSnapshots.get(i);
                this.recordingLog.appendSnapshot(snapshot.recordingId, snapshot.leadershipTermId, snapshot.termBaseLogPosition, snapshot.logPosition, snapshot.timestamp, snapshot.serviceId);
            }
            this.dynamicJoinSnapshots.clear();
        }
    }

    boolean pollImageAndLogAdapter(Subscription subscription, int logSessionId) {
        boolean result = false;
        if (null == this.logAdapter) {
            Image image = subscription.imageBySessionId(logSessionId);
            if (null != image) {
                this.logAdapter = new LogAdapter(image, this);
                this.lastAppendedPosition = 0L;
                this.createAppendPosition(logSessionId);
                this.appendDynamicJoinTermAndSnapshots();
                result = true;
            }
        } else {
            result = true;
        }
        return result;
    }

    void awaitImageAndCreateFollowerLogAdapter(Subscription subscription, int logSessionId) {
        this.leadershipTermId = this.election.leadershipTermId();
        this.idleStrategy.reset();
        while (!this.pollImageAndLogAdapter(subscription, logSessionId)) {
            this.idle();
        }
    }

    void awaitServicesReady(ChannelUri logChannelUri, int logSessionId, long logPosition) {
        String channel = Cluster.Role.LEADER == this.role && "udp".equals(logChannelUri.media()) ? logChannelUri.prefix("aeron-spy").toString() : logChannelUri.toString();
        this.serviceProxy.joinLog(this.leadershipTermId, logPosition, Long.MAX_VALUE, this.memberId, logSessionId, this.ctx.logStreamId(), channel);
        this.expectedAckPosition = logPosition;
        this.awaitServiceAcks(logPosition);
    }

    LogReplay newLogReplay(long electionCommitPosition) {
        RecordingLog.RecoveryPlan plan = this.recoveryPlan;
        LogReplay logReplay = null;
        if (!plan.logs.isEmpty()) {
            RecordingLog.Log log = plan.logs.get(0);
            long startPosition = log.startPosition;
            long stopPosition = Math.min(log.stopPosition, electionCommitPosition);
            this.leadershipTermId = log.leadershipTermId;
            if (log.logPosition < 0L) {
                this.recordingLog.commitLogPosition(this.leadershipTermId, stopPosition);
            }
            if (plan.hasReplay()) {
                logReplay = new LogReplay(this.archive, log.recordingId, startPosition, stopPosition, this.leadershipTermId, log.sessionId, this, this.ctx);
            }
        }
        return logReplay;
    }

    void awaitServicesReadyForReplay(String channel, int streamId, int logSessionId, long leadershipTermId, long logPosition, long maxLogPosition) {
        this.serviceProxy.joinLog(leadershipTermId, logPosition, maxLogPosition, this.memberId, logSessionId, streamId, channel);
        this.expectedAckPosition = logPosition;
        this.awaitServiceAcks(logPosition);
    }

    void awaitServicesReplayComplete(long stopPosition) {
        this.expectedAckPosition = stopPosition;
        this.awaitServiceAcks(stopPosition);
        while (0 != this.timerService.poll(this.clusterTimeMs) || this.timerService.currentTickTimeMs() < this.clusterTimeMs && this.timerService.timerCount() > 0L) {
            this.idle();
        }
    }

    void replayLogPoll(LogAdapter logAdapter, long stopPosition) {
        int workCount = logAdapter.poll(stopPosition);
        if (0 == workCount && logAdapter.isImageClosed() && logAdapter.position() != stopPosition) {
            throw new ClusterException("unexpected close of image when replaying log");
        }
        this.commitPosition.setOrdered(logAdapter.position());
        this.consensusModuleAdapter.poll();
        this.cancelMissedTimers();
    }

    long logRecordingId() {
        if (!this.recoveryPlan.logs.isEmpty()) {
            return this.recoveryPlan.logs.get((int)0).recordingId;
        }
        return RecordingPos.getRecordingId(this.aeron.countersReader(), this.appendedPosition.counterId());
    }

    long logStopPosition(long leadershipTermId) {
        if (-1L == leadershipTermId) {
            return 0L;
        }
        return this.recordingLog.getTermEntry((long)leadershipTermId).logPosition;
    }

    void truncateLogEntry(long leadershipTermId, long logPosition) {
        this.archive.truncateRecording(this.logRecordingId(), logPosition);
        this.recordingLog.commitLogPosition(leadershipTermId, logPosition);
    }

    public void checkCatchupStop(ClusterMember member) {
        if (null != member && -1L != member.catchupReplaySessionId() && member.logPosition() >= this.logPublisher.position()) {
            this.archive.stopReplay(member.catchupReplaySessionId());
            if (this.memberStatusPublisher.stopCatchup(member.publication(), this.leadershipTermId, this.logPublisher.position(), member.id())) {
                member.catchupReplaySessionId(-1L);
            }
        }
    }

    boolean electionComplete(long nowMs) {
        boolean result = false;
        if (Cluster.Role.LEADER == this.role) {
            if (this.logPublisher.appendNewLeadershipTermEvent(this.leadershipTermId, this.election.logPosition(), nowMs, this.memberId, this.logPublisher.sessionId())) {
                this.timeOfLastLogUpdateMs = this.cachedTimeMs - this.leaderHeartbeatIntervalMs;
                this.election = null;
                result = true;
            }
        } else {
            this.timeOfLastLogUpdateMs = this.cachedTimeMs;
            this.timeOfLastAppendPosition = this.cachedTimeMs;
            this.election = null;
            result = true;
        }
        this.cancelMissedTimers();
        if (this.missedTimersSet.capacity() > 8) {
            this.missedTimersSet.compact();
        }
        if (!this.ctx.ingressChannel().contains("endpoint")) {
            ChannelUri ingressUri = ChannelUri.parse(this.ctx.ingressChannel());
            ingressUri.put("endpoint", this.thisMember.clientFacingEndpoint());
            this.ingressAdapter.connect(this.aeron.addSubscription(ingressUri.toString(), this.ctx.ingressStreamId(), null, this::onUnavailableIngressImage));
        } else if (Cluster.Role.LEADER == this.role) {
            this.ingressAdapter.connect(this.aeron.addSubscription(this.ctx.ingressChannel(), this.ctx.ingressStreamId(), null, this::onUnavailableIngressImage));
        }
        return result;
    }

    boolean dynamicJoinComplete() {
        if (0 == this.clusterMembers.length) {
            this.clusterMembers = this.dynamicJoin.clusterMembers();
            ClusterMember.addClusterMemberIds(this.clusterMembers, this.clusterMemberByIdMap);
            this.leaderMember = this.dynamicJoin.leader();
            ChannelUri memberStatusUri = ChannelUri.parse(this.ctx.memberStatusChannel());
            ClusterMember.addMemberStatusPublications(this.clusterMembers, this.thisMember, memberStatusUri, this.ctx.memberStatusStreamId(), this.aeron);
        }
        if (-1 == this.memberId) {
            this.memberId = this.dynamicJoin.memberId();
            this.ctx.clusterMarkFile().memberId(this.memberId);
            this.thisMember.id(this.dynamicJoin.memberId());
        }
        this.election = new Election(true, this.leadershipTermId, this.recoveryPlan.appendedLogPosition, this.clusterMembers, this.clusterMemberByIdMap, this.thisMember, this.memberStatusAdapter, this.memberStatusPublisher, this.ctx, this);
        this.dynamicJoin = null;
        return true;
    }

    void catchupLogPoll(Subscription subscription, int logSessionId, long stopPosition) {
        if (this.pollImageAndLogAdapter(subscription, logSessionId)) {
            Publication publication;
            long appendedPosition;
            Image image = this.logAdapter.image();
            if (this.logAdapter.poll(stopPosition) == 0) {
                if (image.position() == stopPosition) {
                    while (!this.missedTimersSet.isEmpty()) {
                        this.idle();
                        this.cancelMissedTimers();
                    }
                }
                if (image.isClosed()) {
                    throw new ClusterException("unexpected close of image when replaying log");
                }
            }
            if ((appendedPosition = this.appendedPosition.get()) != this.lastAppendedPosition && this.memberStatusPublisher.appendedPosition(publication = this.election.leader().publication(), this.leadershipTermId, appendedPosition, this.memberId)) {
                this.lastAppendedPosition = appendedPosition;
                this.timeOfLastAppendPosition = this.cachedTimeMs;
            }
            this.commitPosition.setOrdered(Math.min(image.position(), appendedPosition));
            this.consensusModuleAdapter.poll();
            this.cancelMissedTimers();
        }
    }

    boolean hasAppendReachedPosition(Subscription subscription, int logSessionId, long position) {
        return this.pollImageAndLogAdapter(subscription, logSessionId) && this.appendedPosition.get() >= position;
    }

    boolean hasAppendReachedLivePosition(Subscription subscription, int logSessionId, long position) {
        boolean result = false;
        if (this.pollImageAndLogAdapter(subscription, logSessionId)) {
            long window;
            long appendPosition = this.appendedPosition.get();
            result = appendPosition >= position - (window = (long)this.logAdapter.image().termBufferLength() * 2L);
        }
        return result;
    }

    void stopAllCatchups() {
        for (ClusterMember member : this.clusterMembers) {
            if (member.catchupReplaySessionId() == -1L) continue;
            this.archive.stopReplay(member.catchupReplaySessionId());
            member.catchupReplaySessionId(-1L);
        }
    }

    void retrievedSnapshot(long localRecordingId, RecordingLog.Snapshot leaderSnapshot) {
        this.dynamicJoinSnapshots.add(new RecordingLog.Snapshot(localRecordingId, leaderSnapshot.leadershipTermId, leaderSnapshot.termBaseLogPosition, leaderSnapshot.logPosition, leaderSnapshot.timestamp, leaderSnapshot.serviceId));
    }

    Counter loadSnapshotsFromDynamicJoin() {
        this.recoveryPlan = RecordingLog.createRecoveryPlan(this.dynamicJoinSnapshots);
        Counter recoveryStateCounter = this.addRecoveryStateCounter(this.recoveryPlan);
        if (!this.recoveryPlan.snapshots.isEmpty()) {
            this.recoverFromSnapshot(this.recoveryPlan.snapshots.get(0), this.archive);
        }
        return recoveryStateCounter;
    }

    boolean pollForEndOfSnapshotLoad(Counter recoveryStateCounter) {
        this.consensusModuleAdapter.poll();
        if (ServiceAck.hasReachedPosition(this.expectedAckPosition, this.serviceAckId, this.serviceAcks)) {
            ++this.serviceAckId;
            recoveryStateCounter.close();
            if (ConsensusModule.State.SUSPENDED != this.state) {
                this.state(ConsensusModule.State.ACTIVE);
            }
            this.timeOfLastLogUpdateMs = this.cachedTimeMs = this.epochClock.time();
            this.leadershipTermId = this.recoveryPlan.lastLeadershipTermId;
            return true;
        }
        return false;
    }

    private int slowTickWork(long nowMs) {
        int workCount = 0;
        this.markFile.updateActivityTimestamp(nowMs);
        this.checkServiceHeartbeats(nowMs);
        workCount += this.aeronClientInvoker.invoke();
        workCount += this.processRedirectSessions(this.redirectSessions, nowMs);
        workCount += this.processRejectedSessions(this.rejectedSessions, nowMs);
        if (Cluster.Role.LEADER == this.role && null == this.election) {
            workCount += this.checkControlToggle(nowMs);
            if (ConsensusModule.State.ACTIVE == this.state) {
                workCount += this.processPendingSessions(this.pendingSessions, nowMs);
                workCount += this.checkSessions(this.sessionByIdMap, nowMs);
                workCount += this.processPassiveMembers(this.passiveMembers);
                if (!ClusterMember.hasActiveQuorum(this.clusterMembers, nowMs, this.leaderHeartbeatTimeoutMs)) {
                    this.ctx.countedErrorHandler().onError(new ClusterException("lost connection to quorum of followers"));
                    this.enterElection(nowMs);
                    ++workCount;
                }
            } else if (ConsensusModule.State.TERMINATING == this.state && this.clusterTermination.canTerminate(this.clusterMembers, this.terminationPosition, this.cachedTimeMs)) {
                this.recordingLog.commitLogPosition(this.leadershipTermId, this.terminationPosition);
                this.state(ConsensusModule.State.CLOSED);
                this.ctx.terminationHook().run();
            }
        } else {
            this.cancelMissedTimers();
        }
        if (null != this.archive) {
            this.archive.checkForErrorResponse();
        }
        return workCount;
    }

    private int consensusWork(long nowMs) {
        int workCount = 0;
        if (Cluster.Role.LEADER == this.role && ConsensusModule.State.ACTIVE == this.state) {
            workCount += this.ingressAdapter.poll();
            workCount += this.timerService.poll(nowMs);
        } else if (Cluster.Role.FOLLOWER == this.role && (ConsensusModule.State.ACTIVE == this.state || ConsensusModule.State.SUSPENDED == this.state)) {
            workCount += this.ingressAdapter.poll();
            int count = this.logAdapter.poll(this.followerCommitPosition);
            if (0 == count && this.logAdapter.isImageClosed()) {
                this.ctx.countedErrorHandler().onError(new ClusterException("lost connection to leader"));
                this.enterElection(nowMs);
                return 1;
            }
            workCount += count;
            if (-1L != this.terminationPosition && this.logAdapter.position() >= this.terminationPosition && ConsensusModule.State.SNAPSHOT != this.state) {
                this.serviceProxy.terminationPosition(this.terminationPosition);
                this.expectedAckPosition = this.terminationPosition;
                this.state(ConsensusModule.State.TERMINATING);
            }
        }
        workCount += this.memberStatusAdapter.poll();
        workCount += this.updateMemberPosition(nowMs);
        return workCount += this.consensusModuleAdapter.poll();
    }

    private void checkServiceHeartbeats(long nowMs) {
        long heartbeatThreshold = nowMs - this.serviceHeartbeatTimeoutMs;
        if (null == this.dynamicJoin) {
            for (Counter serviceHeartbeat : this.serviceHeartbeats) {
                long heartbeat = serviceHeartbeat.get();
                if (heartbeat >= heartbeatThreshold) continue;
                this.ctx.countedErrorHandler().onError(new TimeoutException("no heartbeat from service: " + heartbeat));
                this.ctx.terminationHook().run();
            }
        }
    }

    private int checkControlToggle(long nowMs) {
        switch (ClusterControl.ToggleState.get(this.controlToggle)) {
            case SUSPEND: {
                if (ConsensusModule.State.ACTIVE != this.state || !this.appendAction(ClusterAction.SUSPEND, nowMs)) break;
                this.state(ConsensusModule.State.SUSPENDED);
                ClusterControl.ToggleState.reset(this.controlToggle);
                break;
            }
            case RESUME: {
                if (ConsensusModule.State.SUSPENDED != this.state || !this.appendAction(ClusterAction.RESUME, nowMs)) break;
                this.state(ConsensusModule.State.ACTIVE);
                ClusterControl.ToggleState.reset(this.controlToggle);
                break;
            }
            case SNAPSHOT: {
                if (ConsensusModule.State.ACTIVE != this.state || !this.appendAction(ClusterAction.SNAPSHOT, nowMs)) break;
                this.expectedAckPosition = this.logPosition();
                this.state(ConsensusModule.State.SNAPSHOT);
                break;
            }
            case SHUTDOWN: {
                if (ConsensusModule.State.ACTIVE != this.state || !this.appendAction(ClusterAction.SNAPSHOT, nowMs)) break;
                long position = this.logPosition();
                this.clusterTermination = new ClusterTermination(this.memberStatusPublisher, this.cachedTimeMs + TimeUnit.NANOSECONDS.toMillis(this.ctx.terminationTimeoutNs()));
                this.clusterTermination.terminationPosition(this.clusterMembers, this.thisMember, position);
                this.terminationPosition = position;
                this.expectedAckPosition = position;
                this.state(ConsensusModule.State.SNAPSHOT);
                break;
            }
            case ABORT: {
                if (ConsensusModule.State.ACTIVE != this.state) break;
                long position = this.logPosition();
                this.clusterTermination = new ClusterTermination(this.memberStatusPublisher, this.cachedTimeMs + TimeUnit.NANOSECONDS.toMillis(this.ctx.terminationTimeoutNs()));
                this.clusterTermination.terminationPosition(this.clusterMembers, this.thisMember, position);
                this.terminationPosition = position;
                this.expectedAckPosition = position;
                this.serviceProxy.terminationPosition(this.terminationPosition);
                this.state(ConsensusModule.State.TERMINATING);
                break;
            }
            default: {
                return 0;
            }
        }
        return 1;
    }

    private boolean appendAction(ClusterAction action, long nowMs) {
        int length = 68;
        long position = this.logPublisher.position() + (long)BitUtil.align(68, 32);
        return this.logPublisher.appendClusterAction(this.leadershipTermId, position, nowMs, action);
    }

    private int processPendingSessions(ArrayList<ClusterSession> pendingSessions, long nowMs) {
        int lastIndex;
        int workCount = 0;
        for (int i = lastIndex = pendingSessions.size() - 1; i >= 0; --i) {
            ClusterSession session = pendingSessions.get(i);
            if ((session.state() == ClusterSession.State.INIT || session.state() == ClusterSession.State.CONNECTED) && session.isResponsePublicationConnected()) {
                session.state(ClusterSession.State.CONNECTED);
                this.authenticator.onConnectedSession(this.sessionProxy.session(session), nowMs);
            }
            if (session.state() == ClusterSession.State.CHALLENGED && session.isResponsePublicationConnected()) {
                this.authenticator.onChallengedSession(this.sessionProxy.session(session), nowMs);
            }
            if (session.state() == ClusterSession.State.AUTHENTICATED) {
                ArrayListUtil.fastUnorderedRemove(pendingSessions, i, lastIndex--);
                session.timeOfLastActivityMs(nowMs);
                this.sessionByIdMap.put(session.id(), session);
                this.appendSessionOpen(session, nowMs);
                ++workCount;
                continue;
            }
            if (session.state() == ClusterSession.State.REJECTED) {
                ArrayListUtil.fastUnorderedRemove(pendingSessions, i, lastIndex--);
                this.rejectedSessions.add(session);
                continue;
            }
            if (nowMs <= session.timeOfLastActivityMs() + this.sessionTimeoutMs) continue;
            ArrayListUtil.fastUnorderedRemove(pendingSessions, i, lastIndex--);
            session.close();
            this.ctx.timedOutClientCounter().incrementOrdered();
        }
        return workCount;
    }

    private int processRejectedSessions(ArrayList<ClusterSession> rejectedSessions, long nowMs) {
        int lastIndex;
        int workCount = 0;
        for (int i = lastIndex = rejectedSessions.size() - 1; i >= 0; --i) {
            ClusterSession session = rejectedSessions.get(i);
            String detail = "Concurrent session limit";
            EventCode eventCode = EventCode.ERROR;
            if (session.state() == ClusterSession.State.REJECTED) {
                detail = "Session failed authentication";
                eventCode = EventCode.AUTHENTICATION_REJECTED;
            }
            if (!this.egressPublisher.sendEvent(session, this.leadershipTermId, this.leaderMember.id(), eventCode, detail) && nowMs <= session.timeOfLastActivityMs() + this.sessionTimeoutMs) continue;
            ArrayListUtil.fastUnorderedRemove(rejectedSessions, i, lastIndex--);
            session.close();
            ++workCount;
        }
        return workCount;
    }

    private int processRedirectSessions(ArrayList<ClusterSession> redirectSessions, long nowMs) {
        int lastIndex;
        int workCount = 0;
        for (int i = lastIndex = redirectSessions.size() - 1; i >= 0; --i) {
            ClusterSession session = redirectSessions.get(i);
            EventCode eventCode = EventCode.REDIRECT;
            int id = this.leaderMember.id();
            if (!this.egressPublisher.sendEvent(session, this.leadershipTermId, id, eventCode, this.clientFacingEndpoints) && nowMs <= session.timeOfLastActivityMs() + this.sessionTimeoutMs) continue;
            ArrayListUtil.fastUnorderedRemove(redirectSessions, i, lastIndex--);
            session.close();
            ++workCount;
        }
        return workCount;
    }

    private int processPassiveMembers(ClusterMember[] passiveMembers) {
        int workCount = 0;
        int length = passiveMembers.length;
        for (int i = 0; i < length; ++i) {
            ClusterMember member = passiveMembers[i];
            if (member.correlationId() != -1L) {
                if (!this.memberStatusPublisher.clusterMemberChange(member.publication(), member.correlationId(), this.leaderMember.id(), ClusterMember.encodeAsString(this.clusterMembers), ClusterMember.encodeAsString(passiveMembers))) continue;
                member.correlationId(-1L);
                ++workCount;
                continue;
            }
            if (!member.hasRequestedJoin() || member.logPosition() != this.logPublisher.position()) continue;
            ClusterMember[] newMembers = ClusterMember.addMember(this.clusterMembers, member);
            if (!this.logPublisher.appendMembershipChangeEvent(this.leadershipTermId, this.logPublisher.position(), this.clusterTimeMs, this.thisMember.id(), newMembers.length, ChangeType.JOIN, member.id(), ClusterMember.encodeAsString(newMembers))) continue;
            this.timeOfLastLogUpdateMs = this.cachedTimeMs - this.leaderHeartbeatIntervalMs;
            this.passiveMembers = ClusterMember.removeMember(this.passiveMembers, member.id());
            this.clusterMembers = newMembers;
            this.rankedPositions = new long[this.clusterMembers.length];
            member.hasRequestedJoin(false);
            ++workCount;
            break;
        }
        return workCount;
    }

    private int checkSessions(Long2ObjectHashMap<ClusterSession> sessionByIdMap, long nowMs) {
        int workCount = 0;
        Long2ObjectHashMap.ValueIterator i = sessionByIdMap.values().iterator();
        while (i.hasNext()) {
            ClusterSession session = (ClusterSession)i.next();
            if (nowMs > session.timeOfLastActivityMs() + this.sessionTimeoutMs) {
                switch (session.state()) {
                    case OPEN: {
                        if (session.isResponsePublicationConnected()) {
                            this.egressPublisher.sendEvent(session, this.leadershipTermId, this.leaderMember.id(), EventCode.ERROR, "Session inactive");
                        }
                        session.close(CloseReason.TIMEOUT);
                        if (!this.logPublisher.appendSessionClose(session, this.leadershipTermId, nowMs)) break;
                        i.remove();
                        this.ctx.timedOutClientCounter().incrementOrdered();
                        break;
                    }
                    case CLOSED: {
                        if (!this.logPublisher.appendSessionClose(session, this.leadershipTermId, nowMs)) break;
                        i.remove();
                        if (session.closeReason() != CloseReason.TIMEOUT) break;
                        this.ctx.timedOutClientCounter().incrementOrdered();
                        break;
                    }
                    default: {
                        i.remove();
                        session.close();
                    }
                }
                ++workCount;
                continue;
            }
            if (session.state() == ClusterSession.State.CONNECTED) {
                this.appendSessionOpen(session, nowMs);
                ++workCount;
                continue;
            }
            if (!session.hasNewLeaderEventPending()) continue;
            this.sendNewLeaderEvent(session);
            ++workCount;
        }
        return workCount;
    }

    private void sendNewLeaderEvent(ClusterSession session) {
        if (this.egressPublisher.newLeader(session, this.leadershipTermId, this.leaderMember.id(), this.clientFacingEndpoints)) {
            session.hasNewLeaderEventPending(false);
        }
    }

    private void appendSessionOpen(ClusterSession session, long nowMs) {
        long resultingPosition = this.logPublisher.appendSessionOpen(session, this.leadershipTermId, nowMs);
        if (resultingPosition > 0L) {
            session.open(resultingPosition);
        }
    }

    private void createAppendPosition(int logSessionId) {
        CountersReader counters = this.aeron.countersReader();
        int recordingCounterId = this.awaitRecordingCounter(counters, logSessionId);
        this.appendedPosition = new ReadableCounter(counters, recordingCounterId);
    }

    private void recoverFromSnapshot(RecordingLog.Snapshot snapshot, AeronArchive archive) {
        this.clusterTimeMs(snapshot.timestamp);
        this.expectedAckPosition = snapshot.logPosition;
        this.leadershipTermId = snapshot.leadershipTermId;
        String channel = this.ctx.replayChannel();
        int streamId = this.ctx.replayStreamId();
        int sessionId = (int)archive.startReplay(snapshot.recordingId, 0L, -1L, channel, streamId);
        String replaySubscriptionChannel = ChannelUri.addSessionId(channel, sessionId);
        try (Subscription subscription = this.aeron.addSubscription(replaySubscriptionChannel, streamId);){
            Image image = this.awaitImage(sessionId, subscription);
            ConsensusModuleSnapshotLoader snapshotLoader = new ConsensusModuleSnapshotLoader(image, this);
            while (true) {
                int fragments;
                if ((fragments = snapshotLoader.poll()) == 0) {
                    if (snapshotLoader.isDone()) {
                        break;
                    }
                    if (image.isClosed()) {
                        throw new ClusterException("snapshot ended unexpectedly");
                    }
                }
                this.idle(fragments);
            }
        }
    }

    private Image awaitImage(int sessionId, Subscription subscription) {
        Image image;
        this.idleStrategy.reset();
        while ((image = subscription.imageBySessionId(sessionId)) == null) {
            this.idle();
        }
        return image;
    }

    private Counter addRecoveryStateCounter(RecordingLog.RecoveryPlan plan) {
        int snapshotsCount = plan.snapshots.size();
        if (snapshotsCount > 0) {
            long[] serviceSnapshotRecordingIds = new long[snapshotsCount - 1];
            RecordingLog.Snapshot snapshot = plan.snapshots.get(0);
            for (int i = 1; i < snapshotsCount; ++i) {
                RecordingLog.Snapshot serviceSnapshot = plan.snapshots.get(i);
                serviceSnapshotRecordingIds[serviceSnapshot.serviceId] = serviceSnapshot.recordingId;
            }
            return RecoveryState.allocate(this.aeron, this.tempBuffer, snapshot.leadershipTermId, snapshot.logPosition, snapshot.timestamp, plan.hasReplay(), serviceSnapshotRecordingIds);
        }
        return RecoveryState.allocate(this.aeron, this.tempBuffer, this.leadershipTermId, 0L, 0L, plan.hasReplay(), new long[0]);
    }

    private DynamicJoin requiresDynamicJoin() {
        if (this.recoveryPlan.snapshots.isEmpty() && 0 == this.clusterMembers.length && null != this.ctx.clusterMembersStatusEndpoints()) {
            return new DynamicJoin(this.ctx.clusterMembersStatusEndpoints(), this.archive, this.memberStatusAdapter, this.memberStatusPublisher, this.ctx, this);
        }
        return null;
    }

    private void awaitServiceAcks(long logPosition) {
        while (!ServiceAck.hasReachedPosition(logPosition, this.serviceAckId, this.serviceAcks)) {
            this.idle(this.consensusModuleAdapter.poll());
        }
        ++this.serviceAckId;
    }

    private long logPosition() {
        return null != this.logAdapter ? this.logAdapter.position() : this.logPublisher.position();
    }

    private void validateServiceAck(long logPosition, long ackId, int serviceId) {
        if (logPosition != this.expectedAckPosition || ackId != this.serviceAckId) {
            throw new ClusterException("invalid service ACK state " + (Object)((Object)this.state) + ": serviceId=" + serviceId + ", logPosition=" + logPosition + " expected " + this.expectedAckPosition + ", ackId=" + ackId + " expected " + this.serviceAckId);
        }
    }

    private void updateClientFacingEndpoints(ClusterMember[] members) {
        StringBuilder builder = new StringBuilder(100);
        int length = members.length;
        for (int i = 0; i < length; ++i) {
            if (0 != i) {
                builder.append(',');
            }
            ClusterMember member = members[i];
            builder.append(member.id()).append('=').append(member.clientFacingEndpoint());
        }
        this.clientFacingEndpoints = builder.toString();
    }

    private void handleMemberRemovals(long commitPosition) {
        ClusterMember[] newClusterMembers = this.clusterMembers;
        for (ClusterMember member : this.clusterMembers) {
            if (!member.hasRequestedRemove() || member.removalPosition() > commitPosition) continue;
            if (member == this.thisMember) {
                this.expectedAckPosition = commitPosition;
                this.state(ConsensusModule.State.LEAVING);
            }
            newClusterMembers = ClusterMember.removeMember(newClusterMembers, member.id());
            this.clusterMemberByIdMap.remove(member.id());
            this.clusterMemberByIdMap.compact();
            CloseHelper.close(member.publication());
            member.publication(null);
            this.logPublisher.removePassiveFollower(member.logEndpoint());
            --this.pendingMemberRemovals;
        }
        this.clusterMembers = newClusterMembers;
        this.rankedPositions = new long[this.clusterMembers.length];
    }

    private int updateMemberPosition(long nowMs) {
        int workCount = 0;
        long appendedPosition = this.appendedPosition.get();
        if (Cluster.Role.LEADER == this.role) {
            this.thisMember.logPosition(appendedPosition).timeOfLastAppendPositionMs(nowMs);
            long quorumPosition = ClusterMember.quorumPosition(this.clusterMembers, this.rankedPositions);
            if (this.commitPosition.proposeMaxOrdered(Math.min(quorumPosition, appendedPosition)) || nowMs >= this.timeOfLastLogUpdateMs + this.leaderHeartbeatIntervalMs) {
                long commitPosition = this.commitPosition.getWeak();
                for (ClusterMember member : this.clusterMembers) {
                    if (member == this.thisMember) continue;
                    Publication publication = member.publication();
                    this.memberStatusPublisher.commitPosition(publication, this.leadershipTermId, commitPosition, this.memberId);
                }
                this.timeOfLastLogUpdateMs = nowMs;
                if (this.pendingMemberRemovals > 0) {
                    this.handleMemberRemovals(commitPosition);
                }
                ++workCount;
            }
        } else {
            Publication publication = this.leaderMember.publication();
            if ((appendedPosition != this.lastAppendedPosition || nowMs >= this.timeOfLastAppendPosition + this.leaderHeartbeatIntervalMs) && this.memberStatusPublisher.appendedPosition(publication, this.leadershipTermId, appendedPosition, this.memberId)) {
                this.lastAppendedPosition = appendedPosition;
                this.timeOfLastAppendPosition = nowMs;
                ++workCount;
            }
            this.commitPosition.proposeMaxOrdered(Math.min(this.logAdapter.position(), appendedPosition));
            if (nowMs >= this.timeOfLastLogUpdateMs + this.leaderHeartbeatTimeoutMs) {
                this.ctx.countedErrorHandler().onError(new ClusterException("heartbeat timeout from leader"));
                this.enterElection(nowMs);
                ++workCount;
            }
        }
        return workCount;
    }

    private void enterElection(long nowMs) {
        this.ingressAdapter.close();
        this.commitPosition.proposeMaxOrdered(this.followerCommitPosition);
        this.election = new Election(false, this.leadershipTermId, this.commitPosition.getWeak(), this.clusterMembers, this.clusterMemberByIdMap, this.thisMember, this.memberStatusAdapter, this.memberStatusPublisher, this.ctx, this);
        this.election.doWork(nowMs);
        this.serviceProxy.electionStartEvent(this.commitPosition.getWeak());
    }

    private void idle() {
        ConsensusModuleAgent.checkInterruptedStatus();
        this.aeronClientInvoker.invoke();
        this.idleStrategy.idle();
    }

    private void idle(int workCount) {
        ConsensusModuleAgent.checkInterruptedStatus();
        this.aeronClientInvoker.invoke();
        this.idleStrategy.idle(workCount);
    }

    private static void checkInterruptedStatus() {
        if (Thread.currentThread().isInterrupted()) {
            throw new TimeoutException("unexpected interrupt");
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void takeSnapshot(long timestampMs, long logPosition) {
        try (ExclusivePublication publication = this.aeron.addExclusivePublication(this.ctx.snapshotChannel(), this.ctx.snapshotStreamId());){
            String channel = ChannelUri.addSessionId(this.ctx.snapshotChannel(), publication.sessionId());
            long subscriptionId = this.archive.startRecording(channel, this.ctx.snapshotStreamId(), SourceLocation.LOCAL);
            try {
                CountersReader counters = this.aeron.countersReader();
                int counterId = this.awaitRecordingCounter(counters, publication.sessionId());
                long recordingId = RecordingPos.getRecordingId(counters, counterId);
                long termBaseLogPosition = this.recordingLog.getTermEntry((long)this.leadershipTermId).termBaseLogPosition;
                this.snapshotState(publication, logPosition, this.leadershipTermId);
                this.awaitRecordingComplete(recordingId, ((Publication)publication).position(), counters, counterId);
                for (int serviceId = this.serviceAcks.length - 1; serviceId >= 0; --serviceId) {
                    long snapshotId = this.serviceAcks[serviceId].relevantId();
                    this.recordingLog.appendSnapshot(snapshotId, this.leadershipTermId, termBaseLogPosition, logPosition, timestampMs, serviceId);
                }
                this.recordingLog.appendSnapshot(recordingId, this.leadershipTermId, termBaseLogPosition, logPosition, timestampMs, -1);
                this.recordingLog.force();
            }
            finally {
                this.archive.stopRecording(subscriptionId);
            }
            this.ctx.snapshotCounter().incrementOrdered();
        }
    }

    private void awaitRecordingComplete(long recordingId, long position, CountersReader counters, int counterId) {
        this.idleStrategy.reset();
        do {
            this.idle();
            if (RecordingPos.isActive(counters, counterId, recordingId)) continue;
            throw new ClusterException("recording has stopped unexpectedly: " + recordingId);
        } while (counters.getCounterValue(counterId) < position);
    }

    private int awaitRecordingCounter(CountersReader counters, int sessionId) {
        this.idleStrategy.reset();
        int counterId = RecordingPos.findCounterIdBySession(counters, sessionId);
        while (-1 == counterId) {
            this.idle();
            counterId = RecordingPos.findCounterIdBySession(counters, sessionId);
        }
        return counterId;
    }

    private void snapshotState(Publication publication, long logPosition, long leadershipTermId) {
        ConsensusModuleSnapshotTaker snapshotTaker = new ConsensusModuleSnapshotTaker(publication, this.idleStrategy, this.aeronClientInvoker);
        snapshotTaker.markBegin(1L, logPosition, leadershipTermId, 0);
        for (ClusterSession session : this.sessionByIdMap.values()) {
            if (session.state() != ClusterSession.State.OPEN && session.state() != ClusterSession.State.CLOSED) continue;
            snapshotTaker.snapshotSession(session);
        }
        this.aeronClientInvoker.invoke();
        this.timerService.snapshot(snapshotTaker);
        snapshotTaker.consensusModuleState(this.nextSessionId);
        snapshotTaker.clusterMembers(this.memberId, this.highMemberId, this.clusterMembers);
        snapshotTaker.markEnd(1L, logPosition, leadershipTermId, 0);
    }

    private Publication createLogPublication(ChannelUri channelUri, RecordingLog.RecoveryPlan plan, long position) {
        channelUri.put("tags", "1,2");
        if (!plan.logs.isEmpty()) {
            RecordingLog.Log log = plan.logs.get(0);
            this.logInitialTermId = log.initialTermId;
            this.logTermBufferLength = log.termBufferLength;
            this.logMtuLength = log.mtuLength;
        }
        if (-1 != this.logInitialTermId) {
            channelUri.initialPosition(position, this.logInitialTermId, this.logTermBufferLength);
            channelUri.put("mtu", Integer.toString(this.logMtuLength));
        }
        ExclusivePublication publication = this.aeron.addExclusivePublication(channelUri.toString(), this.ctx.logStreamId());
        if (!channelUri.containsKey("endpoint") && "udp".equals(channelUri.media())) {
            ChannelUriStringBuilder builder = new ChannelUriStringBuilder().media("udp");
            for (ClusterMember member : this.clusterMembers) {
                if (member == this.thisMember) continue;
                publication.addDestination(builder.endpoint(member.logEndpoint()).build());
            }
            for (ClusterMember member : this.passiveMembers) {
                publication.addDestination(builder.endpoint(member.logEndpoint()).build());
            }
        }
        return publication;
    }

    private void startLogRecording(String channel, SourceLocation sourceLocation) {
        if (!this.recoveryPlan.logs.isEmpty()) {
            this.lastRecordingId = this.recoveryPlan.logs.get((int)0).recordingId;
        }
        if (-1L == this.lastRecordingId) {
            this.archive.startRecording(channel, this.ctx.logStreamId(), sourceLocation);
        } else {
            this.archive.extendRecording(this.lastRecordingId, channel, this.ctx.logStreamId(), sourceLocation);
        }
        this.logRecordingChannel = channel;
    }

    private void replayClusterAction(long leadershipTermId, long logPosition, ConsensusModule.State newState) {
        this.leadershipTermId = leadershipTermId;
        this.expectedAckPosition = logPosition;
        this.state(newState);
    }

    private void clusterMemberJoined(int memberId, ClusterMember[] newMembers) {
        this.highMemberId = Math.max(this.highMemberId, memberId);
        ClusterMember eventMember = ClusterMember.findMember(newMembers, memberId);
        this.clusterMembers = ClusterMember.addMember(this.clusterMembers, eventMember);
        this.clusterMemberByIdMap.put(memberId, eventMember);
        this.rankedPositions = new long[this.clusterMembers.length];
    }

    private void clusterMemberQuit(int memberId) {
        this.clusterMembers = ClusterMember.removeMember(this.clusterMembers, memberId);
        this.clusterMemberByIdMap.remove(memberId);
        this.rankedPositions = new long[this.clusterMembers.length];
    }

    private void closeExistingLog() {
        this.logPublisher.disconnect();
        CloseHelper.close(this.logAdapter);
        this.logAdapter = null;
    }

    private void cancelMissedTimers() {
        this.missedTimersSet.removeIf(this.timerService::cancelTimer);
    }

    private void onUnavailableIngressImage(Image image) {
        this.ingressAdapter.freeSessionBuffer(image.sessionId());
    }

    private void clusterTimeMs(long nowMs) {
        if (-1L == this.clusterTimeMs) {
            this.timerService.resetStartTime(nowMs);
        }
        this.clusterTimeMs = nowMs;
    }

    private ClusterMember determineMemberAndCheckEndpoints(ClusterMember[] clusterMembers) {
        ClusterMember member;
        int memberId = this.ctx.clusterMemberId();
        ClusterMember clusterMember = member = -1 != memberId ? ClusterMember.findMember(clusterMembers, memberId) : null;
        if ((null == clusterMembers || 0 == clusterMembers.length) && null == member) {
            member = ClusterMember.parseEndpoints(-1, this.ctx.memberEndpoints());
        } else {
            if (null == member) {
                throw new ClusterException("memberId=" + memberId + " not found in clusterMembers");
            }
            if (!this.ctx.memberEndpoints().equals("")) {
                ClusterMember.validateMemberEndpoints(member, this.ctx.memberEndpoints());
            }
        }
        return member;
    }
}

