/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.hologres.client.impl;

import com.alibaba.hologres.client.Get;
import com.alibaba.hologres.client.HoloClient;
import com.alibaba.hologres.client.HoloConfig;
import com.alibaba.hologres.client.exception.ExceptionCode;
import com.alibaba.hologres.client.exception.HoloClientException;
import com.alibaba.hologres.client.impl.Cache;
import com.alibaba.hologres.client.impl.MetaStore;
import com.alibaba.hologres.client.impl.Worker;
import com.alibaba.hologres.client.impl.action.AbstractAction;
import com.alibaba.hologres.client.impl.action.GetAction;
import com.alibaba.hologres.client.impl.action.MetaAction;
import com.alibaba.hologres.client.impl.action.PutAction;
import com.alibaba.hologres.client.impl.action.ScanAction;
import com.alibaba.hologres.client.impl.action.SqlAction;
import com.alibaba.hologres.client.impl.collector.ActionCollector;
import com.alibaba.hologres.client.impl.util.ConnectionUtil;
import com.alibaba.hologres.client.model.Partition;
import com.alibaba.hologres.client.model.TableName;
import com.alibaba.hologres.client.model.TableSchema;
import com.alibaba.hologres.client.utils.Tuple;
import java.io.Closeable;
import java.sql.SQLException;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ExecutionPool
implements Closeable {
    /** Logger shared by the pool and its nested background tasks. */
    public static final Logger LOGGER = LoggerFactory.getLogger(ExecutionPool.class);
    /** Registry of pools keyed by pool name; filled by {@code buildOrGet} and pruned in {@code close()}. */
    static final Map<String, ExecutionPool> POOL_MAP = new ConcurrentHashMap<String, ExecutionPool>();
    /** Background task that drains {@link #queue} and submits the batched gets. */
    private ActionWatcher readActionWatcher;
    /** Background task that periodically flushes collectors and refreshes cached table schemas. */
    private Runnable backgroundJob;
    /** Workers that actions are offered to; allocated once in the constructor. */
    private Worker[] workers;
    /** Bounds in-flight PutActions in {@code submit()}; created in {@code start()}, nulled in {@code close()}. */
    private Semaphore writeSemaphore;
    /** Bounds in-flight GetActions/ScanActions in {@code submit()}; created in {@code start()}, nulled in {@code close()}. */
    private Semaphore readSemaphore;
    /** JVM shutdown hook that closes the pool; only set when the shutdown hook is enabled in the config. */
    Thread shutdownHandler = null;
    /** Pool name; used as the {@link #POOL_MAP} key and as the thread-name prefix. */
    private String name;
    /** Registered clients and their collectors; the map instance is also used as a monitor. */
    private Map<HoloClient, ActionCollector> clientMap;
    /** True between a successful {@code start()} and the next {@code close()}. */
    private AtomicBoolean started;
    /** Flag handed to every pooled Worker; set in {@code start()} and cleared in {@code close()}. */
    private AtomicBoolean workerStated;
    /**
     * Timestamp and exception recorded by the background job when a flush fails.
     * Volatile because it is written on the background thread and read by callers of
     * {@code tryThrowException()} without any lock.
     */
    private volatile Tuple<String, HoloClientException> fatalException = null;
    /** Pending Get requests; handed to each ActionCollector and drained by the ActionWatcher. */
    final ArrayBlockingQueue<Get> queue;
    /** Periodically refreshed sum of the byte sizes reported by all registered collectors. */
    final ByteSizeCache byteSizeCache;
    /** Cache of table schemas and partitions used by the getOrSubmit* methods. */
    private final MetaStore metaStore;
    /** Executor running the pooled workers; created in {@code start()}. */
    ExecutorService workerExecutorService;
    ThreadFactory workerThreadFactory;
    /** Executor running the background job and the action watcher; created in {@code start()}. */
    ExecutorService backgroundExecutorService;
    ThreadFactory backgroundThreadFactory;
    /** Factory for the dedicated threads created by {@code submitOneShotAction}. */
    ThreadFactory ontShotWorkerThreadFactory;
    // Values below are copied from the HoloConfig / constructor arguments at construction time.
    final int writeThreadSize;
    final int readThreadSize;
    final boolean refreshBeforeGetTableSchema;
    final int refreshMetaTimeout;
    final boolean enableShutdownHook;
    final HoloConfig config;
    final boolean isShadingEnv;
    final boolean isFixedPool;
    /** Timestamp and exception created in {@code close()}; reset to null by {@code start()}. */
    Tuple<String, HoloClientException> closeStack = null;

    /**
     * Returns the pool registered under {@code name}, creating it from {@code config} when absent.
     * Delegates to the three-argument overload with {@code isShadingEnv} set to {@code true}.
     *
     * @param name   pool name used as the registry key
     * @param config configuration used only if a new pool has to be built
     * @return the existing or newly created pool
     */
    public static ExecutionPool buildOrGet(String name, HoloConfig config) {
        final boolean shadingEnv = true;
        return buildOrGet(name, config, shadingEnv);
    }

    /**
     * Returns the pool registered under {@code name}, creating it when absent.
     * Delegates to the four-argument overload with {@code isFixedPool} set to {@code false}.
     *
     * @param name         pool name used as the registry key
     * @param config       configuration used only if a new pool has to be built
     * @param isShadingEnv forwarded unchanged to the pool constructor
     * @return the existing or newly created pool
     */
    public static ExecutionPool buildOrGet(String name, HoloConfig config, boolean isShadingEnv) {
        final boolean fixedPool = false;
        return buildOrGet(name, config, isShadingEnv, fixedPool);
    }

    /**
     * Returns the pool registered under {@code name}, building and registering a new one
     * when no pool with that name exists yet.
     *
     * <p>The lookup runs while holding the {@code POOL_MAP} monitor, the same monitor
     * {@code close()} takes before removing a pool from the registry.
     *
     * @param name         pool name used as the registry key
     * @param config       configuration passed to the constructor if a pool is created
     * @param isShadingEnv passed to the constructor if a pool is created
     * @param isFixedPool  passed to the constructor if a pool is created
     * @return the existing or newly created pool
     */
    public static ExecutionPool buildOrGet(String name, HoloConfig config, boolean isShadingEnv, boolean isFixedPool) {
        synchronized (POOL_MAP) {
            ExecutionPool pool = POOL_MAP.computeIfAbsent(
                    name,
                    poolName -> new ExecutionPool(poolName, config, isShadingEnv, isFixedPool));
            return pool;
        }
    }

    /**
     * Looks up a pool by name without creating one.
     *
     * @param name pool name
     * @return the registered pool, or {@code null} when no pool is registered under {@code name}
     */
    public static ExecutionPool getInstance(String name) {
        ExecutionPool registered = POOL_MAP.get(name);
        return registered;
    }

    /**
     * Creates a pool in the stopped state; no thread is started until the first client
     * registers and {@code start()} runs.
     *
     * @param name         pool name, also used as prefix for the names of created threads
     * @param config       source of thread sizes, queue sizes and the other settings copied below
     * @param isShadingEnv forwarded to every Worker
     * @param isFixedPool  when true, workers are built with the five-argument Worker constructor
     */
    public ExecutionPool(String name, HoloConfig config, boolean isShadingEnv, boolean isFixedPool) {
        this.name = name;
        this.config = config;
        this.isShadingEnv = isShadingEnv;
        this.isFixedPool = isFixedPool;

        // All three factories create non-daemon threads named after the pool.
        this.workerThreadFactory = task -> {
            Thread thread = new Thread(task);
            thread.setName(this.name + "-worker");
            thread.setDaemon(false);
            return thread;
        };
        this.backgroundThreadFactory = task -> {
            Thread thread = new Thread(task);
            thread.setName(this.name + "-background");
            thread.setDaemon(false);
            return thread;
        };
        this.ontShotWorkerThreadFactory = task -> {
            Thread thread = new Thread(task);
            thread.setName(this.name + "-oneshot-worker");
            thread.setDaemon(false);
            return thread;
        };

        this.readThreadSize = config.getReadThreadSize();
        this.writeThreadSize = config.getWriteThreadSize();
        this.refreshBeforeGetTableSchema = config.isRefreshMetaBeforeGetTableSchema();
        this.refreshMetaTimeout = config.getRefreshMetaTimeout();
        this.enableShutdownHook = config.isEnableShutdownHook();
        this.queue = new ArrayBlockingQueue<Get>(config.getReadBatchQueueSize());
        this.readActionWatcher = new ActionWatcher(config.getReadBatchSize());

        // A non-fixed pool using fixed FE takes its size from the config; otherwise the
        // larger of the read and write thread sizes is used.
        int workerSize;
        if (config.isUseFixedFe() && !isFixedPool) {
            workerSize = config.getConnectionSizeWhenUseFixedFe();
        } else {
            workerSize = Math.max(this.readThreadSize, this.writeThreadSize);
        }
        this.workers = new Worker[workerSize];
        this.started = new AtomicBoolean(false);
        this.workerStated = new AtomicBoolean(false);
        for (int index = 0; index < workerSize; ++index) {
            if (isFixedPool) {
                this.workers[index] = new Worker(config, this.workerStated, index, isShadingEnv, true);
            } else {
                this.workers[index] = new Worker(config, this.workerStated, index, isShadingEnv);
            }
        }

        this.clientMap = new ConcurrentHashMap<HoloClient, ActionCollector>();
        this.byteSizeCache = new ByteSizeCache(config.getWriteBatchTotalByteSize());
        this.backgroundJob = new BackgroundJob(config);
        this.metaStore = new MetaStore(config.getMetaCacheTTL());
    }

    /**
     * Starts the pool if it is not already running: creates the semaphores and executors,
     * launches every worker plus the two background tasks and, when enabled, installs a
     * JVM shutdown hook that closes the pool.
     *
     * <p>The semaphores are created before any task is launched so that a {@code submit()}
     * issued by a freshly started task never sees a {@code null} semaphore and silently
     * skips the concurrency limit.
     *
     * @throws HoloClientException declared for callers; nothing in this body throws it directly
     */
    private synchronized void start() throws HoloClientException {
        if (!this.started.compareAndSet(false, true)) {
            return;
        }
        LOGGER.info("HoloClient ExecutionPool[{}] start", (Object)this.name);
        this.closeStack = null;

        // Must exist before workers/background tasks run, because submit() reads them.
        this.writeSemaphore = new Semaphore(this.writeThreadSize);
        this.readSemaphore = new Semaphore(this.readThreadSize);

        this.workerStated.set(true);
        this.workerExecutorService = new ThreadPoolExecutor(this.workers.length, this.workers.length, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1), this.workerThreadFactory, new ThreadPoolExecutor.AbortPolicy());
        this.backgroundExecutorService = new ThreadPoolExecutor(2, 2, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(1), this.backgroundThreadFactory, new ThreadPoolExecutor.AbortPolicy());
        for (Worker worker : this.workers) {
            this.workerExecutorService.execute(worker);
        }
        if (this.enableShutdownHook) {
            this.shutdownHandler = new Thread(() -> this.close());
            Runtime.getRuntime().addShutdownHook(this.shutdownHandler);
        }
        this.backgroundExecutorService.execute(this.backgroundJob);
        this.backgroundExecutorService.execute(this.readActionWatcher);
    }

    /**
     * Returns the timestamp/exception pair recorded by the most recent {@code close()}.
     *
     * @return the recorded pair, or {@code null} when the pool has not been closed since
     *         it was last started
     */
    public synchronized Tuple<String, HoloClientException> getCloseReasonStack() {
        Tuple<String, HoloClientException> recorded = this.closeStack;
        return recorded;
    }

    /**
     * @return the {@code isFixedPool} flag this pool was constructed with
     */
    public boolean isFixedPool() {
        final boolean fixed = this.isFixedPool;
        return fixed;
    }

    /**
     * Stops the pool if it is running; a second call on a stopped pool is a no-op.
     *
     * <p>Records the close timestamp and call stack, removes the shutdown hook, interrupts
     * and waits for the background tasks, offers a {@code null} action to every worker and
     * waits for the worker executor to finish, then unregisters this pool from
     * {@code POOL_MAP}.
     *
     * <p>If the calling thread is interrupted while waiting, the remaining shutdown steps
     * still run and the interrupt status is restored before returning. Only this instance
     * is removed from the registry, so a different pool registered under the same name is
     * left untouched.
     */
    @Override
    public synchronized void close() {
        if (!this.started.compareAndSet(true, false)) {
            return;
        }
        this.closeStack = new Tuple<String, HoloClientException>(LocalDateTime.now().toString(), new HoloClientException(ExceptionCode.ALREADY_CLOSE, "close caused by"));
        if (this.clientMap.size() > 0) {
            LOGGER.warn("HoloClient ExecutionPool[{}] close, current client size {}", (Object)this.name, (Object)this.clientMap.size());
        } else {
            LOGGER.info("HoloClient ExecutionPool[{}] close", (Object)this.name);
        }
        if (this.shutdownHandler != null) {
            try {
                Runtime.getRuntime().removeShutdownHook(this.shutdownHandler);
            }
            catch (Exception exception) {
                // Best effort; removeShutdownHook can fail, e.g. while the JVM is already shutting down.
                LOGGER.warn("", exception);
            }
        }

        // Remember an interrupt instead of swallowing it; re-asserting it immediately
        // would make the following waits fail at once, so it is restored at the end.
        boolean interrupted = false;

        try {
            this.backgroundExecutorService.shutdownNow();
            while (!this.backgroundExecutorService.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
                LOGGER.info("wait background executorService termination[{}]", (Object)this.name);
            }
        }
        catch (InterruptedException e) {
            interrupted = true;
        }
        this.backgroundExecutorService = null;

        this.workerStated.set(false);
        for (Worker worker : this.workers) {
            try {
                worker.offer(null);
            }
            catch (HoloClientException ignored) {
                // Best effort: a worker that rejects the null action must not abort the shutdown.
            }
        }

        try {
            this.workerExecutorService.shutdown();
            while (!this.workerExecutorService.awaitTermination(500L, TimeUnit.MILLISECONDS)) {
                LOGGER.info("wait worker executorService termination[{}]", (Object)this.name);
            }
        }
        catch (InterruptedException e) {
            interrupted = true;
        }
        this.workerExecutorService = null;

        this.writeSemaphore = null;
        this.readSemaphore = null;
        synchronized (POOL_MAP) {
            // Two-argument remove: only drop the entry if it still maps to this instance.
            POOL_MAP.remove(this.name, this);
        }

        if (interrupted) {
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Looks up a partition without creating it; delegates to the four-argument overload
     * with {@code createIfNotExists} set to {@code false}.
     *
     * @param tableName parent table
     * @param partValue partition value to look up
     * @param isStr     forwarded unchanged to the partition lookup
     * @return the partition, or {@code null} if the lookup found none
     * @throws HoloClientException if the lookup fails
     */
    public Partition getOrSubmitPartition(TableName tableName, String partValue, boolean isStr) throws HoloClientException {
        final boolean createIfNotExists = false;
        return this.getOrSubmitPartition(tableName, partValue, isStr, createIfNotExists);
    }

    /**
     * Resolves a partition through the partition cache, running a SQL action on a worker
     * when the cache has to load it.
     *
     * <p>The loader optionally refreshes meta (when {@code refreshMetaTimeout > 0}), looks
     * the partition up, and if it is missing and {@code createIfNotExists} is set, tries to
     * create the child table. When creation throws, the partition is looked up once more
     * in case it exists anyway.
     *
     * @param tableName         parent table
     * @param partValue         partition value
     * @param isStr             forwarded unchanged to the ConnectionUtil partition calls
     * @param createIfNotExists whether a missing partition should be created
     * @return the partition, or {@code null} when it does not exist and must not be created
     * @throws HoloClientException converted from a SQLException, or INTERNAL_ERROR wrapping
     *                             any other exception
     */
    public Partition getOrSubmitPartition(TableName tableName, String partValue, boolean isStr, boolean createIfNotExists) throws HoloClientException {
        try {
            return this.metaStore.partitionCache.get(tableName).get(partValue, key -> {
                SqlAction<Partition> lookupOrCreate = new SqlAction<Partition>(conn -> {
                    if (this.refreshMetaTimeout > 0) {
                        ConnectionUtil.refreshMeta(conn, this.refreshMetaTimeout);
                    }
                    Partition existing = ConnectionUtil.getPartition(conn, tableName.getSchemaName(), tableName.getTableName(), partValue, isStr);
                    if (existing != null) {
                        return existing;
                    }
                    if (!createIfNotExists) {
                        return null;
                    }
                    Partition created;
                    try {
                        created = ConnectionUtil.retryCreatePartitionChildTable(conn, tableName.getSchemaName(), tableName.getTableName(), partValue, isStr);
                    }
                    catch (SQLException e) {
                        // Creation failed; check whether the partition is there regardless.
                        created = ConnectionUtil.getPartition(conn, tableName.getSchemaName(), tableName.getTableName(), partValue, isStr);
                        if (created == null) {
                            throw new SQLException("create partition fail and not found the partition", e);
                        }
                    }
                    if (created == null) {
                        throw new SQLException("after create, partition child table is still not exists, tableName:" + tableName.getFullName() + ",partitionValue:" + partValue);
                    }
                    return created;
                });
                try {
                    // submit() returns false when no worker accepted the action; keep retrying.
                    boolean accepted = this.submit(lookupOrCreate);
                    while (!accepted) {
                        accepted = this.submit(lookupOrCreate);
                    }
                    return (Partition)lookupOrCreate.getResult();
                }
                catch (HoloClientException e) {
                    throw new SQLException(e);
                }
            });
        }
        catch (SQLException e) {
            throw HoloClientException.fromSqlException(e);
        }
        catch (Exception e) {
            throw new HoloClientException(ExceptionCode.INTERNAL_ERROR, "getOrSubmitPartition fail. tableName=" + tableName.getFullName() + ", partValue=" + partValue, e);
        }
    }

    /**
     * Resolves a table schema through the table cache, submitting a MetaAction to a
     * worker when the cache invokes the loader.
     *
     * @param tableName table whose schema is requested
     * @param noCache   selects the cache mode passed to the table cache: {@code 1} when true,
     *                  {@code 0} otherwise (the meaning of the mode values is defined by the
     *                  cache, which is not visible here)
     * @return the table schema
     * @throws HoloClientException converted from the SQLException raised by the loader
     */
    public TableSchema getOrSubmitTableSchema(TableName tableName, boolean noCache) throws HoloClientException {
        final int cacheMode = noCache ? 1 : 0;
        try {
            return this.metaStore.tableCache.get(tableName, tn -> {
                try {
                    MetaAction metaAction = new MetaAction(tableName);
                    // submit() returns false when no worker accepted the action; keep retrying.
                    boolean accepted = this.submit(metaAction);
                    while (!accepted) {
                        accepted = this.submit(metaAction);
                    }
                    return (TableSchema)metaAction.getResult();
                }
                catch (HoloClientException e) {
                    throw new SQLException(e);
                }
            }, cacheMode);
        }
        catch (SQLException e) {
            throw HoloClientException.fromSqlException(e);
        }
    }

    /**
     * Runs a single action on a dedicated, newly created worker thread instead of one of
     * the pooled workers.
     *
     * @param started flag handed to the new Worker
     * @param index   index handed to the new Worker
     * @param action  action offered to the worker before its thread is started
     * @return the already started thread running the worker
     * @throws HoloClientException INTERNAL_ERROR when the worker does not accept the action,
     *                             or whatever {@code Worker.offer} throws
     */
    public Thread submitOneShotAction(AtomicBoolean started, int index, AbstractAction action) throws HoloClientException {
        Worker oneShotWorker = new Worker(this.config, started, index, this.isShadingEnv, this.isFixedPool);
        if (!oneShotWorker.offer(action)) {
            throw new HoloClientException(ExceptionCode.INTERNAL_ERROR, "submitOneShotAction fail");
        }
        Thread runner = this.ontShotWorkerThreadFactory.newThread(oneShotWorker);
        runner.start();
        return runner;
    }

    /**
     * Offers an action to the first worker in its range that accepts it.
     *
     * <ul>
     *   <li>PutAction: guarded by the write semaphore, offered to workers
     *       {@code [0, min(writeThreadSize, workers.length))}.</li>
     *   <li>GetAction / ScanAction: guarded by the read semaphore, offered to the last
     *       {@code readThreadSize} workers.</li>
     *   <li>Any other action: no semaphore, offered to all workers.</li>
     * </ul>
     *
     * <p>A permit acquired here is handed to the action via {@code setSemaphore} and is
     * released by this method only when no worker took the action, including the case
     * where {@code Worker.offer} throws.
     *
     * @param action action to run
     * @return {@code true} if a worker accepted the action; {@code false} if no permit
     *         became available within 2 seconds or every worker declined
     * @throws HoloClientException ALREADY_CLOSE when the pool is not running, INTERRUPTED
     *                             when interrupted while waiting for a permit (the thread's
     *                             interrupt status is restored), or whatever
     *                             {@code Worker.offer} throws
     */
    public boolean submit(AbstractAction action) throws HoloClientException {
        if (!this.started.get()) {
            throw new HoloClientException(ExceptionCode.ALREADY_CLOSE, "submit fail");
        }
        Semaphore semaphore = null;
        int start = 0;
        int end = this.workers.length;
        if (action instanceof PutAction) {
            semaphore = this.writeSemaphore;
            end = Math.min(this.writeThreadSize, this.workers.length);
        } else if (action instanceof GetAction || action instanceof ScanAction) {
            semaphore = this.readSemaphore;
            start = Math.max(0, this.workers.length - this.readThreadSize);
        }
        if (semaphore != null) {
            try {
                if (!semaphore.tryAcquire(2000L, TimeUnit.MILLISECONDS)) {
                    return false;
                }
            }
            catch (InterruptedException e) {
                // Keep the interrupt visible to the caller and keep the cause in the chain.
                Thread.currentThread().interrupt();
                throw new HoloClientException(ExceptionCode.INTERRUPTED, "", e);
            }
            action.setSemaphore(semaphore);
        }
        boolean accepted = false;
        try {
            for (int i = start; i < end && !accepted; ++i) {
                accepted = this.workers[i].offer(action);
            }
        }
        finally {
            // Also runs when offer() throws, so the permit cannot leak.
            if (!accepted && semaphore != null) {
                semaphore.release();
            }
        }
        return accepted;
    }

    /**
     * @return the meta store created for this pool in the constructor
     */
    public MetaStore getMetaStore() {
        final MetaStore store = this.metaStore;
        return store;
    }

    /**
     * @return the number of pooled workers, i.e. the length of the worker array
     */
    public int getWorkerCount() {
        final int count = this.workers.length;
        return count;
    }

    /**
     * Registers a client and returns its collector, creating the collector on first
     * registration. Registering the first client of an empty pool starts the pool.
     *
     * <p>Locks are taken in the same order as in {@code unregister} and {@code isRegister}
     * (instance monitor first, then the client map), and {@code start()} is called only
     * after the client-map monitor has been released. Calling {@code start()} while holding
     * only the map monitor would acquire the two monitors in the opposite order and could
     * deadlock against a concurrent {@code unregister}.
     *
     * @param client client to register
     * @param config configuration for a newly created collector
     * @return the collector bound to {@code client}
     * @throws HoloClientException if starting the pool fails
     */
    public synchronized ActionCollector register(HoloClient client, HoloConfig config) throws HoloClientException {
        boolean needStart = false;
        ActionCollector collector;
        synchronized (this.clientMap) {
            collector = this.clientMap.get(client);
            if (collector == null) {
                int sizeBefore = this.clientMap.size();
                LOGGER.info("register client {}, client size {}->{}", client, sizeBefore, sizeBefore + 1);
                collector = new ActionCollector(config, this, this.queue);
                this.clientMap.put(client, collector);
                needStart = sizeBefore == 0;
            }
        }
        if (needStart) {
            this.start();
        }
        return collector;
    }

    /**
     * Tells whether a client is currently registered with this pool.
     *
     * @param client client to look for
     * @return {@code true} if the client map contains {@code client}
     */
    public synchronized boolean isRegister(HoloClient client) {
        synchronized (this.clientMap) {
            boolean registered = this.clientMap.containsKey(client);
            return registered;
        }
    }

    /**
     * Removes a client from the pool and closes the pool once the last client is gone.
     * Nothing happens when no client is registered.
     *
     * @param client client to remove
     */
    public synchronized void unregister(HoloClient client) {
        boolean lastClientGone = false;
        synchronized (this.clientMap) {
            int sizeBefore = this.clientMap.size();
            if (sizeBefore > 0) {
                this.clientMap.remove(client);
                int sizeAfter = this.clientMap.size();
                LOGGER.info("unregister client {}, client size {}->{}", client, sizeBefore, sizeAfter);
                lastClientGone = sizeAfter == 0;
            }
        }
        // close() runs after the client-map monitor has been released.
        if (lastClientGone) {
            this.close();
        }
    }

    /**
     * @return {@code true} while the pool is started and not yet closed
     */
    public boolean isRunning() {
        final boolean running = this.started.get();
        return running;
    }

    /**
     * Rethrows the failure recorded by the background flush, if there is one.
     *
     * @throws HoloClientException carrying the code of the recorded exception, a message
     *                             containing the time it was recorded, and the recorded
     *                             exception as cause
     */
    public void tryThrowException() throws HoloClientException {
        if (this.fatalException == null) {
            return;
        }
        throw new HoloClientException(
                ((HoloClientException)this.fatalException.r).getCode(),
                String.format("An exception occurred at %s and data may be lost, please restart holo-client and recover from the last checkpoint", this.fatalException.l),
                (Throwable)this.fatalException.r);
    }

    /**
     * @return the remaining byte budget as reported by the byte-size cache
     */
    public long getAvailableByteSize() {
        final long available = this.byteSizeCache.getAvailableByteSize();
        return available;
    }

    /**
     * Background task that drains queued Get requests in batches, groups them by
     * (schema, table name) and submits one GetAction per group.
     */
    class ActionWatcher
    implements Runnable {
        private int batchSize;

        /**
         * @param batchSize maximum number of gets taken from the queue per round
         */
        public ActionWatcher(int batchSize) {
            this.batchSize = batchSize;
        }

        /**
         * Loops while the pool is started. Each round waits up to 2 seconds for a first
         * get, drains up to {@code batchSize - 1} more, fails the gets that already waited
         * longer than the configured read timeout, and submits the rest.
         *
         * <p>A timed-out get is not forwarded to a worker: its future is already completed
         * exceptionally, so running the query could not change the result the caller sees.
         */
        @Override
        public void run() {
            List<Get> recordList = new ArrayList<Get>(this.batchSize);
            while (ExecutionPool.this.started.get()) {
                try {
                    recordList.clear();
                    Get firstGet = ExecutionPool.this.queue.poll(2L, TimeUnit.SECONDS);
                    if (firstGet == null) {
                        continue;
                    }
                    recordList.add(firstGet);
                    ExecutionPool.this.queue.drainTo(recordList, this.batchSize - 1);

                    int readTimeoutMs = ExecutionPool.this.config.getReadTimeoutMilliseconds();
                    Map<Tuple<TableSchema, TableName>, List<Get>> getsByTable = new HashMap<Tuple<TableSchema, TableName>, List<Get>>();
                    for (Get get : recordList) {
                        if (readTimeoutMs > 0) {
                            long waitingTime = (System.nanoTime() - get.getStartTime()) / 1000000L;
                            if (waitingTime - (long)readTimeoutMs > 0L) {
                                get.getFuture().completeExceptionally(new HoloClientException(ExceptionCode.TIMEOUT, String.format("get waiting timeout before submit to holo, it cost %s ms greater than %s ms set in the config, maybe the holo-client is too busy and increasing readThreadSize can resolve it", waitingTime, readTimeoutMs)));
                                // Already failed; do not spend a worker on it.
                                continue;
                            }
                        }
                        Tuple<TableSchema, TableName> tableKey = new Tuple<TableSchema, TableName>(get.getRecord().getSchema(), get.getRecord().getTableName());
                        getsByTable.computeIfAbsent(tableKey, k -> new ArrayList<Get>()).add(get);
                    }
                    for (List<Get> gets : getsByTable.values()) {
                        GetAction getAction = new GetAction(gets);
                        // submit() returns false when no worker accepted the action; keep retrying.
                        while (!ExecutionPool.this.submit(getAction)) {
                        }
                    }
                }
                catch (InterruptedException e) {
                    // Interrupted while polling (e.g. shutdownNow in close()); keep the flag set and exit.
                    Thread.currentThread().interrupt();
                    break;
                }
                catch (Exception e) {
                    // Fail every get of this batch that has not been completed yet.
                    for (Get get : recordList) {
                        if (get.getFuture().isDone()) {
                            continue;
                        }
                        get.getFuture().completeExceptionally(e);
                    }
                }
            }
        }

        public String toString() {
            return "ActionWatcher{batchSize=" + this.batchSize + '}';
        }
    }

    /**
     * Background task that, about once per second while the pool is started, flushes the
     * registered collectors and refreshes or evicts cached table schemas.
     */
    class BackgroundJob
    implements Runnable {
        long tableSchemaRemainLife;
        long forceFlushInterval;
        AtomicInteger pendingRefreshTableSchemaActionCount;
        long lastForceFlushMs = -1L;

        /**
         * @param config supplies the force-flush interval and the meta cache TTL / auto-refresh
         *               factor whose quotient becomes the remaining-life threshold
         */
        public BackgroundJob(HoloConfig config) {
            long metaCacheTtl = config.getMetaCacheTTL();
            long refreshFactor = (long)config.getMetaAutoRefreshFactor();
            this.forceFlushInterval = config.getForceFlushInterval();
            this.tableSchemaRemainLife = metaCacheTtl / refreshFactor;
            this.pendingRefreshTableSchemaActionCount = new AtomicInteger(0);
        }

        /**
         * Flushes every registered collector while holding the client-map monitor. A forced
         * flush is used when a positive force-flush interval has elapsed since the last
         * forced flush; otherwise {@code tryFlush()} is called. The first failure is stored
         * as the pool's fatal exception and stops the iteration.
         */
        private void triggerTryFlush() {
            synchronized (ExecutionPool.this.clientMap) {
                boolean force = false;
                if (this.forceFlushInterval > 0L) {
                    long now = System.currentTimeMillis();
                    if (now - this.lastForceFlushMs > this.forceFlushInterval) {
                        force = true;
                        this.lastForceFlushMs = now;
                    }
                }
                for (ActionCollector collector : ExecutionPool.this.clientMap.values()) {
                    try {
                        if (force) {
                            collector.flush(true);
                        } else {
                            collector.tryFlush();
                        }
                    }
                    catch (HoloClientException e) {
                        ExecutionPool.this.fatalException = new Tuple<String, HoloClientException>(LocalDateTime.now().toString(), e);
                        break;
                    }
                }
            }
        }

        /**
         * Walks the table-cache keys selected by {@code filterKeys(tableSchemaRemainLife)}:
         * expired entries are removed, entries needing refresh are reloaded. Skipped
         * entirely while an earlier asynchronous refresh is still pending. Any throwable
         * is logged and swallowed.
         */
        private void refreshTableSchema() {
            if (this.pendingRefreshTableSchemaActionCount.get() != 0) {
                return;
            }
            try {
                ExecutionPool.this.metaStore.tableCache.filterKeys(this.tableSchemaRemainLife).forEach(entry -> {
                    Cache.ItemState state = (Cache.ItemState)((Object)entry.l);
                    TableName tableName = (TableName)entry.r;
                    switch (state) {
                        case EXPIRE: {
                            LOGGER.info("remove expire tableSchema for {}", (Object)tableName);
                            ExecutionPool.this.metaStore.tableCache.remove(tableName);
                            break;
                        }
                        case NEED_REFRESH: {
                            this.refreshOneTable(tableName);
                            break;
                        }
                        default: {
                            LOGGER.error("undefine item state {}", (Object)state);
                        }
                    }
                });
            }
            catch (Throwable e) {
                LOGGER.warn("refreshTableSchema unexpected fail", e);
            }
        }

        /**
         * Reloads one table schema: first through {@code getOrSubmitTableSchema(tableName, true)},
         * then by submitting a MetaAction whose asynchronous result is put into the cache.
         * When that action fails with a message containing "can not found table" the cache
         * entry is removed. Exceptions are logged and swallowed.
         */
        private void refreshOneTable(TableName tableName) {
            try {
                ExecutionPool.this.getOrSubmitTableSchema(tableName, true);
                MetaAction metaAction = new MetaAction(tableName);
                if (ExecutionPool.this.submit(metaAction)) {
                    LOGGER.info("refresh tableSchema for {}, because remain lifetime < {} ms", (Object)tableName, (Object)this.tableSchemaRemainLife);
                    this.pendingRefreshTableSchemaActionCount.incrementAndGet();
                    metaAction.getFuture().whenCompleteAsync((tableSchema, exception) -> {
                        this.pendingRefreshTableSchemaActionCount.decrementAndGet();
                        if (exception == null) {
                            ExecutionPool.this.metaStore.tableCache.put(tableName, (TableSchema)tableSchema);
                            return;
                        }
                        LOGGER.warn("refreshTableSchema fail", (Throwable)exception);
                        if (exception.getMessage() != null && exception.getMessage().contains("can not found table")) {
                            ExecutionPool.this.metaStore.tableCache.remove(tableName);
                        }
                    });
                }
            }
            catch (Exception e) {
                LOGGER.warn("refreshTableSchema fail", e);
            }
        }

        /**
         * Runs flush and schema refresh, then sleeps one second, until the pool is no
         * longer started or the thread is interrupted during the sleep.
         */
        @Override
        public void run() {
            while (ExecutionPool.this.started.get()) {
                this.triggerTryFlush();
                this.refreshTableSchema();
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException ignore) {
                    return;
                }
            }
        }

        public String toString() {
            return "CommitJob";
        }
    }

    /**
     * Caches the total byte size reported by all registered collectors, recomputing it at
     * most once every 2 seconds.
     */
    class ByteSizeCache {
        final long maxByteSize;
        /**
         * Last computed sum. Volatile because it is written by whichever thread wins the
         * compare-and-set in {@link #getByteSize()} and read by every other caller without
         * a lock; a plain {@code long} gives neither visibility nor atomic 64-bit access.
         */
        volatile long value = 0L;
        /** {@code System.nanoTime()} of the last refresh; initialised at construction time. */
        AtomicLong last = new AtomicLong(System.nanoTime());

        /**
         * @param maxByteSize byte budget from which the cached size is subtracted
         */
        public ByteSizeCache(long maxByteSize) {
            this.maxByteSize = maxByteSize;
        }

        /**
         * @return {@code maxByteSize} minus the cached total; may be based on a value up to
         *         about 2 seconds old
         */
        long getAvailableByteSize() {
            return this.maxByteSize - this.getByteSize();
        }

        /**
         * Returns the cached total, refreshing it first when more than 2 seconds have passed
         * since the last refresh. The compare-and-set lets only one of several concurrent
         * callers recompute; the others return the previous value.
         */
        long getByteSize() {
            long previous = this.last.get();
            long now = System.nanoTime();
            if (now - previous > 2000000000L && this.last.compareAndSet(previous, now)) {
                this.value = ExecutionPool.this.clientMap.values().stream().collect(Collectors.summingLong(ActionCollector::getByteSize)).longValue();
            }
            return this.value;
        }
    }
}

