package com.aliyun.jindodata.commit;

import com.aliyun.jindodata.api.spec.protos.JdoObjectUploadInfo;
import com.aliyun.jindodata.commit.impl.AbstractJindoFileOutputCommitter;
import com.aliyun.jindodata.commit.impl.CommitConstants;
import com.aliyun.jindodata.commit.impl.CommitOperations;
import com.aliyun.jindodata.commit.impl.InternalCommitterConstants;
import com.aliyun.jindodata.commit.impl.Tasks;
import com.aliyun.jindodata.commit.pendingset.PendingSet;
import com.aliyun.jindodata.commit.pendingset.SinglePendingCommit;
import com.aliyun.jindodata.commit.pendingset.SuccessData;
import com.aliyun.jindodata.commit.util.CommitUtils;
import com.aliyun.jindodata.commit.util.CommitUtilsWithMR;
import com.aliyun.jindodata.commit.util.HadoopExecutors;
import com.aliyun.jindodata.commit.util.JindoCommitUtils;
import com.aliyun.jindodata.commit.util.PathCommitException;
import com.aliyun.jindodata.common.JindoConstant;
import com.aliyun.jindodata.common.JindoHadoopSystem;
import com.aliyun.jindodata.common.JindoInMemoryMagicTracker;
import com.aliyun.jindodata.shade.google_guava.annotations.VisibleForTesting;
import com.aliyun.jindodata.shade.google_guava.base.Preconditions;
import com.aliyun.jindodata.shade.google_guava.util.concurrent.ThreadFactoryBuilder;
import com.aliyun.jindodata.thirdparty.util.DurationInfo;
import com.aliyun.jindodata.thirdparty.util.MagicCommitPaths;
import com.aliyun.jindodata.thirdparty.util.NetUtils;
import com.aliyun.jindodata.thirdparty.util.StringUtils;
import com.aliyun.jindodata.types.ObjectCommitData;
import java.io.ByteArrayInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/aliyun/jindodata/commit/JindoMagicCommitter.class */
public class JindoMagicCommitter extends AbstractJindoFileOutputCommitter {
    private static final Logger LOG = LoggerFactory.getLogger(JindoMagicCommitter.class);
    public static final String THREAD_PREFIX = "jindo-object-committer-pool-";

    @VisibleForTesting
    public static final String E_SELF_GENERATED_JOB_UUID = "has a self-generated job UUID";
    private final String uuid;
    private final JobUUIDSource uuidSource;
    private boolean jobSetup;
    private ExecutorService threadPool;
    private final CommitOperations commitOperations;
    private Path outputPath;
    private final String role;
    private Path workPath;
    private Configuration conf;
    private FileSystem destFS;
    private final JobContext jobContext;
    private final boolean createJobMarker;
    private ExecutorService jobHelperThreadPool;
    private List<PendingSet> jobPendingSets;
    private ConcurrentHashMap<String, PendingSet> taskPendingSet;
    private boolean enableTaskMPU;
    private boolean inMemoryTackerEnabled;
    private boolean inMemoryCommitEnabled;
    private boolean inMemCompatCheckerEnabled;

    /* loaded from: input_file:com/aliyun/jindodata/commit/JindoMagicCommitter$ActiveCommit.class */
    public static class ActiveCommit {
        private static final ActiveCommit EMPTY = new ActiveCommit((FileSystem) null, new ArrayList());
        private final List<FileStatus> sourceFiles;
        private final List<PendingSet> pendingSets;
        private final FileSystem sourceFS;
        private final List<String> committedObjects;
        private int committedObjectCount;
        private long committedBytes;

        public ActiveCommit(FileSystem fileSystem, List<? extends FileStatus> list) {
            this.committedObjects = new ArrayList();
            this.sourceFiles = list;
            this.pendingSets = new ArrayList();
            this.sourceFS = fileSystem;
        }

        public ActiveCommit(List<PendingSet> list, FileSystem fileSystem) {
            this.committedObjects = new ArrayList();
            this.sourceFiles = new ArrayList();
            this.pendingSets = list;
            this.sourceFS = fileSystem;
        }

        public static ActiveCommit fromStatusList(FileSystem fileSystem, List<? extends FileStatus> list) {
            return new ActiveCommit(fileSystem, list);
        }

        public static ActiveCommit fromPendingSet(FileSystem fileSystem, List<PendingSet> list) {
            return new ActiveCommit(list, fileSystem);
        }

        public static ActiveCommit empty() {
            return EMPTY;
        }

        public List<FileStatus> getSourceFiles() {
            return this.sourceFiles;
        }

        public List<PendingSet> getPendingSets() {
            return this.pendingSets;
        }

        public FileSystem getSourceFS() {
            return this.sourceFS;
        }

        public synchronized void uploadCommitted(String str, long j) {
            if (this.committedObjects.size() < 100) {
                this.committedObjects.add(str);
            }
            this.committedObjectCount++;
            this.committedBytes += j;
        }

        public synchronized List<String> getCommittedObjects() {
            return this.committedObjects;
        }

        public synchronized int getCommittedFileCount() {
            return this.committedObjectCount;
        }

        public synchronized long getCommittedBytes() {
            return this.committedBytes;
        }

        public int size() {
            return this.sourceFiles.size();
        }

        public boolean isEmpty() {
            return this.sourceFiles.isEmpty() && this.pendingSets.isEmpty();
        }

        public void add(FileStatus fileStatus) {
            this.sourceFiles.add(fileStatus);
        }
    }

    /* loaded from: input_file:com/aliyun/jindodata/commit/JindoMagicCommitter$JobUUIDSource.class */
    public enum JobUUIDSource {
        SparkWriteUUID(InternalCommitterConstants.SPARK_WRITE_UUID),
        CommitterUUIDProperty(InternalCommitterConstants.FS_OSS_COMMITTER_UUID),
        JobID("JobID"),
        GeneratedLocally("Generated Locally"),
        WithoutID("WithoutID");

        private final String text;

        JobUUIDSource(String str) {
            this.text = str;
        }

        public String getText() {
            return this.text;
        }

