package org.apache.paimon.flink;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogInfoProvider;
import org.apache.flink.table.catalog.CatalogMaterializedTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableProvider;
import org.apache.flink.table.catalog.IntervalFreshness;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.TableChange;
import org.apache.flink.table.catalog.TableInfo;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.table.factories.Factory;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.Snapshot;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.flink.sink.TypeNormalizationUtils;
import org.apache.paimon.options.Options;
import org.apache.paimon.schema.SchemaChange;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/VvrCatalog.class */
public class VvrCatalog extends FlinkCatalog implements CatalogTableProvider, CatalogInfoProvider {
    private static final Logger LOG = LoggerFactory.getLogger(VvrCatalog.class);
    public static final String IS_SCHEMA_EVOLUTION = "paimon.vvr.schema-evolution";
    public static final String PARTITION_KEYS = "partition-keys";
    private final Options catalogOptions;

    public VvrCatalog(Catalog catalog, String str, String str2, ClassLoader classLoader, Options options) {
        super(catalog, str, str2, classLoader, options);
        this.catalogOptions = options;
    }

    @Override // org.apache.paimon.flink.FlinkCatalog
    public Optional<Factory> getFactory() {
        return Optional.of(new VvrTableFactory(this.catalogOptions, this));
    }

    @Override // org.apache.paimon.flink.FlinkCatalog
    public void createDatabase(String str, CatalogDatabase catalogDatabase, boolean z) throws DatabaseAlreadyExistException, CatalogException {
        if (catalogDatabase != null && catalogDatabase.getDescription().isPresent() && !((String) catalogDatabase.getDescription().get()).equals("")) {
            throw new UnsupportedOperationException("Create database with description is unsupported.");
        }
        try {
            catalog().createDatabase(str, z);
        } catch (Catalog.DatabaseAlreadyExistException e) {
            throw new DatabaseAlreadyExistException(getName(), e.database());
        }
    }

    @Override // org.apache.paimon.flink.FlinkCatalog
    public boolean databaseExists(String str) throws CatalogException {
        LOG.info("Checking database existence: " + str);
        try {
            return super.databaseExists(str);
        } catch (Exception e) {
            LOG.info("Unexpected Exception: " + e.getMessage());
            if (e.getMessage().contains("Database name is invalid!")) {
                return false;
            }
            throw e;
        }
    }

    public CatalogTable inferTable(ObjectPath objectPath, CatalogDatabase catalogDatabase, CatalogTable catalogTable) throws CatalogException {
        if (VvrTableFactory.isFlinkTable(catalogTable)) {
            return catalogTable;
        }
        HashMap hashMap = new HashMap(catalogDatabase.getProperties());
        hashMap.putAll(catalogTable.getOptions());
        updateDefaultOptions(hashMap);
        CatalogTable addPartitionKeys = addPartitionKeys(objectPath, catalogTable, hashMap);
        hashMap.put(IS_SCHEMA_EVOLUTION, "true");
        return addPartitionKeys.copy(hashMap);
    }

    private CatalogTable addPartitionKeys(ObjectPath objectPath, CatalogTable catalogTable, Map<String, String> map) {
        List<String> partitionKeys;
        String str = map.get(PARTITION_KEYS);
        if (str == null) {
            return catalogTable;
        }
        Preconditions.checkArgument(!catalogTable.isPartitioned(), "Please do not use PARTITIONED BY clause and partition-keys table option at the same time.");
        Schema unresolvedSchema = catalogTable.getUnresolvedSchema();
        Identifier create = Identifier.create(objectPath.getDatabaseName(), objectPath.getObjectName());
        if (catalog().tableExists(create)) {
            try {
                partitionKeys = catalog().getTable(create).partitionKeys();
            } catch (Exception e) {
                throw new RuntimeException("Failed to fetch partition keys from existing table", e);
            }
        } else {
            Set set = (Set) unresolvedSchema.getColumns().stream().map((v0) -> {
                return v0.getName();
            }).collect(Collectors.toSet());
            Stream map2 = Arrays.stream(str.split(",")).map((v0) -> {
                return v0.trim();
            });
            set.getClass();
            partitionKeys = (List) map2.filter((v1) -> {
                return r1.contains(v1);
            }).collect(Collectors.toList());
        }
        return CatalogTable.of(unresolvedSchema, catalogTable.getComment(), partitionKeys, map);
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.paimon.flink.FlinkCatalog
    public org.apache.paimon.schema.Schema buildPaimonSchema(Identifier identifier, CatalogBaseTable catalogBaseTable, Map<String, String> map) {
        updateDefaultOptions(map);
        org.apache.paimon.schema.Schema buildPaimonSchema = super.buildPaimonSchema(identifier, catalogBaseTable, map);
        if (((Boolean) Options.fromMap(buildPaimonSchema.options()).get(VvrConnectorOptions.ENABLE_TYPE_NORMALIZATION)).booleanValue()) {
            buildPaimonSchema = TypeNormalizationUtils.getNormalizedSchema(buildPaimonSchema);
        }
        return buildPaimonSchema;
    }

    /* JADX INFO: Access modifiers changed from: protected */
    @Override // org.apache.paimon.flink.FlinkCatalog
    public boolean handleMaterializedTableChange(TableChange tableChange, List<SchemaChange> list) {
        if (!(tableChange instanceof TableChange.ModifyFreshness)) {
            return super.handleMaterializedTableChange(tableChange, list);
        }
        TableChange.ModifyFreshness modifyFreshness = (TableChange.ModifyFreshness) tableChange;
        CatalogMaterializedTable.RefreshMode refreshMode = modifyFreshness.getRefreshMode();
        IntervalFreshness intervalFreshness = modifyFreshness.getIntervalFreshness();
        list.add(SchemaChange.setOption(CoreOptions.MATERIALIZED_TABLE_REFRESH_MODE.key(), refreshMode.name()));
        list.add(SchemaChange.setOption(CoreOptions.MATERIALIZED_TABLE_INTERVAL_FRESHNESS.key(), intervalFreshness.getInterval()));
        list.add(SchemaChange.setOption(CoreOptions.MATERIALIZED_TABLE_INTERVAL_FRESHNESS_TIME_UNIT.key(), intervalFreshness.getTimeUnit().name()));
        return true;
    }

    private void updateDefaultOptions(Map<String, String> map) {
        if (map.containsKey(CoreOptions.METASTORE_PARTITIONED_TABLE.key())) {
            return;
        }
        map.put(CoreOptions.METASTORE_PARTITIONED_TABLE.key(), "true");
    }

    public Map<String, String> getTableMetadata(ObjectPath objectPath) throws TableNotExistException, CatalogException {
        HashMap hashMap = new HashMap();
        try {
            Table table = catalog().getTable(Identifier.create(objectPath.getDatabaseName(), objectPath.getObjectName()));
            if (!(table instanceof FileStoreTable)) {
                throw new CatalogException(String.format("table %s.%s is a %s, not a FileStoreTable. This should not happen.", getName(), objectPath, table.getClass()));
            }
            FileStoreTable fileStoreTable = (FileStoreTable) table;
            if (fileStoreTable.latestSnapshotId().isPresent()) {
                Snapshot snapshot = fileStoreTable.snapshot(fileStoreTable.latestSnapshotId().getAsLong());
                hashMap.put("numRows", String.valueOf(snapshot.totalRecordCount()));
                hashMap.put(Catalog.LAST_UPDATE_TIME_PROP, String.valueOf(snapshot.timeMillis()));
                return hashMap;
            }
            LOG.info("catalog {} does not have any snapshot for table {}. Using schema metadata instead.", getName(), objectPath);
            hashMap.put("numRows", "0");
            hashMap.put(Catalog.LAST_UPDATE_TIME_PROP, String.valueOf(fileStoreTable.schema().timeMillis()));
            return hashMap;
        } catch (Catalog.TableNotExistException e) {
            throw new TableNotExistException(getName(), objectPath, e);
        }
    }

    public void alterTableMetadata(ObjectPath objectPath, Map<String, String> map) throws CatalogException {
        LOG.warn("alterTableMetadata is not supported for Paimon Catalog yet. Will be ignored.");
    }

    @Override // org.apache.flink.table.catalog.CatalogInfoProvider
    public List<TableInfo> listTableInfos(String str) throws DatabaseNotExistException, CatalogException {
        List<String> listTables = listTables(str);
        ArrayList arrayList = new ArrayList();
        for (String str2 : listTables) {
            try {
                CatalogBaseTable table = getTable(new ObjectPath(str, str2));
                arrayList.add(new TableInfo(getName(), str, str2, table.getTableKind(), table.getComment()));
            } catch (TableNotExistException e) {
                throw new CatalogException(String.format("Failed to get table %s.%s", str, str2), e);
            }
        }
        return arrayList;
    }

    public long getTableTimestamp(ObjectPath objectPath, long j) throws TableNotExistException, CatalogException {
        try {
            Table table = catalog().getTable(toIdentifier(objectPath));
            if (!(table instanceof FileStoreTable)) {
                throw new CatalogException(String.format("table %s.%s is a %s, not a FileStoreTable. This should not happen.", getName(), objectPath, table.getClass()));
            }
            Snapshot earlierOrEqualTimeMills = ((FileStoreTable) table).snapshotManager().earlierOrEqualTimeMills(j);
            if (earlierOrEqualTimeMills != null && earlierOrEqualTimeMills.timeMillis() <= j) {
                return earlierOrEqualTimeMills.timeMillis();
            }
            LOG.info("catalog {} does not have any snapshot for table {} earlier or equal to timestamp {}.", new Object[]{getName(), objectPath, Long.valueOf(j)});
            return -1L;
        } catch (Catalog.TableNotExistException e) {
            throw new TableNotExistException(getName(), objectPath);
        }
    }
}
