/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.cluster.service;

import io.aeron.Aeron;
import io.aeron.ChannelUri;
import io.aeron.DirectBufferVector;
import io.aeron.ExclusivePublication;
import io.aeron.Image;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.archive.client.AeronArchive;
import io.aeron.archive.codecs.SourceLocation;
import io.aeron.archive.status.RecordingPos;
import io.aeron.cluster.client.ClusterException;
import io.aeron.cluster.codecs.ChangeType;
import io.aeron.cluster.codecs.CloseReason;
import io.aeron.cluster.codecs.ClusterAction;
import io.aeron.cluster.codecs.EgressMessageHeaderEncoder;
import io.aeron.cluster.codecs.MessageHeaderEncoder;
import io.aeron.cluster.service.ActiveLogEvent;
import io.aeron.cluster.service.BoundedLogAdapter;
import io.aeron.cluster.service.ClientSession;
import io.aeron.cluster.service.Cluster;
import io.aeron.cluster.service.ClusterMarkFile;
import io.aeron.cluster.service.ClusterNodeRole;
import io.aeron.cluster.service.ClusteredService;
import io.aeron.cluster.service.ClusteredServiceContainer;
import io.aeron.cluster.service.CommitPos;
import io.aeron.cluster.service.ConsensusModuleProxy;
import io.aeron.cluster.service.RecoveryState;
import io.aeron.cluster.service.ServiceAdapter;
import io.aeron.cluster.service.ServiceHeartbeat;
import io.aeron.cluster.service.ServiceSnapshotLoader;
import io.aeron.cluster.service.ServiceSnapshotTaker;
import io.aeron.logbuffer.Header;
import io.aeron.status.ReadableCounter;
import java.util.Collection;
import java.util.Collections;
import org.agrona.CloseHelper;
import org.agrona.DirectBuffer;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.concurrent.Agent;
import org.agrona.concurrent.AgentTerminationException;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.IdleStrategy;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.CountersReader;

