ARTEMIS-2159 Fixing OpenWire Blocker Producer

Previous change on Flow control in OpenWire broke Blocked cases
This is a better fix.
This commit is contained in:
Clebert Suconic 2018-11-01 15:33:03 -04:00
parent 2f52e3ce2a
commit c62146802e
4 changed files with 67 additions and 56 deletions

View File

@ -443,64 +443,64 @@ public class AMQSession implements SessionCallback {
final AtomicInteger count,
final org.apache.activemq.artemis.api.core.Message coreMsg,
final SimpleString address) throws ResourceAllocationException {
if (!store.checkMemory(null)) {
if (!store.checkMemory(false, () -> {
Exception exceptionToSend = null;
try {
getCoreSession().send(coreMsg, false, dest.isTemporary());
} catch (Exception e) {
logger.warn(e.getMessage(), e);
exceptionToSend = e;
}
connection.enableTtl();
if (count == null || count.decrementAndGet() == 0) {
if (exceptionToSend != null) {
this.connection.getContext().setDontSendReponse(false);
connection.sendException(exceptionToSend);
} else {
server.getStorageManager().afterCompleteOperations(new IOCallback() {
@Override
public void done() {
if (sendProducerAck) {
try {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
connection.dispatchAsync(ack);
} catch (Exception e) {
connection.getContext().setDontSendReponse(false);
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
connection.sendException(e);
}
} else {
connection.getContext().setDontSendReponse(false);
try {
Response response = new Response();
response.setCorrelationId(messageSend.getCommandId());
connection.dispatchAsync(response);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
connection.sendException(e);
}
}
}
@Override
public void onError(int errorCode, String errorMessage) {
try {
final IOException e = new IOException(errorMessage);
ActiveMQServerLogger.LOGGER.warn(errorMessage);
connection.serviceException(e);
} catch (Exception ex) {
ActiveMQServerLogger.LOGGER.debug(ex);
}
}
});
}
}
})) {
this.connection.getContext().setDontSendReponse(false);
connection.enableTtl();
throw new ResourceAllocationException("Queue is full " + address);
}
Exception exceptionToSend = null;
try {
getCoreSession().send(coreMsg, false, dest.isTemporary());
} catch (Exception e) {
logger.warn(e.getMessage(), e);
exceptionToSend = e;
}
connection.enableTtl();
if (count == null || count.decrementAndGet() == 0) {
if (exceptionToSend != null) {
this.connection.getContext().setDontSendReponse(false);
connection.sendException(exceptionToSend);
} else {
server.getStorageManager().afterCompleteOperations(new IOCallback() {
@Override
public void done() {
if (sendProducerAck) {
try {
ProducerAck ack = new ProducerAck(producerInfo.getProducerId(), messageSend.getSize());
connection.dispatchAsync(ack);
} catch (Exception e) {
connection.getContext().setDontSendReponse(false);
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
connection.sendException(e);
}
} else {
connection.getContext().setDontSendReponse(false);
try {
Response response = new Response();
response.setCorrelationId(messageSend.getCommandId());
connection.dispatchAsync(response);
} catch (Exception e) {
ActiveMQServerLogger.LOGGER.warn(e.getMessage(), e);
connection.sendException(e);
}
}
}
@Override
public void onError(int errorCode, String errorMessage) {
try {
final IOException e = new IOException(errorMessage);
ActiveMQServerLogger.LOGGER.warn(errorMessage);
connection.serviceException(e);
} catch (Exception ex) {
ActiveMQServerLogger.LOGGER.debug(ex);
}
}
});
}
}
}
private void enableAutoReadAndTtl() {

View File

@ -127,6 +127,8 @@ public interface PagingStore extends ActiveMQComponent, RefCountMessageListener
boolean checkMemory(Runnable runnable);
boolean checkMemory(boolean runOnFailure, Runnable runnable);
boolean isFull();
boolean isRejectingMessages();

View File

@ -652,13 +652,17 @@ public class PagingStoreImpl implements PagingStore {
}
}
@Override
public boolean checkMemory(final Runnable runWhenAvailable) {
return checkMemory(true, runWhenAvailable);
}
@Override
public boolean checkMemory(boolean runOnFailure, final Runnable runWhenAvailable) {
if (addressFullMessagePolicy == AddressFullMessagePolicy.FAIL && (maxSize != -1 || usingGlobalMaxSize || pagingManager.isDiskFull())) {
if (isFull()) {
if (runWhenAvailable != null) {
if (runOnFailure && runWhenAvailable != null) {
onMemoryFreedRunnables.add(AtomicRunnable.checkAtomic(runWhenAvailable));
}
return false;

View File

@ -410,6 +410,11 @@ public class PersistMultiThreadTest extends ActiveMQTestBase {
}
@Override
public boolean checkMemory(boolean runOnFailure, Runnable runnable) {
return false;
}
@Override
public boolean checkMemory(Runnable runnable) {
return false;