/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.samples;

import io.aeron.Aeron;
import io.aeron.ConcurrentPublication;
import io.aeron.Image;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.driver.MediaDriver;
import io.aeron.driver.ThreadingMode;
import io.aeron.logbuffer.BufferClaim;
import io.aeron.logbuffer.FragmentHandler;
import io.aeron.logbuffer.Header;
import io.aeron.samples.SampleConfiguration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import org.agrona.DirectBuffer;
import org.agrona.SystemUtil;
import org.agrona.UnsafeAccess;
import org.agrona.concurrent.NoOpIdleStrategy;
import org.agrona.concurrent.SigInt;

public class EmbeddedBufferClaimIpcThroughput {
    public static final int BURST_LENGTH = 1000000;
    public static final int MESSAGE_LENGTH = SampleConfiguration.MESSAGE_LENGTH;
    public static final int MESSAGE_COUNT_LIMIT = SampleConfiguration.FRAGMENT_COUNT_LIMIT;
    public static final String CHANNEL = "aeron:ipc";
    public static final int STREAM_ID = SampleConfiguration.STREAM_ID;

    public static void main(String[] args) throws Exception {
        SystemUtil.loadPropertiesFiles(args);
        AtomicBoolean running = new AtomicBoolean(true);
        SigInt.register(() -> running.set(false));
        MediaDriver.Context ctx = new MediaDriver.Context().threadingMode(ThreadingMode.SHARED).sharedIdleStrategy(new NoOpIdleStrategy());
        try (MediaDriver ignore = MediaDriver.launch(ctx);
             Aeron aeron = Aeron.connect();
             ConcurrentPublication publication = aeron.addPublication(CHANNEL, STREAM_ID);
             Subscription subscription = aeron.addSubscription(CHANNEL, STREAM_ID);){
            Subscriber subscriber = new Subscriber(running, subscription);
            Thread subscriberThread = new Thread(subscriber);
            subscriberThread.setName("subscriber");
            Thread publisherThread = new Thread(new Publisher(running, publication));
            publisherThread.setName("publisher");
            Thread rateReporterThread = new Thread(new RateReporter(running, subscriber));
            rateReporterThread.setName("rate-reporter");
            rateReporterThread.start();
            subscriberThread.start();
            publisherThread.start();
            subscriberThread.join();
            publisherThread.join();
            rateReporterThread.join();
        }
    }

    public static final class Subscriber
    implements Runnable,
    FragmentHandler {
        private static final long TOTAL_BYTES_OFFSET;
        private final AtomicBoolean running;
        private final Subscription subscription;
        private volatile long totalBytes = 0L;

        public Subscriber(AtomicBoolean running, Subscription subscription) {
            this.running = running;
            this.subscription = subscription;
        }

        public long totalBytes() {
            return this.totalBytes;
        }

        @Override
        public void run() {
            while (!this.subscription.isConnected()) {
                Thread.yield();
            }
            Image image = this.subscription.images().get(0);
            long failedPolls = 0L;
            long successfulPolls = 0L;
            while (this.running.get()) {
                int fragmentsRead = image.poll(this, MESSAGE_COUNT_LIMIT);
                if (0 == fragmentsRead) {
                    ++failedPolls;
                    continue;
                }
                ++successfulPolls;
            }
            double failureRatio = (double)failedPolls / (double)(successfulPolls + failedPolls);
            System.out.format("Subscriber poll failure ratio: %f%n", failureRatio);
        }

        @Override
        public void onFragment(DirectBuffer buffer, int offset, int length, Header header) {
            UnsafeAccess.UNSAFE.putOrderedLong(this, TOTAL_BYTES_OFFSET, this.totalBytes + (long)length);
        }

        static {
            try {
                TOTAL_BYTES_OFFSET = UnsafeAccess.UNSAFE.objectFieldOffset(Subscriber.class.getDeclaredField("totalBytes"));
            }
            catch (Exception ex) {
                throw new RuntimeException(ex);
            }
        }
    }

    public static final class Publisher
    implements Runnable {
        private final AtomicBoolean running;
        private final Publication publication;

        public Publisher(AtomicBoolean running, Publication publication) {
            this.running = running;
            this.publication = publication;
        }

        @Override
        public void run() {
            Publication publication = this.publication;
            BufferClaim bufferClaim = new BufferClaim();
            long backPressureCount = 0L;
            long totalMessageCount = 0L;
            block0: while (this.running.get()) {
                for (int i = 0; i < 1000000; ++i) {
                    while (publication.tryClaim(MESSAGE_LENGTH, bufferClaim) <= 0L) {
                        ++backPressureCount;
                        if (this.running.get()) continue;
                        break block0;
                    }
                    int offset = bufferClaim.offset();
                    bufferClaim.buffer().putInt(offset, i);
                    bufferClaim.commit();
                    ++totalMessageCount;
                }
            }
            double backPressureRatio = (double)backPressureCount / (double)totalMessageCount;
            System.out.format("Publisher back pressure ratio: %f%n", backPressureRatio);
        }
    }

    public static final class RateReporter
    implements Runnable {
        private final AtomicBoolean running;
        private final Subscriber subscriber;

        public RateReporter(AtomicBoolean running, Subscriber subscriber) {
            this.running = running;
            this.subscriber = subscriber;
        }

        @Override
        public void run() {
            long lastTimeStamp = System.currentTimeMillis();
            long lastTotalBytes = this.subscriber.totalBytes();
            while (this.running.get()) {
                LockSupport.parkNanos(1000000000L);
                long newTimeStamp = System.currentTimeMillis();
                long newTotalBytes = this.subscriber.totalBytes();
                long duration = newTimeStamp - lastTimeStamp;
                long bytesTransferred = newTotalBytes - lastTotalBytes;
                System.out.format("Duration %dms - %,d messages - %,d payload bytes%n", duration, bytesTransferred / (long)MESSAGE_LENGTH, bytesTransferred);
                lastTimeStamp = newTimeStamp;
                lastTotalBytes = newTotalBytes;
            }
        }
    }
}

