/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.hologres.client;

import com.alibaba.hologres.client.BinlogShardGroupReader;
import com.alibaba.hologres.client.Command;
import com.alibaba.hologres.client.Exporter;
import com.alibaba.hologres.client.Get;
import com.alibaba.hologres.client.HoloConfig;
import com.alibaba.hologres.client.Importer;
import com.alibaba.hologres.client.Put;
import com.alibaba.hologres.client.Scan;
import com.alibaba.hologres.client.Subscribe;
import com.alibaba.hologres.client.exception.ExceptionCode;
import com.alibaba.hologres.client.exception.HoloClientException;
import com.alibaba.hologres.client.exception.HoloClientWithDetailsException;
import com.alibaba.hologres.client.function.FunctionWithSQLException;
import com.alibaba.hologres.client.impl.ExecutionPool;
import com.alibaba.hologres.client.impl.action.CopyAction;
import com.alibaba.hologres.client.impl.action.PutAction;
import com.alibaba.hologres.client.impl.action.ScanAction;
import com.alibaba.hologres.client.impl.action.SqlAction;
import com.alibaba.hologres.client.impl.binlog.BinlogOffset;
import com.alibaba.hologres.client.impl.binlog.Committer;
import com.alibaba.hologres.client.impl.binlog.TableSchemaSupplier;
import com.alibaba.hologres.client.impl.binlog.action.BinlogAction;
import com.alibaba.hologres.client.impl.collector.ActionCollector;
import com.alibaba.hologres.client.impl.collector.BatchState;
import com.alibaba.hologres.client.impl.copy.CopyContext;
import com.alibaba.hologres.client.impl.copy.InternalPipedOutputStream;
import com.alibaba.hologres.client.model.ExportContext;
import com.alibaba.hologres.client.model.HoloVersion;
import com.alibaba.hologres.client.model.ImportContext;
import com.alibaba.hologres.client.model.Partition;
import com.alibaba.hologres.client.model.Record;
import com.alibaba.hologres.client.model.RecordScanner;
import com.alibaba.hologres.client.model.TableName;
import com.alibaba.hologres.client.model.TableSchema;
import com.alibaba.hologres.client.utils.IdentifierUtil;
import com.alibaba.hologres.client.utils.Tuple;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class HoloClient
implements Closeable {
    public static final Logger LOGGER = LoggerFactory.getLogger(HoloClient.class);
    private ActionCollector collector;
    private final boolean useFixedFe;
    private ExecutionPool fixedPool = null;
    private ExecutionPool pool = null;
    private final HoloConfig config;
    boolean asyncCommit = true;
    boolean isShadingEnv = false;
    boolean isEmbeddedPool = false;
    boolean isEmbeddedFixedPool = false;

    public HoloClient(HoloConfig config) throws HoloClientException {
        try {
            DriverManager.getDrivers();
            Class.forName("com.alibaba.hologres.org.postgresql.Driver");
            this.isShadingEnv = true;
        }
        catch (Exception e) {
            try {
                DriverManager.getDrivers();
                Class.forName("com.alibaba.hologres.org.postgresql.Driver");
            }
            catch (Exception e2) {
                throw new HoloClientException(ExceptionCode.INTERNAL_ERROR, "load driver fail", e);
            }
        }
        this.checkConfig(config);
        this.config = config;
        this.useFixedFe = config.isUseFixedFe();
    }

    private void checkConfig(HoloConfig config) throws HoloClientException {
        if (config.getJdbcUrl() == null || config.getJdbcUrl().isEmpty()) {
            throw new HoloClientException(ExceptionCode.INVALID_Config, "jdbcUrl cannot be null");
        }
        if (config.getPassword() == null || config.getPassword().isEmpty()) {
            throw new HoloClientException(ExceptionCode.INVALID_Config, "password cannot be null");
        }
        if (config.getUsername() == null || config.getUsername().isEmpty()) {
            throw new HoloClientException(ExceptionCode.INVALID_Config, "username cannot be null");
        }
        if (config.getWriteBatchSize() < 1) {
            throw new HoloClientException(ExceptionCode.INVALID_Config, "batchSize must > 0");
        }
        if (config.getWriteBatchByteSize() < 1L) {
            throw new HoloClientException(ExceptionCode.INVALID_Config, "batchByteSize must > 0");
        }
    }

    public TableSchema getTableSchema(String tableName) throws HoloClientException {
        return this.getTableSchema(TableName.valueOf(tableName), false);
    }

    public TableSchema getTableSchema(String tableName, boolean noCache) throws HoloClientException {
        return this.getTableSchema(TableName.valueOf(tableName), noCache);
    }

    public TableSchema getTableSchema(TableName tableName) throws HoloClientException {
        return this.getTableSchema(tableName, false);
    }

    public TableSchema getTableSchema(TableName tableName, boolean noCache) throws HoloClientException {
        this.ensurePoolOpen();
        return this.pool.getOrSubmitTableSchema(tableName, noCache);
    }

    private void checkGet(Get get) throws HoloClientException {
        if (get == null) {
            throw new HoloClientException(ExceptionCode.CONSTRAINT_VIOLATION, "Get cannot be null");
        }
        if (get.getRecord().getSchema().getPrimaryKeys().length == 0) {
            throw new HoloClientException(ExceptionCode.CONSTRAINT_VIOLATION, "Get table must have primary key:" + get.getRecord().getSchema().getTableNameObj().getFullName());
        }
        for (int index : get.getRecord().getKeyIndex()) {
            if (get.getRecord().isSet(index) && null != get.getRecord().getObject(index)) continue;
            throw new HoloClientException(ExceptionCode.CONSTRAINT_VIOLATION, "Get primary key cannot be null:" + get.getRecord().getSchema().getColumnSchema()[index].getName());
        }
    }

    private void checkPut(Put put) throws HoloClientException {
        if (put == null) {
            throw new HoloClientException(ExceptionCode.CONSTRAINT_VIOLATION, "Put cannot be null");
        }
        for (int index : put.getRecord().getKeyIndex()) {
            if (put.getRecord().isSet(index) && null != put.getRecord().getObject(index) || put.getRecord().getSchema().getColumn(index).getDefaultValue() != null) continue;
            throw new HoloClientWithDetailsException(ExceptionCode.CONSTRAINT_VIOLATION, "Put primary key cannot be null:" + put.getRecord().getSchema().getColumnSchema()[index].getName(), put.getRecord());
        }
        if (put.getRecord().getSchema().isPartitionParentTable() && (!put.getRecord().isSet(put.getRecord().getSchema().getPartitionIndex()) || null == put.getRecord().getObject(put.getRecord().getSchema().getPartitionIndex()))) {
            throw new HoloClientWithDetailsException(ExceptionCode.CONSTRAINT_VIOLATION, "Put partition key cannot be null:" + put.getRecord().getSchema().getColumnSchema()[put.getRecord().getSchema().getPartitionIndex()].getName(), put.getRecord());
        }
        if (put.getRecord().getType() == Put.MutationType.DELETE && put.getRecord().getSchema().getPrimaryKeys().length == 0) {
            throw new HoloClientWithDetailsException(ExceptionCode.CONSTRAINT_VIOLATION, "Delete Put table must have primary key:" + put.getRecord().getSchema().getTableNameObj().getFullName(), put.getRecord());
        }
    }

    public CompletableFuture<Record> get(Get get) throws HoloClientException {
        this.ensurePoolOpen();
        this.checkGet(get);
        get.setStartTime(System.nanoTime());
        get.setFuture(new CompletableFuture<Record>());
        if (get.isFullColumn()) {
            for (int i = 0; i < get.getRecord().getSchema().getColumnSchema().length; ++i) {
                if (get.getRecord().isSet(i)) continue;
                get.getRecord().setObject(i, null);
            }
        }
        try {
            if (this.rewriteForPartitionTable(get.getRecord(), false, false)) {
                get.getFuture().complete(null);
            }
        }
        catch (HoloClientException e) {
            get.getFuture().completeExceptionally(e);
        }
        this.collector.appendGet(get);
        return get.getFuture();
    }

    public List<CompletableFuture<Record>> get(List<Get> gets) throws HoloClientException {
        this.ensurePoolOpen();
        for (Get get : gets) {
            this.checkGet(get);
            get.setStartTime(System.nanoTime());
            get.setFuture(new CompletableFuture<Record>());
            if (get.isFullColumn()) {
                for (int i = 0; i < get.getRecord().getSchema().getColumnSchema().length; ++i) {
                    if (get.getRecord().isSet(i)) continue;
                    get.getRecord().setObject(i, null);
                }
            }
            try {
                if (!this.rewriteForPartitionTable(get.getRecord(), false, false)) continue;
                get.getFuture().complete(null);
            }
            catch (HoloClientException e) {
                get.getFuture().completeExceptionally(e);
            }
        }
        ArrayList<CompletableFuture<Record>> ret = new ArrayList<CompletableFuture<Record>>();
        this.collector.appendGet(gets);
        for (Get get : gets) {
            ret.add(get.getFuture());
        }
        return ret;
    }

    public <T> CompletableFuture<T> sql(FunctionWithSQLException<Connection, T> func) throws HoloClientException {
        this.ensurePoolOpen();
        SqlAction<T> action = new SqlAction<T>(func);
        while (!this.pool.submit(action)) {
        }
        return action.getFuture();
    }

    public RecordScanner scan(Scan scan) throws HoloClientException {
        return (RecordScanner)this.doScan(scan).getResult();
    }

    public CompletableFuture<RecordScanner> asyncScan(Scan scan) throws HoloClientException {
        return this.doScan(scan).getFuture();
    }

    private ScanAction doScan(Scan scan) throws HoloClientException {
        ExecutionPool execPool;
        this.ensurePoolOpen();
        ScanAction action = new ScanAction(scan);
        ExecutionPool executionPool = execPool = this.useFixedFe ? this.fixedPool : this.pool;
        while (!execPool.submit(action)) {
        }
        return action;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private void ensurePoolOpen() throws HoloClientException {
        ExecutionPool temp;
        HoloClient holoClient;
        if (this.pool == null) {
            holoClient = this;
            synchronized (holoClient) {
                if (this.pool == null) {
                    temp = new ExecutionPool("embedded-" + this.config.getAppName(), this.config, this.isShadingEnv, false);
                    ActionCollector tempCollector = temp.register(this, this.config);
                    if (!this.useFixedFe) {
                        this.collector = tempCollector;
                    }
                    this.pool = temp;
                    this.isEmbeddedPool = true;
                }
            }
        }
        if (!this.pool.isRunning()) {
            throw new HoloClientException(ExceptionCode.ALREADY_CLOSE, "already close at " + (String)this.pool.getCloseReasonStack().l, (Throwable)this.pool.getCloseReasonStack().r);
        }
        if (this.useFixedFe && this.fixedPool == null) {
            holoClient = this;
            synchronized (holoClient) {
                if (this.fixedPool == null) {
                    temp = new ExecutionPool("embedded-fixed-" + this.config.getAppName(), this.config, this.isShadingEnv, true);
                    this.collector = temp.register(this, this.config);
                    this.fixedPool = temp;
                    this.isEmbeddedFixedPool = true;
                }
            }
        }
        if (this.useFixedFe && !this.fixedPool.isRunning()) {
            throw new HoloClientException(ExceptionCode.ALREADY_CLOSE, "already close");
        }
    }

    public synchronized void setPool(ExecutionPool pool) throws HoloClientException {
        if (pool.isFixedPool()) {
            throw new HoloClientException(ExceptionCode.INTERNAL_ERROR, "fixed pool recived, require is not fixed");
        }
        ExecutionPool temp = pool;
        ActionCollector tempCollector = temp.register(this, this.config);
        if (!this.useFixedFe) {
            this.collector = tempCollector;
        }
        this.pool = temp;
        this.isEmbeddedPool = false;
    }

    public synchronized void setFixedPool(ExecutionPool fixedPool) throws HoloClientException {
        ExecutionPool tempFixedPool;
        if (!this.useFixedFe || fixedPool == null || !fixedPool.isFixedPool()) {
            throw new HoloClientException(ExceptionCode.INTERNAL_ERROR, "fixedPool required not null, and enable HoloConfig:useFixedFe");
        }
        this.fixedPool = tempFixedPool = fixedPool;
        this.collector = tempFixedPool.register(this, this.config);
        this.isEmbeddedFixedPool = false;
    }

    private void tryThrowException() throws HoloClientException {
        if (this.pool != null) {
            this.pool.tryThrowException();
        }
        if (this.fixedPool != null) {
            this.fixedPool.tryThrowException();
        }
    }

    private boolean rewriteForPartitionTable(Record record, boolean createIfNotExists, boolean exceptionIfNotExists) throws HoloClientException {
        TableSchema schema = record.getSchema();
        if (schema.isPartitionParentTable()) {
            boolean isStr = 12 == schema.getColumn(schema.getPartitionIndex()).getType() || 91 == schema.getColumn(schema.getPartitionIndex()).getType();
            String value = String.valueOf(record.getObject(schema.getPartitionIndex()));
            Partition partition = this.pool.getOrSubmitPartition(schema.getTableNameObj(), value, isStr, createIfNotExists);
            if (partition != null) {
                TableSchema newSchema = this.pool.getOrSubmitTableSchema(TableName.valueOf(IdentifierUtil.quoteIdentifier(partition.getSchemaName(), true), IdentifierUtil.quoteIdentifier(partition.getTableName(), true)), false);
                record.changeToChildSchema(newSchema);
            } else {
                if (exceptionIfNotExists) {
                    throw new HoloClientWithDetailsException(ExceptionCode.TABLE_NOT_FOUND, "child table is not found", record);
                }
                return true;
            }
        }
        return false;
    }

    public void put(Put put) throws HoloClientException {
        this.ensurePoolOpen();
        this.tryThrowException();
        this.checkPut(put);
        ExecutionPool execPool = this.useFixedFe ? this.fixedPool : this.pool;
        if (!this.rewriteForPartitionTable(put.getRecord(), this.config.isDynamicPartition() && !Put.MutationType.DELETE.equals((Object)put.getRecord().getType()), !Put.MutationType.DELETE.equals((Object)put.getRecord().getType()))) {
            if (!this.asyncCommit) {
                Record r = put.getRecord();
                PutAction action = new PutAction(Collections.singletonList(r), r.getByteSize(), this.config.getWriteMode(), BatchState.SizeEnough);
                while (!execPool.submit(action)) {
                }
                action.getResult();
            } else {
                this.collector.append(put.getRecord());
            }
        }
    }

    public CompletableFuture<Void> putAsync(Put put) throws HoloClientException {
        this.ensurePoolOpen();
        this.tryThrowException();
        this.checkPut(put);
        CompletableFuture<Void> ret = new CompletableFuture<Void>();
        if (!this.rewriteForPartitionTable(put.getRecord(), this.config.isDynamicPartition() && !Put.MutationType.DELETE.equals((Object)put.getRecord().getType()), !Put.MutationType.DELETE.equals((Object)put.getRecord().getType()))) {
            put.getRecord().setPutFuture(ret);
            this.collector.append(put.getRecord());
        } else {
            put.getRecord().setPutFuture(ret);
            ret.complete(null);
        }
        return ret;
    }

    public void put(List<Put> puts) throws HoloClientException {
        this.ensurePoolOpen();
        this.tryThrowException();
        HoloClientWithDetailsException detailException = null;
        ArrayList<Put> putList = new ArrayList<Put>();
        for (Put put : puts) {
            try {
                this.checkPut(put);
                if (this.rewriteForPartitionTable(put.getRecord(), this.config.isDynamicPartition() && !Put.MutationType.DELETE.equals((Object)put.getRecord().getType()), !Put.MutationType.DELETE.equals((Object)put.getRecord().getType()))) continue;
                putList.add(put);
            }
            catch (HoloClientWithDetailsException e) {
                if (detailException == null) {
                    detailException = e;
                    continue;
                }
                detailException.merge(e);
            }
        }
        for (Put put : putList) {
            this.collector.append(put.getRecord());
        }
        if (!this.asyncCommit) {
            this.collector.flush(false);
        }
        if (detailException != null) {
            throw detailException;
        }
    }

    public ExportContext exportData(Exporter exporter) throws HoloClientException {
        int endShard;
        int maxThread;
        this.ensurePoolOpen();
        this.tryThrowException();
        int threadSize = exporter.getThreadSize();
        int n = maxThread = this.config.readThreadSize > 1 ? this.config.readThreadSize - 1 : 1;
        if (threadSize > maxThread) {
            LOGGER.warn("Thread size is larger than max read thread size of holo client, will be using {}", (Object)maxThread);
            threadSize = maxThread;
        }
        int shardCount = Command.getShardCount(this, exporter.getSchema());
        int startShard = exporter.getStartShardId() == -1 ? 0 : exporter.getStartShardId();
        int n2 = endShard = exporter.getEndShardId() == -1 ? shardCount : exporter.getEndShardId();
        if (threadSize > endShard - startShard) {
            threadSize = endShard - startShard;
            LOGGER.warn("Thread size is larger than shard count, will be using thread size {}", (Object)threadSize);
        }
        OutputStream os = exporter.getOutputStream();
        InputStream[] istreams = new InputStream[threadSize];
        OutputStream[] ostreams = new OutputStream[threadSize];
        if (os == null) {
            for (int t = 0; t < threadSize; ++t) {
                InternalPipedOutputStream ostream = new InternalPipedOutputStream();
                PipedInputStream istream = new PipedInputStream();
                ostreams[t] = ostream;
                istreams[t] = istream;
                try {
                    ostream.connect(istream);
                    continue;
                }
                catch (IOException e) {
                    throw new HoloClientException(ExceptionCode.INTERNAL_ERROR, "should not happen", e);
                }
            }
        } else {
            ostreams[0] = os;
        }
        int shardSize = (endShard - startShard) / threadSize;
        int remain = (endShard - startShard) % threadSize;
        CopyAction[] actions = new CopyAction[threadSize];
        for (int t = 0; t < threadSize; ++t) {
            int end;
            if (remain > 0) {
                end = startShard + shardSize + 1;
                --remain;
            } else {
                end = startShard + shardSize;
            }
            CopyAction action = new CopyAction(exporter.getSchema(), ostreams[t], null, startShard, end, CopyAction.Mode.OUT);
            startShard = end;
            actions[t] = action;
            while (!this.pool.submit(action)) {
            }
        }
        try {
            CopyContext[] copyContexts = new CopyContext[threadSize];
            CompletableFuture[] futures = new CompletableFuture[threadSize];
            for (int t = 0; t < threadSize; ++t) {
                copyContexts[t] = actions[t].getReadyToStart().get();
                futures[t] = actions[t].getFuture();
            }
            return new ExportContext(futures, copyContexts, istreams);
        }
        catch (InterruptedException e) {
            throw new HoloClientException(ExceptionCode.INTERNAL_ERROR, "interrupt", e);
        }
        catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof HoloClientException) {
                throw (HoloClientException)cause;
            }
            throw new HoloClientException(ExceptionCode.INTERNAL_ERROR, "", cause);
        }
    }

    public ImportContext importData(Importer importer) throws HoloClientException {
        int endShard;
        int maxThread;
        this.ensurePoolOpen();
        this.tryThrowException();
        int threadSize = importer.getThreadSize();
        int n = maxThread = this.config.writeThreadSize > 1 ? this.config.writeThreadSize - 1 : 1;
        if (threadSize > maxThread) {
            LOGGER.warn("Thread size is larger than max write thread size of holo client, will be using {}", (Object)maxThread);
            threadSize = maxThread;
        }
        int shardCount = Command.getShardCount(this, importer.getSchema());
        int startShard = importer.getStartShardId() == -1 ? 0 : importer.getStartShardId();
        int n2 = endShard = importer.getEndShardId() == -1 ? shardCount : importer.getEndShardId();
        if (threadSize > endShard - startShard) {
            threadSize = endShard - startShard;
            LOGGER.warn("Thread size is larger than shard count, will be using thread size {}", (Object)threadSize);
        }
        InputStream is = importer.getInputStream();
        InputStream[] istreams = new InputStream[threadSize];
        OutputStream[] ostreams = new OutputStream[threadSize];
        if (is == null) {
            for (int t = 0; t < threadSize; ++t) {
                PipedInputStream istream = new PipedInputStream(importer.getBufferSize() > 0 ? importer.getBufferSize() : 1024);
                PipedOutputStream ostream = new PipedOutputStream();
                istreams[t] = istream;
                ostreams[t] = ostream;
                try {
                    ostream.connect(istream);
                    continue;
                }
                catch (IOException e) {
                    throw new HoloClientException(ExceptionCode.INTERNAL_ERROR, "should not happen", e);
                }
            }
        } else {
            istreams[0] = is;
        }
        int shardSize = (endShard - startShard) / threadSize;
        int remain = (endShard - startShard) % threadSize;
        CopyAction[] actions = new CopyAction[threadSize];
        TreeMap<Integer, Integer> shardMap = new TreeMap<Integer, Integer>();
        for (int t = 0; t < threadSize; ++t) {
            int end;
            if (remain > 0) {
                end = startShard + shardSize + 1;
                --remain;
            } else {
                end = startShard + shardSize;
            }
            CopyAction action = new CopyAction(importer.getSchema(), null, istreams[t], startShard, end, CopyAction.Mode.IN);
            action.setBufferSize(importer.getBufferSize());
            shardMap.put(startShard, t);
            startShard = end;
            actions[t] = action;
            while (!this.pool.submit(action)) {
            }
        }
        try {
            CopyContext[] copyContexts = new CopyContext[threadSize];
            CompletableFuture[] futures = new CompletableFuture[threadSize];
            for (int t = 0; t < threadSize; ++t) {
                copyContexts[t] = actions[t].getReadyToStart().get();
                futures[t] = actions[t].getFuture();
            }
            return new ImportContext(shardMap, futures, copyContexts, ostreams, shardCount);
        }
        catch (InterruptedException e) {
            throw new HoloClientException(ExceptionCode.INTERNAL_ERROR, "interrupt", e);
        }
        catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof HoloClientException) {
                throw (HoloClientException)cause;
            }
            throw new HoloClientException(ExceptionCode.INTERNAL_ERROR, "", cause);
        }
    }

    public BinlogShardGroupReader binlogSubscribe(final Subscribe subscribe) throws HoloClientException {
        HoloVersion holoVersion;
        this.ensurePoolOpen();
        TableSchemaSupplier supplier = new TableSchemaSupplier(){

            @Override
            public TableSchema apply() throws HoloClientException {
                return HoloClient.this.getTableSchema(subscribe.getTableName(), true);
            }
        };
        TableSchema schema = supplier.apply();
        int shardCount = Command.getShardCount(this, schema);
        if (subscribe.getSlotName() != null && !Command.getSlotNames(this, schema).contains(subscribe.getSlotName())) {
            throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("The table %s has no slot named %s", schema.getTableNameObj().getFullName(), subscribe.getSlotName()));
        }
        if (subscribe.getSlotName() == null && (holoVersion = Command.getHoloVersion(this)).compareTo(new HoloVersion("2.1.0")) < 0) {
            throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("For hologres instance version lower than r2.1.0, need to provide slotName to subscribe binlog. your version is %s", holoVersion));
        }
        Map<Integer, BinlogOffset> offsetMap = subscribe.getOffsetMap();
        if (null != offsetMap) {
            for (Integer shardId : offsetMap.keySet()) {
                if (shardId >= 0 && shardId < shardCount) continue;
                throw new HoloClientException(ExceptionCode.INVALID_REQUEST, String.format("invalid shard id [%s] for table %s", shardId, subscribe.getTableName()));
            }
        } else {
            offsetMap = new HashMap<Integer, BinlogOffset>();
            for (int i = 0; i < shardCount; ++i) {
                offsetMap.put(i, new BinlogOffset().setTimestamp(subscribe.getBinlogReadStartTime()));
            }
        }
        BinlogShardGroupReader reader = null;
        try {
            AtomicBoolean started = new AtomicBoolean(true);
            HashMap<Integer, Committer> committerMap = new HashMap<Integer, Committer>();
            reader = new BinlogShardGroupReader(this.config, subscribe, offsetMap.size(), committerMap, started);
            for (Map.Entry<Integer, BinlogOffset> entry : offsetMap.entrySet()) {
                ArrayBlockingQueue<Tuple<CompletableFuture<Void>, Long>> queue = new ArrayBlockingQueue<Tuple<CompletableFuture<Void>, Long>>(1);
                Committer committer = new Committer(queue);
                committerMap.put(entry.getKey(), committer);
                BinlogAction action = new BinlogAction(subscribe.getTableName(), subscribe.getSlotName(), entry.getKey(), entry.getValue().getSequence(), entry.getValue().getStartTimeText(), reader.getCollector(), supplier, queue);
                if (this.useFixedFe) {
                    reader.addThread(this.fixedPool.submitOneShotAction(started, entry.getKey(), action));
                    continue;
                }
                reader.addThread(this.pool.submitOneShotAction(started, entry.getKey(), action));
            }
        }
        catch (HoloClientException e) {
            if (null != reader) {
                reader.close();
            }
            throw e;
        }
        return reader;
    }

    public void flush() throws HoloClientException {
        this.ensurePoolOpen();
        this.collector.flush(false);
    }

    public boolean isAsyncCommit() {
        return this.asyncCommit;
    }

    public void setAsyncCommit(boolean asyncCommit) {
        this.asyncCommit = asyncCommit;
    }

    private void closeInternal() {
        if (this.pool != null && this.pool.isRegister(this)) {
            try {
                this.tryThrowException();
                this.flush();
            }
            catch (HoloClientException e) {
                LOGGER.error("fail when close", e);
            }
            this.pool.unregister(this);
            if (this.isEmbeddedPool) {
                this.pool.close();
            }
        }
        if (this.fixedPool != null && this.fixedPool.isRegister(this)) {
            this.fixedPool.unregister(this);
            if (this.isEmbeddedFixedPool) {
                this.fixedPool.close();
            }
        }
    }

    @Override
    public void close() {
        this.closeInternal();
    }

    static {
        LOGGER.info("=========holo-client version==========");
        LOGGER.info("version:{}", (Object)"2.3.0");
        LOGGER.info("revision:{}", (Object)"274c91d0e81c82144c6b6caf00d5662120a2d30f");
        LOGGER.info("date:{}", (Object)"Thu Dec 21 09:52:47 CST 2023");
        LOGGER.info("======================================");
        DriverManager.getDrivers();
    }
}

