/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.kafka.sink;

import java.lang.reflect.Constructor;
import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.Map;
import java.util.Properties;
import javax.annotation.Nullable;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.internals.TransactionManager;
import org.apache.kafka.clients.producer.internals.TransactionalRequestResult;
import org.apache.kafka.common.errors.ProducerFencedException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class FlinkKafkaInternalProducer<K, V>
extends KafkaProducer<K, V> {
    private static final Logger LOG = LoggerFactory.getLogger(FlinkKafkaInternalProducer.class);
    private static final String TRANSACTION_MANAGER_FIELD_NAME = "transactionManager";
    private static final String TRANSACTION_MANAGER_STATE_ENUM = "org.apache.kafka.clients.producer.internals.TransactionManager$State";
    private static final String PRODUCER_ID_AND_EPOCH_FIELD_NAME = "producerIdAndEpoch";
    @Nullable
    private String transactionalId;
    private volatile boolean inTransaction;
    private volatile boolean closed;

    public FlinkKafkaInternalProducer(Properties properties, @Nullable String transactionalId) {
        super(FlinkKafkaInternalProducer.withTransactionalId(properties, transactionalId));
        this.transactionalId = transactionalId;
    }

    private static Properties withTransactionalId(Properties properties, @Nullable String transactionalId) {
        if (transactionalId == null) {
            return properties;
        }
        Properties props = new Properties();
        props.putAll((Map<?, ?>)properties);
        props.setProperty("transactional.id", transactionalId);
        return props;
    }

    @Override
    public void flush() {
        super.flush();
        if (this.inTransaction) {
            this.flushNewPartitions();
        }
    }

    @Override
    public void beginTransaction() throws ProducerFencedException {
        super.beginTransaction();
        this.inTransaction = true;
    }

    @Override
    public void abortTransaction() throws ProducerFencedException {
        LOG.debug("abortTransaction {}", (Object)this.transactionalId);
        Preconditions.checkState((boolean)this.inTransaction, (Object)"Transaction was not started");
        this.inTransaction = false;
        super.abortTransaction();
    }

    @Override
    public void commitTransaction() throws ProducerFencedException {
        LOG.debug("commitTransaction {}", (Object)this.transactionalId);
        Preconditions.checkState((boolean)this.inTransaction, (Object)"Transaction was not started");
        this.inTransaction = false;
        super.commitTransaction();
    }

    public boolean isInTransaction() {
        return this.inTransaction;
    }

    @Override
    public void close() {
        this.closed = true;
        if (this.inTransaction) {
            super.close(Duration.ZERO);
        } else {
            super.close(Duration.ofHours(1L));
        }
    }

    @Override
    public void close(Duration timeout) {
        this.closed = true;
        super.close(timeout);
    }

    public boolean isClosed() {
        return this.closed;
    }

    @Nullable
    public String getTransactionalId() {
        return this.transactionalId;
    }

    public short getEpoch() {
        Object transactionManager = this.getTransactionManager();
        Object producerIdAndEpoch = FlinkKafkaInternalProducer.getField(transactionManager, PRODUCER_ID_AND_EPOCH_FIELD_NAME);
        return (Short)FlinkKafkaInternalProducer.getField(producerIdAndEpoch, "epoch");
    }

    public long getProducerId() {
        Object transactionManager = this.getTransactionManager();
        Object producerIdAndEpoch = FlinkKafkaInternalProducer.getField(transactionManager, PRODUCER_ID_AND_EPOCH_FIELD_NAME);
        return (Long)FlinkKafkaInternalProducer.getField(producerIdAndEpoch, "producerId");
    }

    public void initTransactionId(String transactionalId) {
        if (!transactionalId.equals(this.transactionalId)) {
            this.setTransactionId(transactionalId);
            this.initTransactions();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void setTransactionId(String transactionalId) {
        if (!transactionalId.equals(this.transactionalId)) {
            Object transactionManager;
            Preconditions.checkState((!this.inTransaction ? 1 : 0) != 0, (Object)String.format("Another transaction %s is still open.", transactionalId));
            LOG.debug("Change transaction id from {} to {}", (Object)this.transactionalId, (Object)transactionalId);
            Object object = transactionManager = this.getTransactionManager();
            synchronized (object) {
                FlinkKafkaInternalProducer.setField(transactionManager, "transactionalId", transactionalId);
                FlinkKafkaInternalProducer.setField(transactionManager, "currentState", FlinkKafkaInternalProducer.getTransactionManagerState("UNINITIALIZED"));
                this.transactionalId = transactionalId;
            }
        }
    }

    private void flushNewPartitions() {
        LOG.info("Flushing new partitions");
        TransactionalRequestResult result = this.enqueueNewPartitions();
        Object sender = this.getField("sender");
        FlinkKafkaInternalProducer.invoke(sender, "wakeup", new Object[0]);
        result.await();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private TransactionalRequestResult enqueueNewPartitions() {
        Object transactionManager;
        Object object = transactionManager = this.getTransactionManager();
        synchronized (object) {
            TransactionalRequestResult result;
            Object newPartitionsInTransaction = FlinkKafkaInternalProducer.getField(transactionManager, "newPartitionsInTransaction");
            Object newPartitionsInTransactionIsEmpty = FlinkKafkaInternalProducer.invoke(newPartitionsInTransaction, "isEmpty", new Object[0]);
            if (newPartitionsInTransactionIsEmpty instanceof Boolean && !((Boolean)newPartitionsInTransactionIsEmpty).booleanValue()) {
                Object txnRequestHandler = FlinkKafkaInternalProducer.invoke(transactionManager, "addPartitionsToTransactionHandler", new Object[0]);
                FlinkKafkaInternalProducer.invoke(transactionManager, "enqueueRequest", new Class[]{txnRequestHandler.getClass().getSuperclass()}, new Object[]{txnRequestHandler});
                result = (TransactionalRequestResult)FlinkKafkaInternalProducer.getField(txnRequestHandler, txnRequestHandler.getClass().getSuperclass(), "result");
            } else {
                result = new TransactionalRequestResult("AddPartitionsToTxn");
                result.done();
            }
            return result;
        }
    }

    private static Object invoke(Object object, String methodName, Object ... args) {
        Class[] argTypes = new Class[args.length];
        for (int i = 0; i < args.length; ++i) {
            argTypes[i] = args[i].getClass();
        }
        return FlinkKafkaInternalProducer.invoke(object, methodName, argTypes, args);
    }

    private static Object invoke(Object object, String methodName, Class<?>[] argTypes, Object[] args) {
        try {
            Method method = object.getClass().getDeclaredMethod(methodName, argTypes);
            method.setAccessible(true);
            return method.invoke(object, args);
        }
        catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    private Object getField(String fieldName) {
        return FlinkKafkaInternalProducer.getField(this, KafkaProducer.class, fieldName);
    }

    private static Object getField(Object object, String fieldName) {
        return FlinkKafkaInternalProducer.getField(object, object.getClass(), fieldName);
    }

    private static Object getField(Object object, Class<?> clazz, String fieldName) {
        try {
            Field field = clazz.getDeclaredField(fieldName);
            field.setAccessible(true);
            return field.get(object);
        }
        catch (IllegalAccessException | NoSuchFieldException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void resumeTransaction(long producerId, short epoch) {
        Object transactionManager;
        Preconditions.checkState((!this.inTransaction ? 1 : 0) != 0, (String)"Already in transaction %s", (Object[])new Object[]{this.transactionalId});
        Preconditions.checkState((producerId >= 0L && epoch >= 0 ? 1 : 0) != 0, (String)"Incorrect values for producerId %s and epoch %s", (Object[])new Object[]{producerId, epoch});
        LOG.info("Attempting to resume transaction {} with producerId {} and epoch {}", this.transactionalId, producerId, epoch);
        Object object = transactionManager = this.getTransactionManager();
        synchronized (object) {
            Object topicPartitionBookkeeper = FlinkKafkaInternalProducer.getField(transactionManager, "topicPartitionBookkeeper");
            FlinkKafkaInternalProducer.transitionTransactionManagerStateTo(transactionManager, "INITIALIZING");
            FlinkKafkaInternalProducer.invoke(topicPartitionBookkeeper, "reset", new Object[0]);
            FlinkKafkaInternalProducer.setField(transactionManager, PRODUCER_ID_AND_EPOCH_FIELD_NAME, FlinkKafkaInternalProducer.createProducerIdAndEpoch(producerId, epoch));
            FlinkKafkaInternalProducer.transitionTransactionManagerStateTo(transactionManager, "READY");
            FlinkKafkaInternalProducer.transitionTransactionManagerStateTo(transactionManager, "IN_TRANSACTION");
            FlinkKafkaInternalProducer.setField(transactionManager, "transactionStarted", true);
            this.inTransaction = true;
        }
    }

    private static Object createProducerIdAndEpoch(long producerId, short epoch) {
        try {
            Field field = TransactionManager.class.getDeclaredField(PRODUCER_ID_AND_EPOCH_FIELD_NAME);
            Class<?> clazz = field.getType();
            Constructor<?> constructor = clazz.getDeclaredConstructor(Long.TYPE, Short.TYPE);
            constructor.setAccessible(true);
            return constructor.newInstance(producerId, epoch);
        }
        catch (IllegalAccessException | InstantiationException | NoSuchFieldException | NoSuchMethodException | InvocationTargetException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    private static void setField(Object object, String fieldName, Object value) {
        FlinkKafkaInternalProducer.setField(object, object.getClass(), fieldName, value);
    }

    private static void setField(Object object, Class<?> clazz, String fieldName, Object value) {
        try {
            Field field = clazz.getDeclaredField(fieldName);
            field.setAccessible(true);
            field.set(object, value);
        }
        catch (IllegalAccessException | NoSuchFieldException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    private static Enum<?> getTransactionManagerState(String enumName) {
        try {
            Class<?> cl = Class.forName(TRANSACTION_MANAGER_STATE_ENUM);
            return Enum.valueOf(cl, enumName);
        }
        catch (ClassNotFoundException e) {
            throw new RuntimeException("Incompatible KafkaProducer version", e);
        }
    }

    private Object getTransactionManager() {
        return this.getField(TRANSACTION_MANAGER_FIELD_NAME);
    }

    private static void transitionTransactionManagerStateTo(Object transactionManager, String state) {
        FlinkKafkaInternalProducer.invoke(transactionManager, "transitionTo", FlinkKafkaInternalProducer.getTransactionManagerState(state));
    }

    public String toString() {
        return "FlinkKafkaInternalProducer{transactionalId='" + this.transactionalId + "', inTransaction=" + this.inTransaction + ", closed=" + this.closed + '}';
    }
}

