package org.eclipse.microprofile.reactive.messaging.tck.signatures.processors;

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import javax.enterprise.context.ApplicationScoped;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.messaging.tck.TckBase;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;

@ApplicationScoped
/* loaded from: input_file:org/eclipse/microprofile/reactive/messaging/tck/signatures/processors/DirectProcessorBean.class */
public class DirectProcessorBean {
    private Map<String, List<String>> collector = new ConcurrentHashMap();
    private static final List<String> EXPECTED = Arrays.asList("1", "2", "3", "4", "5", "6", "7", "8", "9", "10");
    private static Map<String, AtomicInteger> counters = new ConcurrentHashMap();

    private static void increment(String str) {
        counters.computeIfAbsent(str, str2 -> {
            return new AtomicInteger(0);
        }).incrementAndGet();
    }

    @Outgoing("publisher-synchronous-message")
    public PublisherBuilder<Integer> streamForProcessorOfMessages() {
        return ReactiveStreams.of(new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
    }

    @Outgoing("publisher-synchronous-payload")
    public PublisherBuilder<Integer> streamForProcessorOfPayloads() {
        return ReactiveStreams.of(new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
    }

    @Outgoing("publisher-asynchronous-message")
    public PublisherBuilder<Integer> streamForProcessorBuilderOfMessages() {
        return ReactiveStreams.of(new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
    }

    @Outgoing("publisher-asynchronous-payload")
    public PublisherBuilder<Integer> streamForProcessorBuilderOfPayloads() {
        return ReactiveStreams.of(new Integer[]{0, 1, 2, 3, 4, 5, 6, 7, 8, 9});
    }

    @Incoming("synchronous-message")
    public void getMessgesFromProcessorOfMessages(String str) {
        add("processor-message", str);
    }

    @Incoming("synchronous-payload")
    public void getMessgesFromProcessorOfPayloads(String str) {
        add("processor-payload", str);
    }

    @Incoming("asynchronous-message")
    public void getMessgesFromProcessorBuilderOfMessages(String str) {
        add("processor-builder-message", str);
    }

    @Incoming("asynchronous-payload")
    public void getMessgesFromProcessorBuilderOfPayloads(String str) {
        add("processor-builder-payload", str);
    }

    @Outgoing("synchronous-message")
    @Incoming("publisher-synchronous-message")
    public Message<String> messageSynchronous(Message<Integer> message) {
        increment("synchronous-message");
        return Message.of(Integer.toString(((Integer) message.getPayload()).intValue() + 1));
    }

    @Outgoing("synchronous-payload")
    @Incoming("publisher-synchronous-payload")
    public String payloadSynchronous(int i) {
        increment("synchronous-payload");
        return Integer.toString(i + 1);
    }

    @Outgoing("asynchronous-message")
    @Incoming("publisher-asynchronous-message")
    public CompletionStage<Message<String>> messageAsynchronous(Message<Integer> message) {
        increment("asynchronous-message");
        return CompletableFuture.supplyAsync(() -> {
            return Message.of(Integer.toString(((Integer) message.getPayload()).intValue() + 1));
        }, TckBase.EXECUTOR);
    }

    @Outgoing("asynchronous-payload")
    @Incoming("publisher-asynchronous-payload")
    public CompletionStage<String> payloadAsynchronous(int i) {
        increment("asynchronous-payload");
        return CompletableFuture.supplyAsync(() -> {
            return Integer.toString(i + 1);
        }, TckBase.EXECUTOR);
    }

    private void add(String str, String str2) {
        this.collector.computeIfAbsent(str, str3 -> {
            return new CopyOnWriteArrayList();
        }).add(str2);
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    public void verify() {
        Awaitility.await().until(() -> {
            return Boolean.valueOf(this.collector.size() == 4);
        });
        Assertions.assertThat(this.collector).hasSize(4).allSatisfy((str, list) -> {
            Assertions.assertThat(list).containsExactlyElementsOf(EXPECTED);
        });
        Assertions.assertThat(counters.get("synchronous-message")).hasValue(EXPECTED.size());
        Assertions.assertThat(counters.get("synchronous-payload")).hasValue(EXPECTED.size());
        Assertions.assertThat(counters.get("asynchronous-message")).hasValue(EXPECTED.size());
        Assertions.assertThat(counters.get("asynchronous-payload")).hasValue(EXPECTED.size());
    }
}
