/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.connection.lettuce;

import io.lettuce.core.api.StatefulConnection;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.api.reactive.BaseRedisReactiveCommands;
import io.lettuce.core.cluster.api.StatefulRedisClusterConnection;
import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
import io.lettuce.core.codec.RedisCodec;
import io.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.dao.DataAccessException;
import org.springframework.data.redis.connection.ReactiveGeoCommands;
import org.springframework.data.redis.connection.ReactiveHashCommands;
import org.springframework.data.redis.connection.ReactiveHyperLogLogCommands;
import org.springframework.data.redis.connection.ReactiveKeyCommands;
import org.springframework.data.redis.connection.ReactiveListCommands;
import org.springframework.data.redis.connection.ReactiveNumberCommands;
import org.springframework.data.redis.connection.ReactivePubSubCommands;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveScriptingCommands;
import org.springframework.data.redis.connection.ReactiveServerCommands;
import org.springframework.data.redis.connection.ReactiveSetCommands;
import org.springframework.data.redis.connection.ReactiveStreamCommands;
import org.springframework.data.redis.connection.ReactiveStringCommands;
import org.springframework.data.redis.connection.ReactiveZSetCommands;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionProvider;
import org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveGeoCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveHashCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveHyperLogLogCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveKeyCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveListCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveNumberCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactivePubSubCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveScriptingCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveServerCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveSetCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveStreamCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveStringCommands;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveZSetCommands;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class LettuceReactiveRedisConnection
implements ReactiveRedisConnection {
    // Codec instance exposed to collaborators; keys and values are both handled as ByteBuffer.
    static final RedisCodec<ByteBuffer, ByteBuffer> CODEC = ByteBufferCodec.INSTANCE;
    // Monitor used by pubSubCommands() to guard the lazy creation of pubSub.
    private final Object mutex = new Object();
    // Lazily established connection used when no shared connection is present (and by executeDedicated()).
    private final AsyncConnect<StatefulConnection<ByteBuffer, ByteBuffer>> dedicatedConnection;
    // Lazily established connection handed out by getPubSubConnection().
    private final AsyncConnect<StatefulRedisPubSubConnection<ByteBuffer, ByteBuffer>> pubSubConnection;
    // Created on the first pubSubCommands() call and reused afterwards.
    private volatile LettuceReactivePubSubCommands pubSub;
    // Assigned only by the constructor that receives a shared connection; stays null otherwise.
    @Nullable
    private Mono<StatefulConnection<ByteBuffer, ByteBuffer>> sharedConnection;

    /**
     * Creates a connection that has no shared connection and therefore serves every command from its
     * lazily established dedicated connection.
     *
     * @param connectionProvider provider used to obtain and release the underlying Lettuce connections;
     *        must not be {@code null}.
     * @throws IllegalArgumentException if {@code connectionProvider} is {@code null}.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    LettuceReactiveRedisConnection(LettuceConnectionProvider connectionProvider) {
        Assert.notNull(connectionProvider, "LettuceConnectionProvider must not be null");
        // A class literal can only describe the raw connection type, so AsyncConnect is instantiated raw and
        // converted (unchecked) to the parameterized field type. A parameterized
        // AsyncConnect<StatefulConnection> would not be assignable to the field at all.
        this.dedicatedConnection = new AsyncConnect(connectionProvider, StatefulConnection.class);
        this.pubSubConnection = new AsyncConnect(connectionProvider, StatefulRedisPubSubConnection.class);
    }

    /**
     * Creates a connection that serves regular commands from the given shared connection while still
     * being able to open a dedicated and a Pub/Sub connection on demand.
     *
     * @param sharedConnection already established connection to reuse; must not be {@code null}.
     * @param connectionProvider provider used to obtain and release the dedicated and Pub/Sub connections;
     *        must not be {@code null}.
     * @throws IllegalArgumentException if either argument is {@code null}.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    LettuceReactiveRedisConnection(StatefulConnection<ByteBuffer, ByteBuffer> sharedConnection, LettuceConnectionProvider connectionProvider) {
        Assert.notNull(sharedConnection, "Shared StatefulConnection must not be null");
        Assert.notNull(connectionProvider, "LettuceConnectionProvider must not be null");
        // A class literal can only describe the raw connection type, so AsyncConnect is instantiated raw and
        // converted (unchecked) to the parameterized field type. A parameterized
        // AsyncConnect<StatefulConnection> would not be assignable to the field at all.
        this.dedicatedConnection = new AsyncConnect(connectionProvider, StatefulConnection.class);
        this.pubSubConnection = new AsyncConnect(connectionProvider, StatefulRedisPubSubConnection.class);
        this.sharedConnection = Mono.just(sharedConnection);
    }

    /**
     * Builds the key command facade for this connection. A new instance is created on every call.
     *
     * @return a {@link LettuceReactiveKeyCommands} bound to this connection.
     */
    @Override
    public ReactiveKeyCommands keyCommands() {
        ReactiveKeyCommands commands = new LettuceReactiveKeyCommands(this);
        return commands;
    }

    /**
     * Builds the string command facade for this connection. A new instance is created on every call.
     *
     * @return a {@link LettuceReactiveStringCommands} bound to this connection.
     */
    @Override
    public ReactiveStringCommands stringCommands() {
        ReactiveStringCommands commands = new LettuceReactiveStringCommands(this);
        return commands;
    }

    /**
     * Builds the number command facade for this connection. A new instance is created on every call.
     *
     * @return a {@link LettuceReactiveNumberCommands} bound to this connection.
     */
    @Override
    public ReactiveNumberCommands numberCommands() {
        ReactiveNumberCommands commands = new LettuceReactiveNumberCommands(this);
        return commands;
    }

    /**
     * Builds the list command facade for this connection. A new instance is created on every call.
     *
     * @return a {@link LettuceReactiveListCommands} bound to this connection.
     */
    @Override
    public ReactiveListCommands listCommands() {
        ReactiveListCommands commands = new LettuceReactiveListCommands(this);
        return commands;
    }

    /**
     * Builds the set command facade for this connection. A new instance is created on every call.
     *
     * @return a {@link LettuceReactiveSetCommands} bound to this connection.
     */
    @Override
    public ReactiveSetCommands setCommands() {
        ReactiveSetCommands commands = new LettuceReactiveSetCommands(this);
        return commands;
    }

    /**
     * Builds the sorted-set command facade for this connection. A new instance is created on every call.
     *
     * @return a {@link LettuceReactiveZSetCommands} bound to this connection.
     */
    @Override
    public ReactiveZSetCommands zSetCommands() {
        ReactiveZSetCommands commands = new LettuceReactiveZSetCommands(this);
        return commands;
    }

    /**
     * Builds the hash command facade for this connection. A new instance is created on every call.
     *
     * @return a {@link LettuceReactiveHashCommands} bound to this connection.
     */
    @Override
    public ReactiveHashCommands hashCommands() {
        ReactiveHashCommands commands = new LettuceReactiveHashCommands(this);
        return commands;
    }

    /**
     * Builds the geo command facade for this connection. A new instance is created on every call.
     *
     * @return a {@link LettuceReactiveGeoCommands} bound to this connection.
     */
    @Override
    public ReactiveGeoCommands geoCommands() {
        ReactiveGeoCommands commands = new LettuceReactiveGeoCommands(this);
        return commands;
    }

    /**
     * Builds the HyperLogLog command facade for this connection. A new instance is created on every call.
     *
     * @return a {@link LettuceReactiveHyperLogLogCommands} bound to this connection.
     */
    @Override
    public ReactiveHyperLogLogCommands hyperLogLogCommands() {
        ReactiveHyperLogLogCommands commands = new LettuceReactiveHyperLogLogCommands(this);
        return commands;
    }

    /**
     * Returns the Pub/Sub command facade, creating it on first use while holding {@code mutex} and
     * handing back the same instance on later calls.
     *
     * <p>Decompiler note: CFR reported "Removed try catching itself - possible behaviour change" for
     * this method when reconstructing the synchronized block.
     *
     * @return the {@link LettuceReactivePubSubCommands} instance of this connection.
     */
    @Override
    public ReactivePubSubCommands pubSubCommands() {
        synchronized (this.mutex) {
            LettuceReactivePubSubCommands commands = this.pubSub;
            if (commands == null) {
                commands = new LettuceReactivePubSubCommands(this);
                this.pubSub = commands;
            }
            return commands;
        }
    }

    /**
     * Builds the scripting command facade for this connection. A new instance is created on every call.
     *
     * @return a {@link LettuceReactiveScriptingCommands} bound to this connection.
     */
    @Override
    public ReactiveScriptingCommands scriptingCommands() {
        ReactiveScriptingCommands commands = new LettuceReactiveScriptingCommands(this);
        return commands;
    }

    /**
     * Builds the server command facade for this connection. A new instance is created on every call.
     *
     * @return a {@link LettuceReactiveServerCommands} bound to this connection.
     */
    @Override
    public ReactiveServerCommands serverCommands() {
        ReactiveServerCommands commands = new LettuceReactiveServerCommands(this);
        return commands;
    }

    /**
     * Builds the stream command facade for this connection. A new instance is created on every call.
     *
     * @return a {@link LettuceReactiveStreamCommands} bound to this connection.
     */
    @Override
    public ReactiveStreamCommands streamCommands() {
        ReactiveStreamCommands commands = new LettuceReactiveStreamCommands(this);
        return commands;
    }

    /**
     * Issues {@code ping} through {@link #execute(LettuceReactiveCallback)} and exposes the first
     * emitted reply.
     *
     * @return a {@link Mono} carrying the first element produced by the ping command.
     */
    @Override
    public Mono<String> ping() {
        Flux<String> replies = this.execute(BaseRedisReactiveCommands::ping);
        return replies.next();
    }

    /**
     * Runs the callback against the commands returned by {@link #getCommands()} and maps errors through
     * {@link #translateException()}.
     *
     * @param callback action to run with the reactive commands.
     * @param <T> element type produced by the callback.
     * @return the callback's results as a {@link Flux}, with errors translated.
     */
    public <T> Flux<T> execute(LettuceReactiveCallback<T> callback) {
        Flux<T> results = this.getCommands().flatMapMany(callback::doWithCommands);
        return results.onErrorMap(this.translateException());
    }

    /**
     * Runs the callback against the commands returned by {@link #getDedicatedCommands()}, bypassing any
     * shared connection, and maps errors through {@link #translateException()}.
     *
     * @param callback action to run with the reactive commands.
     * @param <T> element type produced by the callback.
     * @return the callback's results as a {@link Flux}, with errors translated.
     */
    public <T> Flux<T> executeDedicated(LettuceReactiveCallback<T> callback) {
        Flux<T> results = this.getDedicatedCommands().flatMapMany(callback::doWithCommands);
        return results.onErrorMap(this.translateException());
    }

    /**
     * Closes the dedicated and the Pub/Sub connection. Both close publishers are merged with
     * {@code Flux.mergeDelayError}; the shared connection is not touched by this method.
     *
     * @return a {@link Mono} that completes once the merged close publishers have terminated.
     */
    @Override
    public Mono<Void> closeLater() {
        Mono<Void> closeDedicated = this.dedicatedConnection.close();
        Mono<Void> closePubSub = this.pubSubConnection.close();
        return Flux.mergeDelayError(2, closeDedicated, closePubSub).then();
    }

    /**
     * Resolves the connection to use: the shared connection when one was supplied at construction time,
     * otherwise the dedicated connection.
     *
     * @return a {@link Mono} emitting the connection.
     */
    protected Mono<? extends StatefulConnection<ByteBuffer, ByteBuffer>> getConnection() {
        if (this.sharedConnection == null) {
            return this.getDedicatedConnection();
        }
        return this.sharedConnection;
    }

    /**
     * Obtains the dedicated connection, mapping connect errors through {@link #translateException()}.
     *
     * @return a {@link Mono} emitting the dedicated connection.
     */
    protected Mono<StatefulConnection<ByteBuffer, ByteBuffer>> getDedicatedConnection() {
        Mono<StatefulConnection<ByteBuffer, ByteBuffer>> connection = this.dedicatedConnection.getConnection();
        return connection.onErrorMap(this.translateException());
    }

    /**
     * Obtains the Pub/Sub connection, mapping connect errors through {@link #translateException()}.
     *
     * @return a {@link Mono} emitting the Pub/Sub connection.
     */
    protected Mono<StatefulRedisPubSubConnection<ByteBuffer, ByteBuffer>> getPubSubConnection() {
        Mono<StatefulRedisPubSubConnection<ByteBuffer, ByteBuffer>> connection = this.pubSubConnection.getConnection();
        return connection.onErrorMap(this.translateException());
    }

    /**
     * Resolves the reactive commands to use: those of the shared connection when one was supplied at
     * construction time, otherwise those of the dedicated connection.
     *
     * @return a {@link Mono} emitting the reactive commands.
     */
    protected Mono<? extends RedisClusterReactiveCommands<ByteBuffer, ByteBuffer>> getCommands() {
        if (this.sharedConnection == null) {
            return this.getDedicatedCommands();
        }
        return this.sharedConnection.map(connection -> getRedisClusterReactiveCommands(connection));
    }

    /**
     * Obtains the reactive commands of the dedicated connection. Unlike
     * {@link #getDedicatedConnection()}, no exception translation is applied here.
     *
     * @return a {@link Mono} emitting the reactive commands of the dedicated connection.
     */
    protected Mono<? extends RedisClusterReactiveCommands<ByteBuffer, ByteBuffer>> getDedicatedCommands() {
        Mono<StatefulConnection<ByteBuffer, ByteBuffer>> connection = this.dedicatedConnection.getConnection();
        return connection.map(LettuceReactiveRedisConnection::getRedisClusterReactiveCommands);
    }

    /**
     * Extracts the reactive command API from a standalone or cluster connection.
     *
     * @param connection connection to unwrap.
     * @return the reactive commands exposed by {@code connection}.
     * @throws IllegalStateException if {@code connection} is neither a {@link StatefulRedisConnection}
     *         nor a {@link StatefulRedisClusterConnection} (this includes {@code null}).
     */
    private static RedisClusterReactiveCommands<ByteBuffer, ByteBuffer> getRedisClusterReactiveCommands(StatefulConnection<ByteBuffer, ByteBuffer> connection) {
        // Casts keep the type arguments so reactive() yields a parameterized result; a raw cast would
        // force an unchecked conversion on the return value.
        if (connection instanceof StatefulRedisConnection) {
            return ((StatefulRedisConnection<ByteBuffer, ByteBuffer>) connection).reactive();
        }
        if (connection instanceof StatefulRedisClusterConnection) {
            return ((StatefulRedisClusterConnection<ByteBuffer, ByteBuffer>) connection).reactive();
        }
        throw new IllegalStateException("o.O unknown connection type " + connection);
    }

    /**
     * Creates the error mapper used by this connection. {@link RuntimeException}s are passed to
     * {@code LettuceExceptionConverter.INSTANCE}; when the converter yields a non-null
     * {@link DataAccessException} that one is used, otherwise the original throwable is kept.
     * Throwables that are not runtime exceptions are returned unchanged.
     *
     * @return the exception translation function.
     */
    <T> Function<Throwable, Throwable> translateException() {
        return throwable -> {
            if (!(throwable instanceof RuntimeException)) {
                return throwable;
            }
            DataAccessException translated = LettuceExceptionConverter.INSTANCE.convert((RuntimeException) throwable);
            if (translated == null) {
                return throwable;
            }
            return translated;
        };
    }

    static class AsyncConnect<T extends StatefulConnection<?, ?>> {
        static AtomicReferenceFieldUpdater<AsyncConnect, State> STATE = AtomicReferenceFieldUpdater.newUpdater(AsyncConnect.class, State.class, "state");
        private final Mono<T> connectionPublisher;
        private final LettuceConnectionProvider connectionProvider;
        private volatile State state = State.INITIAL;
        @Nullable
        private volatile StatefulConnection<ByteBuffer, ByteBuffer> connection;

        AsyncConnect(LettuceConnectionProvider connectionProvider, Class<T> connectionType) {
            Assert.notNull((Object)connectionProvider, "LettuceConnectionProvider must not be null");
            Assert.notNull(connectionType, "Connection type must not be null");
            this.connectionProvider = connectionProvider;
            Mono<StatefulConnection> defer = Mono.fromCompletionStage(() -> connectionProvider.getConnectionAsync(connectionType));
            this.connectionPublisher = defer.doOnNext(it -> {
                if (AsyncConnect.isClosing(STATE.get(this))) {
                    it.closeAsync();
                } else {
                    this.connection = it;
                }
            }).cache().handle((connection, sink) -> {
                if (AsyncConnect.isClosing(STATE.get(this))) {
                    sink.error(new IllegalStateException("Unable to connect; Connection is closed"));
                } else {
                    sink.next(connection);
                }
            });
        }

        Mono<T> getConnection() {
            State state = STATE.get(this);
            if (AsyncConnect.isClosing(state)) {
                return Mono.error(new IllegalStateException("Unable to connect; Connection is closed"));
            }
            STATE.compareAndSet(this, State.INITIAL, State.CONNECTION_REQUESTED);
            return this.connectionPublisher;
        }

        Mono<Void> close() {
            return Mono.defer(() -> {
                if (STATE.compareAndSet(this, State.INITIAL, State.CLOSING) || STATE.compareAndSet(this, State.CONNECTION_REQUESTED, State.CLOSING)) {
                    StatefulConnection<ByteBuffer, ByteBuffer> connection = this.connection;
                    this.connection = null;
                    STATE.set(this, State.CLOSED);
                    if (connection != null) {
                        return Mono.fromCompletionStage(this.connectionProvider.releaseAsync(connection));
                    }
                }
                return Mono.empty();
            });
        }

        private static boolean isClosing(State state) {
            return state == State.CLOSING || state == State.CLOSED;
        }

        static enum State {
            INITIAL,
            CONNECTION_REQUESTED,
            CLOSING,
            CLOSED;

        }
    }

    /**
     * Callback that receives the reactive commands of a connection and produces a result publisher.
     *
     * @param <T> element type of the produced publisher.
     */
    interface LettuceReactiveCallback<T> {

        /**
         * Performs work with the given reactive commands.
         *
         * @param commands reactive command API handed to the callback.
         * @return publisher of the results.
         */
        Publisher<T> doWithCommands(RedisClusterReactiveCommands<ByteBuffer, ByteBuffer> commands);
    }

    /**
     * Singleton {@link RedisCodec} that exchanges keys and values as {@link ByteBuffer}s. Decoding
     * copies the remaining bytes into a new heap buffer; encoding returns a duplicate of the argument.
     */
    enum ByteBufferCodec implements RedisCodec<ByteBuffer, ByteBuffer> {

        INSTANCE;

        /**
         * Copies the remaining bytes of {@code bytes} into a new buffer, consuming the source.
         *
         * @param bytes source buffer.
         * @return a new buffer positioned at 0 whose limit is the number of copied bytes.
         */
        @Override
        public ByteBuffer decodeKey(ByteBuffer bytes) {
            byte[] copy = new byte[bytes.remaining()];
            bytes.get(copy);
            return ByteBuffer.wrap(copy);
        }

        /**
         * Decodes a value exactly like {@link #decodeKey(ByteBuffer)}.
         *
         * @param bytes source buffer.
         * @return a new buffer holding the copied bytes.
         */
        @Override
        public ByteBuffer decodeValue(ByteBuffer bytes) {
            return decodeKey(bytes);
        }

        /**
         * Returns a duplicate of {@code key}, leaving the position and limit of the argument untouched.
         *
         * @param key key buffer.
         * @return duplicate sharing the content of {@code key}.
         */
        @Override
        public ByteBuffer encodeKey(ByteBuffer key) {
            ByteBuffer view = key.duplicate();
            return view;
        }

        /**
         * Returns a duplicate of {@code value}, leaving the position and limit of the argument untouched.
         *
         * @param value value buffer.
         * @return duplicate sharing the content of {@code value}.
         */
        @Override
        public ByteBuffer encodeValue(ByteBuffer value) {
            ByteBuffer view = value.duplicate();
            return view;
        }
    }
}

