diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java index aa855909b4..88ba083a9e 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/CoreRemotingConnection.java @@ -61,6 +61,11 @@ public interface CoreRemotingConnection extends RemotingConnection { return version >= PacketImpl.ARTEMIS_2_18_0_VERSION; } + default boolean isVersionSupportCommitV2() { + int version = getChannelVersion(); + return version >= PacketImpl.ARTEMIS_2_21_0_VERSION; + } + /** * Sets the client protocol used on the communication. This will determine if the client has * support for certain packet types diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java index 811ebef669..6013266ae2 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ActiveMQSessionContext.java @@ -85,6 +85,8 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBin import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V4; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage; @@ -443,13 +445,17 @@ public class ActiveMQSessionContext extends SessionContext { @Override public void simpleCommit() throws ActiveMQException { - sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT), PacketImpl.NULL_RESPONSE); + simpleCommit(true); } @Override public void simpleCommit(boolean block) throws ActiveMQException { if (block) { - sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT), PacketImpl.NULL_RESPONSE); + if (!sessionChannel.getConnection().isVersionSupportCommitV2()) { + sessionChannel.sendBlocking(new SessionCommitMessage(), PacketImpl.NULL_RESPONSE); + } else { + sessionChannel.sendBlocking(new SessionCommitMessage_V2(), PacketImpl.NULL_RESPONSE); + } } else { sessionChannel.sendBatched(new PacketImpl(PacketImpl.SESS_COMMIT)); } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java index d133826d04..e0bb33b205 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/ChannelImpl.java @@ -21,6 +21,7 @@ import java.util.List; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -132,6 +133,8 @@ public final class ChannelImpl implements Channel { private final List interceptors; + private final AtomicLong blockingCorrelationID = new AtomicLong(-1); + public ChannelImpl(final CoreRemotingConnection connection, final long id, final int confWindowSize, @@ -481,6 +484,8 @@ public final class ChannelImpl implements Channel { synchronized (sendBlockingLock) { packet.setChannelID(id); + packet.setCorrelationID(blockingCorrelationID.decrementAndGet()); + final ActiveMQBuffer buffer = packet.encode(connection); lock.lock(); @@ -508,7 +513,7 @@ public final class ChannelImpl implements Channel { long start = System.currentTimeMillis(); - while (!closed && (response == null || (response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket)) && toWait > 0) { + while (!closed && (response == null || (response.getType() != PacketImpl.EXCEPTION && (response.getType() != expectedPacket || response.getCorrelationID() != packet.getCorrelationID()))) && toWait > 0) { try { sendCondition.await(toWait, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java index 640e079cd1..2967c970e9 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketDecoder.java @@ -61,6 +61,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBin import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V4; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage; +import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage_V2; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage; import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage; @@ -252,7 +253,11 @@ public abstract class PacketDecoder implements Serializable { break; } case SESS_COMMIT: { - packet = new SessionCommitMessage(); + if (!connection.isVersionSupportCommitV2()) { + packet = new SessionCommitMessage(); + } else { + packet = new SessionCommitMessage_V2(); + } break; } case SESS_ROLLBACK: { diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java index 23e20fe6cc..1d1471ca9b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/PacketImpl.java @@ -41,6 +41,9 @@ public class PacketImpl implements Packet { // 2.18.0 public static final int ARTEMIS_2_18_0_VERSION = 131; + // 2.21.0 + public static final int ARTEMIS_2_21_0_VERSION = 132; + public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue."); public static final SimpleString OLD_TEMP_QUEUE_PREFIX = new SimpleString("jms.tempqueue."); public static final SimpleString OLD_TOPIC_PREFIX = new SimpleString("jms.topic."); diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java index b5d1132c26..7e47e0ef5b 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/NullResponseMessage_V2.java @@ -53,6 +53,8 @@ public class NullResponseMessage_V2 extends NullResponseMessage { super.decodeRest(buffer); if (buffer.readableBytes() >= DataConstants.SIZE_LONG) { correlationID = buffer.readLong(); + } else { + correlationID = -1; } } @@ -71,11 +73,6 @@ public class NullResponseMessage_V2 extends NullResponseMessage { return true; } - @Override - protected String getPacketString() { - return super.getPacketString() + ", correlationID=" + correlationID; - } - @Override public int hashCode() { final int prime = 31; diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCommitMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCommitMessage_V2.java new file mode 100644 index 0000000000..8bbe619904 --- /dev/null +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionCommitMessage_V2.java @@ -0,0 +1,87 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.core.impl.wireformat; + +import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.artemis.utils.DataConstants; + +public class SessionCommitMessage_V2 extends SessionCommitMessage { + + private long correlationID; + + @Override + public long getCorrelationID() { + return correlationID; + } + + @Override + public void setCorrelationID(long correlationID) { + this.correlationID = correlationID; + } + + @Override + public boolean isResponseAsync() { + return true; + } + + @Override + public void encodeRest(final ActiveMQBuffer buffer) { + super.encodeRest(buffer); + buffer.writeLong(correlationID); + } + + @Override + public void decodeRest(final ActiveMQBuffer buffer) { + super.decodeRest(buffer); + if (buffer.readableBytes() >= DataConstants.SIZE_LONG) { + correlationID = buffer.readLong(); + } else { + correlationID = -1; + } + } + + @Override + public int expectedEncodeSize() { + return super.expectedEncodeSize() + DataConstants.SIZE_LONG; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = super.hashCode(); + result = prime * result + (int) (correlationID ^ (correlationID >>> 32)); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + if (!super.equals(obj)) { + return false; + } + if (!(obj instanceof SessionCommitMessage_V2)) { + return false; + } + SessionCommitMessage_V2 other = (SessionCommitMessage_V2) obj; + if (correlationID != other.correlationID) { + return false; + } + return true; + } +} diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java index 9fd09684cf..1a6eb1d459 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionSendContinuationMessage_V2.java @@ -64,6 +64,8 @@ public class SessionSendContinuationMessage_V2 extends SessionSendContinuationMe super.decodeRest(buffer); if (buffer.readableBytes() >= DataConstants.SIZE_LONG) { correlationID = buffer.readLong(); + } else { + correlationID = -1; } } diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java index c9bf965835..9f466eb8ce 100644 --- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java +++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/core/protocol/core/impl/wireformat/SessionXAResponseMessage_V2.java @@ -48,6 +48,8 @@ public class SessionXAResponseMessage_V2 extends SessionXAResponseMessage { super.decodeRest(buffer); if (buffer.readableBytes() >= DataConstants.SIZE_LONG) { correlationID = buffer.readLong(); + } else { + correlationID = -1; } } diff --git a/artemis-core-client/src/main/resources/activemq-version.properties b/artemis-core-client/src/main/resources/activemq-version.properties index e8ece7632d..92a1e34f1a 100644 --- a/artemis-core-client/src/main/resources/activemq-version.properties +++ b/artemis-core-client/src/main/resources/activemq-version.properties @@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion} activemq.version.microVersion=${activemq.version.microVersion} activemq.version.incrementingVersion=${activemq.version.incrementingVersion} activemq.version.versionTag=${activemq.version.versionTag} -activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130,131 +activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130,131,132 diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java index 5993c7a666..cdd0812b75 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/protocol/core/ServerSessionPacketHandler.java @@ -660,7 +660,7 @@ public class ServerSessionPacketHandler implements ChannelHandler { } private boolean requireNullResponseMessage_V1(Packet packet) { - return !packet.isResponseAsync() || channel.getConnection().isVersionBeforeAsyncResponseChange(); + return channel.getConnection().isVersionBeforeAsyncResponseChange(); } private NullResponseMessage createNullResponseMessage_V1(Packet packet) { diff --git a/pom.xml b/pom.xml index 0b3de59a68..4ca5ea6317 100644 --- a/pom.xml +++ b/pom.xml @@ -177,7 +177,7 @@ 1 0 0 - 131,130,129,128,127,126,125,124,123,122 + 132,131,130,129,128,127,126,125,124,123,122 ${project.version} ${project.version}(${activemq.version.incrementingVersion}) diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSTransactionTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSTransactionTest.java new file mode 100644 index 0000000000..d79ea775c3 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/JMSTransactionTest.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.artemis.tests.integration.client; + +import org.apache.activemq.artemis.api.core.Interceptor; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl; +import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; +import org.apache.activemq.artemis.tests.util.JMSTestBase; +import org.apache.activemq.artemis.tests.util.Wait; +import org.junit.Assert; +import org.junit.Test; + +import javax.jms.CompletionListener; +import javax.jms.Connection; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + +public class JMSTransactionTest extends JMSTestBase { + + @Test(timeout = 60000) + public void testAsyncProduceMessageAndCommit() throws Throwable { + final String queueName = "TEST"; + final int messages = 10; + + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(); + cf.setConfirmationWindowSize(1000000); + cf.setBlockOnDurableSend(true); + cf.setBlockOnNonDurableSend(true); + + CountDownLatch commitLatch = new CountDownLatch(1); + AtomicInteger sentMessages = new AtomicInteger(0); + + server.getRemotingService().addIncomingInterceptor((Interceptor) (packet, connection1) -> { + if (packet.getType() == PacketImpl.SESS_COMMIT) { + commitLatch.countDown(); + } + return true; + }); + + try (Connection connection = cf.createConnection(); + Session session = connection.createSession(true, Session.SESSION_TRANSACTED)) { + + javax.jms.Queue queue = session.createQueue(queueName); + MessageProducer p = session.createProducer(queue); + + for (int i = 0; i < messages; i++) { + TextMessage message = session.createTextMessage(); + message.setText("Message:" + i); + p.send(message, new CompletionListener() { + @Override + public void onCompletion(Message message) { + try { + commitLatch.await(); + sentMessages.incrementAndGet(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @Override + public void onException(Message message, Exception exception) { + + } + }); + } + + session.commit(); + Assert.assertEquals(messages, sentMessages.get()); + + org.apache.activemq.artemis.core.server.Queue queueView = server.locateQueue(SimpleString.toSimpleString(queueName)); + Wait.assertEquals(messages, queueView::getMessageCount); + } + } +}