ARTEMIS-2159 OpenWire would allow one extra send
Thanks to Otavio Piske collaborating a test change here.
This commit is contained in:
parent
46588c8bd4
commit
02a6d5bb49
|
@ -443,64 +443,64 @@ public class AMQSession implements SessionCallback {
|
|||
final AtomicInteger count,
|
||||
final org.apache.activemq.artemis.api.core.Message coreMsg,
|
||||
final SimpleString address) throws ResourceAllocationException {
|
||||
if (!store.checkMemory(() -> {
|
||||
Exception exceptionToSend = null;
|
||||
|
||||
try {
|
||||
getCoreSession().send(coreMsg, false, dest.isTemporary());
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
exceptionToSend = e;
|
||||
}
|
||||
connection.enableTtl();
|
||||
if (count == null || count.decrementAndGet() == 0) {
|
||||
if (exceptionToSend != null) {
|
||||
this.connection.getContext().setDontSendReponse(false);
|
||||
connection.sendException(exceptionToSend);
|
||||
} else {
|
||||
server.getStorageManager().afterCompleteOperations(new IOCallback() {
|
||||
@Override
|
||||
public void done() {
|
||||
if (sendProducerAck) {
|
||||
try {
|
||||
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
|
||||
connection.dispatchAsync(ack);
|
||||
} catch (Exception e) {
|
||||
connection.getContext().setDontSendReponse(false);
|
||||
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
|
||||
connection.sendException(e);
|
||||
}
|
||||
} else {
|
||||
connection.getContext().setDontSendReponse(false);
|
||||
try {
|
||||
Response response = new Response();
|
||||
response.setCorrelationId(messageSend.getCommandId());
|
||||
connection.dispatchAsync(response);
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
|
||||
connection.sendException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(int errorCode, String errorMessage) {
|
||||
try {
|
||||
final IOException e = new IOException(errorMessage);
|
||||
ActiveMQServerLogger.LOGGER.warn(errorMessage);
|
||||
connection.serviceException(e);
|
||||
} catch (Exception ex) {
|
||||
ActiveMQServerLogger.LOGGER.debug(ex);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
})) {
|
||||
if (!store.checkMemory(null)) {
|
||||
this.connection.getContext().setDontSendReponse(false);
|
||||
connection.enableTtl();
|
||||
throw new ResourceAllocationException("Queue is full " + address);
|
||||
}
|
||||
|
||||
Exception exceptionToSend = null;
|
||||
|
||||
try {
|
||||
getCoreSession().send(coreMsg, false, dest.isTemporary());
|
||||
} catch (Exception e) {
|
||||
logger.warn(e.getMessage(), e);
|
||||
exceptionToSend = e;
|
||||
}
|
||||
connection.enableTtl();
|
||||
if (count == null || count.decrementAndGet() == 0) {
|
||||
if (exceptionToSend != null) {
|
||||
this.connection.getContext().setDontSendReponse(false);
|
||||
connection.sendException(exceptionToSend);
|
||||
} else {
|
||||
server.getStorageManager().afterCompleteOperations(new IOCallback() {
|
||||
@Override
|
||||
public void done() {
|
||||
if (sendProducerAck) {
|
||||
try {
|
||||
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
|
||||
connection.dispatchAsync(ack);
|
||||
} catch (Exception e) {
|
||||
connection.getContext().setDontSendReponse(false);
|
||||
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
|
||||
connection.sendException(e);
|
||||
}
|
||||
} else {
|
||||
connection.getContext().setDontSendReponse(false);
|
||||
try {
|
||||
Response response = new Response();
|
||||
response.setCorrelationId(messageSend.getCommandId());
|
||||
connection.dispatchAsync(response);
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
|
||||
connection.sendException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(int errorCode, String errorMessage) {
|
||||
try {
|
||||
final IOException e = new IOException(errorMessage);
|
||||
ActiveMQServerLogger.LOGGER.warn(errorMessage);
|
||||
connection.serviceException(e);
|
||||
} catch (Exception ex) {
|
||||
ActiveMQServerLogger.LOGGER.debug(ex);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void enableAutoReadAndTtl() {
|
||||
|
|
|
@ -693,7 +693,9 @@ public class PagingStoreImpl implements PagingStore {
|
|||
}
|
||||
}
|
||||
|
||||
runWhenAvailable.run();
|
||||
if (runWhenAvailable != null) {
|
||||
runWhenAvailable.run();
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.openwire;
|
|||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.JMSException;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
|
@ -58,34 +59,49 @@ public class OpenWireFlowControlFailTest extends OpenWireTestBase {
|
|||
textBody.append(" ");
|
||||
}
|
||||
ConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
|
||||
int numberOfMessage = 0;
|
||||
try (Connection connection = factory.createConnection()) {
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
javax.jms.Queue queue = session.createQueue(addressInfo.getName().toString());
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
int numberOfMessage = 0;
|
||||
boolean failed = false;
|
||||
try {
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
producer.send(session.createTextMessage(textBody.toString()));
|
||||
TextMessage message = session.createTextMessage(textBody.toString());
|
||||
message.setIntProperty("i", i);
|
||||
|
||||
producer.send(message);
|
||||
numberOfMessage++;
|
||||
}
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace(System.out);
|
||||
failed = true;
|
||||
try {
|
||||
producer.send(session.createTextMessage(textBody.toString()));
|
||||
Assert.fail("Exception expected");
|
||||
} catch (JMSException expected) {
|
||||
expected.printStackTrace();
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
System.out.println("Message failed with " + numberOfMessage);
|
||||
|
||||
Assert.assertTrue(failed);
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
connection.start();
|
||||
}
|
||||
|
||||
factory = new ActiveMQConnectionFactory(urlString);
|
||||
try (Connection connection2 = factory.createConnection()) {
|
||||
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
javax.jms.Queue queue = session2.createQueue(addressInfo.getName().toString());
|
||||
|
||||
MessageConsumer consumer = session2.createConsumer(queue);
|
||||
connection2.start();
|
||||
for (int i = 0; i < numberOfMessage; i++) {
|
||||
TextMessage message = (TextMessage) consumer.receive(5000);
|
||||
Assert.assertNotNull(message);
|
||||
Assert.assertEquals(textBody.toString(), message.getText());
|
||||
}
|
||||
|
||||
Assert.assertNull(consumer.receiveNoWait());
|
||||
TextMessage msg = (TextMessage)consumer.receive(500);
|
||||
Assert.assertNull(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue