/*
 * Decompiled with CFR 0.152.
 */
package org.elasticsearch.transport;

import java.io.Closeable;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.common.AsyncBiFunction;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.metrics.CounterMetric;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.ConnectionProfile;
import org.elasticsearch.transport.TcpChannel;

/**
 * Sends keep-alive pings over registered transport channels.
 *
 * <p>Channels are grouped by the ping interval of their {@link ConnectionProfile}; each distinct
 * interval is served by one self-rescheduling {@link ScheduledPing} task. On every run the task
 * pings the channels whose last-accessed time is not newer than the previous run. The class also
 * answers incoming pings on server channels and counts successful and failed ping sends.
 *
 * <p>After {@link #close()} the scheduled tasks stop sending pings and stop rescheduling themselves.
 */
final class TransportKeepAlive implements Closeable {

    /** Integer written after the two marker bytes of a ping message. */
    static final int PING_DATA_SIZE = -1;

    /** Pre-serialized ping: the bytes {@code 'E'}, {@code 'S'} followed by {@link #PING_DATA_SIZE} as an int. */
    private static final BytesReference PING_MESSAGE;

    static {
        try (BytesStreamOutput out = new BytesStreamOutput()) {
            out.writeByte((byte) 'E');
            out.writeByte((byte) 'S');
            out.writeInt(PING_DATA_SIZE);
            PING_MESSAGE = out.copyBytes();
        } catch (IOException e) {
            // Treated as impossible; surfaced as an AssertionError with the cause attached.
            throw new AssertionError(e.getMessage(), e);
        }
    }

    private static final Logger logger = LogManager.getLogger(TransportKeepAlive.class);

    private final CounterMetric successfulPings = new CounterMetric();
    private final CounterMetric failedPings = new CounterMetric();

    /** One scheduled task per distinct ping interval. */
    private final ConcurrentMap<TimeValue, ScheduledPing> pingIntervals = ConcurrentCollections.newConcurrentMap();
    private final ThreadPool threadPool;
    private final AsyncBiFunction<TcpChannel, BytesReference, Void> pingSender;

    /** Set by {@link #close()}; read by the scheduled tasks, hence volatile. */
    private volatile boolean isClosed;

    /**
     * @param threadPool used to schedule the periodic ping tasks and to read the relative clock
     * @param pingSender invoked with a channel, the ping bytes and a completion listener for every ping
     */
    TransportKeepAlive(ThreadPool threadPool, AsyncBiFunction<TcpChannel, BytesReference, Void> pingSender) {
        this.threadPool = threadPool;
        this.pingSender = pingSender;
    }

    /**
     * Registers the channels of a node connection for periodic pings at the profile's ping interval.
     * A negative interval disables keep-alive for the connection: nothing is registered.
     *
     * <p>Each channel is removed from its ping task again when the channel's close listener runs.
     *
     * @param nodeChannels      channels belonging to the connection
     * @param connectionProfile supplies the ping interval
     */
    void registerNodeConnection(List<TcpChannel> nodeChannels, ConnectionProfile connectionProfile) {
        TimeValue pingInterval = connectionProfile.getPingInterval();
        // NOTE(review): only negative intervals are rejected; an interval of exactly zero is passed to the
        // scheduler as-is - confirm callers never configure it.
        if (pingInterval.millis() < 0L) {
            return;
        }

        ScheduledPing scheduledPing = pingIntervals.computeIfAbsent(pingInterval, ScheduledPing::new);
        scheduledPing.ensureStarted();

        for (TcpChannel channel : nodeChannels) {
            scheduledPing.addChannel(channel);
            channel.addCloseListener(ActionListener.running(() -> scheduledPing.removeChannel(channel)));
        }
    }

    /**
     * Handles a keep-alive ping received on {@code channel}. If the channel reports itself as a server
     * channel a ping is sent back on it; otherwise this method does nothing (presumably because the
     * client side initiated the exchange - verify against the caller).
     *
     * @param channel the channel on which the ping arrived
     */
    void receiveKeepAlive(TcpChannel channel) {
        if (channel.isServerChannel()) {
            sendPing(channel);
        }
    }

