/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.driver;

import io.aeron.driver.Configuration;
import io.aeron.driver.DriverConductor;
import io.aeron.driver.DriverManagedResource;
import io.aeron.driver.FlowControl;
import io.aeron.driver.NetworkPublicationPadding3;
import io.aeron.driver.NetworkPublicationThreadLocals;
import io.aeron.driver.RetransmitHandler;
import io.aeron.driver.RetransmitSender;
import io.aeron.driver.Subscribable;
import io.aeron.driver.buffer.RawLog;
import io.aeron.driver.media.SendChannelEndpoint;
import io.aeron.driver.status.SystemCounterDescriptor;
import io.aeron.driver.status.SystemCounters;
import io.aeron.logbuffer.LogBufferDescriptor;
import io.aeron.logbuffer.LogBufferUnblocker;
import io.aeron.logbuffer.TermScanner;
import io.aeron.protocol.DataHeaderFlyweight;
import io.aeron.protocol.RttMeasurementFlyweight;
import io.aeron.protocol.SetupFlyweight;
import io.aeron.protocol.StatusMessageFlyweight;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import org.agrona.collections.ArrayUtil;
import org.agrona.concurrent.NanoClock;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.concurrent.status.AtomicCounter;
import org.agrona.concurrent.status.Position;
import org.agrona.concurrent.status.ReadablePosition;

public class NetworkPublication
extends NetworkPublicationPadding3
implements RetransmitSender,
DriverManagedResource,
Subscribable {
    private final long registrationId;
    private final long unblockTimeoutNs;
    private final long connectionTimeoutNs;
    private final long lingerTimeoutNs;
    private final long tag;
    private final int positionBitsToShift;
    private final int initialTermId;
    private final int termBufferLength;
    private final int termLengthMask;
    private final int mtuLength;
    private final int termWindowLength;
    private final int sessionId;
    private final int streamId;
    private final boolean isExclusive;
    private final boolean spiesSimulateConnection;
    private volatile boolean hasReceivers;
    private volatile boolean hasSpies;
    private volatile boolean isConnected;
    private volatile boolean isEndOfStream;
    private volatile boolean hasSenderReleased;
    private State state = State.ACTIVE;
    private final UnsafeBuffer[] termBuffers;
    private final ByteBuffer[] sendBuffers;
    private final Position publisherPos;
    private final Position publisherLimit;
    private final Position senderPosition;
    private final Position senderLimit;
    private final SendChannelEndpoint channelEndpoint;
    private final ByteBuffer heartbeatBuffer;
    private final DataHeaderFlyweight heartbeatDataHeader;
    private final ByteBuffer setupBuffer;
    private final SetupFlyweight setupHeader;
    private final ByteBuffer rttMeasurementBuffer;
    private final RttMeasurementFlyweight rttMeasurementHeader;
    private final FlowControl flowControl;
    private final NanoClock nanoClock;
    private final RetransmitHandler retransmitHandler;
    private final UnsafeBuffer metaDataBuffer;
    private final RawLog rawLog;
    private final AtomicCounter heartbeatsSent;
    private final AtomicCounter retransmitsSent;
    private final AtomicCounter senderFlowControlLimits;
    private final AtomicCounter shortSends;
    private final AtomicCounter unblockedPublications;

    public NetworkPublication(long registrationId, long tag, SendChannelEndpoint channelEndpoint, NanoClock nanoClock, RawLog rawLog, Position publisherPos, Position publisherLimit, Position senderPosition, Position senderLimit, int sessionId, int streamId, int initialTermId, int mtuLength, SystemCounters systemCounters, FlowControl flowControl, RetransmitHandler retransmitHandler, NetworkPublicationThreadLocals threadLocals, long unblockTimeoutNs, long connectionTimeoutNs, long lingerTimeoutNs, boolean isExclusive, boolean spiesSimulateConnection) {
        int termLength;
        this.registrationId = registrationId;
        this.unblockTimeoutNs = unblockTimeoutNs;
        this.connectionTimeoutNs = connectionTimeoutNs;
        this.lingerTimeoutNs = lingerTimeoutNs;
        this.tag = tag;
        this.channelEndpoint = channelEndpoint;
        this.rawLog = rawLog;
        this.nanoClock = nanoClock;
        this.senderPosition = senderPosition;
        this.senderLimit = senderLimit;
        this.flowControl = flowControl;
        this.retransmitHandler = retransmitHandler;
        this.publisherPos = publisherPos;
        this.publisherLimit = publisherLimit;
        this.mtuLength = mtuLength;
        this.initialTermId = initialTermId;
        this.sessionId = sessionId;
        this.streamId = streamId;
        this.isExclusive = isExclusive;
        this.spiesSimulateConnection = spiesSimulateConnection;
        this.metaDataBuffer = rawLog.metaData();
        this.setupBuffer = threadLocals.setupBuffer();
        this.setupHeader = threadLocals.setupHeader();
        this.heartbeatBuffer = threadLocals.heartbeatBuffer();
        this.heartbeatDataHeader = threadLocals.heartbeatDataHeader();
        this.rttMeasurementBuffer = threadLocals.rttMeasurementBuffer();
        this.rttMeasurementHeader = threadLocals.rttMeasurementHeader();
        this.heartbeatsSent = systemCounters.get(SystemCounterDescriptor.HEARTBEATS_SENT);
        this.shortSends = systemCounters.get(SystemCounterDescriptor.SHORT_SENDS);
        this.retransmitsSent = systemCounters.get(SystemCounterDescriptor.RETRANSMITS_SENT);
        this.senderFlowControlLimits = systemCounters.get(SystemCounterDescriptor.SENDER_FLOW_CONTROL_LIMITS);
        this.unblockedPublications = systemCounters.get(SystemCounterDescriptor.UNBLOCKED_PUBLICATIONS);
        this.termBuffers = rawLog.termBuffers();
        this.sendBuffers = rawLog.sliceTerms();
        this.termBufferLength = termLength = rawLog.termLength();
        this.termLengthMask = termLength - 1;
        flowControl.initialize(initialTermId, termLength);
        long nowNs = nanoClock.nanoTime();
        this.timeOfLastSendOrHeartbeatNs = nowNs - Configuration.PUBLICATION_HEARTBEAT_TIMEOUT_NS - 1L;
        this.timeOfLastSetupNs = nowNs - Configuration.PUBLICATION_SETUP_TIMEOUT_NS - 1L;
        this.statusMessageDeadlineNs = spiesSimulateConnection ? nowNs : nowNs + connectionTimeoutNs;
        this.positionBitsToShift = LogBufferDescriptor.positionBitsToShift(termLength);
        this.termWindowLength = Configuration.publicationTermWindowLength(termLength);
        this.cleanPosition = this.lastSenderPosition = senderPosition.get();
        this.timeOfLastActivityNs = nowNs;
    }

    @Override
    public boolean free() {
        return this.rawLog.free();
    }

    @Override
    public void close() {
        this.publisherPos.close();
        this.publisherLimit.close();
        this.senderPosition.close();
        this.senderLimit.close();
        for (ReadablePosition position : this.spyPositions) {
            position.close();
        }
        this.rawLog.close();
    }

    public long tag() {
        return this.tag;
    }

    public int termBufferLength() {
        return this.termBufferLength;
    }

    public int mtuLength() {
        return this.mtuLength;
    }

    public long registrationId() {
        return this.registrationId;
    }

    public boolean isExclusive() {
        return this.isExclusive;
    }

    public final int send(long nowNs) {
        int bytesSent;
        long senderPosition = this.senderPosition.get();
        int activeTermId = LogBufferDescriptor.computeTermIdFromPosition(senderPosition, this.positionBitsToShift, this.initialTermId);
        int termOffset = (int)senderPosition & this.termLengthMask;
        if (this.shouldSendSetupFrame) {
            this.setupMessageCheck(nowNs, activeTermId, termOffset);
        }
        if (0 == (bytesSent = this.sendData(nowNs, senderPosition, termOffset))) {
            boolean isEndOfStream = this.isEndOfStream;
            bytesSent = this.heartbeatMessageCheck(nowNs, activeTermId, termOffset, isEndOfStream);
            if (this.spiesSimulateConnection && this.statusMessageDeadlineNs - nowNs < 0L && this.hasSpies) {
                long newSenderPosition = this.maxSpyPosition(senderPosition);
                this.senderPosition.setOrdered(newSenderPosition);
                this.senderLimit.setOrdered(this.flowControl.onIdle(nowNs, newSenderPosition, newSenderPosition, isEndOfStream));
            } else {
                this.senderLimit.setOrdered(this.flowControl.onIdle(nowNs, this.senderLimit.get(), senderPosition, isEndOfStream));
            }
        }
        this.updateHasReceivers(nowNs);
        this.retransmitHandler.processTimeouts(nowNs, this);
        return bytesSent;
    }

    public SendChannelEndpoint channelEndpoint() {
        return this.channelEndpoint;
    }

    public String channel() {
        return this.channelEndpoint.originalUriString();
    }

    public int sessionId() {
        return this.sessionId;
    }

    public int streamId() {
        return this.streamId;
    }

    @Override
    public void resend(int termId, int termOffset, int length) {
        long senderPosition = this.senderPosition.get();
        long resendPosition = LogBufferDescriptor.computePosition(termId, termOffset, this.positionBitsToShift, this.initialTermId);
        if (resendPosition < senderPosition && resendPosition >= senderPosition - (long)this.termBufferLength) {
            long scanOutcome;
            int available;
            int activeIndex = LogBufferDescriptor.indexByPosition(resendPosition, this.positionBitsToShift);
            UnsafeBuffer termBuffer = this.termBuffers[activeIndex];
            ByteBuffer sendBuffer = this.sendBuffers[activeIndex];
            int remainingBytes = length;
            int bytesSent = 0;
            int offset = termOffset;
            while ((available = TermScanner.available(scanOutcome = TermScanner.scanForAvailability(termBuffer, offset += bytesSent, Math.min(this.mtuLength, remainingBytes)))) > 0) {
                sendBuffer.limit(offset + available).position(offset);
                if (available != this.channelEndpoint.send(sendBuffer)) {
                    this.shortSends.increment();
                    break;
                }
                bytesSent = available + TermScanner.padding(scanOutcome);
                if ((remainingBytes -= bytesSent) > 0) continue;
            }
            this.retransmitsSent.incrementOrdered();
        }
    }

    public void triggerSendSetupFrame() {
        if (!this.isEndOfStream) {
            this.shouldSendSetupFrame = true;
        }
    }

    @Override
    public void addSubscriber(ReadablePosition spyPosition) {
        this.spyPositions = ArrayUtil.add(this.spyPositions, spyPosition);
        this.hasSpies = true;
        if (this.spiesSimulateConnection) {
            LogBufferDescriptor.isConnected(this.metaDataBuffer, true);
            this.isConnected = true;
        }
    }

    @Override
    public void removeSubscriber(ReadablePosition spyPosition) {
        this.spyPositions = ArrayUtil.remove(this.spyPositions, spyPosition);
        this.hasSpies = this.spyPositions.length > 0;
        spyPosition.close();
    }

    public void onNak(int termId, int termOffset, int length) {
        this.retransmitHandler.onNak(termId, termOffset, length, this.termBufferLength, this);
    }

    public void onStatusMessage(StatusMessageFlyweight msg, InetSocketAddress srcAddress) {
        if (!this.hasReceivers) {
            this.hasReceivers = true;
        }
        long timeNs = this.nanoClock.nanoTime();
        this.statusMessageDeadlineNs = timeNs + this.connectionTimeoutNs;
        long limit = this.flowControl.onStatusMessage(msg, srcAddress, this.senderLimit.get(), this.initialTermId, this.positionBitsToShift, timeNs);
        this.senderLimit.setOrdered(limit);
        if (!this.isConnected) {
            LogBufferDescriptor.isConnected(this.metaDataBuffer, true);
            this.isConnected = true;
        }
    }

    public void onRttMeasurement(RttMeasurementFlyweight msg, InetSocketAddress srcAddress) {
        if (128 == (msg.flags() & 0x80)) {
            this.rttMeasurementHeader.receiverId(msg.receiverId()).echoTimestampNs(msg.echoTimestampNs()).receptionDelta(0L).sessionId(this.sessionId).streamId(this.streamId).flags((short)0);
            int bytesSent = this.channelEndpoint.send(this.rttMeasurementBuffer);
            if (40 != bytesSent) {
                this.shortSends.increment();
            }
        }
    }

    RawLog rawLog() {
        return this.rawLog;
    }

    int publisherLimitId() {
        return this.publisherLimit.id();
    }

    final int updatePublisherLimit() {
        int workCount = 0;
        long senderPosition = this.senderPosition.getVolatile();
        if (this.hasReceivers || this.spiesSimulateConnection && this.spyPositions.length > 0) {
            long minConsumerPosition = senderPosition;
            for (ReadablePosition spyPosition : this.spyPositions) {
                minConsumerPosition = Math.min(minConsumerPosition, spyPosition.getVolatile());
            }
            long proposedPublisherLimit = minConsumerPosition + (long)this.termWindowLength;
            if (this.publisherLimit.proposeMaxOrdered(proposedPublisherLimit)) {
                this.cleanBuffer(proposedPublisherLimit);
                workCount = 1;
            }
        } else if (this.publisherLimit.get() > senderPosition) {
            this.publisherLimit.setOrdered(senderPosition);
        }
        return workCount;
    }

    boolean hasSpies() {
        return this.hasSpies;
    }

    final void updateHasReceivers(long timeNs) {
        if (this.statusMessageDeadlineNs - timeNs < 0L && this.hasReceivers) {
            this.hasReceivers = false;
        }
    }

    private int sendData(long nowNs, long senderPosition, int termOffset) {
        int bytesSent = 0;
        int availableWindow = (int)(this.senderLimit.get() - senderPosition);
        if (availableWindow > 0) {
            int scanLimit = Math.min(availableWindow, this.mtuLength);
            int activeIndex = LogBufferDescriptor.indexByPosition(senderPosition, this.positionBitsToShift);
            long scanOutcome = TermScanner.scanForAvailability(this.termBuffers[activeIndex], termOffset, scanLimit);
            int available = TermScanner.available(scanOutcome);
            if (available > 0) {
                ByteBuffer sendBuffer = this.sendBuffers[activeIndex];
                sendBuffer.limit(termOffset + available).position(termOffset);
                if (available == this.channelEndpoint.send(sendBuffer)) {
                    this.timeOfLastSendOrHeartbeatNs = nowNs;
                    this.trackSenderLimits = true;
                    bytesSent = available;
                    this.senderPosition.setOrdered(senderPosition + (long)bytesSent + (long)TermScanner.padding(scanOutcome));
                } else {
                    this.shortSends.increment();
                }
            }
        } else if (this.trackSenderLimits) {
            this.trackSenderLimits = false;
            this.senderFlowControlLimits.incrementOrdered();
        }
        return bytesSent;
    }

    private void setupMessageCheck(long nowNs, int activeTermId, int termOffset) {
        if (this.timeOfLastSetupNs + Configuration.PUBLICATION_SETUP_TIMEOUT_NS - nowNs < 0L) {
            this.timeOfLastSetupNs = nowNs;
            this.timeOfLastSendOrHeartbeatNs = nowNs;
            this.setupBuffer.clear();
            this.setupHeader.activeTermId(activeTermId).termOffset(termOffset).sessionId(this.sessionId).streamId(this.streamId).initialTermId(this.initialTermId).termLength(this.termBufferLength).mtuLength(this.mtuLength).ttl(this.channelEndpoint.multicastTtl());
            if (40 != this.channelEndpoint.send(this.setupBuffer)) {
                this.shortSends.increment();
            }
            if (this.hasReceivers) {
                this.shouldSendSetupFrame = false;
            }
        }
    }

    private int heartbeatMessageCheck(long nowNs, int activeTermId, int termOffset, boolean isEndOfStream) {
        int bytesSent = 0;
        if (this.timeOfLastSendOrHeartbeatNs + Configuration.PUBLICATION_HEARTBEAT_TIMEOUT_NS - nowNs < 0L) {
            this.heartbeatBuffer.clear();
            this.heartbeatDataHeader.sessionId(this.sessionId).streamId(this.streamId).termId(activeTermId).termOffset(termOffset).flags((byte)(isEndOfStream ? 224 : 192));
            bytesSent = this.channelEndpoint.send(this.heartbeatBuffer);
            if (32 != bytesSent) {
                this.shortSends.increment();
            }
            this.timeOfLastSendOrHeartbeatNs = nowNs;
            this.heartbeatsSent.incrementOrdered();
        }
        return bytesSent;
    }

    private void cleanBuffer(long publisherLimit) {
        long cleanPosition = this.cleanPosition;
        long dirtyRange = publisherLimit - cleanPosition;
        int bufferCapacity = this.termBufferLength;
        int reservedRange = bufferCapacity * 2;
        if (dirtyRange > (long)reservedRange) {
            UnsafeBuffer dirtyTerm = this.termBuffers[LogBufferDescriptor.indexByPosition(cleanPosition, this.positionBitsToShift)];
            int termOffset = (int)cleanPosition & this.termLengthMask;
            int bytesForCleaning = (int)(dirtyRange - (long)reservedRange);
            int length = Math.min(bytesForCleaning, bufferCapacity - termOffset);
            dirtyTerm.setMemory(termOffset, length, (byte)0);
            this.cleanPosition = cleanPosition + (long)length;
        }
    }

    private void checkForBlockedPublisher(long producerPosition, long senderPosition, long nowNs) {
        if (senderPosition == this.lastSenderPosition && this.isPossiblyBlocked(producerPosition, senderPosition)) {
            if (this.timeOfLastActivityNs + this.unblockTimeoutNs - nowNs < 0L && LogBufferUnblocker.unblock(this.termBuffers, this.metaDataBuffer, senderPosition, this.termBufferLength)) {
                this.unblockedPublications.incrementOrdered();
            }
        } else {
            this.timeOfLastActivityNs = nowNs;
            this.lastSenderPosition = senderPosition;
        }
    }

    private boolean isPossiblyBlocked(long producerPosition, long consumerPosition) {
        int expectedTermCount;
        int producerTermCount = LogBufferDescriptor.activeTermCount(this.metaDataBuffer);
        if (producerTermCount != (expectedTermCount = (int)(consumerPosition >> this.positionBitsToShift))) {
            return true;
        }
        return producerPosition > consumerPosition;
    }

    private boolean spiesFinishedConsuming(DriverConductor conductor, long eosPosition) {
        if (this.spyPositions.length > 0) {
            for (ReadablePosition spyPosition : this.spyPositions) {
                if (spyPosition.getVolatile() >= eosPosition) continue;
                return false;
            }
            this.hasSpies = false;
            conductor.cleanupSpies(this);
        }
        return true;
    }

    private long maxSpyPosition(long senderPosition) {
        long position = senderPosition;
        for (ReadablePosition spyPosition : this.spyPositions) {
            position = Math.max(position, spyPosition.getVolatile());
        }
        return position;
    }

    private void updateConnectedStatus() {
        boolean currentConnectedState;
        boolean bl = currentConnectedState = this.hasReceivers || this.spiesSimulateConnection && this.spyPositions.length > 0;
        if (currentConnectedState != this.isConnected) {
            LogBufferDescriptor.isConnected(this.metaDataBuffer, currentConnectedState);
            this.isConnected = currentConnectedState;
        }
    }

    @Override
    public void onTimeEvent(long timeNs, long timeMs, DriverConductor conductor) {
        switch (this.state) {
            case ACTIVE: {
                this.updateConnectedStatus();
                long producerPosition = this.producerPosition();
                this.publisherPos.setOrdered(producerPosition);
                if (this.isExclusive) break;
                this.checkForBlockedPublisher(producerPosition, this.senderPosition.getVolatile(), timeNs);
                break;
            }
            case DRAINING: {
                long producerPosition = this.producerPosition();
                this.publisherPos.setOrdered(producerPosition);
                long senderPosition = this.senderPosition.getVolatile();
                if (producerPosition > senderPosition) {
                    if (LogBufferUnblocker.unblock(this.termBuffers, this.metaDataBuffer, senderPosition, this.termBufferLength)) {
                        this.unblockedPublications.incrementOrdered();
                        break;
                    }
                    if (this.hasReceivers) {
                        break;
                    }
                } else {
                    this.isEndOfStream = true;
                }
                if (!this.spiesFinishedConsuming(conductor, producerPosition)) break;
                this.timeOfLastActivityNs = timeNs;
                this.state = State.LINGER;
                break;
            }
            case LINGER: {
                if (this.timeOfLastActivityNs + this.lingerTimeoutNs - timeNs >= 0L) break;
                conductor.cleanupPublication(this);
                this.state = State.CLOSING;
            }
        }
    }

    @Override
    public boolean hasReachedEndOfLife() {
        return this.hasSenderReleased;
    }

    public void decRef() {
        if (0 == --this.refCount) {
            this.state = State.DRAINING;
            this.channelEndpoint.decRef();
            this.timeOfLastActivityNs = this.nanoClock.nanoTime();
            long producerPosition = this.producerPosition();
            if (this.publisherLimit.get() > producerPosition) {
                this.publisherLimit.setOrdered(producerPosition);
            }
            LogBufferDescriptor.endOfStreamPosition(this.metaDataBuffer, producerPosition);
            if (this.senderPosition.getVolatile() >= producerPosition) {
                this.isEndOfStream = true;
            }
        }
    }

    public void incRef() {
        ++this.refCount;
    }

    final State state() {
        return this.state;
    }

    void senderRelease() {
        this.hasSenderReleased = true;
    }

    long producerPosition() {
        long rawTail = LogBufferDescriptor.rawTailVolatile(this.metaDataBuffer);
        int termOffset = LogBufferDescriptor.termOffset(rawTail, this.termBufferLength);
        return LogBufferDescriptor.computePosition(LogBufferDescriptor.termId(rawTail), termOffset, this.positionBitsToShift, this.initialTermId);
    }

    long consumerPosition() {
        return this.senderPosition.getVolatile();
    }

    public static enum State {
        ACTIVE,
        DRAINING,
        LINGER,
        CLOSING;

    }
}

