/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.channel;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.springframework.integration.channel.AbstractMessageChannel;
import org.springframework.integration.channel.ReactiveStreamsSubscribableChannel;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;

/**
 * A message channel that is also a Reactive Streams {@link Publisher}.
 *
 * <p>Messages passed to {@link #doSend(Message, long)} are pushed into a {@link FluxSink}
 * that feeds a single shared {@link Flux}; every {@link Subscriber} registered through
 * {@link #subscribe(Subscriber)} is attached to that shared flux. Upstream publishers
 * registered through {@link #subscribeTo(Publisher)} are wrapped in a
 * {@link ConnectableFlux} that forwards each of their messages to {@code send(...)} and
 * is only connected once this channel has at least one subscriber.
 */
public class FluxMessageChannel
extends AbstractMessageChannel
implements Publisher<Message<?>>,
ReactiveStreamsSubscribableChannel {
    /**
     * Currently registered downstream subscribers. Subscribers are added in
     * {@link #subscribe(Subscriber)}, removed from cancellation callbacks and read in
     * {@link #doSend(Message, long)}, so a concurrent list implementation is used.
     */
    private final List<Subscriber<? super Message<?>>> subscribers = new CopyOnWriteArrayList<>();

    /** Upstream publishers registered via {@link #subscribeTo(Publisher)}, with their connectable wrappers. */
    private final Map<Publisher<Message<?>>, ConnectableFlux<Message<?>>> publishers = new ConcurrentHashMap<>();

    /**
     * Emitter of the shared flux. It is assigned from the {@code Flux.create} callback and
     * read from {@link #doSend(Message, long)}; {@code volatile} makes the assignment
     * visible to sending threads.
     */
    private volatile FluxSink<Message<?>> sink;

    /**
     * The shared flux all subscribers are attached to. The explicit type witness is needed
     * because the element type cannot be inferred through the chained calls.
     */
    private final Flux<Message<?>> flux = Flux.<Message<?>>create(emitter -> this.sink = emitter, FluxSink.OverflowStrategy.IGNORE)
            .publish()
            .autoConnect();

    /**
     * Emits the message into the shared flux.
     *
     * @param message the message to emit
     * @param timeout not used by this implementation
     * @return always {@code true}
     * @throws IllegalStateException if the channel has no subscribers, or if the sink has
     * not been assigned yet
     */
    @Override
    protected boolean doSend(Message<?> message, long timeout) {
        Assert.state(!this.subscribers.isEmpty(), () -> "The [" + this + "] doesn't have subscribers to accept messages");
        // Read the volatile field once so the null check and the emission use the same sink.
        FluxSink<Message<?>> currentSink = this.sink;
        Assert.state(currentSink != null, () -> "The [" + this + "] is not connected to its subscribers yet");
        currentSink.next(message);
        return true;
    }

    /**
     * Registers the subscriber, attaches it to the shared flux and connects every upstream
     * publisher registered so far. The subscriber is removed from the registry again when
     * its subscription is cancelled.
     *
     * @param subscriber the subscriber to attach
     */
    @Override
    public void subscribe(Subscriber<? super Message<?>> subscriber) {
        // Register first: doSend rejects messages while the registry is empty.
        this.subscribers.add(subscriber);
        this.flux.doOnCancel(() -> this.subscribers.remove(subscriber)).retry().subscribe(subscriber);
        // Upstream publishers registered before any subscriber existed were left unconnected.
        this.publishers.values().forEach(ConnectableFlux::connect);
    }

    /**
     * Registers an upstream publisher whose messages are forwarded into this channel via
     * {@code send(...)}. The publisher is connected immediately if the channel already has
     * subscribers; otherwise it is connected by a later {@link #subscribe(Subscriber)} call.
     * The registration is dropped when the publisher completes.
     *
     * @param publisher the upstream source of messages
     */
    @Override
    public void subscribeTo(Publisher<Message<?>> publisher) {
        ConnectableFlux<Message<?>> connectableFlux = Flux.from(publisher)
                .doOnComplete(() -> this.publishers.remove(publisher))
                .doOnNext(this::send)
                .publish();
        this.publishers.put(publisher, connectableFlux);
        if (!this.subscribers.isEmpty()) {
            connectableFlux.connect();
        }
    }
}