    /** @return the number of pings whose send completed successfully */
    long successfulPingCount() {
        return successfulPings.count();
    }

    /** @return the number of pings whose send failed while the channel was still open */
    long failedPingCount() {
        return failedPings.count();
    }

    /**
     * Hands the ping message to the ping sender for {@code channel} and records the outcome.
     * A failure on a channel that is no longer open is only trace-logged and not counted as a failed ping.
     */
    private void sendPing(TcpChannel channel) {
        pingSender.apply(channel, PING_MESSAGE, new ActionListener<Void>() {

            @Override
            public void onResponse(Void v) {
                successfulPings.inc();
            }

            @Override
            public void onFailure(Exception e) {
                if (channel.isOpen()) {
                    logger.debug(() -> "[" + channel + "] failed to send transport ping", e);
                    failedPings.inc();
                } else {
                    logger.trace(() -> "[" + channel + "] failed to send transport ping (channel closed)", e);
                }
            }
        });
    }

    /**
     * Marks this instance as closed. Already scheduled tasks are not cancelled here; they observe the
     * flag on their next run, skip pinging and do not reschedule themselves.
     */
    @Override
    public void close() {
        isClosed = true;
    }

    /**
     * Periodic task that pings every registered channel that has been idle since the previous run,
     * for one specific ping interval.
     */
    private class ScheduledPing extends AbstractRunnable {

        private final TimeValue pingInterval;
        private final Set<TcpChannel> channels = ConcurrentCollections.newConcurrentSet();

        /** Guards the initial scheduling so that it happens at most once. */
        private final AtomicBoolean isStarted = new AtomicBoolean(false);

        /** Thread pool relative time (millis) captured at construction and at the end of each ping round. */
        private volatile long lastPingRelativeMillis;

        private ScheduledPing(TimeValue pingInterval) {
            this.pingInterval = pingInterval;
            this.lastPingRelativeMillis = threadPool.relativeTimeInMillis();
        }

        /** Schedules the first run of this task; subsequent calls are no-ops. */
        void ensureStarted() {
            // Plain read first so the common already-started case skips the compare-and-set.
            if (!isStarted.get() && isStarted.compareAndSet(false, true)) {
                threadPool.schedule(this, pingInterval, "generic");
            }
        }

        void addChannel(TcpChannel channel) {
            channels.add(channel);
        }

        void removeChannel(TcpChannel channel) {
            channels.remove(channel);
        }

        /** Pings all channels that need it, then records the time of this round. No-op once closed. */
        @Override
        protected void doRun() throws Exception {
            if (isClosed) {
                return;
            }

            for (TcpChannel channel : channels) {
                if (needsKeepAlivePing(channel)) {
                    sendPing(channel);
                }
            }
            lastPingRelativeMillis = threadPool.relativeTimeInMillis();
        }

        /**
         * Reschedules this task for the next interval unless the keep-alive has been closed.
         * NOTE(review): relies on AbstractRunnable invoking this hook after every run - confirm.
         */
        @Override
        public void onAfter() {
            if (isClosed) {
                return;
            }

            threadPool.scheduleUnlessShuttingDown(pingInterval, "generic", this);
        }

        /** Logs a failure of the ping round at warn level. */
        @Override
        public void onFailure(Exception e) {
            logger.warn("failed to send ping transport message", e);
        }

        /**
         * @return {@code true} when the channel's last-accessed time is not later than the end of the
         *         previous ping round (assumes both values come from the same relative clock - TODO confirm)
         */
        private boolean needsKeepAlivePing(TcpChannel channel) {
            TcpChannel.ChannelStats stats = channel.getChannelStats();
            long accessedDelta = stats.lastAccessedTime() - lastPingRelativeMillis;
            return accessedDelta <= 0L;
        }
    }
}

