From 41b28f4b23cb42bb168a7b1dfb0fb6e5d6faa053 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 10 Feb 2015 10:53:02 -0500 Subject: [PATCH 1/4] ACTIVEMQ6-78 Adding tests to evaluate this task https://issues.apache.org/jira/browse/ACTIVEMQ6-78 This commit is just adding tests I used to debug the blocked calls issue There are some profiling parameters you can use that I added as a comment to the pom The reason this is a separate commit is that it would be easier to validate the results of optimizations while checking after and before any changes --- pom.xml | 7 +- .../sends/AbstractSendReceivePerfTest.java | 247 ++++++++++++++++++ .../performance/sends/ClientACKPerf.java | 130 +++++++++ .../sends/MeasureCommitPerfTest.java | 81 ++++++ .../tests/performance/sends/PreACKPerf.java | 93 +++++++ 5 files changed, 557 insertions(+), 1 deletion(-) create mode 100644 tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/AbstractSendReceivePerfTest.java create mode 100644 tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/ClientACKPerf.java create mode 100644 tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/MeasureCommitPerfTest.java create mode 100644 tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/PreACKPerf.java diff --git a/pom.xml b/pom.xml index c3ae18af4d..417214582b 100644 --- a/pom.xml +++ b/pom.xml @@ -79,7 +79,12 @@ see https://intellij-support.jetbrains.com/entries/23395793 Also see: http://youtrack.jetbrains.com/issue/IDEA-125696 - --> + + + For profiling add this line and use jmc (Java Mission Control) to evaluate the results: + -XX:+UnlockCommercialFeatures -XX:+FlightRecorder -XX:StartFlightRecording=delay=30s,duration=120s,filename=/tmp/myrecording.jfr + + --> -Djava.util.logging.manager=org.jboss.logmanager.LogManager -Dlogging.configuration=file:${activemq.basedir}/tests/config/logging.properties diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/AbstractSendReceivePerfTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/AbstractSendReceivePerfTest.java new file mode 100644 index 0000000000..c8b2d7fb66 --- /dev/null +++ b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/AbstractSendReceivePerfTest.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.tests.performance.sends; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.activemq.api.core.TransportConfiguration; +import org.apache.activemq.api.jms.ActiveMQJMSClient; +import org.apache.activemq.core.settings.impl.AddressFullMessagePolicy; +import org.apache.activemq.core.settings.impl.AddressSettings; +import org.apache.activemq.tests.util.JMSTestBase; +import org.junit.Before; +import org.junit.Test; + +/** + * Client-ack time + * + * @author Clebert Suconic + */ +public abstract class AbstractSendReceivePerfTest extends JMSTestBase +{ + protected static final String Q_NAME = "test-queue-01"; + private Queue queue; + + protected AtomicBoolean running = new AtomicBoolean(true); + + + @Override + @Before + public void setUp() throws Exception + { + super.setUp(); + + jmsServer.createQueue(false, Q_NAME, null, true, Q_NAME); + queue = ActiveMQJMSClient.createQueue(Q_NAME); + + AddressSettings settings = new AddressSettings(); + settings.setAddressFullMessagePolicy(AddressFullMessagePolicy.BLOCK); + settings.setMaxSizeBytes(Long.MAX_VALUE); + server.getAddressSettingsRepository().clear(); + server.getAddressSettingsRepository().addMatch("#", settings); + + } + + + @Override + protected void registerConnectionFactory() throws Exception + { + List connectorConfigs = new ArrayList(); + connectorConfigs.add(new TransportConfiguration(NETTY_CONNECTOR_FACTORY)); + + createCF(connectorConfigs, "/cf"); + + cf = (ConnectionFactory) namingContext.lookup("/cf"); + } + + + private static final java.util.logging.Logger LOGGER = java.util.logging.Logger.getLogger(AbstractSendReceivePerfTest.class.getName()); + + + @Test + public void testSendReceive() throws Exception + { + long numberOfSamples = Long.getLong("HORNETQ_TEST_SAMPLES", 1000); + + + MessageReceiver receiver = new MessageReceiver(Q_NAME, numberOfSamples); + receiver.start(); + MessageSender sender = new MessageSender(Q_NAME); + sender.start(); + + receiver.join(); + sender.join(); + + assertFalse(receiver.failed); + assertFalse(sender.failed); + + } + + final Semaphore pendingCredit = new Semaphore(5000); + + /** + * to be called after a message is consumed + * so the flow control of the test kicks in. + */ + protected final void afterConsume(Message message) + { + if (message != null) + { + pendingCredit.release(); + } + } + + + protected final void beforeSend() + { + while (running.get()) + { + try + { + if (pendingCredit.tryAcquire(1, TimeUnit.SECONDS)) + { + return; + } + else + { + System.out.println("Couldn't get credits!"); + } + } + catch (Throwable e) + { + throw new RuntimeException(e.getMessage(), e); + } + } + } + + + + + private class MessageReceiver extends Thread + { + private final String qName; + private final long numberOfSamples; + + public boolean failed = false; + + public MessageReceiver(String qname, long numberOfSamples) throws Exception + { + super("Receiver " + qname); + this.qName = qname; + this.numberOfSamples = numberOfSamples; + } + + @Override + public void run() + { + try + { + LOGGER.info("Receiver: Connecting"); + Connection c = cf.createConnection(); + + consumeMessages(c, qName); + + c.close(); + } + catch (Exception e) + { + e.printStackTrace(); + failed = true; + } + finally + { + running.set(false); + } + } + } + + protected abstract void consumeMessages(Connection c, String qName) throws Exception; + + private class MessageSender extends Thread + { + protected String qName; + + public boolean failed = false; + + public MessageSender(String qname) throws Exception + { + super("Sender " + qname); + + this.qName = qname; + } + + @Override + public void run() + { + try + { + LOGGER.info("Sender: Connecting"); + Connection c = cf.createConnection(); + + sendMessages(c, qName); + + c.close(); + + } + catch (Exception e) + { + failed = true; + if (e instanceof InterruptedException) + { + LOGGER.info("Sender done."); + } + else + { + e.printStackTrace(); + } + } + } + } + + /* This will by default send non persistent messages */ + protected void sendMessages(Connection c, String qName) throws JMSException + { + Session s = null; + s = c.createSession(false, Session.AUTO_ACKNOWLEDGE); + LOGGER.info("Sender: Using AUTO-ACK session"); + + + Queue q = s.createQueue(qName); + MessageProducer producer = s.createProducer(null); + producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT); + + + long sent = 0; + while (running.get()) + { + beforeSend(); + producer.send(q, s.createTextMessage("Message_" + (sent++))); + } + } +} \ No newline at end of file diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/ClientACKPerf.java b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/ClientACKPerf.java new file mode 100644 index 0000000000..6dba4f41e5 --- /dev/null +++ b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/ClientACKPerf.java @@ -0,0 +1,130 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.tests.performance.sends; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +import java.util.Arrays; +import java.util.Collection; +import java.util.List; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +/** + * @author clebertsuconic + */ +@RunWith(Parameterized.class) +public class ClientACKPerf extends AbstractSendReceivePerfTest +{ + + @Parameterized.Parameters(name = "batchSize={0}") + public static Collection data() + { + List list = Arrays.asList(new Object[][]{ + {1}, + {2000}}); + + System.out.println("Size = " + list.size()); + return list; + } + + public ClientACKPerf(int batchSize) + { + super(); + this.batchSize = batchSize; + } + + + public final int batchSize; + + @Override + protected void consumeMessages(Connection c, String qName) throws Exception + { + int mode = 0; + mode = Session.CLIENT_ACKNOWLEDGE; + + System.out.println("Receiver: Using PRE-ACK mode"); + + Session s = c.createSession(false, mode); + Queue q = s.createQueue(qName); + MessageConsumer consumer = s.createConsumer(q, null, false); + + c.start(); + + Message m = null; + + long totalTimeACKTime = 0; + + + long start = System.currentTimeMillis(); + + long nmessages = 0; + long timeout = System.currentTimeMillis() + 60 * 1000; + while (timeout > System.currentTimeMillis()) + { + m = consumer.receive(5000); + afterConsume(m); + + + if (m == null) + { + throw new Exception("Failed with m = null"); + } + + if (nmessages++ % batchSize == 0) + { + long startACK = System.nanoTime(); + m.acknowledge(); + long endACK = System.nanoTime(); + totalTimeACKTime += (endACK - startACK); + } + + + if (nmessages % 10000 == 0) + { + printMsgsSec(start, nmessages, totalTimeACKTime); + } + } + + + printMsgsSec(start, nmessages, totalTimeACKTime); + } + + + + protected void printMsgsSec(final long start, final double nmessages, final double totalTimeACKTime) + { + + long end = System.currentTimeMillis(); + double elapsed = ((double) end - (double) start) / 1000f; + + double messagesPerSecond = nmessages / elapsed; + double nAcks = nmessages / batchSize; + + System.out.println("batchSize=" + batchSize + ", numberOfMessages=" + + nmessages + ", elapsedTime=" + elapsed + " msgs/sec= " + messagesPerSecond + ",totalTimeAcking=" + String.format("%10.4f", totalTimeACKTime) + + ", avgACKTime=" + String.format("%10.4f", (totalTimeACKTime / nAcks))); + + } + + +} diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/MeasureCommitPerfTest.java b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/MeasureCommitPerfTest.java new file mode 100644 index 0000000000..0528d0eae6 --- /dev/null +++ b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/MeasureCommitPerfTest.java @@ -0,0 +1,81 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.tests.performance.sends; + +import javax.jms.Connection; +import javax.jms.JMSException; +import javax.jms.Session; + + +/** + * @author clebertsuconic + */ + +public class MeasureCommitPerfTest extends AbstractSendReceivePerfTest +{ + @Override + protected void consumeMessages(Connection c, String qName) throws Exception + { + } + + + /* This will by default send non persistent messages */ + protected void sendMessages(Connection c, String qName) throws JMSException + { + Session s = c.createSession(true, Session.SESSION_TRANSACTED); + + + long timeout = System.currentTimeMillis() + 30 * 1000; + + long startMeasure = System.currentTimeMillis() + 5000; + long start = 0; + long committs = 0; + while (timeout > System.currentTimeMillis()) + { + + if (start == 0 && System.currentTimeMillis() > startMeasure) + { + System.out.println("heat up"); + start = System.currentTimeMillis(); + committs = 0; + } + + s.commit(); + committs++; + if (start > 0 && committs % 1000 == 0) printCommitsSecond(start, committs); + } + printCommitsSecond(start, committs); + + s.close(); + } + + + protected void printCommitsSecond(final long start, final double committs) + { + + long end = System.currentTimeMillis(); + double elapsed = ((double) end - (double) start) / 1000f; + + double commitsPerSecond = committs / elapsed; + + System.out.println("end = " + end + ", start=" + start + ", numberOfMessages=" + + committs + ", elapsed=" + elapsed + " msgs/sec= " + commitsPerSecond); + + } + + +} diff --git a/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/PreACKPerf.java b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/PreACKPerf.java new file mode 100644 index 0000000000..a6d2906017 --- /dev/null +++ b/tests/performance-tests/src/test/java/org/apache/activemq/tests/performance/sends/PreACKPerf.java @@ -0,0 +1,93 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.tests.performance.sends; + +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.api.jms.ActiveMQJMSConstants; + +/** + * @author clebertsuconic + */ + +public class PreACKPerf extends AbstractSendReceivePerfTest +{ + @Override + protected void consumeMessages(Connection c, String qName) throws Exception + { + int mode = 0; + mode = ActiveMQJMSConstants.PRE_ACKNOWLEDGE; + + System.out.println("Receiver: Using PRE-ACK mode"); + + Session s = c.createSession(false, mode); + Queue q = s.createQueue(qName); + MessageConsumer consumer = s.createConsumer(q, null, false); + + c.start(); + + Message m = null; + + + long start = System.currentTimeMillis(); + + long nmessages = 0; + long timeout = System.currentTimeMillis() + 30 * 1000; + while (timeout > System.currentTimeMillis()) + { + m = consumer.receive(5000); + + nmessages++; + + if (m == null) + { + throw new Exception("Failed with m = null"); + } + + if (nmessages % 10000 == 0) + { + printMsgsSec(start, nmessages); + } + + } + + long end = System.currentTimeMillis(); + + printMsgsSec(start, nmessages); + } + + + + protected void printMsgsSec(final long start, final double nmessages) + { + + long end = System.currentTimeMillis(); + double elapsed = ((double) end - (double) start) / 1000f; + + double messagesPerSecond = nmessages / elapsed; + + System.out.println("end = " + end + ", start=" + start + ", numberOfMessages=" + + nmessages + ", elapsed=" + elapsed + " msgs/sec= " + messagesPerSecond); + + } + + +} From f7c4d56cc771e2d4ff54fd2796ad2cdfae9f5e13 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Tue, 10 Feb 2015 11:30:18 -0500 Subject: [PATCH 2/4] ACTIVEMQ6-78 Improving performance over Netty NIO blocked calls https://issues.apache.org/jira/browse/ACTIVEMQ6-78 performance work There are two aspects of this work. First avoid asynchronous packets and avoid context switch over the executors. Packet had a method to make certain packets such as commit to use a different executor. Since it's NIO everything is done at the Netty thread now. The second aspect was to make sure we use the proper buffering --- .../buffers/impl/ChannelBufferWrapper.java | 12 ++++++ .../impl/ResetLimitWrappedActiveMQBuffer.java | 4 +- .../activemq/core/protocol/core/Packet.java | 2 - .../impl/ActiveMQClientProtocolManager.java | 2 +- .../core/protocol/core/impl/PacketImpl.java | 7 +--- .../core/impl/RemotingConnectionImpl.java | 37 +------------------ .../core/impl/wireformat/RollbackMessage.java | 6 --- .../impl/wireformat/SessionCloseMessage.java | 6 --- .../impl/wireformat/SessionCommitMessage.java | 5 --- .../wireformat/SessionXACommitMessage.java | 6 --- .../wireformat/SessionXAPrepareMessage.java | 6 --- .../wireformat/SessionXARollbackMessage.java | 6 --- .../remoting/impl/netty/NettyConnection.java | 6 +-- .../remoting/impl/netty/NettyConnector.java | 3 +- .../protocol/AbstractRemotingConnection.java | 4 +- .../spi/core/protocol/RemotingConnection.java | 3 +- .../spi/core/remoting/Connection.java | 2 +- .../protocol/openwire/OpenWireConnection.java | 2 +- .../core/protocol/stomp/StompConnection.java | 2 +- .../remoting/impl/invm/InVMConnection.java | 2 +- .../impl/netty/NettyServerConnection.java | 2 +- .../jms/tests/message/MessageHeaderTest.java | 4 +- .../impl/netty/NettyConnectionTest.java | 2 +- 23 files changed, 35 insertions(+), 96 deletions(-) diff --git a/activemq-commons/src/main/java/org/apache/activemq/core/buffers/impl/ChannelBufferWrapper.java b/activemq-commons/src/main/java/org/apache/activemq/core/buffers/impl/ChannelBufferWrapper.java index a3fa5b512d..53d7306173 100644 --- a/activemq-commons/src/main/java/org/apache/activemq/core/buffers/impl/ChannelBufferWrapper.java +++ b/activemq-commons/src/main/java/org/apache/activemq/core/buffers/impl/ChannelBufferWrapper.java @@ -35,6 +35,18 @@ public class ChannelBufferWrapper implements ActiveMQBuffer protected ByteBuf buffer; // NO_UCD (use final) private final boolean releasable; + public static ByteBuf unwrap(ByteBuf buffer) + { + ByteBuf parent; + while ((parent = buffer.unwrap()) != null && + parent != buffer) // this last part is just in case the semantic + { // ever changes where unwrap is returning itself + buffer = parent; + } + + return buffer; + } + public ChannelBufferWrapper(final ByteBuf buffer) { this(buffer, false); diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java b/activemq-core-client/src/main/java/org/apache/activemq/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java index 96b3e2b6a4..1cd342d806 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/buffers/impl/ResetLimitWrappedActiveMQBuffer.java @@ -46,7 +46,9 @@ public final class ResetLimitWrappedActiveMQBuffer extends ChannelBufferWrapper public ResetLimitWrappedActiveMQBuffer(final int limit, final ActiveMQBuffer buffer, final MessageInternal message) { - super(buffer.byteBuf()); + // a wrapped inside a wrapper will increase the stack size. + // we fixed this here due to some profiling testing + super(unwrap(buffer.byteBuf())); this.limit = limit; diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Packet.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Packet.java index 4027c67c77..6b23bffca3 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Packet.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/Packet.java @@ -85,6 +85,4 @@ public interface Packet * @return true if confirmation is required */ boolean isRequiresConfirmations(); - - boolean isAsyncExec(); } diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java index b7366df301..890e8d097b 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/ActiveMQClientProtocolManager.java @@ -482,7 +482,7 @@ public class ActiveMQClientProtocolManager implements ClientProtocolManager { // no need to send handshake on inVM as inVM is not using the NettyProtocolHandling String handshake = "HORNETQ"; - ActiveMQBuffer amqbuffer = connection.createBuffer(handshake.length()); + ActiveMQBuffer amqbuffer = connection.createTransportBuffer(handshake.length()); amqbuffer.writeBytes(handshake.getBytes()); transportConnection.write(amqbuffer); } diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java index e2bdc28a56..61e0eca07f 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/PacketImpl.java @@ -276,7 +276,7 @@ public class PacketImpl implements Packet public ActiveMQBuffer encode(final RemotingConnection connection) { - ActiveMQBuffer buffer = connection.createBuffer(PacketImpl.INITIAL_PACKET_SIZE); + ActiveMQBuffer buffer = connection.createTransportBuffer(PacketImpl.INITIAL_PACKET_SIZE); // The standard header fields @@ -333,11 +333,6 @@ public class PacketImpl implements Packet return true; } - public boolean isAsyncExec() - { - return false; - } - @Override public String toString() { diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/RemotingConnectionImpl.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/RemotingConnectionImpl.java index 5ef4cfd9ea..88208501f0 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/RemotingConnectionImpl.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/RemotingConnectionImpl.java @@ -81,8 +81,6 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement private final Object failLock = new Object(); - private volatile boolean executing; - private final SimpleString nodeID; private String clientID; @@ -381,39 +379,8 @@ public class RemotingConnectionImpl extends AbstractRemotingConnection implement ActiveMQClientLogger.LOGGER.trace("handling packet " + packet); } - if (packet.isAsyncExec() && executor != null) - { - executing = true; - - executor.execute(new Runnable() - { - public void run() - { - try - { - doBufferReceived(packet); - } - catch (Throwable t) - { - ActiveMQClientLogger.LOGGER.errorHandlingPacket(t, packet); - } - - executing = false; - } - }); - } - else - { - //To prevent out of order execution if interleaving sync and async operations on same connection - while (executing) - { - Thread.yield(); - } - - // Pings must always be handled out of band so we can send pings back to the client quickly - // otherwise they would get in the queue with everything else which might give an intolerable delay - doBufferReceived(packet); - } + dataReceived = true; + doBufferReceived(packet); super.bufferReceived(connectionID, buffer); } diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/RollbackMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/RollbackMessage.java index 41c5735207..340c73adfd 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/RollbackMessage.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/RollbackMessage.java @@ -69,12 +69,6 @@ public class RollbackMessage extends PacketImpl considerLastMessageAsDelivered = buffer.readBoolean(); } - @Override - public boolean isAsyncExec() - { - return true; - } - @Override public int hashCode() { diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCloseMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCloseMessage.java index 1c8a276952..dc61860147 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCloseMessage.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCloseMessage.java @@ -49,10 +49,4 @@ public class SessionCloseMessage extends PacketImpl // TODO return 0; } - - @Override - public boolean isAsyncExec() - { - return true; - } } diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCommitMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCommitMessage.java index c7242fb1f4..1b1e081531 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCommitMessage.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionCommitMessage.java @@ -30,9 +30,4 @@ public class SessionCommitMessage extends PacketImpl super(SESS_COMMIT); } - @Override - public boolean isAsyncExec() - { - return true; - } } diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXACommitMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXACommitMessage.java index 65fdf33868..14668cb1fb 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXACommitMessage.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXACommitMessage.java @@ -55,12 +55,6 @@ public class SessionXACommitMessage extends PacketImpl return onePhase; } - @Override - public boolean isAsyncExec() - { - return true; - } - @Override public void encodeRest(final ActiveMQBuffer buffer) { diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java index b9a531a4c7..8ff5b1531c 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXAPrepareMessage.java @@ -61,12 +61,6 @@ public class SessionXAPrepareMessage extends PacketImpl xid = XidCodecSupport.decodeXid(buffer); } - @Override - public boolean isAsyncExec() - { - return true; - } - @Override public int hashCode() { diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java index 8efab01774..272386dc17 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/protocol/core/impl/wireformat/SessionXARollbackMessage.java @@ -62,12 +62,6 @@ public class SessionXARollbackMessage extends PacketImpl xid = XidCodecSupport.decodeXid(buffer); } - @Override - public boolean isAsyncExec() - { - return true; - } - @Override public int hashCode() { diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnection.java b/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnection.java index a73aa1bfa7..c7eafe76c1 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnection.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnection.java @@ -172,9 +172,9 @@ public class NettyConnection implements Connection listener.connectionDestroyed(getID()); } - public ActiveMQBuffer createBuffer(final int size) + public ActiveMQBuffer createTransportBuffer(final int size) { - return new ChannelBufferWrapper(channel.alloc().buffer(size)); + return new ChannelBufferWrapper(PartialPooledByteBufAllocator.INSTANCE.directBuffer(size), true); } public Object getID() @@ -199,7 +199,7 @@ public class NettyConnection implements Connection { channel.writeAndFlush(batchBuffer.byteBuf()); - batchBuffer = createBuffer(BATCHING_BUFFER_SIZE); + batchBuffer = createTransportBuffer(BATCHING_BUFFER_SIZE); } } finally diff --git a/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnector.java b/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnector.java index 3ed9e879cf..8425edd53c 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnector.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyConnector.java @@ -47,7 +47,6 @@ import java.util.concurrent.TimeUnit; import io.netty.bootstrap.Bootstrap; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; -import io.netty.buffer.UnpooledByteBufAllocator; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelFuture; @@ -477,7 +476,7 @@ public class NettyConnector extends AbstractConnector } bootstrap.option(ChannelOption.SO_KEEPALIVE, true); bootstrap.option(ChannelOption.SO_REUSEADDR, true); - bootstrap.option(ChannelOption.ALLOCATOR, new UnpooledByteBufAllocator(false)); + bootstrap.option(ChannelOption.ALLOCATOR, PartialPooledByteBufAllocator.INSTANCE); channelGroup = new DefaultChannelGroup("activemq-connector", GlobalEventExecutor.INSTANCE); final SSLContext context; diff --git a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/AbstractRemotingConnection.java b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/AbstractRemotingConnection.java index a48845fa9b..22b26eeb14 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/AbstractRemotingConnection.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/AbstractRemotingConnection.java @@ -182,9 +182,9 @@ public abstract class AbstractRemotingConnection implements RemotingConnection closeListeners.addAll(listeners); } - public ActiveMQBuffer createBuffer(final int size) + public ActiveMQBuffer createTransportBuffer(final int size) { - return transportConnection.createBuffer(size); + return transportConnection.createTransportBuffer(size); } public Connection getTransportConnection() diff --git a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/RemotingConnection.java b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/RemotingConnection.java index 186e098ddf..9eb287ef27 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/RemotingConnection.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/protocol/RemotingConnection.java @@ -115,11 +115,12 @@ public interface RemotingConnection extends BufferHandler /** * creates a new ActiveMQBuffer of the specified size. + * For the purpose of i/o outgoing packets * * @param size the size of buffer required * @return the buffer */ - ActiveMQBuffer createBuffer(int size); + ActiveMQBuffer createTransportBuffer(int size); /** * called when the underlying connection fails. diff --git a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/Connection.java b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/Connection.java index a4896e29ce..b6060d3a12 100644 --- a/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/Connection.java +++ b/activemq-core-client/src/main/java/org/apache/activemq/spi/core/remoting/Connection.java @@ -36,7 +36,7 @@ public interface Connection * @param size the size of buffer to create * @return the new buffer. */ - ActiveMQBuffer createBuffer(int size); + ActiveMQBuffer createTransportBuffer(int size); RemotingConnection getProtocolConnection(); diff --git a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java index 20762e2cc9..bf906bca73 100644 --- a/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java +++ b/activemq-protocols/activemq-openwire-protocol/src/main/java/org/apache/activemq/core/protocol/openwire/OpenWireConnection.java @@ -440,7 +440,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor } @Override - public ActiveMQBuffer createBuffer(int size) + public ActiveMQBuffer createTransportBuffer(int size) { return ActiveMQBuffers.dynamicBuffer(size); } diff --git a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java index 16cf55e6c5..9a4e7b726d 100644 --- a/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java +++ b/activemq-protocols/activemq-stomp-protocol/src/main/java/org/apache/activemq/core/protocol/stomp/StompConnection.java @@ -280,7 +280,7 @@ public final class StompConnection implements RemotingConnection } @Override - public ActiveMQBuffer createBuffer(int size) + public ActiveMQBuffer createTransportBuffer(int size) { return ActiveMQBuffers.dynamicBuffer(size); } diff --git a/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/invm/InVMConnection.java b/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/invm/InVMConnection.java index eed1ff4a35..37e2acba3e 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/invm/InVMConnection.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/invm/InVMConnection.java @@ -142,7 +142,7 @@ public class InVMConnection implements Connection } } - public ActiveMQBuffer createBuffer(final int size) + public ActiveMQBuffer createTransportBuffer(final int size) { return ActiveMQBuffers.dynamicBuffer(size); } diff --git a/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyServerConnection.java b/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyServerConnection.java index d899e8583f..339b407130 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyServerConnection.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/remoting/impl/netty/NettyServerConnection.java @@ -34,7 +34,7 @@ public class NettyServerConnection extends NettyConnection } @Override - public ActiveMQBuffer createBuffer(int size) + public ActiveMQBuffer createTransportBuffer(int size) { return new ChannelBufferWrapper(channel.alloc().directBuffer(size), true); } diff --git a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/message/MessageHeaderTest.java b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/message/MessageHeaderTest.java index 784a0c063e..cfbf9956c9 100644 --- a/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/message/MessageHeaderTest.java +++ b/tests/jms-tests/src/test/java/org/apache/activemq/jms/tests/message/MessageHeaderTest.java @@ -1391,7 +1391,7 @@ public class MessageHeaderTest extends MessageHeaderTestBase } /* (non-Javadoc) - * @see org.apache.activemq.api.core.client.ClientSession#createBuffer(byte[]) + * @see org.apache.activemq.api.core.client.ClientSession#createTransportBuffer(byte[]) */ public ActiveMQBuffer createBuffer(final byte[] bytes) { @@ -1400,7 +1400,7 @@ public class MessageHeaderTest extends MessageHeaderTestBase } /* (non-Javadoc) - * @see org.apache.activemq.api.core.client.ClientSession#createBuffer(int) + * @see org.apache.activemq.api.core.client.ClientSession#createTransportBuffer(int) */ public ActiveMQBuffer createBuffer(final int size) { diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java index cc9d57b70d..b0a4c14802 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/remoting/impl/netty/NettyConnectionTest.java @@ -74,7 +74,7 @@ public class NettyConnectionTest extends UnitTestCase final int size = 1234; - ActiveMQBuffer buff = conn.createBuffer(size); + ActiveMQBuffer buff = conn.createTransportBuffer(size); buff.writeByte((byte) 0x00); // Netty buffer does lazy initialization. Assert.assertEquals(size, buff.capacity()); From 60179cfc980fba4756d588c3a9d8de44be3e9960 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 11 Feb 2015 08:36:17 -0500 Subject: [PATCH 3/4] Fixing test failure This test started to fail after performance improvements from ACTIVEMQ6-78 After some investigation it turned out that this test was racing with the client crashing even before the queue was created or sending a non persistent message asynchronously while the client crashed before the server received the message. I've also decreased some of the times on pings so the test could run a bit faster --- .../clientcrash/ClientCrashTest.java | 20 ++++++++++++------- .../integration/clientcrash/CrashClient.java | 3 ++- .../integration/clientcrash/CrashClient2.java | 2 +- 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/ClientCrashTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/ClientCrashTest.java index fbab2f42ba..25177bbcdf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/ClientCrashTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/ClientCrashTest.java @@ -44,9 +44,11 @@ import org.apache.activemq.tests.util.SpawnedVMSupport; */ public class ClientCrashTest extends ClientTestBase { - static final int PING_PERIOD = 2000; + // using short values so this test can run fast + static final int PING_PERIOD = 100; - static final int CONNECTION_TTL = 6000; + // using short values so this test can run fast + static final int CONNECTION_TTL = 1000; // Constants ----------------------------------------------------- @@ -76,18 +78,22 @@ public class ClientCrashTest extends ClientTestBase { assertActiveConnections(1); - // spawn a JVM that creates a Core client, which sends a message - Process p = SpawnedVMSupport.spawnVM(CrashClient.class.getName()); - ClientSession session = sf.createSession(false, true, true); session.createQueue(ClientCrashTest.QUEUE, ClientCrashTest.QUEUE, null, false); + + // spawn a JVM that creates a Core client, which sends a message + // It has to be spawned after the queue was created. + // if the client is too fast you race the send before the queue was created, missing a message + Process p = SpawnedVMSupport.spawnVM(CrashClient.class.getName()); + ClientConsumer consumer = session.createConsumer(ClientCrashTest.QUEUE); ClientProducer producer = session.createProducer(ClientCrashTest.QUEUE); + session.start(); // receive a message from the queue - Message messageFromClient = consumer.receive(500000); + Message messageFromClient = consumer.receive(5000); Assert.assertNotNull("no message received", messageFromClient); Assert.assertEquals(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT, messageFromClient.getBodyBuffer().readString()); @@ -155,7 +161,7 @@ public class ClientCrashTest extends ClientTestBase session.start(); // receive a message from the queue - ClientMessage messageFromClient = consumer.receive(10000); + ClientMessage messageFromClient = consumer.receive(timeout); Assert.assertNotNull("no message received", messageFromClient); Assert.assertEquals(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT, messageFromClient.getBodyBuffer().readString()); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient.java index 8a0c5d6198..406ba5624c 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient.java @@ -59,8 +59,9 @@ public class CrashClient ClientSession session = sf.createSession(false, true, true); ClientProducer producer = session.createProducer(ClientCrashTest.QUEUE); + // it has to be durable otherwise it may race dying before the client is killed ClientMessage message = session.createMessage(ActiveMQTextMessage.TYPE, - false, + true, 0, System.currentTimeMillis(), (byte)1); diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient2.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient2.java index 15a1c07500..775b50d882 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient2.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/clientcrash/CrashClient2.java @@ -59,7 +59,7 @@ public class CrashClient2 ClientSession session = sf.createSession(true, true, 1000000); ClientProducer producer = session.createProducer(ClientCrashTest.QUEUE2); - ClientMessage message = session.createMessage(false); + ClientMessage message = session.createMessage(true); message.getBodyBuffer().writeString(ClientCrashTest.MESSAGE_TEXT_FROM_CLIENT); producer.send(message); From bace921389150c1654f26977282df82259cc4339 Mon Sep 17 00:00:00 2001 From: Clebert Suconic Date: Wed, 11 Feb 2015 21:04:18 -0500 Subject: [PATCH 4/4] back-porting fix on exception message from hornetq code This is such a small change that qualifies as a tweak, so I'm not bothering on creating a JIRA for this --- .../java/org/apache/activemq/jms/client/ActiveMQSession.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQSession.java b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQSession.java index 7725ba81d4..a170b9ec68 100644 --- a/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQSession.java +++ b/activemq-jms-client/src/main/java/org/apache/activemq/jms/client/ActiveMQSession.java @@ -767,7 +767,7 @@ public class ActiveMQSession implements QueueSession, TopicSession if (subscriptionName == null) { if (durability != ConsumerDurability.NON_DURABLE) - throw new RuntimeException(); + throw new RuntimeException("Subscription name cannot be null for durable topic consumer"); // Non durable sub queueName = new SimpleString(UUID.randomUUID().toString()); @@ -782,7 +782,7 @@ public class ActiveMQSession implements QueueSession, TopicSession { // Durable sub if (durability != ConsumerDurability.DURABLE) - throw new RuntimeException(); + throw new RuntimeException("Subscription name must be null for non-durable topic consumer"); if (connection.getClientID() == null) { throw new IllegalStateException("Cannot create durable subscription - client ID has not been set");