package org.apache.paimon.flink.sink.cdc;

import java.util.ArrayList;
import java.util.List;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.table.data.ColumnSpec;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.SchemaSpec;
import org.apache.flink.table.data.SinkRecord;
import org.apache.flink.table.evolution.SchemaClient;
import org.apache.paimon.casting.CastExecutor;
import org.apache.paimon.casting.CastExecutors;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.flink.FlinkRowWrapper;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.VvrConnectorOptions;
import org.apache.paimon.flink.sink.StoreSinkWrite;
import org.apache.paimon.flink.sink.TableWriteOperator;
import org.apache.paimon.flink.sink.TypeNormalizationUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.SchemaValidation;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;

/* loaded from: input_file:org/apache/paimon/flink/sink/cdc/VvrCdcRecordStoreWriteOperatorBase.class */
/**
 * Base class for write operators that turn {@link SinkRecord}s into Paimon {@link GenericRow}s
 * while following the schema ids carried by the records.
 *
 * <p>Each record reports a schema id. When a record arrives with an id greater than any seen so
 * far, the operator fetches the matching {@link SchemaSpec} from the {@link SchemaClient},
 * optionally normalizes its types, switches the underlying table and writer to the matching
 * {@link TableSchema}, and rebuilds the per-column field getters and cast executors. Records
 * carrying an older schema id are projected to the newest seen schema before conversion.
 *
 * @param <T> the input element type of the operator
 */
public abstract class VvrCdcRecordStoreWriteOperatorBase<T> extends TableWriteOperator<T> {
    private static final long serialVersionUID = 1L;

    /** Highest schema id handled so far; {@code -1} until the first record is converted. */
    private transient int maxSchemaId;

    /** Source of {@link SchemaSpec}s and of projections between schema ids. */
    private transient SchemaClient schemaClient;

    /** Used to look up the {@link TableSchema}s stored for the table. */
    private transient SchemaManager schemaManager;

    /** Value of {@link VvrConnectorOptions#ENABLE_TYPE_NORMALIZATION} read from the table options. */
    private transient boolean enableTypeNormalization;

    /** One getter per column of the schema identified by {@link #maxSchemaId}. */
    private transient List<InternalRow.FieldGetter> fieldGetters;

    /**
     * One executor per column, converting the incoming column type to the (possibly normalized)
     * table column type. An entry may be {@code null} when no cast rule could be resolved.
     */
    private transient List<CastExecutor<Object, Object>> castExecutors;

    /** Forwards all arguments unchanged to the {@link TableWriteOperator} constructor. */
    public VvrCdcRecordStoreWriteOperatorBase(FileStoreTable fileStoreTable, StoreSinkWrite.Provider provider, String str) {
        super(fileStoreTable, provider, str);
    }

    /**
     * Initializes the parent state and then resets all transient schema-tracking state, so the
     * first record processed afterwards always triggers a schema refresh.
     */
    @Override
    public void initializeState(StateInitializationContext stateInitializationContext) throws Exception {
        super.initializeState(stateInitializationContext);
        this.maxSchemaId = -1;
        this.schemaClient = SchemaClient.of(getRuntimeContext());
        this.schemaManager = new SchemaManager(this.table.fileIO(), this.table.location());
        this.enableTypeNormalization =
                Options.fromMap(this.table.options()).get(VvrConnectorOptions.ENABLE_TYPE_NORMALIZATION);
        this.fieldGetters = new ArrayList<>();
        this.castExecutors = new ArrayList<>();
    }

    /** Always returns {@code false}. */
    @Override
    protected boolean containLogSystem() {
        return false;
    }

    /**
     * Converts a sink record into a {@link GenericRow} laid out according to the newest schema
     * this operator has seen.
     *
     * <p>NOTE(review): a decompiler note on this method said its access modifier was changed from
     * {@code protected}; it is kept {@code public} so that no existing caller breaks.
     *
     * @param sinkRecord the record to convert
     * @return a row with the record's row kind and one (cast) value per field
     * @throws IllegalStateException if a non-null value must be cast but no cast executor exists
     *     for its column
     * @throws Exception if refreshing the schema fails
     */
    public GenericRow toGenericRow(SinkRecord sinkRecord) throws Exception {
        int schemaId = sinkRecord.getSchemaId();
        if (schemaId > this.maxSchemaId) {
            advanceToSchema(sinkRecord, schemaId);
        }

        RowData row = sinkRecord.getRow();
        if (schemaId < this.maxSchemaId) {
            // The record was produced with an older schema: project it to the newest one seen.
            row = (RowData) this.schemaClient
                    .createProjection(sinkRecord.getTablePath(), schemaId, this.maxSchemaId)
                    .apply(row);
        }

        FlinkRowWrapper wrappedRow = new FlinkRowWrapper(row);
        int fieldCount = wrappedRow.getFieldCount();
        GenericRow genericRow = new GenericRow(fieldCount);
        genericRow.setRowKind(wrappedRow.getRowKind());
        for (int i = 0; i < fieldCount; i++) {
            Object value = this.fieldGetters.get(i).getFieldOrNull(wrappedRow);
            if (value != null) {
                CastExecutor<Object, Object> executor = this.castExecutors.get(i);
                if (executor == null) {
                    // Fail with context instead of a bare NullPointerException. Checked only for
                    // non-null values so that all-null columns keep working as before.
                    throw new IllegalStateException(
                            "No cast executor is available for column index " + i
                                    + " under schema id " + this.maxSchemaId
                                    + "; cannot convert a non-null value to the table column type.");
                }
                value = executor.cast(value);
            }
            genericRow.setField(i, value);
        }
        return genericRow;
    }

