/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.archive;

import io.aeron.Aeron;
import io.aeron.AvailableImageHandler;
import io.aeron.ChannelUri;
import io.aeron.ChannelUriStringBuilder;
import io.aeron.Counter;
import io.aeron.ExclusivePublication;
import io.aeron.Image;
import io.aeron.Subscription;
import io.aeron.archive.Archive;
import io.aeron.archive.ArchiveMarkFile;
import io.aeron.archive.Catalog;
import io.aeron.archive.ControlResponseProxy;
import io.aeron.archive.ControlSession;
import io.aeron.archive.ControlSessionDemuxer;
import io.aeron.archive.ListRecordingsForUriSession;
import io.aeron.archive.ListRecordingsSession;
import io.aeron.archive.RecordingEventsProxy;
import io.aeron.archive.RecordingSession;
import io.aeron.archive.RecordingSummary;
import io.aeron.archive.ReplaySession;
import io.aeron.archive.Session;
import io.aeron.archive.SessionWorker;
import io.aeron.archive.client.ArchiveException;
import io.aeron.archive.codecs.RecordingDescriptorDecoder;
import io.aeron.archive.codecs.SourceLocation;
import io.aeron.archive.status.RecordingPos;
import io.aeron.logbuffer.LogBufferDescriptor;
import java.io.File;
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.nio.file.attribute.FileAttribute;
import java.util.ArrayDeque;
import java.util.EnumSet;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import org.agrona.BufferUtil;
import org.agrona.CloseHelper;
import org.agrona.LangUtil;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.collections.Object2ObjectHashMap;
import org.agrona.concurrent.AgentInvoker;
import org.agrona.concurrent.CachedEpochClock;
import org.agrona.concurrent.EpochClock;
import org.agrona.concurrent.UnsafeBuffer;

abstract class ArchiveConductor
extends SessionWorker<Session>
implements AvailableImageHandler {
    private static final EnumSet<StandardOpenOption> FILE_OPTIONS = EnumSet.of(StandardOpenOption.READ, StandardOpenOption.WRITE);
    private static final FileAttribute<?>[] NO_ATTRIBUTES = new FileAttribute[0];
    private final ArrayDeque<Runnable> taskQueue = new ArrayDeque();
    private final ChannelUriStringBuilder channelBuilder = new ChannelUriStringBuilder();
    private final Long2ObjectHashMap<ReplaySession> replaySessionByIdMap = new Long2ObjectHashMap();
    private final Long2ObjectHashMap<RecordingSession> recordingSessionByIdMap = new Long2ObjectHashMap();
    private final Object2ObjectHashMap<String, Subscription> recordingSubscriptionMap = new Object2ObjectHashMap();
    private final RecordingSummary recordingSummary = new RecordingSummary();
    private final UnsafeBuffer descriptorBuffer = new UnsafeBuffer();
    private final RecordingDescriptorDecoder recordingDescriptorDecoder = new RecordingDescriptorDecoder();
    private final ControlResponseProxy controlResponseProxy = new ControlResponseProxy();
    private final UnsafeBuffer tempBuffer = new UnsafeBuffer(new byte[512]);
    private final UnsafeBuffer dataHeaderBuffer = new UnsafeBuffer(BufferUtil.allocateDirectAligned(32, 128));
    private final UnsafeBuffer replayBuffer = new UnsafeBuffer(BufferUtil.allocateDirectAligned(0x200000, 128));
    private final Aeron aeron;
    private final AgentInvoker aeronAgentInvoker;
    private final AgentInvoker driverAgentInvoker;
    private final EpochClock epochClock;
    private final CachedEpochClock cachedEpochClock = new CachedEpochClock();
    private final File archiveDir;
    private final FileChannel archiveDirChannel;
    private final Subscription controlSubscription;
    private final Subscription localControlSubscription;
    private final long connectTimeoutMs;
    private final Catalog catalog;
    private final ArchiveMarkFile markFile;
    private final RecordingEventsProxy recordingEventsProxy;
    private final int maxConcurrentRecordings;
    private final int maxConcurrentReplays;
    private int replayId = 1;
    protected final Archive.Context ctx;
    protected SessionWorker<ReplaySession> replayer;
    protected SessionWorker<RecordingSession> recorder;
    private long nextControlSessionId = ThreadLocalRandom.current().nextInt();

    ArchiveConductor(Archive.Context ctx) {
        super("archive-conductor", ctx.countedErrorHandler());
        this.ctx = ctx;
        this.aeron = ctx.aeron();
        this.aeronAgentInvoker = this.aeron.conductorAgentInvoker();
        this.driverAgentInvoker = ctx.mediaDriverAgentInvoker();
        this.epochClock = ctx.epochClock();
        this.archiveDir = ctx.archiveDir();
        this.archiveDirChannel = ctx.archiveDirChannel();
        this.maxConcurrentRecordings = ctx.maxConcurrentRecordings();
        this.maxConcurrentReplays = ctx.maxConcurrentReplays();
        this.connectTimeoutMs = TimeUnit.NANOSECONDS.toMillis(ctx.connectTimeoutNs());
        ChannelUri controlChannelUri = ChannelUri.parse(ctx.controlChannel());
        controlChannelUri.put("sparse", Boolean.toString(ctx.controlTermBufferSparse()));
        this.controlSubscription = this.aeron.addSubscription(controlChannelUri.toString(), ctx.controlStreamId(), this, null);
        this.localControlSubscription = this.aeron.addSubscription(ctx.localControlChannel(), ctx.localControlStreamId(), this, null);
        this.recordingEventsProxy = new RecordingEventsProxy(this.aeron.addExclusivePublication(ctx.recordingEventsChannel(), ctx.recordingEventsStreamId()));
        this.cachedEpochClock.update(this.epochClock.time());
        this.catalog = ctx.catalog();
        this.markFile = ctx.archiveMarkFile();
    }

    @Override
    public void onStart() {
        this.replayer = this.newReplayer();
        this.recorder = this.newRecorder();
    }

    @Override
    public void onAvailableImage(Image image) {
        this.addSession(new ControlSessionDemuxer(image, this));
    }

    protected abstract SessionWorker<RecordingSession> newRecorder();

    protected abstract SessionWorker<ReplaySession> newReplayer();

    @Override
    protected final void preSessionsClose() {
        this.closeSessionWorkers();
    }

    protected abstract void closeSessionWorkers();

    @Override
    protected void postSessionsClose() {
        if (!this.ctx.ownsAeronClient()) {
            for (Subscription subscription : this.recordingSubscriptionMap.values()) {
                subscription.close();
            }
            CloseHelper.close(this.localControlSubscription);
            CloseHelper.close(this.controlSubscription);
            CloseHelper.close(this.recordingEventsProxy);
        }
        this.ctx.close();
    }

    @Override
    protected int preWork() {
        int workCount = 0;
        long nowMs = this.epochClock.time();
        if (this.cachedEpochClock.time() != nowMs) {
            this.cachedEpochClock.update(nowMs);
            this.markFile.updateActivityTimestamp(nowMs);
            workCount += this.aeronAgentInvoker.invoke();
        }
        workCount += this.invokeDriverConductor();
        return workCount += this.runTasks(this.taskQueue);
    }

    protected final int invokeDriverConductor() {
        return null != this.driverAgentInvoker ? this.driverAgentInvoker.invoke() : 0;
    }

    Catalog catalog() {
        return this.catalog;
    }

    void startRecordingSubscription(long correlationId, ControlSession controlSession, int streamId, String originalChannel, SourceLocation sourceLocation) {
        if (this.recordingSessionByIdMap.size() >= this.maxConcurrentRecordings) {
            String msg = "max concurrent recordings reached " + this.maxConcurrentRecordings;
            controlSession.sendErrorResponse(correlationId, 8L, msg, this.controlResponseProxy);
            return;
        }
        try {
            ChannelUri channelUri = ChannelUri.parse(originalChannel);
            String strippedChannel = this.strippedChannelBuilder(channelUri).build();
            String key = ArchiveConductor.makeKey(streamId, channelUri);
            Subscription oldSubscription = this.recordingSubscriptionMap.get(key);
            if (oldSubscription == null) {
                String channel = channelUri.media().equals("udp") && sourceLocation == SourceLocation.LOCAL ? "aeron-spy:" + strippedChannel : strippedChannel;
                AvailableImageHandler handler = image -> this.taskQueue.addLast(() -> this.startRecordingSession(controlSession, correlationId, strippedChannel, originalChannel, image));
                Subscription subscription = this.aeron.addSubscription(channel, streamId, handler, null);
                this.recordingSubscriptionMap.put((Object)key, (Object)subscription);
                controlSession.sendOkResponse(correlationId, subscription.registrationId(), this.controlResponseProxy);
            } else {
                String msg = "recording exists for streamId=" + streamId + " channel=" + originalChannel;
                controlSession.sendErrorResponse(correlationId, 3L, msg, this.controlResponseProxy);
            }
        }
        catch (Exception ex) {
            this.errorHandler.onError(ex);
            controlSession.sendErrorResponse(correlationId, ex.getMessage(), this.controlResponseProxy);
        }
    }

    void stopRecording(long correlationId, ControlSession controlSession, int streamId, String channel) {
        try {
            String key = ArchiveConductor.makeKey(streamId, ChannelUri.parse(channel));
            Subscription oldSubscription = this.recordingSubscriptionMap.remove(key);
            if (oldSubscription != null) {
                oldSubscription.close();
                controlSession.sendOkResponse(correlationId, this.controlResponseProxy);
            } else {
                String msg = "no recording found for streamId=" + streamId + " channel=" + channel;
                controlSession.sendErrorResponse(correlationId, 4L, msg, this.controlResponseProxy);
            }
        }
        catch (Exception ex) {
            this.errorHandler.onError(ex);
            controlSession.sendErrorResponse(correlationId, ex.getMessage(), this.controlResponseProxy);
        }
    }

    void stopRecordingSubscription(long correlationId, ControlSession controlSession, long subscriptionId) {
        Object2ObjectHashMap.EntryIterator iter = this.recordingSubscriptionMap.entrySet().iterator();
        while (iter.hasNext()) {
            Map.Entry entry = (Map.Entry)iter.next();
            Subscription subscription = (Subscription)entry.getValue();
            if (subscription.registrationId() != subscriptionId) continue;
            iter.remove();
            subscription.close();
            controlSession.sendOkResponse(correlationId, this.controlResponseProxy);
            return;
        }
        String msg = "no recording subscription found for " + subscriptionId;
        controlSession.sendErrorResponse(correlationId, 4L, msg, this.controlResponseProxy);
    }

    void newListRecordingsSession(long correlationId, long fromId, int count, ControlSession controlSession) {
        if (controlSession.activeListRecordingsSession() != null) {
            String msg = "active listing already in progress";
            controlSession.sendErrorResponse(correlationId, 1L, "active listing already in progress", this.controlResponseProxy);
        } else {
            ListRecordingsSession session = new ListRecordingsSession(correlationId, fromId, count, this.catalog, this.controlResponseProxy, controlSession, this.descriptorBuffer);
            this.addSession(session);
            controlSession.activeListRecordingsSession(session);
        }
    }

    void newListRecordingsForUriSession(long correlationId, long fromRecordingId, int count, int streamId, byte[] channelFragment, ControlSession controlSession) {
        if (controlSession.activeListRecordingsSession() != null) {
            String msg = "active listing already in progress";
            controlSession.sendErrorResponse(correlationId, 1L, "active listing already in progress", this.controlResponseProxy);
        } else {
            ListRecordingsForUriSession session = new ListRecordingsForUriSession(correlationId, fromRecordingId, count, channelFragment, streamId, this.catalog, this.controlResponseProxy, controlSession, this.descriptorBuffer, this.recordingDescriptorDecoder);
            this.addSession(session);
            controlSession.activeListRecordingsSession(session);
        }
    }

    void listRecording(long correlationId, ControlSession controlSession, long recordingId) {
        if (controlSession.activeListRecordingsSession() != null) {
            String msg = "active listing already in progress";
            controlSession.sendErrorResponse(correlationId, 1L, "active listing already in progress", this.controlResponseProxy);
        } else if (this.catalog.wrapAndValidateDescriptor(recordingId, this.descriptorBuffer)) {
            controlSession.sendDescriptor(correlationId, this.descriptorBuffer, this.controlResponseProxy);
        } else {
            controlSession.sendRecordingUnknown(correlationId, recordingId, this.controlResponseProxy);
        }
    }

    void findLastMatchingRecording(long correlationId, long minRecordingId, int sessionId, int streamId, byte[] channelFragment, ControlSession controlSession) {
        if (minRecordingId < 0L || minRecordingId >= (long)this.catalog.countEntries()) {
            String msg = "min recording id outside valid range: " + minRecordingId;
            controlSession.sendErrorResponse(correlationId, 5L, msg, this.controlResponseProxy);
        } else {
            long recordingId = this.catalog.findLast(minRecordingId, sessionId, streamId, channelFragment);
            controlSession.sendOkResponse(correlationId, recordingId, this.controlResponseProxy);
        }
    }

    void startReplay(long correlationId, ControlSession controlSession, long recordingId, long position, long length, int replayStreamId, String replayChannel) {
        File segmentFile;
        if (this.replaySessionByIdMap.size() >= this.maxConcurrentReplays) {
            String msg = "max concurrent replays reached " + this.maxConcurrentReplays;
            controlSession.sendErrorResponse(correlationId, 7L, msg, this.controlResponseProxy);
            return;
        }
        if (!this.catalog.hasRecording(recordingId)) {
            String msg = "unknown recording id " + recordingId;
            controlSession.sendErrorResponse(correlationId, 5L, msg, this.controlResponseProxy);
            return;
        }
        this.catalog.recordingSummary(recordingId, this.recordingSummary);
        long replayPosition = this.recordingSummary.startPosition;
        if (position != -1L) {
            if (!this.validateReplayPosition(correlationId, controlSession, recordingId, position, this.recordingSummary)) {
                return;
            }
            replayPosition = position;
        }
        if (null == (segmentFile = this.segmentFile(controlSession, this.archiveDir, replayPosition, recordingId, correlationId))) {
            return;
        }
        ExclusivePublication replayPublication = this.newReplayPublication(correlationId, controlSession, replayChannel, replayStreamId, replayPosition, this.recordingSummary);
        long replaySessionId = (long)this.replayId++ << 32 | (long)replayPublication.sessionId() & 0xFFFFFFFFL;
        RecordingSession recordingSession = this.recordingSessionByIdMap.get(recordingId);
        ReplaySession replaySession = new ReplaySession(replayPosition, length, replaySessionId, this.connectTimeoutMs, correlationId, controlSession, this.controlResponseProxy, this.replayBuffer, this.catalog, this.archiveDir, segmentFile, this.cachedEpochClock, replayPublication, this.recordingSummary, null == recordingSession ? null : recordingSession.recordingPosition());
        this.replaySessionByIdMap.put(replaySessionId, replaySession);
        this.replayer.addSession(replaySession);
    }

    void stopReplay(long correlationId, ControlSession controlSession, long replaySessionId) {
        ReplaySession replaySession = this.replaySessionByIdMap.get(replaySessionId);
        if (null == replaySession) {
            String errorMessage = "replay session not known for " + replaySessionId;
            controlSession.sendErrorResponse(correlationId, 6L, errorMessage, this.controlResponseProxy);
        } else {
            replaySession.abort();
            controlSession.sendOkResponse(correlationId, this.controlResponseProxy);
        }
    }

    void extendRecording(long correlationId, ControlSession controlSession, long recordingId, int streamId, String originalChannel, SourceLocation sourceLocation) {
        if (this.recordingSessionByIdMap.size() >= this.maxConcurrentRecordings) {
            String msg = "max concurrent recordings reached of " + this.maxConcurrentRecordings;
            controlSession.sendErrorResponse(correlationId, 8L, msg, this.controlResponseProxy);
            return;
        }
        if (!this.catalog.hasRecording(recordingId)) {
            String msg = "unknown recording id " + recordingId;
            controlSession.sendErrorResponse(correlationId, 5L, msg, this.controlResponseProxy);
            return;
        }
        if (this.recordingSessionByIdMap.containsKey(recordingId)) {
            String msg = "cannot extend active recording for " + recordingId;
            controlSession.sendErrorResponse(correlationId, 2L, msg, this.controlResponseProxy);
            return;
        }
        try {
            ChannelUri channelUri = ChannelUri.parse(originalChannel);
            String strippedChannel = this.strippedChannelBuilder(channelUri).build();
            String key = ArchiveConductor.makeKey(streamId, channelUri);
            Subscription oldSubscription = this.recordingSubscriptionMap.get(key);
            if (oldSubscription == null) {
                String channel = originalChannel.contains("udp") && sourceLocation == SourceLocation.LOCAL ? "aeron-spy:" + strippedChannel : strippedChannel;
                AvailableImageHandler handler = image -> this.taskQueue.addLast(() -> this.extendRecordingSession(controlSession, correlationId, recordingId, strippedChannel, originalChannel, image));
                Subscription subscription = this.aeron.addSubscription(channel, streamId, handler, null);
                this.recordingSubscriptionMap.put((Object)key, (Object)subscription);
                controlSession.sendOkResponse(correlationId, subscription.registrationId(), this.controlResponseProxy);
            } else {
                String msg = "recording exists for streamId=" + streamId + " channel=" + originalChannel;
                controlSession.sendErrorResponse(correlationId, 3L, msg, this.controlResponseProxy);
            }
        }
        catch (Exception ex) {
            this.errorHandler.onError(ex);
            controlSession.sendErrorResponse(correlationId, ex.getMessage(), this.controlResponseProxy);
        }
    }

    void getRecordingPosition(long correlationId, ControlSession controlSession, long recordingId) {
        RecordingSession recordingSession = this.recordingSessionByIdMap.get(recordingId);
        long position = null == recordingSession ? -1L : recordingSession.recordingPosition().get();
        controlSession.sendOkResponse(correlationId, position, this.controlResponseProxy);
    }

    void getStopPosition(long correlationId, ControlSession controlSession, long recordingId) {
        if (this.catalog.hasRecording(recordingId)) {
            controlSession.sendOkResponse(correlationId, this.catalog.stopPosition(recordingId), this.controlResponseProxy);
        } else {
            String msg = "unknown recording " + recordingId;
            controlSession.sendErrorResponse(correlationId, 5L, msg, this.controlResponseProxy);
        }
    }

    void truncateRecording(long correlationId, ControlSession controlSession, long recordingId, long position) {
        RecordingSummary summary = this.validateFramePosition(correlationId, controlSession, recordingId, position);
        if (null != summary) {
            long startPosition = summary.startPosition;
            if (position < startPosition) {
                String msg = "position " + position + " before start position " + startPosition;
                controlSession.sendErrorResponse(correlationId, 0L, msg, this.controlResponseProxy);
                return;
            }
            long stopPosition = summary.stopPosition;
            if (stopPosition == position) {
                controlSession.sendOkResponse(correlationId, this.controlResponseProxy);
                return;
            }
            int segmentLength = summary.segmentFileLength;
            int segmentIndex = Archive.segmentFileIndex(startPosition, position, segmentLength);
            File file = new File(this.archiveDir, Archive.segmentFileName(recordingId, segmentIndex));
            int segmentOffset = (int)(position & (long)(segmentLength - 1));
            int termLength = summary.termBufferLength;
            int termOffset = (int)(position & (long)(termLength - 1));
            if (termOffset > 0) {
                try (FileChannel channel = FileChannel.open(file.toPath(), FILE_OPTIONS, NO_ATTRIBUTES);){
                    int termCount = (int)(position >> LogBufferDescriptor.positionBitsToShift(termLength));
                    int termId = summary.initialTermId + termCount;
                    if (ReplaySession.notHeaderAligned(channel, this.dataHeaderBuffer, segmentOffset, termOffset, termId, summary.streamId)) {
                        String msg = position + " position not aligned to data header";
                        controlSession.sendErrorResponse(correlationId, msg, this.controlResponseProxy);
                        return;
                    }
                    channel.truncate(segmentOffset);
                    this.dataHeaderBuffer.byteBuffer().put(0, (byte)0).limit(1).position(0);
                    channel.write(this.dataHeaderBuffer.byteBuffer(), segmentLength - 1);
                }
                catch (IOException ex) {
                    controlSession.sendErrorResponse(correlationId, ex.getMessage(), this.controlResponseProxy);
                    LangUtil.rethrowUnchecked(ex);
                }
            } else if (!file.delete()) {
                String msg = "failed to delete " + file;
                controlSession.sendErrorResponse(correlationId, 0L, msg, this.controlResponseProxy);
                throw new ArchiveException(msg);
            }
            int i = segmentIndex + 1;
            while ((long)i * (long)segmentLength <= stopPosition) {
                File f = new File(this.archiveDir, Archive.segmentFileName(recordingId, i));
                if (!f.delete()) {
                    String msg = "failed to delete " + file;
                    controlSession.sendErrorResponse(correlationId, 0L, msg, this.controlResponseProxy);
                    throw new ArchiveException(msg);
                }
                ++i;
            }
            this.catalog.recordingStopped(recordingId, position);
            controlSession.sendOkResponse(correlationId, this.controlResponseProxy);
        }
    }

    ControlSession newControlSession(long correlationId, int streamId, String channel, ControlSessionDemuxer demuxer) {
        String controlChannel = this.strippedChannelBuilder(ChannelUri.parse(channel)).sparse(this.ctx.controlTermBufferSparse()).termLength(this.ctx.controlTermBufferLength()).mtu(this.ctx.controlMtuLength()).build();
        ControlSession controlSession = new ControlSession(this.nextControlSessionId++, correlationId, this.connectTimeoutMs, demuxer, this.aeron.addExclusivePublication(controlChannel, streamId), this, this.cachedEpochClock, this.controlResponseProxy);
        this.addSession(controlSession);
        return controlSession;
    }

    void closeRecordingSession(RecordingSession session) {
        long recordingId = session.sessionId();
        this.catalog.recordingStopped(recordingId, session.recordingPosition().get(), this.epochClock.time());
        this.recordingSessionByIdMap.remove(recordingId);
        this.closeSession(session);
    }

    void closeReplaySession(ReplaySession session) {
        this.replaySessionByIdMap.remove(session.sessionId());
        session.sendPendingError(this.controlResponseProxy);
        this.closeSession(session);
    }

    private int runTasks(ArrayDeque<Runnable> taskQueue) {
        Runnable runnable;
        int workCount = 0;
        while (null != (runnable = taskQueue.pollFirst())) {
            runnable.run();
            ++workCount;
        }
        return workCount;
    }

    private ChannelUriStringBuilder strippedChannelBuilder(ChannelUri channelUri) {
        String sessionIdStr = channelUri.get("session-id");
        this.channelBuilder.clear().media(channelUri.media()).endpoint(channelUri.get("endpoint")).networkInterface(channelUri.get("interface")).controlEndpoint(channelUri.get("control")).tags(channelUri.get("tags")).alias(channelUri.get("alias"));
        if (null != sessionIdStr) {
            if (ChannelUri.isTagged(sessionIdStr)) {
                this.channelBuilder.isSessionIdTagged(true).sessionId((int)ChannelUri.getTag(sessionIdStr));
            } else {
                this.channelBuilder.sessionId(Integer.valueOf(sessionIdStr));
            }
        }
        return this.channelBuilder;
    }

    private static String makeKey(int streamId, ChannelUri channelUri) {
        String tagsStr;
        String controlStr;
        String interfaceStr;
        StringBuilder sb = new StringBuilder();
        sb.append(streamId).append(':').append(channelUri.media()).append('?');
        String endpointStr = channelUri.get("endpoint");
        if (null != endpointStr) {
            sb.append("endpoint").append('=').append(endpointStr).append('|');
        }
        if (null != (interfaceStr = channelUri.get("interface"))) {
            sb.append("interface").append('=').append(interfaceStr).append('|');
        }
        if (null != (controlStr = channelUri.get("control"))) {
            sb.append("control").append('=').append(controlStr).append('|');
        }
        if (null != (tagsStr = channelUri.get("tags"))) {
            sb.append("tags").append('=').append(tagsStr).append('|');
        }
        sb.setLength(sb.length() - 1);
        return sb.toString();
    }

    private void startRecordingSession(ControlSession controlSession, long correlationId, String strippedChannel, String originalChannel, Image image) {
        this.validateMaxConcurrentRecordings(correlationId, controlSession, originalChannel, image);
        int sessionId = image.sessionId();
        int streamId = image.subscription().streamId();
        String sourceIdentity = image.sourceIdentity();
        int termBufferLength = image.termBufferLength();
        int mtuLength = image.mtuLength();
        int initialTermId = image.initialTermId();
        long startPosition = image.joinPosition();
        int segmentFileLength = Math.max(this.ctx.segmentFileLength(), termBufferLength);
        long recordingId = this.catalog.addNewRecording(startPosition, this.cachedEpochClock.time(), initialTermId, segmentFileLength, termBufferLength, mtuLength, sessionId, streamId, strippedChannel, originalChannel, sourceIdentity);
        Counter position = RecordingPos.allocate(this.aeron, this.tempBuffer, recordingId, sessionId, streamId, strippedChannel, image.sourceIdentity());
        position.setOrdered(startPosition);
        RecordingSession session = new RecordingSession(recordingId, startPosition, segmentFileLength, originalChannel, this.recordingEventsProxy, image, position, this.archiveDirChannel, this.ctx);
        this.recordingSessionByIdMap.put(recordingId, session);
        this.recorder.addSession(session);
    }

    private void extendRecordingSession(ControlSession controlSession, long correlationId, long recordingId, String strippedChannel, String originalChannel, Image image) {
        if (this.recordingSessionByIdMap.containsKey(recordingId)) {
            String msg = "cannot extend active recording for " + recordingId;
            controlSession.attemptErrorResponse(correlationId, 2L, msg, this.controlResponseProxy);
            throw new ArchiveException(msg);
        }
        this.validateMaxConcurrentRecordings(correlationId, controlSession, originalChannel, image);
        this.catalog.recordingSummary(recordingId, this.recordingSummary);
        this.validateImageForExtendRecording(correlationId, controlSession, image, this.recordingSummary);
        Counter position = RecordingPos.allocate(this.aeron, this.tempBuffer, recordingId, image.sessionId(), image.subscription().streamId(), strippedChannel, image.sourceIdentity());
        position.setOrdered(image.joinPosition());
        RecordingSession session = new RecordingSession(recordingId, this.recordingSummary.startPosition, this.recordingSummary.segmentFileLength, originalChannel, this.recordingEventsProxy, image, position, this.archiveDirChannel, this.ctx);
        this.recordingSessionByIdMap.put(recordingId, session);
        this.catalog.extendRecording(recordingId, controlSession.sessionId(), correlationId);
        this.recorder.addSession(session);
    }

    private ExclusivePublication newReplayPublication(long correlationId, ControlSession controlSession, String replayChannel, int replayStreamId, long position, RecordingSummary recording) {
        ChannelUri channelUri = ChannelUri.parse(replayChannel);
        ChannelUriStringBuilder channelBuilder = this.strippedChannelBuilder(channelUri).initialPosition(position, recording.initialTermId, recording.termBufferLength).mtu(recording.mtuLength);
        String lingerValue = channelUri.get("linger");
        channelBuilder.linger(null != lingerValue ? Long.parseLong(lingerValue) : this.ctx.replayLingerTimeoutNs());
        try {
            return this.aeron.addExclusivePublication(channelBuilder.build(), replayStreamId);
        }
        catch (Exception ex) {
            String msg = "failed to create replay publication - " + ex;
            controlSession.sendErrorResponse(correlationId, msg, this.controlResponseProxy);
            throw ex;
        }
    }

    private void validateMaxConcurrentRecordings(long correlationId, ControlSession controlSession, String originalChannel, Image image) {
        if (this.recordingSessionByIdMap.size() >= this.maxConcurrentRecordings) {
            String msg = "max concurrent recordings reached, cannot record " + image.subscription().streamId() + ":" + originalChannel;
            controlSession.attemptErrorResponse(correlationId, 8L, msg, this.controlResponseProxy);
            throw new ArchiveException(msg);
        }
    }

    private void validateImageForExtendRecording(long correlationId, ControlSession controlSession, Image image, RecordingSummary recordingSummary) {
        if (image.joinPosition() != recordingSummary.stopPosition) {
            String msg = "cannot extend recording " + recordingSummary.recordingId + " image joinPosition " + image.joinPosition() + " not equal to recording stopPosition " + recordingSummary.stopPosition;
            controlSession.attemptErrorResponse(correlationId, 9L, msg, this.controlResponseProxy);
            throw new ArchiveException(msg);
        }
        if (image.termBufferLength() != recordingSummary.termBufferLength) {
            String msg = "cannot extend recording " + recordingSummary.recordingId + " image termBufferLength " + image.termBufferLength() + " not equal to recording termBufferLength " + recordingSummary.termBufferLength;
            controlSession.attemptErrorResponse(correlationId, 9L, msg, this.controlResponseProxy);
            throw new ArchiveException(msg);
        }
        if (image.mtuLength() != recordingSummary.mtuLength) {
            String msg = "cannot extend recording " + recordingSummary.recordingId + " image mtuLength " + image.mtuLength() + " not equal to recording mtuLength " + recordingSummary.mtuLength;
            controlSession.attemptErrorResponse(correlationId, 9L, msg, this.controlResponseProxy);
            throw new ArchiveException(msg);
        }
    }

    private RecordingSummary validateFramePosition(long correlationId, ControlSession controlSession, long recordingId, long position) {
        if (!this.catalog.hasRecording(recordingId)) {
            String msg = "unknown recording " + recordingId;
            controlSession.sendErrorResponse(correlationId, 5L, msg, this.controlResponseProxy);
            return null;
        }
        for (ReplaySession replaySession : this.replaySessionByIdMap.values()) {
            if (replaySession.recordingId() != recordingId) continue;
            String msg = "cannot truncate recording with active replay " + recordingId;
            controlSession.sendErrorResponse(correlationId, 2L, msg, this.controlResponseProxy);
            return null;
        }
        this.catalog.recordingSummary(recordingId, this.recordingSummary);
        long stopPosition = this.recordingSummary.stopPosition;
        long startPosition = this.recordingSummary.startPosition;
        if (stopPosition == -1L) {
            String msg = "cannot truncate active recording";
            controlSession.sendErrorResponse(correlationId, 2L, "cannot truncate active recording", this.controlResponseProxy);
            return null;
        }
        if (position < startPosition || position > stopPosition || (position & 0x1FL) != 0L) {
            controlSession.sendErrorResponse(correlationId, "invalid position " + position, this.controlResponseProxy);
            return null;
        }
        return this.recordingSummary;
    }

    private boolean validateReplayPosition(long correlationId, ControlSession controlSession, long recordingId, long position, RecordingSummary recordingSummary) {
        if ((position & 0x1FL) != 0L) {
            String msg = "requested replay start position " + position + " is not a multiple of FRAME_ALIGNMENT (" + 32 + ") for recording " + recordingId;
            controlSession.sendErrorResponse(correlationId, msg, this.controlResponseProxy);
            return false;
        }
        long startPosition = recordingSummary.startPosition;
        if (position - startPosition < 0L) {
            String msg = "requested replay start position " + position + ") is less than recording start position " + startPosition + " for recording " + recordingId;
            controlSession.sendErrorResponse(correlationId, msg, this.controlResponseProxy);
            return false;
        }
        long stopPosition = recordingSummary.stopPosition;
        if (stopPosition != -1L && position >= stopPosition) {
            String msg = "requested replay start position " + position + " must be less than highest recorded position " + stopPosition + " for recording " + recordingId;
            controlSession.sendErrorResponse(correlationId, msg, this.controlResponseProxy);
            return false;
        }
        return true;
    }

    private File segmentFile(ControlSession controlSession, File archiveDir, long position, long recordingId, long correlationId) {
        long fromPosition = position == -1L ? this.recordingSummary.startPosition : position;
        int segmentFileIndex = Archive.segmentFileIndex(this.recordingSummary.startPosition, fromPosition, this.recordingSummary.segmentFileLength);
        File segmentFile = new File(archiveDir, Archive.segmentFileName(recordingId, segmentFileIndex));
        if (!segmentFile.exists()) {
            String msg = "initial segment file does not exist for replay recording id " + recordingId;
            controlSession.sendErrorResponse(correlationId, msg, this.controlResponseProxy);
            return null;
        }
        return segmentFile;
    }
}

