mirror of https://github.com/apache/activemq.git
Additional work, adds an option to ignore network subscriptions by default. When the ignoreIdleConsumers setting is false and the abort connection option is true this can cause the bridges to be torn down and rebuilt for no reason.
This commit is contained in:
parent
16c1627ca0
commit
77bcffc9b9
|
@ -18,7 +18,7 @@ package org.apache.activemq.broker.jmx;
|
|||
|
||||
import org.apache.activemq.broker.region.policy.AbortSlowAckConsumerStrategy;
|
||||
|
||||
public class AbortSlowAckConsumerStrategyView extends AbortSlowConsumerStrategyView {
|
||||
public class AbortSlowAckConsumerStrategyView extends AbortSlowConsumerStrategyView implements AbortSlowAckConsumerStrategyViewMBean {
|
||||
|
||||
private final AbortSlowAckConsumerStrategy strategy;
|
||||
|
||||
|
@ -27,19 +27,33 @@ public class AbortSlowAckConsumerStrategyView extends AbortSlowConsumerStrategyV
|
|||
this.strategy = slowConsumerStrategy;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getMaxTimeSinceLastAck() {
|
||||
return strategy.getMaxTimeSinceLastAck();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setMaxTimeSinceLastAck(long maxTimeSinceLastAck) {
|
||||
this.strategy.setMaxTimeSinceLastAck(maxTimeSinceLastAck);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isIgnoreIdleConsumers() {
|
||||
return strategy.isIgnoreIdleConsumers();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setIgnoreIdleConsumers(boolean ignoreIdleConsumers) {
|
||||
this.strategy.setIgnoreIdleConsumers(ignoreIdleConsumers);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isIgnoreNetworkConsumers() {
|
||||
return this.strategy.isIgnoreNetworkSubscriptions();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setIgnoreNetworkConsumers(boolean ignoreNetworkConsumers) {
|
||||
this.strategy.setIgnoreNetworkConsumers(ignoreNetworkConsumers);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -30,4 +30,10 @@ public interface AbortSlowAckConsumerStrategyViewMBean extends AbortSlowConsumer
|
|||
@MBeanInfo("sets whether consumers that are idle (no dispatched messages) should be included when checking for slow acks.")
|
||||
void setIgnoreIdleConsumers(boolean ignoreIdleConsumers);
|
||||
|
||||
@MBeanInfo("returns the current value of the ignore network connector consumers setting.")
|
||||
boolean isIgnoreNetworkConsumers();
|
||||
|
||||
@MBeanInfo("sets whether consumers that are from network connector should be included when checking for slow acks.")
|
||||
void setIgnoreNetworkConsumers(boolean ignoreIdleConsumers);
|
||||
|
||||
}
|
||||
|
|
|
@ -43,6 +43,7 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
|
|||
private final List<Destination> destinations = new LinkedList<Destination>();
|
||||
private long maxTimeSinceLastAck = 30*1000;
|
||||
private boolean ignoreIdleConsumers = true;
|
||||
private boolean ignoreNetworkConsumers = true;
|
||||
|
||||
public AbortSlowAckConsumerStrategy() {
|
||||
this.name = "AbortSlowAckConsumerStrategy@" + hashCode();
|
||||
|
@ -103,10 +104,18 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
|
|||
private void updateSlowConsumersList(List<Subscription> subscribers) {
|
||||
for (Subscription subscriber : subscribers) {
|
||||
|
||||
if (subscriber.getConsumerInfo().isNetworkSubscription()) {
|
||||
if (slowConsumers.remove(subscriber) != null) {
|
||||
LOG.info("sub: {} is no longer slow", subscriber.getConsumerInfo().getConsumerId());
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (isIgnoreIdleConsumers() && subscriber.getDispatchedQueueSize() == 0) {
|
||||
// Not considered Idle so ensure its cleared from the list
|
||||
if (slowConsumers.remove(subscriber) != null) {
|
||||
LOG.info("sub: {} is no longer slow", subscriber.getConsumerInfo().getConsumerId());
|
||||
continue;
|
||||
} else {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Not ignoring idle Consumer {}", subscriber.getConsumerInfo().getConsumerId());
|
||||
|
@ -214,4 +223,34 @@ public class AbortSlowAckConsumerStrategy extends AbortSlowConsumerStrategy {
|
|||
public void setIgnoreIdleConsumers(boolean ignoreIdleConsumers) {
|
||||
this.ignoreIdleConsumers = ignoreIdleConsumers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns whether the strategy is configured to ignore subscriptions that are from a network
|
||||
* connection.
|
||||
*
|
||||
* @return true if the strategy will ignore network connection subscriptions when looking
|
||||
* for slow consumers.
|
||||
*/
|
||||
public boolean isIgnoreNetworkSubscriptions() {
|
||||
return ignoreNetworkConsumers;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets whether the strategy is configured to ignore consumers that are part of a network
|
||||
* connection to another broker.
|
||||
*
|
||||
* When configured to not ignore idle consumers this strategy acts not only on consumers
|
||||
* that are actually slow but also on any consumer that has not received any messages for
|
||||
* the maxTimeSinceLastAck. This allows for a way to evict idle consumers while also
|
||||
* aborting slow consumers however for a network subscription this can create a lot of
|
||||
* unnecessary churn and if the abort connection option is also enabled this can result
|
||||
* in the entire network connection being torn down and rebuilt for no reason.
|
||||
*
|
||||
* @param ignoreNetworkConsumers
|
||||
* Should this strategy ignore subscriptions made by a network connector.
|
||||
*/
|
||||
public void setIgnoreNetworkConsumers(boolean ignoreNetworkConsumers) {
|
||||
this.ignoreNetworkConsumers = ignoreNetworkConsumers;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue