mirror of https://github.com/apache/activemq.git
Fix and test for: https://issues.apache.org/jira/browse/AMQ-4513
Makes the locking in RegionBroker a bit more fine grained. We hold a lock only for a short time and allow destination adds that aren't on the same destination to continue on concurrently. git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1479963 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
866440dfb2
commit
d7aaca5034
|
@ -43,7 +43,23 @@ import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||||
import org.apache.activemq.broker.TransportConnector;
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
|
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
import org.apache.activemq.command.*;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.apache.activemq.command.BrokerId;
|
||||||
|
import org.apache.activemq.command.BrokerInfo;
|
||||||
|
import org.apache.activemq.command.ConnectionId;
|
||||||
|
import org.apache.activemq.command.ConnectionInfo;
|
||||||
|
import org.apache.activemq.command.ConsumerControl;
|
||||||
|
import org.apache.activemq.command.ConsumerInfo;
|
||||||
|
import org.apache.activemq.command.DestinationInfo;
|
||||||
|
import org.apache.activemq.command.Message;
|
||||||
|
import org.apache.activemq.command.MessageAck;
|
||||||
|
import org.apache.activemq.command.MessageDispatch;
|
||||||
|
import org.apache.activemq.command.MessageDispatchNotification;
|
||||||
|
import org.apache.activemq.command.MessagePull;
|
||||||
|
import org.apache.activemq.command.ProducerInfo;
|
||||||
|
import org.apache.activemq.command.RemoveSubscriptionInfo;
|
||||||
|
import org.apache.activemq.command.Response;
|
||||||
|
import org.apache.activemq.command.TransactionId;
|
||||||
import org.apache.activemq.state.ConnectionState;
|
import org.apache.activemq.state.ConnectionState;
|
||||||
import org.apache.activemq.store.PListStore;
|
import org.apache.activemq.store.PListStore;
|
||||||
import org.apache.activemq.thread.Scheduler;
|
import org.apache.activemq.thread.Scheduler;
|
||||||
|
@ -59,8 +75,6 @@ import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Routes Broker operations to the correct messaging regions for processing.
|
* Routes Broker operations to the correct messaging regions for processing.
|
||||||
*
|
|
||||||
*
|
|
||||||
*/
|
*/
|
||||||
public class RegionBroker extends EmptyBroker {
|
public class RegionBroker extends EmptyBroker {
|
||||||
public static final String ORIGINAL_EXPIRATION = "originalExpiration";
|
public static final String ORIGINAL_EXPIRATION = "originalExpiration";
|
||||||
|
@ -80,6 +94,7 @@ public class RegionBroker extends EmptyBroker {
|
||||||
private boolean keepDurableSubsActive;
|
private boolean keepDurableSubsActive;
|
||||||
|
|
||||||
private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>();
|
private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>();
|
||||||
|
private final Map<ActiveMQDestination, ActiveMQDestination> destinationGate = new HashMap<ActiveMQDestination, ActiveMQDestination>();
|
||||||
private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
|
private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
|
||||||
private final Map<BrokerId, BrokerInfo> brokerInfos = new HashMap<BrokerId, BrokerInfo>();
|
private final Map<BrokerId, BrokerInfo> brokerInfos = new HashMap<BrokerId, BrokerInfo>();
|
||||||
|
|
||||||
|
@ -102,9 +117,9 @@ public class RegionBroker extends EmptyBroker {
|
||||||
};
|
};
|
||||||
|
|
||||||
public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
|
public RegionBroker(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, SystemUsage memoryManager, DestinationFactory destinationFactory,
|
||||||
DestinationInterceptor destinationInterceptor,Scheduler scheduler,ThreadPoolExecutor executor) throws IOException {
|
DestinationInterceptor destinationInterceptor, Scheduler scheduler, ThreadPoolExecutor executor) throws IOException {
|
||||||
this.brokerService = brokerService;
|
this.brokerService = brokerService;
|
||||||
this.executor=executor;
|
this.executor = executor;
|
||||||
this.scheduler = scheduler;
|
this.scheduler = scheduler;
|
||||||
if (destinationFactory == null) {
|
if (destinationFactory == null) {
|
||||||
throw new IllegalArgumentException("null destinationFactory");
|
throw new IllegalArgumentException("null destinationFactory");
|
||||||
|
@ -126,7 +141,7 @@ public class RegionBroker extends EmptyBroker {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set <Destination> getDestinations(ActiveMQDestination destination) {
|
public Set<Destination> getDestinations(ActiveMQDestination destination) {
|
||||||
try {
|
try {
|
||||||
return getRegion(destination).getDestinations(destination);
|
return getRegion(destination).getDestinations(destination);
|
||||||
} catch (JMSException jmse) {
|
} catch (JMSException jmse) {
|
||||||
|
@ -216,7 +231,7 @@ public class RegionBroker extends EmptyBroker {
|
||||||
ConnectionContext oldContext = clientIdSet.get(clientId);
|
ConnectionContext oldContext = clientIdSet.get(clientId);
|
||||||
if (oldContext != null) {
|
if (oldContext != null) {
|
||||||
throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
|
throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from "
|
||||||
+ oldContext.getConnection().getRemoteAddress());
|
+ oldContext.getConnection().getRemoteAddress());
|
||||||
} else {
|
} else {
|
||||||
clientIdSet.put(clientId, context);
|
clientIdSet.put(clientId, context);
|
||||||
}
|
}
|
||||||
|
@ -267,21 +282,42 @@ public class RegionBroker extends EmptyBroker {
|
||||||
return answer;
|
return answer;
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized (destinations) {
|
synchronized (destinationGate) {
|
||||||
answer = destinations.get(destination);
|
answer = destinations.get(destination);
|
||||||
if (answer != null) {
|
if (answer != null) {
|
||||||
return answer;
|
return answer;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (destinationGate.get(destination) != null) {
|
||||||
|
// Guard against spurious wakeup.
|
||||||
|
while (destinationGate.containsKey(destination)) {
|
||||||
|
destinationGate.wait();
|
||||||
|
}
|
||||||
|
answer = destinations.get(destination);
|
||||||
|
if (answer != null) {
|
||||||
|
return answer;
|
||||||
|
} else {
|
||||||
|
// In case of intermediate remove or add failure
|
||||||
|
destinationGate.put(destination, destination);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
boolean create = true;
|
try {
|
||||||
if (destination.isTemporary())
|
boolean create = true;
|
||||||
create = createIfTemp;
|
if (destination.isTemporary()) {
|
||||||
answer = getRegion(destination).addDestination(context, destination, create);
|
create = createIfTemp;
|
||||||
|
}
|
||||||
|
answer = getRegion(destination).addDestination(context, destination, create);
|
||||||
|
destinations.put(destination, answer);
|
||||||
|
} finally {
|
||||||
|
synchronized (destinationGate) {
|
||||||
|
destinationGate.remove(destination);
|
||||||
|
destinationGate.notifyAll();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
destinations.put(destination, answer);
|
|
||||||
return answer;
|
return answer;
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -294,14 +330,13 @@ public class RegionBroker extends EmptyBroker {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
|
public void addDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
|
||||||
addDestination(context, info.getDestination(),true);
|
addDestination(context, info.getDestination(), true);
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
|
public void removeDestinationInfo(ConnectionContext context, DestinationInfo info) throws Exception {
|
||||||
removeDestination(context, info.getDestination(), info.getTimeout());
|
removeDestination(context, info.getDestination(), info.getTimeout());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -384,9 +419,10 @@ public class RegionBroker extends EmptyBroker {
|
||||||
ActiveMQDestination destination = message.getDestination();
|
ActiveMQDestination destination = message.getDestination();
|
||||||
message.setBrokerInTime(System.currentTimeMillis());
|
message.setBrokerInTime(System.currentTimeMillis());
|
||||||
if (producerExchange.isMutable() || producerExchange.getRegion() == null
|
if (producerExchange.isMutable() || producerExchange.getRegion() == null
|
||||||
|| (producerExchange.getRegionDestination() != null && producerExchange.getRegionDestination().isDisposed())) {
|
|| (producerExchange.getRegionDestination() != null && producerExchange.getRegionDestination().isDisposed())) {
|
||||||
// ensure the destination is registered with the RegionBroker
|
// ensure the destination is registered with the RegionBroker
|
||||||
producerExchange.getConnectionContext().getBroker().addDestination(producerExchange.getConnectionContext(), destination, isAllowTempAutoCreationOnSend());
|
producerExchange.getConnectionContext().getBroker()
|
||||||
|
.addDestination(producerExchange.getConnectionContext(), destination, isAllowTempAutoCreationOnSend());
|
||||||
producerExchange.setRegion(getRegion(destination));
|
producerExchange.setRegion(getRegion(destination));
|
||||||
producerExchange.setRegionDestination(null);
|
producerExchange.setRegionDestination(null);
|
||||||
}
|
}
|
||||||
|
@ -412,16 +448,16 @@ public class RegionBroker extends EmptyBroker {
|
||||||
|
|
||||||
protected Region getRegion(ActiveMQDestination destination) throws JMSException {
|
protected Region getRegion(ActiveMQDestination destination) throws JMSException {
|
||||||
switch (destination.getDestinationType()) {
|
switch (destination.getDestinationType()) {
|
||||||
case ActiveMQDestination.QUEUE_TYPE:
|
case ActiveMQDestination.QUEUE_TYPE:
|
||||||
return queueRegion;
|
return queueRegion;
|
||||||
case ActiveMQDestination.TOPIC_TYPE:
|
case ActiveMQDestination.TOPIC_TYPE:
|
||||||
return topicRegion;
|
return topicRegion;
|
||||||
case ActiveMQDestination.TEMP_QUEUE_TYPE:
|
case ActiveMQDestination.TEMP_QUEUE_TYPE:
|
||||||
return tempQueueRegion;
|
return tempQueueRegion;
|
||||||
case ActiveMQDestination.TEMP_TOPIC_TYPE:
|
case ActiveMQDestination.TEMP_TOPIC_TYPE:
|
||||||
return tempTopicRegion;
|
return tempTopicRegion;
|
||||||
default:
|
default:
|
||||||
throw createUnknownDestinationTypeException(destination);
|
throw createUnknownDestinationTypeException(destination);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -523,7 +559,7 @@ public class RegionBroker extends EmptyBroker {
|
||||||
if (info != null) {
|
if (info != null) {
|
||||||
BrokerInfo existing = brokerInfos.get(info.getBrokerId());
|
BrokerInfo existing = brokerInfos.get(info.getBrokerId());
|
||||||
if (existing != null && existing.decrementRefCount() == 0) {
|
if (existing != null && existing.decrementRefCount() == 0) {
|
||||||
brokerInfos.remove(info.getBrokerId());
|
brokerInfos.remove(info.getBrokerId());
|
||||||
}
|
}
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
|
LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
|
||||||
|
@ -551,7 +587,7 @@ public class RegionBroker extends EmptyBroker {
|
||||||
message.setBrokerOutTime(endTime);
|
message.setBrokerOutTime(endTime);
|
||||||
if (getBrokerService().isEnableStatistics()) {
|
if (getBrokerService().isEnableStatistics()) {
|
||||||
long totalTime = endTime - message.getBrokerInTime();
|
long totalTime = endTime - message.getBrokerInTime();
|
||||||
((Destination)message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime);
|
((Destination) message.getRegionDestination()).getDestinationStatistics().getProcessTime().addTime(totalTime);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -589,7 +625,7 @@ public class RegionBroker extends EmptyBroker {
|
||||||
|
|
||||||
public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
|
public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
|
||||||
this.keepDurableSubsActive = keepDurableSubsActive;
|
this.keepDurableSubsActive = keepDurableSubsActive;
|
||||||
((TopicRegion)topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
|
((TopicRegion) topicRegion).setKeepDurableSubsActive(keepDurableSubsActive);
|
||||||
}
|
}
|
||||||
|
|
||||||
public DestinationInterceptor getDestinationInterceptor() {
|
public DestinationInterceptor getDestinationInterceptor() {
|
||||||
|
@ -647,16 +683,15 @@ public class RegionBroker extends EmptyBroker {
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean stampAsExpired(Message message) throws IOException {
|
private boolean stampAsExpired(Message message) throws IOException {
|
||||||
boolean stamped=false;
|
boolean stamped = false;
|
||||||
if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
|
if (message.getProperty(ORIGINAL_EXPIRATION) == null) {
|
||||||
long expiration=message.getExpiration();
|
long expiration = message.getExpiration();
|
||||||
message.setProperty(ORIGINAL_EXPIRATION,new Long(expiration));
|
message.setProperty(ORIGINAL_EXPIRATION, new Long(expiration));
|
||||||
stamped = true;
|
stamped = true;
|
||||||
}
|
}
|
||||||
return stamped;
|
return stamped;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) {
|
public void messageExpired(ConnectionContext context, MessageReference node, Subscription subscription) {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
|
@ -666,47 +701,42 @@ public class RegionBroker extends EmptyBroker {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void sendToDeadLetterQueue(ConnectionContext context,
|
public void sendToDeadLetterQueue(ConnectionContext context, MessageReference node, Subscription subscription) {
|
||||||
MessageReference node, Subscription subscription){
|
try {
|
||||||
try{
|
if (node != null) {
|
||||||
if(node!=null){
|
Message message = node.getMessage();
|
||||||
Message message=node.getMessage();
|
if (message != null && node.getRegionDestination() != null) {
|
||||||
if(message!=null && node.getRegionDestination()!=null){
|
DeadLetterStrategy deadLetterStrategy = ((Destination) node.getRegionDestination()).getDeadLetterStrategy();
|
||||||
DeadLetterStrategy deadLetterStrategy=((Destination)node
|
if (deadLetterStrategy != null) {
|
||||||
.getRegionDestination()).getDeadLetterStrategy();
|
if (deadLetterStrategy.isSendToDeadLetterQueue(message)) {
|
||||||
if(deadLetterStrategy!=null){
|
|
||||||
if(deadLetterStrategy.isSendToDeadLetterQueue(message)){
|
|
||||||
// message may be inflight to other subscriptions so do not modify
|
// message may be inflight to other subscriptions so do not modify
|
||||||
message = message.copy();
|
message = message.copy();
|
||||||
stampAsExpired(message);
|
stampAsExpired(message);
|
||||||
message.setExpiration(0);
|
message.setExpiration(0);
|
||||||
if(!message.isPersistent()){
|
if (!message.isPersistent()) {
|
||||||
message.setPersistent(true);
|
message.setPersistent(true);
|
||||||
message.setProperty("originalDeliveryMode",
|
message.setProperty("originalDeliveryMode", "NON_PERSISTENT");
|
||||||
"NON_PERSISTENT");
|
|
||||||
}
|
}
|
||||||
// The original destination and transaction id do
|
// The original destination and transaction id do
|
||||||
// not get filled when the message is first sent,
|
// not get filled when the message is first sent,
|
||||||
// it is only populated if the message is routed to
|
// it is only populated if the message is routed to
|
||||||
// another destination like the DLQ
|
// another destination like the DLQ
|
||||||
ActiveMQDestination deadLetterDestination=deadLetterStrategy
|
ActiveMQDestination deadLetterDestination = deadLetterStrategy.getDeadLetterQueueFor(message, subscription);
|
||||||
.getDeadLetterQueueFor(message, subscription);
|
if (context.getBroker() == null) {
|
||||||
if (context.getBroker()==null) {
|
|
||||||
context.setBroker(getRoot());
|
context.setBroker(getRoot());
|
||||||
}
|
}
|
||||||
BrokerSupport.resendNoCopy(context,message,
|
BrokerSupport.resendNoCopy(context, message, deadLetterDestination);
|
||||||
deadLetterDestination);
|
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Dead Letter message with no DLQ strategy in place, message id: "
|
LOG.debug("Dead Letter message with no DLQ strategy in place, message id: " + message.getMessageId() + ", destination: "
|
||||||
+ message.getMessageId() + ", destination: " + message.getDestination());
|
+ message.getDestination());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}catch(Exception e){
|
} catch (Exception e) {
|
||||||
LOG.warn("Caught an exception sending to DLQ: "+node,e);
|
LOG.warn("Caught an exception sending to DLQ: " + node, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -725,12 +755,11 @@ public class RegionBroker extends EmptyBroker {
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public long getBrokerSequenceId() {
|
public long getBrokerSequenceId() {
|
||||||
synchronized(sequenceGenerator) {
|
synchronized (sequenceGenerator) {
|
||||||
return sequenceGenerator.getNextSequenceId();
|
return sequenceGenerator.getNextSequenceId();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Scheduler getScheduler() {
|
public Scheduler getScheduler() {
|
||||||
return this.scheduler;
|
return this.scheduler;
|
||||||
|
@ -747,7 +776,7 @@ public class RegionBroker extends EmptyBroker {
|
||||||
try {
|
try {
|
||||||
getRegion(destination).processConsumerControl(consumerExchange, control);
|
getRegion(destination).processConsumerControl(consumerExchange, control);
|
||||||
} catch (JMSException jmse) {
|
} catch (JMSException jmse) {
|
||||||
LOG.warn("unmatched destination: " + destination + ", in consumerControl: " + control);
|
LOG.warn("unmatched destination: " + destination + ", in consumerControl: " + control);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -801,8 +830,7 @@ public class RegionBroker extends EmptyBroker {
|
||||||
if (dest instanceof BaseDestination) {
|
if (dest instanceof BaseDestination) {
|
||||||
log = ((BaseDestination) dest).getLog();
|
log = ((BaseDestination) dest).getLog();
|
||||||
}
|
}
|
||||||
log.info(dest.getName() + " Inactive for longer than " +
|
log.info(dest.getName() + " Inactive for longer than " + dest.getInactiveTimoutBeforeGC() + " ms - removing ...");
|
||||||
dest.getInactiveTimoutBeforeGC() + " ms - removing ...");
|
|
||||||
try {
|
try {
|
||||||
getRoot().removeDestination(context, dest.getActiveMQDestination(), isAllowTempAutoCreationOnSend() ? 1 : 0);
|
getRoot().removeDestination(context, dest.getActiveMQDestination(), isAllowTempAutoCreationOnSend() ? 1 : 0);
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
|
|
|
@ -0,0 +1,142 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.bugs;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.Random;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
import javax.jms.DeliveryMode;
|
||||||
|
import javax.jms.Destination;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Session;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnection;
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.region.policy.DeadLetterStrategy;
|
||||||
|
import org.apache.activemq.broker.region.policy.IndividualDeadLetterStrategy;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class AMQ4513Test {
|
||||||
|
|
||||||
|
private BrokerService brokerService;
|
||||||
|
private String connectionUri;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
brokerService = new BrokerService();
|
||||||
|
|
||||||
|
connectionUri = brokerService.addConnector("tcp://localhost:0").getPublishableConnectString();
|
||||||
|
|
||||||
|
// Configure Dead Letter Strategy
|
||||||
|
DeadLetterStrategy strategy = new IndividualDeadLetterStrategy();
|
||||||
|
((IndividualDeadLetterStrategy)strategy).setUseQueueForQueueMessages(true);
|
||||||
|
((IndividualDeadLetterStrategy)strategy).setQueuePrefix("DLQ.");
|
||||||
|
strategy.setProcessNonPersistent(false);
|
||||||
|
strategy.setProcessExpired(false);
|
||||||
|
|
||||||
|
// Add policy and individual DLQ strategy
|
||||||
|
PolicyEntry policy = new PolicyEntry();
|
||||||
|
policy.setTimeBeforeDispatchStarts(3000);
|
||||||
|
policy.setDeadLetterStrategy(strategy);
|
||||||
|
|
||||||
|
PolicyMap pMap = new PolicyMap();
|
||||||
|
pMap.setDefaultEntry(policy);
|
||||||
|
|
||||||
|
brokerService.setDestinationPolicy(pMap);
|
||||||
|
|
||||||
|
brokerService.setPersistent(false);
|
||||||
|
brokerService.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void stop() throws Exception {
|
||||||
|
brokerService.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=360000)
|
||||||
|
public void test() throws Exception {
|
||||||
|
|
||||||
|
final ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(connectionUri);
|
||||||
|
|
||||||
|
ExecutorService service = Executors.newFixedThreadPool(25);
|
||||||
|
|
||||||
|
final Random ripple = new Random(System.currentTimeMillis());
|
||||||
|
|
||||||
|
for (int i = 0; i < 1000; ++i) {
|
||||||
|
service.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Destination destination = session.createTemporaryQueue();
|
||||||
|
session.createProducer(destination);
|
||||||
|
connection.close();
|
||||||
|
TimeUnit.MILLISECONDS.sleep(ripple.nextInt(20));
|
||||||
|
} catch (Exception e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
service.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Destination destination = session.createTemporaryQueue();
|
||||||
|
MessageProducer producer = session.createProducer(destination);
|
||||||
|
producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
|
||||||
|
producer.setTimeToLive(400);
|
||||||
|
producer.send(session.createTextMessage());
|
||||||
|
producer.send(session.createTextMessage());
|
||||||
|
TimeUnit.MILLISECONDS.sleep(500);
|
||||||
|
connection.close();
|
||||||
|
} catch (Exception e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
service.execute(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
ActiveMQConnection connection = (ActiveMQConnection) cf.createConnection();
|
||||||
|
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||||
|
Destination destination = session.createTemporaryQueue();
|
||||||
|
session.createProducer(destination);
|
||||||
|
connection.close();
|
||||||
|
TimeUnit.MILLISECONDS.sleep(ripple.nextInt(20));
|
||||||
|
} catch (Exception e) {
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
service.shutdown();
|
||||||
|
assertTrue(service.awaitTermination(5, TimeUnit.MINUTES));
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue