package com.adobe.cq.assetcompute.impl;

import com.adobe.cq.assetcompute.api.AssetComputeRequest;
import com.adobe.cq.assetcompute.api.AssetComputeService;
import com.adobe.cq.assetcompute.api.monitor.AssetProcessMonitor;
import com.adobe.cq.assetcompute.api.profile.AssetProcessingProfileManager;
import com.adobe.cq.assetcompute.api.profile.WatermarkingProfileService;
import com.adobe.cq.assetcompute.connection.ConnectionService;
import com.adobe.cq.assetcompute.impl.assetprocessor.AssetProcessorInitService;
import com.adobe.cq.assetcompute.impl.asyncprocess.AsyncProcessJobManager;
import com.adobe.cq.assetcompute.impl.asyncprocess.AsyncProcessQueue;
import com.adobe.cq.assetcompute.impl.scanprocess.ScanProcessQueue;
import com.adobe.cq.dam.processor.api.CustomDamWorkflowRunner;
import com.adobe.cq.dam.processor.api.DMProcessor;
import com.adobe.granite.toggle.api.ToggleRouter;
import com.day.cq.dam.api.processingstate.provider.AssetProcessingStateProvider;
import com.day.cq.dam.api.processingstate.updater.AssetProcessingStateUpdater;
import com.day.cq.search.QueryBuilder;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.sling.api.resource.ResourceResolverFactory;
import org.apache.sling.commons.scheduler.ScheduleOptions;
import org.apache.sling.commons.scheduler.Scheduler;
import org.apache.sling.commons.threads.ModifiableThreadPoolConfig;
import org.apache.sling.commons.threads.ThreadPool;
import org.apache.sling.commons.threads.ThreadPoolConfig;
import org.apache.sling.commons.threads.ThreadPoolManager;
import org.apache.sling.event.jobs.JobManager;
import org.osgi.framework.BundleContext;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.osgi.service.component.annotations.ReferencePolicyOption;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@Component(service = {AssetComputeService.class}, immediate = true)
/* loaded from: input_file:com/adobe/cq/assetcompute/impl/AssetComputeServiceImpl.class */
public class AssetComputeServiceImpl implements AssetComputeService {
    private static final Logger LOG = LoggerFactory.getLogger(AssetComputeServiceImpl.class);
    private static final String ADD_LEADER_QUEUE_JOB = "assets-add-leader-queue-job";
    private static final int ASSET_COMPUTE_QUEUE_BUSY_THRESHOLD = 300;
    private ThreadPool assetComputeThreadPool;
    private List<Queue> processingQueueList;
    private Queue<AssetComputeRequest> assetComputeQueue = new ConcurrentLinkedQueue();

    @Reference
    private ConnectionService assetComputeConnection;

    @Reference
    private ThreadPoolManager threadPoolManager;

    @Reference
    private AssetProcessingProfileManager profileManager;

    @Reference
    private Scheduler scheduler;

    @Reference
    private ResourceResolverFactory resolverFactory;

    @Reference
    private AssetProcessMonitor monitor;

    @Reference
    private DMProcessor dmProcessor;

    @Reference
    private JobManager jobManager;

    @Reference
    protected AssetProcessingStateUpdater stateUpdater;

    @Reference
    private AssetProcessingStateProvider stateProvider;

    @Reference(policy = ReferencePolicy.STATIC)
    private QueryBuilder queryBuilder;

    @Reference(policyOption = ReferencePolicyOption.GREEDY)
    private ToggleRouter toggleRouter;

    @Reference
    private WatermarkingProfileService watermarkingProfileService;

    @Reference
    private AsyncProcessJobManager asyncJobManager;

    @Reference
    private CustomDamWorkflowRunner workflowRunner;

    @Reference
    private AssetProcessorInitService assetProcessorInitService;

    @Activate
    protected void activate(ComponentContext componentContext) {
        LOG.info("Activating Asset Compute service.");
        this.assetComputeThreadPool = getAssetComputeThreadPool();
        this.processingQueueList = new CopyOnWriteArrayList();
        this.processingQueueList.add(this.assetComputeQueue);
        terminateAllJobs();
        if (!scheduleAssetComputeConsumingJobs()) {
            LOG.warn("Failed to schedule the process consuming job");
        }
        if (scheduleAddLeaderQueueJob()) {
            return;
        }
        LOG.warn("Failed to schedule the adding queue in leader");
    }

    @Deactivate
    protected void deactivate(BundleContext bundleContext) {
        LOG.info("Deactivating Asset Compute service.");
        if (this.assetComputeThreadPool != null) {
            LOG.info("Releasing the thread pool: {}", this.assetComputeThreadPool.getName());
            this.threadPoolManager.release(this.assetComputeThreadPool);
        }
    }

    public boolean process(AssetComputeRequest assetComputeRequest) {
        if (!this.assetComputeConnection.isEnabled()) {
            return false;
        }
        this.assetComputeQueue.add(assetComputeRequest);
        return true;
    }

    public boolean isBusy() {
        return this.assetComputeQueue.size() > ASSET_COMPUTE_QUEUE_BUSY_THRESHOLD;
    }

    public boolean isEnabled() {
        return this.assetComputeConnection.isEnabled();
    }

    private boolean scheduleAssetComputeConsumingJobs() {
        ScheduleOptions NOW = this.scheduler.NOW(-1, 3L);
        NOW.name(AssetComputeConstants.ASSET_COMPUTE_CONSUMING_JOB_NAME);
        NOW.canRunConcurrently(false);
        NOW.threadPoolName(AssetComputeConstants.THREAD_POOL_NAME);
        LOG.info("Schedule the process consuming job '{}'", AssetComputeConstants.ASSET_COMPUTE_CONSUMING_JOB_NAME);
        return this.scheduler.schedule(new AssetComputeConsumingJob(this.processingQueueList, this.assetComputeThreadPool, this.assetComputeConnection, this.resolverFactory, this.monitor, this.toggleRouter, this.stateUpdater), NOW);
    }

    private boolean scheduleAddLeaderQueueJob() {
        ScheduleOptions NOW = this.scheduler.NOW(-1, 10L);
        NOW.onLeaderOnly(true);
        NOW.canRunConcurrently(false);
        NOW.name(ADD_LEADER_QUEUE_JOB);
        NOW.threadPoolName(AssetComputeConstants.THREAD_POOL_NAME);
        LOG.info("Schedule the job '{}'", ADD_LEADER_QUEUE_JOB);
        return this.scheduler.schedule(() -> {
            LOG.info("Run add leader queue job in leader author only");
            if (this.assetComputeConnection.isScanEnabled()) {
                this.processingQueueList.add(new ScanProcessQueue(this.resolverFactory, this.queryBuilder, this.profileManager, this.dmProcessor, this.assetComputeConnection, this.watermarkingProfileService, this.assetProcessorInitService, this.stateProvider, this.workflowRunner));
            }
            this.processingQueueList.add(new AsyncProcessQueue(this.resolverFactory, this.queryBuilder, this.profileManager, this.jobManager, this.dmProcessor, this.toggleRouter, this.watermarkingProfileService, this.asyncJobManager, this.assetProcessorInitService, this.workflowRunner, this.stateProvider));
            LOG.info("Remove the add leader queue job after it's done");
            this.scheduler.unschedule(ADD_LEADER_QUEUE_JOB);
        }, NOW);
    }

    private void terminateAllJobs() {
        LOG.info("Terminating asset compute consuming job: {}", AssetComputeConstants.ASSET_COMPUTE_CONSUMING_JOB_NAME);
        this.scheduler.unschedule(AssetComputeConstants.ASSET_COMPUTE_CONSUMING_JOB_NAME);
        this.scheduler.unschedule(ADD_LEADER_QUEUE_JOB);
    }

    private ThreadPool getAssetComputeThreadPool() {
        ModifiableThreadPoolConfig modifiableThreadPoolConfig = new ModifiableThreadPoolConfig();
        modifiableThreadPoolConfig.setMinPoolSize(20);
        modifiableThreadPoolConfig.setMaxPoolSize(40);
        modifiableThreadPoolConfig.setPriority(ThreadPoolConfig.ThreadPriority.NORM);
        return this.threadPoolManager.create(modifiableThreadPoolConfig);
    }
}