        @Override // java.lang.Enum
        public String toString() {
            StringBuilder sb = new StringBuilder("JobUUIDSource{");
            sb.append("text='").append(this.text).append('\'');
            sb.append('}');
            return sb.toString();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/aliyun/jindodata/commit/JindoMagicCommitter$PoolSubmitter.class */
    public final class PoolSubmitter implements Tasks.Submitter {
        private final JobContext context;
        private final int numThreads;

        private PoolSubmitter(JobContext jobContext) {
            this.numThreads = JindoMagicCommitter.this.getThreadCount(jobContext);
            Preconditions.checkArgument(this.numThreads > 0, "Cannot create a thread pool with no threads");
            this.context = jobContext;
        }

        @Override // com.aliyun.jindodata.commit.impl.Tasks.Submitter
        public Future<?> submit(Runnable runnable) {
            return JindoMagicCommitter.this.submitRunnable(this.context, runnable);
        }
    }

    public JindoMagicCommitter(Path path, TaskAttemptContext taskAttemptContext) throws IOException {
        super(path, taskAttemptContext);
        this.jobPendingSets = new ArrayList();
        this.taskPendingSet = new ConcurrentHashMap<>();
        this.enableTaskMPU = false;
        this.inMemoryTackerEnabled = false;
        this.inMemoryCommitEnabled = false;
        this.inMemCompatCheckerEnabled = true;
        Preconditions.checkArgument(path != null, "null output path");
        Preconditions.checkArgument(taskAttemptContext != null, "null job context");
        this.jobContext = taskAttemptContext;
        this.role = "Task committer " + taskAttemptContext.getTaskAttemptID();
        setConf(taskAttemptContext.getConfiguration());
        Pair<String, JobUUIDSource> buildJobUUID = buildJobUUID(this.conf, taskAttemptContext.getJobID());
        this.uuid = (String) buildJobUUID.getKey();
        this.uuidSource = (JobUUIDSource) buildJobUUID.getValue();
        LOG.info("Job UUID {} source {}", getUUID(), getUUIDSource().getText());
        initOutput(path);
        LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}", new Object[]{this.role, CommitUtilsWithMR.jobName(taskAttemptContext), CommitUtilsWithMR.jobIdString(taskAttemptContext), path});
        JindoHadoopSystem destOSSFS = getDestOSSFS();
        this.createJobMarker = taskAttemptContext.getConfiguration().getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true);
        this.commitOperations = new CommitOperations(destOSSFS, path);
        setWorkPath(getTaskAttemptPath(taskAttemptContext));
        CommitUtils.verifyIsMagicCommitPath(this.destFS, getWorkPath());
        if (JindoInMemoryMagicTracker.enabled(taskAttemptContext.getConfiguration(), path)) {
            this.inMemoryTackerEnabled = true;
        }
        String scheme = path.toUri().getScheme();
        if (this.conf.getBoolean(JindoConstant.FS_CONF_PREFIX_KEY + scheme + "." + JindoConstant.MAGIC_IN_MEM_COMMIT_ENABLED, false)) {
            this.inMemoryCommitEnabled = true;
            if (this.algorithmVersion == 2) {
                this.enableTaskMPU = true;
            }
        }
        this.inMemCompatCheckerEnabled = this.conf.getBoolean(JindoConstant.FS_CONF_PREFIX_KEY + scheme + "." + JindoConstant.MAGIC_IN_MEM_COMPAT_CHECKER_ENABLED, true);
        LOG.info("inMemoryCommitEnabled {}, inMemoryTackerEnabled {}, taskMPUEnabled {}, inMemCompatCheckerEnabled {}", new Object[]{Boolean.valueOf(this.inMemoryCommitEnabled), Boolean.valueOf(this.inMemoryTackerEnabled), Boolean.valueOf(this.enableTaskMPU), Boolean.valueOf(this.inMemCompatCheckerEnabled)});
    }

