/*
 * Decompiled with CFR 0.152.
 */
package com.aliyun.datahub.client.example.examples;

import com.aliyun.datahub.client.DatahubClient;
import com.aliyun.datahub.client.DatahubClientBuilder;
import com.aliyun.datahub.client.auth.AliyunAccount;
import com.aliyun.datahub.client.common.DatahubConfig;
import com.aliyun.datahub.client.exception.AuthorizationFailureException;
import com.aliyun.datahub.client.exception.DatahubClientException;
import com.aliyun.datahub.client.exception.InvalidParameterException;
import com.aliyun.datahub.client.exception.LimitExceededException;
import com.aliyun.datahub.client.exception.ResourceNotFoundException;
import com.aliyun.datahub.client.exception.SeekOutOfRangeException;
import com.aliyun.datahub.client.exception.ShardSealedException;
import com.aliyun.datahub.client.model.CursorType;
import com.aliyun.datahub.client.model.Field;
import com.aliyun.datahub.client.model.GetRecordsResult;
import com.aliyun.datahub.client.model.ListShardResult;
import com.aliyun.datahub.client.model.RecordEntry;
import com.aliyun.datahub.client.model.RecordSchema;
import com.aliyun.datahub.client.model.ShardEntry;
import com.aliyun.datahub.client.model.TupleRecordData;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class ReaderTuple {
    private DatahubClient datahubClient = DatahubClientBuilder.newBuilder().setDatahubConfig(new DatahubConfig("", new AliyunAccount("", ""), true)).build();
    private RecordSchema schema;
    private Map<String, Thread> mThread = new HashMap<String, Thread>();

    public void freshThread() {
        try {
            ListShardResult listShardResult = this.datahubClient.listShard("", "");
            ArrayList<ReadThread> list = new ArrayList<ReadThread>();
            for (ShardEntry shardEntry : listShardResult.getShards()) {
                if (this.mThread.containsKey(shardEntry.getShardId())) continue;
                ReadThread thread2 = new ReadThread(3, shardEntry.getShardId(), this.schema);
                this.mThread.put(shardEntry.getShardId(), thread2);
                list.add(thread2);
                thread2.start();
            }
            for (Thread thread2 : list) {
                thread2.join();
            }
        }
        catch (DatahubClientException e) {
            e.printStackTrace();
            throw e;
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    public void getRecords() {
        try {
            this.schema = this.datahubClient.getTopic("", "").getRecordSchema();
            ListShardResult listShardResult = this.datahubClient.listShard("", "");
            for (ShardEntry entry : listShardResult.getShards()) {
                ReadThread thread2 = new ReadThread(3, entry.getShardId(), this.schema);
                this.mThread.put(entry.getShardId(), thread2);
                thread2.start();
            }
            for (Thread thread3 : this.mThread.values()) {
                thread3.join();
            }
        }
        catch (DatahubClientException e) {
            e.printStackTrace();
            throw e;
        }
        catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    class ReadThread
    extends Thread {
        private int maxRetry;
        private int recordLimit = 1000;
        private int noDataSleepTimeMs = 5000;
        private String shardId;
        private String cursor;
        private RecordSchema schema;

        private void init() {
            try {
                this.cursor = ReaderTuple.this.datahubClient.getCursor("", "", this.shardId, CursorType.OLDEST).getCursor();
            }
            catch (DatahubClientException e) {
                e.printStackTrace();
                throw e;
            }
        }

        private void handleRecords(List<RecordEntry> records) {
            for (RecordEntry re : records) {
                TupleRecordData data = (TupleRecordData)re.getRecordData();
                String res = this.shardId + "\t";
                for (Field field : this.schema.getFields()) {
                    res = res + field.getName() + ":" + data.getField(field.getName()) + "\t";
                }
                System.out.println(res);
            }
        }

        public ReadThread(int maxRetry, String shardId, RecordSchema schema) {
            this.shardId = shardId;
            this.maxRetry = maxRetry;
            this.schema = schema;
            this.init();
        }

        @Override
        public void run() {
            int retryNum = 0;
            while (retryNum < this.maxRetry) {
                try {
                    GetRecordsResult result = ReaderTuple.this.datahubClient.getRecords("", "", this.shardId, this.schema, this.cursor, this.recordLimit);
                    if (result.getRecordCount() <= 0) {
                        System.out.printf("%s no data, sleep %d seconds\n", this.shardId, this.noDataSleepTimeMs);
                        Thread.sleep(this.noDataSleepTimeMs);
                        continue;
                    }
                    this.handleRecords(result.getRecords());
                    this.cursor = result.getNextCursor();
                    retryNum = 0;
                }
                catch (InvalidParameterException e) {
                    e.printStackTrace();
                    throw e;
                }
                catch (AuthorizationFailureException e) {
                    e.printStackTrace();
                    throw e;
                }
                catch (ResourceNotFoundException e) {
                    e.printStackTrace();
                    throw e;
                }
                catch (SeekOutOfRangeException e) {
                    e.printStackTrace();
                    throw e;
                }
                catch (ShardSealedException e) {
                    System.out.printf("shard %s all data has been read\n", this.shardId);
                    ReaderTuple.this.freshThread();
                    break;
                }
                catch (LimitExceededException e) {
                    e.printStackTrace();
                    ++retryNum;
                }
                catch (DatahubClientException e) {
                    e.printStackTrace();
                    ++retryNum;
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                    System.exit(-1);
                }
            }
        }
    }
}

