diff --git a/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java b/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java index e2bc033e21..9be0d11111 100644 --- a/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java +++ b/activemq-client/src/main/java/org/apache/activemq/management/SizeStatisticImpl.java @@ -91,6 +91,13 @@ public class SizeStatisticImpl extends StatisticImpl { return maxSize; } + /** + * @return the maximum size of any step + */ + public synchronized void setMaxSize(long size) { + maxSize = size; + } + /** * @return the minimum size of any step */ @@ -98,6 +105,13 @@ public class SizeStatisticImpl extends StatisticImpl { return minSize; } + /** + * @return the maximum size of any step + */ + public synchronized void setMinSize(long size) { + minSize = size; + } + /** * @return the total size of all the steps added together */ @@ -105,6 +119,10 @@ public class SizeStatisticImpl extends StatisticImpl { return totalSize; } + public synchronized void setCount(long count) { + this.count = count; + } + /** * @return the average size calculated by dividing the total size by the * number of counts diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java index b5c466d19d..37f3b9096d 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBStore.java @@ -459,7 +459,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, ackedAndPrepared.remove(id); if (rollback) { rolledBackAcks.add(id); - incrementAndAddSizeToStoreStat(dest, 0); + pageFile.tx().execute(tx -> { + incrementAndAddSizeToStoreStat(tx, dest, 0); + }); } } } finally { @@ -812,16 +814,19 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter, recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure() { @Override public MessageStoreStatistics execute(Transaction tx) throws IOException { - MessageStoreStatistics statistics = new MessageStoreStatistics(); + MessageStoreStatistics statistics = getStoredMessageStoreStatistics(dest, tx); // Iterate through all index entries to get the size of each message - StoredDestination sd = getStoredDestination(dest, tx); - for (Iterator> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) { - int locationSize = iterator.next().getKey().getSize(); - statistics.getMessageCount().increment(); - statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0); + if (statistics == null) { + StoredDestination sd = getStoredDestination(dest, tx); + statistics = new MessageStoreStatistics(); + for (Iterator> iterator = sd.locationIndex.iterator(tx); iterator.hasNext(); ) { + int locationSize = iterator.next().getKey().getSize(); + statistics.getMessageCount().increment(); + statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0); + } } - return statistics; + return statistics; } }); recoveredStatistics.getMessageCount().subtract(ackedAndPrepared.size()); diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 89f694bc62..78d2bfa2c8 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -134,7 +134,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe static final int OPEN_STATE = 2; static final long NOT_ACKED = -1; - static final int VERSION = 6; + static final int VERSION = 7; static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1; @@ -188,6 +188,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } catch (EOFException expectedOnUpgrade) { openwireVersion = OpenWireFormat.DEFAULT_LEGACY_VERSION; } + LOG.info("KahaDB is version " + version); } @@ -863,7 +864,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe sd.messageIdIndex.remove(tx, keys.messageId); metadata.producerSequenceIdTracker.rollback(keys.messageId); undoCounter++; - decrementAndSubSizeToStoreStat(key, keys.location.getSize()); + decrementAndSubSizeToStoreStat(tx, key, sd, keys.location.getSize()); // TODO: do we need to modify the ack positions for the pub sub case? } } @@ -979,7 +980,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe sd.messageIdIndex.remove(tx, keys.messageId); LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location); undoCounter++; - decrementAndSubSizeToStoreStat(sdEntry.getKey(), keys.location.getSize()); + decrementAndSubSizeToStoreStat(tx, sdEntry.getKey(), sdEntry.getValue(), keys.location.getSize()); // TODO: do we need to modify the ack positions for the pub sub case? } } else { @@ -1491,7 +1492,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe if (previous == null) { previous = sd.messageIdIndex.put(tx, command.getMessageId(), id); if (previous == null) { - incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize()); + incrementAndAddSizeToStoreStat(tx, command.getDestination(), location.getSize()); sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location)); if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) { addAckLocationForNewMessage(tx, command.getDestination(), sd, id); @@ -1550,11 +1551,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe new MessageKeys(command.getMessageId(), location) ); sd.locationIndex.put(tx, location, id); - incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize()); + incrementAndAddSizeToStoreStat(tx, command.getDestination(), location.getSize()); if (previousKeys != null) { //Remove the existing from the size - decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize()); + decrementAndSubSizeToStoreStat(tx, command.getDestination(), previousKeys.location.getSize()); //update all the subscription metrics if (enableSubscriptionStatistics && sd.ackPositions != null && location.getSize() != previousKeys.location.getSize()) { @@ -1590,7 +1591,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); if (keys != null) { sd.locationIndex.remove(tx, keys.location); - decrementAndSubSizeToStoreStat(command.getDestination(), keys.location.getSize()); + decrementAndSubSizeToStoreStat(tx, command.getDestination(), keys.location.getSize()); recordAckMessageReferenceLocation(ackLocation, keys.location); metadata.lastUpdate = ackLocation; } else if (LOG.isDebugEnabled()) { @@ -1655,6 +1656,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe sd.messageIdIndex.unload(tx); tx.free(sd.messageIdIndex.getPageId()); + tx.free(sd.messageStoreStatistics.getPageId()); + sd.messageStoreStatistics = null; + if (sd.subscriptions != null) { sd.subscriptions.clear(tx); sd.subscriptions.unload(tx); @@ -2362,6 +2366,53 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } + class StoredMessageStoreStatistics { + private PageFile pageFile; + private Page page; + private long pageId; + private AtomicBoolean loaded = new AtomicBoolean(); + private MessageStoreStatisticsMarshaller messageStoreStatisticsMarshaller = new MessageStoreStatisticsMarshaller(); + + StoredMessageStoreStatistics(PageFile pageFile, long pageId) { + this.pageId = pageId; + this.pageFile = pageFile; + } + + StoredMessageStoreStatistics(PageFile pageFile, Page page) { + this(pageFile, page.getPageId()); + } + + public long getPageId() { + return pageId; + } + + synchronized void load(Transaction tx) throws IOException { + if (loaded.compareAndSet(false, true)) { + page = tx.load(pageId, null); + + if (page.getType() == Page.PAGE_FREE_TYPE) { + page.set(null); + tx.store(page, messageStoreStatisticsMarshaller, true); + } + } + page = tx.load(pageId, messageStoreStatisticsMarshaller); + } + + synchronized MessageStoreStatistics get(Transaction tx) throws IOException { + load(tx); + return page.get(); + } + + synchronized void put(Transaction tx, MessageStoreStatistics storeStatistics) throws IOException { + if (page == null) { + page = tx.load(pageId, messageStoreStatisticsMarshaller); + } + + page.set(storeStatistics); + + tx.store(page, messageStoreStatisticsMarshaller, true); + } + } class StoredDestination { MessageOrderIndex orderIndex = new MessageOrderIndex(); @@ -2378,6 +2429,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe // Transient data used to track which Messages are no longer needed. final HashSet subscriptionCache = new LinkedHashSet<>(); + StoredMessageStoreStatistics messageStoreStatistics; + public void trackPendingAdd(Long seq) { orderIndex.trackPendingAdd(seq); } @@ -2392,6 +2445,38 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } + protected class MessageStoreStatisticsMarshaller extends VariableMarshaller { + + @Override + public void writePayload(final MessageStoreStatistics object, final DataOutput dataOut) throws IOException { + dataOut.writeBoolean(null != object); + if (object != null) { + dataOut.writeLong(object.getMessageCount().getCount()); + dataOut.writeLong(object.getMessageSize().getTotalSize()); + dataOut.writeLong(object.getMessageSize().getMaxSize()); + dataOut.writeLong(object.getMessageSize().getMinSize()); + dataOut.writeLong(object.getMessageSize().getCount()); + } + } + + @Override + public MessageStoreStatistics readPayload(final DataInput dataIn) throws IOException { + + if (!dataIn.readBoolean()) { + return null; + } + + MessageStoreStatistics messageStoreStatistics = new MessageStoreStatistics(); + messageStoreStatistics.getMessageCount().setCount(dataIn.readLong()); + messageStoreStatistics.getMessageSize().setTotalSize(dataIn.readLong()); + messageStoreStatistics.getMessageSize().setMaxSize(dataIn.readLong()); + messageStoreStatistics.getMessageSize().setMinSize(dataIn.readLong()); + messageStoreStatistics.getMessageSize().setCount(dataIn.readLong()); + + return messageStoreStatistics; + } + } + protected class StoredDestinationMarshaller extends VariableMarshaller { final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller(); @@ -2470,6 +2555,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe }); } } + if (metadata.version >= 2) { value.orderIndex.lowPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong()); value.orderIndex.highPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong()); @@ -2491,6 +2577,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe }); } + if (metadata.version >= 7) { + value.messageStoreStatistics = new StoredMessageStoreStatistics(pageFile, dataIn.readLong()); + } else { + pageFile.tx().execute(tx -> { + value.messageStoreStatistics = new StoredMessageStoreStatistics(pageFile, tx.allocate()); + value.messageStoreStatistics.load(tx); + }); + } + return value; } @@ -2510,6 +2605,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId()); dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId()); + dataOut.writeLong(value.messageStoreStatistics.getPageId()); } } @@ -2543,6 +2639,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe return rc; } + protected MessageStoreStatistics getStoredMessageStoreStatistics(KahaDestination destination, Transaction tx) throws IOException { + StoredDestination sd = getStoredDestination(destination, tx); + return sd != null && sd.messageStoreStatistics != null ? sd.messageStoreStatistics.get(tx) : null; + } + protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException { String key = key(destination); StoredDestination rc = storedDestinations.get(key); @@ -2575,9 +2676,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe rc.ackPositions = new ListIndex<>(pageFile, tx.allocate()); rc.subLocations = new ListIndex<>(pageFile, tx.allocate()); } + + rc.messageStoreStatistics = new StoredMessageStoreStatistics(pageFile, tx.allocate()); + metadata.destinations.put(tx, key, rc); } + rc.messageStoreStatistics.load(tx); + // Configure the marshalers and load. rc.orderIndex.load(tx); @@ -2644,9 +2750,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe } } - // Configure the message references index - - // Configure the subscription cache for (Iterator> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) { Entry entry = iterator.next(); @@ -2707,31 +2810,60 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe * @param kahaDestination * @param size */ - protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, long size) { - incrementAndAddSizeToStoreStat(key(kahaDestination), size); + protected void incrementAndAddSizeToStoreStat(Transaction tx, KahaDestination kahaDestination, long size) throws IOException { + StoredDestination sd = getStoredDestination(kahaDestination, tx); + incrementAndAddSizeToStoreStat(tx, key(kahaDestination), sd, size); } - protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long size) { + protected void incrementAndAddSizeToStoreStat(Transaction tx, String kahaDestKey, StoredDestination sd, long size) throws IOException { MessageStoreStatistics storeStats = getStoreStats(kahaDestKey); if (storeStats != null) { - storeStats.getMessageCount().increment(); - if (size > 0) { - storeStats.getMessageSize().addSize(size); + incrementAndAddSizeToStoreStat(size, storeStats); + sd.messageStoreStatistics.put(tx, storeStats); + } else if (sd != null){ + // During the recovery the storeStats is null + MessageStoreStatistics storedStoreStats = sd.messageStoreStatistics.get(tx); + if (storedStoreStats == null) { + storedStoreStats = new MessageStoreStatistics(); } + incrementAndAddSizeToStoreStat(size, storedStoreStats); + sd.messageStoreStatistics.put(tx, storedStoreStats); } } - protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, long size) { - decrementAndSubSizeToStoreStat(key(kahaDestination), size); + private void incrementAndAddSizeToStoreStat(final long size, final MessageStoreStatistics storedStoreStats) { + storedStoreStats.getMessageCount().increment(); + if (size > 0) { + storedStoreStats.getMessageSize().addSize(size); + } } - protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long size) { + protected void decrementAndSubSizeToStoreStat(Transaction tx, KahaDestination kahaDestination, long size) throws IOException { + StoredDestination sd = getStoredDestination(kahaDestination, tx); + decrementAndSubSizeToStoreStat(tx, key(kahaDestination), sd,size); + } + + protected void decrementAndSubSizeToStoreStat(Transaction tx, String kahaDestKey, StoredDestination sd, long size) throws IOException { MessageStoreStatistics storeStats = getStoreStats(kahaDestKey); if (storeStats != null) { - storeStats.getMessageCount().decrement(); - if (size > 0) { - storeStats.getMessageSize().addSize(-size); + decrementAndSubSizeToStoreStat(size, storeStats); + sd.messageStoreStatistics.put(tx, storeStats); + } else if (sd != null){ + // During the recovery the storeStats is null + MessageStoreStatistics storedStoreStats = sd.messageStoreStatistics.get(tx); + if (storedStoreStats == null) { + storedStoreStats = new MessageStoreStatistics(); } + decrementAndSubSizeToStoreStat(size, storedStoreStats); + sd.messageStoreStatistics.put(tx, storedStoreStats); + } + } + + private void decrementAndSubSizeToStoreStat(final long size, final MessageStoreStatistics storedStoreStats) { + storedStoreStats.getMessageCount().decrement(); + + if (size > 0) { + storedStoreStats.getMessageSize().addSize(-size); } } @@ -2936,7 +3068,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe sd.locationIndex.remove(tx, entry.getValue().location); sd.messageIdIndex.remove(tx, entry.getValue().messageId); sd.orderIndex.remove(tx, entry.getKey()); - decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize()); + decrementAndSubSizeToStoreStat(tx, command.getDestination(), entry.getValue().location.getSize()); } } } @@ -2990,7 +3122,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe sd.locationIndex.remove(tx, entry.getValue().location); sd.messageIdIndex.remove(tx, entry.getValue().messageId); sd.orderIndex.remove(tx, entry.getKey()); - decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize()); + decrementAndSubSizeToStoreStat(tx, command.getDestination(), entry.getValue().location.getSize()); } } } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RecoveryStatsBrokerTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RecoveryStatsBrokerTest.java new file mode 100644 index 0000000000..3692574948 --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/RecoveryStatsBrokerTest.java @@ -0,0 +1,242 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.broker; + +import com.google.common.collect.ImmutableList; +import org.apache.activemq.broker.region.Destination; +import org.apache.activemq.broker.region.RegionBroker; +import org.apache.activemq.command.ActiveMQDestination; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.command.ConsumerInfo; +import org.apache.activemq.command.Message; +import org.apache.activemq.command.MessageAck; +import org.apache.activemq.command.ProducerInfo; +import org.apache.activemq.command.SessionInfo; +import org.apache.activemq.store.MessageStoreStatistics; +import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter; +import org.apache.activemq.util.IOHelper; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.io.File; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.stream.Collectors; + +@RunWith(value = Parameterized.class) +public class RecoveryStatsBrokerTest extends BrokerRestartTestSupport { + + private RestartType restartType; + + enum RestartType { + NORMAL, + FULL_RECOVERY, + UNCLEAN_SHUTDOWN + } + + @Override + protected void configureBroker(BrokerService broker) throws Exception { + KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter(); + persistenceAdapter.setJournalMaxFileLength(1024*1024); + //persistenceAdapter.setConcurrentStoreAndDispatchQueues(false); + persistenceAdapter.setDirectory(broker.getBrokerDataDirectory()); + broker.setPersistenceAdapter(persistenceAdapter); + broker.setDestinationPolicy(policyMap); + } + + protected void restartBroker(RestartType restartType) throws Exception { + if (restartType == RestartType.FULL_RECOVERY) { + stopBroker(); + KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter(); + File dir = kahaDBPersistenceAdapter.getDirectory(); + if (dir != null) { + IOHelper.deleteFile(new File(dir, "db.data")); + } + broker.start(); + } else if (restartType == RestartType.UNCLEAN_SHUTDOWN){ + //Simulate an unclean shutdown + + File dir = broker.getBrokerDataDirectory(); + File backUpDir = new File(dir, "bk"); + IOHelper.mkdirs(new File(dir, "bk")); + + for (File f: dir.listFiles()) { + if (!f.isDirectory()) { + IOHelper.copyFile(f, new File(backUpDir, f.getName())); + } + } + + stopBroker(); + + for (File f: backUpDir.listFiles()) { + if (!f.isDirectory()) { + IOHelper.copyFile(f, new File(dir, f.getName())); + } + } + + broker.start(); + } else { + restartBroker(); + } + } + + @Parameterized.Parameters(name="{0}") + public static Collection getTestParameters() { + return Arrays.asList(new Object[][] { + {RestartType.NORMAL}, + {RestartType.FULL_RECOVERY}, + {RestartType.UNCLEAN_SHUTDOWN}, + }); + } + + public RecoveryStatsBrokerTest(RestartType restartType) { + this.restartType = restartType; + } + + @Before + public void setUp() throws Exception { + super.setUp(); + } + + @After + public void tearDown() throws Exception { + super.tearDown(); + } + + @Test(timeout = 60 * 1000) + public void testStaticsRecovery() throws Exception { + List destinations = ImmutableList.of(new ActiveMQQueue("TEST.A"), new ActiveMQQueue("TEST.B")); + Random random = new Random(); + Map consumedMessages = new HashMap<>(); + + destinations.forEach(destination -> consumedMessages.put(destination, 0)); + + int numberOfMessages = 10000; + + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + ProducerInfo producerInfo = createProducerInfo(sessionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + connection.send(producerInfo); + + for (int i = 0; i < numberOfMessages; i++) { + for (ActiveMQDestination destination : destinations) { + Message message = createMessage(producerInfo, destination); + message.setPersistent(true); + message.setProducerId(message.getMessageId().getProducerId()); + connection.request(message); + } + } + + Map originalStatistics = getCurrentStatistics(destinations); + + checkStatistics(destinations, originalStatistics); + + restartBroker(restartType); + + checkStatistics(destinations, originalStatistics); + + for (ActiveMQDestination destination : destinations) { + consume(destination, 100, false); + } + + checkStatistics(destinations, originalStatistics); + + restartBroker(restartType); + + checkStatistics(destinations, originalStatistics); + + for (ActiveMQDestination destination : destinations) { + int messagesToConsume = random.nextInt(numberOfMessages); + consume(destination, messagesToConsume, true); + consumedMessages.compute(destination, (key, value) -> value = value + messagesToConsume); + } + + originalStatistics = getCurrentStatistics(destinations); + + for (ActiveMQDestination destination : destinations) { + int consumedCount = consumedMessages.get(destination); + assertEquals("", numberOfMessages - consumedCount, originalStatistics.get(destination).getMessageCount().getCount()); + } + + checkStatistics(destinations, originalStatistics); + + restartBroker(restartType); + + checkStatistics(destinations, originalStatistics); + } + + private Map getCurrentStatistics(final List destinations) { + return destinations.stream() + .map(destination -> getDestination(broker, destination)) + .collect(Collectors.toMap(destination -> new ActiveMQQueue(destination.getName()), destination2 -> destination2.getMessageStore().getMessageStoreStatistics())); + } + + private void consume(ActiveMQDestination destination, int numberOfMessages, boolean shouldAck) throws Exception { + // Setup the consumer and receive the message. + StubConnection connection = createConnection(); + ConnectionInfo connectionInfo = createConnectionInfo(); + SessionInfo sessionInfo = createSessionInfo(connectionInfo); + connection.send(connectionInfo); + connection.send(sessionInfo); + ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination); + connection.send(consumerInfo); + + // The we should get the messages. + for (int i = 0; i < numberOfMessages; i++) { + Message m2 = receiveMessage(connection); + assertNotNull(m2); + if (shouldAck) { + MessageAck ack = createAck(consumerInfo, m2, 1, MessageAck.STANDARD_ACK_TYPE); + connection.request(ack); + } + } + + connection.request(closeConnectionInfo(connectionInfo)); + } + + private void checkStatistics(final List destinations, final Map originalStatistics) { + for (ActiveMQDestination destination : destinations) { + MessageStoreStatistics original = originalStatistics.get(destination); + MessageStoreStatistics actual = getDestination(broker, destination).getMessageStore().getMessageStoreStatistics(); + assertEquals("Have Same Count", original.getMessageCount().getCount(), actual.getMessageCount().getCount()); + assertEquals("Have Same TotalSize", original.getMessageSize().getTotalSize(), getDestination(broker, destination).getMessageStore().getMessageStoreStatistics().getMessageSize().getTotalSize()); + } + } + + protected Destination getDestination(BrokerService target, ActiveMQDestination destination) { + RegionBroker regionBroker = (RegionBroker) target.getRegionBroker(); + if (destination.isTemporary()) { + return destination.isQueue() ? regionBroker.getTempQueueRegion().getDestinationMap().get(destination) : + regionBroker.getTempTopicRegion().getDestinationMap().get(destination); + } + return destination.isQueue() ? + regionBroker.getQueueRegion().getDestinationMap().get(destination) : + regionBroker.getTopicRegion().getDestinationMap().get(destination); + } +} diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java index 0b643b962f..85e785d02e 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBVersionTest.java @@ -57,6 +57,7 @@ public class KahaDBVersionTest extends TestCase { final static File VERSION_3_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion3"); final static File VERSION_4_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion4"); final static File VERSION_5_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion5"); + final static File VERSION_6_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6"); BrokerService broker = null; @@ -133,6 +134,10 @@ public class KahaDBVersionTest extends TestCase { doConvertRestartCycle(VERSION_5_DB); } + public void testVersion6Conversion() throws Exception { + doConvertRestartCycle(VERSION_6_DB); + } + public void doConvertRestartCycle(File existingStore) throws Exception { File testDir = new File("target/activemq-data/kahadb/versionDB"); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBStoreLimitTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBStoreLimitTest.java index 4a8eea987b..9ebffcabf2 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBStoreLimitTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBStoreLimitTest.java @@ -177,7 +177,7 @@ public class MKahaDBStoreLimitTest { FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter(); StoreUsage storeUsage = new StoreUsage(); - storeUsage.setLimit(40*1024); + storeUsage.setLimit(44*1024); filtered.setUsage(storeUsage); filtered.setDestination(queueA); diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageTest.java index defba3a785..0c048b2c40 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usage/StoreUsageTest.java @@ -37,7 +37,7 @@ public class StoreUsageTest extends EmbeddedBrokerTestSupport { @Override protected BrokerService createBroker() throws Exception { BrokerService broker = super.createBroker(); - broker.getSystemUsage().getStoreUsage().setLimit(34 * 1024); + broker.getSystemUsage().getStoreUsage().setLimit(38 * 1024); broker.deleteAllMessages(); return broker; } diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.java index 0afc8da7a9..8cf24e7b3a 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest.java @@ -218,7 +218,7 @@ public class DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest extends org assertTrue("no leak of pages, always use just 11", Wait.waitFor(new Wait.Condition() { @Override public boolean isSatisified() throws Exception { - return 11 == store.getPageFile().getPageCount() - + return 12 == store.getPageFile().getPageCount() - store.getPageFile().getFreePageCount(); } }, TimeUnit.SECONDS.toMillis(10))); diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db-1.log b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db-1.log new file mode 100644 index 0000000000..34facecab6 Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db-1.log differ diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db.data b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db.data new file mode 100644 index 0000000000..6c71774738 Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db.data differ diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db.redo b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db.redo new file mode 100644 index 0000000000..5cb7b87aca Binary files /dev/null and b/activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6/db.redo differ diff --git a/activemq-web/src/main/java/org/apache/activemq/web/LocalBrokerFacade.java b/activemq-web/src/main/java/org/apache/activemq/web/LocalBrokerFacade.java index 50ef17c83f..28af6a28bd 100644 --- a/activemq-web/src/main/java/org/apache/activemq/web/LocalBrokerFacade.java +++ b/activemq-web/src/main/java/org/apache/activemq/web/LocalBrokerFacade.java @@ -87,6 +87,7 @@ public class LocalBrokerFacade extends BrokerFacadeSupport { } } + private Destination unwrap(Destination dest) { if (dest instanceof DestinationFilter) { return unwrap(((DestinationFilter) dest).getNext());