Merge branch 'AMQ-7132'

This closes #336

Thanks to Alan Protasio for the patch
This commit is contained in:
Christopher L. Shannon (cshannon) 2019-01-14 10:54:33 -05:00
commit 03ce997ff2
12 changed files with 438 additions and 35 deletions

View File

@ -91,6 +91,13 @@ public class SizeStatisticImpl extends StatisticImpl {
return maxSize;
}
/**
* @return the maximum size of any step
*/
public synchronized void setMaxSize(long size) {
maxSize = size;
}
/**
* @return the minimum size of any step
*/
@ -98,6 +105,13 @@ public class SizeStatisticImpl extends StatisticImpl {
return minSize;
}
/**
* @return the maximum size of any step
*/
public synchronized void setMinSize(long size) {
minSize = size;
}
/**
* @return the total size of all the steps added together
*/
@ -105,6 +119,10 @@ public class SizeStatisticImpl extends StatisticImpl {
return totalSize;
}
public synchronized void setCount(long count) {
this.count = count;
}
/**
* @return the average size calculated by dividing the total size by the
* number of counts

View File

@ -459,7 +459,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
ackedAndPrepared.remove(id);
if (rollback) {
rolledBackAcks.add(id);
incrementAndAddSizeToStoreStat(dest, 0);
pageFile.tx().execute(tx -> {
incrementAndAddSizeToStoreStat(tx, dest, 0);
});
}
}
} finally {
@ -812,16 +814,19 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure<MessageStoreStatistics, IOException>() {
@Override
public MessageStoreStatistics execute(Transaction tx) throws IOException {
MessageStoreStatistics statistics = new MessageStoreStatistics();
MessageStoreStatistics statistics = getStoredMessageStoreStatistics(dest, tx);
// Iterate through all index entries to get the size of each message
StoredDestination sd = getStoredDestination(dest, tx);
for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
int locationSize = iterator.next().getKey().getSize();
statistics.getMessageCount().increment();
statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0);
if (statistics == null) {
StoredDestination sd = getStoredDestination(dest, tx);
statistics = new MessageStoreStatistics();
for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext(); ) {
int locationSize = iterator.next().getKey().getSize();
statistics.getMessageCount().increment();
statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0);
}
}
return statistics;
return statistics;
}
});
recoveredStatistics.getMessageCount().subtract(ackedAndPrepared.size());

View File

@ -134,7 +134,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
static final int OPEN_STATE = 2;
static final long NOT_ACKED = -1;
static final int VERSION = 6;
static final int VERSION = 7;
static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1;
@ -188,6 +188,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
} catch (EOFException expectedOnUpgrade) {
openwireVersion = OpenWireFormat.DEFAULT_LEGACY_VERSION;
}
LOG.info("KahaDB is version " + version);
}
@ -863,7 +864,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sd.messageIdIndex.remove(tx, keys.messageId);
metadata.producerSequenceIdTracker.rollback(keys.messageId);
undoCounter++;
decrementAndSubSizeToStoreStat(key, keys.location.getSize());
decrementAndSubSizeToStoreStat(tx, key, sd, keys.location.getSize());
// TODO: do we need to modify the ack positions for the pub sub case?
}
}
@ -979,7 +980,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sd.messageIdIndex.remove(tx, keys.messageId);
LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
undoCounter++;
decrementAndSubSizeToStoreStat(sdEntry.getKey(), keys.location.getSize());
decrementAndSubSizeToStoreStat(tx, sdEntry.getKey(), sdEntry.getValue(), keys.location.getSize());
// TODO: do we need to modify the ack positions for the pub sub case?
}
} else {
@ -1491,7 +1492,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
if (previous == null) {
previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
if (previous == null) {
incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
incrementAndAddSizeToStoreStat(tx, command.getDestination(), location.getSize());
sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
addAckLocationForNewMessage(tx, command.getDestination(), sd, id);
@ -1550,11 +1551,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
new MessageKeys(command.getMessageId(), location)
);
sd.locationIndex.put(tx, location, id);
incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
incrementAndAddSizeToStoreStat(tx, command.getDestination(), location.getSize());
if (previousKeys != null) {
//Remove the existing from the size
decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize());
decrementAndSubSizeToStoreStat(tx, command.getDestination(), previousKeys.location.getSize());
//update all the subscription metrics
if (enableSubscriptionStatistics && sd.ackPositions != null && location.getSize() != previousKeys.location.getSize()) {
@ -1590,7 +1591,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
if (keys != null) {
sd.locationIndex.remove(tx, keys.location);
decrementAndSubSizeToStoreStat(command.getDestination(), keys.location.getSize());
decrementAndSubSizeToStoreStat(tx, command.getDestination(), keys.location.getSize());
recordAckMessageReferenceLocation(ackLocation, keys.location);
metadata.lastUpdate = ackLocation;
} else if (LOG.isDebugEnabled()) {
@ -1655,6 +1656,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sd.messageIdIndex.unload(tx);
tx.free(sd.messageIdIndex.getPageId());
tx.free(sd.messageStoreStatistics.getPageId());
sd.messageStoreStatistics = null;
if (sd.subscriptions != null) {
sd.subscriptions.clear(tx);
sd.subscriptions.unload(tx);
@ -2362,6 +2366,53 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
class StoredMessageStoreStatistics {
private PageFile pageFile;
private Page<MessageStoreStatistics> page;
private long pageId;
private AtomicBoolean loaded = new AtomicBoolean();
private MessageStoreStatisticsMarshaller messageStoreStatisticsMarshaller = new MessageStoreStatisticsMarshaller();
StoredMessageStoreStatistics(PageFile pageFile, long pageId) {
this.pageId = pageId;
this.pageFile = pageFile;
}
StoredMessageStoreStatistics(PageFile pageFile, Page page) {
this(pageFile, page.getPageId());
}
public long getPageId() {
return pageId;
}
synchronized void load(Transaction tx) throws IOException {
if (loaded.compareAndSet(false, true)) {
page = tx.load(pageId, null);
if (page.getType() == Page.PAGE_FREE_TYPE) {
page.set(null);
tx.store(page, messageStoreStatisticsMarshaller, true);
}
}
page = tx.load(pageId, messageStoreStatisticsMarshaller);
}
synchronized MessageStoreStatistics get(Transaction tx) throws IOException {
load(tx);
return page.get();
}
synchronized void put(Transaction tx, MessageStoreStatistics storeStatistics) throws IOException {
if (page == null) {
page = tx.load(pageId, messageStoreStatisticsMarshaller);
}
page.set(storeStatistics);
tx.store(page, messageStoreStatisticsMarshaller, true);
}
}
class StoredDestination {
MessageOrderIndex orderIndex = new MessageOrderIndex();
@ -2378,6 +2429,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// Transient data used to track which Messages are no longer needed.
final HashSet<String> subscriptionCache = new LinkedHashSet<>();
StoredMessageStoreStatistics messageStoreStatistics;
public void trackPendingAdd(Long seq) {
orderIndex.trackPendingAdd(seq);
}
@ -2392,6 +2445,38 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
protected class MessageStoreStatisticsMarshaller extends VariableMarshaller<MessageStoreStatistics> {
@Override
public void writePayload(final MessageStoreStatistics object, final DataOutput dataOut) throws IOException {
dataOut.writeBoolean(null != object);
if (object != null) {
dataOut.writeLong(object.getMessageCount().getCount());
dataOut.writeLong(object.getMessageSize().getTotalSize());
dataOut.writeLong(object.getMessageSize().getMaxSize());
dataOut.writeLong(object.getMessageSize().getMinSize());
dataOut.writeLong(object.getMessageSize().getCount());
}
}
@Override
public MessageStoreStatistics readPayload(final DataInput dataIn) throws IOException {
if (!dataIn.readBoolean()) {
return null;
}
MessageStoreStatistics messageStoreStatistics = new MessageStoreStatistics();
messageStoreStatistics.getMessageCount().setCount(dataIn.readLong());
messageStoreStatistics.getMessageSize().setTotalSize(dataIn.readLong());
messageStoreStatistics.getMessageSize().setMaxSize(dataIn.readLong());
messageStoreStatistics.getMessageSize().setMinSize(dataIn.readLong());
messageStoreStatistics.getMessageSize().setCount(dataIn.readLong());
return messageStoreStatistics;
}
}
protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
@ -2470,6 +2555,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
});
}
}
if (metadata.version >= 2) {
value.orderIndex.lowPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
value.orderIndex.highPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
@ -2491,6 +2577,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
});
}
if (metadata.version >= 7) {
value.messageStoreStatistics = new StoredMessageStoreStatistics(pageFile, dataIn.readLong());
} else {
pageFile.tx().execute(tx -> {
value.messageStoreStatistics = new StoredMessageStoreStatistics(pageFile, tx.allocate());
value.messageStoreStatistics.load(tx);
});
}
return value;
}
@ -2510,6 +2605,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
dataOut.writeLong(value.messageStoreStatistics.getPageId());
}
}
@ -2543,6 +2639,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
return rc;
}
protected MessageStoreStatistics getStoredMessageStoreStatistics(KahaDestination destination, Transaction tx) throws IOException {
StoredDestination sd = getStoredDestination(destination, tx);
return sd != null && sd.messageStoreStatistics != null ? sd.messageStoreStatistics.get(tx) : null;
}
protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
String key = key(destination);
StoredDestination rc = storedDestinations.get(key);
@ -2575,9 +2676,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
rc.ackPositions = new ListIndex<>(pageFile, tx.allocate());
rc.subLocations = new ListIndex<>(pageFile, tx.allocate());
}
rc.messageStoreStatistics = new StoredMessageStoreStatistics(pageFile, tx.allocate());
metadata.destinations.put(tx, key, rc);
}
rc.messageStoreStatistics.load(tx);
// Configure the marshalers and load.
rc.orderIndex.load(tx);
@ -2644,9 +2750,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
}
}
// Configure the message references index
// Configure the subscription cache
for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
Entry<String, LastAck> entry = iterator.next();
@ -2707,31 +2810,60 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
* @param kahaDestination
* @param size
*/
protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, long size) {
incrementAndAddSizeToStoreStat(key(kahaDestination), size);
protected void incrementAndAddSizeToStoreStat(Transaction tx, KahaDestination kahaDestination, long size) throws IOException {
StoredDestination sd = getStoredDestination(kahaDestination, tx);
incrementAndAddSizeToStoreStat(tx, key(kahaDestination), sd, size);
}
protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long size) {
protected void incrementAndAddSizeToStoreStat(Transaction tx, String kahaDestKey, StoredDestination sd, long size) throws IOException {
MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
if (storeStats != null) {
storeStats.getMessageCount().increment();
if (size > 0) {
storeStats.getMessageSize().addSize(size);
incrementAndAddSizeToStoreStat(size, storeStats);
sd.messageStoreStatistics.put(tx, storeStats);
} else if (sd != null){
// During the recovery the storeStats is null
MessageStoreStatistics storedStoreStats = sd.messageStoreStatistics.get(tx);
if (storedStoreStats == null) {
storedStoreStats = new MessageStoreStatistics();
}
incrementAndAddSizeToStoreStat(size, storedStoreStats);
sd.messageStoreStatistics.put(tx, storedStoreStats);
}
}
protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, long size) {
decrementAndSubSizeToStoreStat(key(kahaDestination), size);
private void incrementAndAddSizeToStoreStat(final long size, final MessageStoreStatistics storedStoreStats) {
storedStoreStats.getMessageCount().increment();
if (size > 0) {
storedStoreStats.getMessageSize().addSize(size);
}
}
protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long size) {
protected void decrementAndSubSizeToStoreStat(Transaction tx, KahaDestination kahaDestination, long size) throws IOException {
StoredDestination sd = getStoredDestination(kahaDestination, tx);
decrementAndSubSizeToStoreStat(tx, key(kahaDestination), sd,size);
}
protected void decrementAndSubSizeToStoreStat(Transaction tx, String kahaDestKey, StoredDestination sd, long size) throws IOException {
MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
if (storeStats != null) {
storeStats.getMessageCount().decrement();
if (size > 0) {
storeStats.getMessageSize().addSize(-size);
decrementAndSubSizeToStoreStat(size, storeStats);
sd.messageStoreStatistics.put(tx, storeStats);
} else if (sd != null){
// During the recovery the storeStats is null
MessageStoreStatistics storedStoreStats = sd.messageStoreStatistics.get(tx);
if (storedStoreStats == null) {
storedStoreStats = new MessageStoreStatistics();
}
decrementAndSubSizeToStoreStat(size, storedStoreStats);
sd.messageStoreStatistics.put(tx, storedStoreStats);
}
}
private void decrementAndSubSizeToStoreStat(final long size, final MessageStoreStatistics storedStoreStats) {
storedStoreStats.getMessageCount().decrement();
if (size > 0) {
storedStoreStats.getMessageSize().addSize(-size);
}
}
@ -2936,7 +3068,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sd.locationIndex.remove(tx, entry.getValue().location);
sd.messageIdIndex.remove(tx, entry.getValue().messageId);
sd.orderIndex.remove(tx, entry.getKey());
decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
decrementAndSubSizeToStoreStat(tx, command.getDestination(), entry.getValue().location.getSize());
}
}
}
@ -2990,7 +3122,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
sd.locationIndex.remove(tx, entry.getValue().location);
sd.messageIdIndex.remove(tx, entry.getValue().messageId);
sd.orderIndex.remove(tx, entry.getKey());
decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
decrementAndSubSizeToStoreStat(tx, command.getDestination(), entry.getValue().location.getSize());
}
}
}

View File

@ -0,0 +1,242 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.broker;
import com.google.common.collect.ImmutableList;
import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.store.MessageStoreStatistics;
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
import org.apache.activemq.util.IOHelper;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.stream.Collectors;
@RunWith(value = Parameterized.class)
public class RecoveryStatsBrokerTest extends BrokerRestartTestSupport {
private RestartType restartType;
enum RestartType {
NORMAL,
FULL_RECOVERY,
UNCLEAN_SHUTDOWN
}
@Override
protected void configureBroker(BrokerService broker) throws Exception {
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
persistenceAdapter.setJournalMaxFileLength(1024*1024);
//persistenceAdapter.setConcurrentStoreAndDispatchQueues(false);
persistenceAdapter.setDirectory(broker.getBrokerDataDirectory());
broker.setPersistenceAdapter(persistenceAdapter);
broker.setDestinationPolicy(policyMap);
}
protected void restartBroker(RestartType restartType) throws Exception {
if (restartType == RestartType.FULL_RECOVERY) {
stopBroker();
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
File dir = kahaDBPersistenceAdapter.getDirectory();
if (dir != null) {
IOHelper.deleteFile(new File(dir, "db.data"));
}
broker.start();
} else if (restartType == RestartType.UNCLEAN_SHUTDOWN){
//Simulate an unclean shutdown
File dir = broker.getBrokerDataDirectory();
File backUpDir = new File(dir, "bk");
IOHelper.mkdirs(new File(dir, "bk"));
for (File f: dir.listFiles()) {
if (!f.isDirectory()) {
IOHelper.copyFile(f, new File(backUpDir, f.getName()));
}
}
stopBroker();
for (File f: backUpDir.listFiles()) {
if (!f.isDirectory()) {
IOHelper.copyFile(f, new File(dir, f.getName()));
}
}
broker.start();
} else {
restartBroker();
}
}
@Parameterized.Parameters(name="{0}")
public static Collection<Object[]> getTestParameters() {
return Arrays.asList(new Object[][] {
{RestartType.NORMAL},
{RestartType.FULL_RECOVERY},
{RestartType.UNCLEAN_SHUTDOWN},
});
}
public RecoveryStatsBrokerTest(RestartType restartType) {
this.restartType = restartType;
}
@Before
public void setUp() throws Exception {
super.setUp();
}
@After
public void tearDown() throws Exception {
super.tearDown();
}
@Test(timeout = 60 * 1000)
public void testStaticsRecovery() throws Exception {
List<ActiveMQDestination> destinations = ImmutableList.of(new ActiveMQQueue("TEST.A"), new ActiveMQQueue("TEST.B"));
Random random = new Random();
Map<ActiveMQDestination, Integer> consumedMessages = new HashMap<>();
destinations.forEach(destination -> consumedMessages.put(destination, 0));
int numberOfMessages = 10000;
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
connection.send(producerInfo);
for (int i = 0; i < numberOfMessages; i++) {
for (ActiveMQDestination destination : destinations) {
Message message = createMessage(producerInfo, destination);
message.setPersistent(true);
message.setProducerId(message.getMessageId().getProducerId());
connection.request(message);
}
}
Map<ActiveMQDestination, MessageStoreStatistics> originalStatistics = getCurrentStatistics(destinations);
checkStatistics(destinations, originalStatistics);
restartBroker(restartType);
checkStatistics(destinations, originalStatistics);
for (ActiveMQDestination destination : destinations) {
consume(destination, 100, false);
}
checkStatistics(destinations, originalStatistics);
restartBroker(restartType);
checkStatistics(destinations, originalStatistics);
for (ActiveMQDestination destination : destinations) {
int messagesToConsume = random.nextInt(numberOfMessages);
consume(destination, messagesToConsume, true);
consumedMessages.compute(destination, (key, value) -> value = value + messagesToConsume);
}
originalStatistics = getCurrentStatistics(destinations);
for (ActiveMQDestination destination : destinations) {
int consumedCount = consumedMessages.get(destination);
assertEquals("", numberOfMessages - consumedCount, originalStatistics.get(destination).getMessageCount().getCount());
}
checkStatistics(destinations, originalStatistics);
restartBroker(restartType);
checkStatistics(destinations, originalStatistics);
}
private Map<ActiveMQDestination, MessageStoreStatistics> getCurrentStatistics(final List<ActiveMQDestination> destinations) {
return destinations.stream()
.map(destination -> getDestination(broker, destination))
.collect(Collectors.toMap(destination -> new ActiveMQQueue(destination.getName()), destination2 -> destination2.getMessageStore().getMessageStoreStatistics()));
}
private void consume(ActiveMQDestination destination, int numberOfMessages, boolean shouldAck) throws Exception {
// Setup the consumer and receive the message.
StubConnection connection = createConnection();
ConnectionInfo connectionInfo = createConnectionInfo();
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
connection.send(connectionInfo);
connection.send(sessionInfo);
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
connection.send(consumerInfo);
// The we should get the messages.
for (int i = 0; i < numberOfMessages; i++) {
Message m2 = receiveMessage(connection);
assertNotNull(m2);
if (shouldAck) {
MessageAck ack = createAck(consumerInfo, m2, 1, MessageAck.STANDARD_ACK_TYPE);
connection.request(ack);
}
}
connection.request(closeConnectionInfo(connectionInfo));
}
private void checkStatistics(final List<ActiveMQDestination> destinations, final Map<ActiveMQDestination, MessageStoreStatistics> originalStatistics) {
for (ActiveMQDestination destination : destinations) {
MessageStoreStatistics original = originalStatistics.get(destination);
MessageStoreStatistics actual = getDestination(broker, destination).getMessageStore().getMessageStoreStatistics();
assertEquals("Have Same Count", original.getMessageCount().getCount(), actual.getMessageCount().getCount());
assertEquals("Have Same TotalSize", original.getMessageSize().getTotalSize(), getDestination(broker, destination).getMessageStore().getMessageStoreStatistics().getMessageSize().getTotalSize());
}
}
protected Destination getDestination(BrokerService target, ActiveMQDestination destination) {
RegionBroker regionBroker = (RegionBroker) target.getRegionBroker();
if (destination.isTemporary()) {
return destination.isQueue() ? regionBroker.getTempQueueRegion().getDestinationMap().get(destination) :
regionBroker.getTempTopicRegion().getDestinationMap().get(destination);
}
return destination.isQueue() ?
regionBroker.getQueueRegion().getDestinationMap().get(destination) :
regionBroker.getTopicRegion().getDestinationMap().get(destination);
}
}

