package com.adobe.granite.eventing.provider.client.impl;

import com.adobe.granite.eventing.provider.client.ClientConfig;
import com.adobe.granite.eventing.provider.client.MetricSupport;
import java.io.Closeable;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/adobe/granite/eventing/provider/client/impl/QueuedConsumer.class */
/**
 * A {@link Consumer} that buffers events in a bounded queue and hands them to a
 * delegate consumer from a single background worker thread.
 *
 * <p>{@link #accept(TimedCloudEvent)} never blocks on the delegate: it only enqueues
 * the event (optionally after a short throttling delay when the queue is filling up)
 * and fails with {@link IllegalStateException} when the queue is full or the
 * consumer has been shut down. {@link #close()} gives the worker a bounded amount of
 * time to drain the queue and then stops it, reporting every event that is still
 * queued as discarded.
 */
public class QueuedConsumer implements Consumer<TimedCloudEvent>, Closeable {
    private final Logger log = LoggerFactory.getLogger(getClass());
    final ExecutorService executor;
    private final BlockingQueue<TimedCloudEvent> queue;
    private final Consumer<TimedCloudEvent> consumer;
    private final MetricSupport metricSupport;
    private final int shutdownWaitSeconds;
    private final int almostFullWaitMS;
    private final int almostFullLimit;
    private final int bufferSize;

    /**
     * Creates the queue, registers its size as a gauge and starts the worker thread.
     *
     * @param consumer            delegate that receives each queued event on the worker thread
     * @param clientConfig        supplies the queue capacity via {@code getSendBufferSize()}
     * @param shutdownWaitSeconds how long {@link #close()} waits for the queue to drain, and
     *                            again for the worker thread to terminate
     * @param almostFullWaitMS    maximum throttling delay applied in {@link #accept}; a value
     *                            {@code <= 0} disables throttling
     * @param metricSupport       sink for the queued/discarded metrics and the queue size gauge
     * @throws NullPointerException if {@code consumer}, {@code clientConfig} or
     *                              {@code metricSupport} is {@code null}
     */
    public QueuedConsumer(Consumer<TimedCloudEvent> consumer, ClientConfig clientConfig, int shutdownWaitSeconds,
            int almostFullWaitMS, MetricSupport metricSupport) {
        // Fail fast: a null delegate would otherwise only surface as a logged error per event.
        this.consumer = Objects.requireNonNull(consumer, "consumer");
        this.metricSupport = Objects.requireNonNull(metricSupport, "metricSupport");
        this.bufferSize = clientConfig.getSendBufferSize();
        this.shutdownWaitSeconds = shutdownWaitSeconds;
        this.almostFullWaitMS = almostFullWaitMS;
        // Throttling starts once the queue holds more than 10% of its capacity.
        // bufferSize / 10 equals (bufferSize * 10) / 100 without the intermediate overflow.
        this.almostFullLimit = this.bufferSize / 10;
        this.queue = new ArrayBlockingQueue<>(this.bufferSize);
        this.metricSupport.queueSizeGauge(this.queue::size);
        this.executor = Executors.newSingleThreadExecutor();
        // Started last so that the worker sees every field assigned above.
        this.executor.execute(this::run);
    }

    /**
     * Enqueues an event for asynchronous delivery.
     *
     * <p>When throttling is enabled and the queue is above the almost-full limit, the calling
     * thread is delayed first. If that delay is interrupted, the interrupt flag is restored and
     * the event is still enqueued rather than being dropped silently.
     *
     * @param timedCloudEvent the event to queue
     * @throws IllegalStateException if the consumer is shutting down or the queue is full; in
     *                               the latter case the event is reported as discarded
     */
    @Override // java.util.function.Consumer
    public void accept(TimedCloudEvent timedCloudEvent) throws IllegalStateException {
        if (this.executor.isShutdown()) {
            throw new IllegalStateException("Queued sender shutting down. Not accepting new events!");
        }
        throttleIfAlmostFull();
        try {
            this.queue.add(timedCloudEvent);
        } catch (IllegalStateException e) {
            // Only a rejected add() counts as "queue full"; metric/log failures must not.
            this.metricSupport.discarded(MetricSupport.Reason.queueFull, timedCloudEvent.event.getType());
            throw e;
        }
        this.metricSupport.queued(timedCloudEvent.event.getType());
        this.log.debug("Queued event of type={}, queueSize={}", timedCloudEvent.event.getType(), this.queue.size());
    }

    /** Delays the caller when throttling is enabled and the queue is above the almost-full limit. */
    private void throttleIfAlmostFull() {
        if (this.almostFullWaitMS <= 0 || this.queue.size() <= this.almostFullLimit) {
            return;
        }
        int waitMs = computeWaitMS();
        this.log.debug("Queue size at {} delaying for {} ms", this.queue.size(), waitMs);
        try {
            Thread.sleep(waitMs);
        } catch (InterruptedException e) {
            // The delay is only back-pressure; keep the flag for the caller and carry on.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Computes the throttling delay: it grows linearly from 0 at the almost-full limit to
     * {@code almostFullWaitMS} when the queue is completely full.
     *
     * <p>The queue size is re-read here and may have dropped below the limit since the caller
     * checked it (the worker drains concurrently), so the result is clamped to
     * {@code [0, almostFullWaitMS]}; a negative value would make {@code Thread.sleep} throw.
     */
    private int computeWaitMS() {
        long free = (long) this.bufferSize - this.queue.size();
        // Always >= 1 because ArrayBlockingQueue requires a capacity of at least 1.
        long throttledRange = (long) this.bufferSize - this.almostFullLimit;
        long waitMs = ((1000L - ((1000L * free) / throttledRange)) * this.almostFullWaitMS) / 1000L;
        return (int) Math.max(0L, Math.min(waitMs, (long) this.almostFullWaitMS));
    }

    /** Worker loop: polls the queue and forwards events until the executor is shut down. */
    private void run() {
        while (!this.executor.isShutdown()) {
            try {
                // Bounded poll so the shutdown flag is re-checked at least once per second.
                TimedCloudEvent next = this.queue.poll(1L, TimeUnit.SECONDS);
                if (next != null) {
                    trySend(next);
                }
            } catch (InterruptedException e) {
                this.log.warn("Work thread interrupted", e);
                this.executor.shutdown();
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Passes one event to the delegate; any failure is logged so the worker keeps running. */
    private void trySend(TimedCloudEvent timedCloudEvent) {
        try {
            this.consumer.accept(timedCloudEvent);
        } catch (Throwable th) {
            this.log.error("Gave up sending async event id={}, type={}", timedCloudEvent.event.getId(),
                    timedCloudEvent.event.getType(), th);
        }
    }

    /**
     * Shuts the consumer down: waits up to {@code shutdownWaitSeconds} for the queue to drain,
     * stops the worker, and reports every event that is still queued as discarded.
     *
     * <p>The worker is stopped before the leftovers are counted so that events it manages to
     * send in the meantime are not also reported as discarded.
     */
    @Override // java.io.Closeable, java.lang.AutoCloseable
    public void close() {
        this.log.info("Shut down of queued sender with {} items.", this.queue.size());
        waitUntilQueueEmpty();
        shutDownExecutor();
        if (this.queue.isEmpty()) {
            this.log.info("Shut down completed. All items sent.");
            return;
        }
        int discarded = 0;
        for (TimedCloudEvent leftover : this.queue) {
            this.metricSupport.discarded(MetricSupport.Reason.shutdown, leftover.event.getType());
            discarded++;
        }
        this.log.error("Forced shut down of queued sender. Discarding {} items.", discarded);
    }

    /**
     * Polls every 100 ms until the queue is empty or {@code shutdownWaitSeconds} have elapsed.
     * Returns early, with the interrupt flag set, if the calling thread is interrupted.
     */
    private void waitUntilQueueEmpty() {
        long deadline = System.currentTimeMillis() + (this.shutdownWaitSeconds * 1000L);
        while (!this.queue.isEmpty() && System.currentTimeMillis() < deadline) {
            try {
                Thread.sleep(100L);
            } catch (InterruptedException e) {
                // Stop waiting: with the flag set every further sleep() would throw
                // immediately and turn this loop into a busy spin.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Requests an orderly executor shutdown, waits up to {@code shutdownWaitSeconds} for the
     * worker to finish and finally interrupts it via {@code shutdownNow()}.
     */
    private void shutDownExecutor() {
        this.executor.shutdown();
        try {
            if (!this.executor.awaitTermination(this.shutdownWaitSeconds, TimeUnit.SECONDS)) {
                this.log.warn("Timeout waiting for executor termination");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        this.executor.shutdownNow();
    }
}
