package org.apache.paimon.flink.sink.cdc;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.flink.table.connector.sink.evolution.AlterTableAddColumnEvent;
import org.apache.flink.table.connector.sink.evolution.AlterTableModifyColumnTypeEvent;
import org.apache.flink.table.connector.sink.evolution.SchemaChangeEvent;
import org.apache.flink.table.connector.sink.evolution.SchemaChangeListener;
import org.apache.flink.table.data.ColumnSpec;
import org.apache.flink.table.data.SchemaSpec;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.LogicalTypeConversion;
import org.apache.paimon.flink.sink.TypeNormalizationUtils;
import org.apache.paimon.flink.sink.cdc.UpdatedDataFieldsProcessFunctionBase;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataType;
import org.apache.paimon.utils.Preconditions;

/* loaded from: input_file:org/apache/paimon/flink/sink/cdc/PaimonSchemaChangeListener.class */
public class PaimonSchemaChangeListener implements SchemaChangeListener {
    private static final long serialVersionUID = 2;
    private final Catalog.Loader catalogLoader;
    private final Identifier identifier;
    private final boolean enableTypeNormalization;
    private transient Catalog catalog;

    public PaimonSchemaChangeListener(Catalog.Loader loader, Identifier identifier, boolean z) {
        this.catalogLoader = (Catalog.Loader) Preconditions.checkNotNull(loader, "Currently VVR CTAS/CDAS cannot support Paimon temporary tables. Please use Paimon catalog tables instead.");
        this.identifier = identifier;
        this.enableTypeNormalization = z;
    }

    public void onTableChanged(SchemaChangeEvent schemaChangeEvent) {
        List<SchemaChange> extractSchemaChanges;
        if (this.catalog == null) {
            this.catalog = this.catalogLoader.load();
        }
        try {
            TableSchema schema = ((FileStoreTable) this.catalog.getTable(this.identifier)).schema();
            if (schemaChangeEvent instanceof AlterTableAddColumnEvent) {
                extractSchemaChanges = extractSchemaChanges(schema, ((AlterTableAddColumnEvent) schemaChangeEvent).getNewSchema());
            } else {
                if (!(schemaChangeEvent instanceof AlterTableModifyColumnTypeEvent)) {
                    throw new UnsupportedOperationException("Currently VVR Paimon connector only supports adding columns and modifying column types. Unsupported schema change event: " + schemaChangeEvent.toString());
                }
                extractSchemaChanges = extractSchemaChanges(schema, ((AlterTableModifyColumnTypeEvent) schemaChangeEvent).getNewSchema());
            }
            verifyColumnTypeChanges(schema, extractSchemaChanges);
            try {
                this.catalog.alterTable(this.identifier, extractSchemaChanges, false);
            } catch (Exception e) {
                throw new RuntimeException("Failed to commit schema changes", e);
            }
        } catch (Catalog.TableNotExistException e2) {
            throw new RuntimeException("Table does not exist. This is unexpected.", e2);
        }
    }

    private List<SchemaChange> extractSchemaChanges(TableSchema tableSchema, SchemaSpec schemaSpec) {
        boolean z;
        if (this.enableTypeNormalization) {
            schemaSpec = TypeNormalizationUtils.getNormalizedSchemaSpec(schemaSpec);
        }
        HashMap hashMap = new HashMap();
        for (DataField dataField : tableSchema.fields()) {
            hashMap.put(dataField.name(), dataField.type());
        }
        ArrayList arrayList = new ArrayList();
        for (ColumnSpec columnSpec : schemaSpec.getColumns()) {
            if (hashMap.containsKey(columnSpec.getName())) {
                DataType dataType = (DataType) hashMap.get(columnSpec.getName());
                DataType dataType2 = LogicalTypeConversion.toDataType(columnSpec.getDataType().getLogicalType());
                if (!dataType.equalsIgnoreNullable(dataType2)) {
                    arrayList.add(SchemaChange.updateColumnType(columnSpec.getName(), dataType2));
                }
            } else {
                arrayList.add(SchemaChange.addColumn(columnSpec.getName(), LogicalTypeConversion.toDataType(columnSpec.getDataType().getLogicalType())));
            }
        }
        if (tableSchema.fields().size() <= schemaSpec.getColumns().size()) {
            z = false;
            int i = 0;
            while (true) {
                if (i >= tableSchema.fields().size()) {
                    break;
                }
                if (!tableSchema.fieldNames().get(i).equals(schemaSpec.getColumnNames().get(i))) {
                    z = true;
                    break;
                }
                i++;
            }
        } else {
            z = true;
        }
        if (z) {
            List columnNames = schemaSpec.getColumnNames();
            if (!tableSchema.fieldNames().get(0).equals(columnNames.get(0))) {
                arrayList.add(SchemaChange.updateColumnPosition(SchemaChange.Move.first((String) columnNames.get(0))));
            }
            for (int i2 = 1; i2 < columnNames.size(); i2++) {
                arrayList.add(SchemaChange.updateColumnPosition(SchemaChange.Move.after((String) columnNames.get(i2), (String) columnNames.get(i2 - 1))));
            }
        }
        return arrayList;
    }

    private void verifyColumnTypeChanges(TableSchema tableSchema, List<SchemaChange> list) {
        for (SchemaChange schemaChange : list) {
            if (schemaChange instanceof SchemaChange.UpdateColumnType) {
                SchemaChange.UpdateColumnType updateColumnType = (SchemaChange.UpdateColumnType) schemaChange;
                int indexOf = tableSchema.fieldNames().indexOf(updateColumnType.fieldName());
                Preconditions.checkState(indexOf >= 0, "Field name " + updateColumnType.fieldName() + " does not exist in table. This is unexpected.");
                DataType type = tableSchema.fields().get(indexOf).type();
                DataType newDataType = updateColumnType.newDataType();
                if (UpdatedDataFieldsProcessFunction.canConvert(type, newDataType) == UpdatedDataFieldsProcessFunctionBase.ConvertAction.EXCEPTION) {
                    throw new UnsupportedOperationException(String.format("Cannot convert field %s from type %s to %s", updateColumnType.fieldName(), type, newDataType));
                }
            }
        }
    }
}