View File

@ -57,6 +57,7 @@ public class KahaDBVersionTest extends TestCase {
final static File VERSION_3_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion3");
final static File VERSION_4_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion4");
final static File VERSION_5_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion5");
final static File VERSION_6_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6");
BrokerService broker = null;
@ -133,6 +134,10 @@ public class KahaDBVersionTest extends TestCase {
doConvertRestartCycle(VERSION_5_DB);
}
public void testVersion6Conversion() throws Exception {
doConvertRestartCycle(VERSION_6_DB);
}
public void doConvertRestartCycle(File existingStore) throws Exception {
File testDir = new File("target/activemq-data/kahadb/versionDB");

View File

@ -177,7 +177,7 @@ public class MKahaDBStoreLimitTest {
FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
StoreUsage storeUsage = new StoreUsage();
storeUsage.setLimit(40*1024);
storeUsage.setLimit(44*1024);
filtered.setUsage(storeUsage);
filtered.setDestination(queueA);

View File

@ -37,7 +37,7 @@ public class StoreUsageTest extends EmbeddedBrokerTestSupport {
@Override
protected BrokerService createBroker() throws Exception {
BrokerService broker = super.createBroker();
broker.getSystemUsage().getStoreUsage().setLimit(34 * 1024);
broker.getSystemUsage().getStoreUsage().setLimit(38 * 1024);
broker.deleteAllMessages();
return broker;
}

View File

@ -218,7 +218,7 @@ public class DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest extends org
assertTrue("no leak of pages, always use just 11", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return 11 == store.getPageFile().getPageCount() -
return 12 == store.getPageFile().getPageCount() -
store.getPageFile().getFreePageCount();
}
}, TimeUnit.SECONDS.toMillis(10)));

View File

@ -87,6 +87,7 @@ public class LocalBrokerFacade extends BrokerFacadeSupport {
}
}
private Destination unwrap(Destination dest) {
if (dest instanceof DestinationFilter) {
return unwrap(((DestinationFilter) dest).getNext());