mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3473 - Messages (possibly) stuck and pending messages count showing high number of pending message which do not get sent to a consumer. resolve the error in queue stats, message count and enqueue count when index suppressed a duplicate message add attempt
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1306228 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
aee81279cb
commit
fb0b63e379
|
@ -32,6 +32,8 @@ import java.util.concurrent.*;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import org.apache.activemq.broker.ConnectionContext;
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
|
import org.apache.activemq.broker.region.Destination;
|
||||||
|
import org.apache.activemq.broker.region.RegionBroker;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.command.ActiveMQQueue;
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
import org.apache.activemq.command.ActiveMQTempQueue;
|
import org.apache.activemq.command.ActiveMQTempQueue;
|
||||||
|
@ -283,6 +285,16 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void rollbackStatsOnDuplicate(KahaDestination commandDestination) {
|
||||||
|
RegionBroker regionBroker = (RegionBroker) brokerService.getRegionBroker();
|
||||||
|
Set<Destination> destinationSet = regionBroker.getDestinations(convert(commandDestination));
|
||||||
|
for (Destination destination : destinationSet) {
|
||||||
|
destination.getDestinationStatistics().getMessages().decrement();
|
||||||
|
destination.getDestinationStatistics().getEnqueues().decrement();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
|
private Location findMessageLocation(final String key, final KahaDestination destination) throws IOException {
|
||||||
return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
|
return pageFile.tx().execute(new Transaction.CallableClosure<Location, IOException>() {
|
||||||
public Location execute(Transaction tx) throws IOException {
|
public Location execute(Transaction tx) throws IOException {
|
||||||
|
@ -1037,7 +1049,14 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
||||||
}
|
}
|
||||||
int type = Integer.parseInt(dest.substring(0, p));
|
int type = Integer.parseInt(dest.substring(0, p));
|
||||||
String name = dest.substring(p + 1);
|
String name = dest.substring(p + 1);
|
||||||
|
return convert(type, name);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ActiveMQDestination convert(KahaDestination commandDestination) {
|
||||||
|
return convert(commandDestination.getType().getNumber(), commandDestination.getName());
|
||||||
|
}
|
||||||
|
|
||||||
|
private ActiveMQDestination convert(int type, String name) {
|
||||||
switch (KahaDestination.DestinationType.valueOf(type)) {
|
switch (KahaDestination.DestinationType.valueOf(type)) {
|
||||||
case QUEUE:
|
case QUEUE:
|
||||||
return new ActiveMQQueue(name);
|
return new ActiveMQQueue(name);
|
||||||
|
|
|
@ -1207,6 +1207,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
LOG.warn("Duplicate message add attempt rejected. Destination: " + command.getDestination().getName() + ", Message id: " + command.getMessageId());
|
LOG.warn("Duplicate message add attempt rejected. Destination: " + command.getDestination().getName() + ", Message id: " + command.getMessageId());
|
||||||
sd.messageIdIndex.put(tx, command.getMessageId(), previous);
|
sd.messageIdIndex.put(tx, command.getMessageId(), previous);
|
||||||
sd.locationIndex.remove(tx, location);
|
sd.locationIndex.remove(tx, location);
|
||||||
|
rollbackStatsOnDuplicate(command.getDestination());
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// restore the previous value.. Looks like this was a redo of a
|
// restore the previous value.. Looks like this was a redo of a
|
||||||
|
@ -1222,6 +1223,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
||||||
metadata.lastUpdate = location;
|
metadata.lastUpdate = location;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
abstract void rollbackStatsOnDuplicate(KahaDestination commandDestination);
|
||||||
|
|
||||||
void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
|
void updateIndex(Transaction tx, KahaRemoveMessageCommand command, Location ackLocation) throws IOException {
|
||||||
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
|
StoredDestination sd = getStoredDestination(command.getDestination(), tx);
|
||||||
if (!command.hasSubscriptionKey()) {
|
if (!command.hasSubscriptionKey()) {
|
||||||
|
|
|
@ -39,6 +39,7 @@ import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.ProducerBrokerExchange;
|
import org.apache.activemq.broker.ProducerBrokerExchange;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
|
import org.apache.activemq.network.ConditionalNetworkBridgeFilterFactory;
|
||||||
import org.apache.activemq.network.NetworkConnector;
|
import org.apache.activemq.network.NetworkConnector;
|
||||||
import org.apache.activemq.util.Wait;
|
import org.apache.activemq.util.Wait;
|
||||||
|
@ -440,6 +441,84 @@ public class TwoBrokerQueueClientsReconnectTest extends JmsMultipleBrokersTestSu
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testDuplicateSendWithNoAuditEnqueueCountStat() throws Exception {
|
||||||
|
broker1 = "BrokerA";
|
||||||
|
broker2 = "BrokerB";
|
||||||
|
|
||||||
|
NetworkConnector networkConnector = bridgeBrokers(broker1, broker2);
|
||||||
|
|
||||||
|
final AtomicBoolean first = new AtomicBoolean();
|
||||||
|
final CountDownLatch gotMessageLatch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
BrokerService brokerService = brokers.get(broker2).broker;
|
||||||
|
brokerService.setPersistent(true);
|
||||||
|
brokerService.setDeleteAllMessagesOnStartup(true);
|
||||||
|
brokerService.setPlugins(new BrokerPlugin[]{
|
||||||
|
new BrokerPluginSupport() {
|
||||||
|
@Override
|
||||||
|
public void send(final ProducerBrokerExchange producerExchange,
|
||||||
|
org.apache.activemq.command.Message messageSend)
|
||||||
|
throws Exception {
|
||||||
|
super.send(producerExchange, messageSend);
|
||||||
|
if (first.compareAndSet(false, true)) {
|
||||||
|
producerExchange.getConnectionContext().setDontSendReponse(true);
|
||||||
|
Executors.newSingleThreadExecutor().execute(new Runnable() {
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
LOG.info("Waiting for recepit");
|
||||||
|
assertTrue("message received on time", gotMessageLatch.await(60, TimeUnit.SECONDS));
|
||||||
|
LOG.info("Stopping connection post send and receive and multiple producers");
|
||||||
|
producerExchange.getConnectionContext().getConnection().stop();
|
||||||
|
} catch (Exception e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// Create queue
|
||||||
|
ActiveMQDestination dest = createDestination("TEST.FOO", false);
|
||||||
|
|
||||||
|
// statically include our destination
|
||||||
|
networkConnector.addStaticallyIncludedDestination(dest);
|
||||||
|
|
||||||
|
// Run brokers
|
||||||
|
startAllBrokers();
|
||||||
|
|
||||||
|
waitForBridgeFormation();
|
||||||
|
|
||||||
|
sendMessages("BrokerA", dest, 1);
|
||||||
|
|
||||||
|
// wait for broker2 to get the initial forward
|
||||||
|
Wait.waitFor(new Wait.Condition(){
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return brokers.get(broker2).broker.getAdminView().getTotalMessageCount() == 1;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// message still pending on broker1
|
||||||
|
assertEquals("messages message still there", 1, brokers.get(broker1).broker.getAdminView().getTotalMessageCount());
|
||||||
|
|
||||||
|
// allow the bridge to be shutdown and restarted
|
||||||
|
gotMessageLatch.countDown();
|
||||||
|
|
||||||
|
|
||||||
|
// verify message is forwarded after restart
|
||||||
|
assertTrue("no messages enqueued on origin", Wait.waitFor(new Wait.Condition() {
|
||||||
|
@Override
|
||||||
|
public boolean isSatisified() throws Exception {
|
||||||
|
return 0 == brokers.get(broker1).broker.getAdminView().getTotalMessageCount();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
|
||||||
|
assertEquals("one messages pending", 1, brokers.get(broker2).broker.getAdminView().getTotalMessageCount());
|
||||||
|
assertEquals("one messages enqueued", 1, brokers.get(broker2).broker.getDestination(dest).getDestinationStatistics().getEnqueues().getCount());
|
||||||
|
}
|
||||||
|
|
||||||
protected int receiveExactMessages(MessageConsumer consumer, int msgCount) throws Exception {
|
protected int receiveExactMessages(MessageConsumer consumer, int msgCount) throws Exception {
|
||||||
Message msg;
|
Message msg;
|
||||||
int i;
|
int i;
|
||||||
|
|
Loading…
Reference in New Issue