/*
 * Decompiled with CFR 0.152.
 */
package com.alibaba.hologres.hive.output;

import com.alibaba.hologres.client.Put;
import com.alibaba.hologres.client.copy.CopyInOutputStream;
import com.alibaba.hologres.client.copy.CopyUtil;
import com.alibaba.hologres.client.copy.RecordBinaryOutputStream;
import com.alibaba.hologres.client.copy.RecordOutputStream;
import com.alibaba.hologres.client.copy.RecordTextOutputStream;
import com.alibaba.hologres.client.exception.HoloClientException;
import com.alibaba.hologres.client.model.Record;
import com.alibaba.hologres.client.model.TableSchema;
import com.alibaba.hologres.client.model.WriteMode;
import com.alibaba.hologres.client.utils.RecordChecker;
import com.alibaba.hologres.hive.HoloRecordWritable;
import com.alibaba.hologres.hive.conf.HoloClientParam;
import com.alibaba.hologres.hive.exception.HiveHoloStorageException;
import com.alibaba.hologres.hive.utils.JDBCUtils;
import com.alibaba.hologres.org.postgresql.PGProperty;
import com.alibaba.hologres.org.postgresql.copy.CopyIn;
import com.alibaba.hologres.org.postgresql.copy.CopyManager;
import com.alibaba.hologres.org.postgresql.core.BaseConnection;
import com.alibaba.hologres.org.postgresql.jdbc.PgConnection;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.Properties;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link FileSinkOperator.RecordWriter} that writes {@link HoloRecordWritable} rows
 * through the PostgreSQL {@code COPY ... FROM STDIN} protocol.
 *
 * <p>The writer owns one or more {@link CopyContext}s, each holding its own JDBC
 * connection and copy stream. Records are spread over the contexts round-robin.
 * When {@code param.getMaxWriterNumber()} is positive, a single background thread
 * periodically decides whether to open one more context, never exceeding
 * {@code param.getMaxWriterNumberPerTask()}.
 *
 * <p>NOTE(review): {@code count} is a plain (non-atomic) field, so {@link #write}
 * is presumably only ever called from a single thread - confirm against the caller.
 */
public class HoloRecordCopyWriter
implements FileSinkOperator.RecordWriter {
    private static final Logger logger = LoggerFactory.getLogger(HoloRecordCopyWriter.class);
    /** Upper bound on the number of copy contexts this writer may hold. */
    private final int maxWriterNumberPerTask;
    /** Buffer size handed to the record output streams. */
    private final int maxCellBufferSize;
    /** Runs {@link #checkIfNeedIncreaseCopyContexts()}; {@code null} when scaling is disabled. */
    private ScheduledExecutorService backgroundExecutorService;
    /** Number of contexts published so far; also the key the next context will get. */
    private final AtomicInteger nextCopyContextIndex;
    /** Copy contexts keyed by {@code 0 .. nextCopyContextIndex - 1}. */
    private final transient ConcurrentMap<Integer, CopyContext> copyContexts;
    /** Connection budget compared against the job's current connection count. */
    private final int maxWriterNumber;
    private final boolean binary;
    private final boolean bulkLoad;
    private final HoloClientParam param;
    private final TableSchema schema;
    /** JDBC application name; also passed to the per-job connection count lookup. */
    private final String appName;
    /** Number of records seen so far; drives the round-robin context selection. */
    private long count = 0L;

    /**
     * Opens the first copy context and, if {@code param.getMaxWriterNumber() > 0},
     * schedules the periodic scale-up check (every 60 seconds, with a random
     * initial delay below 60 seconds).
     *
     * @param param   connector configuration
     * @param schema  schema used to build a {@link Put} for every incoming row
     * @param context task context; its job id becomes part of the application name
     * @throws IOException declared by the signature; connection failures surface as
     *                     {@link RuntimeException} from {@link CopyContext#init}
     */
    public HoloRecordCopyWriter(HoloClientParam param, TableSchema schema, TaskAttemptContext context) throws IOException {
        this.maxWriterNumberPerTask = param.getMaxWriterNumberPerTask();
        this.param = param;
        this.binary = "binary".equals(param.getCopyWriteFormat());
        this.bulkLoad = param.isBulkLoad();
        this.schema = schema;
        this.appName = String.format("hologres-connector-hive_copy_%s", context.getJobID());
        CopyContext copyContext = new CopyContext();
        copyContext.init(param, this.appName);
        this.nextCopyContextIndex = new AtomicInteger(0);
        this.copyContexts = new ConcurrentHashMap<Integer, CopyContext>(1);
        this.copyContexts.put(this.nextCopyContextIndex.getAndIncrement(), copyContext);
        this.maxWriterNumber = param.getMaxWriterNumber();
        this.maxCellBufferSize = param.getMaxCellBufferSize();
        if (this.maxWriterNumber > 0) {
            this.backgroundExecutorService = Executors.newSingleThreadScheduledExecutor();
            this.backgroundExecutorService.scheduleAtFixedRate(this::checkIfNeedIncreaseCopyContexts, new Random().nextInt(60), 60L, TimeUnit.SECONDS);
        }
    }

    /**
     * Converts the writable into a {@link Record}, optionally validates it, and
     * appends it to one of the copy streams (chosen round-robin).
     *
     * @param writable the row to write; must be a {@link HoloRecordWritable}
     * @throws IOException if {@code writable} is {@code null} or of another type, if the
     *                     dirty-data check rejects the record, or if writing fails
     */
    public void write(Writable writable) throws IOException {
        if (!(writable instanceof HoloRecordWritable)) {
            // instanceof is false for null, so guard getClass() against an NPE.
            String actualType = writable == null ? "null" : writable.getClass().getName();
            throw new IOException("Expected HoloRecordWritable. Got " + actualType);
        }
        HoloRecordWritable recordWritable = (HoloRecordWritable) writable;
        try {
            Put put = new Put(this.schema);
            recordWritable.write(put);
            Record record = put.getRecord();
            ++this.count;
            if (this.param.isDirtyDataCheck()) {
                try {
                    RecordChecker.check(record);
                } catch (HoloClientException e) {
                    String message = String.format("failed to copy because dirty data, the error record is %s.", record);
                    JDBCUtils.logErrorAndExceptionInConsole(message, e);
                    throw new IOException(message, e);
                }
            }
            // The background task publishes a context in the map before it bumps the
            // index, so every key below the index read here is already present.
            int contextIndex = (int) (this.count % (long) this.nextCopyContextIndex.get());
            this.writeWithCopyContext(record, this.copyContexts.get(contextIndex));
        } catch (HiveHoloStorageException e) {
            JDBCUtils.logErrorAndExceptionInConsole(String.format("failed while write values %s, because:", Arrays.toString(recordWritable.getColumnValues())), e);
            IOException failure = new IOException(e);
            // Release every context, but keep the storage exception as the primary cause.
            RuntimeException closeFailure = this.closeAllCopyContexts();
            if (closeFailure != null) {
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Appends one record to the given context, lazily starting the COPY statement
     * and creating the output stream on the context's first record.
     *
     * @param record      the record to append
     * @param copyContext the context whose stream receives the record
     * @throws IOException if starting the copy fails with a {@link SQLException}
     *                     (the context is closed first) or if the stream rejects the record
     */
    private void writeWithCopyContext(Record record, CopyContext copyContext) throws IOException {
        try {
            if (copyContext.os == null) {
                TableSchema recordSchema = record.getSchema();
                copyContext.schema = recordSchema;
                // Any write mode other than INSERT_OR_IGNORE is sent as INSERT_OR_UPDATE.
                WriteMode copyMode = this.param.getWriteMode() == WriteMode.INSERT_OR_IGNORE ? WriteMode.INSERT_OR_IGNORE : WriteMode.INSERT_OR_UPDATE;
                String sql = CopyUtil.buildCopyInSql(record, this.binary, copyMode, !this.bulkLoad);
                logger.info("copy sql :{}", sql);
                CopyIn in = copyContext.manager.copyIn(sql);
                // The binary stream is only used outside bulk-load mode.
                copyContext.os = this.binary && !this.bulkLoad
                        ? new RecordBinaryOutputStream(new CopyInOutputStream(in), recordSchema, copyContext.pgConn.unwrap(BaseConnection.class), this.maxCellBufferSize)
                        : new RecordTextOutputStream(new CopyInOutputStream(in), recordSchema, copyContext.pgConn.unwrap(BaseConnection.class), this.maxCellBufferSize);
            }
            copyContext.os.putRecord(record);
        } catch (SQLException e) {
            JDBCUtils.logErrorAndExceptionInConsole(String.format("failed while writeWithCopyContext record %s, because:", record), e);
            IOException failure = new IOException(e);
            try {
                copyContext.close();
            } catch (RuntimeException closeFailure) {
                // Do not let a cleanup failure hide the SQL error that triggered it.
                failure.addSuppressed(closeFailure);
            }
            throw failure;
        }
    }

    /**
     * Stops the background scale-up task and closes every copy context.
     *
     * <p>All contexts are closed even if some of them fail; the first failure is
     * rethrown afterwards with later ones attached as suppressed exceptions.
     * {@code shutdown()} does not wait for a scale-up check that is already running.
     *
     * @param b abort flag required by the interface; not used here
     * @throws IOException declared by the interface
     * @throws RuntimeException if closing a stream or a connection fails
     */
    public void close(boolean b) throws IOException {
        if (this.backgroundExecutorService != null) {
            this.backgroundExecutorService.shutdown();
        }
        RuntimeException closeFailure = this.closeAllCopyContexts();
        if (closeFailure != null) {
            throw closeFailure;
        }
    }

    /**
     * Closes every copy context, continuing past individual failures.
     *
     * @return the first close failure with later ones added as suppressed,
     *         or {@code null} if every context closed cleanly
     */
    private RuntimeException closeAllCopyContexts() {
        RuntimeException firstFailure = null;
        for (CopyContext copyContext : this.copyContexts.values()) {
            try {
                copyContext.close();
            } catch (RuntimeException e) {
                if (firstFailure == null) {
                    firstFailure = e;
                } else {
                    firstFailure.addSuppressed(e);
                }
            }
        }
        return firstFailure;
    }

    /**
     * Periodic task: opens one more copy context when this writer is below its
     * per-task limit and {@link #probabilityIncrease(int)} agrees.
     *
     * <p>Failures while counting connections or opening the new connection are
     * logged and retried at the next run; letting them escape would make the
     * scheduler silently cancel all further runs.
     */
    private void checkIfNeedIncreaseCopyContexts() {
        if (this.copyContexts.size() >= this.maxWriterNumberPerTask) {
            return;
        }
        CopyContext added;
        try {
            int connectionsNumber = JDBCUtils.getConnectionsNumberOfThisJob(this.param, this.appName);
            if (!this.probabilityIncrease(connectionsNumber)) {
                return;
            }
            added = new CopyContext();
            added.init(this.param, this.appName);
        } catch (RuntimeException e) {
            logger.warn("failed to create an additional copy context, will retry at the next check", e);
            return;
        }
        // Publish the context first, then advance the index that write() reads.
        this.copyContexts.put(this.nextCopyContextIndex.get(), added);
        int contextCount = this.nextCopyContextIndex.incrementAndGet();
        if (contextCount != this.copyContexts.size()) {
            String message = String.format("should not happened, the size of copyContexts %s not equals to nextCopyContextIndex %s", this.copyContexts.size(), contextCount);
            // Log before throwing: an exception escaping a scheduled task is otherwise never reported.
            logger.error(message);
            throw new RuntimeException(message);
        }
        logger.info("create new enhance copy contexts, current number is {}", contextCount);
    }

    /**
     * Randomly decides whether to open another connection, based on how far the
     * job is below {@code maxWriterNumber}.
     *
     * @param connectionsNumber connections currently counted for this job
     * @return {@code false} when the budget is already used up; otherwise {@code true}
     *         with a chance that grows with
     *         {@code (maxWriterNumber - connectionsNumber) / (connectionsNumber + 1)}
     *         and is certain once that ratio reaches 1
     */
    private boolean probabilityIncrease(int connectionsNumber) {
        if (this.maxWriterNumber <= connectionsNumber) {
            return false;
        }
        double rate = (double) (this.maxWriterNumber - connectionsNumber) / (double) (connectionsNumber + 1);
        return (double) new Random().nextInt(100) < rate * 100.0;
    }

    /**
     * One JDBC connection together with its {@link CopyManager} and, once the
     * first record has been written, the open copy output stream.
     */
    static class CopyContext {
        PgConnection pgConn;
        CopyManager manager;
        /** Created lazily by the writer on the first record; {@code null} until then. */
        RecordOutputStream os = null;
        TableSchema schema;

        CopyContext() {
        }

        /**
         * Opens the connection and creates the copy manager.
         *
         * <p>When direct connect is enabled the direct URL is tried first; if that
         * attempt fails the regular URL is used instead.
         *
         * @param param   supplies URL, credentials and the direct-connect switch
         * @param appName value set as the JDBC application name
         * @throws RuntimeException wrapping the {@link SQLException} if no connection
         *                          could be opened or unwrapped; the connection is closed first
         */
        public void init(HoloClientParam param, String appName) {
            Connection conn = null;
            String url = param.getUrl();
            Properties info = new Properties();
            PGProperty.USER.set(info, param.getUsername());
            PGProperty.PASSWORD.set(info, param.getPassword());
            PGProperty.APPLICATION_NAME.set(info, appName);
            try {
                if (param.isDirectConnect()) {
                    String directUrl = JDBCUtils.getJdbcDirectConnectionUrl(param);
                    try {
                        conn = DriverManager.getConnection(directUrl, info);
                        logger.info("init conn success with direct url {}", directUrl);
                    } catch (Exception e) {
                        // Best effort: fall back to the regular URL, but keep the cause visible.
                        logger.warn("could not connect directly to holo.", e);
                    }
                }
                if (conn == null) {
                    conn = DriverManager.getConnection(url, info);
                    // Logged only after the connection has actually been established.
                    logger.info("init conn success to {}", url);
                }
                this.pgConn = conn.unwrap(PgConnection.class);
                logger.info("init unwrap conn success");
                this.manager = new CopyManager(this.pgConn);
                logger.info("init new manager success");
            } catch (SQLException e) {
                if (null != conn) {
                    try {
                        conn.close();
                    } catch (SQLException ignored) {
                        // The original failure is what gets reported; nothing more to do here.
                    }
                }
                this.pgConn = null;
                this.manager = null;
                throw new RuntimeException(e);
            }
        }

        /**
         * Closes the copy stream (if one was opened) and then the connection.
         *
         * <p>The connection is closed even when closing the stream fails, so a
         * failed flush cannot leak the connection.
         *
         * @throws RuntimeException the stream failure if there was one (with a
         *                          connection failure suppressed), otherwise the
         *                          connection failure
         */
        public void close() {
            logger.info("close copyContext");
            RuntimeException failure = null;
            if (this.os != null) {
                try {
                    this.os.close();
                } catch (IOException e) {
                    JDBCUtils.logErrorAndExceptionInConsole("close RecordOutputStream fail", e);
                    failure = new RuntimeException(e);
                } catch (RuntimeException e) {
                    failure = e;
                } finally {
                    this.os = null;
                }
            }
            this.manager = null;
            if (this.pgConn != null) {
                try {
                    this.pgConn.close();
                } catch (SQLException e) {
                    JDBCUtils.logErrorAndExceptionInConsole("close pg Conn fail", e);
                    RuntimeException wrapped = new RuntimeException(e);
                    if (failure == null) {
                        failure = wrapped;
                    } else {
                        failure.addSuppressed(wrapped);
                    }
                } finally {
                    this.pgConn = null;
                }
            }
            if (failure != null) {
                throw failure;
            }
        }
    }
}

