/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.connection.lettuce;

import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import io.lettuce.core.pubsub.api.reactive.RedisPubSubReactiveCommands;
import java.nio.ByteBuffer;
import java.util.Set;
import java.util.concurrent.ConcurrentSkipListSet;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import org.springframework.data.redis.connection.ReactiveSubscription;
import org.springframework.data.redis.connection.SubscriptionListener;
import org.springframework.data.redis.connection.lettuce.LettuceByteBufferPubSubListenerWrapper;
import org.springframework.data.redis.connection.lettuce.LettuceMessageListener;
import org.springframework.data.redis.connection.lettuce.LettuceReactivePubSubCommands;
import org.springframework.data.redis.connection.util.ByteArrayWrapper;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.ObjectUtils;
import reactor.core.Disposable;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class LettuceReactiveSubscription
implements ReactiveSubscription {
    private final LettuceByteBufferPubSubListenerWrapper listener;
    private final StatefulRedisPubSubConnection<ByteBuffer, ByteBuffer> connection;
    private final RedisPubSubReactiveCommands<ByteBuffer, ByteBuffer> reactive;
    private final LettuceReactivePubSubCommands commands;
    private final State patternState;
    private final State channelState;

    LettuceReactiveSubscription(SubscriptionListener subscriptionListener, StatefulRedisPubSubConnection<ByteBuffer, ByteBuffer> connection, LettuceReactivePubSubCommands commands, Function<Throwable, Throwable> exceptionTranslator) {
        this.listener = new LettuceByteBufferPubSubListenerWrapper(new LettuceMessageListener((messages, pattern) -> {}, subscriptionListener));
        this.connection = connection;
        this.reactive = connection.reactive();
        this.commands = commands;
        connection.addListener(this.listener);
        this.patternState = new State(exceptionTranslator);
        this.channelState = new State(exceptionTranslator);
    }

    @Override
    public Mono<Void> subscribe(ByteBuffer ... channels) {
        Assert.notNull((Object)channels, "Channels must not be null");
        Assert.noNullElements((Object[])channels, "Channels must not contain null elements");
        return this.channelState.subscribe(channels, this.commands::subscribe);
    }

    @Override
    public Mono<Void> pSubscribe(ByteBuffer ... patterns) {
        Assert.notNull((Object)patterns, "Patterns must not be null");
        Assert.noNullElements((Object[])patterns, "Patterns must not contain null elements");
        return this.patternState.subscribe(patterns, this.commands::pSubscribe);
    }

    @Override
    public Mono<Void> unsubscribe() {
        return this.unsubscribe(this.channelState.getTargets().toArray(new ByteBuffer[0]));
    }

    @Override
    public Mono<Void> unsubscribe(ByteBuffer ... channels) {
        Assert.notNull((Object)channels, "Channels must not be null");
        Assert.noNullElements((Object[])channels, "Channels must not contain null elements");
        return ObjectUtils.isEmpty(channels) ? Mono.empty() : this.channelState.unsubscribe(channels, this.commands::unsubscribe);
    }

    @Override
    public Mono<Void> pUnsubscribe() {
        return this.pUnsubscribe(this.patternState.getTargets().toArray(new ByteBuffer[0]));
    }

    @Override
    public Mono<Void> pUnsubscribe(ByteBuffer ... patterns) {
        Assert.notNull((Object)patterns, "Patterns must not be null");
        Assert.noNullElements((Object[])patterns, "Patterns must not contain null elements");
        return ObjectUtils.isEmpty(patterns) ? Mono.empty() : this.patternState.unsubscribe(patterns, this.commands::pUnsubscribe);
    }

    @Override
    public Set<ByteBuffer> getChannels() {
        return this.channelState.getTargets();
    }

    @Override
    public Set<ByteBuffer> getPatterns() {
        return this.patternState.getTargets();
    }

    @Override
    public Flux<ReactiveSubscription.Message<ByteBuffer, ByteBuffer>> receive() {
        Flux channelMessages = this.channelState.receive(() -> this.reactive.observeChannels().filter(message -> this.channelState.contains((ByteBuffer)message.getChannel())).map(message -> new ReactiveSubscription.ChannelMessage<ByteBuffer, ByteBuffer>((ByteBuffer)message.getChannel(), (ByteBuffer)message.getMessage())));
        Flux patternMessages = this.patternState.receive(() -> this.reactive.observePatterns().filter(message -> this.patternState.contains((ByteBuffer)message.getPattern())).map(message -> new ReactiveSubscription.PatternMessage<ByteBuffer, ByteBuffer, ByteBuffer>((ByteBuffer)message.getPattern(), (ByteBuffer)message.getChannel(), (ByteBuffer)message.getMessage())));
        return channelMessages.mergeWith(patternMessages);
    }

    @Override
    public Mono<Void> cancel() {
        return this.unsubscribe().then(this.pUnsubscribe()).then(Mono.defer(() -> {
            this.channelState.terminate();
            this.patternState.terminate();
            return this.reactive.ping().then(Mono.fromRunnable(() -> this.connection.removeListener(this.listener)));
        }));
    }

    static class State {
        private final Set<ByteArrayWrapper> targets = new ConcurrentSkipListSet<ByteArrayWrapper>();
        private final AtomicLong subscribers = new AtomicLong();
        private final AtomicReference<Flux<?>> flux = new AtomicReference();
        private final Function<Throwable, Throwable> exceptionTranslator;
        @Nullable
        private volatile Disposable disposable;

        State(Function<Throwable, Throwable> exceptionTranslator) {
            this.exceptionTranslator = exceptionTranslator;
        }

        Mono<Void> subscribe(ByteBuffer[] targets, Function<ByteBuffer[], Mono<Void>> subscribeFunction) {
            return subscribeFunction.apply(targets).doOnSuccess(discard2 -> {
                for (ByteBuffer target : targets) {
                    this.targets.add(State.getWrapper(target));
                }
            }).onErrorMap(this.exceptionTranslator);
        }

        Mono<Void> unsubscribe(ByteBuffer[] targets, Function<ByteBuffer[], Mono<Void>> unsubscribeFunction) {
            return Mono.defer(() -> ((Mono)unsubscribeFunction.apply(targets)).doOnSuccess(discard2 -> {
                for (ByteBuffer byteBuffer : targets) {
                    this.targets.remove(State.getWrapper(byteBuffer));
                }
            }).onErrorMap(this.exceptionTranslator));
        }

        Set<ByteBuffer> getTargets() {
            return this.targets.stream().map(ByteArrayWrapper::getArray).map(ByteBuffer::wrap).collect(Collectors.toUnmodifiableSet());
        }

        <T> Flux<T> receive(Supplier<Flux<T>> connectFunction) {
            Flux<?> fastPath = this.flux.get();
            if (fastPath != null) {
                return fastPath;
            }
            ConnectableFlux connectableFlux = connectFunction.get().onErrorMap(this.exceptionTranslator).publish();
            Flux fluxToUse = connectableFlux.doOnSubscribe(subscription -> {
                if (this.subscribers.incrementAndGet() == 1L) {
                    this.disposable = connectableFlux.connect();
                }
            }).doFinally(signalType -> {
                if (this.subscribers.decrementAndGet() == 0L) {
                    this.flux.compareAndSet(connectableFlux, null);
                    this.terminate();
                }
            });
            if (this.flux.compareAndSet(null, fluxToUse)) {
                return fluxToUse;
            }
            return this.flux.get();
        }

        void terminate() {
            this.flux.set(null);
            Disposable disposable = this.disposable;
            if (disposable != null && !disposable.isDisposed()) {
                disposable.dispose();
            }
        }

        public boolean contains(ByteBuffer target) {
            return this.targets.contains(State.getWrapper(target));
        }

        private static ByteArrayWrapper getWrapper(ByteBuffer byteBuffer) {
            return new ByteArrayWrapper(byteBuffer);
        }
    }
}

