/*
 * Decompiled with CFR 0.152.
 */
package org.apache.nifi.controller;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.DescribedValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Tags(value={"stats", "log"})
@CapabilityDescription(value="Logs the 5-minute stats that are shown in the NiFi Summary Page for Processors and Connections, as well optionally logging the deltas between the previous iteration and the current iteration. Processors' stats are logged using the org.apache.nifi.controller.ControllerStatusReportingTask.Processors logger, while Connections' stats are logged using the org.apache.nifi.controller.ControllerStatusReportingTask.Connections logger. These can be configured in the NiFi logging configuration to log to different files, if desired.")
public class ControllerStatusReportingTask
extends AbstractReportingTask {
    static final AllowableValue FIVE_MINUTE_GRANULARITY = new AllowableValue("five-minutes", "Five Minutes", "The stats that are reported will reflect up to the last 5 minutes' worth of processing, which will coincide with the stats that are shown in the UI.");
    static final AllowableValue ONE_SECOND_GRANULARITY = new AllowableValue("one-second", "One Second", "The stats that are reported will be an average of the value per second, gathered over the last 5 minutes. This is essentially obtained by dividing the stats that are shown in the UI by 300 (300 seconds in 5 minutes), with the exception of when NiFi has been running for less than 5 minutes. In that case, the stats will be divided by the amount of time NiFi has been running.");
    public static final PropertyDescriptor SHOW_DELTAS = new PropertyDescriptor.Builder().name("Show Deltas").description("Specifies whether or not to show the difference in values between the current status and the previous status").required(true).allowableValues(new String[]{"true", "false"}).defaultValue("true").build();
    static final PropertyDescriptor REPORTING_GRANULARITY = new PropertyDescriptor.Builder().name("reporting-granularity").displayName("Reporting Granularity").description("When reporting information, specifies the granularity of the metrics to report").allowableValues(new DescribedValue[]{FIVE_MINUTE_GRANULARITY, ONE_SECOND_GRANULARITY}).defaultValue(FIVE_MINUTE_GRANULARITY.getValue()).build();
    private static final Logger processorLogger = LoggerFactory.getLogger((String)(ControllerStatusReportingTask.class.getName() + ".Processors"));
    private static final Logger connectionLogger = LoggerFactory.getLogger((String)(ControllerStatusReportingTask.class.getName() + ".Connections"));
    private static final Logger counterLogger = LoggerFactory.getLogger((String)(ControllerStatusReportingTask.class.getName() + ".Counters"));
    private static final String PROCESSOR_LINE_FORMAT_NO_DELTA = "| %1$-30.30s | %2$-36.36s | %3$-24.24s | %4$10.10s | %5$19.19s | %6$19.19s | %7$12.12s | %8$13.13s | %9$5.5s | %10$12.12s |\n";
    private static final String PROCESSOR_LINE_FORMAT_WITH_DELTA = "| %1$-30.30s | %2$-36.36s | %3$-24.24s | %4$10.10s | %5$43.43s | %6$43.43s | %7$28.28s | %8$30.30s | %9$14.14s | %10$28.28s |\n";
    private static final String CONNECTION_LINE_FORMAT_NO_DELTA = "| %1$-36.36s | %2$-30.30s | %3$-36.36s | %4$-30.30s | %5$19.19s | %6$19.19s | %7$19.19s |\n";
    private static final String CONNECTION_LINE_FORMAT_WITH_DELTA = "| %1$-36.36s | %2$-30.30s | %3$-36.36s | %4$-30.30s | %5$43.43s | %6$43.43s | %7$43.43s |\n";
    private static final String COUNTER_LINE_FORMAT = "| %1$-36.36s | %2$-36.36s | %3$-36.36s |\n";
    private volatile String processorLineFormat;
    private volatile String processorHeader;
    private volatile String processorBorderLine;
    private volatile String connectionLineFormat;
    private volatile String connectionHeader;
    private volatile String connectionBorderLine;
    private volatile String counterHeader;
    private volatile String counterBorderLine;
    private volatile Map<String, ProcessorStatus> lastProcessorStatus = new HashMap<String, ProcessorStatus>();
    private volatile Map<String, ConnectionStatus> lastConnectionStatus = new HashMap<String, ConnectionStatus>();
    private volatile Map<String, Long> lastCounterValues = new HashMap<String, Long>();
    private final long startTimestamp = System.currentTimeMillis();

    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        ArrayList<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
        descriptors.add(SHOW_DELTAS);
        descriptors.add(REPORTING_GRANULARITY);
        return descriptors;
    }

    @OnScheduled
    public void onConfigured(ConfigurationContext context) {
        boolean showDeltas = context.getProperty(SHOW_DELTAS).asBoolean();
        this.connectionLineFormat = showDeltas ? CONNECTION_LINE_FORMAT_WITH_DELTA : CONNECTION_LINE_FORMAT_NO_DELTA;
        this.connectionHeader = String.format(this.connectionLineFormat, "Connection ID", "Source", "Connection Name", "Destination", "Flow Files In", "Flow Files Out", "FlowFiles Queued");
        this.connectionBorderLine = this.createLine(this.connectionHeader);
        this.processorLineFormat = showDeltas ? PROCESSOR_LINE_FORMAT_WITH_DELTA : PROCESSOR_LINE_FORMAT_NO_DELTA;
        this.processorHeader = String.format(this.processorLineFormat, "Processor Name", "Processor ID", "Processor Type", "Run Status", "Flow Files In", "Flow Files Out", "Bytes Read", "Bytes Written", "Tasks", "Proc Time");
        this.processorBorderLine = this.createLine(this.processorHeader);
        this.counterHeader = String.format(COUNTER_LINE_FORMAT, "Counter Context", "Counter Name", "Counter Value");
        this.counterBorderLine = this.createLine(this.counterHeader);
    }

    private String createLine(String valueToUnderscore) {
        StringBuilder processorBorderBuilder = new StringBuilder(valueToUnderscore.length());
        for (int i = 0; i < valueToUnderscore.length(); ++i) {
            processorBorderBuilder.append('-');
        }
        return processorBorderBuilder.toString();
    }

    public void onTrigger(ReportingContext context) {
        long divisor;
        ProcessGroupStatus controllerStatus = context.getEventAccess().getControllerStatus();
        boolean showDeltas = context.getProperty(SHOW_DELTAS).asBoolean();
        String reportingGranularity = context.getProperty(REPORTING_GRANULARITY).getValue();
        if (ONE_SECOND_GRANULARITY.getValue().equalsIgnoreCase(reportingGranularity)) {
            long timestamp = System.currentTimeMillis();
            long secondsRunning = TimeUnit.MILLISECONDS.toSeconds(timestamp - this.startTimestamp);
            divisor = Math.min(secondsRunning, 300L);
        } else {
            divisor = 1L;
        }
        this.printProcessorStatuses(controllerStatus, showDeltas, divisor);
        this.printConnectionStatuses(controllerStatus, showDeltas, divisor);
        this.printCounters(controllerStatus, showDeltas, divisor);
    }

    private void printProcessorStatuses(ProcessGroupStatus controllerStatus, boolean showDeltas, long divisor) {
        StringBuilder builder = new StringBuilder();
        builder.append("Processor Statuses:\n");
        builder.append(this.processorBorderLine);
        builder.append("\n");
        builder.append(this.processorHeader);
        builder.append(this.processorBorderLine);
        builder.append("\n");
        this.printProcessorStatus(controllerStatus, builder, showDeltas, divisor);
        builder.append(this.processorBorderLine);
        processorLogger.info("{}", (Object)builder);
    }

    private void printConnectionStatuses(ProcessGroupStatus controllerStatus, boolean showDeltas, long divisor) {
        StringBuilder builder = new StringBuilder();
        builder.append("Connection Statuses:\n");
        builder.append(this.connectionBorderLine);
        builder.append("\n");
        builder.append(this.connectionHeader);
        builder.append(this.connectionBorderLine);
        builder.append("\n");
        this.printConnectionStatus(controllerStatus, builder, showDeltas, divisor);
        builder.append(this.connectionBorderLine);
        connectionLogger.info("{}", (Object)builder);
    }

    private void printCounters(ProcessGroupStatus controllerStatus, boolean showDeltas, long divisor) {
        StringBuilder builder = new StringBuilder();
        builder.append("Counters:\n");
        builder.append(this.counterBorderLine);
        builder.append("\n");
        builder.append(this.counterHeader);
        builder.append(this.counterBorderLine);
        builder.append("\n");
        this.printCounterStatus(controllerStatus, builder, showDeltas, divisor);
        builder.append(this.counterBorderLine);
        counterLogger.info("{}", (Object)builder);
    }

    private void printCounterStatus(ProcessGroupStatus status, StringBuilder builder, boolean showDeltas, long divisor) {
        Collection processorStatuses = status.getProcessorStatus();
        for (ProcessorStatus processorStatus : processorStatuses) {
            Map counters = processorStatus.getCounters();
            if (counters == null || counters.isEmpty()) continue;
            for (Map.Entry entry : counters.entrySet()) {
                String counterName = (String)entry.getKey();
                Long counterValue = (Long)entry.getValue() / divisor;
                String counterId = processorStatus.getId() + "_" + counterName;
                Long lastValue = this.lastCounterValues.getOrDefault(counterId, 0L);
                this.lastCounterValues.put(counterId, counterValue);
                if (showDeltas) {
                    String diff = this.toDiff(lastValue, counterValue);
                    builder.append(String.format(COUNTER_LINE_FORMAT, processorStatus.getName() + "(" + processorStatus.getId() + ")", counterName, counterValue + diff));
                    continue;
                }
                builder.append(String.format(COUNTER_LINE_FORMAT, processorStatus.getName() + "(" + processorStatus.getId() + ")", counterName, counterValue));
            }
        }
        for (ProcessGroupStatus childGroupStatus : status.getProcessGroupStatus()) {
            this.printCounterStatus(childGroupStatus, builder, showDeltas, divisor);
        }
    }

    private void populateConnectionStatuses(ProcessGroupStatus groupStatus, List<ConnectionStatus> statuses) {
        statuses.addAll(groupStatus.getConnectionStatus());
        for (ProcessGroupStatus childGroupStatus : groupStatus.getProcessGroupStatus()) {
            this.populateConnectionStatuses(childGroupStatus, statuses);
        }
    }

    private void populateProcessorStatuses(ProcessGroupStatus groupStatus, List<ProcessorStatus> statuses) {
        statuses.addAll(groupStatus.getProcessorStatus());
        for (ProcessGroupStatus childGroupStatus : groupStatus.getProcessGroupStatus()) {
            this.populateProcessorStatuses(childGroupStatus, statuses);
        }
    }

    private void printConnectionStatus(ProcessGroupStatus groupStatus, StringBuilder builder, boolean showDeltas, long divisor) {
        ArrayList<ConnectionStatus> connectionStatuses = new ArrayList<ConnectionStatus>();
        this.populateConnectionStatuses(groupStatus, connectionStatuses);
        connectionStatuses.sort(new Comparator<ConnectionStatus>(this){

            @Override
            public int compare(ConnectionStatus o1, ConnectionStatus o2) {
                if (o1 == null && o2 == null) {
                    return 0;
                }
                if (o1 == null) {
                    return 1;
                }
                if (o2 == null) {
                    return -1;
                }
                return -Long.compare(o1.getQueuedBytes(), o2.getQueuedBytes());
            }
        });
        for (ConnectionStatus connectionStatus : connectionStatuses) {
            long inputCount = (long)connectionStatus.getInputCount() / divisor;
            long outputCount = (long)connectionStatus.getOutputCount() / divisor;
            long queuedCount = (long)connectionStatus.getQueuedCount() / divisor;
            long inputBytes = connectionStatus.getInputBytes() / divisor;
            long outputBytes = connectionStatus.getOutputBytes() / divisor;
            long queuedBytes = connectionStatus.getQueuedBytes() / divisor;
            String input = inputCount + " / " + FormatUtils.formatDataSize((double)inputBytes);
            String output = outputCount + " / " + FormatUtils.formatDataSize((double)outputBytes);
            String queued = queuedCount + " / " + FormatUtils.formatDataSize((double)queuedBytes);
            if (showDeltas) {
                ConnectionStatus lastStatus = this.lastConnectionStatus.get(connectionStatus.getId());
                long lastInputCount = lastStatus == null ? 0L : (long)lastStatus.getInputCount() / divisor;
                long lastOutputCount = lastStatus == null ? 0L : (long)lastStatus.getOutputCount() / divisor;
                long lastQueuedCount = lastStatus == null ? 0L : (long)lastStatus.getQueuedCount() / divisor;
                long lastInputBytes = lastStatus == null ? 0L : lastStatus.getInputBytes() / divisor;
                long lastOutputBytes = lastStatus == null ? 0L : lastStatus.getOutputBytes() / divisor;
                long lastQueuedBytes = lastStatus == null ? 0L : lastStatus.getQueuedBytes() / divisor;
                String inputDiff = this.toDiff(lastInputCount, lastInputBytes, inputCount, inputBytes);
                String outputDiff = this.toDiff(lastOutputCount, lastOutputBytes, outputCount, outputBytes);
                String queuedDiff = this.toDiff(lastQueuedCount, lastQueuedBytes, queuedCount, queuedBytes);
                builder.append(String.format(this.connectionLineFormat, connectionStatus.getId(), connectionStatus.getSourceName(), connectionStatus.getName(), connectionStatus.getDestinationName(), input + inputDiff, output + outputDiff, queued + queuedDiff));
            } else {
                builder.append(String.format(this.connectionLineFormat, connectionStatus.getId(), connectionStatus.getSourceName(), connectionStatus.getName(), connectionStatus.getDestinationName(), input, output, queued));
            }
            this.lastConnectionStatus.put(connectionStatus.getId(), connectionStatus);
        }
    }

    private String toDiff(long oldValue, long newValue) {
        return this.toDiff(oldValue, newValue, false, false);
    }

    private String toDiff(long oldValue, long newValue, boolean formatDataSize, boolean formatTime) {
        String formattedDiff;
        if (formatDataSize && formatTime) {
            throw new IllegalArgumentException("Cannot format units as both data size and time");
        }
        long diff = Math.abs(newValue - oldValue);
        String string = formatDataSize ? FormatUtils.formatDataSize((double)diff) : (formattedDiff = formatTime ? FormatUtils.formatHoursMinutesSeconds((long)diff, (TimeUnit)TimeUnit.NANOSECONDS) : String.valueOf(diff));
        if (oldValue > newValue) {
            return " (-" + formattedDiff + ")";
        }
        return " (+" + formattedDiff + ")";
    }

    private String toDiff(long oldCount, long oldBytes, long newCount, long newBytes) {
        long countDiff = Math.abs(newCount - oldCount);
        long bytesDiff = Math.abs(newBytes - oldBytes);
        StringBuilder sb = new StringBuilder();
        sb.append(" (").append(oldCount > newCount ? "-" : "+").append(countDiff).append("/");
        sb.append(oldBytes > newBytes ? "-" : "+");
        sb.append(FormatUtils.formatDataSize((double)bytesDiff)).append(")");
        return sb.toString();
    }

    private void printProcessorStatus(ProcessGroupStatus groupStatus, StringBuilder builder, boolean showDeltas, long divisor) {
        ArrayList<ProcessorStatus> processorStatuses = new ArrayList<ProcessorStatus>();
        this.populateProcessorStatuses(groupStatus, processorStatuses);
        Collections.sort(processorStatuses, new Comparator<ProcessorStatus>(this){

            @Override
            public int compare(ProcessorStatus o1, ProcessorStatus o2) {
                if (o1 == null && o2 == null) {
                    return 0;
                }
                if (o1 == null) {
                    return 1;
                }
                if (o2 == null) {
                    return -1;
                }
                return -Long.compare(o1.getProcessingNanos(), o2.getProcessingNanos());
            }
        });
        for (ProcessorStatus processorStatus : processorStatuses) {
            long inputCount = (long)processorStatus.getInputCount() / divisor;
            long inputBytes = processorStatus.getInputBytes() / divisor;
            long outputCount = (long)processorStatus.getOutputCount() / divisor;
            long outputBytes = processorStatus.getOutputBytes() / divisor;
            long bytesRead = processorStatus.getBytesRead() / divisor;
            long bytesWritten = processorStatus.getBytesWritten() / divisor;
            long invocationCount = (long)processorStatus.getInvocations() / divisor;
            String input = inputCount + " / " + FormatUtils.formatDataSize((double)inputBytes);
            String output = outputCount + " / " + FormatUtils.formatDataSize((double)outputBytes);
            String read = FormatUtils.formatDataSize((double)bytesRead);
            String written = FormatUtils.formatDataSize((double)bytesWritten);
            String invocations = String.valueOf(invocationCount);
            long nanos = processorStatus.getProcessingNanos() / divisor;
            String procTime = FormatUtils.formatHoursMinutesSeconds((long)nanos, (TimeUnit)TimeUnit.NANOSECONDS);
            String runStatus = "";
            if (processorStatus.getRunStatus() != null) {
                runStatus = processorStatus.getRunStatus().toString();
            }
            if (showDeltas) {
                ProcessorStatus lastStatus = this.lastProcessorStatus.get(processorStatus.getId());
                long lastInputCount = lastStatus == null ? 0L : (long)lastStatus.getInputCount() / divisor;
                long lastInputBytes = lastStatus == null ? 0L : lastStatus.getInputBytes() / divisor;
                long lastOutputCount = lastStatus == null ? 0L : (long)lastStatus.getOutputCount() / divisor;
                long lastOutputBytes = lastStatus == null ? 0L : lastStatus.getOutputBytes() / divisor;
                long lastBytesRead = lastStatus == null ? 0L : lastStatus.getBytesRead() / divisor;
                long lastBytesWritten = lastStatus == null ? 0L : lastStatus.getBytesWritten() / divisor;
                long lastInvocationCount = lastStatus == null ? 0L : (long)lastStatus.getInvocations() / divisor;
                long lastProcessingNanos = lastStatus == null ? 0L : lastStatus.getProcessingNanos() / divisor;
                String inputDiff = this.toDiff(lastInputCount, lastInputBytes, inputCount, inputBytes);
                String outputDiff = this.toDiff(lastOutputCount, lastOutputBytes, outputCount, outputBytes);
                String readDiff = this.toDiff(lastBytesRead, bytesRead, true, false);
                String writtenDiff = this.toDiff(lastBytesWritten, bytesWritten, true, false);
                String invocationsDiff = this.toDiff(lastInvocationCount, invocationCount);
                String procTimeDiff = this.toDiff(lastProcessingNanos, nanos, false, true);
                builder.append(String.format(this.processorLineFormat, processorStatus.getName(), processorStatus.getId(), processorStatus.getType(), runStatus, input + inputDiff, output + outputDiff, read + readDiff, written + writtenDiff, invocations + invocationsDiff, procTime + procTimeDiff));
            } else {
                builder.append(String.format(this.processorLineFormat, processorStatus.getName(), processorStatus.getId(), processorStatus.getType(), runStatus, input, output, read, written, invocations, procTime));
            }
            this.lastProcessorStatus.put(processorStatus.getId(), processorStatus);
        }
    }
}

