package org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.impl;

import com.aliyun.jindodata.api.FSExceptionMessages;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.commons.transport.Request;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.data.Record;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.TunnelException;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.TunnelTableSchema;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.hasher.TypeHasher;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.io.Checksum;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.io.CompressOption;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.io.ProtobufRecordPack;
import org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.streams.UpsertStream;
import org.apache.paimon.maxcompute.shade.io.netty.bootstrap.Bootstrap;
import org.apache.paimon.maxcompute.shade.io.netty.buffer.ByteBufInputStream;
import org.apache.paimon.maxcompute.shade.io.netty.buffer.Unpooled;
import org.apache.paimon.maxcompute.shade.io.netty.channel.Channel;
import org.apache.paimon.maxcompute.shade.io.netty.channel.ChannelHandlerContext;
import org.apache.paimon.maxcompute.shade.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.paimon.maxcompute.shade.io.netty.handler.codec.http.DefaultFullHttpRequest;
import org.apache.paimon.maxcompute.shade.io.netty.handler.codec.http.FullHttpResponse;
import org.apache.paimon.maxcompute.shade.io.netty.handler.codec.http.HttpHeaderNames;
import org.apache.paimon.maxcompute.shade.io.netty.handler.codec.http.HttpMethod;
import org.apache.paimon.maxcompute.shade.io.netty.handler.codec.http.HttpRequest;
import org.apache.paimon.maxcompute.shade.io.netty.handler.codec.http.HttpResponseStatus;
import org.apache.paimon.maxcompute.shade.io.netty.handler.codec.http.HttpVersion;
import org.apache.paimon.maxcompute.shade.org.apache.commons.io.FileUtils;

/* loaded from: input_file:org/apache/paimon/maxcompute/shade/com/aliyun/odps/tunnel/impl/UpsertStreamImpl.class */
public class UpsertStreamImpl implements UpsertStream {
    private long maxBufferSize;
    private long slotBufferSize;
    private final CompressOption compressOption;
    private final URI endpoint;
    private final UpsertSessionImpl session;
    private Map<Integer, Slot> buckets;
    private List<Integer> hashKeys;
    private TunnelTableSchema schema;
    private final Bootstrap bootstrap;
    private CountDownLatch latch;
    private UpsertStream.Listener listener;
    private final Map<Integer, ProtobufRecordPack> bucketBuffer = new HashMap();
    private long totalBufferSize = 0;
    private Status status = Status.NORMAL;

    /* loaded from: input_file:org/apache/paimon/maxcompute/shade/com/aliyun/odps/tunnel/impl/UpsertStreamImpl$Builder.class */
    public static class Builder implements UpsertStream.Builder {
        private UpsertSessionImpl session;
        private long maxBufferSize = 67108864;
        private long slotBufferSize = FileUtils.ONE_MB;
        private CompressOption compressOption = new CompressOption();
        private UpsertStream.Listener listener = null;

        public Builder setSession(UpsertSessionImpl upsertSessionImpl) {
            this.session = upsertSessionImpl;
            return this;
        }

        public UpsertSessionImpl getSession() {
            return this.session;
        }

        @Override // org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.streams.UpsertStream.Builder
        public long getMaxBufferSize() {
            return this.maxBufferSize;
        }

        @Override // org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.streams.UpsertStream.Builder
        public Builder setMaxBufferSize(long j) {
            this.maxBufferSize = j;
            return this;
        }

        @Override // org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.streams.UpsertStream.Builder
        public long getSlotBufferSize() {
            return this.slotBufferSize;
        }

        @Override // org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.streams.UpsertStream.Builder
        public Builder setSlotBufferSize(long j) {
            this.slotBufferSize = j;
            return this;
        }

        @Override // org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.streams.UpsertStream.Builder
        public CompressOption getCompressOption() {
            return this.compressOption;
        }

        @Override // org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.streams.UpsertStream.Builder
        public Builder setCompressOption(CompressOption compressOption) {
            this.compressOption = compressOption;
            return this;
        }

        @Override // org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.streams.UpsertStream.Builder
        public UpsertStream.Listener getListener() {
            return this.listener;
        }

        @Override // org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.streams.UpsertStream.Builder
        public Builder setListener(UpsertStream.Listener listener) {
            this.listener = listener;
            return this;
        }

        @Override // org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.streams.UpsertStream.Builder
        public UpsertStream build() throws IOException, TunnelException {
            return new UpsertStreamImpl(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/paimon/maxcompute/shade/com/aliyun/odps/tunnel/impl/UpsertStreamImpl$FlushResultHandler.class */
    public class FlushResultHandler extends ChannelInboundHandlerAdapter {
        private ProtobufRecordPack pack;
        CountDownLatch latch;
        long start;
        UpsertStream.Listener listener;
        int retry;
        private UpsertStream.FlushResult flushResult = new UpsertStream.FlushResult();
        private TunnelException exception = null;
        boolean needRetry = false;

        public UpsertStream.FlushResult getFlushResult() {
            return this.flushResult;
        }

        public TunnelException getException() {
            return this.exception;
        }

        public boolean isNeedRetry() {
            return this.needRetry;
        }

        FlushResultHandler(ProtobufRecordPack protobufRecordPack, CountDownLatch countDownLatch, UpsertStream.Listener listener, int i) {
            this.flushResult.recordCount = protobufRecordPack.getSize();
            this.pack = protobufRecordPack;
            this.flushResult.flushSize = protobufRecordPack.getTotalBytes();
            this.latch = countDownLatch;
            this.start = System.currentTimeMillis();
            this.listener = listener;
            this.retry = i;
        }

        @Override // org.apache.paimon.maxcompute.shade.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.paimon.maxcompute.shade.io.netty.channel.ChannelInboundHandler
        public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
            try {
                try {
                    FullHttpResponse fullHttpResponse = (FullHttpResponse) obj;
                    this.flushResult.traceId = fullHttpResponse.headers().get("x-odps-request-id");
                    if (fullHttpResponse.status() == HttpResponseStatus.OK) {
                        this.pack.reset();
                        if (this.listener != null) {
                            try {
                                this.listener.onFlush(this.flushResult);
                            } catch (Exception e) {
                            }
                        }
                    } else {
                        this.exception = new TunnelException(this.flushResult.traceId, new ByteBufInputStream(fullHttpResponse.content()), Integer.valueOf(fullHttpResponse.status().code()));
                        if (this.listener != null) {
                            try {
                                this.listener.onFlushFail(this.exception.getMessage(), this.retry);
                            } catch (Exception e2) {
                            }
                        }
                    }
                    this.latch.countDown();
                    channelHandlerContext.close();
                    this.flushResult.flushTime = System.currentTimeMillis() - this.start;
                } catch (Exception e3) {
                    this.exception = new TunnelException(e3.getMessage(), e3);
                    try {
                        this.needRetry = this.listener.onFlushFail(e3.getMessage(), this.retry);
                    } catch (Exception e4) {
                    }
                    this.latch.countDown();
                    channelHandlerContext.close();
                    this.flushResult.flushTime = System.currentTimeMillis() - this.start;
                }
            } catch (Throwable th) {
                this.latch.countDown();
                channelHandlerContext.close();
                this.flushResult.flushTime = System.currentTimeMillis() - this.start;
                throw th;
            }
        }

        @Override // org.apache.paimon.maxcompute.shade.io.netty.channel.ChannelInboundHandlerAdapter, org.apache.paimon.maxcompute.shade.io.netty.channel.ChannelHandlerAdapter, org.apache.paimon.maxcompute.shade.io.netty.channel.ChannelHandler, org.apache.paimon.maxcompute.shade.io.netty.channel.ChannelInboundHandler
        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
            this.exception = new TunnelException(th.getMessage(), th);
            this.latch.countDown();
            channelHandlerContext.close();
            this.flushResult.flushTime = System.currentTimeMillis() - this.start;
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/paimon/maxcompute/shade/com/aliyun/odps/tunnel/impl/UpsertStreamImpl$Operation.class */
    public enum Operation {
        UPSERT,
        DELETE
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:org/apache/paimon/maxcompute/shade/com/aliyun/odps/tunnel/impl/UpsertStreamImpl$Status.class */
    public enum Status {
        NORMAL,
        ERROR,
        CLOSED
    }

    public UpsertStreamImpl(Builder builder) throws IOException, TunnelException {
        this.hashKeys = new ArrayList();
        this.listener = null;
        this.compressOption = builder.getCompressOption();
        this.slotBufferSize = builder.getSlotBufferSize();
        this.maxBufferSize = builder.getMaxBufferSize();
        this.session = builder.session;
        this.endpoint = this.session.getEndpoint();
        this.buckets = this.session.getBuckets();
        this.schema = this.session.getRecordSchema();
        Iterator<Integer> it = this.buckets.keySet().iterator();
        while (it.hasNext()) {
            this.bucketBuffer.put(it.next(), new ProtobufRecordPack(this.schema, new Checksum(), 0, new CompressOption()));
        }
        this.hashKeys = this.session.getHashKeys();
        this.bootstrap = this.session.getBootstrap();
        this.listener = builder.getListener();
    }

    @Override // org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.streams.UpsertStream
    public void upsert(Record record) throws IOException, TunnelException {
        write(record, Operation.UPSERT, null);
    }

    @Override // org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.streams.UpsertStream
    public void delete(Record record) throws IOException, TunnelException {
        write(record, Operation.DELETE, null);
    }

    @Override // org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.streams.UpsertStream
    public void flush() throws IOException, TunnelException {
        flush(true);
    }

    @Override // org.apache.paimon.maxcompute.shade.com.aliyun.odps.tunnel.streams.UpsertStream
    public void close() throws IOException, TunnelException {
        if (this.status == Status.NORMAL) {
            flush();
            this.status = Status.CLOSED;
        }
    }

    private void write(Record record, Operation operation, List<String> list) throws TunnelException, IOException {
        List<Integer> list2;
        checkStatus();
        ArrayList arrayList = new ArrayList();
        Iterator<Integer> it = this.hashKeys.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            Object obj = record.get(intValue);
            if (obj == null) {
                throw new TunnelException("Hash key " + intValue + " can not be null!");
            }
            arrayList.add(Integer.valueOf(TypeHasher.getHasher(this.schema.getColumn(intValue).getTypeInfo().getTypeName().toLowerCase(), this.session.getHasher()).hash(obj)));
        }
        int CombineHashVal = TypeHasher.CombineHashVal(arrayList) % this.buckets.size();
        if (!this.bucketBuffer.containsKey(Integer.valueOf(CombineHashVal))) {
            throw new TunnelException("Tunnel internal error! Do not have bucket for hash key " + CombineHashVal);
        }
        ProtobufRecordPack protobufRecordPack = this.bucketBuffer.get(Integer.valueOf(CombineHashVal));
        UpsertRecord upsertRecord = (UpsertRecord) record;
        upsertRecord.setOperation(operation == Operation.UPSERT ? (byte) 85 : (byte) 68);
        if (list == null) {
            list2 = new ArrayList<>();
        } else {
            Stream<String> stream = list.stream();
            TunnelTableSchema tunnelTableSchema = this.schema;
            tunnelTableSchema.getClass();
            list2 = (List) stream.map(tunnelTableSchema::getColumnIndex).collect(Collectors.toList());
        }
        upsertRecord.setValueCols(list2);
        long totalBytes = protobufRecordPack.getTotalBytes();
        protobufRecordPack.append(upsertRecord.getRecord());
        this.totalBufferSize += protobufRecordPack.getTotalBytes() - totalBytes;
        if (protobufRecordPack.getTotalBytes() > this.slotBufferSize) {
            flush(false);
        } else if (this.totalBufferSize > this.maxBufferSize) {
            flush(true);
        }
    }

    /* JADX WARN: Type inference failed for: r0v81, types: [org.apache.paimon.maxcompute.shade.io.netty.channel.ChannelFuture] */
    private void flush(boolean z) throws TunnelException, IOException {
        boolean z2;
        ArrayList<FlushResultHandler> arrayList = new ArrayList();
        int i = 0;
        Map<Integer, Slot> buckets = this.session.getBuckets();
        if (buckets.size() != this.buckets.size()) {
            throw new TunnelException("session slot map is changed");
        }
        this.buckets = buckets;
        do {
            z2 = true;
            arrayList.clear();
            try {
                checkStatus();
                this.latch = new CountDownLatch(this.bucketBuffer.size());
                for (Map.Entry<Integer, ProtobufRecordPack> entry : this.bucketBuffer.entrySet()) {
                    ProtobufRecordPack value = entry.getValue();
                    if (value.getSize() <= 0) {
                        this.latch.countDown();
                    } else if (value.getTotalBytes() > this.slotBufferSize || z) {
                        int intValue = entry.getKey().intValue();
                        long totalBytes = value.getTotalBytes();
                        value.checkTransConsistency(false);
                        value.complete();
                        long totalBytes2 = value.getTotalBytes() - totalBytes;
                        if (!z) {
                            this.totalBufferSize += totalBytes2;
                        }
                        Request buildRequest = this.session.buildRequest("PUT", intValue, this.buckets.get(Integer.valueOf(intValue)), value.getTotalBytes(), value.getSize(), this.compressOption);
                        String host = buildRequest.getURI().getHost();
                        int port = buildRequest.getURI().getPort();
                        if (port == -1) {
                            port = buildRequest.getURI().getScheme().equalsIgnoreCase("https") ? 443 : 80;
                        }
                        FlushResultHandler flushResultHandler = new FlushResultHandler(value, this.latch, this.listener, i);
                        Channel channel = this.bootstrap.connect(host, port).sync2().channel();
                        channel.pipeline().addLast(flushResultHandler);
                        arrayList.add(flushResultHandler);
                        channel.writeAndFlush(buildFullHttpRequest(buildRequest, value.getProtobufStream()));
                    } else {
                        this.latch.countDown();
                    }
                }
                this.latch.await();
                for (FlushResultHandler flushResultHandler2 : arrayList) {
                    if (flushResultHandler2.getException() != null) {
                        z2 = false;
                        if (!flushResultHandler2.isNeedRetry()) {
                            this.status = Status.ERROR;
                            throw flushResultHandler2.getException();
                        }
                    } else if (!z) {
                        this.totalBufferSize -= flushResultHandler2.getFlushResult().flushSize;
                    }
                }
                i++;
            } catch (InterruptedException e) {
                throw new TunnelException("flush interrupted", e);
            }
        } while (!z2);
        if (z) {
            this.totalBufferSize = 0L;
        }
    }

    private void checkStatus() throws TunnelException {
        if (Status.CLOSED == this.status) {
            throw new TunnelException(FSExceptionMessages.STREAM_IS_CLOSED);
        }
        if (Status.ERROR == this.status) {
            throw new TunnelException("Stream has error!");
        }
    }

    private HttpRequest buildFullHttpRequest(Request request, ByteArrayOutputStream byteArrayOutputStream) {
        DefaultFullHttpRequest defaultFullHttpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.PUT, request.getURI().toString().replace(this.endpoint.toString(), ""), Unpooled.wrappedBuffer(byteArrayOutputStream.toByteArray()));
        request.getHeaders().forEach((str, str2) -> {
            defaultFullHttpRequest.headers().set(str, (Object) str2);
        });
        defaultFullHttpRequest.headers().set(HttpHeaderNames.HOST, request.getURI().getHost());
        return defaultFullHttpRequest;
    }
}
