This commit is contained in:
Clebert Suconic 2019-01-30 15:38:37 -05:00
commit 40704c8101
5 changed files with 32 additions and 11 deletions

View File

@ -282,6 +282,12 @@ public final class BindingsImpl implements Bindings {
private void route(final Message message,
final RoutingContext context,
final boolean groupRouting) throws Exception {
boolean reusableContext = context.isReusable(message, version.get());
if (!reusableContext) {
context.clear();
}
/* This is a special treatment for scaled-down messages involving SnF queues.
* See org.apache.activemq.artemis.core.server.impl.ScaleDownHandler.scaleDownMessages() for the logic that sends messages with this property
*/
@ -310,6 +316,7 @@ public final class BindingsImpl implements Bindings {
binding.getBindable().route(message, context);
routed = true;
}
context.setReusable(false);
}
if (!routed) {
// Remove the ids now, in order to avoid double check
@ -319,10 +326,10 @@ public final class BindingsImpl implements Bindings {
SimpleString groupId = message.getGroupID();
if (ids != null) {
context.clear();
context.clear().setReusable(false);
routeFromCluster(message, context, ids);
} else if (groupingHandler != null && groupRouting && groupId != null) {
context.clear();
context.clear().setReusable(false);
routeUsingStrictOrdering(message, context, groupingHandler, groupId, 0);
} else if (CompositeAddress.isFullyQualified(message.getAddress())) {
Binding theBinding = bindingsNameMap.get(CompositeAddress.extractQueueName(message.getAddressSimpleString()));
@ -331,9 +338,8 @@ public final class BindingsImpl implements Bindings {
}
} else {
// in a optimization, we are reusing the previous context if everything is right for it
// so the simpleRouting will only happen if neededk
if (!context.isReusable(message, version.get())) {
context.clear();
// so the simpleRouting will only happen if needed
if (!reusableContext) {
simpleRouting(message, context);
}
}

View File

@ -889,6 +889,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
if (bindingMove != null) {
context.clear();
context.setReusable(false);
bindingMove.route(message, context);
if (addressInfo != null) {
addressInfo.incrementRoutedMessageCount();
@ -899,6 +900,7 @@ public class PostOfficeImpl implements PostOffice, NotificationListener, Binding
addressInfo.incrementRoutedMessageCount();
}
} else {
context.setReusable(false);
if (addressInfo != null) {
addressInfo.incrementUnRoutedMessageCount();
}

View File

@ -40,7 +40,7 @@ public interface RoutingContext {
SimpleString getPreviousAddress();
void setReusable(boolean reusable);
RoutingContext setReusable(boolean reusable);
RoutingContext setReusable(boolean reusable, int version);
@ -60,7 +60,7 @@ public interface RoutingContext {
int getQueueCount();
void clear();
RoutingContext clear();
void addQueueWithAck(SimpleString address, Queue queue);

View File

@ -90,6 +90,8 @@ public class DivertImpl implements Divert {
logger.trace("Diverting message " + message + " into " + this);
}
context.setReusable(false);
Message copy = null;
// Shouldn't copy if it's not routed anywhere else
@ -127,7 +129,7 @@ public class DivertImpl implements Divert {
copy = message;
}
postOffice.route(copy, context.getTransaction(), false);
postOffice.route(copy, new RoutingContextImpl(context.getTransaction()).setReusable(false), false);
}
@Override

View File

@ -79,8 +79,14 @@ public final class RoutingContextImpl implements RoutingContext {
}
@Override
public void setReusable(boolean reusable) {
public RoutingContext setReusable(boolean reusable) {
if (this.reusable != null && !this.reusable.booleanValue()) {
// cannot set to Reusable once it was set to false
return this;
}
this.reusable = reusable;
return this;
}
@Override
public RoutingContext setReusable(boolean reusable, int previousBindings) {
@ -96,7 +102,7 @@ public final class RoutingContextImpl implements RoutingContext {
}
@Override
public void clear() {
public RoutingContext clear() {
map.clear();
queueCount = 0;
@ -104,6 +110,8 @@ public final class RoutingContextImpl implements RoutingContext {
this.version = 0;
this.reusable = null;
return this;
}
@Override
@ -147,7 +155,10 @@ public final class RoutingContextImpl implements RoutingContext {
@Override
public boolean isReusable(Message message, int version) {
return isReusable() && queueCount > 0 && address.equals(previousAddress) && previousRoutingType == routingType && getPreviousBindingsVersion() == version;
if (getPreviousBindingsVersion() != version) {
this.reusable = false;
}
return isReusable() && queueCount > 0 && address.equals(previousAddress) && previousRoutingType == routingType;
}
@Override