    public JindoMagicCommitter(Path path, JobContext jobContext) throws IOException {
        super(path, jobContext);
        this.jobPendingSets = new ArrayList();
        this.taskPendingSet = new ConcurrentHashMap<>();
        this.enableTaskMPU = false;
        this.inMemoryTackerEnabled = false;
        this.inMemoryCommitEnabled = false;
        this.inMemCompatCheckerEnabled = true;
        Preconditions.checkArgument(path != null, "null output path");
        Preconditions.checkArgument(jobContext != null, "null job context");
        this.jobContext = jobContext;
        this.role = "Task committer " + jobContext.getJobID();
        setConf(jobContext.getConfiguration());
        Pair<String, JobUUIDSource> buildJobUUID = buildJobUUID(this.conf, jobContext.getJobID());
        this.uuid = (String) buildJobUUID.getKey();
        this.uuidSource = (JobUUIDSource) buildJobUUID.getValue();
        LOG.info("Job UUID {} source {}", getUUID(), getUUIDSource().getText());
        initOutput(path);
        LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}", new Object[]{this.role, CommitUtilsWithMR.jobName(jobContext), CommitUtilsWithMR.jobIdString(jobContext), path});
        JindoHadoopSystem destOSSFS = getDestOSSFS();
        this.createJobMarker = jobContext.getConfiguration().getBoolean("mapreduce.fileoutputcommitter.marksuccessfuljobs", true);
        this.commitOperations = new CommitOperations(destOSSFS, path);
    }

    public byte[] getTaskPendingSetByteArray(String str) throws IOException {
        PendingSet pendingSet = new PendingSet(0);
        pendingSet.setTaskId(str);
        return this.taskPendingSet.getOrDefault(str, pendingSet).toBytes();
    }

    public void parsePendingSetFromByteArray(byte[] bArr) throws IOException {
        this.jobPendingSets.add(PendingSet.serializer().fromJsonStream(new ByteArrayInputStream(bArr)));
    }

    @VisibleForTesting
    protected void initOutput(Path path) throws IOException {
        FileSystem destinationFS = getDestinationFS(path, getConf());
        setDestFS(destinationFS);
        setOutputPath(destinationFS.makeQualified(path));
    }

    public final JobContext getJobContext() {
        return this.jobContext;
    }

    @Override // com.aliyun.jindodata.commit.impl.AbstractJindoFileOutputCommitter
    public final Path getOutputPath() {
        return this.outputPath;
    }

    protected final void setOutputPath(Path path) {
        Preconditions.checkNotNull(path, "Null output path");
        this.outputPath = path;
    }

    @Override // com.aliyun.jindodata.commit.impl.AbstractJindoFileOutputCommitter
    public final Path getWorkPath() {
        return this.workPath;
    }

    protected final void setWorkPath(Path path) {
        LOG.debug("Setting work path to {}", path);
        this.workPath = path;
    }

    public final Configuration getConf() {
        return this.conf;
    }

    protected final void setConf(Configuration configuration) {
        this.conf = configuration;
    }

    public FileSystem getDestFS() throws IOException {
        if (this.destFS == null) {
            setDestFS(getDestinationFS(this.outputPath, getConf()));
        }
        return this.destFS;
    }

    public JindoHadoopSystem getDestOSSFS() throws IOException {
        return (JindoHadoopSystem) getDestFS();
    }

    protected void setDestFS(FileSystem fileSystem) {
        this.destFS = fileSystem;
    }

    @Override // com.aliyun.jindodata.commit.impl.AbstractJindoFileOutputCommitter
    public Path getJobAttemptPath(JobContext jobContext) {
        return getJobAttemptPath(CommitUtilsWithMR.getAppAttemptId(jobContext));
    }

    @Override // com.aliyun.jindodata.commit.impl.AbstractJindoFileOutputCommitter
    protected Path getJobAttemptPath(int i) {
        return CommitUtilsWithMR.getMagicJobAttemptPath(getUUID(), getOutputPath());
    }

    @Override // com.aliyun.jindodata.commit.impl.AbstractJindoFileOutputCommitter
    public Path getTaskAttemptPath(TaskAttemptContext taskAttemptContext) {
        return CommitUtilsWithMR.getMagicTaskAttemptPath(taskAttemptContext, getUUID(), getOutputPath());
    }

    protected Path getBaseTaskAttemptPath(TaskAttemptContext taskAttemptContext) {
        return CommitUtilsWithMR.getBaseMagicTaskAttemptPath(taskAttemptContext, getUUID(), getOutputPath());
    }

    public String getName() {
        return this.inMemoryCommitEnabled ? CommitConstants.IN_MEM_COMMITTER_NAME_MAGIC : "magic";
    }

    @VisibleForTesting
    public final String getUUID() {
        return this.uuid;
    }

    @VisibleForTesting
    public final JobUUIDSource getUUIDSource() {
        return this.uuidSource;
    }

    public String toString() {
        StringBuilder sb = new StringBuilder("JindoMagicCommitter{");
        sb.append("role=").append(this.role);
        sb.append(", name=").append(getName());
        sb.append(", outputPath=").append(getOutputPath());
        sb.append(", workPath=").append(this.workPath);
        sb.append(", uuid='").append(getUUID()).append('\'');
        sb.append(", uuid source=").append(getUUIDSource());
        sb.append('}');
        return sb.toString();
    }

    protected FileSystem getDestinationFS(Path path, Configuration configuration) throws IOException {
        return CommitUtils.getJindoFileSystem(path, configuration, requiresDelayedCommitOutputInFileSystem());
    }

    protected boolean requiresDelayedCommitOutputInFileSystem() {
        return true;
    }

    @Override // com.aliyun.jindodata.commit.impl.AbstractJindoFileOutputCommitter
    public void recoverTask(TaskAttemptContext taskAttemptContext) throws IOException {
        LOG.warn("Cannot recover task {}", taskAttemptContext.getTaskAttemptID());
        throw new PathCommitException(this.outputPath, String.format("Unable to recover task %s", taskAttemptContext.getTaskAttemptID()));
    }

    protected void maybeCreateSuccessMarkerFromCommits(JobContext jobContext, ActiveCommit activeCommit) throws IOException {
        ArrayList arrayList = new ArrayList(activeCommit.size());
        arrayList.addAll(activeCommit.committedObjects);
        maybeCreateSuccessMarker(jobContext, arrayList);
    }

    protected void maybeCreateSuccessMarker(JobContext jobContext, List<String> list) throws IOException {
        if (this.createJobMarker) {
            SuccessData successData = new SuccessData();
            successData.setCommitter(getName());
            successData.setJobId(this.uuid);
            successData.setJobIdSource(this.uuidSource.getText());
            successData.setDescription(getRole());
            successData.setHostname(NetUtils.getLocalHostname());
            Date date = new Date();
            successData.setTimestamp(date.getTime());
            successData.setDate(date.toString());
            successData.setFilenames(list);
            this.commitOperations.createSuccessMarker(getOutputPath(), successData);
        }
    }

    @Override // com.aliyun.jindodata.commit.impl.AbstractJindoFileOutputCommitter
    public void setupJob(JobContext jobContext) throws IOException {
        DurationInfo durationInfo = new DurationInfo(LOG, "Job %s setting up", getUUID());
        Throwable th = null;
        try {
            try {
                this.jobSetup = true;
                Configuration configuration = jobContext.getConfiguration();
                configuration.set(InternalCommitterConstants.FS_OSS_COMMITTER_UUID, getUUID());
                configuration.set(InternalCommitterConstants.FS_OSS_COMMITTER_UUID_SOURCE, getUUIDSource().getText());
                Path outputPath = getOutputPath();
                if (this.createJobMarker) {
                    this.commitOperations.deleteSuccessMarker(outputPath);
                }
                getDestFS().mkdirs(outputPath);
                warnOnActiveUploads(outputPath);
                Path jobAttemptPath = getJobAttemptPath(jobContext);
                getDestinationFS(jobAttemptPath, jobContext.getConfiguration()).mkdirs(jobAttemptPath);
                if (durationInfo != null) {
                    if (0 == 0) {
                        durationInfo.close();
                        return;
                    }
                    try {
                        durationInfo.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (durationInfo != null) {
                if (th != null) {
                    try {
                        durationInfo.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    durationInfo.close();
                }
            }
            throw th4;
        }
    }

    @Override // com.aliyun.jindodata.commit.impl.AbstractJindoFileOutputCommitter
    public void setupTask(TaskAttemptContext taskAttemptContext) throws IOException {
        TaskAttemptID taskAttemptID = taskAttemptContext.getTaskAttemptID();
        DurationInfo durationInfo = new DurationInfo(LOG, "Setup Task %s", taskAttemptID);
        Throwable th = null;
        try {
            if (!this.jobSetup && getUUIDSource() == JobUUIDSource.GeneratedLocally) {
                throw new PathCommitException(getOutputPath().toString(), "Task attempt " + taskAttemptID + " " + E_SELF_GENERATED_JOB_UUID);
            }
            Path taskAttemptPath = getTaskAttemptPath(taskAttemptContext);
            taskAttemptPath.getFileSystem(getConf()).mkdirs(taskAttemptPath);
            if (durationInfo != null) {
                if (0 == 0) {
                    durationInfo.close();
                    return;
                }
                try {
                    durationInfo.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (durationInfo != null) {
                if (0 != 0) {
                    try {
                        durationInfo.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    durationInfo.close();
                }
            }
            throw th3;
        }
    }

    protected FileSystem getTaskAttemptFilesystem(TaskAttemptContext taskAttemptContext) throws IOException {
        return getTaskAttemptPath(taskAttemptContext).getFileSystem(getConf());
    }

    protected void commitPendingUploads(JobContext jobContext, ActiveCommit activeCommit) throws IOException {
        if (this.enableTaskMPU) {
            LOG.warn("Task commit MPU has enabled, no more work when commitJob.");
            return;
        }
        if (activeCommit.isEmpty()) {
            LOG.warn("{}: No pending uploads to commit", getRole());
        }
        if (this.inMemoryCommitEnabled) {
            memoryJobCommit(jobContext, activeCommit);
        } else {
            fileJobCommit(jobContext, activeCommit);
        }
    }

    private void fileJobCommit(JobContext jobContext, ActiveCommit activeCommit) throws IOException {
        DurationInfo durationInfo = new DurationInfo(LOG, "committing the output of %s task(s)", Integer.valueOf(activeCommit.size()));
        Throwable th = null;
        try {
            Tasks.foreach(activeCommit.getSourceFiles()).stopOnFailure().suppressExceptions(false).executeWith(buildSubmitter(jobContext)).abortWith(fileStatus -> {
                loadAndAbort(activeCommit, fileStatus, true, false);
            }).revertWith(fileStatus2 -> {
                loadAndRevert(activeCommit, fileStatus2);
            }).run(fileStatus3 -> {
                loadAndCommit(activeCommit, fileStatus3);
            });
            if (durationInfo != null) {
                if (0 == 0) {
                    durationInfo.close();
                    return;
                }
                try {
                    durationInfo.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (durationInfo != null) {
                if (0 != 0) {
                    try {
                        durationInfo.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    durationInfo.close();
                }
            }
            throw th3;
        }
    }

    private void memoryJobCommit(JobContext jobContext, ActiveCommit activeCommit) throws IOException {
        initJobHelperSubmitter(jobContext);
        try {
            DurationInfo durationInfo = new DurationInfo(LOG, "committing the output of %s task(s)", Integer.valueOf(activeCommit.getPendingSets().size()));
            Throwable th = null;
            try {
                try {
                    Tasks.foreach(activeCommit.getPendingSets()).stopOnFailure().suppressExceptions(false).executeWith(buildSubmitter(jobContext)).abortWith(pendingSet -> {
                        abortPendingSet(pendingSet, true, false);
                    }).revertWith(this::revertPendingSet).run(pendingSet2 -> {
                        commitPendingSet(jobContext, activeCommit, pendingSet2);
                    });
                    if (durationInfo != null) {
                        if (0 != 0) {
                            try {
                                durationInfo.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            durationInfo.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } finally {
            destroyJobHelperSubmitter();
        }
    }

    private void commitPendingSet(JobContext jobContext, ActiveCommit activeCommit, PendingSet pendingSet) throws IOException {
        if (pendingSet.getCommits().isEmpty()) {
            LOG.warn("{}: No pending commits to commit", pendingSet.getTaskId());
            return;
        }
        DurationInfo durationInfo = new DurationInfo(LOG, "Committing files in pendingset of task %s", pendingSet.getTaskId());
        Throwable th = null;
        try {
            String jobId = pendingSet.getJobId();
            if (!StringUtils.isEmpty(jobId) && !getUUID().equals(jobId)) {
                throw new PathCommitException(pendingSet.getTaskId(), String.format("Mismatch in Job ID (%s) and commit job ID (%s)", getUUID(), jobId));
            }
            Tasks.Builder onFailure = Tasks.foreach(pendingSet.getCommits()).stopOnFailure().suppressExceptions(false).executeWith(jobHelperSubmitter()).onFailure((singlePendingCommit, exc) -> {
                this.commitOperations.abortSingleCommit(singlePendingCommit);
            });
            CommitOperations commitOperations = this.commitOperations;
            commitOperations.getClass();
            Tasks.Builder abortWith = onFailure.abortWith(commitOperations::abortSingleCommit);
            CommitOperations commitOperations2 = this.commitOperations;
            commitOperations2.getClass();
            abortWith.revertWith(commitOperations2::revertCommit).run(singlePendingCommit2 -> {
                this.commitOperations.commitOrFail(singlePendingCommit2);
                activeCommit.uploadCommitted(singlePendingCommit2.getDestinationKey(), singlePendingCommit2.getLength());
            });
            if (durationInfo != null) {
                if (0 == 0) {
                    durationInfo.close();
                    return;
                }
                try {
                    durationInfo.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (durationInfo != null) {
                if (0 != 0) {
                    try {
                        durationInfo.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    durationInfo.close();
                }
            }
            throw th3;
        }
    }

    private void revertPendingSet(PendingSet pendingSet) throws IOException {
        DurationInfo durationInfo = new DurationInfo(LOG, false, "Reverting task %s", pendingSet.getTaskId());
        Throwable th = null;
        try {
            try {
                Tasks.Builder suppressExceptions = Tasks.foreach(pendingSet.getCommits()).suppressExceptions(true);
                CommitOperations commitOperations = this.commitOperations;
                commitOperations.getClass();
                suppressExceptions.run(commitOperations::revertCommit);
                if (durationInfo != null) {
                    if (0 == 0) {
                        durationInfo.close();
                        return;
                    }
                    try {
                        durationInfo.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (durationInfo != null) {
                if (th != null) {
                    try {
                        durationInfo.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    durationInfo.close();
                }
            }
            throw th4;
        }
    }

    private void abortPendingSet(PendingSet pendingSet, boolean z, boolean z2) throws IOException {
        DurationInfo durationInfo = new DurationInfo(LOG, false, "Aborting task %s", pendingSet.getTaskId());
        Throwable th = null;
        try {
            FileSystem destFS = getDestFS();
            Tasks.foreach(pendingSet.getCommits()).executeWith(singleThreadSubmitter()).suppressExceptions(z).run(singlePendingCommit -> {
                try {
                    this.commitOperations.abortSingleCommit(singlePendingCommit);
                } catch (FileNotFoundException e) {
                    if (z2) {
                        destFS.delete(singlePendingCommit.destinationPath(), false);
                    }
                }
            });
            if (durationInfo != null) {
                if (0 == 0) {
                    durationInfo.close();
                    return;
                }
                try {
                    durationInfo.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (durationInfo != null) {
                if (0 != 0) {
                    try {
                        durationInfo.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    durationInfo.close();
                }
            }
            throw th3;
        }
    }

    private Tasks.Submitter jobHelperSubmitter() {
        return runnable -> {
            return this.jobHelperThreadPool.submit(runnable);
        };
    }

    private void initJobHelperSubmitter(JobContext jobContext) {
        if (this.jobHelperThreadPool == null) {
            int threadCount = getThreadCount(jobContext);
            Preconditions.checkArgument(threadCount > 0, "Cannot create a thread pool with no threads");
            int i = threadCount * jobContext.getConfiguration().getInt(CommitConstants.FS_MAGIC_COMMITTER_THREADS_FACTOR, 2);
            LOG.info("{}: creating job helper thread pool of size {}", getRole(), Integer.valueOf(i));
            this.jobHelperThreadPool = HadoopExecutors.newFixedThreadPool(i, new ThreadFactoryBuilder().setDaemon(true).setNameFormat("jindo-job-helper-" + jobContext.getJobID() + "-%d").build());
        }
    }

    private void destroyJobHelperSubmitter() {
        if (this.jobHelperThreadPool != null) {
            HadoopExecutors.shutdown(this.jobHelperThreadPool, LOG, 30L, TimeUnit.SECONDS);
            this.jobHelperThreadPool = null;
        }
    }

    protected void precommitCheckPendingFiles(JobContext jobContext, ActiveCommit activeCommit) throws IOException {
        FileSystem sourceFS = activeCommit.getSourceFS();
        DurationInfo durationInfo = new DurationInfo(LOG, "Preflight Load of pending files", new Object[0]);
        Throwable th = null;
        try {
            try {
                Tasks.foreach(activeCommit.getSourceFiles()).stopOnFailure().suppressExceptions(false).executeWith(buildSubmitter(jobContext)).run(fileStatus -> {
                    PendingSet.load(sourceFS, fileStatus);
                });
                if (durationInfo != null) {
                    if (0 == 0) {
                        durationInfo.close();
                        return;
                    }
                    try {
                        durationInfo.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (durationInfo != null) {
                if (th != null) {
                    try {
                        durationInfo.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    durationInfo.close();
                }
            }
            throw th4;
        }
    }

    private void loadAndCommit(ActiveCommit activeCommit, FileStatus fileStatus) throws IOException {
        Path path = fileStatus.getPath();
        DurationInfo durationInfo = new DurationInfo(LOG, "Loading and committing files in pendingset %s", path);
        Throwable th = null;
        try {
            PendingSet load = PendingSet.load(activeCommit.getSourceFS(), fileStatus);
            String jobId = load.getJobId();
            if (!StringUtils.isEmpty(jobId) && !getUUID().equals(jobId)) {
                throw new PathCommitException(path, String.format("Mismatch in Job ID (%s) and commit job ID (%s)", getUUID(), jobId));
            }
            Tasks.Builder onFailure = Tasks.foreach(load.getCommits()).stopOnFailure().suppressExceptions(false).executeWith(singleThreadSubmitter()).onFailure((singlePendingCommit, exc) -> {
                this.commitOperations.abortSingleCommit(singlePendingCommit);
            });
            CommitOperations commitOperations = this.commitOperations;
            commitOperations.getClass();
            Tasks.Builder abortWith = onFailure.abortWith(commitOperations::abortSingleCommit);
            CommitOperations commitOperations2 = this.commitOperations;
            commitOperations2.getClass();
            abortWith.revertWith(commitOperations2::revertCommit).run(singlePendingCommit2 -> {
                this.commitOperations.commitOrFail(singlePendingCommit2);
                activeCommit.uploadCommitted(singlePendingCommit2.getDestinationKey(), singlePendingCommit2.getLength());
            });
            if (durationInfo != null) {
                if (0 == 0) {
                    durationInfo.close();
                    return;
                }
                try {
                    durationInfo.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (durationInfo != null) {
                if (0 != 0) {
                    try {
                        durationInfo.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    durationInfo.close();
                }
            }
            throw th3;
        }
    }

    private void loadAndRevert(ActiveCommit activeCommit, FileStatus fileStatus) throws IOException {
        DurationInfo durationInfo = new DurationInfo(LOG, false, "Committing %s", fileStatus.getPath());
        Throwable th = null;
        try {
            try {
                Tasks.Builder suppressExceptions = Tasks.foreach(PendingSet.load(activeCommit.getSourceFS(), fileStatus).getCommits()).suppressExceptions(true);
                CommitOperations commitOperations = this.commitOperations;
                commitOperations.getClass();
                suppressExceptions.run(commitOperations::revertCommit);
                if (durationInfo != null) {
                    if (0 == 0) {
                        durationInfo.close();
                        return;
                    }
                    try {
                        durationInfo.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (durationInfo != null) {
                if (th != null) {
                    try {
                        durationInfo.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    durationInfo.close();
                }
            }
            throw th4;
        }
    }

    private void loadAndAbort(ActiveCommit activeCommit, FileStatus fileStatus, boolean z, boolean z2) throws IOException {
        DurationInfo durationInfo = new DurationInfo(LOG, false, "Aborting %s", fileStatus.getPath());
        Throwable th = null;
        try {
            PendingSet load = PendingSet.load(activeCommit.getSourceFS(), fileStatus);
            FileSystem destFS = getDestFS();
            Tasks.foreach(load.getCommits()).executeWith(singleThreadSubmitter()).suppressExceptions(z).run(singlePendingCommit -> {
                try {
                    this.commitOperations.abortSingleCommit(singlePendingCommit);
                } catch (FileNotFoundException e) {
                    if (z2) {
                        destFS.delete(singlePendingCommit.destinationPath(), false);
                    }
                }
            });
            if (durationInfo != null) {
                if (0 == 0) {
                    durationInfo.close();
                    return;
                }
                try {
                    durationInfo.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (durationInfo != null) {
                if (0 != 0) {
                    try {
                        durationInfo.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    durationInfo.close();
                }
            }
            throw th3;
        }
    }

    protected void commitJobInternal(JobContext jobContext, ActiveCommit activeCommit) throws IOException {
        commitPendingUploads(jobContext, activeCommit);
    }

    @Override // com.aliyun.jindodata.commit.impl.AbstractJindoFileOutputCommitter
    public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
        LOG.info("{}: aborting job {} in state {}", new Object[]{getRole(), CommitUtilsWithMR.jobIdString(jobContext), state});
        abortJobInternal(jobContext, false);
    }

    protected void abortJobInternal(JobContext jobContext, boolean z) throws IOException {
        cleanup(jobContext, z);
    }

    protected void abortPendingUploadsInCleanup(boolean z) throws IOException {
        if (!shouldAbortUploadsInCleanup()) {
            LOG.debug("Not cleanup up pending uploads to {} as {} is false ", getOutputPath(), CommitConstants.FS_MAGIC_COMMITTER_ABORT_PENDING_UPLOADS);
            return;
        }
        Path outputPath = getOutputPath();
        DurationInfo durationInfo = new DurationInfo(LOG, "Aborting all pending commits under %s", outputPath);
        Throwable th = null;
        try {
            try {
                JdoObjectUploadInfo[] uploads = getCommitOperations().listPendingUploadsUnderPath(outputPath).getObjectUploadInfoList().getUploads();
                if (uploads.length > 0) {
                    LOG.warn("{} pending uploads were found -aborting", Integer.valueOf(uploads.length));
                    LOG.warn("If other tasks/jobs are writing to {},this action may cause them to fail", outputPath);
                    LOG.warn("If you want more than one job is writing to the same destination tree simultaneously, you should change configuration {} to false to disable parts auto-clean.If disabled, you must configure the bucket lifecycle to remove uploads after a time period. Otherwise there is a risk that uncommitted uploads may run up bills.", CommitConstants.FS_MAGIC_COMMITTER_ABORT_PENDING_UPLOADS);
                    Tasks.foreach(uploads).executeWith(buildSubmitter(getJobContext())).suppressExceptions(z).run(jdoObjectUploadInfo -> {
                        this.commitOperations.abortMultipartCommit(this.destFS.makeQualified(new Path("/" + jdoObjectUploadInfo.getKey())), jdoObjectUploadInfo.getUploadId());
                    });
                } else {
                    LOG.info("No pending uploads were found");
                }
                if (durationInfo != null) {
                    if (0 == 0) {
                        durationInfo.close();
                        return;
                    }
                    try {
                        durationInfo.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (IOException e) {
                LOG.debug("Failed to list pending uploads under {}", outputPath, e);
                if (durationInfo != null) {
                    if (0 == 0) {
                        durationInfo.close();
                        return;
                    }
                    try {
                        durationInfo.close();
                    } catch (Throwable th3) {
                        th.addSuppressed(th3);
                    }
                }
            }
        } catch (Throwable th4) {
            if (durationInfo != null) {
                if (0 != 0) {
                    try {
                        durationInfo.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    durationInfo.close();
                }
            }
            throw th4;
        }
    }

    private boolean shouldAbortUploadsInCleanup() {
        return getConf().getBoolean(CommitConstants.FS_MAGIC_COMMITTER_ABORT_PENDING_UPLOADS, true);
    }

    @VisibleForTesting
    public void preCommitJob(JobContext jobContext, ActiveCommit activeCommit) throws IOException {
    }

    @Override // com.aliyun.jindodata.commit.impl.AbstractJindoFileOutputCommitter
    public void commitJob(JobContext jobContext) throws IOException {
        String jobIdString = CommitUtilsWithMR.jobIdString(jobContext);
        try {
            DurationInfo durationInfo = new DurationInfo(LOG, "%s: commitJob(%s)", getRole(), jobIdString);
            Throwable th = null;
            try {
                try {
                    ActiveCommit loadPendingUploadsToCommit = loadPendingUploadsToCommit(jobContext);
                    preCommitJob(jobContext, loadPendingUploadsToCommit);
                    commitJobInternal(jobContext, loadPendingUploadsToCommit);
                    maybeCreateSuccessMarkerFromCommits(jobContext, loadPendingUploadsToCommit);
                    cleanup(jobContext, false);
                    if (durationInfo != null) {
                        if (0 != 0) {
                            try {
                                durationInfo.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            durationInfo.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } catch (IOException e) {
            LOG.warn("Commit failure for job {}", jobIdString, e);
            abortJobInternal(jobContext, true);
            throw e;
        }
    }

    public void cleanupStagingDirs() {
        Path magicSubdir = MagicCommitPaths.magicSubdir(getOutputPath(), getUUID());
        try {
            JindoCommitUtils.cleanupStagingWithWarning(this.destFS, magicSubdir, true);
        } catch (Exception e) {
            LOG.info("cleanup magic directory:" + magicSubdir.toString(), e);
        }
    }

    public void checkInMemCompatibility(JobContext jobContext) throws IOException {
        if (this.inMemCompatCheckerEnabled) {
            if (this.inMemoryTackerEnabled && this.inMemoryCommitEnabled) {
                if (!JindoCommitUtils.listAndFilter(getDestFS(), getJobAttemptPath(jobContext), true, CommitOperations.PENDING_OR_PENDINGSET_FILTER).isEmpty()) {
                    throw new IOException("Configuration fs.oss.magic.inmem.commit.enable and fs.oss.magic.inmem.tracker.enable is true, but pendingset or pending files are found under job attempt path. There should be multiple versions of jindo committer implement in your environment. Although using unified version is recommended, you can also turn off fs.oss.magic.inmem.commit.enable and fs.oss.magic.inmem.tracker.enable to workaround. ");
                }
            } else if (this.inMemoryCommitEnabled) {
                if (!JindoCommitUtils.listAndFilter(getDestFS(), getJobAttemptPath(jobContext), false, CommitOperations.PENDINGSET_FILTER).isEmpty()) {
                    throw new IOException("Configuration fs.oss.magic.inmem.commit.enable is true, but pendingset files are found under job attempt path. There should be multiple versions of jindo committer implement in your environment. Although using unified version is recommended, you can also turn off fs.oss.magic.inmem.commit.enable to workaround. ");
                }
            } else if (this.inMemoryTackerEnabled && !JindoCommitUtils.listAndFilter(getDestFS(), getJobAttemptPath(jobContext), true, CommitOperations.PENDING_FILTER).isEmpty()) {
                throw new IOException("Configuration fs.oss.magic.inmem.tracker.enable is true, but pending files are found under job attempt path. There should be multiple versions of jindo committer implement in your environment. Although using unified version is recommended, you can also turn off fs.oss.magic.inmem.tracker.enable to workaround");
            }
        }
    }

    protected ActiveCommit loadPendingUploadsToCommit(JobContext jobContext) throws IOException {
        checkInMemCompatibility(jobContext);
        if (this.inMemoryCommitEnabled) {
            return ActiveCommit.fromPendingSet(getDestFS(), this.jobPendingSets);
        }
        FileSystem destFS = getDestFS();
        return ActiveCommit.fromStatusList(destFS, JindoCommitUtils.listAndFilter(destFS, getJobAttemptPath(jobContext), false, CommitOperations.PENDINGSET_FILTER));
    }

    protected void cleanup(JobContext jobContext, boolean z) throws IOException {
        try {
            DurationInfo durationInfo = new DurationInfo(LOG, "Cleanup job %s", CommitUtilsWithMR.jobIdString(jobContext));
            Throwable th = null;
            try {
                try {
                    abortPendingUploadsInCleanup(z);
                    if (durationInfo != null) {
                        if (0 != 0) {
                            try {
                                durationInfo.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            durationInfo.close();
                        }
                    }
                } finally {
                }
            } finally {
            }
        } finally {
            destroyThreadPool();
            cleanupStagingDirs();
        }
    }

    @Override // com.aliyun.jindodata.commit.impl.AbstractJindoFileOutputCommitter
    public void cleanupJob(JobContext jobContext) throws IOException {
        String role = getRole();
        String jobIdString = CommitUtilsWithMR.jobIdString(jobContext);
        LOG.warn("{}: using deprecated cleanupJob call for {}", role, jobIdString);
        DurationInfo durationInfo = new DurationInfo(LOG, "%s: cleanup Job %s", role, jobIdString);
        Throwable th = null;
        try {
            try {
                cleanup(jobContext, true);
                if (durationInfo != null) {
                    if (0 == 0) {
                        durationInfo.close();
                        return;
                    }
                    try {
                        durationInfo.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (durationInfo != null) {
                if (th != null) {
                    try {
                        durationInfo.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    durationInfo.close();
                }
            }
            throw th4;
        }
    }

    protected CommitOperations getCommitOperations() {
        return this.commitOperations;
    }

    protected String getRole() {
        return this.role;
    }

    protected Tasks.Submitter buildSubmitter(JobContext jobContext) {
        if (getThreadCount(jobContext) > 0) {
            return new PoolSubmitter(jobContext);
        }
        return null;
    }

    private synchronized ExecutorService buildThreadPool(JobContext jobContext, int i) {
        Preconditions.checkArgument(i > 0, "Cannot create a thread pool with no threads");
        if (this.threadPool == null) {
            LOG.debug("{}: creating thread pool of size {}", getRole(), Integer.valueOf(i));
            this.threadPool = HadoopExecutors.newFixedThreadPool(i, new ThreadFactoryBuilder().setDaemon(true).setNameFormat(THREAD_PREFIX + jobContext.getJobID() + "-%d").build());
        }
        return this.threadPool;
    }

    /* JADX INFO: Access modifiers changed from: private */
    public int getThreadCount(JobContext jobContext) {
        return jobContext.getConfiguration().getInt(CommitConstants.FS_MAGIC_COMMITTER_THREADS, 8);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public synchronized Future<?> submitRunnable(JobContext jobContext, Runnable runnable) {
        return buildThreadPool(jobContext, getThreadCount(jobContext)).submit(runnable);
    }

    protected void destroyThreadPool() {
        ExecutorService executorService;
        synchronized (this) {
            executorService = this.threadPool;
            this.threadPool = null;
        }
        if (executorService != null) {
            LOG.debug("Destroying thread pool");
            HadoopExecutors.shutdown(executorService, LOG, 30L, TimeUnit.SECONDS);
        }
    }

    protected final synchronized Tasks.Submitter singleThreadSubmitter() {
        return null;
    }

    public synchronized boolean hasThreadPool() {
        return this.threadPool != null;
    }

    protected void deleteTaskAttemptPathQuietly(TaskAttemptContext taskAttemptContext) {
        Path baseTaskAttemptPath = getBaseTaskAttemptPath(taskAttemptContext);
        try {
            JindoCommitUtils.deleteQuietly(getTaskAttemptFilesystem(taskAttemptContext), baseTaskAttemptPath, true);
        } catch (IOException e) {
            LOG.debug("Failed to delete {}", baseTaskAttemptPath, e);
        }
    }

    protected void abortPendingUploads(JobContext jobContext, List<SinglePendingCommit> list, boolean z) throws IOException {
        if (list == null || list.isEmpty()) {
            LOG.info("{}: no pending commits to abort", getRole());
            return;
        }
        DurationInfo durationInfo = new DurationInfo(LOG, "Aborting %s uploads", Integer.valueOf(list.size()));
        Throwable th = null;
        try {
            Tasks.Builder suppressExceptions = Tasks.foreach(list).executeWith(buildSubmitter(jobContext)).suppressExceptions(z);
            CommitOperations commitOperations = this.commitOperations;
            commitOperations.getClass();
            suppressExceptions.run(commitOperations::abortSingleCommit);
            if (durationInfo != null) {
                if (0 == 0) {
                    durationInfo.close();
                    return;
                }
                try {
                    durationInfo.close();
                } catch (Throwable th2) {
                    th.addSuppressed(th2);
                }
            }
        } catch (Throwable th3) {
            if (durationInfo != null) {
                if (0 != 0) {
                    try {
                        durationInfo.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    durationInfo.close();
                }
            }
            throw th3;
        }
    }

    protected void abortPendingUploads(JobContext jobContext, ActiveCommit activeCommit, boolean z, boolean z2) throws IOException {
        if (activeCommit.isEmpty()) {
            LOG.info("{}: no pending commits to abort", getRole());
            return;
        }
        DurationInfo durationInfo = new DurationInfo(LOG, "Aborting %s uploads", Integer.valueOf(activeCommit.size()));
        Throwable th = null;
        try {
            try {
                Tasks.foreach(activeCommit.getSourceFiles()).executeWith(buildSubmitter(jobContext)).suppressExceptions(z).run(fileStatus -> {
                    loadAndAbort(activeCommit, fileStatus, z, z2);
                });
                if (durationInfo != null) {
                    if (0 == 0) {
                        durationInfo.close();
                        return;
                    }
                    try {
                        durationInfo.close();
                    } catch (Throwable th2) {
                        th.addSuppressed(th2);
                    }
                }
            } catch (Throwable th3) {
                th = th3;
                throw th3;
            }
        } catch (Throwable th4) {
            if (durationInfo != null) {
                if (th != null) {
                    try {
                        durationInfo.close();
                    } catch (Throwable th5) {
                        th.addSuppressed(th5);
                    }
                } else {
                    durationInfo.close();
                }
            }
            throw th4;
        }
    }

    protected void warnOnActiveUploads(Path path) {
        try {
            List asList = Arrays.asList(getCommitOperations().listPendingUploadsUnderPath(path).getObjectUploadInfoList().getUploads());
            if (asList.isEmpty()) {
                return;
            }
            LOG.warn("{} active upload(s) in progress under {}", Integer.valueOf(asList.size()), path);
            LOG.warn("Either jobs are running concurrently or failed jobs are not being cleaned up");
            asList.forEach(jdoObjectUploadInfo -> {
                LOG.info("{}", jdoObjectUploadInfo.getKey());
            });
            if (shouldAbortUploadsInCleanup()) {
                LOG.warn("This committer will abort these uploads in job cleanup");
            }
        } catch (IOException e) {
            LOG.debug("Failed to list uploads under {}", path, e);
        }
    }

    public static Pair<String, JobUUIDSource> buildJobUUID(Configuration configuration, JobID jobID) throws PathCommitException {
        if (!configuration.getBoolean(CommitConstants.FS_COMMITTER_ALLOW_CONCURRENT, true)) {
            return new Pair<>("", JobUUIDSource.WithoutID);
        }
        String trimmed = configuration.getTrimmed(InternalCommitterConstants.FS_OSS_COMMITTER_UUID, "");
        if (!trimmed.isEmpty()) {
            return new Pair<>(trimmed, JobUUIDSource.CommitterUUIDProperty);
        }
        String trimmed2 = configuration.getTrimmed(InternalCommitterConstants.SPARK_WRITE_UUID, "");
        if (!trimmed2.isEmpty()) {
            return new Pair<>(trimmed2, JobUUIDSource.SparkWriteUUID);
        }
        if (configuration.getBoolean(CommitConstants.FS_MAGIC_COMMITTER_REQUIRE_UUID, false)) {
            throw new PathCommitException("", InternalCommitterConstants.E_NO_SPARK_UUID);
        }
        if (!configuration.getBoolean(CommitConstants.FS_MAGIC_COMMITTER_GENERATE_UUID, false)) {
            return new Pair<>(jobID.toString(), JobUUIDSource.JobID);
        }
        String uuid = UUID.randomUUID().toString();
        LOG.warn("No job ID in configuration; generating a random ID: {}", uuid);
        return new Pair<>(uuid, JobUUIDSource.GeneratedLocally);
    }

    @Override // com.aliyun.jindodata.commit.impl.AbstractJindoFileOutputCommitter
    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext) throws IOException {
        Path taskAttemptPath = getTaskAttemptPath(taskAttemptContext);
        if (this.inMemoryTackerEnabled) {
            if (JindoInMemoryMagicTracker.getTaskAttemptIdToCommitDataMap().containsKey(MagicCommitPaths.extractUniqueMagicId(getTaskAttemptPath(taskAttemptContext)))) {
                return true;
            }
            LOG.info("{} does not write any data, no need to commit", taskAttemptContext.getTaskAttemptID());
            return false;
        }
        DurationInfo durationInfo = new DurationInfo(LOG, "needsTaskCommit task %s", taskAttemptContext.getTaskAttemptID());
        Throwable th = null;
        try {
            try {
                boolean exists = taskAttemptPath.getFileSystem(taskAttemptContext.getConfiguration()).exists(taskAttemptPath);
                if (durationInfo != null) {
                    if (0 != 0) {
                        try {
                            durationInfo.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        durationInfo.close();
                    }
                }
                return exists;
            } finally {
            }
        } catch (Throwable th3) {
            if (durationInfo != null) {
                if (th != null) {
                    try {
                        durationInfo.close();
                    } catch (Throwable th4) {
                        th.addSuppressed(th4);
                    }
                } else {
                    durationInfo.close();
                }
            }
            throw th3;
        }
    }

    @Override // com.aliyun.jindodata.commit.impl.AbstractJindoFileOutputCommitter
    public boolean needsTaskCommit(TaskAttemptContext taskAttemptContext, Path path) throws IOException {
        return needsTaskCommit(taskAttemptContext);
    }

    @Override // com.aliyun.jindodata.commit.impl.AbstractJindoFileOutputCommitter
    public void commitTask(TaskAttemptContext taskAttemptContext) throws IOException {
        try {
            try {
                DurationInfo durationInfo = new DurationInfo(LOG, "Commit task %s", taskAttemptContext.getTaskAttemptID());
                Throwable th = null;
                try {
                    try {
                        LOG.info("Task {} committed {} files", taskAttemptContext.getTaskAttemptID(), Integer.valueOf(innerCommitTask(taskAttemptContext).size()));
                        if (durationInfo != null) {
                            if (0 != 0) {
                                try {
                                    durationInfo.close();
                                } catch (Throwable th2) {
                                    th.addSuppressed(th2);
                                }
                            } else {
                                durationInfo.close();
                            }
                        }
                    } finally {
                    }
                } catch (Throwable th3) {
                    if (durationInfo != null) {
                        if (th != null) {
                            try {
                                durationInfo.close();
                            } catch (Throwable th4) {
                                th.addSuppressed(th4);
                            }
                        } else {
                            durationInfo.close();
                        }
                    }
                    throw th3;
                }
            } finally {
                deleteTaskAttemptPathQuietly(taskAttemptContext);
            }
        } catch (IOException e) {
            throw e;
        }
    }

    @Override // com.aliyun.jindodata.commit.impl.AbstractJindoFileOutputCommitter
    public void commitTask(TaskAttemptContext taskAttemptContext, Path path) throws IOException {
        commitTask(taskAttemptContext);
    }

    /* JADX WARN: Multi-variable type inference failed */
    /* JADX WARN: Type inference failed for: r0v14, types: [java.util.List] */
    private PendingSet innerCommitTask(TaskAttemptContext taskAttemptContext) throws IOException {
        PendingSet pendingSet;
        Path taskAttemptPath = getTaskAttemptPath(taskAttemptContext);
        ArrayList arrayList = new ArrayList(1);
        if (this.inMemoryTackerEnabled) {
            pendingSet = loadPendingSetFromTracker(taskAttemptContext);
        } else {
            Pair<PendingSet, List<Pair<LocatedFileStatus, IOException>>> loadSinglePendingCommits = getCommitOperations().loadSinglePendingCommits(taskAttemptPath, true);
            pendingSet = (PendingSet) loadSinglePendingCommits.getKey();
            arrayList = (List) loadSinglePendingCommits.getValue();
        }
        if (!arrayList.isEmpty()) {
            LOG.error("At least one commit file could not be read: failing");
            abortPendingUploads(taskAttemptContext, pendingSet.getCommits(), true);
            throw ((IOException) ((Pair) arrayList.get(0)).getValue());
        }
        String uuid = getUUID();
        String valueOf = String.valueOf(taskAttemptContext.getTaskAttemptID());
        for (SinglePendingCommit singlePendingCommit : pendingSet.getCommits()) {
            singlePendingCommit.setJobId(uuid);
            singlePendingCommit.setTaskId(valueOf);
        }
        pendingSet.putExtraData(CommitConstants.TASK_ATTEMPT_ID, valueOf);
        pendingSet.setJobId(uuid);
        pendingSet.setTaskId(valueOf);
        if (this.enableTaskMPU) {
            taskMPUWhenCommit(taskAttemptContext, pendingSet);
        } else if (this.inMemoryCommitEnabled) {
            memoryTaskCommit(taskAttemptContext, pendingSet);
        } else {
            fileTaskCommit(taskAttemptContext, pendingSet);
        }
        return pendingSet;
    }

    private void taskMPUWhenCommit(TaskAttemptContext taskAttemptContext, PendingSet pendingSet) throws IOException {
        ArrayList arrayList = new ArrayList(1);
        try {
            DurationInfo durationInfo = new DurationInfo(LOG, "committing the output of %s when commitTask", taskAttemptContext.getTaskAttemptID().toString());
            Throwable th = null;
            try {
                try {
                    boolean run = Tasks.foreach(pendingSet.getCommits()).stopOnFailure().suppressExceptions(false).executeWith(buildSubmitter(getJobContext())).onFailure((singlePendingCommit, exc) -> {
                        arrayList.add(new Pair(singlePendingCommit, exc));
                    }).run(obj -> {
                        this.commitOperations.commitOrFail((SinglePendingCommit) obj);
                    });
                    if (durationInfo != null) {
                        if (0 != 0) {
                            try {
                                durationInfo.close();
                            } catch (Throwable th2) {
                                th.addSuppressed(th2);
                            }
                        } else {
                            durationInfo.close();
                        }
                    }
                    if (run) {
                        return;
                    }
                    LOG.error("At least one commit file could not be read: failing");
                    throw ((IOException) ((Pair) arrayList.get(0)).getValue());
                } finally {
                }
            } finally {
            }
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    private void memoryTaskCommit(TaskAttemptContext taskAttemptContext, PendingSet pendingSet) throws IOException {
        String taskId = pendingSet.getTaskId();
        LOG.info("{}: skip commit {} commits when commitTask.", taskId, Integer.valueOf(pendingSet.getCommits().size()));
        this.taskPendingSet.putIfAbsent(taskId, pendingSet);
    }

    private void fileTaskCommit(TaskAttemptContext taskAttemptContext, PendingSet pendingSet) throws IOException {
        Path jobAttemptPath = getJobAttemptPath((JobContext) taskAttemptContext);
        TaskAttemptID taskAttemptID = taskAttemptContext.getTaskAttemptID();
        Path path = new Path(jobAttemptPath, taskAttemptID.getTaskID().toString() + JindoConstant.PENDINGSET_SUFFIX);
        LOG.info("Saving work of {} to {}", taskAttemptID, path);
        try {
            pendingSet.save(getDestFS(), path, true);
        } catch (IOException e) {
            LOG.warn("Failed to save task commit data to {} ", path, e);
            abortPendingUploads(taskAttemptContext, pendingSet.getCommits(), true);
            throw e;
        }
    }

    @Override // com.aliyun.jindodata.commit.impl.AbstractJindoFileOutputCommitter
    public void abortTask(TaskAttemptContext taskAttemptContext) throws IOException {
        Path taskAttemptPath = getTaskAttemptPath(taskAttemptContext);
        try {
            DurationInfo durationInfo = new DurationInfo(LOG, "Abort task %s", taskAttemptContext.getTaskAttemptID());
            Throwable th = null;
            try {
                if (this.inMemoryCommitEnabled) {
                    String taskAttemptID = taskAttemptContext.getTaskAttemptID().toString();
                    if (!this.taskPendingSet.isEmpty()) {
                        Tasks.Builder onFailure = Tasks.foreach(this.taskPendingSet.get(taskAttemptID).getCommits()).suppressExceptions(true).executeWith(buildSubmitter(taskAttemptContext)).onFailure((singlePendingCommit, exc) -> {
                            this.commitOperations.revertCommit(singlePendingCommit);
                        });
                        CommitOperations commitOperations = this.commitOperations;
                        commitOperations.getClass();
                        onFailure.run(commitOperations::revertCommit);
                    }
                    JindoInMemoryMagicTracker.getTaskAttemptIdToCommitDataMap().remove(MagicCommitPaths.extractUniqueMagicId(getTaskAttemptPath(taskAttemptContext)));
                } else {
                    getCommitOperations().abortAllSinglePendingCommits(taskAttemptPath, true);
                }
                if (durationInfo != null) {
                    if (0 != 0) {
                        try {
                            durationInfo.close();
                        } catch (Throwable th2) {
                            th.addSuppressed(th2);
                        }
                    } else {
                        durationInfo.close();
                    }
                }
            } finally {
            }
        } finally {
            JindoCommitUtils.deleteQuietly(taskAttemptPath.getFileSystem(taskAttemptContext.getConfiguration()), taskAttemptPath, true);
        }
    }

    @Override // com.aliyun.jindodata.commit.impl.AbstractJindoFileOutputCommitter
    public void abortTask(TaskAttemptContext taskAttemptContext, Path path) throws IOException {
        abortTask(taskAttemptContext);
    }

    private PendingSet loadPendingSetFromTracker(TaskAttemptContext taskAttemptContext) throws IOException {
        PendingSet pendingSet = new PendingSet();
        String extractUniqueMagicId = MagicCommitPaths.extractUniqueMagicId(getTaskAttemptPath(taskAttemptContext));
        if (!JindoInMemoryMagicTracker.getTaskAttemptIdToCommitDataMap().containsKey(extractUniqueMagicId)) {
            throw new IllegalStateException("task attempt id " + extractUniqueMagicId + " not found in magic tracker");
        }
        List<ObjectCommitData> orDefault = JindoInMemoryMagicTracker.getTaskAttemptIdToCommitDataMap().getOrDefault(extractUniqueMagicId, new ArrayList());
        JindoInMemoryMagicTracker.getTaskAttemptIdToCommitDataMap().remove(extractUniqueMagicId);
        Iterator<ObjectCommitData> it = orDefault.iterator();
        while (it.hasNext()) {
            pendingSet.add(SinglePendingCommit.fromObjectCommitData(it.next()));
        }
        return pendingSet;
    }
}
