/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.integration.channel;

import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.SubscribableChannel;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink;
import reactor.core.scheduler.Schedulers;

/**
 * Static helpers that expose a {@link MessageChannel} as a Reactive Streams
 * {@link Publisher} of {@link Message}s.
 *
 * <p>This class is not instantiable; use {@link #toPublisher(MessageChannel)}.
 */
public final class MessageChannelReactiveUtils {

    private MessageChannelReactiveUtils() {
        // utility class - no instances
    }

    /**
     * Returns a {@link Publisher} view of the given channel.
     *
     * <p>Resolution order (first match wins):
     * <ol>
     * <li>the channel already implements {@link Publisher}: it is returned as-is;</li>
     * <li>the channel is a {@link SubscribableChannel}: it is wrapped in an adapter that
     * registers a {@link MessageHandler} on the channel for each subscriber;</li>
     * <li>the channel is a {@link PollableChannel}: it is wrapped in an adapter that calls
     * {@link PollableChannel#receive()} according to the subscriber's demand.</li>
     * </ol>
     *
     * <p>The payload type {@code T} is not checked at runtime; it is the caller's
     * responsibility to pick a {@code T} that matches what the channel actually carries.
     *
     * @param messageChannel the channel to adapt
     * @param <T> the expected message payload type
     * @return a publisher emitting the messages of {@code messageChannel}
     * @throws IllegalArgumentException if {@code messageChannel} is none of the supported
     * types (this includes {@code null})
     */
    @SuppressWarnings("unchecked") // payload type is erased; the caller chooses T
    public static <T> Publisher<Message<T>> toPublisher(MessageChannel messageChannel) {
        if (messageChannel instanceof Publisher) {
            return (Publisher<Message<T>>) messageChannel;
        }
        if (messageChannel instanceof SubscribableChannel) {
            return adaptSubscribableChannelToPublisher((SubscribableChannel) messageChannel);
        }
        if (messageChannel instanceof PollableChannel) {
            return adaptPollableChannelToPublisher((PollableChannel) messageChannel);
        }
        throw new IllegalArgumentException("The 'messageChannel' must be an instance of Publisher, SubscribableChannel or PollableChannel, not: " + messageChannel);
    }

    /** Wraps a {@link SubscribableChannel} in a {@link SubscribableChannelPublisherAdapter}. */
    private static <T> Publisher<Message<T>> adaptSubscribableChannelToPublisher(SubscribableChannel inputChannel) {
        return new SubscribableChannelPublisherAdapter<>(inputChannel);
    }

    /** Wraps a {@link PollableChannel} in a {@link PollableChannelPublisherAdapter}. */
    private static <T> Publisher<Message<T>> adaptPollableChannelToPublisher(PollableChannel inputChannel) {
        return new PollableChannelPublisherAdapter<>(inputChannel);
    }

    /**
     * {@link Publisher} backed by a {@link PollableChannel}: messages are pulled from the
     * channel in response to subscriber demand.
     *
     * @param <T> the expected message payload type (unchecked)
     */
    private static final class PollableChannelPublisherAdapter<T>
            implements Publisher<Message<T>> {

        private final PollableChannel channel;

        PollableChannelPublisherAdapter(PollableChannel channel) {
            this.channel = channel;
        }

        /**
         * Subscribes {@code subscriber} to a new {@link Flux} that, for every request of
         * {@code n} items, calls {@link PollableChannel#receive()} up to {@code n} times and
         * forwards each non-null result.
         *
         * <p>A polling round ends early when the sink has been cancelled or when
         * {@code receive()} returns {@code null}; in the latter case nothing more is polled
         * by this round, even if part of the requested demand is still unsatisfied.
         *
         * @param subscriber the downstream subscriber
         */
        @Override
        @SuppressWarnings("unchecked") // Message<?> -> Message<T>: payload type is erased
        public void subscribe(Subscriber<? super Message<T>> subscriber) {
            Flux.<Message<T>>create(sink ->
                            sink.onRequest(n -> {
                                Message<?> m;
                                // Order matters: check cancellation, then consume one unit of
                                // demand, and only then poll the channel.
                                while (!sink.isCancelled() && n-- > 0L && (m = this.channel.receive()) != null) {
                                    sink.next((Message<T>) m);
                                }
                            }),
                    FluxSink.OverflowStrategy.IGNORE)
                    // NOTE(review): presumably moved off the caller's thread because
                    // receive() may block - confirm against the channel implementations in use.
                    .subscribeOn(Schedulers.elastic())
                    .subscribe(subscriber);
        }
    }

    /**
     * {@link Publisher} backed by a {@link SubscribableChannel}: every message the channel
     * dispatches to the registered handler is pushed to the subscriber.
     *
     * @param <T> the expected message payload type (unchecked)
     */
    private static final class SubscribableChannelPublisherAdapter<T>
            implements Publisher<Message<T>> {

        private final SubscribableChannel channel;

        SubscribableChannelPublisherAdapter(SubscribableChannel channel) {
            this.channel = channel;
        }

        /**
         * Subscribes {@code subscriber} to a new {@link Flux} whose sink is fed by a
         * {@link MessageHandler} registered on the channel. The handler is unsubscribed
         * from the channel when the sink is cancelled.
         *
         * @param subscriber the downstream subscriber
         */
        @Override
        @SuppressWarnings("unchecked") // Message<?> -> Message<T>: payload type is erased
        public void subscribe(Subscriber<? super Message<T>> subscriber) {
            Flux.<Message<T>>create(emitter -> {
                        MessageHandler messageHandler = message -> emitter.next((Message<T>) message);
                        this.channel.subscribe(messageHandler);
                        // NOTE(review): the handler is removed on cancellation only; if the
                        // sink can terminate by other means the handler would stay
                        // registered - confirm whether onDispose is more appropriate.
                        emitter.onCancel(() -> this.channel.unsubscribe(messageHandler));
                    },
                    FluxSink.OverflowStrategy.IGNORE)
                    .subscribe(subscriber);
        }
    }
}

