mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-5513 - interaction with - https://issues.apache.org/jira/browse/AMQ-5068 - need to ensure broker cached messages state reflects delivery attempt - RedeliveryRestartWithExceptionTest regression
This commit is contained in:
parent
d5470254af
commit
6cebd2c79e
|
@ -66,6 +66,7 @@ import org.apache.activemq.state.ConnectionState;
|
|||
import org.apache.activemq.store.PListStore;
|
||||
import org.apache.activemq.thread.Scheduler;
|
||||
import org.apache.activemq.thread.TaskRunnerFactory;
|
||||
import org.apache.activemq.transport.TransmitCallback;
|
||||
import org.apache.activemq.usage.SystemUsage;
|
||||
import org.apache.activemq.util.BrokerSupport;
|
||||
import org.apache.activemq.util.IdGenerator;
|
||||
|
@ -613,8 +614,8 @@ public class RegionBroker extends EmptyBroker {
|
|||
}
|
||||
|
||||
@Override
|
||||
public void preProcessDispatch(MessageDispatch messageDispatch) {
|
||||
Message message = messageDispatch.getMessage();
|
||||
public void preProcessDispatch(final MessageDispatch messageDispatch) {
|
||||
final Message message = messageDispatch.getMessage();
|
||||
if (message != null) {
|
||||
long endTime = System.currentTimeMillis();
|
||||
message.setBrokerOutTime(endTime);
|
||||
|
@ -627,6 +628,25 @@ public class RegionBroker extends EmptyBroker {
|
|||
message.incrementRedeliveryCounter();
|
||||
try {
|
||||
((BaseDestination) message.getRegionDestination()).getMessageStore().updateMessage(message);
|
||||
messageDispatch.setTransmitCallback(new TransmitCallback() {
|
||||
// dispatch is considered a delivery, so update sub state post dispatch otherwise
|
||||
// on a disconnect/reconnect cached messages will not reflect initial delivery attempt
|
||||
final TransmitCallback delegate = messageDispatch.getTransmitCallback();
|
||||
@Override
|
||||
public void onSuccess() {
|
||||
message.incrementRedeliveryCounter();
|
||||
if (delegate != null) {
|
||||
delegate.onSuccess();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure() {
|
||||
if (delegate != null) {
|
||||
delegate.onFailure();
|
||||
}
|
||||
}
|
||||
});
|
||||
} catch (IOException error) {
|
||||
RuntimeException runtimeException = new RuntimeException("Failed to persist JMSRedeliveryFlag on " + message.getMessageId() + " in " + message.getDestination(), error);
|
||||
LOG.warn(runtimeException.getLocalizedMessage(), runtimeException);
|
||||
|
|
|
@ -200,8 +200,8 @@ public class RedeliveryRestartWithExceptionTest extends TestSupport {
|
|||
msg = (TextMessage) consumer.receive(4000);
|
||||
LOG.info("redelivered? got: " + msg);
|
||||
assertNotNull("got the message again", msg);
|
||||
assertEquals("re delivery flag", true, msg.getJMSRedelivered());
|
||||
assertTrue("redelivery count survives reconnect", msg.getLongProperty("JMSXDeliveryCount") > 1);
|
||||
assertEquals("re delivery flag on:" + i, true, msg.getJMSRedelivered());
|
||||
assertTrue("redelivery count survives reconnect for:" + i, msg.getLongProperty("JMSXDeliveryCount") > 1);
|
||||
msg.acknowledge();
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue