/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.archive;

import io.aeron.Counter;
import io.aeron.ExclusivePublication;
import io.aeron.archive.Archive;
import io.aeron.archive.Catalog;
import io.aeron.archive.ControlResponseProxy;
import io.aeron.archive.ControlSession;
import io.aeron.archive.RecordingSummary;
import io.aeron.archive.Session;
import io.aeron.archive.client.ArchiveException;
import io.aeron.logbuffer.BufferClaim;
import io.aeron.logbuffer.FrameDescriptor;
import io.aeron.protocol.DataHeaderFlyweight;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.EnumSet;
import org.agrona.BitUtil;
import org.agrona.CloseHelper;
import org.agrona.LangUtil;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.UnsafeBuffer;

class ReplaySession
implements Session,
AutoCloseable {
    private static final EnumSet<StandardOpenOption> FILE_OPTIONS = EnumSet.of(StandardOpenOption.READ);
    private static final FileAttribute<?>[] NO_ATTRIBUTES = new FileAttribute[0];
    private final long connectDeadlineMs;
    private final long correlationId;
    private final long sessionId;
    private final long recordingId;
    private final long startPosition;
    private long replayPosition;
    private long stopPosition;
    private long replayLimit;
    private int termOffset;
    private int termBaseSegmentOffset;
    private int segmentFileIndex;
    private final int streamId;
    private final int termLength;
    private final int segmentLength;
    private final BufferClaim bufferClaim = new BufferClaim();
    private final ExclusivePublication publication;
    private final ControlSession controlSession;
    private final EpochClock epochClock;
    private final File archiveDir;
    private final Catalog catalog;
    private final Counter recordingPosition;
    private final UnsafeBuffer replayBuffer;
    private FileChannel fileChannel;
    private File segmentFile;
    private State state = State.INIT;
    private String errorMessage = null;
    private volatile boolean isAborted;

    ReplaySession(long position, long length, long replaySessionId, long connectTimeoutMs, long correlationId, ControlSession controlSession, ControlResponseProxy controlResponseProxy, UnsafeBuffer replayBuffer, Catalog catalog, File archiveDir, File initialSegmentFile, EpochClock epochClock, ExclusivePublication publication, RecordingSummary recordingSummary, Counter recordingPosition) {
        long currentPosition;
        long replayLength;
        this.controlSession = controlSession;
        this.sessionId = replaySessionId;
        this.correlationId = correlationId;
        this.recordingId = recordingSummary.recordingId;
        this.segmentLength = recordingSummary.segmentFileLength;
        this.termLength = recordingSummary.termBufferLength;
        this.streamId = recordingSummary.streamId;
        this.epochClock = epochClock;
        this.archiveDir = archiveDir;
        this.segmentFile = initialSegmentFile;
        this.publication = publication;
        this.recordingPosition = recordingPosition;
        this.replayBuffer = replayBuffer;
        this.catalog = catalog;
        this.startPosition = recordingSummary.startPosition;
        this.stopPosition = null == recordingPosition ? recordingSummary.stopPosition : recordingPosition.get();
        long fromPosition = position == -1L ? this.startPosition : position;
        long maxLength = null == recordingPosition ? this.stopPosition - fromPosition : Long.MAX_VALUE - fromPosition;
        long l = replayLength = length == -1L ? maxLength : Math.min(length, maxLength);
        if (replayLength < 0L) {
            this.close();
            String msg = "replay recording " + this.recordingId + " - length must be positive: " + replayLength;
            controlSession.attemptErrorResponse(correlationId, msg, controlResponseProxy);
            throw new ArchiveException(msg);
        }
        if (null != recordingPosition && (currentPosition = recordingPosition.get()) < fromPosition) {
            this.close();
            String msg = "replay recording " + this.recordingId + " - " + fromPosition + " after current position of " + currentPosition;
            controlSession.attemptErrorResponse(correlationId, msg, controlResponseProxy);
            throw new ArchiveException(msg);
        }
        this.segmentFileIndex = Archive.segmentFileIndex(this.startPosition, fromPosition, this.segmentLength);
        this.replayPosition = fromPosition;
        this.replayLimit = fromPosition + replayLength;
        controlSession.sendOkResponse(correlationId, replaySessionId, controlResponseProxy);
        this.connectDeadlineMs = epochClock.time() + connectTimeoutMs;
    }

    @Override
    public void close() {
        this.closeRecordingSegment();
        CloseHelper.close(this.publication);
    }

    @Override
    public long sessionId() {
        return this.sessionId;
    }

    @Override
    public int doWork() {
        int workCount = 0;
        if (this.isAborted) {
            this.state = State.INACTIVE;
        }
        try {
            if (State.INIT == this.state) {
                workCount += this.init();
            }
            if (State.REPLAY == this.state) {
                workCount += this.replay();
            }
        }
        catch (IOException ex) {
            this.onError("IOException - " + ex.getMessage() + " - " + this.segmentFile.getName());
            LangUtil.rethrowUnchecked(ex);
        }
        if (State.INACTIVE == this.state) {
            this.closeRecordingSegment();
            this.state = State.DONE;
        }
        return workCount;
    }

    @Override
    public void abort() {
        this.isAborted = true;
    }

    @Override
    public boolean isDone() {
        return this.state == State.DONE;
    }

    long recordingId() {
        return this.recordingId;
    }

    State state() {
        return this.state;
    }

    void sendPendingError(ControlResponseProxy controlResponseProxy) {
        if (null != this.errorMessage && !this.controlSession.isDone()) {
            this.controlSession.attemptErrorResponse(this.correlationId, this.errorMessage, controlResponseProxy);
        }
    }

    private int init() throws IOException {
        if (null == this.fileChannel) {
            int positionBitsToShift = this.publication.positionBitsToShift();
            long startTermBasePosition = this.startPosition - (this.startPosition & (long)(this.termLength - 1));
            int segmentOffset = (int)(this.replayPosition - startTermBasePosition) & this.segmentLength - 1;
            int termId = (int)(this.replayPosition >> positionBitsToShift) + this.publication.initialTermId();
            this.openRecordingSegment();
            this.termOffset = (int)(this.replayPosition & (long)(this.termLength - 1));
            this.termBaseSegmentOffset = segmentOffset - this.termOffset;
            if (this.replayPosition > this.startPosition && this.replayPosition != this.stopPosition && ReplaySession.notHeaderAligned(this.fileChannel, this.replayBuffer, segmentOffset, this.termOffset, termId, this.streamId)) {
                this.onError(this.replayPosition + " position not aligned to data header");
                return 0;
            }
        }
        if (!this.publication.isConnected()) {
            if (this.epochClock.time() > this.connectDeadlineMs) {
                this.onError("no connection established for replay");
            }
            return 0;
        }
        this.state = State.REPLAY;
        return 1;
    }

    private int replay() throws IOException {
        int fragments = 0;
        if (this.recordingPosition != null && this.replayPosition >= this.stopPosition && this.noNewData(this.replayPosition, this.stopPosition)) {
            return fragments;
        }
        if (this.termOffset == this.termLength) {
            this.nextTerm();
        }
        int frameOffset = 0;
        int bytesRead = this.readRecording(this.stopPosition - this.replayPosition);
        while (frameOffset < bytesRead) {
            int frameLength = FrameDescriptor.frameLength(this.replayBuffer, frameOffset);
            int frameType = FrameDescriptor.frameType(this.replayBuffer, frameOffset);
            int alignedLength = BitUtil.align(frameLength, 32);
            int dataOffset = frameOffset + 32;
            int dataLength = frameLength - 32;
            long result = 0L;
            if (frameType == 1) {
                if (frameOffset + alignedLength > bytesRead) break;
                result = this.publication.tryClaim(dataLength, this.bufferClaim);
                if (result > 0L) {
                    this.bufferClaim.flags(FrameDescriptor.frameFlags(this.replayBuffer, frameOffset)).reservedValue(this.replayBuffer.getLong(frameOffset + 24, ByteOrder.LITTLE_ENDIAN)).putBytes(this.replayBuffer, dataOffset, dataLength).commit();
                }
            } else if (frameType == 0) {
                result = this.publication.appendPadding(dataLength);
            }
            if (result > 0L) {
                ++fragments;
                frameOffset += alignedLength;
                this.termOffset += alignedLength;
                this.replayPosition += (long)alignedLength;
                if (this.replayPosition < this.replayLimit) continue;
                this.state = State.INACTIVE;
                break;
            }
            if (result != -4L && result != -1L) break;
            this.onError("stream closed before replay is complete");
            break;
        }
        return fragments;
    }

    private int readRecording(long availableReplay) throws IOException {
        if (this.publication.availableWindow() > 0L) {
            int limit = Math.min((int)Math.min(availableReplay, 0x200000L), this.termLength - this.termOffset);
            ByteBuffer byteBuffer = this.replayBuffer.byteBuffer();
            byteBuffer.clear().limit(limit);
            int position = this.termBaseSegmentOffset + this.termOffset;
            do {
                position += this.fileChannel.read(byteBuffer, position);
            } while (byteBuffer.remaining() > 0);
            return byteBuffer.limit();
        }
        if (!this.publication.isConnected()) {
            this.state = State.INACTIVE;
        }
        return 0;
    }

    private void onError(String errorMessage) {
        this.state = State.INACTIVE;
        this.errorMessage = errorMessage;
    }

    private boolean noNewData(long replayPosition, long oldStopPosition) {
        long newStopPosition;
        long currentRecodingPosition = this.recordingPosition.get();
        boolean hasRecordingStopped = this.recordingPosition.isClosed();
        long l = newStopPosition = hasRecordingStopped ? this.catalog.stopPosition(this.recordingId) : currentRecodingPosition;
        if (hasRecordingStopped && newStopPosition < this.replayLimit) {
            this.replayLimit = newStopPosition;
        }
        if (replayPosition >= this.replayLimit) {
            this.state = State.INACTIVE;
        } else if (newStopPosition > oldStopPosition) {
            this.stopPosition = newStopPosition;
            return false;
        }
        return true;
    }

    private void nextTerm() throws IOException {
        this.termOffset = 0;
        this.termBaseSegmentOffset += this.termLength;
        if (this.termBaseSegmentOffset == this.segmentLength) {
            this.closeRecordingSegment();
            ++this.segmentFileIndex;
            this.openRecordingSegment();
            this.termBaseSegmentOffset = 0;
        }
    }

    private void closeRecordingSegment() {
        CloseHelper.close(this.fileChannel);
        this.fileChannel = null;
        this.segmentFile = null;
    }

    private void openRecordingSegment() throws IOException {
        if (null == this.segmentFile) {
            String segmentFileName = Archive.segmentFileName(this.recordingId, this.segmentFileIndex);
            this.segmentFile = new File(this.archiveDir, segmentFileName);
            if (!this.segmentFile.exists()) {
                String msg = "recording segment not found " + segmentFileName;
                this.onError(msg);
                throw new ArchiveException(msg);
            }
        }
        this.fileChannel = FileChannel.open(this.segmentFile.toPath(), FILE_OPTIONS, NO_ATTRIBUTES);
    }

    static boolean notHeaderAligned(FileChannel channel, UnsafeBuffer buffer, int segmentOffset, int termOffset, int termId, int streamId) throws IOException {
        ByteBuffer byteBuffer = buffer.byteBuffer();
        byteBuffer.clear().limit(32);
        if (32 != channel.read(byteBuffer, segmentOffset)) {
            throw new ArchiveException("failed to read fragment header");
        }
        return ReplaySession.isInvalidHeader(buffer, streamId, termId, termOffset);
    }

    static boolean isInvalidHeader(UnsafeBuffer buffer, int streamId, int termId, int termOffset) {
        return DataHeaderFlyweight.termOffset(buffer, 0) != termOffset || DataHeaderFlyweight.termId(buffer, 0) != termId || DataHeaderFlyweight.streamId(buffer, 0) != streamId;
    }

    static enum State {
        INIT,
        REPLAY,
        INACTIVE,
        DONE;

    }
}

