package org.eclipse.microprofile.reactive.messaging.tck.metrics;

import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import javax.enterprise.context.ApplicationScoped;
import org.eclipse.microprofile.config.Config;
import org.eclipse.microprofile.reactive.messaging.Message;
import org.eclipse.microprofile.reactive.messaging.spi.Connector;
import org.eclipse.microprofile.reactive.messaging.spi.IncomingConnectorFactory;
import org.eclipse.microprofile.reactive.messaging.spi.OutgoingConnectorFactory;
import org.eclipse.microprofile.reactive.streams.operators.ProcessorBuilder;
import org.eclipse.microprofile.reactive.streams.operators.PublisherBuilder;
import org.eclipse.microprofile.reactive.streams.operators.ReactiveStreams;
import org.eclipse.microprofile.reactive.streams.operators.SubscriberBuilder;
import org.junit.Assert;

/**
 * In-memory connector used by the metrics TCK tests.
 *
 * <p>For each incoming channel it exposes a publisher whose emitter is stored by channel name, so a
 * test can push messages into the channel via {@link #send(String, Message)}. For each outgoing
 * channel it exposes a subscriber that appends every received message to a per-channel queue, so a
 * test can read them back via {@link #get(String)}.
 *
 * <p>NOTE(review): the channel registries are plain {@link HashMap}s; if registration and test
 * access can happen on different threads, confirm that this is adequately synchronised elsewhere.
 */
@ApplicationScoped
@Connector(TestConnector.ID)
public class TestConnector implements IncomingConnectorFactory, OutgoingConnectorFactory {
    /** Connector identifier used in the {@code @Connector} qualifier. */
    public static final String ID = "test-connector";

    /** Configuration key under which the channel name is looked up. */
    private static final String CHANNEL_NAME_KEY = "channel-name";

    /** How long {@link #get(String)} waits for a message before failing, in seconds. */
    private static final long POLL_TIMEOUT_SECONDS = 5L;

    /** Emitters of the incoming channels, keyed by channel name. */
    private final Map<String, FlowableEmitter<Message<String>>> incomingEmitters = new HashMap<>();

    /** Messages received on the outgoing channels, keyed by channel name. */
    private final Map<String, LinkedBlockingQueue<Message<String>>> outgoingQueues = new HashMap<>();

    /**
     * Creates the subscriber for an outgoing channel and registers a fresh queue for it.
     *
     * @param config connector configuration; must provide the {@code channel-name} value
     * @return a subscriber builder that adds every received message to the channel's queue
     */
    public SubscriberBuilder<? extends Message<String>, Void> getSubscriberBuilder(Config config) {
        String channel = config.getValue(CHANNEL_NAME_KEY, String.class);
        LinkedBlockingQueue<Message<String>> queue = new LinkedBlockingQueue<>();
        this.outgoingQueues.put(channel, queue);
        return ReactiveStreams.<Message<String>>builder().forEach(queue::add);
    }

    /**
     * Creates the publisher for an incoming channel.
     *
     * <p>The emitter is only registered once the callback given to {@code Flowable.create} runs
     * (per RxJava, when the publisher is subscribed to); until then {@link #send(String, Message)}
     * fails for this channel.
     *
     * @param config connector configuration; must provide the {@code channel-name} value
     * @return a publisher builder backed by a buffering {@link Flowable}
     */
    public PublisherBuilder<? extends Message<String>> getPublisherBuilder(Config config) {
        String channel = config.getValue(CHANNEL_NAME_KEY, String.class);
        Flowable<Message<String>> source = Flowable.<Message<String>>create(
                emitter -> this.incomingEmitters.put(channel, emitter),
                BackpressureStrategy.BUFFER);
        return ReactiveStreams.fromPublisher(source);
    }

    /**
     * Emits a message on the given incoming channel.
     *
     * @param str name of the incoming channel
     * @param message message to emit
     * @throws RuntimeException if no emitter is registered for the channel
     */
    public void send(String str, Message<String> message) {
        FlowableEmitter<Message<String>> emitter = this.incomingEmitters.get(str);
        if (emitter == null) {
            throw new RuntimeException("No such incoming channel registered: " + str);
        }
        emitter.onNext(message);
    }

    /**
     * Retrieves the next message received on the given outgoing channel, waiting up to five
     * seconds for one to arrive. Fails the test (via {@link Assert#fail(String)}) if the wait is
     * interrupted or times out.
     *
     * @param str name of the outgoing channel
     * @return the next message received on the channel
     * @throws RuntimeException if no queue is registered for the channel
     */
    public Message<String> get(String str) {
        LinkedBlockingQueue<Message<String>> queue = this.outgoingQueues.get(str);
        if (queue == null) {
            throw new RuntimeException("No such outgoing channel registered: " + str);
        }
        Message<String> message = null;
        try {
            message = queue.poll(POLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe the interruption.
            Thread.currentThread().interrupt();
            Assert.fail("Interrupted while waiting for messages");
        }
        if (message == null) {
            Assert.fail("Timed out waiting for messages");
        }
        return message;
    }
}
