This commit is contained in:
Clebert Suconic 2018-10-31 12:46:12 -04:00
commit 3e58cf87ab
3 changed files with 81 additions and 63 deletions

View File

@ -443,64 +443,64 @@ public class AMQSession implements SessionCallback {
final AtomicInteger count,
final org.apache.activemq.artemis.api.core.Message coreMsg,
final SimpleString address) throws ResourceAllocationException {
if (!store.checkMemory(() -> {
Exception exceptionToSend = null;
try {
getCoreSession().send(coreMsg, false, dest.isTemporary());
} catch (Exception e) {
logger.warn(e.getMessage(), e);
exceptionToSend = e;
}
connection.enableTtl();
if (count == null || count.decrementAndGet() == 0) {
if (exceptionToSend != null) {
this.connection.getContext().setDontSendReponse(false);
connection.sendException(exceptionToSend);
} else {
server.getStorageManager().afterCompleteOperations(new IOCallback() {
@Override
public void done() {
if (sendProducerAck) {
try {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
connection.dispatchAsync(ack);
} catch (Exception e) {
connection.getContext().setDontSendReponse(false);
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
connection.sendException(e);
}
} else {
connection.getContext().setDontSendReponse(false);
try {
Response response = new Response();
response.setCorrelationId(messageSend.getCommandId());
connection.dispatchAsync(response);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
connection.sendException(e);
}
}
}
@Override
public void onError(int errorCode, String errorMessage) {
try {
final IOException e = new IOException(errorMessage);
ActiveMQServerLogger.LOGGER.warn(errorMessage);
connection.serviceException(e);
} catch (Exception ex) {
ActiveMQServerLogger.LOGGER.debug(ex);
}
}
});
}
}
})) {
if (!store.checkMemory(null)) {
this.connection.getContext().setDontSendReponse(false);
connection.enableTtl();
throw new ResourceAllocationException("Queue is full " + address);
}
Exception exceptionToSend = null;
try {
getCoreSession().send(coreMsg, false, dest.isTemporary());
} catch (Exception e) {
logger.warn(e.getMessage(), e);
exceptionToSend = e;
}
connection.enableTtl();
if (count == null || count.decrementAndGet() == 0) {
if (exceptionToSend != null) {
this.connection.getContext().setDontSendReponse(false);
connection.sendException(exceptionToSend);
} else {
server.getStorageManager().afterCompleteOperations(new IOCallback() {
@Override
public void done() {
if (sendProducerAck) {
try {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
connection.dispatchAsync(ack);
} catch (Exception e) {
connection.getContext().setDontSendReponse(false);
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
connection.sendException(e);
}
} else {
connection.getContext().setDontSendReponse(false);
try {
Response response = new Response();
response.setCorrelationId(messageSend.getCommandId());
connection.dispatchAsync(response);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
connection.sendException(e);
}
}
}
@Override
public void onError(int errorCode, String errorMessage) {
try {
final IOException e = new IOException(errorMessage);
ActiveMQServerLogger.LOGGER.warn(errorMessage);
connection.serviceException(e);
} catch (Exception ex) {
ActiveMQServerLogger.LOGGER.debug(ex);
}
}
});
}
}
}
private void enableAutoReadAndTtl() {

View File

@ -693,7 +693,9 @@ public class PagingStoreImpl implements PagingStore {
}
}
runWhenAvailable.run();
if (runWhenAvailable != null) {
runWhenAvailable.run();
}
return true;
}

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.tests.integration.openwire;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
@ -58,34 +59,49 @@ public class OpenWireFlowControlFailTest extends OpenWireTestBase {
textBody.append(" ");
}
ConnectionFactory factory = new ActiveMQConnectionFactory(urlString);
int numberOfMessage = 0;
try (Connection connection = factory.createConnection()) {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session.createQueue(addressInfo.getName().toString());
MessageProducer producer = session.createProducer(queue);
int numberOfMessage = 0;
boolean failed = false;
try {
for (int i = 0; i < 1000; i++) {
producer.send(session.createTextMessage(textBody.toString()));
TextMessage message = session.createTextMessage(textBody.toString());
message.setIntProperty("i", i);
producer.send(message);
numberOfMessage++;
}
} catch (Exception e) {
e.printStackTrace(System.out);
failed = true;
try {
producer.send(session.createTextMessage(textBody.toString()));
Assert.fail("Exception expected");
} catch (JMSException expected) {
expected.printStackTrace();
}
}
System.out.println("Message failed with " + numberOfMessage);
Assert.assertTrue(failed);
MessageConsumer consumer = session.createConsumer(queue);
connection.start();
}
factory = new ActiveMQConnectionFactory(urlString);
try (Connection connection2 = factory.createConnection()) {
Session session2 = connection2.createSession(false, Session.AUTO_ACKNOWLEDGE);
javax.jms.Queue queue = session2.createQueue(addressInfo.getName().toString());
MessageConsumer consumer = session2.createConsumer(queue);
connection2.start();
for (int i = 0; i < numberOfMessage; i++) {
TextMessage message = (TextMessage) consumer.receive(5000);
Assert.assertNotNull(message);
Assert.assertEquals(textBody.toString(), message.getText());
}
Assert.assertNull(consumer.receiveNoWait());
TextMessage msg = (TextMessage)consumer.receive(500);
Assert.assertNull(msg);
}
}
}