package org.apache.paimon.flink.source.operator;

import java.time.Duration;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.temporal.TemporalAmount;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.paimon.append.MultiTableUnawareAppendCompactionTask;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/paimon/flink/source/operator/MultiUnawareTablesReadOperator.class */
public class MultiUnawareTablesReadOperator extends AbstractStreamOperator<MultiTableUnawareAppendCompactionTask> implements OneInputStreamOperator<MultiTableUnawareAppendCompactionTask, MultiTableUnawareAppendCompactionTask> {
    private static final long serialVersionUID = 1;
    private static final Logger LOG = LoggerFactory.getLogger(MultiUnawareTablesReadOperator.class);
    private final Catalog.Loader catalogLoader;
    private final Duration partitionIdleTime;
    private transient Catalog catalog;
    private transient Map<Identifier, FileStoreTable> tablesMap;

    public MultiUnawareTablesReadOperator(Catalog.Loader loader, Duration duration) {
        this.catalogLoader = loader;
        this.partitionIdleTime = duration;
    }

    public void open() throws Exception {
        super.open();
        this.tablesMap = new HashMap();
        this.catalog = this.catalogLoader.load();
    }

    public void processElement(StreamRecord<MultiTableUnawareAppendCompactionTask> streamRecord) {
        if (checkIsHistoryPartition(((MultiTableUnawareAppendCompactionTask) streamRecord.getValue()).partition(), getPartitionInfo(getTable(((MultiTableUnawareAppendCompactionTask) streamRecord.getValue()).tableIdentifier())))) {
            this.output.collect(streamRecord);
        }
    }

    private FileStoreTable getTable(Identifier identifier) {
        FileStoreTable fileStoreTable = this.tablesMap.get(identifier);
        if (fileStoreTable == null) {
            try {
                Table table = this.catalog.getTable(identifier);
                Preconditions.checkArgument(table instanceof FileStoreTable, "Only FileStoreTable supports compact action. The table type is '%s'.", table.getClass().getName());
                fileStoreTable = (FileStoreTable) table;
                this.tablesMap.put(identifier, fileStoreTable);
            } catch (Catalog.TableNotExistException e) {
                LOG.error(String.format("table: %s not found.", identifier.getFullName()));
            }
        }
        return fileStoreTable;
    }

    private Map<BinaryRow, Long> getPartitionInfo(FileStoreTable fileStoreTable) {
        return (Map) fileStoreTable.newSnapshotReader().partitionEntries().stream().collect(Collectors.toMap((v0) -> {
            return v0.partition();
        }, (v0) -> {
            return v0.lastFileCreationTime();
        }));
    }

    /* JADX WARN: Type inference failed for: r0v2, types: [java.time.ZonedDateTime] */
    private boolean checkIsHistoryPartition(BinaryRow binaryRow, Map<BinaryRow, Long> map) {
        return map.get(binaryRow).longValue() <= LocalDateTime.now().minus((TemporalAmount) this.partitionIdleTime).atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
    }

    public void close() throws Exception {
        super.close();
        if (this.catalog != null) {
            this.catalog.close();
        }
    }
}
