package org.gytheio.messaging.benchmark;

import java.io.PrintStream;
import java.text.DecimalFormat;
import java.util.Date;
import java.util.concurrent.Executors;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.amqp.AMQPComponent;
import org.apache.camel.component.jackson.JacksonDataFormat;
import org.apache.camel.component.jms.JmsComponent;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.util.URISupport;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.qpid.jms.ConnectionURL;
import org.gytheio.messaging.MessageConsumer;
import org.gytheio.messaging.MessageProducer;
import org.gytheio.messaging.amqp.AmqpDirectEndpoint;
import org.gytheio.messaging.amqp.AmqpNodeBootstrapUtils;
import org.gytheio.messaging.camel.CamelMessageProducer;
import org.gytheio.messaging.jackson.ObjectMapperFactory;

/* loaded from: input_file:org/gytheio/messaging/benchmark/BenchmarkRunner.class */
/**
 * Command-line style benchmark that pushes a fixed number of messages through a
 * message broker and reports how long sending and/or receiving took.
 *
 * <p>Depending on the broker URL prefix, messages are routed either through an
 * embedded Camel context (see {@link #initializeCamelEndpoint}) or through an
 * {@link AmqpDirectEndpoint} (see {@link #initializeAmqpDirectEndpoint}).
 *
 * <p>NOTE(review): the references to {@code ConnectionURL.*}, {@code URISupport.RAW_TOKEN_END}
 * and {@code IOUtils.LINE_SEPARATOR_UNIX} look like decompiler substitutions for plain string
 * literals; they are kept as-is here so runtime behaviour is untouched - confirm against the
 * original source before replacing them.
 */
public class BenchmarkRunner {
    private static final Log logger = LogFactory.getLog(BenchmarkRunner.class);
    protected static final String DEFAULT_ENDPOINT = "queue:gytheio.test.benchmark";
    protected static final String LOG_SEPERATOR = "--------------------------------------------------\n";

    /** Interval between polls while waiting on the consumer or on endpoint initialization. */
    private static final long CHECK_CONSUMER_COMPLETE_PERIOD_MS = 100;

    /** Number of consecutive polls without a new message after which the consumer wait gives up. */
    private static final int MAX_IDLE_CONSUMER_CHECKS = 100;

    /** Extra pause applied after the run when the AMQP direct endpoint is in use. */
    private static final long AMQP_DIRECT_SETTLE_MS = 5 * 1000;

    /** Camel endpoint the producer writes to; routed on to the broker's send endpoint. */
    private static final String CAMEL_PRODUCER_ENDPOINT = "direct:benchmark.test";

    private static final long NANOS_PER_MILLI = 1000000L;

    /** A debug progress line is written every time this many messages have been sent. */
    protected int logAfterNumMessages = 1000;
    protected String brokerUrl;
    protected String brokerUsername;
    protected String brokerPassword;
    protected String endpointSend;
    protected String endpointReceive;
    protected int numMessages;
    protected boolean runProducer;
    protected boolean runConsumer;

    /**
     * Creates a runner; nothing is connected until {@link #runBenchmark()} is called.
     *
     * @param brokerUrl       broker URL; its prefix selects the transport
     * @param brokerUsername  broker user name, may be {@code null}
     * @param brokerPassword  broker password, may be {@code null}
     * @param endpointSend    endpoint to send to, {@code null} for {@link #DEFAULT_ENDPOINT}
     * @param endpointReceive endpoint to receive from, {@code null} to derive it from the send endpoint
     * @param numMessages     number of messages to send and/or expect
     * @param runProducer     whether messages are sent
     * @param runConsumer     whether messages are received
     */
    public BenchmarkRunner(String brokerUrl, String brokerUsername, String brokerPassword, String endpointSend, String endpointReceive, int numMessages, boolean runProducer, boolean runConsumer) {
        this.brokerUrl = brokerUrl;
        this.brokerUsername = brokerUsername;
        this.brokerPassword = brokerPassword;
        this.endpointSend = endpointSend;
        this.endpointReceive = endpointReceive;
        this.numMessages = numMessages;
        this.runProducer = runProducer;
        this.runConsumer = runConsumer;
    }

