ARTEMIS-3729 Fix JMS CORE client commit after async sends
This commit is contained in:
parent
5bba1fbc30
commit
d0c550bcd7
|
@ -61,6 +61,11 @@ public interface CoreRemotingConnection extends RemotingConnection {
|
||||||
return version >= PacketImpl.ARTEMIS_2_18_0_VERSION;
|
return version >= PacketImpl.ARTEMIS_2_18_0_VERSION;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
default boolean isVersionSupportCommitV2() {
|
||||||
|
int version = getChannelVersion();
|
||||||
|
return version >= PacketImpl.ARTEMIS_2_21_0_VERSION;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sets the client protocol used on the communication. This will determine if the client has
|
* Sets the client protocol used on the communication. This will determine if the client has
|
||||||
* support for certain packet types
|
* support for certain packet types
|
||||||
|
|
|
@ -85,6 +85,8 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBin
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V3;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V4;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V4;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage_V2;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
|
||||||
|
@ -443,13 +445,17 @@ public class ActiveMQSessionContext extends SessionContext {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void simpleCommit() throws ActiveMQException {
|
public void simpleCommit() throws ActiveMQException {
|
||||||
sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT), PacketImpl.NULL_RESPONSE);
|
simpleCommit(true);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void simpleCommit(boolean block) throws ActiveMQException {
|
public void simpleCommit(boolean block) throws ActiveMQException {
|
||||||
if (block) {
|
if (block) {
|
||||||
sessionChannel.sendBlocking(new PacketImpl(PacketImpl.SESS_COMMIT), PacketImpl.NULL_RESPONSE);
|
if (!sessionChannel.getConnection().isVersionSupportCommitV2()) {
|
||||||
|
sessionChannel.sendBlocking(new SessionCommitMessage(), PacketImpl.NULL_RESPONSE);
|
||||||
|
} else {
|
||||||
|
sessionChannel.sendBlocking(new SessionCommitMessage_V2(), PacketImpl.NULL_RESPONSE);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
sessionChannel.sendBatched(new PacketImpl(PacketImpl.SESS_COMMIT));
|
sessionChannel.sendBatched(new PacketImpl(PacketImpl.SESS_COMMIT));
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.util.List;
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.locks.Condition;
|
import java.util.concurrent.locks.Condition;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
@ -132,6 +133,8 @@ public final class ChannelImpl implements Channel {
|
||||||
|
|
||||||
private final List<Interceptor> interceptors;
|
private final List<Interceptor> interceptors;
|
||||||
|
|
||||||
|
private final AtomicLong blockingCorrelationID = new AtomicLong(-1);
|
||||||
|
|
||||||
public ChannelImpl(final CoreRemotingConnection connection,
|
public ChannelImpl(final CoreRemotingConnection connection,
|
||||||
final long id,
|
final long id,
|
||||||
final int confWindowSize,
|
final int confWindowSize,
|
||||||
|
@ -481,6 +484,8 @@ public final class ChannelImpl implements Channel {
|
||||||
synchronized (sendBlockingLock) {
|
synchronized (sendBlockingLock) {
|
||||||
packet.setChannelID(id);
|
packet.setChannelID(id);
|
||||||
|
|
||||||
|
packet.setCorrelationID(blockingCorrelationID.decrementAndGet());
|
||||||
|
|
||||||
final ActiveMQBuffer buffer = packet.encode(connection);
|
final ActiveMQBuffer buffer = packet.encode(connection);
|
||||||
|
|
||||||
lock.lock();
|
lock.lock();
|
||||||
|
@ -508,7 +513,7 @@ public final class ChannelImpl implements Channel {
|
||||||
|
|
||||||
long start = System.currentTimeMillis();
|
long start = System.currentTimeMillis();
|
||||||
|
|
||||||
while (!closed && (response == null || (response.getType() != PacketImpl.EXCEPTION && response.getType() != expectedPacket)) && toWait > 0) {
|
while (!closed && (response == null || (response.getType() != PacketImpl.EXCEPTION && (response.getType() != expectedPacket || response.getCorrelationID() != packet.getCorrelationID()))) && toWait > 0) {
|
||||||
try {
|
try {
|
||||||
sendCondition.await(toWait, TimeUnit.MILLISECONDS);
|
sendCondition.await(toWait, TimeUnit.MILLISECONDS);
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBin
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V4;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionBindingQueryResponseMessage_V4;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCloseMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCommitMessage_V2;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerCloseMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionConsumerFlowCreditMessage;
|
||||||
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
|
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionCreateConsumerMessage;
|
||||||
|
@ -252,7 +253,11 @@ public abstract class PacketDecoder implements Serializable {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case SESS_COMMIT: {
|
case SESS_COMMIT: {
|
||||||
|
if (!connection.isVersionSupportCommitV2()) {
|
||||||
packet = new SessionCommitMessage();
|
packet = new SessionCommitMessage();
|
||||||
|
} else {
|
||||||
|
packet = new SessionCommitMessage_V2();
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case SESS_ROLLBACK: {
|
case SESS_ROLLBACK: {
|
||||||
|
|
|
@ -41,6 +41,9 @@ public class PacketImpl implements Packet {
|
||||||
// 2.18.0
|
// 2.18.0
|
||||||
public static final int ARTEMIS_2_18_0_VERSION = 131;
|
public static final int ARTEMIS_2_18_0_VERSION = 131;
|
||||||
|
|
||||||
|
// 2.21.0
|
||||||
|
public static final int ARTEMIS_2_21_0_VERSION = 132;
|
||||||
|
|
||||||
public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
|
public static final SimpleString OLD_QUEUE_PREFIX = new SimpleString("jms.queue.");
|
||||||
public static final SimpleString OLD_TEMP_QUEUE_PREFIX = new SimpleString("jms.tempqueue.");
|
public static final SimpleString OLD_TEMP_QUEUE_PREFIX = new SimpleString("jms.tempqueue.");
|
||||||
public static final SimpleString OLD_TOPIC_PREFIX = new SimpleString("jms.topic.");
|
public static final SimpleString OLD_TOPIC_PREFIX = new SimpleString("jms.topic.");
|
||||||
|
|
|
@ -53,6 +53,8 @@ public class NullResponseMessage_V2 extends NullResponseMessage {
|
||||||
super.decodeRest(buffer);
|
super.decodeRest(buffer);
|
||||||
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
|
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
|
||||||
correlationID = buffer.readLong();
|
correlationID = buffer.readLong();
|
||||||
|
} else {
|
||||||
|
correlationID = -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -71,11 +73,6 @@ public class NullResponseMessage_V2 extends NullResponseMessage {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected String getPacketString() {
|
|
||||||
return super.getPacketString() + ", correlationID=" + correlationID;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int hashCode() {
|
public int hashCode() {
|
||||||
final int prime = 31;
|
final int prime = 31;
|
||||||
|
|
|
@ -0,0 +1,87 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.artemis.core.protocol.core.impl.wireformat;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||||
|
import org.apache.activemq.artemis.utils.DataConstants;
|
||||||
|
|
||||||
|
public class SessionCommitMessage_V2 extends SessionCommitMessage {
|
||||||
|
|
||||||
|
private long correlationID;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getCorrelationID() {
|
||||||
|
return correlationID;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setCorrelationID(long correlationID) {
|
||||||
|
this.correlationID = correlationID;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isResponseAsync() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void encodeRest(final ActiveMQBuffer buffer) {
|
||||||
|
super.encodeRest(buffer);
|
||||||
|
buffer.writeLong(correlationID);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void decodeRest(final ActiveMQBuffer buffer) {
|
||||||
|
super.decodeRest(buffer);
|
||||||
|
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
|
||||||
|
correlationID = buffer.readLong();
|
||||||
|
} else {
|
||||||
|
correlationID = -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int expectedEncodeSize() {
|
||||||
|
return super.expectedEncodeSize() + DataConstants.SIZE_LONG;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
final int prime = 31;
|
||||||
|
int result = super.hashCode();
|
||||||
|
result = prime * result + (int) (correlationID ^ (correlationID >>> 32));
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object obj) {
|
||||||
|
if (this == obj) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
if (!super.equals(obj)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (!(obj instanceof SessionCommitMessage_V2)) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
SessionCommitMessage_V2 other = (SessionCommitMessage_V2) obj;
|
||||||
|
if (correlationID != other.correlationID) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
|
@ -64,6 +64,8 @@ public class SessionSendContinuationMessage_V2 extends SessionSendContinuationMe
|
||||||
super.decodeRest(buffer);
|
super.decodeRest(buffer);
|
||||||
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
|
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
|
||||||
correlationID = buffer.readLong();
|
correlationID = buffer.readLong();
|
||||||
|
} else {
|
||||||
|
correlationID = -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -48,6 +48,8 @@ public class SessionXAResponseMessage_V2 extends SessionXAResponseMessage {
|
||||||
super.decodeRest(buffer);
|
super.decodeRest(buffer);
|
||||||
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
|
if (buffer.readableBytes() >= DataConstants.SIZE_LONG) {
|
||||||
correlationID = buffer.readLong();
|
correlationID = buffer.readLong();
|
||||||
|
} else {
|
||||||
|
correlationID = -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,4 +20,4 @@ activemq.version.minorVersion=${activemq.version.minorVersion}
|
||||||
activemq.version.microVersion=${activemq.version.microVersion}
|
activemq.version.microVersion=${activemq.version.microVersion}
|
||||||
activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
|
activemq.version.incrementingVersion=${activemq.version.incrementingVersion}
|
||||||
activemq.version.versionTag=${activemq.version.versionTag}
|
activemq.version.versionTag=${activemq.version.versionTag}
|
||||||
activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130,131
|
activemq.version.compatibleVersionList=121,122,123,124,125,126,127,128,129,130,131,132
|
||||||
|
|
|
@ -660,7 +660,7 @@ public class ServerSessionPacketHandler implements ChannelHandler {
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean requireNullResponseMessage_V1(Packet packet) {
|
private boolean requireNullResponseMessage_V1(Packet packet) {
|
||||||
return !packet.isResponseAsync() || channel.getConnection().isVersionBeforeAsyncResponseChange();
|
return channel.getConnection().isVersionBeforeAsyncResponseChange();
|
||||||
}
|
}
|
||||||
|
|
||||||
private NullResponseMessage createNullResponseMessage_V1(Packet packet) {
|
private NullResponseMessage createNullResponseMessage_V1(Packet packet) {
|
||||||
|
|
2
pom.xml
2
pom.xml
|
@ -177,7 +177,7 @@
|
||||||
<activemq.version.majorVersion>1</activemq.version.majorVersion>
|
<activemq.version.majorVersion>1</activemq.version.majorVersion>
|
||||||
<activemq.version.minorVersion>0</activemq.version.minorVersion>
|
<activemq.version.minorVersion>0</activemq.version.minorVersion>
|
||||||
<activemq.version.microVersion>0</activemq.version.microVersion>
|
<activemq.version.microVersion>0</activemq.version.microVersion>
|
||||||
<activemq.version.incrementingVersion>131,130,129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
|
<activemq.version.incrementingVersion>132,131,130,129,128,127,126,125,124,123,122</activemq.version.incrementingVersion>
|
||||||
<activemq.version.versionTag>${project.version}</activemq.version.versionTag>
|
<activemq.version.versionTag>${project.version}</activemq.version.versionTag>
|
||||||
<ActiveMQ-Version>${project.version}(${activemq.version.incrementingVersion})</ActiveMQ-Version>
|
<ActiveMQ-Version>${project.version}(${activemq.version.incrementingVersion})</ActiveMQ-Version>
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,94 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.artemis.tests.integration.client;
|
||||||
|
|
||||||
|
import org.apache.activemq.artemis.api.core.Interceptor;
|
||||||
|
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||||
|
import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
|
||||||
|
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.artemis.tests.util.JMSTestBase;
|
||||||
|
import org.apache.activemq.artemis.tests.util.Wait;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import javax.jms.CompletionListener;
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.Message;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.TextMessage;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
public class JMSTransactionTest extends JMSTestBase {
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testAsyncProduceMessageAndCommit() throws Throwable {
|
||||||
|
final String queueName = "TEST";
|
||||||
|
final int messages = 10;
|
||||||
|
|
||||||
|
ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory();
|
||||||
|
cf.setConfirmationWindowSize(1000000);
|
||||||
|
cf.setBlockOnDurableSend(true);
|
||||||
|
cf.setBlockOnNonDurableSend(true);
|
||||||
|
|
||||||
|
CountDownLatch commitLatch = new CountDownLatch(1);
|
||||||
|
AtomicInteger sentMessages = new AtomicInteger(0);
|
||||||
|
|
||||||
|
server.getRemotingService().addIncomingInterceptor((Interceptor) (packet, connection1) -> {
|
||||||
|
if (packet.getType() == PacketImpl.SESS_COMMIT) {
|
||||||
|
commitLatch.countDown();
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
});
|
||||||
|
|
||||||
|
try (Connection connection = cf.createConnection();
|
||||||
|
Session session = connection.createSession(true, Session.SESSION_TRANSACTED)) {
|
||||||
|
|
||||||
|
javax.jms.Queue queue = session.createQueue(queueName);
|
||||||
|
MessageProducer p = session.createProducer(queue);
|
||||||
|
|
||||||
|
for (int i = 0; i < messages; i++) {
|
||||||
|
TextMessage message = session.createTextMessage();
|
||||||
|
message.setText("Message:" + i);
|
||||||
|
p.send(message, new CompletionListener() {
|
||||||
|
@Override
|
||||||
|
public void onCompletion(Message message) {
|
||||||
|
try {
|
||||||
|
commitLatch.await();
|
||||||
|
sentMessages.incrementAndGet();
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onException(Message message, Exception exception) {
|
||||||
|
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
session.commit();
|
||||||
|
Assert.assertEquals(messages, sentMessages.get());
|
||||||
|
|
||||||
|
org.apache.activemq.artemis.core.server.Queue queueView = server.locateQueue(SimpleString.toSimpleString(queueName));
|
||||||
|
Wait.assertEquals(messages, queueView::getMessageCount);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue