package com.adobe.aem.sites.eventing.impl.observers;

import com.adobe.aem.sites.eventing.impl.producer.ContentStateChange;
import com.adobe.granite.toggle.api.ToggleCondition;
import java.util.Arrays;
import java.util.Hashtable;
import lombok.Generated;
import org.apache.sling.discovery.TopologyEventListener;
import org.osgi.framework.ServiceRegistration;
import org.osgi.service.component.ComponentContext;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.component.annotations.Deactivate;
import org.osgi.service.component.annotations.Reference;
import org.osgi.service.component.annotations.ReferenceCardinality;
import org.osgi.service.component.annotations.ReferencePolicy;
import org.osgi.service.component.propertytypes.ServiceDescription;
import org.osgi.service.event.Event;
import org.osgi.service.event.EventHandler;
import org.osgi.util.converter.Converters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/**
 * Bridges Sling package-distribution OSGi events into a shared Reactor {@link Flux} of
 * {@link ContentStateChange} items.
 *
 * <p>The OSGi handler is registered lazily, when the flux gets its first subscriber, and is
 * unregistered again when the flux is disposed or when this component is deactivated. Events
 * are only forwarded while the {@link CompositeListener} reports {@code isLeader}
 * (presumably maintained from topology events; see {@link CompositeListener}).
 *
 * <p>A distribution of type {@code ADD} is emitted as {@code PUBLISHED}, a distribution of type
 * {@code DELETE} as {@code UNPUBLISHED}; every other type is ignored.
 */
@ServiceDescription("Listen to package distribution events on a cloud service cluster leader.")
@Component(service = {DistributionListener.class}, property = {"event.topics=org/apache/sling/distribution/agent/package/distributed"}, reference = {@Reference(service = ToggleCondition.class, name = "toggleCondition", target = "(toggle.name=FT_SITES-6098)", policy = ReferencePolicy.STATIC, cardinality = ReferenceCardinality.MANDATORY)})
public class DistributionListener {

    @Generated
    private static final Logger log = LoggerFactory.getLogger(DistributionListener.class);

    /** Event property holding the distributed paths. */
    public static final String PROPERTY_DISTRIBUTION_PATH = "distribution.paths";
    /** Event property holding the distribution request type. */
    public static final String PROPERTY_DISTRIBUTION_TYPE = "distribution.type";
    /** Distribution type that is mapped to a {@code PUBLISHED} change. */
    public static final String PROPERTY_DISTRIBUTION_TYPE_ADD = "ADD";
    /** Distribution type that is mapped to an {@code UNPUBLISHED} change. */
    public static final String PROPERTY_DISTRIBUTION_TYPE_DELETE = "DELETE";
    /** OSGi event topic the handler subscribes to. */
    public static final String DISTRIBUTION_TOPIC = "org/apache/sling/distribution/agent/package/distributed";

    /** Registration of the current handler; {@code null} whenever no handler is registered. */
    private ServiceRegistration<?> serviceRegistration;
    private CompositeListener compositeListener;
    Flux<ContentStateChange> contentStateChangeFlux;

    /**
     * Builds the shared flux. Nothing is registered with OSGi here; registration happens in the
     * {@code Flux.create} callback, i.e. once a subscriber connects.
     *
     * @param componentContext component context whose bundle context is used to register the handler
     */
    @Activate
    public DistributionListener(ComponentContext componentContext) {
        // The explicit type witness is required: the target type cannot be inferred through the
        // chained share() call.
        this.contentStateChangeFlux = Flux.<ContentStateChange>create(fluxSink -> {
            log.info("Activating OSGi handler");
            this.compositeListener = newListener(fluxSink);
            registerHandler(componentContext);
            fluxSink.onDispose(() -> {
                log.info("OSGi emitter is disposed");
                unregisterHandler();
            });
        }, FluxSink.OverflowStrategy.LATEST).share();
    }

    /**
     * Unregisters the handler so that it does not outlive the component when subscribers are
     * still attached at deactivation time.
     */
    @Deactivate
    private void deactivate() {
        unregisterHandler();
    }

    /**
     * Creates the listener that converts distribution events into items on the given sink.
     *
     * @param fluxSink sink receiving the resulting {@link ContentStateChange} items
     * @return a new listener bound to {@code fluxSink}
     */
    private CompositeListener newListener(FluxSink<ContentStateChange> fluxSink) {
        return new CompositeListener() {
            @Override
            public void handleEvent(Event event) {
                log.debug("Event topic {}, properties {}", event.getTopic(), Arrays.toString(event.getPropertyNames()));
                if (!this.isLeader
                        || !event.containsProperty(PROPERTY_DISTRIBUTION_PATH)
                        || !event.containsProperty(PROPERTY_DISTRIBUTION_TYPE)) {
                    return;
                }
                String[] paths = Converters.standardConverter()
                        .convert(event.getProperty(PROPERTY_DISTRIBUTION_PATH))
                        .to(String[].class);
                String type = event.getProperty(PROPERTY_DISTRIBUTION_TYPE).toString();
                log.debug("Package distributed event with topic {}, paths {}, type {}",
                        event.getTopic(), Arrays.toString(paths), type);
                ContentStateChange.ChangeType changeType = toChangeType(type);
                if (changeType != null) {
                    fluxSink.next(new ContentStateChange(changeType, Arrays.asList(paths)));
                }
            }
        };
    }

    /**
     * Maps a distribution type to the change type to emit.
     *
     * @param distributionType value of the {@link #PROPERTY_DISTRIBUTION_TYPE} event property
     * @return the matching change type (case-insensitive match), or {@code null} when the
     *         distribution type is neither {@code ADD} nor {@code DELETE}
     */
    private static ContentStateChange.ChangeType toChangeType(String distributionType) {
        if (PROPERTY_DISTRIBUTION_TYPE_ADD.equalsIgnoreCase(distributionType)) {
            return ContentStateChange.ChangeType.PUBLISHED;
        }
        if (PROPERTY_DISTRIBUTION_TYPE_DELETE.equalsIgnoreCase(distributionType)) {
            return ContentStateChange.ChangeType.UNPUBLISHED;
        }
        return null;
    }

    /**
     * Registers the current {@link #compositeListener} as {@link EventHandler} and
     * {@link TopologyEventListener}, unless a registration is already active.
     *
     * @param componentContext component context providing the bundle context
     */
    private synchronized void registerHandler(ComponentContext componentContext) {
        if (this.serviceRegistration != null) {
            return;
        }
        Hashtable<String, Object> properties = new Hashtable<>();
        properties.put("event.topics", DISTRIBUTION_TOPIC);
        this.serviceRegistration = componentContext.getBundleContext().registerService(
                new String[]{EventHandler.class.getName(), TopologyEventListener.class.getName()},
                this.compositeListener,
                properties);
    }

    /**
     * Unregisters the handler if one is registered. The field is cleared before unregistering so
     * that a later subscription registers a fresh handler and a second call is a no-op instead of
     * unregistering twice.
     */
    private void unregisterHandler() {
        final ServiceRegistration<?> registration;
        synchronized (this) {
            registration = this.serviceRegistration;
            this.serviceRegistration = null;
        }
        if (registration == null) {
            return;
        }
        try {
            registration.unregister();
            log.info("OSGi handler is disabled");
        } catch (IllegalStateException e) {
            // unregister() throws when the registration is already gone; nothing left to clean up.
            log.debug("OSGi handler was already unregistered", e);
        }
    }

    @Generated
    public CompositeListener getCompositeListener() {
        return this.compositeListener;
    }

    @Generated
    public Flux<ContentStateChange> getContentStateChangeFlux() {
        return this.contentStateChangeFlux;
    }
}