    /**
     * Returns the receive endpoint to use when none was configured.
     *
     * @param sendEndpoint the send endpoint in effect
     * @return this implementation returns {@code sendEndpoint} unchanged
     */
    protected String getDefaultReceiveEndpoint(String sendEndpoint) {
        return sendEndpoint;
    }

    /**
     * Supplies the payload for one message.
     *
     * @param index zero-based index of the message being sent (unused by this implementation)
     * @return a new {@link BenchmarkMessage} instance
     */
    protected Object getBenchmarkMessage(int index) {
        return BenchmarkMessage.createInstance();
    }

    /**
     * Creates the consumer used to count received messages.
     *
     * @return a new consumer configured with {@link #logAfterNumMessages}
     */
    protected BenchmarkConsumer getBenchmarkConsumer() {
        BenchmarkConsumer consumer = new BenchmarkConsumer();
        consumer.setLogAfterNumMessages(this.logAfterNumMessages);
        return consumer;
    }

    /**
     * Runs the benchmark: sets up the endpoint, optionally sends {@link #numMessages} messages,
     * optionally waits for the consumer to receive them, prints the results and then terminates
     * the JVM via {@code System.exit(0)}.
     *
     * <p>If the calling thread is interrupted while waiting, the waits are cut short, the
     * interrupt status is preserved and the results gathered so far are still reported.
     *
     * @throws IllegalArgumentException if the broker URL is {@code null} or has an unsupported prefix
     * @throws Exception if the endpoint cannot be set up or sending fails
     */
    public void runBenchmark() throws Exception {
        if (this.endpointSend == null) {
            this.endpointSend = DEFAULT_ENDPOINT;
        }
        if (this.endpointReceive == null) {
            this.endpointReceive = getDefaultReceiveEndpoint(this.endpointSend);
        }
        if (this.brokerUrl == null) {
            throw new IllegalArgumentException("Broker URL must not be null");
        }

        BenchmarkConsumer consumer = this.runConsumer ? getBenchmarkConsumer() : null;
        String userSuffix = this.brokerUsername != null ? " (" + this.brokerUsername + URISupport.RAW_TOKEN_END : "";

        MessageProducer producer;
        if (this.brokerUrl.startsWith("tcp") || this.brokerUrl.startsWith(ConnectionURL.OPTIONS_FAILOVER) || this.brokerUrl.startsWith("ssl")) {
            logger.debug("Initializing Camel Endpoint: " + this.brokerUrl + userSuffix);
            producer = initializeCamelEndpoint(this.brokerUrl, this.brokerUsername, this.brokerPassword, this.endpointSend, this.endpointReceive, consumer);
        } else if (this.brokerUrl.startsWith(ConnectionURL.AMQ_PROTOCOL)) {
            logger.debug("Initializing AmqpDirect Endpoint: " + this.brokerUrl + userSuffix);
            producer = initializeAmqpDirectEndpoint(this.brokerUrl, this.brokerUsername, this.brokerPassword, this.endpointSend, this.endpointReceive, consumer);
        } else {
            throw new IllegalArgumentException("Unsupported transport in " + this.brokerUrl);
        }

        logStart(this.numMessages, this.brokerUrl, this.endpointSend, this.endpointReceive, this.runProducer, this.runConsumer);

        // Monotonic clock: elapsed time must not be affected by wall-clock adjustments.
        long startNanos = System.nanoTime();

        long sendTimeMs = 0;
        if (this.runProducer) {
            sendMessages(producer);
            sendTimeMs = elapsedMillisSince(startNanos);
        }

        long receiveTimeMs = 0;
        if (this.runConsumer) {
            awaitConsumer(consumer);
            // Measured from the common start, so it includes the send phase (and any idle polls).
            receiveTimeMs = elapsedMillisSince(startNanos);
        }

        if (producer instanceof AmqpDirectEndpoint) {
            logger.debug("Waiting for 5 secs ...");
            pause(AMQP_DIRECT_SETTLE_MS);
        }

        logStatistics(this.runProducer ? producer : null, this.runConsumer ? consumer : null, getBenchmarkMessage(0), this.numMessages, this.runConsumer ? consumer.getMessageCount() : 0, sendTimeMs, receiveTimeMs);
        System.exit(0);
    }