    /**
     * Switches the operator to the schema identified by {@code schemaId}: updates the table and
     * writer, then rebuilds the field getters and cast executors.
     *
     * <p>The new conversion state is built in local lists and published together with
     * {@link #maxSchemaId} only after every column was processed, so a failure half-way does not
     * leave partially filled lists behind an already advanced schema id.
     */
    private void advanceToSchema(SinkRecord sinkRecord, int schemaId) throws Exception {
        SchemaSpec sourceSpec = this.schemaClient.getSchemaSpec(sinkRecord.getTablePath(), schemaId);
        SchemaSpec targetSpec = this.enableTypeNormalization
                ? TypeNormalizationUtils.getNormalizedSchemaSpec(sourceSpec)
                : sourceSpec;
        updateSchema(targetSpec);

        int columnCount = sourceSpec.getColumnCount();
        List<InternalRow.FieldGetter> newGetters = new ArrayList<>(columnCount);
        List<CastExecutor<Object, Object>> newExecutors = new ArrayList<>(columnCount);
        for (int i = 0; i < columnCount; i++) {
            DataType sourceType = columnType(sourceSpec, i);
            DataType targetType = columnType(targetSpec, i);
            // Values are read using the incoming (non-normalized) type and cast afterwards.
            newGetters.add(InternalRow.createFieldGetter(sourceType, i));
            newExecutors.add(castBetween(sourceType, targetType));
        }

        this.fieldGetters = newGetters;
        this.castExecutors = newExecutors;
        this.maxSchemaId = schemaId;
    }

    /** Returns the Paimon type of the column at {@code index} in {@code schemaSpec}. */
    private static DataType columnType(SchemaSpec schemaSpec, int index) {
        ColumnSpec column = (ColumnSpec) schemaSpec.getColumns().get(index);
        return LogicalTypeConversion.toDataType(column.getDataType().getLogicalType());
    }

    /**
     * Picks the executor converting {@code sourceType} to {@code targetType}: the identity
     * executor when the types differ at most in nullability, otherwise whatever
     * {@link CastExecutors#resolve} yields.
     *
     * <p>The result may be {@code null}; Paimon documents {@code resolve} as returning
     * {@code null} when no cast rule exists (TODO confirm against the Paimon version in use).
     */
    @SuppressWarnings("unchecked") // executors are only ever applied to values read with sourceType
    private static CastExecutor<Object, Object> castBetween(DataType sourceType, DataType targetType) {
        CastExecutor<?, ?> executor = sourceType.equalsIgnoreNullable(targetType)
                ? CastExecutors.identityCastExecutor()
                : CastExecutors.resolve(sourceType, targetType);
        return (CastExecutor<Object, Object>) executor;
    }

    /**
     * Replaces the current table with one built from the stored {@link TableSchema} matching
     * {@code schemaSpec} (keeping the current table options) and hands it to the writer.
     */
    private void updateSchema(SchemaSpec schemaSpec) throws Exception {
        TableSchema currentSchema = this.table.schema();
        TableSchema matched = getMatchedSchema(schemaSpec, currentSchema.id()).copy(currentSchema.options());
        SchemaValidation.validateTableSchema(matched);
        this.table = FileStoreTableFactory.create(this.table.fileIO(), this.table.location(), matched);
        this.write.replace(this.table);
    }

    /**
     * Searches the stored schemas from the latest id down to {@code minSchemaId} (inclusive) and
     * returns the first one matching {@code schemaSpec}.
     *
     * @throws RuntimeException if the schema manager reports no schema at all
     * @throws IllegalArgumentException if no schema in that id range matches
     */
    private TableSchema getMatchedSchema(SchemaSpec schemaSpec, long minSchemaId) {
        long latestId = this.schemaManager
                .latest()
                .orElseThrow(() -> new RuntimeException("Table does not exist. This is unexpected."))
                .id();
        for (long id = latestId; id >= minSchemaId; id--) {
            TableSchema candidate = this.schemaManager.schema(id);
            if (schemaMatched(candidate, schemaSpec)) {
                return candidate;
            }
        }
        throw new IllegalArgumentException("Cannot find a TableSchema with id >= " + minSchemaId + " which matches the SchemaSpec " + schemaSpec + ". This is unexpected.");
    }

    /**
     * Returns whether both schemas have the same number of columns and, position by position,
     * equal names and types that are equal ignoring nullability.
     */
    private boolean schemaMatched(TableSchema tableSchema, SchemaSpec schemaSpec) {
        List<DataField> fields = tableSchema.fields();
        int columnCount = schemaSpec.getColumnCount();
        if (fields.size() != columnCount) {
            return false;
        }
        for (int i = 0; i < columnCount; i++) {
            DataField field = fields.get(i);
            ColumnSpec column = (ColumnSpec) schemaSpec.getColumns().get(i);
            if (!field.name().equals(column.getName())) {
                return false;
            }
            if (!field.type().equalsIgnoreNullable(columnType(schemaSpec, i))) {
                return false;
            }
        }
        return true;
    }
}
