package org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.impl;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Random;
import jodd.util.StringPool;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.Column;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.OdpsException;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.TableSchema;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.commons.transport.Connection;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.commons.transport.Response;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.data.ArrayRecord;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.data.Record;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.HttpHeaders;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.TableTunnel;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.TunnelConstants;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.TunnelException;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.impl.SessionBase;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.io.CompressOption;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.io.ProtobufRecordPack;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.io.StreamRecordPackImpl;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.utils.ConnectionWatcher;
import org.apache.paimon.maxcompute.shade.com.google.gson.JsonParser;
import org.apache.paimon.maxcompute.shade.com.google.gson.JsonSyntaxException;
import org.apache.paimon.maxcompute.shade.com.ibm.icu.text.DateFormat;

/* loaded from: input_file:org/apache/paimon/maxcompute/shade/com/aliyun/odps/tunnel/impl/StreamUploadSessionImpl.class */
public class StreamUploadSessionImpl extends StreamSessionBase implements TableTunnel.StreamUploadSession {
    protected Slots slots;
    private boolean p2pMode = false;
    private List<Column> columns;

    /* loaded from: input_file:org/apache/paimon/maxcompute/shade/com/aliyun/odps/tunnel/impl/StreamUploadSessionImpl$Builder.class */
    /**
     * Fluent builder for {@link StreamUploadSessionImpl}.
     *
     * <p>Every setter stores its argument and returns this builder so calls can be chained.
     * NOTE(review): {@code compressOption} and {@code p2pMode} are held here but are not passed
     * to the session constructor by {@link #build()} — confirm whether that is intentional.
     */
    public static class Builder extends TableTunnel.StreamUploadSession.Builder {
        private String projectName;
        private String tableName;
        private CompressOption compressOption = new CompressOption();
        private boolean p2pMode = false;
        private List<Column> zorderColumns;
        private ConfigurationImpl config;

        /** @return the project name previously set, or {@code null} if none was set */
        public String getProjectName() {
            return projectName;
        }

        /** Stores the project name and returns this builder. */
        public Builder setProjectName(String str) {
            projectName = str;
            return this;
        }

        /** @return the table name previously set, or {@code null} if none was set */
        public String getTableName() {
            return tableName;
        }

        /** Stores the table name and returns this builder. */
        public Builder setTableName(String str) {
            tableName = str;
            return this;
        }

        /** @return the stored compress option (a default-constructed one unless replaced) */
        public CompressOption getCompressOption() {
            return compressOption;
        }

        /** Replaces the stored compress option and returns this builder. */
        public Builder setCompressOption(CompressOption compressOption) {
            this.compressOption = compressOption;
            return this;
        }

        /** @return the z-order columns previously set, or {@code null} if none were set */
        public List<Column> getZorderColumns() {
            return zorderColumns;
        }

        /** Stores the z-order column list (by reference, not copied) and returns this builder. */
        public Builder setZorderColumns(List<Column> list) {
            zorderColumns = list;
            return this;
        }

        /** @return the tunnel configuration previously set, or {@code null} if none was set */
        public ConfigurationImpl getConfig() {
            return config;
        }

        /** Stores the tunnel configuration and returns this builder. */
        public Builder setConfig(ConfigurationImpl configurationImpl) {
            config = configurationImpl;
            return this;
        }

