package com.scene7.is.cache.clustering.impl;

import com.scene7.is.cache.clustering.parsers.PeerListParser;
import com.scene7.is.scalautil.javautil.PlatformUtil;
import com.scene7.is.util.IOUtil;
import com.scene7.is.util.PSDeploymentConfig;
import com.scene7.is.util.error.Scaffold;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.DatagramPacket;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.UnknownHostException;
import java.util.Iterator;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.jetbrains.annotations.NotNull;

/* loaded from: input_file:com/scene7/is/cache/clustering/impl/ClusterNetwork.class */
/**
 * Background thread that exchanges cluster packets with peer hosts over a
 * datagram socket bound to the deployment's port.
 *
 * <p>Outgoing packets are prefixed with the {@link #ID} marker followed by the
 * serialized packet type and payload. Incoming datagrams are decoded the same
 * way and handed to every registered {@link ClusterPacketListener}.
 *
 * <p>The peer list is read and replaced under the instance monitor; the other
 * mutable settings ({@code enable}, {@code hosts}, {@code tracePackets}) are
 * plain fields.
 */
public class ClusterNetwork extends Thread {
    private static final Logger LOGGER = Logger.getLogger(ClusterNetwork.class.getName());
    /** Thread name, also used in log messages. */
    private static final String NAME = "Cluster Network";
    /** Marker written at the start of every datagram; datagrams with another marker are ignored. */
    private static final String ID = "cacheCluster";
    /** Size in bytes of the receive buffer and the initial size of the marshalling buffer. */
    private static final int BUFFER_SIZE = 1024;
    /** Timeout passed to the reconnecting socket (presumably milliseconds -- confirm against ReconnectingDatagramSocketImpl). */
    private static final long RECONNECT_TIMEOUT = 60000;

    @NotNull
    private final List<ClusterPacketListener> listeners;
    private final int port;

    @NotNull
    private final ReconnectingDatagramSocket socket;
    private boolean enable;

    @NotNull
    private String hosts;

    @NotNull
    private List<SocketAddress> peers;
    private boolean tracePackets;

    /**
     * Creates the network and, when requested, starts the receiver thread.
     *
     * @param list               listeners notified for every successfully decoded packet
     * @param pSDeploymentConfig deployment configuration supplying the port
     * @param str                peer host list, parsed by {@link PeerListParser}
     * @param z                  initial value of the packet-tracing flag
     * @param z2                 whether clustering is requested; when {@code false} a do-nothing
     *                           socket is used and the thread is not started
     */
    public ClusterNetwork(@NotNull List<ClusterPacketListener> list, @NotNull PSDeploymentConfig pSDeploymentConfig, @NotNull String str, boolean z, boolean z2) {
        super(NAME);
        // The reported "enable" state additionally requires a recognised OS family.
        // NOTE(review): socket creation below is gated on the raw flag, not on this.enable -- confirm intent.
        this.enable = z2 && PlatformUtil.osFamily() != PlatformUtil.UnknownOs();
        this.listeners = list;
        this.port = pSDeploymentConfig.getPort();
        this.hosts = str;
        this.peers = PeerListParser.parse(str, this.port);
        this.tracePackets = z;
        if (!z2) {
            this.socket = new ReconnectingDatagramSocketDoNothing();
        } else {
            this.socket = new ReconnectingDatagramSocketImpl(new InetSocketAddress(this.port), RECONNECT_TIMEOUT);
            // All fields are assigned before the thread starts reading them.
            start();
        }
    }

    /**
     * Stops the receiver thread, if it is running, and waits for it to finish.
     *
     * <p>If the calling thread is interrupted while waiting, the failure is
     * logged and the caller's interrupt status is restored.
     */
    public void dispose() {
        if (!isAlive()) {
            return;
        }
        interrupt();
        IOUtil.closeQuietly(this.socket);
        // Send a packet to our own port after interrupting; presumably this wakes a blocked receive -- confirm.
        sendNopToSelf();
        try {
            join();
        } catch (InterruptedException e) {
            LOGGER.log(Level.WARNING, "Unable to succesfully join {0} thread", NAME);
            LOGGER.log(Level.FINER, "Reason: ", (Throwable) e);
            // Preserve the interrupt for the caller instead of silently consuming it.
            Thread.currentThread().interrupt();
        }
    }

    /** @return the current value of the enable flag */
    public boolean isEnable() {
        return this.enable;
    }

    /** @param z new value of the enable flag */
    public void setEnable(boolean z) {
        this.enable = z;
    }

    /** @return the peer host list string as last configured */
    @NotNull
    public String getHosts() {
        return this.hosts;
    }

    /**
     * Replaces the peer host list and re-parses it into socket addresses using this network's port.
     *
     * @param str new peer host list
     */
    public void setHosts(@NotNull String str) {
        this.hosts = str;
        setPeers(PeerListParser.parse(str, this.port));
    }

    /** @return the port taken from the deployment configuration */
    public int getPort() {
        return this.port;
    }

    /** @return the number of peers in the current peer list */
    public int getNumPeers() {
        return getPeers().size();
    }

    /** @return whether sent and received packets are logged at {@code FINEST} */
    public boolean isTracePackets() {
        return this.tracePackets;
    }

    /** @param z whether sent and received packets should be logged at {@code FINEST} */
    public void setTracePackets(boolean z) {
        this.tracePackets = z;
    }

    /**
     * Sends one packet to every peer in the current peer list.
     *
     * @param clusterPacketType type descriptor providing the payload serializer
     * @param t                 packet to send
     * @return {@code true} if the socket reported success for at least one peer
     */
    public <T extends ClusterPacket> boolean broadcastPacket(@NotNull ClusterPacketType<T> clusterPacketType, @NotNull T t) {
        if (isTracePackets()) {
            LOGGER.log(Level.FINEST, "Sending cluster packet to all peers: {0}", new Object[]{t});
        }
        byte[] payload = marshalPacket(clusterPacketType, t);
        boolean sentToAny = false;
        for (SocketAddress peer : getPeers()) {
            // Non-short-circuit OR: every peer is attempted even after a success.
            sentToAny |= this.socket.send(payload, peer);
        }
        return sentToAny;
    }

    /**
     * Sends one packet to a single address.
     *
     * @param clusterPacketType type descriptor providing the payload serializer
     * @param t                 packet to send
     * @param socketAddress     destination address
     */
    public <T extends ClusterPacket> void sendPacket(@NotNull ClusterPacketType<T> clusterPacketType, @NotNull T t, @NotNull SocketAddress socketAddress) {
        if (isTracePackets()) {
            LOGGER.log(Level.FINEST, "Sending cluster packet to peer {0}: {1}", new Object[]{socketAddress, t});
        }
        this.socket.send(marshalPacket(clusterPacketType, t), socketAddress);
    }

    /**
     * Receive loop: handles datagrams until the thread is interrupted, then closes the socket.
     *
     * <p>Unexpected failures while handling a datagram are logged and the loop continues.
     */
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        LOGGER.log(Level.INFO, "{0} started on UDP port {1}", new Object[]{NAME, Integer.valueOf(this.port)});
        byte[] buffer = new byte[BUFFER_SIZE];
        try {
            while (!isInterrupted()) {
                try {
                    receiveDatagram(buffer);
                } catch (InterruptedException e) {
                    // A thrown InterruptedException normally clears the interrupt flag; without
                    // restoring it the loop condition would never see the shutdown request.
                    Thread.currentThread().interrupt();
                    break;
                } catch (ThreadDeath death) {
                    throw death;
                } catch (Throwable th) {
                    LOGGER.log(Level.SEVERE, "Uncaught exception while handling cluster packets", th);
                }
            }
        } finally {
            // Release the socket on every exit path, including a rethrown ThreadDeath.
            this.socket.close();
        }
        LOGGER.log(Level.INFO, "{0} stopped on UDP port {1}", new Object[]{NAME, Integer.valueOf(this.port)});
    }

    /**
     * Serializes a packet into the wire format: {@link #ID} marker, packet type, packet payload.
     *
     * @param clusterPacketType type descriptor providing the payload serializer
     * @param t                 packet to serialize
     * @return the encoded bytes
     */
    @NotNull
    private static <T extends ClusterPacket> byte[] marshalPacket(@NotNull ClusterPacketType<T> clusterPacketType, @NotNull T t) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream(BUFFER_SIZE);
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeUTF(ID);
            ClusterPacketTypeSerializer.PACKETTYPE_SERIALIZER.store((ClusterPacketType) clusterPacketType, (DataOutput) out);
            clusterPacketType.packetSerializer().store(t, out);
            out.close();
            return bytes.toByteArray();
        } catch (IOException e) {
            // Writing to an in-memory buffer is not expected to fail.
            throw Scaffold.error(e, "Did not expect an error while writing to byte array");
        }
    }

    /**
     * Receives one datagram, decodes it and dispatches the packet to all listeners.
     *
     * <p>Datagrams that are empty, carry a different marker than {@link #ID}, or have an
     * unknown packet type are dropped. An {@link IOException} while decoding or dispatching
     * is logged and the datagram is discarded.
     *
     * @param bArr receive buffer handed to the socket
     * @throws InterruptedException propagated from the socket's receive call
     */
    private void receiveDatagram(@NotNull byte[] bArr) throws InterruptedException {
        DatagramPacket datagram = this.socket.receive(bArr);
        if (isDatagramEmpty(datagram)) {
            return;
        }
        SocketAddress sender = datagram.getSocketAddress();
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(datagram.getData(), datagram.getOffset(), datagram.getLength()));
        try {
            String marker = in.readUTF();
            if (!ID.equals(marker)) {
                if (isTracePackets()) {
                    LOGGER.log(Level.FINEST, "Cluster packet received from peer {0} with unknown ID: {1}", new Object[]{sender, marker});
                }
                return;
            }
            ClusterPacketType<UnknownPacket> packetType = ClusterPacketTypeSerializer.PACKETTYPE_SERIALIZER.m17load((DataInput) in);
            if (packetType == ClusterPacketType.UNKNOWN) {
                if (isTracePackets()) {
                    LOGGER.log(Level.FINEST, "Unknown cluster packet received from peer {0}", sender);
                }
                return;
            }
            ClusterPacket packet = (ClusterPacket) packetType.packetSerializer().load(in);
            if (isTracePackets()) {
                LOGGER.log(Level.FINEST, "Cluster packet received from peer {0}: {1}", new Object[]{sender, packet});
            }
            for (ClusterPacketListener listener : this.listeners) {
                listener.doClusterPacket(this, sender, packetType, packet);
            }
        } catch (IOException e) {
            LOGGER.log(Level.WARNING, "Failed to handle packet received from {0}", sender);
            LOGGER.log(Level.FINER, "Reason: ", (Throwable) e);
        } finally {
            // Single cleanup point for every exit path above.
            IOUtil.closeQuietly(in);
        }
    }

    /** @return the current peer list reference, read under the instance monitor */
    @NotNull
    private synchronized List<SocketAddress> getPeers() {
        return this.peers;
    }

    /** @param list replacement peer list, stored under the instance monitor */
    private synchronized void setPeers(@NotNull List<SocketAddress> list) {
        this.peers = list;
    }

    /** Sends a NOP packet to 127.0.0.1 on this network's own port. */
    private void sendNopToSelf() {
        try {
            InetAddress loopback = InetAddress.getByAddress(new byte[]{127, 0, 0, 1});
            sendPacket(ClusterPacketType.NOP, new NopPacket(), new InetSocketAddress(loopback, this.port));
        } catch (UnknownHostException e) {
            LOGGER.log(Level.SEVERE, "Unable to resolve 127.0.0.1 to host instance", (Throwable) e);
        }
    }

    /**
     * @param datagramPacket datagram returned by the socket
     * @return {@code true} when the datagram has no sender address or a negative port
     */
    private static boolean isDatagramEmpty(@NotNull DatagramPacket datagramPacket) {
        return datagramPacket.getAddress() == null || datagramPacket.getPort() < 0;
    }
}
