package com.aliyun.odps.tunnel.impl;

import com.aliyun.odps.OdpsType;
import com.aliyun.odps.commons.transport.Request;
import com.aliyun.odps.data.Record;
import com.aliyun.odps.tunnel.HttpHeaders;
import com.aliyun.odps.tunnel.TunnelException;
import com.aliyun.odps.tunnel.TunnelTableSchema;
import com.aliyun.odps.tunnel.hasher.DecimalHashObject;
import com.aliyun.odps.tunnel.hasher.TypeHasher;
import com.aliyun.odps.tunnel.io.Checksum;
import com.aliyun.odps.tunnel.io.CompressOption;
import com.aliyun.odps.tunnel.io.ProtobufRecordPack;
import com.aliyun.odps.tunnel.streams.UpsertStream;
import com.aliyun.odps.type.DecimalTypeInfo;
import com.aliyun.odps.utils.FixedNettyChannelPool;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBufInputStream;
import io.netty.buffer.Unpooled;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.codec.http.DefaultFullHttpRequest;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpMethod;
import io.netty.handler.codec.http.HttpRequest;
import io.netty.handler.codec.http.HttpResponseStatus;
import io.netty.handler.codec.http.HttpVersion;
import io.netty.handler.timeout.ReadTimeoutException;
import io.netty.handler.timeout.ReadTimeoutHandler;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.math.BigDecimal;
import java.net.URI;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

/* loaded from: input_file:com/aliyun/odps/tunnel/impl/UpsertStreamImpl.class */
public class UpsertStreamImpl implements UpsertStream {
    private long maxBufferSize;
    private long slotBufferSize;
    private final CompressOption compressOption;
    private final URI endpoint;
    private final UpsertSessionImpl session;
    private Map<Integer, Slot> buckets;
    private List<Integer> hashKeys;
    private TunnelTableSchema schema;
    private final Bootstrap bootstrap;
    private CountDownLatch latch;
    private FixedNettyChannelPool channelPool;
    private long connectTimeout;
    private long readTimeout;
    private UpsertStream.Listener listener;
    private final Map<Integer, ProtobufRecordPack> bucketBuffer = new HashMap();
    private long totalBufferSize = 0;
    private Status status = Status.NORMAL;

    /* loaded from: input_file:com/aliyun/odps/tunnel/impl/UpsertStreamImpl$Builder.class */
    public static class Builder implements UpsertStream.Builder {
        private UpsertSessionImpl session;
        private long maxBufferSize = 67108864;
        private long slotBufferSize = 1048576;
        private CompressOption compressOption = new CompressOption(CompressOption.CompressAlgorithm.ODPS_LZ4_FRAME, -1, 0);
        private UpsertStream.Listener listener = null;

        public Builder setSession(UpsertSessionImpl upsertSessionImpl) {
            this.session = upsertSessionImpl;
            return this;
        }

        public UpsertSessionImpl getSession() {
            return this.session;
        }

        @Override // com.aliyun.odps.tunnel.streams.UpsertStream.Builder
        public long getMaxBufferSize() {
            return this.maxBufferSize;
        }

        @Override // com.aliyun.odps.tunnel.streams.UpsertStream.Builder
        public Builder setMaxBufferSize(long j) {
            this.maxBufferSize = j;
            return this;
        }

        @Override // com.aliyun.odps.tunnel.streams.UpsertStream.Builder
        public long getSlotBufferSize() {
            return this.slotBufferSize;
        }

        @Override // com.aliyun.odps.tunnel.streams.UpsertStream.Builder
        public Builder setSlotBufferSize(long j) {
            this.slotBufferSize = j;
            return this;
        }

        @Override // com.aliyun.odps.tunnel.streams.UpsertStream.Builder
        public CompressOption getCompressOption() {
            return this.compressOption;
        }

        @Override // com.aliyun.odps.tunnel.streams.UpsertStream.Builder
        public Builder setCompressOption(CompressOption compressOption) {
            this.compressOption = compressOption;
            return this;
        }

        @Override // com.aliyun.odps.tunnel.streams.UpsertStream.Builder
        public UpsertStream.Listener getListener() {
            return this.listener;
        }

        @Override // com.aliyun.odps.tunnel.streams.UpsertStream.Builder
        public Builder setListener(UpsertStream.Listener listener) {
            this.listener = listener;
            return this;
        }

        @Override // com.aliyun.odps.tunnel.streams.UpsertStream.Builder
        public UpsertStream build() throws IOException, TunnelException {
            return new UpsertStreamImpl(this);
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/aliyun/odps/tunnel/impl/UpsertStreamImpl$FlushResultHandler.class */
    public class FlushResultHandler extends ChannelInboundHandlerAdapter {
        private ProtobufRecordPack pack;
        CountDownLatch latch;
        long start;
        UpsertStream.Listener listener;
        int retry;
        int bucketId;
        private UpsertStream.FlushResult flushResult = new UpsertStream.FlushResult();
        private TunnelException exception = null;

        public UpsertStream.FlushResult getFlushResult() {
            return this.flushResult;
        }

        public TunnelException getException() {
            return this.exception;
        }

        public void setException(TunnelException tunnelException) {
            this.exception = tunnelException;
        }

        FlushResultHandler(ProtobufRecordPack protobufRecordPack, CountDownLatch countDownLatch, UpsertStream.Listener listener, int i, int i2) {
            this.flushResult.recordCount = protobufRecordPack.getSize();
            this.pack = protobufRecordPack;
            this.flushResult.flushSize = protobufRecordPack.getTotalBytes();
            this.latch = countDownLatch;
            this.start = System.currentTimeMillis();
            this.listener = listener;
            this.retry = i;
            this.bucketId = i2;
        }

        public void channelRead(ChannelHandlerContext channelHandlerContext, Object obj) throws Exception {
            FullHttpResponse fullHttpResponse = null;
            try {
                try {
                    FullHttpResponse fullHttpResponse2 = (FullHttpResponse) obj;
                    this.flushResult.traceId = fullHttpResponse2.headers().get("x-odps-request-id");
                    if (fullHttpResponse2.status().equals(HttpResponseStatus.OK)) {
                        this.flushResult.flushTime = System.currentTimeMillis() - this.start;
                        this.pack.reset();
                        if (this.listener != null) {
                            try {
                                this.listener.onFlush(this.flushResult);
                            } catch (Exception e) {
                            }
                        }
                    } else {
                        ByteBufInputStream byteBufInputStream = new ByteBufInputStream(fullHttpResponse2.content());
                        Throwable th = null;
                        try {
                            try {
                                this.exception = new TunnelException(this.flushResult.traceId, (InputStream) byteBufInputStream, Integer.valueOf(fullHttpResponse2.status().code()));
                                if (fullHttpResponse2.status().code() == 308) {
                                    if (fullHttpResponse2.headers().contains(HttpHeaders.HEADER_ODPS_ROUTED_SERVER)) {
                                        UpsertStreamImpl.this.session.updateBuckets(this.bucketId, fullHttpResponse2.headers().get(HttpHeaders.HEADER_ODPS_ROUTED_SERVER));
                                    } else {
                                        UpsertStreamImpl.this.session.updateBuckets(this.bucketId, null);
                                    }
                                    UpsertStreamImpl.this.buckets = UpsertStreamImpl.this.session.getBuckets();
                                }
                                if (byteBufInputStream != null) {
                                    if (0 != 0) {
                                        try {
                                            byteBufInputStream.close();
                                        } catch (Throwable th2) {
                                            th.addSuppressed(th2);
                                        }
                                    } else {
                                        byteBufInputStream.close();
                                    }
                                }
                            } catch (Throwable th3) {
                                th = th3;
                                throw th3;
                            }
                        } catch (Throwable th4) {
                            if (byteBufInputStream != null) {
                                if (th != null) {
                                    try {
                                        byteBufInputStream.close();
                                    } catch (Throwable th5) {
                                        th.addSuppressed(th5);
                                    }
                                } else {
                                    byteBufInputStream.close();
                                }
                            }
                            throw th4;
                        }
                    }
                    this.latch.countDown();
                    if (fullHttpResponse2 != null) {
                        fullHttpResponse2.release();
                    }
                    UpsertStreamImpl.this.channelPool.release(channelHandlerContext.channel());
                    channelHandlerContext.close();
                } catch (Exception e2) {
                    this.exception = new TunnelException(e2.getMessage(), e2);
                    this.latch.countDown();
                    if (0 != 0) {
                        fullHttpResponse.release();
                    }
                    UpsertStreamImpl.this.channelPool.release(channelHandlerContext.channel());
                    channelHandlerContext.close();
                }
            } catch (Throwable th6) {
                this.latch.countDown();
                if (0 != 0) {
                    fullHttpResponse.release();
                }
                UpsertStreamImpl.this.channelPool.release(channelHandlerContext.channel());
                channelHandlerContext.close();
                throw th6;
            }
        }

        public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable th) {
            if (th instanceof ReadTimeoutException) {
                this.exception = new TunnelException("Flush time out, cannot get response from server");
            } else {
                this.exception = new TunnelException(th.getMessage(), th);
            }
            this.latch.countDown();
            UpsertStreamImpl.this.channelPool.release(channelHandlerContext.channel());
            channelHandlerContext.close();
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/aliyun/odps/tunnel/impl/UpsertStreamImpl$Operation.class */
    public enum Operation {
        UPSERT,
        DELETE
    }

    /* JADX INFO: Access modifiers changed from: private */
    /* loaded from: input_file:com/aliyun/odps/tunnel/impl/UpsertStreamImpl$Status.class */
    public enum Status {
        NORMAL,
        ERROR,
        CLOSED
    }

    public UpsertStreamImpl(Builder builder) throws IOException, TunnelException {
        this.hashKeys = new ArrayList();
        this.listener = null;
        this.compressOption = builder.getCompressOption();
        this.slotBufferSize = builder.getSlotBufferSize();
        this.maxBufferSize = builder.getMaxBufferSize();
        this.session = builder.session;
        this.endpoint = this.session.getEndpoint();
        this.buckets = this.session.getBuckets();
        this.schema = this.session.getRecordSchema();
        this.hashKeys = this.session.getHashKeys();
        this.bootstrap = this.session.getBootstrap();
        this.channelPool = this.session.getChannelPool();
        this.connectTimeout = this.session.getConnectTimeout();
        this.readTimeout = this.session.getReadTimeout();
        this.listener = builder.getListener();
        newBucketBuffer();
    }

    private void newBucketBuffer() throws IOException {
        Iterator<Integer> it = this.buckets.keySet().iterator();
        while (it.hasNext()) {
            this.bucketBuffer.put(it.next(), new ProtobufRecordPack(this.schema, new Checksum(), 0, this.compressOption));
        }
    }

    @Override // com.aliyun.odps.tunnel.streams.UpsertStream
    public void upsert(Record record) throws IOException, TunnelException {
        write(record, Operation.UPSERT, null);
    }

    @Override // com.aliyun.odps.tunnel.streams.UpsertStream
    public void upsert(Record record, List<String> list) throws IOException, TunnelException {
        if (list != null && !list.isEmpty() && !this.session.supportPartialUpdate()) {
            throw new TunnelException("Table " + this.session.tableName + " do not support partial update, consider set table properties 'acid.partial.fields.update.enable=true'");
        }
        if (list != null && !list.isEmpty()) {
            Set set = (Set) this.schema.getColumns().stream().map((v0) -> {
                return v0.getName();
            }).collect(Collectors.toSet());
            list.forEach(str -> {
                if (!set.contains(str)) {
                    throw new IllegalArgumentException("Invalid column name:" + str);
                }
            });
        }
        write(record, Operation.UPSERT, list);
    }

    @Override // com.aliyun.odps.tunnel.streams.UpsertStream
    public void delete(Record record) throws IOException, TunnelException {
        write(record, Operation.DELETE, null);
    }

    @Override // com.aliyun.odps.tunnel.streams.UpsertStream
    public void flush() throws IOException, TunnelException {
        flush(true);
    }

    @Override // com.aliyun.odps.tunnel.streams.UpsertStream
    public void close() throws IOException, TunnelException {
        if (this.status == Status.NORMAL) {
            flush();
            this.status = Status.CLOSED;
        }
    }

    @Override // com.aliyun.odps.tunnel.streams.UpsertStream
    public void reset() throws IOException {
        if (!this.bucketBuffer.isEmpty()) {
            Iterator<ProtobufRecordPack> it = this.bucketBuffer.values().iterator();
            while (it.hasNext()) {
                it.next().reset();
            }
        }
        this.totalBufferSize = 0L;
        this.status = Status.NORMAL;
    }

    private void write(Record record, Operation operation, List<String> list) throws TunnelException, IOException {
        checkStatus();
        ArrayList arrayList = new ArrayList();
        Iterator<Integer> it = this.hashKeys.iterator();
        while (it.hasNext()) {
            int intValue = it.next().intValue();
            Object obj = record.get(intValue);
            if (obj == null) {
                throw new TunnelException(" UpsertRecord must have primary key value, consider provide values for column '" + this.schema.getColumn(intValue).getName() + "'");
            }
            DecimalTypeInfo typeInfo = this.schema.getColumn(intValue).getTypeInfo();
            if (typeInfo.getOdpsType() == OdpsType.DECIMAL) {
                DecimalTypeInfo decimalTypeInfo = typeInfo;
                obj = new DecimalHashObject((BigDecimal) obj, decimalTypeInfo.getPrecision(), decimalTypeInfo.getScale());
            }
            arrayList.add(Integer.valueOf(TypeHasher.hash(typeInfo.getOdpsType(), obj, this.session.getHasher())));
        }
        int CombineHashVal = TypeHasher.CombineHashVal(arrayList) % this.buckets.size();
        if (!this.bucketBuffer.containsKey(Integer.valueOf(CombineHashVal))) {
            throw new TunnelException("Tunnel internal error! Do not have bucket for hash key " + CombineHashVal);
        }
        ProtobufRecordPack protobufRecordPack = this.bucketBuffer.get(Integer.valueOf(CombineHashVal));
        UpsertRecord upsertRecord = (UpsertRecord) record;
        upsertRecord.setOperation(operation == Operation.UPSERT ? (byte) 85 : (byte) 68);
        ArrayList arrayList2 = new ArrayList();
        if (list != null) {
            Iterator<String> it2 = list.iterator();
            while (it2.hasNext()) {
                arrayList2.add(Long.valueOf(this.schema.getColumnId(it2.next())));
            }
        }
        upsertRecord.setValueCols(arrayList2);
        long totalBytes = protobufRecordPack.getTotalBytes();
        protobufRecordPack.append(upsertRecord.getRecord());
        this.totalBufferSize += protobufRecordPack.getTotalBytes() - totalBytes;
        if (protobufRecordPack.getTotalBytes() > this.slotBufferSize) {
            flush(false);
        } else if (this.totalBufferSize > this.maxBufferSize) {
            flush(true);
        }
    }

    private void flush(boolean z) throws TunnelException, IOException {
        boolean z2;
        ArrayList<FlushResultHandler> arrayList = new ArrayList();
        int i = 0;
        Map<Integer, Slot> buckets = this.session.getBuckets();
        if (buckets.size() != this.buckets.size()) {
            throw new TunnelException("session slot map is changed");
        }
        this.buckets = buckets;
        do {
            z2 = true;
            arrayList.clear();
            try {
                checkStatus();
                this.latch = new CountDownLatch(this.bucketBuffer.size());
                for (Map.Entry<Integer, ProtobufRecordPack> entry : this.bucketBuffer.entrySet()) {
                    ProtobufRecordPack value = entry.getValue();
                    if (value.getSize() <= 0) {
                        this.latch.countDown();
                    } else if (value.getTotalBytes() > this.slotBufferSize || z) {
                        int intValue = entry.getKey().intValue();
                        long totalBytes = value.getTotalBytes();
                        value.checkTransConsistency(false);
                        value.complete();
                        long totalBytes2 = value.getTotalBytes() - totalBytes;
                        if (!z) {
                            this.totalBufferSize += totalBytes2;
                        }
                        Request buildRequest = this.session.buildRequest("PUT", intValue, this.buckets.get(Integer.valueOf(intValue)), value.getTotalBytes(), value.getSize(), this.compressOption);
                        Channel acquire = this.channelPool.acquire();
                        ChannelHandler flushResultHandler = new FlushResultHandler(value, this.latch, this.listener, i, intValue);
                        acquire.pipeline().addLast(new ChannelHandler[]{flushResultHandler});
                        arrayList.add(flushResultHandler);
                        acquire.writeAndFlush(buildFullHttpRequest(buildRequest, value.getProtobufStream())).addListener(channelFuture -> {
                            if (channelFuture.isSuccess()) {
                                channelFuture.channel().pipeline().addFirst(new ChannelHandler[]{new ReadTimeoutHandler(this.readTimeout, TimeUnit.MILLISECONDS)});
                                return;
                            }
                            this.latch.countDown();
                            this.channelPool.release(channelFuture.channel());
                            flushResultHandler.setException(new TunnelException("Connect : " + channelFuture.cause().getMessage(), channelFuture.cause()));
                            channelFuture.channel().close();
                        });
                    } else {
                        this.latch.countDown();
                    }
                }
                this.latch.await();
                for (FlushResultHandler flushResultHandler2 : arrayList) {
                    if (flushResultHandler2.getException() != null) {
                        z2 = false;
                        if (this.listener == null) {
                            TunnelException tunnelException = new TunnelException(flushResultHandler2.getException().getErrorMsg(), flushResultHandler2.getException());
                            tunnelException.setRequestId(flushResultHandler2.getException().getRequestId());
                            tunnelException.setErrorCode(flushResultHandler2.getException().getErrorCode());
                            throw tunnelException;
                        }
                        if (!this.listener.onFlushFail(flushResultHandler2.getException(), i)) {
                            this.status = Status.ERROR;
                            TunnelException tunnelException2 = new TunnelException(flushResultHandler2.getException().getErrorMsg(), flushResultHandler2.getException());
                            tunnelException2.setRequestId(flushResultHandler2.getException().getRequestId());
                            tunnelException2.setErrorCode(flushResultHandler2.getException().getErrorCode());
                            throw tunnelException2;
                        }
                    } else if (!z) {
                        this.totalBufferSize -= flushResultHandler2.getFlushResult().flushSize;
                    }
                }
                i++;
            } catch (InterruptedException e) {
                throw new TunnelException("flush interrupted", e);
            }
        } while (!z2);
        if (z) {
            this.totalBufferSize = 0L;
        }
    }

    private void checkStatus() throws TunnelException {
        if (Status.CLOSED == this.status) {
            throw new TunnelException("Stream is closed!");
        }
        if (Status.ERROR == this.status) {
            throw new TunnelException("Stream has error!");
        }
    }

    private HttpRequest buildFullHttpRequest(Request request, ByteArrayOutputStream byteArrayOutputStream) {
        DefaultFullHttpRequest defaultFullHttpRequest = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.PUT, request.getURI().toString().replace(this.endpoint.toString(), ""), Unpooled.wrappedBuffer(byteArrayOutputStream.toByteArray()));
        request.getHeaders().forEach((str, str2) -> {
            defaultFullHttpRequest.headers().set(str, str2);
        });
        defaultFullHttpRequest.headers().set(HttpHeaderNames.HOST, request.getURI().getHost());
        return defaultFullHttpRequest;
    }
}
