package org.eclipse.microprofile.reactive.messaging.tck.acknowledgement;

import java.util.List;
import java.util.ServiceLoader;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.stream.Stream;
import javax.enterprise.context.ApplicationScoped;
import javax.inject.Inject;
import org.assertj.core.api.Assertions;
import org.awaitility.Awaitility;
import org.eclipse.microprofile.reactive.messaging.Emitter;
import org.eclipse.microprofile.reactive.messaging.Incoming;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.Outgoing;
import org.eclipse.microprofile.reactive.messaging.tck.ArchiveExtender;
import org.eclipse.microprofile.reactive.messaging.tck.TckBase;
import org.jboss.arquillian.container.test.api.Deployment;
import org.jboss.shrinkwrap.api.Archive;
import org.jboss.shrinkwrap.api.ShrinkWrap;
import org.jboss.shrinkwrap.api.asset.EmptyAsset;
import org.jboss.shrinkwrap.api.spec.JavaArchive;
import org.junit.Test;

/* loaded from: input_file:org/eclipse/microprofile/reactive/messaging/tck/acknowledgement/PayloadProcessorAckTest.class */
public class PayloadProcessorAckTest extends TckBase {

    /**
     * Bean that hands out the {@code Emitter<String>} the tests send messages through.
     * NOTE(review): {@code EmitterBean} is declared outside this file; presumably its
     * emitter feeds the "data" channel — confirm against that class.
     */
    @Inject
    private EmitterBean bean;

    /** Processor between the "data" and "out" channels; its failure mode is toggled per test. */
    @Inject
    private PayloadProcessor processor;

    /** Consumer of the "out" channel; records every payload it receives. */
    @Inject
    private Sink sink;

    /**
     * Payload-based processor that reads strings from the "data" channel and publishes
     * their upper-cased form on the "out" channel.
     *
     * <p>When failure mode is enabled, processing the payload {@code "b"} (compared
     * case-insensitively) throws an {@link IllegalArgumentException} instead of
     * producing a result.
     */
    @ApplicationScoped
    public static class PayloadProcessor {
        // volatile: the flag is flipped by the test thread while process() is invoked for
        // messages sent from CompletableFuture.runAsync worker threads, so visibility must
        // not depend on incidental happens-before edges.
        private volatile boolean failureModeEnabled = false;

        /** Makes {@link #process(String)} throw for the payload {@code "b"}. */
        public void enableFailureMode() {
            this.failureModeEnabled = true;
        }

        /** Restores normal processing for every payload. */
        public void disableFailureMode() {
            this.failureModeEnabled = false;
        }

        /**
         * Upper-cases the incoming payload.
         *
         * @param str the payload received on the "data" channel
         * @return the upper-cased payload, emitted on the "out" channel
         * @throws IllegalArgumentException if failure mode is enabled and the payload
         *         equals {@code "b"} ignoring case
         */
        @Outgoing("out")
        @Incoming("data")
        public String process(String str) {
            if (this.failureModeEnabled && str.equalsIgnoreCase("b")) {
                throw new IllegalArgumentException("b");
            }
            return str.toUpperCase();
        }
    }

    /**
     * Consumer of the "out" channel that records every payload it receives so the
     * tests can observe how many messages made it through the processor.
     */
    @ApplicationScoped
    public static class Sink {
        // Parameterized (diamond) instead of the raw type, so the assignment is type-checked.
        private final List<String> list = new CopyOnWriteArrayList<>();

        /**
         * Records a payload received on the "out" channel.
         *
         * @param str the payload to record
         */
        @Incoming("out")
        public void consume(String str) {
            this.list.add(str);
        }

        /**
         * @return the live list of payloads received so far (not a copy), so callers
         *         polling it see later additions
         */
        public List<String> list() {
            return this.list;
        }

        /** Forgets all payloads recorded so far. */
        public void reset() {
            this.list.clear();
        }
    }

    /**
     * Assembles the archive deployed for this test: the beans under test, the
     * {@link ArchiveExtender} SPI class, and an empty {@code beans.xml} manifest resource.
     * Every {@link ArchiveExtender} found through {@link ServiceLoader} is then given
     * the archive to extend.
     *
     * @return the archive to deploy
     */
    @Deployment
    public static Archive<JavaArchive> deployment() {
        JavaArchive archive = ShrinkWrap.create(JavaArchive.class)
                .addClasses(EmitterBean.class, Sink.class, PayloadProcessor.class, ArchiveExtender.class)
                .addAsManifestResource(EmptyAsset.INSTANCE, "beans.xml");

        // Let each discovered extender contribute to the archive.
        for (ArchiveExtender extender : ServiceLoader.load(ArchiveExtender.class)) {
            extender.extend(archive);
        }
        return archive;
    }

    /**
     * Sends ten messages with failure mode disabled and verifies that all ten reach
     * the sink, all ten are acknowledged, and none is negatively acknowledged.
     *
     * @throws InterruptedException if interrupted while waiting for the sends to finish
     * @throws TimeoutException if the sends do not finish within the limit used by {@code run}
     * @throws ExecutionException if one of the asynchronous sends fails
     */
    @Test
    public void testThatMessagesAreAckedAfterSuccessfulProcessingOfMessage() throws InterruptedException, TimeoutException, ExecutionException {
        this.sink.reset();
        this.processor.disableFailureMode();
        Emitter<String> emitter = this.bean.getEmitter();
        Set<String> acked = ConcurrentHashMap.newKeySet();
        Set<String> nacked = ConcurrentHashMap.newKeySet();

        List<Throwable> nackReasons = run(acked, nacked, emitter);

        Awaitility.await().until(() -> this.sink.list().size() == 10);
        // Nothing in this test orders the ack callbacks before the sink list reaching 10
        // entries, so wait for the acknowledgements too instead of asserting immediately.
        Awaitility.await().until(() -> acked.size() == 10);

        Assertions.assertThat(acked).hasSize(10);
        Assertions.assertThat(nacked).hasSize(0);
        // Checked after processing has completed: the list is filled by nack callbacks.
        Assertions.assertThat(nackReasons).isEmpty();
    }

    /**
     * Sends ten messages with failure mode enabled and verifies that nine reach the
     * sink and are acknowledged, while exactly one is negatively acknowledged with a
     * recorded reason.
     *
     * @throws InterruptedException if interrupted while waiting for the sends to finish
     * @throws TimeoutException if the sends do not finish within the limit used by {@code run}
     * @throws ExecutionException if one of the asynchronous sends fails
     */
    @Test
    public void testThatMessagesAreNackedAfterFailingProcessingOfMessage() throws InterruptedException, TimeoutException, ExecutionException {
        this.sink.reset();
        Emitter<String> emitter = this.bean.getEmitter();
        Set<String> acked = ConcurrentHashMap.newKeySet();
        Set<String> nacked = ConcurrentHashMap.newKeySet();
        this.processor.enableFailureMode();
        try {
            List<Throwable> nackReasons = run(acked, nacked, emitter);

            Awaitility.await().until(() -> this.sink.list().size() == 9);
            // Nothing in this test orders the ack/nack callbacks before the sink list
            // reaching 9 entries, so wait for them instead of asserting immediately.
            Awaitility.await().until(() -> acked.size() == 9 && nacked.size() == 1);

            Assertions.assertThat(acked).hasSize(9);
            Assertions.assertThat(nacked).hasSize(1);
            Assertions.assertThat(nackReasons).hasSize(1);
        } finally {
            // The processor is application scoped; do not leave failure mode switched on.
            this.processor.disableFailureMode();
        }
    }

    /**
     * Sends the payloads {@code "a"} through {@code "j"} via the emitter, each from its
     * own {@link CompletableFuture#runAsync(Runnable)} task, and waits up to ten seconds
     * for all send calls to return.
     *
     * <p>Each message carries an ack callback that adds its payload to {@code set} and
     * a nack callback that records the reason and adds its payload to {@code set2}.
     *
     * @param set collects the payloads of acknowledged messages; must be thread-safe
     * @param set2 collects the payloads of negatively acknowledged messages; must be thread-safe
     * @param emitter the emitter used to send the messages
     * @return the live list of nack reasons; callbacks may still append to it after
     *         this method returns
     * @throws InterruptedException if interrupted while waiting for the sends
     * @throws TimeoutException if the sends do not all return within ten seconds
     * @throws ExecutionException if any send task fails
     */
    private List<Throwable> run(Set<String> set, Set<String> set2, Emitter<String> emitter) throws InterruptedException, TimeoutException, ExecutionException {
        List<Throwable> nackReasons = new CopyOnWriteArrayList<>();

        CompletableFuture<?>[] sends = Stream.of("a", "b", "c", "d", "e", "f", "g", "h", "i", "j")
                .map(payload -> CompletableFuture.runAsync(() -> {
                    emitter.send(Message.of(payload, () -> {
                        set.add(payload);
                        return CompletableFuture.completedFuture(null);
                    }, reason -> {
                        // Record the reason first so it is present once the payload shows up in set2.
                        nackReasons.add(reason);
                        set2.add(payload);
                        return CompletableFuture.completedFuture(null);
                    }));
                }))
                .toArray(CompletableFuture[]::new);

        CompletableFuture.allOf(sends).get(10L, TimeUnit.SECONDS);
        return nackReasons;
    }
}
