package org.apache.doris.flink.lookup;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.doris.flink.cfg.DorisLookupOptions;
import org.apache.doris.flink.cfg.DorisOptions;
import org.apache.doris.flink.connection.JdbcConnectionProvider;
import org.apache.doris.flink.connection.SimpleJdbcConnectionProvider;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.apache.doris.shaded.org.apache.arrow.vector.util.DateUtility;
import org.apache.flink.annotation.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:org/apache/doris/flink/lookup/Worker.class */
/**
 * Lookup worker that takes {@link GetAction} batches from a capacity-1 queue and answers each
 * batch with a single {@code union all} query issued over JDBC.
 *
 * <p>The processing loop in {@link #run()} keeps going while the shared {@code started} flag is
 * {@code true}; when the loop ends the JDBC connection is closed. Any exception that escapes the
 * loop body is remembered and rethrown to producers by {@link #offer(GetAction)}.
 */
public class Worker implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(Worker.class);

    /** How long a single {@code poll} waits before the {@code started} flag is re-checked. */
    private static final long POLL_TIMEOUT_MILLIS = 2000L;

    /**
     * Back-off unit between query retries; the pause before the next attempt is this value
     * multiplied by the zero-based index of the attempt that just failed (so the first retry
     * happens immediately). NOTE(review): presumably 1000 ms, taken from the shaded Arrow
     * constant - confirm.
     */
    private static final long RETRY_BACKOFF_UNIT_MILLIS = DateUtility.secondsToMillis;

    private final String name;
    private final AtomicBoolean started;
    private final JdbcConnectionProvider jdbcConnectionProvider;
    private final int maxRetryTimes;
    private final ArrayBlockingQueue<GetAction> queue = new ArrayBlockingQueue<>(1);
    private final AtomicReference<Throwable> exception = new AtomicReference<>(null);

    /**
     * Creates a worker named {@code "Worker-" + i}.
     *
     * @param atomicBoolean shared run flag; the loop in {@link #run()} exits once it is false
     * @param dorisOptions options handed to the {@link SimpleJdbcConnectionProvider}
     * @param dorisLookupOptions source of the maximum retry count for failed queries
     * @param i index used to build the worker name
     */
    public Worker(AtomicBoolean atomicBoolean, DorisOptions dorisOptions, DorisLookupOptions dorisLookupOptions, int i) {
        this.started = atomicBoolean;
        this.name = "Worker-" + i;
        this.jdbcConnectionProvider = new SimpleJdbcConnectionProvider(dorisOptions);
        this.maxRetryTimes = dorisLookupOptions.getMaxRetryTimes();
    }

    /**
     * Tries to hand a batch to this worker without blocking.
     *
     * @param getAction the batch to enqueue
     * @return {@code true} if the batch was queued, {@code false} if the single queue slot is
     *     already taken
     * @throws DorisRuntimeException if the worker loop has previously recorded an exception
     */
    public boolean offer(GetAction getAction) {
        Throwable failure = this.exception.get();
        if (failure != null) {
            throw new DorisRuntimeException(failure);
        }
        return this.queue.offer(getAction);
    }

    /**
     * Polls the queue until the {@code started} flag turns false, handling one batch at a time,
     * then closes the JDBC connection.
     */
    @Override
    public void run() {
        LOG.info("worker:{} start", this);
        while (this.started.get()) {
            try {
                GetAction action = this.queue.poll(POLL_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
                if (action == null) {
                    continue;
                }
                try {
                    handle(action);
                } finally {
                    // Release in finally so that a failing batch can never leave the permit
                    // held, which would block whoever is waiting on the semaphore.
                    if (action.getSemaphore() != null) {
                        action.getSemaphore().release();
                    }
                }
            } catch (Exception e) {
                LOG.error("worker running error", e);
                this.exception.set(e);
            }
        }
        LOG.info("worker:{} stop", this);
        this.jdbcConnectionProvider.closeConnection();
    }

    /**
     * Resolves every {@link Get} of the batch: builds one query for the de-duplicated lookups,
     * executes it and completes each non-null future with the rows found under its
     * {@link RecordKey} (which is {@code null} when no row matched).
     *
     * <p>Any exception raised while preparing or running the query is forwarded to all futures
     * that are not done yet, so callers are not left waiting on a batch that failed.
     */
    private void handle(GetAction getAction) {
        List<Get> getList = getAction.getGetList();
        if (getList.isEmpty()) {
            return;
        }
        try {
            // All gets of one batch are expected to share the schema of the first record.
            LookupSchema schema = getList.get(0).getRecord().getSchema();
            List<Get> distinctGets = deduplicateRecords(getList);
            LOG.debug("record size {}, after deduplicate size {}", getList.size(), distinctGets.size());
            String sql = buildQuerySql(schema, distinctGets.size());
            Map<RecordKey, List<Record>> rowsByKey = executeQuery(sql, distinctGets, schema);
            for (Get get : getList) {
                if (get.getFuture() != null) {
                    get.getFuture().complete(rowsByKey.get(new RecordKey(get.getRecord())));
                }
            }
        } catch (Exception e) {
            for (Get get : getList) {
                if (get.getFuture() != null && !get.getFuture().isDone()) {
                    get.getFuture().completeExceptionally(e);
                }
            }
        }
    }

    /**
     * Builds {@code lookupCount} parameterized selects joined with {@code union all}; each select
     * filters on every condition field of the schema with a {@code =?} placeholder.
     */
    private String buildQuerySql(LookupSchema schema, int lookupCount) {
        StringBuilder sql = new StringBuilder();
        for (int i = 0; i < lookupCount; i++) {
            if (i > 0) {
                sql.append(" union all ");
            }
            appendSelect(sql, schema);
            sql.append(" where ( ");
            boolean firstCondition = true;
            for (String field : schema.getConditionFields()) {
                if (!firstCondition) {
                    sql.append(" and ");
                }
                firstCondition = false;
                sql.append(quoteIdentifier(field)).append("=?");
            }
            sql.append(" ) ");
        }
        return sql.toString();
    }

    /**
     * Removes gets whose record values are element-wise equal to those of an earlier get.
     *
     * <p>The first occurrence of each distinct value array is kept and the relative order of the
     * survivors follows the input order.
     *
     * @param list the gets to de-duplicate; may be {@code null}
     * @return {@code list} itself when it is {@code null} or holds at most one element, otherwise
     *     a new list without duplicates
     */
    @VisibleForTesting
    public static List<Get> deduplicateRecords(List<Get> list) {
        if (list == null || list.size() <= 1) {
            return list;
        }
        // A List view of the value array has element-wise equals/hashCode, which matches the
        // Arrays.equals comparison on the value arrays while allowing hash-based lookup.
        Set<List<Object>> seenValues = new HashSet<>();
        List<Get> distinct = new ArrayList<>(list.size());
        for (Get get : list) {
            Object[] values = get.getRecord().getValues();
            List<Object> key = values == null ? null : Arrays.asList(values);
            if (seenValues.add(key)) {
                distinct.add(get);
            }
        }
        return distinct;
    }

    /** Appends {@code select <quoted select fields> from <table identifier>} to {@code sb}. */
    private void appendSelect(StringBuilder sb, LookupSchema lookupSchema) {
        String[] selectFields = lookupSchema.getSelectFields();
        sb.append("select ");
        for (int i = 0; i < selectFields.length; i++) {
            if (i > 0) {
                sb.append(",");
            }
            sb.append(quoteIdentifier(selectFields[i]));
        }
        sb.append(" from ").append(lookupSchema.getTableIdentifier());
    }

    /**
     * Runs the prepared query, binding the key values of every get in order, and groups the
     * returned rows by their {@link RecordKey}.
     *
     * <p>A failed attempt is logged and retried until {@code maxRetryTimes} retries are used up;
     * the last failure is rethrown wrapped in a {@link RuntimeException}. If {@code maxRetryTimes}
     * is negative no attempt is made and an empty map is returned.
     *
     * @param str the SQL text with one placeholder per key column per get
     * @param list the gets whose key values are bound to the placeholders
     * @param lookupSchema schema providing the key indexes and the number of result columns
     * @return rows of the result set grouped by record key
     */
    private Map<RecordKey, List<Record>> executeQuery(String str, List<Get> list, LookupSchema lookupSchema) {
        Map<RecordKey, List<Record>> rowsByKey = new HashMap<>();
        for (int attempt = 0; attempt <= this.maxRetryTimes; attempt++) {
            // Start every attempt from scratch so a half-read result set never leaks into a retry.
            rowsByKey = new HashMap<>();
            try {
                long startMillis = System.currentTimeMillis();
                try (PreparedStatement statement =
                        this.jdbcConnectionProvider.getOrEstablishConnection().prepareStatement(str)) {
                    int parameterIndex = 0;
                    for (Get get : list) {
                        Record keyRecord = get.getRecord();
                        for (int keyIndex : lookupSchema.getKeyIndex()) {
                            parameterIndex++;
                            statement.setObject(parameterIndex, keyRecord.getObject(keyIndex));
                        }
                    }
                    try (ResultSet resultSet = statement.executeQuery()) {
                        int columnCount = lookupSchema.getFieldTypes().length;
                        while (resultSet.next()) {
                            Record row = new Record(lookupSchema);
                            for (int column = 0; column < columnCount; column++) {
                                // JDBC columns are 1-based, record fields are 0-based.
                                row.setObject(column, resultSet.getObject(column + 1));
                            }
                            rowsByKey.computeIfAbsent(new RecordKey(row), key -> new ArrayList<>()).add(row);
                        }
                    }
                }
                LOG.debug("query cost {}ms, batch {} records, sql is {}",
                        System.currentTimeMillis() - startMillis, list.size(), str);
                return rowsByKey;
            } catch (Exception e) {
                LOG.error(String.format("query doris error, retry times = %d", attempt), e);
                if (attempt >= this.maxRetryTimes) {
                    throw new RuntimeException(e);
                }
                try {
                    Thread.sleep(RETRY_BACKOFF_UNIT_MILLIS * attempt);
                } catch (InterruptedException interrupted) {
                    // Keep the interrupt visible to the caller instead of swallowing it.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(interrupted);
                }
            }
        }
        return rowsByKey;
    }

    /**
     * Wraps an identifier in backticks.
     *
     * @param str the identifier to quote; it is not escaped
     * @return the identifier surrounded by backticks
     */
    public String quoteIdentifier(String str) {
        return "`" + str + "`";
    }

    /** Returns the worker name, {@code "Worker-<index>"}. */
    @Override
    public String toString() {
        return this.name;
    }
}
