/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.operators;

import java.io.Closeable;
import java.io.OutputStream;
import java.io.Serializable;
import java.util.Collection;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.commons.math3.stat.descriptive.DescriptiveStatistics;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.KeyedStateStore;
import org.apache.flink.api.common.state.OperatorStateStore;
import org.apache.flink.api.common.state.State;
import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DefaultKeyedStateStore;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.KeyGroupsList;
import org.apache.flink.runtime.state.KeyGroupsStateHandle;
import org.apache.flink.runtime.state.KeyedStateBackend;
import org.apache.flink.runtime.state.KeyedStateCheckpointOutputStream;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateHandle;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateInitializationContextImpl;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator;
import org.apache.flink.streaming.api.operators.HeapInternalTimerService;
import org.apache.flink.streaming.api.operators.InternalTimerService;
import org.apache.flink.streaming.api.operators.KeyContext;
import org.apache.flink.streaming.api.operators.OperatorSnapshotResult;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
import org.apache.flink.streaming.api.operators.Triggerable;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@PublicEvolving
public abstract class AbstractStreamOperator<OUT>
implements StreamOperator<OUT>,
Serializable,
KeyContext {
    private static final long serialVersionUID = 1L;
    protected static final Logger LOG = LoggerFactory.getLogger(AbstractStreamOperator.class);
    protected ChainingStrategy chainingStrategy = ChainingStrategy.HEAD;
    private transient StreamTask<?, ?> container;
    protected transient StreamConfig config;
    protected transient Output<StreamRecord<OUT>> output;
    private transient StreamingRuntimeContext runtimeContext;
    private transient KeySelector<?, ?> stateKeySelector1;
    private transient KeySelector<?, ?> stateKeySelector2;
    private transient AbstractKeyedStateBackend<?> keyedStateBackend;
    private transient DefaultKeyedStateStore keyedStateStore;
    private transient OperatorStateBackend operatorStateBackend;
    protected MetricGroup metrics;
    protected LatencyGauge latencyGauge;
    private transient Map<String, HeapInternalTimerService<?, ?>> timerServices;
    private long combinedWatermark = Long.MIN_VALUE;
    private long input1Watermark = Long.MIN_VALUE;
    private long input2Watermark = Long.MIN_VALUE;

    @Override
    public void setup(StreamTask<?, ?> containingTask, StreamConfig config, Output<StreamRecord<OUT>> output) {
        Configuration taskManagerConfig;
        int historySize;
        this.container = containingTask;
        this.config = config;
        this.metrics = this.container.getEnvironment().getMetricGroup().addOperator(config.getOperatorName());
        this.output = new CountingOutput(output, ((OperatorMetricGroup)this.metrics).getIOMetricGroup().getNumRecordsOutCounter());
        if (config.isChainStart()) {
            ((OperatorMetricGroup)this.metrics).getIOMetricGroup().reuseInputMetricsForTask();
        }
        if (config.isChainEnd()) {
            ((OperatorMetricGroup)this.metrics).getIOMetricGroup().reuseOutputMetricsForTask();
        }
        if ((historySize = (taskManagerConfig = this.container.getEnvironment().getTaskManagerInfo().getConfiguration()).getInteger("metrics.latency.history-size", 128)) <= 0) {
            LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", (Object)"metrics.latency.history-size", (Object)historySize);
            historySize = 128;
        }
        this.latencyGauge = (LatencyGauge)this.metrics.gauge("latency", (Gauge)new LatencyGauge(historySize));
        this.runtimeContext = new StreamingRuntimeContext(this, this.container.getEnvironment(), this.container.getAccumulatorMap());
        this.stateKeySelector1 = config.getStatePartitioner(0, this.getUserCodeClassloader());
        this.stateKeySelector2 = config.getStatePartitioner(1, this.getUserCodeClassloader());
    }

    @Override
    public MetricGroup getMetricGroup() {
        return this.metrics;
    }

    @Override
    public final void initializeState(OperatorStateHandles stateHandles) throws Exception {
        Collection<KeyGroupsStateHandle> keyedStateHandlesRaw = null;
        Collection<OperatorStateHandle> operatorStateHandlesRaw = null;
        Collection<OperatorStateHandle> operatorStateHandlesBackend = null;
        boolean restoring = null != stateHandles;
        this.initKeyedState();
        if (restoring) {
            this.restoreStreamCheckpointed(stateHandles);
            operatorStateHandlesBackend = stateHandles.getManagedOperatorState();
            operatorStateHandlesRaw = stateHandles.getRawOperatorState();
            if (null != this.getKeyedStateBackend()) {
                keyedStateHandlesRaw = stateHandles.getRawKeyedState();
            }
        }
        this.initOperatorState(operatorStateHandlesBackend);
        StateInitializationContextImpl initializationContext = new StateInitializationContextImpl(restoring, (OperatorStateStore)this.operatorStateBackend, (KeyedStateStore)this.keyedStateStore, keyedStateHandlesRaw, operatorStateHandlesRaw, this.getContainingTask().getCancelables());
        this.initializeState((StateInitializationContext)initializationContext);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Deprecated
    private void restoreStreamCheckpointed(OperatorStateHandles stateHandles) throws Exception {
        StreamStateHandle state = stateHandles.getLegacyOperatorState();
        if (null != state) {
            if (this instanceof CheckpointedRestoringOperator) {
                LOG.debug("Restore state of task {} in chain ({}).", (Object)stateHandles.getOperatorChainIndex(), (Object)this.getContainingTask().getName());
                FSDataInputStream is = state.openInputStream();
                try {
                    this.getContainingTask().getCancelables().registerClosable((Closeable)is);
                    ((CheckpointedRestoringOperator)((Object)this)).restoreState(is);
                }
                finally {
                    this.getContainingTask().getCancelables().unregisterClosable((Closeable)is);
                    is.close();
                }
            } else {
                throw new Exception("Found legacy operator state for operator that does not implement StreamCheckpointedOperator.");
            }
        }
    }

    @Override
    public void open() throws Exception {
        if (this.timerServices == null) {
            this.timerServices = new HashMap();
        }
    }

    private void initKeyedState() {
        try {
            TypeSerializer keySerializer = this.config.getStateKeySerializer(this.getUserCodeClassloader());
            if (null != keySerializer) {
                KeyGroupRange subTaskKeyGroupRange = KeyGroupRangeAssignment.computeKeyGroupRangeForOperatorIndex((int)this.container.getEnvironment().getTaskInfo().getNumberOfKeyGroups(), (int)this.container.getEnvironment().getTaskInfo().getNumberOfParallelSubtasks(), (int)this.container.getEnvironment().getTaskInfo().getIndexOfThisSubtask());
                this.keyedStateBackend = this.container.createKeyedStateBackend(keySerializer, this.container.getEnvironment().getTaskInfo().getNumberOfKeyGroups(), subTaskKeyGroupRange);
                this.keyedStateStore = new DefaultKeyedStateStore(this.keyedStateBackend, this.getExecutionConfig());
            }
        }
        catch (Exception e) {
            throw new IllegalStateException("Could not initialize keyed state backend.", e);
        }
    }

    private void initOperatorState(Collection<OperatorStateHandle> operatorStateHandles) {
        try {
            this.operatorStateBackend = this.container.createOperatorStateBackend(this, operatorStateHandles);
        }
        catch (Exception e) {
            throw new IllegalStateException("Could not initialize operator state backend.", e);
        }
    }

    @Override
    public void close() throws Exception {
    }

    @Override
    public void dispose() throws Exception {
        if (this.operatorStateBackend != null) {
            IOUtils.closeQuietly((Closeable)this.operatorStateBackend);
            this.operatorStateBackend.dispose();
        }
        if (this.keyedStateBackend != null) {
            IOUtils.closeQuietly(this.keyedStateBackend);
            this.keyedStateBackend.dispose();
        }
    }

    @Override
    public final OperatorSnapshotResult snapshotState(long checkpointId, long timestamp, CheckpointStreamFactory streamFactory) throws Exception {
        KeyGroupRange keyGroupRange = null != this.keyedStateBackend ? this.keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
        OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult();
        try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(checkpointId, timestamp, streamFactory, keyGroupRange, this.getContainingTask().getCancelables());){
            this.snapshotState((StateSnapshotContext)snapshotContext);
            snapshotInProgress.setKeyedStateRawFuture(snapshotContext.getKeyedStateStreamFuture());
            snapshotInProgress.setOperatorStateRawFuture(snapshotContext.getOperatorStateStreamFuture());
            if (null != this.operatorStateBackend) {
                snapshotInProgress.setOperatorStateManagedFuture(this.operatorStateBackend.snapshot(checkpointId, timestamp, streamFactory));
            }
            if (null != this.keyedStateBackend) {
                snapshotInProgress.setKeyedStateManagedFuture(this.keyedStateBackend.snapshot(checkpointId, timestamp, streamFactory));
            }
        }
        catch (Exception snapshotException) {
            try {
                snapshotInProgress.cancel();
            }
            catch (Exception e) {
                snapshotException.addSuppressed(e);
            }
            throw new Exception("Could not complete snapshot " + checkpointId + " for operator " + this.getOperatorName() + '.', snapshotException);
        }
        return snapshotInProgress;
    }

    public void snapshotState(StateSnapshotContext context) throws Exception {
        if (this.getKeyedStateBackend() != null) {
            KeyedStateCheckpointOutputStream out;
            try {
                out = context.getRawKeyedOperatorStateOutput();
            }
            catch (Exception exception) {
                throw new Exception("Could not open raw keyed operator state stream for " + this.getOperatorName() + '.', exception);
            }
            try {
                KeyGroupsList allKeyGroups = out.getKeyGroupList();
                Iterator i$ = allKeyGroups.iterator();
                while (i$.hasNext()) {
                    int keyGroupIdx = (Integer)i$.next();
                    out.startNewKeyGroup(keyGroupIdx);
                    DataOutputViewStreamWrapper dov = new DataOutputViewStreamWrapper((OutputStream)out);
                    dov.writeInt(this.timerServices.size());
                    for (Map.Entry<String, HeapInternalTimerService<?, ?>> entry : this.timerServices.entrySet()) {
                        String serviceName = entry.getKey();
                        HeapInternalTimerService<?, ?> timerService = entry.getValue();
                        dov.writeUTF(serviceName);
                        timerService.snapshotTimersForKeyGroup(dov, keyGroupIdx);
                    }
                }
            }
            catch (Exception exception) {
                throw new Exception("Could not write timer service of " + this.getOperatorName() + " to checkpoint state stream.", exception);
            }
            finally {
                try {
                    out.close();
                }
                catch (Exception closeException) {
                    LOG.warn("Could not close raw keyed operator state stream for {}. This might have prevented deleting some state data.", (Object)this.getOperatorName(), (Object)closeException);
                }
            }
        }
    }

    public void initializeState(StateInitializationContext context) throws Exception {
        if (this.getKeyedStateBackend() != null) {
            int totalKeyGroups = this.getKeyedStateBackend().getNumberOfKeyGroups();
            KeyGroupsList localKeyGroupRange = this.getKeyedStateBackend().getKeyGroupRange();
            this.timerServices = new HashMap();
            for (KeyGroupStatePartitionStreamProvider streamProvider : context.getRawKeyedStateInputs()) {
                DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(streamProvider.getStream());
                int keyGroupIdx = streamProvider.getKeyGroupId();
                Preconditions.checkArgument((boolean)localKeyGroupRange.contains(keyGroupIdx), (Object)("Key Group " + keyGroupIdx + " does not belong to the local range."));
                int noOfTimerServices = div.readInt();
                for (int i = 0; i < noOfTimerServices; ++i) {
                    String serviceName = div.readUTF();
                    HeapInternalTimerService<Object, Object> timerService = this.timerServices.get(serviceName);
                    if (timerService == null) {
                        timerService = new HeapInternalTimerService(totalKeyGroups, localKeyGroupRange, this, this.getRuntimeContext().getProcessingTimeService());
                        this.timerServices.put(serviceName, timerService);
                    }
                    timerService.restoreTimersForKeyGroup(div, keyGroupIdx, this.getUserCodeClassloader());
                }
            }
        }
    }

    @Override
    public void notifyOfCompletedCheckpoint(long checkpointId) throws Exception {
    }

    public ExecutionConfig getExecutionConfig() {
        return this.container.getExecutionConfig();
    }

    public StreamConfig getOperatorConfig() {
        return this.config;
    }

    public StreamTask<?, ?> getContainingTask() {
        return this.container;
    }

    public ClassLoader getUserCodeClassloader() {
        return this.container.getUserCodeClassLoader();
    }

    protected String getOperatorName() {
        if (this.runtimeContext != null) {
            return this.runtimeContext.getTaskNameWithSubtasks();
        }
        return this.getClass().getSimpleName();
    }

    public StreamingRuntimeContext getRuntimeContext() {
        return this.runtimeContext;
    }

    public <K> KeyedStateBackend<K> getKeyedStateBackend() {
        return this.keyedStateBackend;
    }

    public OperatorStateBackend getOperatorStateBackend() {
        return this.operatorStateBackend;
    }

    protected ProcessingTimeService getProcessingTimeService() {
        return this.container.getProcessingTimeService();
    }

    protected <S extends State> S getPartitionedState(StateDescriptor<S, ?> stateDescriptor) throws Exception {
        return this.getPartitionedState((Object)VoidNamespace.INSTANCE, (TypeSerializer)VoidNamespaceSerializer.INSTANCE, stateDescriptor);
    }

    protected <S extends State, N> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception {
        if (this.keyedStateStore != null) {
            return (S)this.keyedStateBackend.getPartitionedState(namespace, namespaceSerializer, stateDescriptor);
        }
        throw new RuntimeException("Cannot create partitioned state. The keyed state backend has not been set. This indicates that the operator is not partitioned/keyed.");
    }

    @Override
    public void setKeyContextElement1(StreamRecord record) throws Exception {
        this.setKeyContextElement(record, this.stateKeySelector1);
    }

    @Override
    public void setKeyContextElement2(StreamRecord record) throws Exception {
        this.setKeyContextElement(record, this.stateKeySelector2);
    }

    private <T> void setKeyContextElement(StreamRecord<T> record, KeySelector<T, ?> selector) throws Exception {
        if (selector != null) {
            Object key = selector.getKey(record.getValue());
            this.setCurrentKey(key);
        }
    }

    @Override
    public void setCurrentKey(Object key) {
        if (this.keyedStateBackend != null) {
            try {
                AbstractKeyedStateBackend<?> rawBackend = this.keyedStateBackend;
                rawBackend.setCurrentKey(key);
            }
            catch (Exception e) {
                throw new RuntimeException("Exception occurred while setting the current key context.", e);
            }
        }
    }

    @Override
    public Object getCurrentKey() {
        if (this.keyedStateBackend != null) {
            return this.keyedStateBackend.getCurrentKey();
        }
        throw new UnsupportedOperationException("Key can only be retrieven on KeyedStream.");
    }

    public KeyedStateStore getKeyedStateStore() {
        return this.keyedStateStore;
    }

    @Override
    public final void setChainingStrategy(ChainingStrategy strategy) {
        this.chainingStrategy = strategy;
    }

    @Override
    public final ChainingStrategy getChainingStrategy() {
        return this.chainingStrategy;
    }

    public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
        this.reportOrForwardLatencyMarker(latencyMarker);
    }

    public void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception {
        this.reportOrForwardLatencyMarker(latencyMarker);
    }

    public void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception {
        this.reportOrForwardLatencyMarker(latencyMarker);
    }

    protected void reportOrForwardLatencyMarker(LatencyMarker marker) {
        this.latencyGauge.reportLatency(marker, false);
        this.output.emitLatencyMarker(marker);
    }

    public <N> InternalTimerService<N> getInternalTimerService(String name, TypeSerializer<N> namespaceSerializer, Triggerable<?, N> triggerable) {
        if (this.getKeyedStateBackend() == null) {
            throw new UnsupportedOperationException("Timers can only be used on keyed operators.");
        }
        HeapInternalTimerService<Object, Object> timerService = this.timerServices.get(name);
        if (timerService == null) {
            timerService = new HeapInternalTimerService(this.getKeyedStateBackend().getNumberOfKeyGroups(), this.getKeyedStateBackend().getKeyGroupRange(), this, this.getRuntimeContext().getProcessingTimeService());
            this.timerServices.put(name, timerService);
        }
        Triggerable<?, N> rawTriggerable = triggerable;
        timerService.startTimerService(this.getKeyedStateBackend().getKeySerializer(), namespaceSerializer, rawTriggerable);
        return timerService;
    }

    public void processWatermark(Watermark mark) throws Exception {
        for (HeapInternalTimerService<?, ?> service : this.timerServices.values()) {
            service.advanceWatermark(mark.getTimestamp());
        }
        this.output.emitWatermark(mark);
    }

    public void processWatermark1(Watermark mark) throws Exception {
        this.input1Watermark = mark.getTimestamp();
        long newMin = Math.min(this.input1Watermark, this.input2Watermark);
        if (newMin > this.combinedWatermark) {
            this.combinedWatermark = newMin;
            this.processWatermark(new Watermark(this.combinedWatermark));
        }
    }

    public void processWatermark2(Watermark mark) throws Exception {
        this.input2Watermark = mark.getTimestamp();
        long newMin = Math.min(this.input1Watermark, this.input2Watermark);
        if (newMin > this.combinedWatermark) {
            this.combinedWatermark = newMin;
            this.processWatermark(new Watermark(this.combinedWatermark));
        }
    }

    @VisibleForTesting
    public int numProcessingTimeTimers() {
        int count = 0;
        for (HeapInternalTimerService<?, ?> timerService : this.timerServices.values()) {
            count += timerService.numProcessingTimeTimers();
        }
        return count;
    }

    @VisibleForTesting
    public int numEventTimeTimers() {
        int count = 0;
        for (HeapInternalTimerService<?, ?> timerService : this.timerServices.values()) {
            count += timerService.numEventTimeTimers();
        }
        return count;
    }

    public class CountingOutput
    implements Output<StreamRecord<OUT>> {
        private final Output<StreamRecord<OUT>> output;
        private final Counter numRecordsOut;

        public CountingOutput(Output<StreamRecord<OUT>> output, Counter counter) {
            this.output = output;
            this.numRecordsOut = counter;
        }

        @Override
        public void emitWatermark(Watermark mark) {
            this.output.emitWatermark(mark);
        }

        @Override
        public void emitLatencyMarker(LatencyMarker latencyMarker) {
            this.output.emitLatencyMarker(latencyMarker);
        }

        public void collect(StreamRecord<OUT> record) {
            this.numRecordsOut.inc();
            this.output.collect(record);
        }

        public void close() {
            this.output.close();
        }
    }

    private static class LatencySourceDescriptor {
        private final int vertexID;
        private final int subtaskIndex;

        public static LatencySourceDescriptor of(LatencyMarker marker, boolean ignoreSubtaskIndex) {
            if (ignoreSubtaskIndex) {
                return new LatencySourceDescriptor(marker.getVertexID(), -1);
            }
            return new LatencySourceDescriptor(marker.getVertexID(), marker.getSubtaskIndex());
        }

        private LatencySourceDescriptor(int vertexID, int subtaskIndex) {
            this.vertexID = vertexID;
            this.subtaskIndex = subtaskIndex;
        }

        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || this.getClass() != o.getClass()) {
                return false;
            }
            LatencySourceDescriptor that = (LatencySourceDescriptor)o;
            if (this.vertexID != that.vertexID) {
                return false;
            }
            return this.subtaskIndex == that.subtaskIndex;
        }

        public int hashCode() {
            int result = this.vertexID;
            result = 31 * result + this.subtaskIndex;
            return result;
        }

        public String toString() {
            return "LatencySourceDescriptor{vertexID=" + this.vertexID + ", subtaskIndex=" + this.subtaskIndex + '}';
        }
    }

    protected static class LatencyGauge
    implements Gauge<Map<String, HashMap<String, Double>>> {
        private final Map<LatencySourceDescriptor, DescriptiveStatistics> latencyStats = new HashMap<LatencySourceDescriptor, DescriptiveStatistics>();
        private final int historySize;

        LatencyGauge(int historySize) {
            this.historySize = historySize;
        }

        public void reportLatency(LatencyMarker marker, boolean isSink) {
            LatencySourceDescriptor sourceDescriptor = LatencySourceDescriptor.of(marker, !isSink);
            DescriptiveStatistics sourceStats = this.latencyStats.get(sourceDescriptor);
            if (sourceStats == null) {
                sourceStats = new DescriptiveStatistics(this.historySize);
                this.latencyStats.put(sourceDescriptor, sourceStats);
            }
            long now = System.currentTimeMillis();
            sourceStats.addValue((double)(now - marker.getMarkedTime()));
        }

        public Map<String, HashMap<String, Double>> getValue() {
            while (true) {
                try {
                    HashMap<String, HashMap<String, Double>> ret = new HashMap<String, HashMap<String, Double>>();
                    for (Map.Entry<LatencySourceDescriptor, DescriptiveStatistics> source : this.latencyStats.entrySet()) {
                        HashMap<String, Double> sourceStatistics = new HashMap<String, Double>(6);
                        sourceStatistics.put("max", source.getValue().getMax());
                        sourceStatistics.put("mean", source.getValue().getMean());
                        sourceStatistics.put("min", source.getValue().getMin());
                        sourceStatistics.put("p50", source.getValue().getPercentile(50.0));
                        sourceStatistics.put("p95", source.getValue().getPercentile(95.0));
                        sourceStatistics.put("p99", source.getValue().getPercentile(99.0));
                        ret.put(source.getKey().toString(), sourceStatistics);
                    }
                    return ret;
                }
                catch (ConcurrentModificationException ignore) {
                    LOG.debug("Unable to report latency statistics", (Throwable)ignore);
                    continue;
                }
                break;
            }
        }
    }
}

