/*
 * Decompiled with CFR 0.152.
 */
package org.springframework.data.redis.connection.lettuce;

import io.lettuce.core.Limit;
import io.lettuce.core.Range;
import io.lettuce.core.XAddArgs;
import io.lettuce.core.XClaimArgs;
import io.lettuce.core.XGroupCreateArgs;
import io.lettuce.core.XReadArgs;
import io.lettuce.core.cluster.api.reactive.RedisClusterReactiveCommands;
import io.lettuce.core.models.stream.PendingMessage;
import java.nio.ByteBuffer;
import java.util.Collection;
import java.util.List;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import org.springframework.data.redis.connection.ReactiveRedisConnection;
import org.springframework.data.redis.connection.ReactiveStreamCommands;
import org.springframework.data.redis.connection.lettuce.LettuceConverters;
import org.springframework.data.redis.connection.lettuce.LettuceReactiveRedisConnection;
import org.springframework.data.redis.connection.lettuce.RangeConverter;
import org.springframework.data.redis.connection.lettuce.StreamConverters;
import org.springframework.data.redis.connection.stream.ByteBufferRecord;
import org.springframework.data.redis.connection.stream.Consumer;
import org.springframework.data.redis.connection.stream.PendingMessages;
import org.springframework.data.redis.connection.stream.PendingMessagesSummary;
import org.springframework.data.redis.connection.stream.RecordId;
import org.springframework.data.redis.connection.stream.StreamInfo;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.connection.stream.StreamReadOptions;
import org.springframework.data.redis.connection.stream.StreamRecords;
import org.springframework.data.redis.util.ByteUtils;
import org.springframework.util.Assert;
import reactor.core.publisher.Flux;

class LettuceReactiveStreamCommands
implements ReactiveStreamCommands {
    private final LettuceReactiveRedisConnection connection;

    LettuceReactiveStreamCommands(LettuceReactiveRedisConnection connection) {
        Assert.notNull((Object)connection, "Connection must not be null");
        this.connection = connection;
    }

    @Override
    public Flux<ReactiveRedisConnection.NumericResponse<ReactiveStreamCommands.AcknowledgeCommand, Long>> xAck(Publisher<ReactiveStreamCommands.AcknowledgeCommand> commands) {
        return this.connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
            Assert.notNull((Object)command.getKey(), "Key must not be null");
            Assert.notNull((Object)command.getGroup(), "Group must not be null");
            Assert.notNull(command.getRecordIds(), "recordIds must not be null");
            return cmd.xack(command.getKey(), ByteUtils.getByteBuffer(command.getGroup()), LettuceReactiveStreamCommands.entryIdsToString(command.getRecordIds())).map(value -> new ReactiveRedisConnection.NumericResponse<ReactiveStreamCommands.AcknowledgeCommand, Long>((ReactiveStreamCommands.AcknowledgeCommand)command, (Long)value));
        }));
    }

    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.AddStreamRecord, RecordId>> xAdd(Publisher<ReactiveStreamCommands.AddStreamRecord> commands) {
        return this.connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
            Assert.notNull((Object)command.getKey(), "Key must not be null");
            Assert.notNull(command.getBody(), "Body must not be null");
            XAddArgs args = new XAddArgs();
            if (!command.getRecord().getId().shouldBeAutoGenerated()) {
                args.id(command.getRecord().getId().getValue());
            }
            if (command.hasMaxlen()) {
                args.maxlen(command.getMaxlen());
            }
            if (command.hasMinId()) {
                args.minId(command.getMinId().getValue());
            }
            args.nomkstream(command.isNoMkStream());
            args.approximateTrimming(command.isApproximateTrimming());
            return cmd.xadd(command.getKey(), args, command.getBody()).map(value -> new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.AddStreamRecord, RecordId>((ReactiveStreamCommands.AddStreamRecord)command, RecordId.of(value)));
        }));
    }

    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XClaimCommand, Flux<RecordId>>> xClaimJustId(Publisher<ReactiveStreamCommands.XClaimCommand> commands) {
        return this.connection.execute(cmd -> Flux.from(commands).map(command -> {
            String[] ids = command.getOptions().getIdsAsStringArray();
            io.lettuce.core.Consumer<ByteBuffer> from = io.lettuce.core.Consumer.from(ByteUtils.getByteBuffer(command.getGroupName()), ByteUtils.getByteBuffer(command.getNewOwner()));
            XClaimArgs args = StreamConverters.toXClaimArgs(command.getOptions()).justid();
            Flux<RecordId> result = cmd.xclaim(command.getKey(), from, args, ids).map(it -> RecordId.of(it.getId()));
            return new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XClaimCommand, Flux<RecordId>>((ReactiveStreamCommands.XClaimCommand)command, result);
        }));
    }

    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XClaimCommand, Flux<ByteBufferRecord>>> xClaim(Publisher<ReactiveStreamCommands.XClaimCommand> commands) {
        return this.connection.execute(cmd -> Flux.from(commands).map(command -> {
            String[] ids = command.getOptions().getIdsAsStringArray();
            io.lettuce.core.Consumer<ByteBuffer> from = io.lettuce.core.Consumer.from(ByteUtils.getByteBuffer(command.getGroupName()), ByteUtils.getByteBuffer(command.getNewOwner()));
            XClaimArgs args = StreamConverters.toXClaimArgs(command.getOptions());
            Flux<ByteBufferRecord> result = cmd.xclaim(command.getKey(), from, args, ids).map(it -> StreamRecords.newRecord().in((ByteBuffer)it.getStream()).withId(it.getId()).ofBuffer(it.getBody()));
            return new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XClaimCommand, Flux<ByteBufferRecord>>((ReactiveStreamCommands.XClaimCommand)command, result);
        }));
    }

    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.DeleteCommand, Long>> xDel(Publisher<ReactiveStreamCommands.DeleteCommand> commands) {
        return this.connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
            Assert.notNull((Object)command.getKey(), "Key must not be null");
            Assert.notNull(command.getRecordIds(), "recordIds must not be null");
            return cmd.xdel(command.getKey(), LettuceReactiveStreamCommands.entryIdsToString(command.getRecordIds())).map(value -> new ReactiveRedisConnection.NumericResponse<ReactiveStreamCommands.DeleteCommand, Long>((ReactiveStreamCommands.DeleteCommand)command, (Long)value));
        }));
    }

    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.GroupCommand, String>> xGroup(Publisher<ReactiveStreamCommands.GroupCommand> commands) {
        return this.connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
            Assert.notNull((Object)command.getKey(), "Key must not be null");
            Assert.notNull((Object)command.getGroupName(), "GroupName must not be null");
            if (command.getAction().equals((Object)ReactiveStreamCommands.GroupCommand.GroupCommandAction.CREATE)) {
                Assert.notNull((Object)command.getReadOffset(), "ReadOffset must not be null");
                XReadArgs.StreamOffset<ByteBuffer> offset = XReadArgs.StreamOffset.from(command.getKey(), command.getReadOffset().getOffset());
                return cmd.xgroupCreate(offset, ByteUtils.getByteBuffer(command.getGroupName()), XGroupCreateArgs.Builder.mkstream(command.isMkStream())).map(it -> new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.GroupCommand, Object>((ReactiveStreamCommands.GroupCommand)command, it));
            }
            if (command.getAction().equals((Object)ReactiveStreamCommands.GroupCommand.GroupCommandAction.DELETE_CONSUMER)) {
                return cmd.xgroupDelconsumer(command.getKey(), io.lettuce.core.Consumer.from(ByteUtils.getByteBuffer(command.getGroupName()), ByteUtils.getByteBuffer(command.getConsumerName()))).map(it -> new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.GroupCommand, String>((ReactiveStreamCommands.GroupCommand)command, "OK"));
            }
            if (command.getAction().equals((Object)ReactiveStreamCommands.GroupCommand.GroupCommandAction.DESTROY)) {
                return cmd.xgroupDestroy(command.getKey(), ByteUtils.getByteBuffer(command.getGroupName())).map(it -> new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.GroupCommand, String>((ReactiveStreamCommands.GroupCommand)command, Boolean.TRUE.equals(it) ? "OK" : "Error"));
            }
            throw new IllegalArgumentException("Unknown group command " + command.getAction());
        }));
    }

    @Override
    public Flux<ReactiveRedisConnection.NumericResponse<ReactiveRedisConnection.KeyCommand, Long>> xLen(Publisher<ReactiveRedisConnection.KeyCommand> commands) {
        return this.connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
            Assert.notNull((Object)command.getKey(), "Key must not be null");
            return cmd.xlen(command.getKey()).map(value -> new ReactiveRedisConnection.NumericResponse<ReactiveRedisConnection.KeyCommand, Long>((ReactiveRedisConnection.KeyCommand)command, (Long)value));
        }));
    }

    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.PendingRecordsCommand, PendingMessagesSummary>> xPendingSummary(Publisher<ReactiveStreamCommands.PendingRecordsCommand> commands) {
        return this.connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
            Assert.notNull((Object)command.getKey(), "Key must not be null");
            return cmd.xpending(command.getKey(), ByteUtils.getByteBuffer(command.getGroupName())).map(it -> StreamConverters.toPendingMessagesInfo(command.getGroupName(), it)).map(value -> new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.PendingRecordsCommand, PendingMessagesSummary>((ReactiveStreamCommands.PendingRecordsCommand)command, (PendingMessagesSummary)value));
        }));
    }

    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.PendingRecordsCommand, PendingMessages>> xPending(Publisher<ReactiveStreamCommands.PendingRecordsCommand> commands) {
        return this.connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
            Assert.notNull((Object)command.getKey(), "Key must not be null");
            ByteBuffer groupName = ByteUtils.getByteBuffer(command.getGroupName());
            Range<String> range = RangeConverter.toRangeWithDefault(command.getRange(), "-", "+");
            Limit limit = command.isLimited() ? Limit.from(command.getCount()) : Limit.unlimited();
            Flux<PendingMessage> publisher = command.hasConsumer() ? cmd.xpending(command.getKey(), io.lettuce.core.Consumer.from(groupName, ByteUtils.getByteBuffer(command.getConsumerName())), range, limit) : cmd.xpending(command.getKey(), groupName, range, limit);
            return publisher.collectList().map(it -> StreamConverters.toPendingMessages(command.getGroupName(), command.getRange(), it)).map(value -> new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.PendingRecordsCommand, PendingMessages>((ReactiveStreamCommands.PendingRecordsCommand)command, (PendingMessages)value));
        }));
    }

    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.RangeCommand, Flux<ByteBufferRecord>>> xRange(Publisher<ReactiveStreamCommands.RangeCommand> commands) {
        return this.connection.execute(cmd -> Flux.from(commands).map(command -> {
            Assert.notNull((Object)command.getKey(), "Key must not be null");
            Assert.notNull(command.getRange(), "Range must not be null");
            Assert.notNull((Object)command.getLimit(), "Limit must not be null");
            Range<String> lettuceRange = RangeConverter.toRange(command.getRange(), Function.identity());
            Limit lettuceLimit = LettuceConverters.toLimit(command.getLimit());
            return new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.RangeCommand, Flux<ByteBufferRecord>>((ReactiveStreamCommands.RangeCommand)command, cmd.xrange(command.getKey(), lettuceRange, lettuceLimit).map(it -> StreamRecords.newRecord().in((ByteBuffer)it.getStream()).withId(it.getId()).ofBuffer(it.getBody())));
        }));
    }

    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.ReadCommand, Flux<ByteBufferRecord>>> read(Publisher<ReactiveStreamCommands.ReadCommand> commands) {
        return Flux.from(commands).map(command -> {
            Assert.notNull(command.getStreamOffsets(), "StreamOffsets must not be null");
            Assert.notNull((Object)command.getReadOptions(), "ReadOptions must not be null");
            StreamReadOptions readOptions = command.getReadOptions();
            if (readOptions.isBlocking()) {
                return new ReactiveRedisConnection.CommandResponse((ReactiveStreamCommands.ReadCommand)command, this.connection.executeDedicated(cmd -> LettuceReactiveStreamCommands.doRead(command, readOptions, cmd)));
            }
            return new ReactiveRedisConnection.CommandResponse((ReactiveStreamCommands.ReadCommand)command, this.connection.execute(cmd -> LettuceReactiveStreamCommands.doRead(command, readOptions, cmd)));
        });
    }

    private static Flux<ByteBufferRecord> doRead(ReactiveStreamCommands.ReadCommand command, StreamReadOptions readOptions, RedisClusterReactiveCommands<ByteBuffer, ByteBuffer> cmd) {
        XReadArgs.StreamOffset<T>[] streamOffsets = LettuceReactiveStreamCommands.toStreamOffsets(command.getStreamOffsets());
        XReadArgs args = StreamConverters.toReadArgs(readOptions);
        if (command.getConsumer() == null) {
            return cmd.xread(args, streamOffsets).map(it -> StreamRecords.newRecord().in((ByteBuffer)it.getStream()).withId(it.getId()).ofBuffer(it.getBody()));
        }
        io.lettuce.core.Consumer<ByteBuffer> lettuceConsumer = LettuceReactiveStreamCommands.toConsumer(command.getConsumer());
        return cmd.xreadgroup(lettuceConsumer, args, streamOffsets).map(it -> StreamRecords.newRecord().in((ByteBuffer)it.getStream()).withId(it.getId()).ofBuffer(it.getBody()));
    }

    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XInfoCommand, StreamInfo.XInfoStream>> xInfo(Publisher<ReactiveStreamCommands.XInfoCommand> commands) {
        return this.connection.execute(cmd -> Flux.from(commands).flatMap(command -> {
            Assert.notNull((Object)command.getKey(), "Key must not be null");
            return cmd.xinfoStream(command.getKey()).collectList().map(StreamInfo.XInfoStream::fromList).map(it -> new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XInfoCommand, StreamInfo.XInfoStream>((ReactiveStreamCommands.XInfoCommand)command, (StreamInfo.XInfoStream)it));
        }));
    }

    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XInfoCommand, Flux<StreamInfo.XInfoGroup>>> xInfoGroups(Publisher<ReactiveStreamCommands.XInfoCommand> commands) {
        return this.connection.execute(cmd -> Flux.from(commands).map(command -> {
            Assert.notNull((Object)command.getKey(), "Key must not be null");
            return new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XInfoCommand, Flux<StreamInfo.XInfoGroup>>((ReactiveStreamCommands.XInfoCommand)command, cmd.xinfoGroups(command.getKey()).map(it -> StreamInfo.XInfoGroup.fromList((List)it)));
        }));
    }

    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XInfoCommand, Flux<StreamInfo.XInfoConsumer>>> xInfoConsumers(Publisher<ReactiveStreamCommands.XInfoCommand> commands) {
        return this.connection.execute(cmd -> Flux.from(commands).map(command -> {
            Assert.notNull((Object)command.getKey(), "Key must not be null");
            ByteBuffer groupName = ByteUtils.getByteBuffer(command.getGroupName());
            return new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.XInfoCommand, Flux<StreamInfo.XInfoConsumer>>((ReactiveStreamCommands.XInfoCommand)command, cmd.xinfoConsumers(command.getKey(), groupName).map(it -> new StreamInfo.XInfoConsumer(command.getGroupName(), (List)it)));
        }));
    }

    @Override
    public Flux<ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.RangeCommand, Flux<ByteBufferRecord>>> xRevRange(Publisher<ReactiveStreamCommands.RangeCommand> commands) {
        return this.connection.execute(cmd -> Flux.from(commands).map(command -> {
            Assert.notNull((Object)command.getKey(), "Key must not be null");
            Assert.notNull(command.getRange(), "Range must not be null");
            Assert.notNull((Object)command.getLimit(), "Limit must not be null");
            Range<String> lettuceRange = RangeConverter.toRange(command.getRange(), Function.identity());
            Limit lettuceLimit = LettuceConverters.toLimit(command.getLimit());
            return new ReactiveRedisConnection.CommandResponse<ReactiveStreamCommands.RangeCommand, Flux<ByteBufferRecord>>((ReactiveStreamCommands.RangeCommand)command, cmd.xrevrange(command.getKey(), lettuceRange, lettuceLimit).map(it -> StreamRecords.newRecord().in((ByteBuffer)it.getStream()).withId(it.getId()).ofBuffer(it.getBody())));
        }));
    }

    @Override
    public Flux<ReactiveRedisConnection.NumericResponse<ReactiveRedisConnection.KeyCommand, Long>> xTrim(Publisher<ReactiveStreamCommands.TrimCommand> commands) {
        return this.connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
            Assert.notNull((Object)command.getKey(), "Key must not be null");
            Assert.notNull((Object)command.getCount(), "Count must not be null");
            return cmd.xtrim(command.getKey(), command.isApproximateTrimming(), command.getCount()).map(value -> new ReactiveRedisConnection.NumericResponse<ReactiveStreamCommands.TrimCommand, Long>((ReactiveStreamCommands.TrimCommand)command, (Long)value));
        }));
    }

    private static <T> XReadArgs.StreamOffset<T>[] toStreamOffsets(Collection<StreamOffset<T>> streams) {
        return (XReadArgs.StreamOffset[])streams.stream().map(it -> XReadArgs.StreamOffset.from(it.getKey(), it.getOffset().getOffset())).toArray(XReadArgs.StreamOffset[]::new);
    }

    private static io.lettuce.core.Consumer<ByteBuffer> toConsumer(Consumer consumer) {
        return io.lettuce.core.Consumer.from(ByteUtils.getByteBuffer(consumer.getGroup()), ByteUtils.getByteBuffer(consumer.getName()));
    }

    private static String[] entryIdsToString(List<RecordId> recordIds) {
        if (recordIds.size() == 1) {
            return new String[]{recordIds.get(0).getValue()};
        }
        return (String[])recordIds.stream().map(RecordId::getValue).toArray(String[]::new);
    }
}

