package com.adobe.cq.assetcompute.impl.connection;

import com.adobe.cq.assetcompute.api.event.AssetComputeEvent;
import com.adobe.cq.assetcompute.api.event.AssetComputeEventHandler;
import com.adobe.cq.assetcompute.api.monitor.AssetProcessMonitor;
import com.adobe.cq.assetcompute.api.monitor.GaugeMonitor;
import com.adobe.cq.assetcompute.connection.ConnectionService;
import com.adobe.cq.assetcompute.impl.AssetComputeConstants;
import com.adobe.cq.assetcompute.impl.JSONUtils;
import com.adobe.cq.assetcompute.impl.assetprocessor.AssetProcessorEventHandler;
import com.adobe.cq.assetcompute.impl.event.AssetComputeEventImpl;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import javax.annotation.Nonnull;
import org.apache.commons.lang.StringUtils;
import org.json.JSONException;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/adobe/cq/assetcompute/impl/connection/EventConsumingJob.class */
final class EventConsumingJob implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(EventConsumingJob.class);
    private static final int EVENTS_BATCH_NO = 3000;
    private Queue<AssetComputeEvent> eventsQueue;
    private ConnectionService service;
    private JournalUrlManager journalUrlManager;
    private GaugeMonitor gauge;

    /* JADX INFO: Access modifiers changed from: package-private */
    public EventConsumingJob(@Nonnull Queue<AssetComputeEvent> queue, @Nonnull ConnectionService connectionService, @Nonnull JournalUrlManager journalUrlManager, AssetProcessMonitor assetProcessMonitor) {
        this.eventsQueue = queue;
        this.service = connectionService;
        this.journalUrlManager = journalUrlManager;
        if (assetProcessMonitor != null) {
            this.gauge = assetProcessMonitor.gauge(getClass().getSimpleName() + "_eventsQueue");
        }
    }

    @Override // java.lang.Runnable
    public void run() {
        LOG.debug("Start events consuming job");
        if (this.gauge != null) {
            this.gauge.setValue(this.eventsQueue.size());
        }
        while (!this.eventsQueue.isEmpty()) {
            int size = this.eventsQueue.size();
            LOG.info("Events consuming start batch polling when queue size is {}", Integer.valueOf(size));
            if (this.gauge != null) {
                this.gauge.setValue(size);
            }
            String str = null;
            HashMap hashMap = new HashMap();
            int i = 0;
            while (true) {
                AssetComputeEvent poll = this.eventsQueue.poll();
                if (poll == null) {
                    break;
                }
                JSONObject jSONObject = (JSONObject) poll.getData();
                LOG.debug("Events Consumer Job processing event data '{}'", jSONObject);
                try {
                    try {
                        String eventHandlerIdFromEvent = JSONUtils.getEventHandlerIdFromEvent(jSONObject);
                        if (StringUtils.isEmpty(eventHandlerIdFromEvent)) {
                            LOG.debug("Received event without '{}', fall back to default one", AssetComputeConstants.KEY_EVENT_HANDLER_ID);
                            eventHandlerIdFromEvent = AssetProcessorEventHandler.HANDLER_ID;
                        }
                        if (hashMap.containsKey(eventHandlerIdFromEvent)) {
                            ((List) hashMap.get(eventHandlerIdFromEvent)).add(poll);
                        } else {
                            ArrayList arrayList = new ArrayList();
                            arrayList.add(poll);
                            hashMap.put(eventHandlerIdFromEvent, arrayList);
                        }
                        if (poll instanceof AssetComputeEventImpl) {
                            str = ((AssetComputeEventImpl) poll).getJournalUrl();
                        }
                    } catch (JSONException e) {
                        LOG.error("Unable to process event '{}' due to JSONException.", new Object[]{jSONObject.toString()}, e);
                        if (poll instanceof AssetComputeEventImpl) {
                            str = ((AssetComputeEventImpl) poll).getJournalUrl();
                        }
                    }
                    i++;
                    if (i >= EVENTS_BATCH_NO) {
                        LOG.info("The event consuming hit batch limit {}", Integer.valueOf(EVENTS_BATCH_NO));
                        break;
                    }
                } catch (Throwable th) {
                    if (poll instanceof AssetComputeEventImpl) {
                        ((AssetComputeEventImpl) poll).getJournalUrl();
                    }
                    throw th;
                }
            }
            LOG.info("{} events consumed, dispatch them to event handlers", Integer.valueOf(i));
            for (String str2 : hashMap.keySet()) {
                AssetComputeEventHandler eventHandlerById = this.service.getEventHandlerById(str2);
                if (eventHandlerById != null) {
                    eventHandlerById.onEvents((List) hashMap.get(str2));
                } else {
                    LOG.error("Unable to process events with event handler id {}. No handlers registered with this id.", str2);
                }
            }
            this.journalUrlManager.setJournalUrl(str);
        }
    }
}