        /**
         * Creates the session from the values stored on this builder and on the inherited builder
         * (schema name, partition spec, create-partition flag, slot number).
         *
         * @return a newly constructed {@link StreamUploadSessionImpl}
         * @throws TunnelException if the session constructor fails
         */
        @Override
        public TableTunnel.StreamUploadSession build() throws TunnelException {
            return new StreamUploadSessionImpl(
                    config,
                    projectName,
                    getSchemaName(),
                    tableName,
                    getPartitionSpec(),
                    isCreatePartition(),
                    getSlotNum(),
                    zorderColumns);
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:org/apache/paimon/maxcompute/shade/com/aliyun/odps/tunnel/impl/StreamUploadSessionImpl$Slots.class */
    /**
     * Round-robin view over a list of {@link Slot}s.
     *
     * <p>The starting position is chosen at random when the list is non-empty. A single iterator
     * instance is created at construction time and returned by every call to {@link #iterator()},
     * so the rotation position is shared by all users of this object.
     *
     * <p>The iterator never reports exhaustion for a non-empty list: it wraps around to index 0
     * after the last slot. For an empty list {@code hasNext()} is {@code false} and
     * {@code next()} returns {@code null} (it does not throw), so callers must null-check.
     */
    public static class Slots implements Iterable<Slot> {
        /** Backing list; stored by reference, not copied. */
        private final List<Slot> slots;
        /** Index of the slot the next {@code next()} call hands out; -1 if the list was empty at construction. */
        private int curSlotIndex;
        /** The single shared iterator returned by {@link #iterator()}. */
        private final Iterator<Slot> iter;

        /**
         * @param list slots to rotate over; must not be {@code null}
         * @throws TunnelException declared for compatibility with existing callers; this
         *     constructor does not throw it itself
         */
        public Slots(final List<Slot> list) throws TunnelException {
            this.slots = list;
            // Random start position; -1 marks "no slots" for hasNext().
            this.curSlotIndex = list.isEmpty() ? -1 : new Random().nextInt(list.size());
            this.iter = new Iterator<Slot>() {
                @Override
                public boolean hasNext() {
                    return curSlotIndex >= 0;
                }

                /**
                 * Returns the current slot and advances, wrapping to the first slot after the
                 * last one. Returns {@code null} when there are no slots.
                 */
                @Override
                public synchronized Slot next() {
                    if (!hasNext()) {
                        return null;
                    }
                    if (curSlotIndex >= slots.size()) {
                        curSlotIndex = 0;
                    }
                    return slots.get(curSlotIndex++);
                }
            };
        }

        /** @return the shared round-robin iterator (the same instance on every call) */
        @Override
        public Iterator<Slot> iterator() {
            return this.iter;
        }

        /** @return the number of slots in the backing list */
        public int getSlotNum() {
            return this.slots.size();
        }
    }

    /**
     * Stores the session coordinates, builds the REST client and then issues the
     * create-session request via {@code initiate}.
     *
     * @param configurationImpl tunnel configuration; also used to build the REST client
     * @param str project name
     * @param str2 schema name
     * @param str3 table name
     * @param str4 partition spec
     * @param z when {@code true}, the create-partition parameter is added to the create request
     * @param j requested slot count; only sent when greater than zero
     * @param list z-order columns; may be {@code null} or empty
     * @throws TunnelException if creating the session fails
     */
    public StreamUploadSessionImpl(ConfigurationImpl configurationImpl, String str, String str2, String str3, String str4, boolean z, long j, List<Column> list) throws TunnelException {
        // Target table coordinates.
        projectName = str;
        schemaName = str2;
        tableName = str3;
        partitionSpec = str4;
        columns = list;

        // Transport setup.
        config = configurationImpl;
        httpClient = Util.newRestClient(configurationImpl, str);

        initiate(j, z);
    }

    /**
     * Sends the "create stream upload session" POST request and initialises {@code slots}
     * from the JSON response.
     *
     * @param j requested slot count; the slot-num header is only added when this is positive
     * @param z when {@code true}, the create-partition parameter is added (with an empty value)
     * @throws TunnelException if the response body is not valid JSON, or if a called helper fails
     */
    private void initiate(long j, boolean z) throws TunnelException {
        HashMap<String, String> params = getCommonParams();
        if (z) {
            params.put(TunnelConstants.CREATE_PARTITION, "");
        }
        boolean hasZorderColumns = this.columns != null && !this.columns.isEmpty();
        if (hasZorderColumns) {
            params.put(TunnelConstants.ZORDER_COLUMNS, getColumnString());
        }

        HashMap<String, String> headers = getCommonHeaders();
        if (j > 0) {
            headers.put(HttpHeaders.HEADER_ODPS_SLOT_NUM, Long.toString(j));
        }

        SessionBase.HttpResult result = httpRequest(headers, params, "POST", "create stream upload session");
        try {
            // "false" is the flag passed on initial creation; reload() passes "true".
            this.slots = new Slots(loadFromJson(result.requestId, new JsonParser().parse(result.body).getAsJsonObject(), false));
        } catch (JsonSyntaxException e) {
            String message = "Invalid json content: '" + result.body + StringPool.SINGLE_QUOTE;
            throw new TunnelException(result.requestId, message, e);
        }
    }

    /**
     * Re-fetches this session with a GET request (identified by the upload id) and replaces
     * {@code slots} with the slot list parsed from the response.
     *
     * @throws TunnelException if the response body is not valid JSON, or if a called helper fails
     */
    private void reload() throws TunnelException {
        HashMap<String, String> params = getCommonParams();
        params.put(TunnelConstants.UPLOADID, this.id);

        SessionBase.HttpResult result = httpRequest(getCommonHeaders(), params, "GET", "get stream upload session");
        try {
            // "true" is the flag passed on reload; initiate() passes "false".
            this.slots = new Slots(loadFromJson(result.requestId, new JsonParser().parse(result.body).getAsJsonObject(), true));
        } catch (JsonSyntaxException e) {
            String message = "Invalid json content: '" + result.body + StringPool.SINGLE_QUOTE;
            throw new TunnelException(result.requestId, message, e);
        }
    }

    /**
     * Reconciles local slot state with what a response reported.
     *
     * <p>If the reported slot count differs from the local one, the whole session is reloaded.
     * Otherwise, only the given slot's server is updated, and only if it differs.
     *
     * @param slot the slot that was used for the request
     * @param str the server reported for that slot
     * @param i the slot count reported by the response
     * @throws TunnelException if reloading the session fails
     */
    public void reloadSlots(Slot slot, String str, int i) throws TunnelException {
        boolean slotCountChanged = this.slots.getSlotNum() != i;
        if (slotCountChanged) {
            reload();
            return;
        }
        if (!slot.getServer().equals(str)) {
            slot.setServer(str);
        }
    }

    /**
     * Opens a PUT connection for uploading one block through the given slot.
     *
     * <p>Query parameters carry the upload id, the slot id and, when present, the partition
     * spec, the record count and the z-order columns. Headers carry the body framing
     * (chunked or fixed length), the content type, tunnel version, current slot count,
     * content encoding for the chosen compression and the slot's routed server.
     *
     * @param compressOption compression settings; only {@code algorithm} is read here
     * @param slot slot to upload through
     * @param j body length in bytes; a negative value selects chunked transfer encoding
     * @param j2 record count; only sent when positive
     * @return the opened connection
     * @throws OdpsException if the compression algorithm is unsupported, the endpoint is not a
     *     valid URI in p2p mode, or the client fails to connect
     * @throws IOException if the client fails to connect
     */
    private Connection getConnection(CompressOption compressOption, Slot slot, long j, long j2) throws OdpsException, IOException {
        HashMap<String, String> params = new HashMap<>();
        params.put(TunnelConstants.UPLOADID, this.id);
        params.put(TunnelConstants.SLOT_ID, slot.getSlot());
        if (this.partitionSpec != null && this.partitionSpec.length() > 0) {
            params.put("partition", this.partitionSpec);
        }
        if (j2 > 0) {
            params.put(TunnelConstants.RECORD_COUNT, String.valueOf(j2));
        }
        if (this.columns != null && !this.columns.isEmpty()) {
            params.put(TunnelConstants.ZORDER_COLUMNS, getColumnString());
        }

        HashMap<String, String> headers = new HashMap<>();
        if (j < 0) {
            headers.put("Transfer-Encoding", "chunked");
        } else {
            headers.put("Content-Length", String.valueOf(j));
        }
        headers.put("Content-Type", "application/octet-stream");
        headers.put(HttpHeaders.HEADER_ODPS_TUNNEL_VERSION, String.valueOf(5));
        headers.put(HttpHeaders.HEADER_ODPS_SLOT_NUM, String.valueOf(this.slots.getSlotNum()));
        switch (compressOption.algorithm) {
            case ODPS_RAW:
                // Uncompressed: no Content-Encoding header.
                break;
            case ODPS_ZLIB:
                headers.put("Content-Encoding", "deflate");
                break;
            case ODPS_SNAPPY:
                headers.put("Content-Encoding", "x-snappy-framed");
                break;
            default:
                throw new TunnelException("unsupported compression option.");
        }
        headers.put(HttpHeaders.HEADER_ODPS_ROUTED_SERVER, slot.getServer());

        if (!this.p2pMode) {
            return this.httpClient.connect(getResource(), "PUT", params, headers);
        }

        // p2p mode: connect straight to the slot's address, reusing the endpoint's URI scheme.
        String endpoint = this.httpClient.getEndpoint();
        try {
            String slotAddress = new URI(endpoint).getScheme() + "://" + slot.getIp();
            return this.httpClient.connect(getResource(), "PUT", params, headers, slotAddress);
        } catch (URISyntaxException e) {
            // Keep the parse failure as the cause so the offending position is not lost.
            throw new TunnelException("Invalid endpoint: " + endpoint, e);
        }
    }

    /**
     * Uploads a record pack without a flush timeout.
     *
     * <p>Delegates to {@link #writeBlock(ProtobufRecordPack, long)} with a timeout of 0; the
     * timeout watcher is only engaged for positive values.
     *
     * @param protobufRecordPack the pack to upload
     * @return the value returned by the two-argument overload
     * @throws IOException if the upload fails
     */
    public String writeBlock(ProtobufRecordPack protobufRecordPack) throws IOException {
        final long noTimeout = 0L;
        return writeBlock(protobufRecordPack, noTimeout);
    }

    /**
     * Uploads a record pack through the next slot in the rotation.
     *
     * <p>The connection is always disconnected exactly once before this method returns or
     * throws, regardless of whether the upload succeeded.
     *
     * @param protobufRecordPack the pack to upload; its compress option, total byte count and
     *     size are used to set up the connection
     * @param j flush timeout passed to the send step; values {@code <= 0} disable the timeout
     *     watcher
     * @return the request id reported by the server for this upload
     * @throws IOException if no slot is available, the transfer fails, or an
     *     {@link OdpsException} occurs (wrapped, with the original as cause)
     */
    public String writeBlock(ProtobufRecordPack protobufRecordPack, long j) throws IOException {
        Slot slot = this.slots.iterator().next();
        if (slot == null) {
            // The slot iterator returns null (rather than throwing) when the session has no slots.
            throw new IOException("No slot available for stream upload session " + this.id);
        }
        Connection connection = null;
        try {
            connection = getConnection(protobufRecordPack.getCompressOption(), slot, protobufRecordPack.getTotalBytes(), protobufRecordPack.getSize());
            return sendBlock(protobufRecordPack, connection, slot, j);
        } catch (OdpsException e) {
            throw new IOException(e.getMessage(), e);
        } finally {
            if (connection != null) {
                connection.disconnect();
            }
        }
    }

    /**
     * Writes the pack's serialized bytes to an already-open connection and processes the response.
     *
     * <p>When {@code j > 0} the connection is registered with {@link ConnectionWatcher} before
     * the transfer and released afterwards; if a failure occurs and the watcher reports the
     * connection as timed out, the failure is replaced by a {@link SocketTimeoutException}.
     *
     * <p>On an OK response the slot state is reconciled via {@code reloadSlots} using the
     * routed-server and slot-num response headers, and the {@code x-odps-request-id} header is
     * returned. On a non-OK response an {@link IOException} wrapping a {@link TunnelException}
     * is thrown.
     *
     * @param protobufRecordPack pack whose protobuf stream is sent
     * @param connection open connection to write to; must not be {@code null}
     * @param slot slot the connection was opened for
     * @param j timeout handed to the watcher; values {@code <= 0} disable watcher handling
     * @return the {@code x-odps-request-id} response header
     * @throws IOException if the connection is {@code null}, the transfer fails, the response is
     *     not OK, or the watcher reports a timeout
     * @throws TunnelException if reconciling slots fails
     */
    private String sendBlock(ProtobufRecordPack protobufRecordPack, Connection connection, Slot slot, long j) throws IOException, TunnelException {
        if (null == connection) {
            throw new IOException("Invalid connection");
        }
        ByteArrayOutputStream protobufStream = protobufRecordPack.getProtobufStream();
        if (j > 0) {
            // Start timeout tracking for this connection.
            ConnectionWatcher.getInstance().mark(connection, j);
        }
        // The nested try blocks are the decompiled form of a try/catch/finally.
        try {
            try {
                protobufStream.writeTo(connection.getOutputStream());
                // Closing the request body stream before reading the response.
                connection.getOutputStream().close();
                protobufStream.close();
                Response response = connection.getResponse();
                if (j > 0) {
                    // NOTE(review): this release happens before the status check and reloadSlots;
                    // any exception thrown after this point reaches the handlers below, which
                    // call checkTimedOut and release again on an already-released connection.
                    // Confirm ConnectionWatcher tolerates that.
                    ConnectionWatcher.getInstance().release(connection);
                }
                if (response.isOK()) {
                    // Integer.valueOf throws NumberFormatException if the slot-num header is absent or not numeric.
                    reloadSlots(slot, response.getHeader(HttpHeaders.HEADER_ODPS_ROUTED_SERVER), Integer.valueOf(response.getHeader(HttpHeaders.HEADER_ODPS_SLOT_NUM)).intValue());
                    return response.getHeader("x-odps-request-id");
                }
                // Non-OK: build a TunnelException from the error body and surface it as an IOException.
                TunnelException tunnelException = new TunnelException(response.getHeader("x-odps-request-id"), connection.getInputStream(), Integer.valueOf(response.getStatus()));
                throw new IOException(tunnelException.getMessage(), tunnelException);
            } catch (Throwable th) {
                // Rethrow unchanged unless a timeout was requested and the watcher says it fired.
                if (j <= 0 || !ConnectionWatcher.getInstance().checkTimedOut(connection)) {
                    throw th;
                }
                // NOTE(review): the original failure is not attached as a cause here.
                // DateFormat.MINUTE_SECOND is an ICU constant; presumably the decompiler
                // substituted it for a unit-suffix string literal — confirm.
                throw new SocketTimeoutException("Flush time exceeded timeout user set: " + j + DateFormat.MINUTE_SECOND);
            }
        } catch (Throwable th2) {
            // Failure-path cleanup: stop tracking the connection, then propagate.
            if (j > 0) {
                ConnectionWatcher.getInstance().release(connection);
            }
            throw th2;
        }
    }

    /**
     * Joins the names of {@code columns} with commas, in list order and without spaces.
     *
     * <p>Callers check that {@code columns} is non-null before calling; this method does not.
     *
     * @return the comma-separated column names, or an empty string for an empty list
     */
    private String getColumnString() {
        StringBuilder joined = new StringBuilder();
        String separator = "";
        for (Column column : this.columns) {
            joined.append(separator).append(column.getName());
            separator = ",";
        }
        return joined.toString();
    }

    /**
     * @return this session's id, the same value sent as the upload-id request parameter
     */
    @Override
    public String getId() {
        return id;
    }

    /**
     * Switches p2p mode on or off. When on, upload connections are opened against the slot's
     * own address instead of the default endpoint.
     *
     * @param z {@code true} to enable p2p mode
     */
    @Override
    public void setP2pMode(boolean z) {
        p2pMode = z;
    }

    /**
     * @return the table schema held by this session
     */
    @Override
    public TableSchema getSchema() {
        return schema;
    }

    /**
     * @return the quota name held by this session
     */
    @Override
    public String getQuotaName() {
        return quotaName;
    }

    /**
     * Creates a record pack bound to this session that uses the {@code ODPS_RAW} algorithm
     * (no Content-Encoding header is sent for it on upload).
     *
     * @return a new record pack
     * @throws IOException if the pack cannot be created
     */
    @Override
    public TableTunnel.StreamRecordPack newRecordPack() throws IOException {
        CompressOption rawOption = new CompressOption(CompressOption.CompressAlgorithm.ODPS_RAW, 0, 0);
        return new StreamRecordPackImpl(this, rawOption);
    }

    /**
     * Creates a record pack bound to this session with the given compression settings.
     *
     * @param compressOption compression settings handed to the pack
     * @return a new record pack
     * @throws IOException if the pack cannot be created
     */
    @Override
    public TableTunnel.StreamRecordPack newRecordPack(CompressOption compressOption) throws IOException {
        TableTunnel.StreamRecordPack pack = new StreamRecordPackImpl(this, compressOption);
        return pack;
    }

    /**
     * Creates an empty record whose columns are those of this session's schema.
     *
     * @return a new {@link ArrayRecord}
     */
    @Override
    public Record newRecord() {
        List<Column> schemaColumns = this.schema.getColumns();
        Column[] columnArray = schemaColumns.toArray(new Column[0]);
        return new ArrayRecord(columnArray);
    }

    /**
     * Sends a POST request for this upload session, routed to the server of the next slot in
     * the rotation, to abort the session.
     *
     * <p>The connection is always disconnected afterwards; a failure to disconnect is ignored.
     *
     * @throws TunnelException if the response is not OK, on an I/O failure, or on any other
     *     {@link OdpsException} from the client
     */
    public void abort() throws TunnelException {
        HashMap<String, String> params = new HashMap<>();
        params.put(TunnelConstants.UPLOADID, this.id);
        if (this.partitionSpec != null && this.partitionSpec.length() > 0) {
            params.put("partition", this.partitionSpec);
        }

        HashMap<String, String> headers = Util.getCommonHeader();
        headers.put(HttpHeaders.HEADER_ODPS_ROUTED_SERVER, this.slots.iterator().next().getServer());

        Connection connection = null;
        // Stays null until a response has been read; used in the I/O failure message below.
        String requestId = null;
        try {
            connection = this.httpClient.connect(getResource(), "POST", params, headers);
            Response response = connection.getResponse();
            requestId = response.getHeader("x-odps-request-id");
            if (!response.isOK()) {
                throw new TunnelException(requestId, connection.getInputStream(), Integer.valueOf(response.getStatus()));
            }
        } catch (IOException ioFailure) {
            throw new TunnelException(requestId, "Failed abort upload session with tunnel endpoint " + this.httpClient.getEndpoint(), ioFailure);
        } catch (TunnelException tunnelFailure) {
            // Pass through untouched so the generic OdpsException handler below does not re-wrap it.
            throw tunnelFailure;
        } catch (OdpsException odpsFailure) {
            throw new TunnelException("not available", odpsFailure.getMessage(), odpsFailure);
        } finally {
            if (connection != null) {
                try {
                    connection.disconnect();
                } catch (IOException ignored) {
                    // Best-effort cleanup: a disconnect failure must not mask the outcome of the abort.
                }
            }
        }
    }
}