class ClusteredServiceAgent
implements Agent,
Cluster {
    public static final int SESSION_HEADER_LENGTH = 32;
    private final int serviceId;
    private final AeronArchive.Context archiveCtx;
    private final ClusteredServiceContainer.Context ctx;
    private final Aeron aeron;
    private final Long2ObjectHashMap<ClientSession> sessionByIdMap = new Long2ObjectHashMap();
    private final Collection<ClientSession> readOnlyClientSessions = Collections.unmodifiableCollection(this.sessionByIdMap.values());
    private final ClusteredService service;
    private final ConsensusModuleProxy consensusModuleProxy;
    private final ServiceAdapter serviceAdapter;
    private final IdleStrategy idleStrategy;
    private final EpochClock epochClock;
    private final ClusterMarkFile markFile;
    private final UnsafeBuffer headerBuffer = new UnsafeBuffer(new byte[32]);
    private final DirectBufferVector headerVector = new DirectBufferVector(this.headerBuffer, 0, this.headerBuffer.capacity());
    private final EgressMessageHeaderEncoder egressMessageHeaderEncoder = new EgressMessageHeaderEncoder();
    private long ackId = 0L;
    private long clusterTimeMs;
    private long cachedTimeMs;
    private long terminationPosition = -1L;
    private long roleChangePosition = -1L;
    private int memberId = -1;
    private boolean isServiceActive;
    private BoundedLogAdapter logAdapter;
    private AtomicCounter heartbeatCounter;
    private ReadableCounter roleCounter;
    private ReadableCounter commitPosition;
    private ActiveLogEvent activeLogEvent;
    private Cluster.Role role = Cluster.Role.FOLLOWER;
    private String logChannel = null;

    ClusteredServiceAgent(ClusteredServiceContainer.Context ctx) {
        this.ctx = ctx;
        this.archiveCtx = ctx.archiveContext();
        this.aeron = ctx.aeron();
        this.service = ctx.clusteredService();
        this.idleStrategy = ctx.idleStrategy();
        this.serviceId = ctx.serviceId();
        this.epochClock = ctx.epochClock();
        this.markFile = ctx.clusterMarkFile();
        String channel = ctx.serviceControlChannel();
        this.consensusModuleProxy = new ConsensusModuleProxy(this.aeron.addPublication(channel, ctx.consensusModuleStreamId()));
        this.serviceAdapter = new ServiceAdapter(this.aeron.addSubscription(channel, ctx.serviceStreamId()), this);
        this.egressMessageHeaderEncoder.wrapAndApplyHeader(this.headerBuffer, 0, new MessageHeaderEncoder());
    }

    @Override
    public void onStart() {
        CountersReader counters = this.aeron.countersReader();
        this.roleCounter = this.awaitClusterRoleCounter(counters);
        this.heartbeatCounter = this.awaitHeartbeatCounter(counters);
        this.commitPosition = this.awaitCommitPositionCounter(counters);
        this.service.onStart(this);
        this.isServiceActive = true;
        int recoveryCounterId = this.awaitRecoveryCounter(counters);
        this.heartbeatCounter.setOrdered(this.epochClock.time());
        this.checkForSnapshot(counters, recoveryCounterId);
        this.checkForReplay(counters, recoveryCounterId);
    }

    @Override
    public void onClose() {
        if (this.isServiceActive) {
            this.isServiceActive = false;
            try {
                this.service.onTerminate(this);
            }
            catch (Exception ex) {
                this.ctx.countedErrorHandler().onError(ex);
            }
        }
        if (!this.ctx.ownsAeronClient()) {
            for (ClientSession session : this.sessionByIdMap.values()) {
                session.disconnect();
            }
            CloseHelper.close(this.logAdapter);
            CloseHelper.close(this.serviceAdapter);
            CloseHelper.close(this.consensusModuleProxy);
        }
        this.ctx.close();
    }

    @Override
    public int doWork() {
        int workCount = 0;
        if (this.checkForClockTick()) {
            this.pollServiceAdapter();
            ++workCount;
        }
        if (null != this.logAdapter) {
            int polled = this.logAdapter.poll();
            if (0 == polled && this.logAdapter.isDone()) {
                this.logAdapter.close();
                this.logAdapter = null;
            }
            workCount += polled;
        }
        return workCount;
    }

    @Override
    public String roleName() {
        return this.ctx.serviceName();
    }

    @Override
    public Cluster.Role role() {
        return this.role;
    }

    @Override
    public int memberId() {
        return this.memberId;
    }

    @Override
    public Aeron aeron() {
        return this.aeron;
    }

    @Override
    public ClusteredServiceContainer.Context context() {
        return this.ctx;
    }

    @Override
    public ClientSession getClientSession(long clusterSessionId) {
        return this.sessionByIdMap.get(clusterSessionId);
    }

    @Override
    public Collection<ClientSession> clientSessions() {
        return this.readOnlyClientSessions;
    }

    @Override
    public boolean closeSession(long clusterSessionId) {
        ClientSession clientSession = this.sessionByIdMap.get(clusterSessionId);
        if (clientSession == null) {
            throw new ClusterException("unknown clusterSessionId: " + clusterSessionId);
        }
        if (clientSession.isClosing()) {
            return true;
        }
        if (this.consensusModuleProxy.closeSession(clusterSessionId)) {
            clientSession.markClosing();
            return true;
        }
        return false;
    }

    @Override
    public long timeMs() {
        return this.clusterTimeMs;
    }

    @Override
    public boolean scheduleTimer(long correlationId, long deadlineMs) {
        return this.consensusModuleProxy.scheduleTimer(correlationId, deadlineMs);
    }

    @Override
    public boolean cancelTimer(long correlationId) {
        return this.consensusModuleProxy.cancelTimer(correlationId);
    }

    @Override
    public void idle() {
        ClusteredServiceAgent.checkInterruptedStatus();
        this.checkForClockTick();
        this.idleStrategy.idle();
    }

    @Override
    public void idle(int workCount) {
        ClusteredServiceAgent.checkInterruptedStatus();
        this.checkForClockTick();
        this.idleStrategy.idle(workCount);
    }

    public long offer(long clusterSessionId, Publication publication, DirectBuffer buffer, int offset, int length) {
        if (this.role != Cluster.Role.LEADER) {
            return 1L;
        }
        if (null == publication) {
            return -1L;
        }
        this.egressMessageHeaderEncoder.clusterSessionId(clusterSessionId).timestamp(this.clusterTimeMs);
        return publication.offer(this.headerBuffer, 0, this.headerBuffer.capacity(), buffer, offset, length, null);
    }

    public long offer(long clusterSessionId, Publication publication, DirectBufferVector[] vectors) {
        if (this.role != Cluster.Role.LEADER) {
            return 1L;
        }
        if (null == publication) {
            return -1L;
        }
        this.egressMessageHeaderEncoder.clusterSessionId(clusterSessionId).timestamp(this.clusterTimeMs);
        if (vectors[0] != this.headerVector) {
            vectors[0] = this.headerVector;
        }
        return publication.offer(vectors, null);
    }

    public void onJoinLog(long leadershipTermId, long logPosition, long maxLogPosition, int memberId, int logSessionId, int logStreamId, String logChannel) {
        if (null != this.logAdapter && !logChannel.equals(this.logChannel)) {
            this.logAdapter.close();
            this.logAdapter = null;
        }
        this.roleChangePosition = -1L;
        this.activeLogEvent = new ActiveLogEvent(leadershipTermId, logPosition, maxLogPosition, memberId, logSessionId, logStreamId, logChannel);
    }

    public void onServiceTerminationPosition(long logPosition) {
        this.terminationPosition = logPosition;
    }

    public void onElectionStartEvent(long logPosition) {
        this.roleChangePosition = logPosition;
    }

    void onSessionMessage(long clusterSessionId, long timestampMs, DirectBuffer buffer, int offset, int length, Header header) {
        this.clusterTimeMs = timestampMs;
        ClientSession clientSession = this.sessionByIdMap.get(clusterSessionId);
        this.service.onSessionMessage(clientSession, timestampMs, buffer, offset, length, header);
    }

    void onTimerEvent(long correlationId, long timestampMs) {
        this.clusterTimeMs = timestampMs;
        this.service.onTimerEvent(correlationId, timestampMs);
    }

    void onSessionOpen(long clusterSessionId, long timestampMs, int responseStreamId, String responseChannel, byte[] encodedPrincipal) {
        this.clusterTimeMs = timestampMs;
        ClientSession session = new ClientSession(clusterSessionId, responseStreamId, responseChannel, encodedPrincipal, this);
        if (Cluster.Role.LEADER == this.role && this.ctx.isRespondingService()) {
            session.connect(this.aeron);
        }
        this.sessionByIdMap.put(clusterSessionId, session);
        this.service.onSessionOpen(session, timestampMs);
    }

    void onSessionClose(long clusterSessionId, long timestampMs, CloseReason closeReason) {
        this.clusterTimeMs = timestampMs;
        ClientSession session = this.sessionByIdMap.remove(clusterSessionId);
        session.disconnect();
        this.service.onSessionClose(session, timestampMs, closeReason);
    }

    void onServiceAction(long logPosition, long leadershipTermId, long timestampMs, ClusterAction action) {
        this.clusterTimeMs = timestampMs;
        this.executeAction(action, logPosition, leadershipTermId);
    }

    void onNewLeadershipTermEvent(long leadershipTermId, long logPosition, long timestampMs, int leaderMemberId, int logSessionId) {
        this.egressMessageHeaderEncoder.leadershipTermId(leadershipTermId);
        this.clusterTimeMs = timestampMs;
    }

    void onMembershipChange(long leadershipTermId, long logPosition, long timestampMs, int leaderMemberId, int clusterSize, ChangeType changeType, int memberId, String clusterMembers) {
        this.clusterTimeMs = timestampMs;
        if (memberId == this.memberId && changeType == ChangeType.QUIT) {
            this.terminate(logPosition);
        }
    }

    void addSession(long clusterSessionId, int responseStreamId, String responseChannel, byte[] encodedPrincipal) {
        this.sessionByIdMap.put(clusterSessionId, new ClientSession(clusterSessionId, responseStreamId, responseChannel, encodedPrincipal, this));
    }

    void handleError(Throwable ex) {
        this.ctx.countedErrorHandler().onError(ex);
    }

    private void role(Cluster.Role newRole) {
        if (newRole != this.role) {
            this.role = newRole;
            this.service.onRoleChange(newRole);
        }
    }

    private void checkForSnapshot(CountersReader counters, int recoveryCounterId) {
        this.clusterTimeMs = RecoveryState.getTimestamp(counters, recoveryCounterId);
        long leadershipTermId = RecoveryState.getLeadershipTermId(counters, recoveryCounterId);
        if (-1L != leadershipTermId) {
            this.loadSnapshot(RecoveryState.getSnapshotRecordingId(counters, recoveryCounterId, this.serviceId));
        }
        this.heartbeatCounter.setOrdered(this.epochClock.time());
        this.consensusModuleProxy.ack(RecoveryState.getLogPosition(counters, recoveryCounterId), this.ackId++, this.serviceId);
    }

    private void checkForReplay(CountersReader counters, int recoveryCounterId) {
        if (RecoveryState.hasReplay(counters, recoveryCounterId)) {
            this.awaitActiveLog();
            try (Subscription subscription = this.aeron.addSubscription(this.activeLogEvent.channel, this.activeLogEvent.streamId);){
                this.consensusModuleProxy.ack(this.activeLogEvent.logPosition, this.ackId++, this.serviceId);
                Image image = this.awaitImage(this.activeLogEvent.sessionId, subscription);
                BoundedLogAdapter adapter = new BoundedLogAdapter(image, this.commitPosition, this);
                this.consumeImage(image, adapter, this.activeLogEvent.maxLogPosition);
            }
            this.activeLogEvent = null;
            this.heartbeatCounter.setOrdered(this.epochClock.time());
        }
    }

    private void awaitActiveLog() {
        this.idleStrategy.reset();
        while (null == this.activeLogEvent) {
            this.serviceAdapter.poll();
            ClusteredServiceAgent.checkInterruptedStatus();
            this.heartbeatCounter.setOrdered(this.epochClock.time());
            this.idleStrategy.idle();
        }
    }

    private void consumeImage(Image image, BoundedLogAdapter adapter, long maxLogPosition) {
        while (true) {
            int workCount;
            if ((workCount = adapter.poll()) == 0) {
                if (adapter.position() >= maxLogPosition) break;
                if (image.isClosed()) {
                    throw new ClusterException("unexpected close of replay");
                }
            }
            this.idle(workCount);
        }
        this.consensusModuleProxy.ack(image.position(), this.ackId++, this.serviceId);
    }

    private int awaitRecoveryCounter(CountersReader counters) {
        this.idleStrategy.reset();
        int counterId = RecoveryState.findCounterId(counters);
        while (-1 == counterId) {
            ClusteredServiceAgent.checkInterruptedStatus();
            this.idleStrategy.idle();
            this.heartbeatCounter.setOrdered(this.epochClock.time());
            counterId = RecoveryState.findCounterId(counters);
        }
        return counterId;
    }

    private void joinActiveLog() {
        Subscription logSubscription = this.aeron.addSubscription(this.activeLogEvent.channel, this.activeLogEvent.streamId);
        this.consensusModuleProxy.ack(this.activeLogEvent.logPosition, this.ackId++, this.serviceId);
        Image image = this.awaitImage(this.activeLogEvent.sessionId, logSubscription);
        this.heartbeatCounter.setOrdered(this.epochClock.time());
        this.egressMessageHeaderEncoder.leadershipTermId(this.activeLogEvent.leadershipTermId);
        this.memberId = this.activeLogEvent.memberId;
        this.ctx.clusterMarkFile().memberId(this.memberId);
        this.logChannel = this.activeLogEvent.channel;
        this.activeLogEvent = null;
        this.logAdapter = new BoundedLogAdapter(image, this.commitPosition, this);
        this.role(Cluster.Role.get((int)this.roleCounter.get()));
        for (ClientSession session : this.sessionByIdMap.values()) {
            if (Cluster.Role.LEADER == this.role) {
                if (this.ctx.isRespondingService()) {
                    session.connect(this.aeron);
                }
                session.resetClosing();
                continue;
            }
            session.disconnect();
        }
    }

    private Image awaitImage(int sessionId, Subscription subscription) {
        Image image;
        this.idleStrategy.reset();
        while ((image = subscription.imageBySessionId(sessionId)) == null) {
            ClusteredServiceAgent.checkInterruptedStatus();
            this.idleStrategy.idle();
        }
        return image;
    }

    private ReadableCounter awaitClusterRoleCounter(CountersReader counters) {
        this.idleStrategy.reset();
        int counterId = ClusterNodeRole.findCounterId(counters);
        while (-1 == counterId) {
            ClusteredServiceAgent.checkInterruptedStatus();
            this.idleStrategy.idle();
            counterId = ClusterNodeRole.findCounterId(counters);
        }
        return new ReadableCounter(counters, counterId);
    }

    private ReadableCounter awaitCommitPositionCounter(CountersReader counters) {
        this.idleStrategy.reset();
        int counterId = CommitPos.findCounterId(counters);
        while (-1 == counterId) {
            ClusteredServiceAgent.checkInterruptedStatus();
            this.idleStrategy.idle();
            this.heartbeatCounter.setOrdered(this.epochClock.time());
            counterId = CommitPos.findCounterId(counters);
        }
        return new ReadableCounter(counters, counterId);
    }

    private AtomicCounter awaitHeartbeatCounter(CountersReader counters) {
        this.idleStrategy.reset();
        int counterId = ServiceHeartbeat.findCounterId(counters, this.ctx.serviceId());
        while (-1 == counterId) {
            ClusteredServiceAgent.checkInterruptedStatus();
            this.idleStrategy.idle();
            counterId = ServiceHeartbeat.findCounterId(counters, this.ctx.serviceId());
        }
        return new AtomicCounter(counters.valuesBuffer(), counterId);
    }

    private void loadSnapshot(long recordingId) {
        try (AeronArchive archive = AeronArchive.connect(this.archiveCtx);){
            String channel = this.ctx.replayChannel();
            int streamId = this.ctx.replayStreamId();
            int sessionId = (int)archive.startReplay(recordingId, 0L, -1L, channel, streamId);
            String replaySessionChannel = ChannelUri.addSessionId(channel, sessionId);
            try (Subscription subscription = this.aeron.addSubscription(replaySessionChannel, streamId);){
                Image image = this.awaitImage(sessionId, subscription);
                this.loadState(image);
                this.service.onLoadSnapshot(image);
            }
        }
    }

    private void loadState(Image image) {
        ServiceSnapshotLoader snapshotLoader = new ServiceSnapshotLoader(image, this);
        while (true) {
            int fragments = snapshotLoader.poll();
            if (snapshotLoader.isDone()) break;
            if (fragments != 0) continue;
            ClusteredServiceAgent.checkInterruptedStatus();
            if (image.isClosed()) {
                throw new ClusterException("snapshot ended unexpectedly");
            }
            this.idleStrategy.idle(fragments);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private long onTakeSnapshot(long logPosition, long leadershipTermId) {
        long recordingId;
        try (AeronArchive archive = AeronArchive.connect(this.archiveCtx);
             ExclusivePublication publication = this.aeron.addExclusivePublication(this.ctx.snapshotChannel(), this.ctx.snapshotStreamId());){
            String channel = ChannelUri.addSessionId(this.ctx.snapshotChannel(), publication.sessionId());
            long subscriptionId = archive.startRecording(channel, this.ctx.snapshotStreamId(), SourceLocation.LOCAL);
            try {
                CountersReader counters = this.aeron.countersReader();
                int counterId = this.awaitRecordingCounter(publication.sessionId(), counters);
                recordingId = RecordingPos.getRecordingId(counters, counterId);
                this.snapshotState(publication, logPosition, leadershipTermId);
                this.service.onTakeSnapshot(publication);
                this.awaitRecordingComplete(recordingId, ((Publication)publication).position(), counters, counterId, archive);
            }
            finally {
                archive.stopRecording(subscriptionId);
            }
        }
        return recordingId;
    }

    private void awaitRecordingComplete(long recordingId, long position, CountersReader counters, int counterId, AeronArchive archive) {
        this.idleStrategy.reset();
        do {
            this.idleStrategy.idle();
            ClusteredServiceAgent.checkInterruptedStatus();
            if (!RecordingPos.isActive(counters, counterId, recordingId)) {
                throw new ClusterException("recording has stopped unexpectedly: " + recordingId);
            }
            archive.checkForErrorResponse();
        } while (counters.getCounterValue(counterId) < position);
    }

    private void snapshotState(Publication publication, long logPosition, long leadershipTermId) {
        ServiceSnapshotTaker snapshotTaker = new ServiceSnapshotTaker(publication, this.idleStrategy, null);
        snapshotTaker.markBegin(2L, logPosition, leadershipTermId, 0);
        for (ClientSession clientSession : this.sessionByIdMap.values()) {
            snapshotTaker.snapshotSession(clientSession);
        }
        snapshotTaker.markEnd(2L, logPosition, leadershipTermId, 0);
    }

    private void executeAction(ClusterAction action, long position, long leadershipTermId) {
        if (ClusterAction.SNAPSHOT == action) {
            this.consensusModuleProxy.ack(position, this.ackId++, this.onTakeSnapshot(position, leadershipTermId), this.serviceId);
        }
    }

    private int awaitRecordingCounter(int sessionId, CountersReader counters) {
        this.idleStrategy.reset();
        int counterId = RecordingPos.findCounterIdBySession(counters, sessionId);
        while (-1 == counterId) {
            ClusteredServiceAgent.checkInterruptedStatus();
            this.idleStrategy.idle();
            counterId = RecordingPos.findCounterIdBySession(counters, sessionId);
        }
        return counterId;
    }

    private static void checkInterruptedStatus() {
        if (Thread.currentThread().isInterrupted()) {
            throw new AgentTerminationException("unexpected interrupt during operation");
        }
    }

    private boolean checkForClockTick() {
        long nowMs = this.epochClock.time();
        if (this.cachedTimeMs != nowMs) {
            this.cachedTimeMs = nowMs;
            if (this.consensusModuleProxy.isConnected()) {
                this.markFile.updateActivityTimestamp(nowMs);
                this.heartbeatCounter.setOrdered(nowMs);
            } else {
                this.ctx.countedErrorHandler().onError(new ClusterException("Consensus Module not connected"));
                this.ctx.terminationHook().run();
            }
            return true;
        }
        return false;
    }

    private void pollServiceAdapter() {
        this.serviceAdapter.poll();
        if (null != this.activeLogEvent && null == this.logAdapter) {
            this.joinActiveLog();
        }
        if (-1L != this.terminationPosition) {
            this.checkForTermination();
        }
        if (-1L != this.roleChangePosition) {
            this.checkForRoleChange();
        }
    }

    private void checkForTermination() {
        if (null != this.logAdapter && this.logAdapter.position() >= this.terminationPosition) {
            long logPosition = this.terminationPosition;
            this.terminationPosition = -1L;
            this.terminate(logPosition);
        }
    }

    private void checkForRoleChange() {
        if (null != this.logAdapter && this.logAdapter.position() >= this.roleChangePosition) {
            this.roleChangePosition = -1L;
            this.role(Cluster.Role.get((int)this.roleCounter.get()));
        }
    }

    private void terminate(long logPosition) {
        this.isServiceActive = false;
        try {
            this.service.onTerminate(this);
        }
        catch (Exception ex) {
            this.ctx.countedErrorHandler().onError(ex);
        }
        this.consensusModuleProxy.ack(logPosition, this.ackId++, this.serviceId);
        this.ctx.terminationHook().run();
    }
}

