This commit is contained in:
Clebert Suconic 2020-03-11 11:14:30 -04:00
commit 316d703f74
3 changed files with 75 additions and 4 deletions

View File

@ -632,9 +632,14 @@ public final class OpenWireMessageConverter {
ByteSequence midSeq = new ByteSequence(midBytes);
mid = (MessageId) marshaller.unmarshal(midSeq);
} else {
//JMSMessageID should be started with "ID:"
String midd = "ID:" + UUIDGenerator.getInstance().generateStringUUID() + ":-1:-1:-1:-1";
mid = new MessageId(midd);
final SimpleString connectionId = (SimpleString) coreMessage.getObjectProperty(MessageUtil.CONNECTION_ID_PROPERTY_NAME);
if (connectionId != null) {
mid = new MessageId("ID:" + connectionId.toString() + ":-1:-1:-1", coreMessage.getMessageID());
} else {
//JMSMessageID should be started with "ID:"
String midd = "ID:" + UUIDGenerator.getInstance().generateStringUUID() + ":-1:-1:-1:-1";
mid = new MessageId(midd);
}
}
amqMsg.setMessageId(mid);

View File

@ -304,7 +304,11 @@ public class AMQConsumer {
List<MessageReference> ackList = serverConsumer.getDeliveringReferencesBasedOnProtocol(removeReferences, first, last);
acquireCredit(ack.getMessageCount());
if (ack.isIndividualAck() || ack.isStandardAck() || ack.isPoisonAck()) {
acquireCredit(ackList.size());
} else {
acquireCredit(ack.getMessageCount());
}
if (removeReferences) {

View File

@ -19,6 +19,7 @@ package org.apache.activemq.artemis.tests.integration.openwire.interop;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.ObjectMessage;
@ -29,10 +30,16 @@ import javax.jms.TextMessage;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageConsumer;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.artemis.api.core.ActiveMQDisconnectedException;
import org.apache.activemq.artemis.api.core.client.ServerLocator;
import org.apache.activemq.artemis.api.core.management.QueueControl;
import org.apache.activemq.artemis.api.core.management.ResourceNames;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.tests.integration.openwire.BasicOpenWireTest;
import org.apache.activemq.artemis.utils.Wait;
import org.apache.activemq.command.ActiveMQDestination;
import org.junit.Before;
import org.junit.Test;
@ -186,6 +193,61 @@ public class GeneralInteropTest extends BasicOpenWireTest {
}
}
@Test
public void testFailoverReceivingFromCore() throws Exception {
final int prefetchSize = 10;
final String text = "HelloWorld";
sendMultipleTextMessagesUsingCoreJms(queueName, text, 100);
//Initialize a failover connectionFactory.
String urlString = "failover:(tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.MaxInactivityDuration=5000)";
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(urlString);
connectionFactory.setSendAcksAsync(false);
connectionFactory.setOptimizeAcknowledge(false);
connectionFactory.getPrefetchPolicy().setAll(prefetchSize);
Connection connection = connectionFactory.createConnection();
try {
connection.setClientID("test.consumer.queue." + queueName);
connection.start();
Message message = null;
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
Queue queue = session.createQueue(queueName);
QueueControl queueControl = (QueueControl)server.getManagementService().
getResource(ResourceNames.QUEUE + queueName);
MessageConsumer consumer = session.createConsumer(queue);
message = consumer.receive(5000);
assertNotNull(message);
assertTrue(message instanceof TextMessage);
assertEquals(text + 0, ((TextMessage)message).getText());
message.acknowledge();
Wait.assertEquals(1L, () -> queueControl.getMessagesAcknowledged(), 3000, 100);
Wait.assertEquals(prefetchSize, () -> queueControl.getDeliveringCount(), 3000, 100);
//Force a disconnection.
for (ServerSession serverSession : server.getSessions()) {
if (session.toString().contains(serverSession.getName())) {
serverSession.getRemotingConnection().fail(new ActiveMQDisconnectedException());
}
}
message = consumer.receive(5000);
assertNotNull(message);
assertTrue(message instanceof TextMessage);
assertEquals(text + 1, ((TextMessage)message).getText());
message.acknowledge();
Wait.assertEquals(2L, () -> queueControl.getMessagesAcknowledged(), 3000, 100);
Wait.assertEquals(prefetchSize, () -> queueControl.getDeliveringCount(), 30000, 100);
} finally {
connection.close();
}
}
private void sendMultipleTextMessagesUsingCoreJms(String queueName, String text, int num) throws Exception {
Connection jmsConn = null;
try {