package org.ethereum.db;

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.ethereum.config.SystemProperties;
import org.ethereum.datasource.AbstractCachedSource;
import org.ethereum.datasource.AsyncFlushable;
import org.ethereum.datasource.DbSource;
import org.ethereum.datasource.Source;
import org.ethereum.listener.CompositeEthereumListener;
import org.ethereum.listener.EthereumListener;
import org.ethereum.listener.EthereumListenerAdapter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:org/ethereum/db/DbFlushManager.class */
public class DbFlushManager {
    private static final Logger logger = LoggerFactory.getLogger("db");
    Set<DbSource> dbSources;
    AbstractCachedSource<byte[], byte[]> stateDbCache;
    long sizeThreshold;
    int commitsCountThreshold;
    boolean flushAfterSyncDone;
    SystemProperties config;
    List<AbstractCachedSource<byte[], byte[]>> writeCaches = new ArrayList();
    boolean syncDone = false;
    int commitCount = 0;
    private final BlockingQueue<Runnable> executorQueue = new ArrayBlockingQueue(1);
    private final ExecutorService flushThread = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, this.executorQueue, new ThreadFactoryBuilder().setNameFormat("DbFlushManagerThread-%d").build());
    Future<Boolean> lastFlush = Futures.immediateFuture(false);

    public DbFlushManager(SystemProperties systemProperties, Set<DbSource> set, AbstractCachedSource<byte[], byte[]> abstractCachedSource) {
        this.dbSources = new HashSet();
        this.config = systemProperties;
        this.dbSources = set;
        this.sizeThreshold = systemProperties.getConfig().getInt("cache.flush.writeCacheSize") * 1024 * 1024;
        this.commitsCountThreshold = systemProperties.getConfig().getInt("cache.flush.blocks");
        this.flushAfterSyncDone = systemProperties.getConfig().getBoolean("cache.flush.shortSyncFlush");
        this.stateDbCache = abstractCachedSource;
    }

    @Autowired
    public void setEthereumListener(CompositeEthereumListener compositeEthereumListener) {
        if (this.flushAfterSyncDone) {
            compositeEthereumListener.addListener(new EthereumListenerAdapter() { // from class: org.ethereum.db.DbFlushManager.1
                @Override // org.ethereum.listener.EthereumListenerAdapter, org.ethereum.listener.EthereumListener
                public void onSyncDone(EthereumListener.SyncState syncState) {
                    if (syncState == EthereumListener.SyncState.COMPLETE) {
                        DbFlushManager.logger.info("DbFlushManager: long sync done, flushing each block now");
                        DbFlushManager.this.syncDone = true;
                    }
                }
            });
        }
    }

    public void setSizeThreshold(long j) {
        this.sizeThreshold = j;
    }

    public void addCache(AbstractCachedSource<byte[], byte[]> abstractCachedSource) {
        this.writeCaches.add(abstractCachedSource);
    }

    public long getCacheSize() {
        long j = 0;
        Iterator<AbstractCachedSource<byte[], byte[]>> it = this.writeCaches.iterator();
        while (it.hasNext()) {
            j += it.next().estimateCacheSize();
        }
        return j;
    }

    public synchronized void commit(Runnable runnable) {
        runnable.run();
        commit();
    }

    public synchronized void commit() {
        long cacheSize = getCacheSize();
        if (this.sizeThreshold >= 0 && cacheSize >= this.sizeThreshold) {
            if (logger.isInfoEnabled()) {
                logger.info("DbFlushManager: flushing db due to write cache size ({}) reached threshold ({})", Long.valueOf(cacheSize), Long.valueOf(this.sizeThreshold));
            }
            flush();
        } else if (this.commitsCountThreshold > 0 && this.commitCount >= this.commitsCountThreshold) {
            if (logger.isInfoEnabled()) {
                logger.info("DbFlushManager: flushing db due to commits ({}) reached threshold ({})", Integer.valueOf(this.commitCount), Integer.valueOf(this.commitsCountThreshold));
            }
            flush();
            this.commitCount = 0;
        } else if (this.flushAfterSyncDone && this.syncDone) {
            logger.debug("DbFlushManager: flushing db due to short sync");
            flush();
        }
        this.commitCount++;
    }

    public synchronized void flushSync() {
        try {
            flush().get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public synchronized Future<Boolean> flush() {
        if (!this.lastFlush.isDone()) {
            logger.info("Waiting for previous flush to complete...");
            try {
                this.lastFlush.get();
            } catch (Exception e) {
                logger.error("Error during last flush", e);
            }
        }
        logger.debug("Flipping async storages");
        for (Source source : this.writeCaches) {
            try {
                if (source instanceof AsyncFlushable) {
                    ((AsyncFlushable) source).flipStorage();
                }
            } catch (InterruptedException e2) {
                throw new RuntimeException(e2);
            }
        }
        logger.debug("Submitting flush task");
        Future<Boolean> submit = this.flushThread.submit(new Callable<Boolean>() { // from class: org.ethereum.db.DbFlushManager.2
            /* JADX WARN: Can't rename method to resolve collision */
            @Override // java.util.concurrent.Callable
            public Boolean call() throws Exception {
                boolean z = false;
                long nanoTime = System.nanoTime();
                DbFlushManager.logger.info("Flush started");
                for (Source source2 : DbFlushManager.this.writeCaches) {
                    if (source2 instanceof AsyncFlushable) {
                        try {
                            z |= ((Boolean) ((AsyncFlushable) source2).flushAsync().get()).booleanValue();
                        } catch (InterruptedException e3) {
                            throw new RuntimeException(e3);
                        }
                    } else {
                        z |= source2.flush();
                    }
                }
                if (DbFlushManager.this.stateDbCache != null) {
                    DbFlushManager.logger.debug("Flushing to DB");
                    DbFlushManager.this.stateDbCache.flush();
                }
                if (DbFlushManager.logger.isInfoEnabled()) {
                    DbFlushManager.logger.info("Flush completed in {} ms", Long.valueOf((System.nanoTime() - nanoTime) / 1000000));
                }
                return Boolean.valueOf(z);
            }
        });
        this.lastFlush = submit;
        return submit;
    }

    public synchronized void close() {
        logger.info("Flushing DBs...");
        flushSync();
        logger.info("Flush done.");
        for (DbSource dbSource : this.dbSources) {
            if (logger.isInfoEnabled()) {
                logger.info("Closing DB: {}", dbSource.getName());
            }
            try {
                dbSource.close();
            } catch (Exception e) {
                logger.error(String.format("Caught error while closing DB: %s", dbSource.getName()), e);
            }
        }
    }
}
