mirror of https://github.com/apache/activemq.git
Add accessors with the correctly spelled name and deprecate the old ones.
This commit is contained in:
parent
7fe23bce62
commit
b9fd189d56
|
@ -17,9 +17,10 @@
|
||||||
package org.apache.activemq.broker.region;
|
package org.apache.activemq.broker.region;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import javax.jms.ResourceAllocationException;
|
import javax.jms.ResourceAllocationException;
|
||||||
|
|
||||||
import org.apache.activemq.advisory.AdvisorySupport;
|
import org.apache.activemq.advisory.AdvisorySupport;
|
||||||
import org.apache.activemq.broker.Broker;
|
import org.apache.activemq.broker.Broker;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
@ -33,7 +34,6 @@ import org.apache.activemq.command.Message;
|
||||||
import org.apache.activemq.command.MessageAck;
|
import org.apache.activemq.command.MessageAck;
|
||||||
import org.apache.activemq.command.MessageDispatchNotification;
|
import org.apache.activemq.command.MessageDispatchNotification;
|
||||||
import org.apache.activemq.command.ProducerInfo;
|
import org.apache.activemq.command.ProducerInfo;
|
||||||
import org.apache.activemq.command.TransactionId;
|
|
||||||
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
|
import org.apache.activemq.filter.NonCachedMessageEvaluationContext;
|
||||||
import org.apache.activemq.security.SecurityContext;
|
import org.apache.activemq.security.SecurityContext;
|
||||||
import org.apache.activemq.state.ProducerState;
|
import org.apache.activemq.state.ProducerState;
|
||||||
|
@ -94,7 +94,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
protected int storeUsageHighWaterMark = 100;
|
protected int storeUsageHighWaterMark = 100;
|
||||||
private SlowConsumerStrategy slowConsumerStrategy;
|
private SlowConsumerStrategy slowConsumerStrategy;
|
||||||
private boolean prioritizedMessages;
|
private boolean prioritizedMessages;
|
||||||
private long inactiveTimoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
|
private long inactiveTimeoutBeforeGC = DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
|
||||||
private boolean gcIfInactive;
|
private boolean gcIfInactive;
|
||||||
private boolean gcWithNetworkConsumers;
|
private boolean gcWithNetworkConsumers;
|
||||||
private long lastActiveTime=0l;
|
private long lastActiveTime=0l;
|
||||||
|
@ -146,6 +146,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
/**
|
/**
|
||||||
* @return the producerFlowControl
|
* @return the producerFlowControl
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public boolean isProducerFlowControl() {
|
public boolean isProducerFlowControl() {
|
||||||
return producerFlowControl;
|
return producerFlowControl;
|
||||||
}
|
}
|
||||||
|
@ -153,14 +154,17 @@ public abstract class BaseDestination implements Destination {
|
||||||
/**
|
/**
|
||||||
* @param producerFlowControl the producerFlowControl to set
|
* @param producerFlowControl the producerFlowControl to set
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void setProducerFlowControl(boolean producerFlowControl) {
|
public void setProducerFlowControl(boolean producerFlowControl) {
|
||||||
this.producerFlowControl = producerFlowControl;
|
this.producerFlowControl = producerFlowControl;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isAlwaysRetroactive() {
|
public boolean isAlwaysRetroactive() {
|
||||||
return alwaysRetroactive;
|
return alwaysRetroactive;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void setAlwaysRetroactive(boolean alwaysRetroactive) {
|
public void setAlwaysRetroactive(boolean alwaysRetroactive) {
|
||||||
this.alwaysRetroactive = alwaysRetroactive;
|
this.alwaysRetroactive = alwaysRetroactive;
|
||||||
}
|
}
|
||||||
|
@ -173,6 +177,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
* @param blockedProducerWarningInterval the interval at which warning about
|
* @param blockedProducerWarningInterval the interval at which warning about
|
||||||
* blocked producers will be triggered.
|
* blocked producers will be triggered.
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
|
public void setBlockedProducerWarningInterval(long blockedProducerWarningInterval) {
|
||||||
this.blockedProducerWarningInterval = blockedProducerWarningInterval;
|
this.blockedProducerWarningInterval = blockedProducerWarningInterval;
|
||||||
}
|
}
|
||||||
|
@ -182,6 +187,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
* @return the interval at which warning about blocked producers will be
|
* @return the interval at which warning about blocked producers will be
|
||||||
* triggered.
|
* triggered.
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public long getBlockedProducerWarningInterval() {
|
public long getBlockedProducerWarningInterval() {
|
||||||
return blockedProducerWarningInterval;
|
return blockedProducerWarningInterval;
|
||||||
}
|
}
|
||||||
|
@ -189,6 +195,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
/**
|
/**
|
||||||
* @return the maxProducersToAudit
|
* @return the maxProducersToAudit
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public int getMaxProducersToAudit() {
|
public int getMaxProducersToAudit() {
|
||||||
return maxProducersToAudit;
|
return maxProducersToAudit;
|
||||||
}
|
}
|
||||||
|
@ -196,6 +203,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
/**
|
/**
|
||||||
* @param maxProducersToAudit the maxProducersToAudit to set
|
* @param maxProducersToAudit the maxProducersToAudit to set
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void setMaxProducersToAudit(int maxProducersToAudit) {
|
public void setMaxProducersToAudit(int maxProducersToAudit) {
|
||||||
this.maxProducersToAudit = maxProducersToAudit;
|
this.maxProducersToAudit = maxProducersToAudit;
|
||||||
}
|
}
|
||||||
|
@ -203,6 +211,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
/**
|
/**
|
||||||
* @return the maxAuditDepth
|
* @return the maxAuditDepth
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public int getMaxAuditDepth() {
|
public int getMaxAuditDepth() {
|
||||||
return maxAuditDepth;
|
return maxAuditDepth;
|
||||||
}
|
}
|
||||||
|
@ -210,6 +219,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
/**
|
/**
|
||||||
* @param maxAuditDepth the maxAuditDepth to set
|
* @param maxAuditDepth the maxAuditDepth to set
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void setMaxAuditDepth(int maxAuditDepth) {
|
public void setMaxAuditDepth(int maxAuditDepth) {
|
||||||
this.maxAuditDepth = maxAuditDepth;
|
this.maxAuditDepth = maxAuditDepth;
|
||||||
}
|
}
|
||||||
|
@ -217,6 +227,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
/**
|
/**
|
||||||
* @return the enableAudit
|
* @return the enableAudit
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public boolean isEnableAudit() {
|
public boolean isEnableAudit() {
|
||||||
return enableAudit;
|
return enableAudit;
|
||||||
}
|
}
|
||||||
|
@ -224,53 +235,65 @@ public abstract class BaseDestination implements Destination {
|
||||||
/**
|
/**
|
||||||
* @param enableAudit the enableAudit to set
|
* @param enableAudit the enableAudit to set
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void setEnableAudit(boolean enableAudit) {
|
public void setEnableAudit(boolean enableAudit) {
|
||||||
this.enableAudit = enableAudit;
|
this.enableAudit = enableAudit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
|
public void addProducer(ConnectionContext context, ProducerInfo info) throws Exception {
|
||||||
destinationStatistics.getProducers().increment();
|
destinationStatistics.getProducers().increment();
|
||||||
this.lastActiveTime=0l;
|
this.lastActiveTime=0l;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
|
public void removeProducer(ConnectionContext context, ProducerInfo info) throws Exception {
|
||||||
destinationStatistics.getProducers().decrement();
|
destinationStatistics.getProducers().decrement();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{
|
public void addSubscription(ConnectionContext context, Subscription sub) throws Exception{
|
||||||
destinationStatistics.getConsumers().increment();
|
destinationStatistics.getConsumers().increment();
|
||||||
this.lastActiveTime=0l;
|
this.lastActiveTime=0l;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{
|
public void removeSubscription(ConnectionContext context, Subscription sub, long lastDeliveredSequenceId) throws Exception{
|
||||||
destinationStatistics.getConsumers().decrement();
|
destinationStatistics.getConsumers().decrement();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
public final MemoryUsage getMemoryUsage() {
|
public final MemoryUsage getMemoryUsage() {
|
||||||
return memoryUsage;
|
return memoryUsage;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setMemoryUsage(MemoryUsage memoryUsage) {
|
@Override
|
||||||
this.memoryUsage = memoryUsage;
|
public void setMemoryUsage(MemoryUsage memoryUsage) {
|
||||||
|
this.memoryUsage = memoryUsage;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public DestinationStatistics getDestinationStatistics() {
|
public DestinationStatistics getDestinationStatistics() {
|
||||||
return destinationStatistics;
|
return destinationStatistics;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public ActiveMQDestination getActiveMQDestination() {
|
public ActiveMQDestination getActiveMQDestination() {
|
||||||
return destination;
|
return destination;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public final String getName() {
|
public final String getName() {
|
||||||
return getActiveMQDestination().getPhysicalName();
|
return getActiveMQDestination().getPhysicalName();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public final MessageStore getMessageStore() {
|
public final MessageStore getMessageStore() {
|
||||||
return store;
|
return store;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isActive() {
|
public boolean isActive() {
|
||||||
boolean isActive = destinationStatistics.getConsumers().getCount() != 0 ||
|
boolean isActive = destinationStatistics.getConsumers().getCount() != 0 ||
|
||||||
destinationStatistics.getProducers().getCount() != 0;
|
destinationStatistics.getProducers().getCount() != 0;
|
||||||
|
@ -280,18 +303,22 @@ public abstract class BaseDestination implements Destination {
|
||||||
return isActive;
|
return isActive;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public int getMaxPageSize() {
|
public int getMaxPageSize() {
|
||||||
return maxPageSize;
|
return maxPageSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void setMaxPageSize(int maxPageSize) {
|
public void setMaxPageSize(int maxPageSize) {
|
||||||
this.maxPageSize = maxPageSize;
|
this.maxPageSize = maxPageSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public int getMaxBrowsePageSize() {
|
public int getMaxBrowsePageSize() {
|
||||||
return this.maxBrowsePageSize > 0 ? this.maxBrowsePageSize : getMaxPageSize();
|
return this.maxBrowsePageSize > 0 ? this.maxBrowsePageSize : getMaxPageSize();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void setMaxBrowsePageSize(int maxPageSize) {
|
public void setMaxBrowsePageSize(int maxPageSize) {
|
||||||
this.maxBrowsePageSize = maxPageSize;
|
this.maxBrowsePageSize = maxPageSize;
|
||||||
}
|
}
|
||||||
|
@ -312,26 +339,32 @@ public abstract class BaseDestination implements Destination {
|
||||||
return expireMessagesPeriod;
|
return expireMessagesPeriod;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isUseCache() {
|
public boolean isUseCache() {
|
||||||
return useCache;
|
return useCache;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void setUseCache(boolean useCache) {
|
public void setUseCache(boolean useCache) {
|
||||||
this.useCache = useCache;
|
this.useCache = useCache;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public int getMinimumMessageSize() {
|
public int getMinimumMessageSize() {
|
||||||
return minimumMessageSize;
|
return minimumMessageSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void setMinimumMessageSize(int minimumMessageSize) {
|
public void setMinimumMessageSize(int minimumMessageSize) {
|
||||||
this.minimumMessageSize = minimumMessageSize;
|
this.minimumMessageSize = minimumMessageSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isLazyDispatch() {
|
public boolean isLazyDispatch() {
|
||||||
return lazyDispatch;
|
return lazyDispatch;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void setLazyDispatch(boolean lazyDispatch) {
|
public void setLazyDispatch(boolean lazyDispatch) {
|
||||||
this.lazyDispatch = lazyDispatch;
|
this.lazyDispatch = lazyDispatch;
|
||||||
}
|
}
|
||||||
|
@ -436,6 +469,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
/**
|
/**
|
||||||
* @return the dead letter strategy
|
* @return the dead letter strategy
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public DeadLetterStrategy getDeadLetterStrategy() {
|
public DeadLetterStrategy getDeadLetterStrategy() {
|
||||||
return deadLetterStrategy;
|
return deadLetterStrategy;
|
||||||
}
|
}
|
||||||
|
@ -449,10 +483,12 @@ public abstract class BaseDestination implements Destination {
|
||||||
this.deadLetterStrategy = deadLetterStrategy;
|
this.deadLetterStrategy = deadLetterStrategy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public int getCursorMemoryHighWaterMark() {
|
public int getCursorMemoryHighWaterMark() {
|
||||||
return this.cursorMemoryHighWaterMark;
|
return this.cursorMemoryHighWaterMark;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
|
public void setCursorMemoryHighWaterMark(int cursorMemoryHighWaterMark) {
|
||||||
this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
|
this.cursorMemoryHighWaterMark = cursorMemoryHighWaterMark;
|
||||||
}
|
}
|
||||||
|
@ -463,6 +499,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
* @param context
|
* @param context
|
||||||
* @param messageReference
|
* @param messageReference
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
|
public void messageConsumed(ConnectionContext context, MessageReference messageReference) {
|
||||||
if (advisoryForConsumed) {
|
if (advisoryForConsumed) {
|
||||||
broker.messageConsumed(context, messageReference);
|
broker.messageConsumed(context, messageReference);
|
||||||
|
@ -475,6 +512,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
* @param context
|
* @param context
|
||||||
* @param messageReference
|
* @param messageReference
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
|
public void messageDelivered(ConnectionContext context, MessageReference messageReference) {
|
||||||
if (advisoryForDelivery) {
|
if (advisoryForDelivery) {
|
||||||
broker.messageDelivered(context, messageReference);
|
broker.messageDelivered(context, messageReference);
|
||||||
|
@ -488,6 +526,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
* @param context
|
* @param context
|
||||||
* @param messageReference
|
* @param messageReference
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
|
public void messageDiscarded(ConnectionContext context, Subscription sub, MessageReference messageReference) {
|
||||||
if (advisoryForDiscardingMessages) {
|
if (advisoryForDiscardingMessages) {
|
||||||
broker.messageDiscarded(context, sub, messageReference);
|
broker.messageDiscarded(context, sub, messageReference);
|
||||||
|
@ -500,6 +539,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
* @param context
|
* @param context
|
||||||
* @param subs
|
* @param subs
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void slowConsumer(ConnectionContext context, Subscription subs) {
|
public void slowConsumer(ConnectionContext context, Subscription subs) {
|
||||||
if (advisoryForSlowConsumers) {
|
if (advisoryForSlowConsumers) {
|
||||||
broker.slowConsumer(context, this, subs);
|
broker.slowConsumer(context, this, subs);
|
||||||
|
@ -515,6 +555,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
* @param context
|
* @param context
|
||||||
* @param producerInfo
|
* @param producerInfo
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
|
public void fastProducer(ConnectionContext context, ProducerInfo producerInfo) {
|
||||||
if (advisoryForFastProducers) {
|
if (advisoryForFastProducers) {
|
||||||
broker.fastProducer(context, producerInfo, getActiveMQDestination());
|
broker.fastProducer(context, producerInfo, getActiveMQDestination());
|
||||||
|
@ -527,12 +568,14 @@ public abstract class BaseDestination implements Destination {
|
||||||
* @param context
|
* @param context
|
||||||
* @param usage
|
* @param usage
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void isFull(ConnectionContext context, Usage<?> usage) {
|
public void isFull(ConnectionContext context, Usage<?> usage) {
|
||||||
if (advisoryWhenFull) {
|
if (advisoryWhenFull) {
|
||||||
broker.isFull(context, this, usage);
|
broker.isFull(context, this, usage);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void dispose(ConnectionContext context) throws IOException {
|
public void dispose(ConnectionContext context) throws IOException {
|
||||||
if (this.store != null) {
|
if (this.store != null) {
|
||||||
this.store.removeAllMessages(context);
|
this.store.removeAllMessages(context);
|
||||||
|
@ -543,6 +586,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
this.disposed = true;
|
this.disposed = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isDisposed() {
|
public boolean isDisposed() {
|
||||||
return this.disposed;
|
return this.disposed;
|
||||||
}
|
}
|
||||||
|
@ -598,6 +642,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
|
public void processDispatchNotification(MessageDispatchNotification messageDispatchNotification) throws Exception {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -653,11 +698,13 @@ public abstract class BaseDestination implements Destination {
|
||||||
this.slowConsumerStrategy = slowConsumerStrategy;
|
this.slowConsumerStrategy = slowConsumerStrategy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public SlowConsumerStrategy getSlowConsumerStrategy() {
|
public SlowConsumerStrategy getSlowConsumerStrategy() {
|
||||||
return this.slowConsumerStrategy;
|
return this.slowConsumerStrategy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isPrioritizedMessages() {
|
public boolean isPrioritizedMessages() {
|
||||||
return this.prioritizedMessages;
|
return this.prioritizedMessages;
|
||||||
}
|
}
|
||||||
|
@ -670,17 +717,18 @@ public abstract class BaseDestination implements Destination {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the inactiveTimoutBeforeGC
|
* @return the inactiveTimeoutBeforeGC
|
||||||
*/
|
*/
|
||||||
public long getInactiveTimoutBeforeGC() {
|
@Override
|
||||||
return this.inactiveTimoutBeforeGC;
|
public long getInactiveTimeoutBeforeGC() {
|
||||||
|
return this.inactiveTimeoutBeforeGC;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param inactiveTimoutBeforeGC the inactiveTimoutBeforeGC to set
|
* @param inactiveTimeoutBeforeGC the inactiveTimeoutBeforeGC to set
|
||||||
*/
|
*/
|
||||||
public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
|
public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) {
|
||||||
this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC;
|
this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -709,17 +757,19 @@ public abstract class BaseDestination implements Destination {
|
||||||
return gcWithNetworkConsumers;
|
return gcWithNetworkConsumers;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void markForGC(long timeStamp) {
|
public void markForGC(long timeStamp) {
|
||||||
if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
|
if (isGcIfInactive() && this.lastActiveTime == 0 && isActive() == false
|
||||||
&& destinationStatistics.messages.getCount() == 0 && getInactiveTimoutBeforeGC() > 0l) {
|
&& destinationStatistics.messages.getCount() == 0 && getInactiveTimeoutBeforeGC() > 0l) {
|
||||||
this.lastActiveTime = timeStamp;
|
this.lastActiveTime = timeStamp;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean canGC() {
|
public boolean canGC() {
|
||||||
boolean result = false;
|
boolean result = false;
|
||||||
if (isGcIfInactive()&& this.lastActiveTime != 0l) {
|
if (isGcIfInactive()&& this.lastActiveTime != 0l) {
|
||||||
if ((System.currentTimeMillis() - this.lastActiveTime) >= getInactiveTimoutBeforeGC()) {
|
if ((System.currentTimeMillis() - this.lastActiveTime) >= getInactiveTimeoutBeforeGC()) {
|
||||||
result = true;
|
result = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -734,10 +784,12 @@ public abstract class BaseDestination implements Destination {
|
||||||
return this.reduceMemoryFootprint;
|
return this.reduceMemoryFootprint;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isDoOptimzeMessageStorage() {
|
public boolean isDoOptimzeMessageStorage() {
|
||||||
return doOptimzeMessageStorage;
|
return doOptimzeMessageStorage;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
|
public void setDoOptimzeMessageStorage(boolean doOptimzeMessageStorage) {
|
||||||
this.doOptimzeMessageStorage = doOptimzeMessageStorage;
|
this.doOptimzeMessageStorage = doOptimzeMessageStorage;
|
||||||
}
|
}
|
||||||
|
@ -751,6 +803,7 @@ public abstract class BaseDestination implements Destination {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
public abstract List<Subscription> getConsumers();
|
public abstract List<Subscription> getConsumers();
|
||||||
|
|
||||||
protected boolean hasRegularConsumers(List<Subscription> consumers) {
|
protected boolean hasRegularConsumers(List<Subscription> consumers) {
|
||||||
|
@ -790,10 +843,12 @@ public abstract class BaseDestination implements Destination {
|
||||||
return ack;
|
return ack;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public boolean isDLQ() {
|
public boolean isDLQ() {
|
||||||
return getDeadLetterStrategy().isDLQ(this.getActiveMQDestination());
|
return getDeadLetterStrategy().isDLQ(this.getActiveMQDestination());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void duplicateFromStore(Message message, Subscription durableSub) {
|
public void duplicateFromStore(Message message, Subscription durableSub) {
|
||||||
ConnectionContext connectionContext = createConnectionContext();
|
ConnectionContext connectionContext = createConnectionContext();
|
||||||
getLog().warn("duplicate message from store {}, redirecting for dlq processing", message.getMessageId());
|
getLog().warn("duplicate message from store {}, redirecting for dlq processing", message.getMessageId());
|
||||||
|
|
|
@ -18,6 +18,7 @@ package org.apache.activemq.broker.region;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.activemq.Service;
|
import org.apache.activemq.Service;
|
||||||
import org.apache.activemq.broker.ConnectionContext;
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
import org.apache.activemq.broker.ProducerBrokerExchange;
|
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||||
|
@ -54,7 +55,7 @@ public interface Destination extends Service, Task, Message.MessageDestination {
|
||||||
|
|
||||||
void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException;
|
void acknowledge(ConnectionContext context, Subscription sub, final MessageAck ack, final MessageReference node) throws IOException;
|
||||||
|
|
||||||
long getInactiveTimoutBeforeGC();
|
long getInactiveTimeoutBeforeGC();
|
||||||
|
|
||||||
void markForGC(long timeStamp);
|
void markForGC(long timeStamp);
|
||||||
|
|
||||||
|
@ -64,6 +65,7 @@ public interface Destination extends Service, Task, Message.MessageDestination {
|
||||||
|
|
||||||
ActiveMQDestination getActiveMQDestination();
|
ActiveMQDestination getActiveMQDestination();
|
||||||
|
|
||||||
|
@Override
|
||||||
MemoryUsage getMemoryUsage();
|
MemoryUsage getMemoryUsage();
|
||||||
|
|
||||||
void setMemoryUsage(MemoryUsage memoryUsage);
|
void setMemoryUsage(MemoryUsage memoryUsage);
|
||||||
|
@ -133,6 +135,7 @@ public interface Destination extends Service, Task, Message.MessageDestination {
|
||||||
|
|
||||||
public void setUseCache(boolean useCache);
|
public void setUseCache(boolean useCache);
|
||||||
|
|
||||||
|
@Override
|
||||||
public int getMinimumMessageSize();
|
public int getMinimumMessageSize();
|
||||||
|
|
||||||
public void setMinimumMessageSize(int minimumMessageSize);
|
public void setMinimumMessageSize(int minimumMessageSize);
|
||||||
|
|
|
@ -88,8 +88,8 @@ public class DestinationFilter implements Destination {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getInactiveTimoutBeforeGC() {
|
public long getInactiveTimeoutBeforeGC() {
|
||||||
return next.getInactiveTimoutBeforeGC();
|
return next.getInactiveTimeoutBeforeGC();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -33,7 +33,15 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
import javax.jms.InvalidClientIDException;
|
import javax.jms.InvalidClientIDException;
|
||||||
import javax.jms.JMSException;
|
import javax.jms.JMSException;
|
||||||
|
|
||||||
import org.apache.activemq.broker.*;
|
import org.apache.activemq.broker.Broker;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.Connection;
|
||||||
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
|
import org.apache.activemq.broker.ConsumerBrokerExchange;
|
||||||
|
import org.apache.activemq.broker.EmptyBroker;
|
||||||
|
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||||
|
import org.apache.activemq.broker.TransportConnection;
|
||||||
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
|
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
@ -864,7 +872,7 @@ public class RegionBroker extends EmptyBroker {
|
||||||
if (dest instanceof BaseDestination) {
|
if (dest instanceof BaseDestination) {
|
||||||
log = ((BaseDestination) dest).getLog();
|
log = ((BaseDestination) dest).getLog();
|
||||||
}
|
}
|
||||||
log.info("{} Inactive for longer than {} ms - removing ...", dest.getName(), dest.getInactiveTimoutBeforeGC());
|
log.info("{} Inactive for longer than {} ms - removing ...", dest.getName(), dest.getInactiveTimeoutBeforeGC());
|
||||||
try {
|
try {
|
||||||
getRoot().removeDestination(context, dest.getActiveMQDestination(), isAllowTempAutoCreationOnSend() ? 1 : 0);
|
getRoot().removeDestination(context, dest.getActiveMQDestination(), isAllowTempAutoCreationOnSend() ? 1 : 0);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
@ -885,6 +893,7 @@ public class RegionBroker extends EmptyBroker {
|
||||||
this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
|
this.allowTempAutoCreationOnSend = allowTempAutoCreationOnSend;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public void reapplyInterceptor() {
|
public void reapplyInterceptor() {
|
||||||
queueRegion.reapplyInterceptor();
|
queueRegion.reapplyInterceptor();
|
||||||
topicRegion.reapplyInterceptor();
|
topicRegion.reapplyInterceptor();
|
||||||
|
|
|
@ -95,7 +95,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
||||||
private boolean allConsumersExclusiveByDefault;
|
private boolean allConsumersExclusiveByDefault;
|
||||||
private boolean gcInactiveDestinations;
|
private boolean gcInactiveDestinations;
|
||||||
private boolean gcWithNetworkConsumers;
|
private boolean gcWithNetworkConsumers;
|
||||||
private long inactiveTimoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
|
private long inactiveTimeoutBeforeGC = BaseDestination.DEFAULT_INACTIVE_TIMEOUT_BEFORE_GC;
|
||||||
private boolean reduceMemoryFootprint;
|
private boolean reduceMemoryFootprint;
|
||||||
private NetworkBridgeFilterFactory networkBridgeFilterFactory;
|
private NetworkBridgeFilterFactory networkBridgeFilterFactory;
|
||||||
private boolean doOptimzeMessageStorage = true;
|
private boolean doOptimzeMessageStorage = true;
|
||||||
|
@ -187,7 +187,7 @@ public class PolicyEntry extends DestinationMapEntry {
|
||||||
|
|
||||||
destination.setGcIfInactive(isGcInactiveDestinations());
|
destination.setGcIfInactive(isGcInactiveDestinations());
|
||||||
destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers());
|
destination.setGcWithNetworkConsumers(isGcWithNetworkConsumers());
|
||||||
destination.setInactiveTimoutBeforeGC(getInactiveTimoutBeforeGC());
|
destination.setInactiveTimeoutBeforeGC(getInactiveTimeoutBeforeGC());
|
||||||
destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
|
destination.setReduceMemoryFootprint(isReduceMemoryFootprint());
|
||||||
destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage());
|
destination.setDoOptimzeMessageStorage(isDoOptimzeMessageStorage());
|
||||||
destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit());
|
destination.setOptimizeMessageStoreInFlightLimit(getOptimizeMessageStoreInFlightLimit());
|
||||||
|
@ -875,12 +875,44 @@ public class PolicyEntry extends DestinationMapEntry {
|
||||||
this.gcInactiveDestinations = gcInactiveDestinations;
|
this.gcInactiveDestinations = gcInactiveDestinations;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the amount of time spent inactive before GC of the destination kicks in.
|
||||||
|
*
|
||||||
|
* @deprecated use getInactiveTimeoutBeforeGC instead.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
public long getInactiveTimoutBeforeGC() {
|
public long getInactiveTimoutBeforeGC() {
|
||||||
return this.inactiveTimoutBeforeGC;
|
return getInactiveTimeoutBeforeGC();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the amount of time a destination is inactive before it is marked for GC
|
||||||
|
*
|
||||||
|
* @param inactiveTimoutBeforeGC
|
||||||
|
* time in milliseconds to configure as the inactive timeout.
|
||||||
|
*
|
||||||
|
* @deprecated use getInactiveTimeoutBeforeGC instead.
|
||||||
|
*/
|
||||||
|
@Deprecated
|
||||||
public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
|
public void setInactiveTimoutBeforeGC(long inactiveTimoutBeforeGC) {
|
||||||
this.inactiveTimoutBeforeGC = inactiveTimoutBeforeGC;
|
setInactiveTimeoutBeforeGC(inactiveTimoutBeforeGC);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the amount of time spent inactive before GC of the destination kicks in.
|
||||||
|
*/
|
||||||
|
public long getInactiveTimeoutBeforeGC() {
|
||||||
|
return this.inactiveTimeoutBeforeGC;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets the amount of time a destination is inactive before it is marked for GC
|
||||||
|
*
|
||||||
|
* @param inactiveTimoutBeforeGC
|
||||||
|
* time in milliseconds to configure as the inactive timeout.
|
||||||
|
*/
|
||||||
|
public void setInactiveTimeoutBeforeGC(long inactiveTimeoutBeforeGC) {
|
||||||
|
this.inactiveTimeoutBeforeGC = inactiveTimeoutBeforeGC;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
|
public void setGcWithNetworkConsumers(boolean gcWithNetworkConsumers) {
|
||||||
|
|
Loading…
Reference in New Issue