mirror of https://github.com/apache/activemq.git
AMQ-7132 - ActiveMQ reads lots of index pages upon startup (after a graceful or ungraceful shutdown)
This commit is contained in:
parent
b10458e2c6
commit
4b51f8b66c
|
@ -91,6 +91,13 @@ public class SizeStatisticImpl extends StatisticImpl {
|
|||
return maxSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the maximum size of any step
|
||||
*/
|
||||
public synchronized void setMaxSize(long size) {
|
||||
maxSize = size;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the minimum size of any step
|
||||
*/
|
||||
|
@ -98,6 +105,13 @@ public class SizeStatisticImpl extends StatisticImpl {
|
|||
return minSize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the maximum size of any step
|
||||
*/
|
||||
public synchronized void setMinSize(long size) {
|
||||
minSize = size;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the total size of all the steps added together
|
||||
*/
|
||||
|
@ -105,6 +119,10 @@ public class SizeStatisticImpl extends StatisticImpl {
|
|||
return totalSize;
|
||||
}
|
||||
|
||||
public synchronized void setCount(long count) {
|
||||
this.count = count;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return the average size calculated by dividing the total size by the
|
||||
* number of counts
|
||||
|
|
|
@ -459,7 +459,9 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
|
|||
ackedAndPrepared.remove(id);
|
||||
if (rollback) {
|
||||
rolledBackAcks.add(id);
|
||||
incrementAndAddSizeToStoreStat(dest, 0);
|
||||
pageFile.tx().execute(tx -> {
|
||||
incrementAndAddSizeToStoreStat(tx, dest, 0);
|
||||
});
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
|
@ -812,16 +814,19 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter,
|
|||
recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure<MessageStoreStatistics, IOException>() {
|
||||
@Override
|
||||
public MessageStoreStatistics execute(Transaction tx) throws IOException {
|
||||
MessageStoreStatistics statistics = new MessageStoreStatistics();
|
||||
MessageStoreStatistics statistics = getStoredMessageStoreStatistics(dest, tx);
|
||||
|
||||
// Iterate through all index entries to get the size of each message
|
||||
StoredDestination sd = getStoredDestination(dest, tx);
|
||||
for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
|
||||
int locationSize = iterator.next().getKey().getSize();
|
||||
statistics.getMessageCount().increment();
|
||||
statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0);
|
||||
if (statistics == null) {
|
||||
StoredDestination sd = getStoredDestination(dest, tx);
|
||||
statistics = new MessageStoreStatistics();
|
||||
for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext(); ) {
|
||||
int locationSize = iterator.next().getKey().getSize();
|
||||
statistics.getMessageCount().increment();
|
||||
statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0);
|
||||
}
|
||||
}
|
||||
return statistics;
|
||||
return statistics;
|
||||
}
|
||||
});
|
||||
recoveredStatistics.getMessageCount().subtract(ackedAndPrepared.size());
|
||||
|
|
|
@ -134,7 +134,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
static final int OPEN_STATE = 2;
|
||||
static final long NOT_ACKED = -1;
|
||||
|
||||
static final int VERSION = 6;
|
||||
static final int VERSION = 7;
|
||||
|
||||
static final byte COMPACTED_JOURNAL_FILE = DataFile.STANDARD_LOG_FILE + 1;
|
||||
|
||||
|
@ -188,6 +188,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
} catch (EOFException expectedOnUpgrade) {
|
||||
openwireVersion = OpenWireFormat.DEFAULT_LEGACY_VERSION;
|
||||
}
|
||||
|
||||
LOG.info("KahaDB is version " + version);
|
||||
}
|
||||
|
||||
|
@ -863,7 +864,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
sd.messageIdIndex.remove(tx, keys.messageId);
|
||||
metadata.producerSequenceIdTracker.rollback(keys.messageId);
|
||||
undoCounter++;
|
||||
decrementAndSubSizeToStoreStat(key, keys.location.getSize());
|
||||
decrementAndSubSizeToStoreStat(tx, key, sd, keys.location.getSize());
|
||||
// TODO: do we need to modify the ack positions for the pub sub case?
|
||||
}
|
||||
}
|
||||
|
@ -979,7 +980,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
sd.messageIdIndex.remove(tx, keys.messageId);
|
||||
LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
|
||||
undoCounter++;
|
||||
decrementAndSubSizeToStoreStat(sdEntry.getKey(), keys.location.getSize());
|
||||
decrementAndSubSizeToStoreStat(tx, sdEntry.getKey(), sdEntry.getValue(), keys.location.getSize());
|
||||
// TODO: do we need to modify the ack positions for the pub sub case?
|
||||
}
|
||||
} else {
|
||||
|
@ -1491,7 +1492,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
if (previous == null) {
|
||||
previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
|
||||
if (previous == null) {
|
||||
incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
|
||||
incrementAndAddSizeToStoreStat(tx, command.getDestination(), location.getSize());
|
||||
sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
|
||||
if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
|
||||
addAckLocationForNewMessage(tx, command.getDestination(), sd, id);
|
||||
|
@ -1550,11 +1551,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
new MessageKeys(command.getMessageId(), location)
|
||||
);
|
||||
sd.locationIndex.put(tx, location, id);
|
||||
incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
|
||||
incrementAndAddSizeToStoreStat(tx, command.getDestination(), location.getSize());
|
||||
|
||||
if (previousKeys != null) {
|
||||
//Remove the existing from the size
|
||||
decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize());
|
||||
decrementAndSubSizeToStoreStat(tx, command.getDestination(), previousKeys.location.getSize());
|
||||
|
||||
//update all the subscription metrics
|
||||
if (enableSubscriptionStatistics && sd.ackPositions != null && location.getSize() != previousKeys.location.getSize()) {
|
||||
|
@ -1590,7 +1591,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
|
||||
if (keys != null) {
|
||||
sd.locationIndex.remove(tx, keys.location);
|
||||
decrementAndSubSizeToStoreStat(command.getDestination(), keys.location.getSize());
|
||||
decrementAndSubSizeToStoreStat(tx, command.getDestination(), keys.location.getSize());
|
||||
recordAckMessageReferenceLocation(ackLocation, keys.location);
|
||||
metadata.lastUpdate = ackLocation;
|
||||
} else if (LOG.isDebugEnabled()) {
|
||||
|
@ -1655,6 +1656,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
sd.messageIdIndex.unload(tx);
|
||||
tx.free(sd.messageIdIndex.getPageId());
|
||||
|
||||
tx.free(sd.messageStoreStatistics.getPageId());
|
||||
sd.messageStoreStatistics = null;
|
||||
|
||||
if (sd.subscriptions != null) {
|
||||
sd.subscriptions.clear(tx);
|
||||
sd.subscriptions.unload(tx);
|
||||
|
@ -2362,6 +2366,53 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
}
|
||||
}
|
||||
|
||||
class StoredMessageStoreStatistics {
|
||||
private PageFile pageFile;
|
||||
private Page<MessageStoreStatistics> page;
|
||||
private long pageId;
|
||||
private AtomicBoolean loaded = new AtomicBoolean();
|
||||
private MessageStoreStatisticsMarshaller messageStoreStatisticsMarshaller = new MessageStoreStatisticsMarshaller();
|
||||
|
||||
StoredMessageStoreStatistics(PageFile pageFile, long pageId) {
|
||||
this.pageId = pageId;
|
||||
this.pageFile = pageFile;
|
||||
}
|
||||
|
||||
StoredMessageStoreStatistics(PageFile pageFile, Page page) {
|
||||
this(pageFile, page.getPageId());
|
||||
}
|
||||
|
||||
public long getPageId() {
|
||||
return pageId;
|
||||
}
|
||||
|
||||
synchronized void load(Transaction tx) throws IOException {
|
||||
if (loaded.compareAndSet(false, true)) {
|
||||
page = tx.load(pageId, null);
|
||||
|
||||
if (page.getType() == Page.PAGE_FREE_TYPE) {
|
||||
page.set(null);
|
||||
tx.store(page, messageStoreStatisticsMarshaller, true);
|
||||
}
|
||||
}
|
||||
page = tx.load(pageId, messageStoreStatisticsMarshaller);
|
||||
}
|
||||
|
||||
synchronized MessageStoreStatistics get(Transaction tx) throws IOException {
|
||||
load(tx);
|
||||
return page.get();
|
||||
}
|
||||
|
||||
synchronized void put(Transaction tx, MessageStoreStatistics storeStatistics) throws IOException {
|
||||
if (page == null) {
|
||||
page = tx.load(pageId, messageStoreStatisticsMarshaller);
|
||||
}
|
||||
|
||||
page.set(storeStatistics);
|
||||
|
||||
tx.store(page, messageStoreStatisticsMarshaller, true);
|
||||
}
|
||||
}
|
||||
class StoredDestination {
|
||||
|
||||
MessageOrderIndex orderIndex = new MessageOrderIndex();
|
||||
|
@ -2378,6 +2429,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
// Transient data used to track which Messages are no longer needed.
|
||||
final HashSet<String> subscriptionCache = new LinkedHashSet<>();
|
||||
|
||||
StoredMessageStoreStatistics messageStoreStatistics;
|
||||
|
||||
public void trackPendingAdd(Long seq) {
|
||||
orderIndex.trackPendingAdd(seq);
|
||||
}
|
||||
|
@ -2392,6 +2445,38 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
}
|
||||
}
|
||||
|
||||
protected class MessageStoreStatisticsMarshaller extends VariableMarshaller<MessageStoreStatistics> {
|
||||
|
||||
@Override
|
||||
public void writePayload(final MessageStoreStatistics object, final DataOutput dataOut) throws IOException {
|
||||
dataOut.writeBoolean(null != object);
|
||||
if (object != null) {
|
||||
dataOut.writeLong(object.getMessageCount().getCount());
|
||||
dataOut.writeLong(object.getMessageSize().getTotalSize());
|
||||
dataOut.writeLong(object.getMessageSize().getMaxSize());
|
||||
dataOut.writeLong(object.getMessageSize().getMinSize());
|
||||
dataOut.writeLong(object.getMessageSize().getCount());
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public MessageStoreStatistics readPayload(final DataInput dataIn) throws IOException {
|
||||
|
||||
if (!dataIn.readBoolean()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
MessageStoreStatistics messageStoreStatistics = new MessageStoreStatistics();
|
||||
messageStoreStatistics.getMessageCount().setCount(dataIn.readLong());
|
||||
messageStoreStatistics.getMessageSize().setTotalSize(dataIn.readLong());
|
||||
messageStoreStatistics.getMessageSize().setMaxSize(dataIn.readLong());
|
||||
messageStoreStatistics.getMessageSize().setMinSize(dataIn.readLong());
|
||||
messageStoreStatistics.getMessageSize().setCount(dataIn.readLong());
|
||||
|
||||
return messageStoreStatistics;
|
||||
}
|
||||
}
|
||||
|
||||
protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
|
||||
|
||||
final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
|
||||
|
@ -2470,6 +2555,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
});
|
||||
}
|
||||
}
|
||||
|
||||
if (metadata.version >= 2) {
|
||||
value.orderIndex.lowPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
|
||||
value.orderIndex.highPriorityIndex = new BTreeIndex<>(pageFile, dataIn.readLong());
|
||||
|
@ -2491,6 +2577,15 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
});
|
||||
}
|
||||
|
||||
if (metadata.version >= 7) {
|
||||
value.messageStoreStatistics = new StoredMessageStoreStatistics(pageFile, dataIn.readLong());
|
||||
} else {
|
||||
pageFile.tx().execute(tx -> {
|
||||
value.messageStoreStatistics = new StoredMessageStoreStatistics(pageFile, tx.allocate());
|
||||
value.messageStoreStatistics.load(tx);
|
||||
});
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
|
@ -2510,6 +2605,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
}
|
||||
dataOut.writeLong(value.orderIndex.lowPriorityIndex.getPageId());
|
||||
dataOut.writeLong(value.orderIndex.highPriorityIndex.getPageId());
|
||||
dataOut.writeLong(value.messageStoreStatistics.getPageId());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2543,6 +2639,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
return rc;
|
||||
}
|
||||
|
||||
protected MessageStoreStatistics getStoredMessageStoreStatistics(KahaDestination destination, Transaction tx) throws IOException {
|
||||
StoredDestination sd = getStoredDestination(destination, tx);
|
||||
return sd != null && sd.messageStoreStatistics != null ? sd.messageStoreStatistics.get(tx) : null;
|
||||
}
|
||||
|
||||
protected StoredDestination getExistingStoredDestination(KahaDestination destination, Transaction tx) throws IOException {
|
||||
String key = key(destination);
|
||||
StoredDestination rc = storedDestinations.get(key);
|
||||
|
@ -2575,9 +2676,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
rc.ackPositions = new ListIndex<>(pageFile, tx.allocate());
|
||||
rc.subLocations = new ListIndex<>(pageFile, tx.allocate());
|
||||
}
|
||||
|
||||
rc.messageStoreStatistics = new StoredMessageStoreStatistics(pageFile, tx.allocate());
|
||||
|
||||
metadata.destinations.put(tx, key, rc);
|
||||
}
|
||||
|
||||
rc.messageStoreStatistics.load(tx);
|
||||
|
||||
// Configure the marshalers and load.
|
||||
rc.orderIndex.load(tx);
|
||||
|
||||
|
@ -2644,9 +2750,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
}
|
||||
}
|
||||
|
||||
// Configure the message references index
|
||||
|
||||
|
||||
// Configure the subscription cache
|
||||
for (Iterator<Entry<String, LastAck>> iterator = rc.subscriptionAcks.iterator(tx); iterator.hasNext(); ) {
|
||||
Entry<String, LastAck> entry = iterator.next();
|
||||
|
@ -2707,31 +2810,60 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
* @param kahaDestination
|
||||
* @param size
|
||||
*/
|
||||
protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, long size) {
|
||||
incrementAndAddSizeToStoreStat(key(kahaDestination), size);
|
||||
protected void incrementAndAddSizeToStoreStat(Transaction tx, KahaDestination kahaDestination, long size) throws IOException {
|
||||
StoredDestination sd = getStoredDestination(kahaDestination, tx);
|
||||
incrementAndAddSizeToStoreStat(tx, key(kahaDestination), sd, size);
|
||||
}
|
||||
|
||||
protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long size) {
|
||||
protected void incrementAndAddSizeToStoreStat(Transaction tx, String kahaDestKey, StoredDestination sd, long size) throws IOException {
|
||||
MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
|
||||
if (storeStats != null) {
|
||||
storeStats.getMessageCount().increment();
|
||||
if (size > 0) {
|
||||
storeStats.getMessageSize().addSize(size);
|
||||
incrementAndAddSizeToStoreStat(size, storeStats);
|
||||
sd.messageStoreStatistics.put(tx, storeStats);
|
||||
} else if (sd != null){
|
||||
// During the recovery the storeStats is null
|
||||
MessageStoreStatistics storedStoreStats = sd.messageStoreStatistics.get(tx);
|
||||
if (storedStoreStats == null) {
|
||||
storedStoreStats = new MessageStoreStatistics();
|
||||
}
|
||||
incrementAndAddSizeToStoreStat(size, storedStoreStats);
|
||||
sd.messageStoreStatistics.put(tx, storedStoreStats);
|
||||
}
|
||||
}
|
||||
|
||||
protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, long size) {
|
||||
decrementAndSubSizeToStoreStat(key(kahaDestination), size);
|
||||
private void incrementAndAddSizeToStoreStat(final long size, final MessageStoreStatistics storedStoreStats) {
|
||||
storedStoreStats.getMessageCount().increment();
|
||||
if (size > 0) {
|
||||
storedStoreStats.getMessageSize().addSize(size);
|
||||
}
|
||||
}
|
||||
|
||||
protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long size) {
|
||||
protected void decrementAndSubSizeToStoreStat(Transaction tx, KahaDestination kahaDestination, long size) throws IOException {
|
||||
StoredDestination sd = getStoredDestination(kahaDestination, tx);
|
||||
decrementAndSubSizeToStoreStat(tx, key(kahaDestination), sd,size);
|
||||
}
|
||||
|
||||
protected void decrementAndSubSizeToStoreStat(Transaction tx, String kahaDestKey, StoredDestination sd, long size) throws IOException {
|
||||
MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
|
||||
if (storeStats != null) {
|
||||
storeStats.getMessageCount().decrement();
|
||||
if (size > 0) {
|
||||
storeStats.getMessageSize().addSize(-size);
|
||||
decrementAndSubSizeToStoreStat(size, storeStats);
|
||||
sd.messageStoreStatistics.put(tx, storeStats);
|
||||
} else if (sd != null){
|
||||
// During the recovery the storeStats is null
|
||||
MessageStoreStatistics storedStoreStats = sd.messageStoreStatistics.get(tx);
|
||||
if (storedStoreStats == null) {
|
||||
storedStoreStats = new MessageStoreStatistics();
|
||||
}
|
||||
decrementAndSubSizeToStoreStat(size, storedStoreStats);
|
||||
sd.messageStoreStatistics.put(tx, storedStoreStats);
|
||||
}
|
||||
}
|
||||
|
||||
private void decrementAndSubSizeToStoreStat(final long size, final MessageStoreStatistics storedStoreStats) {
|
||||
storedStoreStats.getMessageCount().decrement();
|
||||
|
||||
if (size > 0) {
|
||||
storedStoreStats.getMessageSize().addSize(-size);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2936,7 +3068,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
sd.locationIndex.remove(tx, entry.getValue().location);
|
||||
sd.messageIdIndex.remove(tx, entry.getValue().messageId);
|
||||
sd.orderIndex.remove(tx, entry.getKey());
|
||||
decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
|
||||
decrementAndSubSizeToStoreStat(tx, command.getDestination(), entry.getValue().location.getSize());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2990,7 +3122,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
sd.locationIndex.remove(tx, entry.getValue().location);
|
||||
sd.messageIdIndex.remove(tx, entry.getValue().messageId);
|
||||
sd.orderIndex.remove(tx, entry.getKey());
|
||||
decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
|
||||
decrementAndSubSizeToStoreStat(tx, command.getDestination(), entry.getValue().location.getSize());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,242 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.broker;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import org.apache.activemq.broker.region.Destination;
|
||||
import org.apache.activemq.broker.region.RegionBroker;
|
||||
import org.apache.activemq.command.ActiveMQDestination;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.apache.activemq.command.ConnectionInfo;
|
||||
import org.apache.activemq.command.ConsumerInfo;
|
||||
import org.apache.activemq.command.Message;
|
||||
import org.apache.activemq.command.MessageAck;
|
||||
import org.apache.activemq.command.ProducerInfo;
|
||||
import org.apache.activemq.command.SessionInfo;
|
||||
import org.apache.activemq.store.MessageStoreStatistics;
|
||||
import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
|
||||
import org.apache.activemq.util.IOHelper;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
@RunWith(value = Parameterized.class)
|
||||
public class RecoveryStatsBrokerTest extends BrokerRestartTestSupport {
|
||||
|
||||
private RestartType restartType;
|
||||
|
||||
enum RestartType {
|
||||
NORMAL,
|
||||
FULL_RECOVERY,
|
||||
UNCLEAN_SHUTDOWN
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void configureBroker(BrokerService broker) throws Exception {
|
||||
KahaDBPersistenceAdapter persistenceAdapter = new KahaDBPersistenceAdapter();
|
||||
persistenceAdapter.setJournalMaxFileLength(1024*1024);
|
||||
//persistenceAdapter.setConcurrentStoreAndDispatchQueues(false);
|
||||
persistenceAdapter.setDirectory(broker.getBrokerDataDirectory());
|
||||
broker.setPersistenceAdapter(persistenceAdapter);
|
||||
broker.setDestinationPolicy(policyMap);
|
||||
}
|
||||
|
||||
protected void restartBroker(RestartType restartType) throws Exception {
|
||||
if (restartType == RestartType.FULL_RECOVERY) {
|
||||
stopBroker();
|
||||
KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
|
||||
File dir = kahaDBPersistenceAdapter.getDirectory();
|
||||
if (dir != null) {
|
||||
IOHelper.deleteFile(new File(dir, "db.data"));
|
||||
}
|
||||
broker.start();
|
||||
} else if (restartType == RestartType.UNCLEAN_SHUTDOWN){
|
||||
//Simulate an unclean shutdown
|
||||
|
||||
File dir = broker.getBrokerDataDirectory();
|
||||
File backUpDir = new File(dir, "bk");
|
||||
IOHelper.mkdirs(new File(dir, "bk"));
|
||||
|
||||
for (File f: dir.listFiles()) {
|
||||
if (!f.isDirectory()) {
|
||||
IOHelper.copyFile(f, new File(backUpDir, f.getName()));
|
||||
}
|
||||
}
|
||||
|
||||
stopBroker();
|
||||
|
||||
for (File f: backUpDir.listFiles()) {
|
||||
if (!f.isDirectory()) {
|
||||
IOHelper.copyFile(f, new File(dir, f.getName()));
|
||||
}
|
||||
}
|
||||
|
||||
broker.start();
|
||||
} else {
|
||||
restartBroker();
|
||||
}
|
||||
}
|
||||
|
||||
@Parameterized.Parameters(name="{0}")
|
||||
public static Collection<Object[]> getTestParameters() {
|
||||
return Arrays.asList(new Object[][] {
|
||||
{RestartType.NORMAL},
|
||||
{RestartType.FULL_RECOVERY},
|
||||
{RestartType.UNCLEAN_SHUTDOWN},
|
||||
});
|
||||
}
|
||||
|
||||
public RecoveryStatsBrokerTest(RestartType restartType) {
|
||||
this.restartType = restartType;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
}
|
||||
|
||||
@Test(timeout = 60 * 1000)
|
||||
public void testStaticsRecovery() throws Exception {
|
||||
List<ActiveMQDestination> destinations = ImmutableList.of(new ActiveMQQueue("TEST.A"), new ActiveMQQueue("TEST.B"));
|
||||
Random random = new Random();
|
||||
Map<ActiveMQDestination, Integer> consumedMessages = new HashMap<>();
|
||||
|
||||
destinations.forEach(destination -> consumedMessages.put(destination, 0));
|
||||
|
||||
int numberOfMessages = 10000;
|
||||
|
||||
StubConnection connection = createConnection();
|
||||
ConnectionInfo connectionInfo = createConnectionInfo();
|
||||
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
|
||||
ProducerInfo producerInfo = createProducerInfo(sessionInfo);
|
||||
connection.send(connectionInfo);
|
||||
connection.send(sessionInfo);
|
||||
connection.send(producerInfo);
|
||||
|
||||
for (int i = 0; i < numberOfMessages; i++) {
|
||||
for (ActiveMQDestination destination : destinations) {
|
||||
Message message = createMessage(producerInfo, destination);
|
||||
message.setPersistent(true);
|
||||
message.setProducerId(message.getMessageId().getProducerId());
|
||||
connection.request(message);
|
||||
}
|
||||
}
|
||||
|
||||
Map<ActiveMQDestination, MessageStoreStatistics> originalStatistics = getCurrentStatistics(destinations);
|
||||
|
||||
checkStatistics(destinations, originalStatistics);
|
||||
|
||||
restartBroker(restartType);
|
||||
|
||||
checkStatistics(destinations, originalStatistics);
|
||||
|
||||
for (ActiveMQDestination destination : destinations) {
|
||||
consume(destination, 100, false);
|
||||
}
|
||||
|
||||
checkStatistics(destinations, originalStatistics);
|
||||
|
||||
restartBroker(restartType);
|
||||
|
||||
checkStatistics(destinations, originalStatistics);
|
||||
|
||||
for (ActiveMQDestination destination : destinations) {
|
||||
int messagesToConsume = random.nextInt(numberOfMessages);
|
||||
consume(destination, messagesToConsume, true);
|
||||
consumedMessages.compute(destination, (key, value) -> value = value + messagesToConsume);
|
||||
}
|
||||
|
||||
originalStatistics = getCurrentStatistics(destinations);
|
||||
|
||||
for (ActiveMQDestination destination : destinations) {
|
||||
int consumedCount = consumedMessages.get(destination);
|
||||
assertEquals("", numberOfMessages - consumedCount, originalStatistics.get(destination).getMessageCount().getCount());
|
||||
}
|
||||
|
||||
checkStatistics(destinations, originalStatistics);
|
||||
|
||||
restartBroker(restartType);
|
||||
|
||||
checkStatistics(destinations, originalStatistics);
|
||||
}
|
||||
|
||||
private Map<ActiveMQDestination, MessageStoreStatistics> getCurrentStatistics(final List<ActiveMQDestination> destinations) {
|
||||
return destinations.stream()
|
||||
.map(destination -> getDestination(broker, destination))
|
||||
.collect(Collectors.toMap(destination -> new ActiveMQQueue(destination.getName()), destination2 -> destination2.getMessageStore().getMessageStoreStatistics()));
|
||||
}
|
||||
|
||||
private void consume(ActiveMQDestination destination, int numberOfMessages, boolean shouldAck) throws Exception {
|
||||
// Setup the consumer and receive the message.
|
||||
StubConnection connection = createConnection();
|
||||
ConnectionInfo connectionInfo = createConnectionInfo();
|
||||
SessionInfo sessionInfo = createSessionInfo(connectionInfo);
|
||||
connection.send(connectionInfo);
|
||||
connection.send(sessionInfo);
|
||||
ConsumerInfo consumerInfo = createConsumerInfo(sessionInfo, destination);
|
||||
connection.send(consumerInfo);
|
||||
|
||||
// The we should get the messages.
|
||||
for (int i = 0; i < numberOfMessages; i++) {
|
||||
Message m2 = receiveMessage(connection);
|
||||
assertNotNull(m2);
|
||||
if (shouldAck) {
|
||||
MessageAck ack = createAck(consumerInfo, m2, 1, MessageAck.STANDARD_ACK_TYPE);
|
||||
connection.request(ack);
|
||||
}
|
||||
}
|
||||
|
||||
connection.request(closeConnectionInfo(connectionInfo));
|
||||
}
|
||||
|
||||
private void checkStatistics(final List<ActiveMQDestination> destinations, final Map<ActiveMQDestination, MessageStoreStatistics> originalStatistics) {
|
||||
for (ActiveMQDestination destination : destinations) {
|
||||
MessageStoreStatistics original = originalStatistics.get(destination);
|
||||
MessageStoreStatistics actual = getDestination(broker, destination).getMessageStore().getMessageStoreStatistics();
|
||||
assertEquals("Have Same Count", original.getMessageCount().getCount(), actual.getMessageCount().getCount());
|
||||
assertEquals("Have Same TotalSize", original.getMessageSize().getTotalSize(), getDestination(broker, destination).getMessageStore().getMessageStoreStatistics().getMessageSize().getTotalSize());
|
||||
}
|
||||
}
|
||||
|
||||
protected Destination getDestination(BrokerService target, ActiveMQDestination destination) {
|
||||
RegionBroker regionBroker = (RegionBroker) target.getRegionBroker();
|
||||
if (destination.isTemporary()) {
|
||||
return destination.isQueue() ? regionBroker.getTempQueueRegion().getDestinationMap().get(destination) :
|
||||
regionBroker.getTempTopicRegion().getDestinationMap().get(destination);
|
||||
}
|
||||
return destination.isQueue() ?
|
||||
regionBroker.getQueueRegion().getDestinationMap().get(destination) :
|
||||
regionBroker.getTopicRegion().getDestinationMap().get(destination);
|
||||
}
|
||||
}
|
|
@ -57,6 +57,7 @@ public class KahaDBVersionTest extends TestCase {
|
|||
final static File VERSION_3_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion3");
|
||||
final static File VERSION_4_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion4");
|
||||
final static File VERSION_5_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion5");
|
||||
final static File VERSION_6_DB = new File(basedir + "/src/test/resources/org/apache/activemq/store/kahadb/KahaDBVersion6");
|
||||
|
||||
|
||||
BrokerService broker = null;
|
||||
|
@ -133,6 +134,10 @@ public class KahaDBVersionTest extends TestCase {
|
|||
doConvertRestartCycle(VERSION_5_DB);
|
||||
}
|
||||
|
||||
public void testVersion6Conversion() throws Exception {
|
||||
doConvertRestartCycle(VERSION_6_DB);
|
||||
}
|
||||
|
||||
public void doConvertRestartCycle(File existingStore) throws Exception {
|
||||
|
||||
File testDir = new File("target/activemq-data/kahadb/versionDB");
|
||||
|
|
|
@ -177,7 +177,7 @@ public class MKahaDBStoreLimitTest {
|
|||
|
||||
FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
|
||||
StoreUsage storeUsage = new StoreUsage();
|
||||
storeUsage.setLimit(40*1024);
|
||||
storeUsage.setLimit(44*1024);
|
||||
|
||||
filtered.setUsage(storeUsage);
|
||||
filtered.setDestination(queueA);
|
||||
|
|
|
@ -37,7 +37,7 @@ public class StoreUsageTest extends EmbeddedBrokerTestSupport {
|
|||
@Override
|
||||
protected BrokerService createBroker() throws Exception {
|
||||
BrokerService broker = super.createBroker();
|
||||
broker.getSystemUsage().getStoreUsage().setLimit(34 * 1024);
|
||||
broker.getSystemUsage().getStoreUsage().setLimit(38 * 1024);
|
||||
broker.deleteAllMessages();
|
||||
return broker;
|
||||
}
|
||||
|
|
|
@ -218,7 +218,7 @@ public class DurableSubsOfflineSelectorConcurrentConsumeIndexUseTest extends org
|
|||
assertTrue("no leak of pages, always use just 11", Wait.waitFor(new Wait.Condition() {
|
||||
@Override
|
||||
public boolean isSatisified() throws Exception {
|
||||
return 11 == store.getPageFile().getPageCount() -
|
||||
return 12 == store.getPageFile().getPageCount() -
|
||||
store.getPageFile().getFreePageCount();
|
||||
}
|
||||
}, TimeUnit.SECONDS.toMillis(10)));
|
||||
|
|
Binary file not shown.
Binary file not shown.
Binary file not shown.
|
@ -87,6 +87,7 @@ public class LocalBrokerFacade extends BrokerFacadeSupport {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
private Destination unwrap(Destination dest) {
|
||||
if (dest instanceof DestinationFilter) {
|
||||
return unwrap(((DestinationFilter) dest).getNext());
|
||||
|
|
Loading…
Reference in New Issue