    /** Sends {@link #numMessages} messages through {@code producer}, logging progress as it goes. */
    private void sendMessages(MessageProducer producer) {
        for (int i = 0; i < this.numMessages; i++) {
            producer.send(getBenchmarkMessage(i));
            int sentSoFar = i + 1;
            // Test the 1-based count so the line reads "Sent 1000", not "Sent 1001"; the
            // interval guard avoids a division by zero when the interval is not positive.
            if (this.logAfterNumMessages > 0 && sentSoFar % this.logAfterNumMessages == 0) {
                logger.debug("Sent " + sentSoFar + " messages...");
            } else if (logger.isTraceEnabled()) {
                logger.trace("Sent " + sentSoFar + " messages...");
            }
        }
    }

    /**
     * Polls {@code consumer} until it has counted {@link #numMessages} messages, until no new
     * message arrived for {@link #MAX_IDLE_CONSUMER_CHECKS} consecutive polls, or until the
     * thread is interrupted.
     */
    private void awaitConsumer(BenchmarkConsumer consumer) {
        int idleChecks = 0;
        int lastCount = consumer.getMessageCount();
        while (lastCount < this.numMessages && idleChecks < MAX_IDLE_CONSUMER_CHECKS) {
            if (logger.isTraceEnabled()) {
                logger.trace("Consumer still working, sleeping 100ms");
            }
            if (!pause(CHECK_CONSUMER_COMPLETE_PERIOD_MS)) {
                // Interrupted: stop waiting and report whatever has been received so far.
                return;
            }
            // Read the counter once so the comparison and the stored value agree.
            int currentCount = consumer.getMessageCount();
            if (currentCount > lastCount) {
                lastCount = currentCount;
                idleChecks = 0;
            } else {
                idleChecks++;
                if (logger.isTraceEnabled()) {
                    logger.trace("Anymore messages - none received since last sleep (x" + idleChecks + URISupport.RAW_TOKEN_END);
                }
            }
        }
    }

    /**
     * Sleeps for the given time without losing an interrupt.
     *
     * @return {@code true} if the full sleep completed, {@code false} if the thread was
     *         interrupted (in which case the interrupt status has been set again)
     */
    private static boolean pause(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Milliseconds elapsed since a {@link System#nanoTime()} reading. */
    private static long elapsedMillisSince(long startNanos) {
        return (System.nanoTime() - startNanos) / NANOS_PER_MILLI;
    }

    /**
     * Builds and starts a Camel context whose "amqp" component is backed by an ActiveMQ
     * connection factory, with a JSON-marshalling route from the producer endpoint to
     * {@code sendEndpoint} and, if a consumer is given, a JSON-unmarshalling route from
     * {@code receiveEndpoint} to the consumer's {@code onReceive} method.
     *
     * @param brokerUrl       broker URL handed to the connection factory
     * @param username        broker user name, may be {@code null}
     * @param password        broker password, may be {@code null}
     * @param sendEndpoint    endpoint the producer route delivers to
     * @param receiveEndpoint endpoint the consumer route reads from
     * @param messageConsumer consumer to receive messages, or {@code null} for no consumer route
     * @return a producer bound to the Camel producer endpoint
     * @throws Exception if routes cannot be added or the context fails to start
     */
    protected MessageProducer initializeCamelEndpoint(String brokerUrl, String username, String password, final String sendEndpoint, final String receiveEndpoint, final MessageConsumer messageConsumer) throws Exception {
        DefaultCamelContext camelContext = new DefaultCamelContext();

        JmsComponent amqpComponent = AMQPComponent.jmsComponent();
        amqpComponent.setConnectionFactory(new ActiveMQConnectionFactory(username, password, brokerUrl));
        camelContext.addComponent(ConnectionURL.AMQ_PROTOCOL, amqpComponent);

        final JacksonDataFormat jsonFormat = new JacksonDataFormat(ObjectMapperFactory.createInstance(), (Class<?>) Object.class);

        if (messageConsumer != null) {
            camelContext.addRoutes(new RouteBuilder() {
                @Override
                public void configure() {
                    from("amqp:" + receiveEndpoint).unmarshal(jsonFormat).bean(messageConsumer, "onReceive");
                }
            });
        }
        camelContext.addRoutes(new RouteBuilder() {
            @Override
            public void configure() {
                from(CAMEL_PRODUCER_ENDPOINT).marshal(jsonFormat).to("amqp:" + sendEndpoint);
            }
        });

        CamelMessageProducer producer = new CamelMessageProducer();
        producer.setProducer(camelContext.createProducerTemplate());
        producer.setEndpoint(CAMEL_PRODUCER_ENDPOINT);
        camelContext.start();
        return producer;
    }

    /**
     * Creates an {@link AmqpDirectEndpoint} and, when a consumer is given, starts its listener
     * on a background thread and blocks until the endpoint reports it is initialized.
     *
     * @param brokerUrl       broker URL
     * @param username        broker user name, may be {@code null}
     * @param password        broker password, may be {@code null}
     * @param sendEndpoint    endpoint to send to
     * @param receiveEndpoint endpoint to receive from
     * @param messageConsumer consumer to receive messages, or {@code null} to skip the listener
     * @return the created endpoint, which also acts as the producer
     * @throws IllegalStateException if the thread is interrupted while waiting for initialization
     */
    protected MessageProducer initializeAmqpDirectEndpoint(String brokerUrl, String username, String password, String sendEndpoint, String receiveEndpoint, MessageConsumer messageConsumer) {
        AmqpDirectEndpoint endpoint = AmqpNodeBootstrapUtils.createEndpoint(messageConsumer, brokerUrl, username, password, sendEndpoint, receiveEndpoint);
        if (messageConsumer == null) {
            return endpoint;
        }
        Executors.newCachedThreadPool().execute(endpoint.getListener());
        while (!endpoint.isInitialized()) {
            if (!pause(CHECK_CONSUMER_COMPLETE_PERIOD_MS)) {
                // Previously the interrupt was swallowed and this loop could never be cancelled.
                throw new IllegalStateException("Interrupted while waiting for the AMQP endpoint to initialize");
            }
        }
        return endpoint;
    }

    /**
     * Prints the benchmark header to standard output.
     *
     * @param numMessages     number of messages in the run
     * @param brokerUrl       broker URL in use
     * @param sendEndpoint    send endpoint, printed only when producing
     * @param receiveEndpoint receive endpoint, printed only when consuming
     * @param producing       whether the producer runs
     * @param consuming       whether the consumer runs
     */
    protected void logStart(int numMessages, String brokerUrl, String sendEndpoint, String receiveEndpoint, boolean producing, boolean consuming) {
        StringBuilder header = new StringBuilder();
        header.append("\n\n").append(LOG_SEPERATOR).append("BENCHMARK START\n").append(LOG_SEPERATOR);

        if (producing && consuming) {
            header.append("Simultaneously sending and receiving...\n\n");
        } else if (producing) {
            header.append("Sending...\n\n");
        } else if (consuming) {
            header.append("Receiving...\n\n");
        }

        header.append("Number of Messages: ").append(numMessages);
        header.append("\nBroker URL:         ").append(brokerUrl).append(IOUtils.LINE_SEPARATOR_UNIX);
        if (producing) {
            header.append("Send Endpoint:      ").append(sendEndpoint).append(IOUtils.LINE_SEPARATOR_UNIX);
        }
        if (consuming) {
            header.append("Receive Endpoint:   ").append(receiveEndpoint).append(IOUtils.LINE_SEPARATOR_UNIX);
        }
        header.append(LOG_SEPERATOR);
        System.out.println(header.toString());
    }

    /**
     * Prints the benchmark results to standard output.
     *
     * <p>Throughput is {@code numMessages} divided by the receive time when a consumer is given,
     * otherwise by the send time. When that time is zero the throughput is reported as "n/a"
     * instead of dividing by zero.
     *
     * @param messageProducer   producer used, or {@code null} if nothing was sent
     * @param benchmarkConsumer consumer used, or {@code null} if nothing was received
     * @param message           sample message whose class name is reported, may be {@code null}
     * @param numMessages       number of messages the run was configured for
     * @param receivedCount     number of messages the consumer counted
     * @param sendTimeMs        time taken to send, in milliseconds
     * @param receiveTimeMs     time taken until receiving finished, in milliseconds
     */
    protected void logStatistics(MessageProducer messageProducer, BenchmarkConsumer benchmarkConsumer, Object message, int numMessages, int receivedCount, long sendTimeMs, long receiveTimeMs) {
        long elapsedMs = benchmarkConsumer != null ? receiveTimeMs : sendTimeMs;

        StringBuilder report = new StringBuilder();
        report.append("\n").append(LOG_SEPERATOR).append("BENCHMARK RESULTS\n").append(LOG_SEPERATOR);

        if (messageProducer != null) {
            report.append("MessageProducer: ").append(messageProducer.getClass().getSimpleName()).append(IOUtils.LINE_SEPARATOR_UNIX);
        }
        if (benchmarkConsumer != null) {
            report.append("MessageConsumer: ").append(benchmarkConsumer.getClass().getSimpleName()).append(IOUtils.LINE_SEPARATOR_UNIX);
        }
        report.append("Message Type:    ").append(message != null ? message.getClass().getSimpleName() : "null").append(IOUtils.LINE_SEPARATOR_UNIX);

        if (messageProducer != null) {
            report.append("Sent:            ").append(numMessages).append(" messages in ").append(formatMillis(sendTimeMs)).append(IOUtils.LINE_SEPARATOR_UNIX);
        }
        if (benchmarkConsumer != null) {
            report.append("Received:        ").append(receivedCount).append(" messages in ").append(formatMillis(receiveTimeMs));
            if (receivedCount != numMessages) {
                // Flags a run in which the consumer did not see every expected message.
                report.append(" (*)");
            }
            report.append(IOUtils.LINE_SEPARATOR_UNIX);
        }

        report.append("Throughput:      ");
        if (elapsedMs > 0) {
            report.append(Math.round(numMessages / (elapsedMs / 1000.0d))).append(" messages/second\n");
        } else {
            // Dividing by zero would yield Infinity, which Math.round turns into Long.MAX_VALUE.
            report.append("n/a (elapsed time too short to measure)\n");
        }

        report.append(LOG_SEPERATOR);
        report.append("\nNote that results include time taken for factors\nlike marshalling/unmarshalling of messages, network\nlatency, etc., and is not a direct measure of the\nbroker's performance.\n");
        System.out.println(report.toString());
    }

    /**
     * Formats a duration for display.
     *
     * @param millis duration in milliseconds
     * @return seconds with at most one decimal place when above 1000 ms, otherwise whole milliseconds
     */
    protected String formatMillis(long millis) {
        if (millis > 1000) {
            return new DecimalFormat("#.#").format(millis / 1000.0d) + " seconds";
        }
        return millis + "ms";
    }
}
