/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.record;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.network.Send;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.network.TransferableChannel;
import org.apache.flink.cdc.connectors.shaded.org.apache.kafka.common.record.BaseRecords;

/**
 * Skeleton {@link Send} that pushes at most a fixed number of bytes from a
 * {@link BaseRecords} instance into a {@link TransferableChannel}.
 *
 * <p>The class tracks how many bytes are still outstanding and whether the
 * channel reported pending writes after the most recent call; subclasses
 * supply the actual transfer through
 * {@link #writeTo(TransferableChannel, long, int)}.
 *
 * @param <T> concrete records type being sent
 */
public abstract class RecordsSend<T extends BaseRecords> implements Send {
    /** Zero-length buffer written to the channel once the payload is done but writes are still pending. */
    private static final ByteBuffer EMPTY_BYTE_BUFFER = ByteBuffer.allocate(0);

    private final T records;
    private final int maxBytesToWrite;
    /** Bytes of the payload not yet accepted by the channel. */
    private int remaining;
    /** Result of {@code channel.hasPendingWrites()} observed at the end of the last write call. */
    private boolean pending;

    /**
     * @param records         the records backing this send
     * @param maxBytesToWrite total number of bytes this send will attempt to write;
     *                        also the value reported by {@link #size()}
     */
    protected RecordsSend(T records, int maxBytesToWrite) {
        this.records = records;
        this.maxBytesToWrite = maxBytesToWrite;
        this.remaining = maxBytesToWrite;
    }

    /**
     * @return {@code true} once no payload bytes remain and the channel had no
     *         pending writes when last checked
     */
    @Override
    public boolean completed() {
        return !(remaining > 0 || pending);
    }

    /**
     * Writes as much of the outstanding payload as the subclass transfer accepts,
     * then refreshes the pending-writes flag from the channel.
     *
     * @param channel destination channel
     * @return number of payload bytes written by this call ({@code 0} if none were outstanding)
     * @throws EOFException if the subclass transfer reports a negative byte count
     * @throws IOException  if the underlying channel operations fail
     */
    @Override
    public final long writeTo(TransferableChannel channel) throws IOException {
        final long bytesWritten = remaining > 0 ? writeOutstanding(channel) : 0L;

        pending = channel.hasPendingWrites();
        final boolean payloadDone = remaining <= 0;
        if (payloadDone && pending) {
            // NOTE(review): presumably nudges the channel to flush data it still buffers
            // internally - confirm against the TransferableChannel implementation.
            channel.write(EMPTY_BYTE_BUFFER);
        }
        return bytesWritten;
    }

    /**
     * @return the maximum number of bytes to write, as given to the constructor
     */
    @Override
    public long size() {
        return maxBytesToWrite;
    }

    /**
     * @return the records instance passed to the constructor
     */
    protected T records() {
        return records;
    }

    /**
     * Performs the actual transfer for one write attempt.
     *
     * @param var1 destination channel
     * @param var2 offset already written, computed as {@code size() - remaining}
     * @param var4 number of bytes still outstanding
     * @return number of bytes written; a negative value is treated as an error by the caller
     * @throws IOException if the transfer fails
     */
    protected abstract long writeTo(TransferableChannel var1, long var2, int var4) throws IOException;

    /** Delegates one transfer to the subclass and deducts the accepted bytes from {@code remaining}. */
    private long writeOutstanding(TransferableChannel channel) throws IOException {
        final long alreadySent = size() - remaining;
        final long count = writeTo(channel, alreadySent, remaining);
        if (count < 0L) {
            throw new EOFException("Wrote negative bytes to channel. This shouldn't happen.");
        }
        // Compound assignment narrows (long) back to int, same as an explicit cast.
        remaining -= count;
        return count;
    }
}

