mirror of
https://github.com/apache/activemq.git
synced 2025-03-09 01:29:23 +00:00
Added a getMessageSize method to MessageStore to support retrieving the total message size of all stored messages for a destination. Added a new storeMessageSize statistic to DestinationStatistics.
This commit is contained in:
parent
7a68ad5d98
commit
785b16bf9e
activemq-broker/src/main/java/org/apache/activemq
broker
store
activemq-client/src/main/java/org/apache/activemq/management
activemq-jdbc-store/src/main/java/org/apache/activemq/store
activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb
activemq-leveldb-store/src/main/scala/org/apache/activemq/leveldb
activemq-unit-tests/src/test
java/org/apache/activemq
broker/region/cursors
store
resources/org/apache/activemq/store/kahadb
@ -38,6 +38,7 @@ import javax.management.openmbean.OpenDataException;
|
|||||||
import javax.management.openmbean.TabularData;
|
import javax.management.openmbean.TabularData;
|
||||||
import javax.management.openmbean.TabularDataSupport;
|
import javax.management.openmbean.TabularDataSupport;
|
||||||
import javax.management.openmbean.TabularType;
|
import javax.management.openmbean.TabularType;
|
||||||
|
|
||||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
|
import org.apache.activemq.broker.jmx.OpenTypeSupport.OpenTypeFactory;
|
||||||
import org.apache.activemq.broker.region.Destination;
|
import org.apache.activemq.broker.region.Destination;
|
||||||
@ -51,6 +52,7 @@ import org.apache.activemq.command.Message;
|
|||||||
import org.apache.activemq.filter.BooleanExpression;
|
import org.apache.activemq.filter.BooleanExpression;
|
||||||
import org.apache.activemq.filter.MessageEvaluationContext;
|
import org.apache.activemq.filter.MessageEvaluationContext;
|
||||||
import org.apache.activemq.selector.SelectorParser;
|
import org.apache.activemq.selector.SelectorParser;
|
||||||
|
import org.apache.activemq.store.MessageStore;
|
||||||
import org.apache.activemq.util.URISupport;
|
import org.apache.activemq.util.URISupport;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -119,6 +121,12 @@ public class DestinationView implements DestinationViewMBean {
|
|||||||
return destination.getDestinationStatistics().getMessages().getCount();
|
return destination.getDestinationStatistics().getMessages().getCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getStoreMessageSize() {
|
||||||
|
MessageStore messageStore = destination.getMessageStore();
|
||||||
|
return messageStore != null ? messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize() : 0;
|
||||||
|
}
|
||||||
|
|
||||||
public long getMessagesCached() {
|
public long getMessagesCached() {
|
||||||
return destination.getDestinationStatistics().getMessagesCached().getCount();
|
return destination.getDestinationStatistics().getMessagesCached().getCount();
|
||||||
}
|
}
|
||||||
|
@ -121,6 +121,14 @@ public interface DestinationViewMBean {
|
|||||||
@MBeanInfo("Number of messages in the destination which are yet to be consumed. Potentially dispatched but unacknowledged.")
|
@MBeanInfo("Number of messages in the destination which are yet to be consumed. Potentially dispatched but unacknowledged.")
|
||||||
long getQueueSize();
|
long getQueueSize();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the memory size of all messages in this destination's store
|
||||||
|
*
|
||||||
|
* @return Returns the memory size of all messages in this destination's store
|
||||||
|
*/
|
||||||
|
@MBeanInfo("The memory size of all messages in this destination's store.")
|
||||||
|
long getStoreMessageSize();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return An array of all the messages in the destination's queue.
|
* @return An array of all the messages in the destination's queue.
|
||||||
*/
|
*/
|
||||||
|
@ -375,6 +375,7 @@ public class Queue extends BaseDestination implements Task, UsageListener, Index
|
|||||||
messages.setMaxProducersToAudit(getMaxProducersToAudit());
|
messages.setMaxProducersToAudit(getMaxProducersToAudit());
|
||||||
messages.setUseCache(isUseCache());
|
messages.setUseCache(isUseCache());
|
||||||
messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
|
messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
|
||||||
|
store.start();
|
||||||
final int messageCount = store.getMessageCount();
|
final int messageCount = store.getMessageCount();
|
||||||
if (messageCount > 0 && messages.isRecoveryRequired()) {
|
if (messageCount > 0 && messages.isRecoveryRequired()) {
|
||||||
BatchMessageRecoveryListener listener = new BatchMessageRecoveryListener(messageCount);
|
BatchMessageRecoveryListener listener = new BatchMessageRecoveryListener(messageCount);
|
||||||
|
@ -105,6 +105,7 @@ public class Topic extends BaseDestination implements Task {
|
|||||||
// misleading metrics.
|
// misleading metrics.
|
||||||
// int messageCount = store.getMessageCount();
|
// int messageCount = store.getMessageCount();
|
||||||
// destinationStatistics.getMessages().setCount(messageCount);
|
// destinationStatistics.getMessages().setCount(messageCount);
|
||||||
|
store.start();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -30,6 +30,7 @@ abstract public class AbstractMessageStore implements MessageStore {
|
|||||||
protected final ActiveMQDestination destination;
|
protected final ActiveMQDestination destination;
|
||||||
protected boolean prioritizedMessages;
|
protected boolean prioritizedMessages;
|
||||||
protected IndexListener indexListener;
|
protected IndexListener indexListener;
|
||||||
|
protected final MessageStoreStatistics messageStoreStatistics = new MessageStoreStatistics();
|
||||||
|
|
||||||
public AbstractMessageStore(ActiveMQDestination destination) {
|
public AbstractMessageStore(ActiveMQDestination destination) {
|
||||||
this.destination = destination;
|
this.destination = destination;
|
||||||
@ -41,6 +42,7 @@ abstract public class AbstractMessageStore implements MessageStore {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void start() throws Exception {
|
public void start() throws Exception {
|
||||||
|
recoverMessageStoreStatistics();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -132,4 +134,23 @@ abstract public class AbstractMessageStore implements MessageStore {
|
|||||||
static {
|
static {
|
||||||
FUTURE = new InlineListenableFuture();
|
FUTURE = new InlineListenableFuture();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int getMessageCount() throws IOException {
|
||||||
|
return (int) getMessageStoreStatistics().getMessageCount().getCount();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getMessageSize() throws IOException {
|
||||||
|
return getMessageStoreStatistics().getMessageSize().getTotalSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MessageStoreStatistics getMessageStoreStatistics() {
|
||||||
|
return messageStoreStatistics;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void recoverMessageStoreStatistics() throws IOException {
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -158,6 +158,18 @@ public interface MessageStore extends Service {
|
|||||||
*/
|
*/
|
||||||
int getMessageCount() throws IOException;
|
int getMessageCount() throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the size of the messages ready to deliver
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
long getMessageSize() throws IOException;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return The statistics bean for this message store
|
||||||
|
*/
|
||||||
|
MessageStoreStatistics getMessageStoreStatistics();
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A hint to the Store to reset any batching state for the Destination
|
* A hint to the Store to reset any batching state for the Destination
|
||||||
*
|
*
|
||||||
|
@ -0,0 +1,81 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.activemq.store;
|
||||||
|
|
||||||
|
import org.apache.activemq.management.CountStatisticImpl;
|
||||||
|
import org.apache.activemq.management.SizeStatisticImpl;
|
||||||
|
import org.apache.activemq.management.StatsImpl;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The J2EE Statistics for a Message Sore
|
||||||
|
*/
|
||||||
|
public class MessageStoreStatistics extends StatsImpl {
|
||||||
|
|
||||||
|
protected CountStatisticImpl messageCount;
|
||||||
|
protected SizeStatisticImpl messageSize;
|
||||||
|
|
||||||
|
|
||||||
|
public MessageStoreStatistics() {
|
||||||
|
this(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
public MessageStoreStatistics(boolean enabled) {
|
||||||
|
|
||||||
|
messageCount = new CountStatisticImpl("messageCount", "The number of messages in the store passing through the destination");
|
||||||
|
messageSize = new SizeStatisticImpl("messageSize","Size of messages in the store passing through the destination");
|
||||||
|
|
||||||
|
addStatistic("messageCount", messageCount);
|
||||||
|
addStatistic("messageSize", messageSize);
|
||||||
|
|
||||||
|
this.setEnabled(enabled);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
public CountStatisticImpl getMessageCount() {
|
||||||
|
return messageCount;
|
||||||
|
}
|
||||||
|
|
||||||
|
public SizeStatisticImpl getMessageSize() {
|
||||||
|
return messageSize;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void reset() {
|
||||||
|
if (this.isDoReset()) {
|
||||||
|
super.reset();
|
||||||
|
messageCount.reset();
|
||||||
|
messageSize.reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setEnabled(boolean enabled) {
|
||||||
|
super.setEnabled(enabled);
|
||||||
|
messageCount.setEnabled(enabled);
|
||||||
|
messageSize.setEnabled(enabled);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setParent(MessageStoreStatistics parent) {
|
||||||
|
if (parent != null) {
|
||||||
|
messageCount.setParent(parent.messageCount);
|
||||||
|
messageSize.setParent(parent.messageSize);
|
||||||
|
} else {
|
||||||
|
messageCount.setParent(null);
|
||||||
|
messageSize.setParent(null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -100,6 +100,11 @@ public class ProxyMessageStore implements MessageStore {
|
|||||||
return delegate.getMessageCount();
|
return delegate.getMessageCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getMessageSize() throws IOException {
|
||||||
|
return delegate.getMessageSize();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
|
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
|
||||||
delegate.recoverNextMessages(maxReturned, listener);
|
delegate.recoverNextMessages(maxReturned, listener);
|
||||||
@ -169,4 +174,10 @@ public class ProxyMessageStore implements MessageStore {
|
|||||||
public String toString() {
|
public String toString() {
|
||||||
return delegate.toString();
|
return delegate.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MessageStoreStatistics getMessageStoreStatistics() {
|
||||||
|
return delegate.getMessageStoreStatistics();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -17,7 +17,6 @@
|
|||||||
package org.apache.activemq.store;
|
package org.apache.activemq.store;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.concurrent.Future;
|
|
||||||
|
|
||||||
import org.apache.activemq.broker.ConnectionContext;
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
import org.apache.activemq.command.ActiveMQDestination;
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
@ -145,6 +144,11 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
|
|||||||
return delegate.getMessageCount();
|
return delegate.getMessageCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getMessageSize() throws IOException {
|
||||||
|
return delegate.getMessageSize();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
|
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
|
||||||
delegate.recoverNextMessages(maxReturned, listener);
|
delegate.recoverNextMessages(maxReturned, listener);
|
||||||
@ -213,4 +217,10 @@ public class ProxyTopicMessageStore implements TopicMessageStore {
|
|||||||
public void registerIndexListener(IndexListener indexListener) {
|
public void registerIndexListener(IndexListener indexListener) {
|
||||||
delegate.registerIndexListener(indexListener);
|
delegate.registerIndexListener(indexListener);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public MessageStoreStatistics getMessageStoreStatistics() {
|
||||||
|
return delegate.getMessageStoreStatistics();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -35,8 +35,8 @@ import org.apache.activemq.store.AbstractMessageStore;
|
|||||||
/**
|
/**
|
||||||
* An implementation of {@link org.apache.activemq.store.MessageStore} which
|
* An implementation of {@link org.apache.activemq.store.MessageStore} which
|
||||||
* uses a
|
* uses a
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public class MemoryMessageStore extends AbstractMessageStore {
|
public class MemoryMessageStore extends AbstractMessageStore {
|
||||||
|
|
||||||
@ -56,6 +56,8 @@ public class MemoryMessageStore extends AbstractMessageStore {
|
|||||||
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
|
public synchronized void addMessage(ConnectionContext context, Message message) throws IOException {
|
||||||
synchronized (messageTable) {
|
synchronized (messageTable) {
|
||||||
messageTable.put(message.getMessageId(), message);
|
messageTable.put(message.getMessageId(), message);
|
||||||
|
getMessageStoreStatistics().getMessageCount().increment();
|
||||||
|
getMessageStoreStatistics().getMessageSize().addSize(message.getSize());
|
||||||
}
|
}
|
||||||
message.incrementReferenceCount();
|
message.incrementReferenceCount();
|
||||||
message.getMessageId().setFutureOrSequenceLong(sequenceId++);
|
message.getMessageId().setFutureOrSequenceLong(sequenceId++);
|
||||||
@ -93,6 +95,8 @@ public class MemoryMessageStore extends AbstractMessageStore {
|
|||||||
if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) {
|
if ((lastBatchId != null && lastBatchId.equals(msgId)) || messageTable.isEmpty()) {
|
||||||
lastBatchId = null;
|
lastBatchId = null;
|
||||||
}
|
}
|
||||||
|
getMessageStoreStatistics().getMessageCount().decrement();
|
||||||
|
getMessageStoreStatistics().getMessageSize().addSize(-removed.getSize());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -114,20 +118,17 @@ public class MemoryMessageStore extends AbstractMessageStore {
|
|||||||
public void removeAllMessages(ConnectionContext context) throws IOException {
|
public void removeAllMessages(ConnectionContext context) throws IOException {
|
||||||
synchronized (messageTable) {
|
synchronized (messageTable) {
|
||||||
messageTable.clear();
|
messageTable.clear();
|
||||||
|
getMessageStoreStatistics().reset();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void delete() {
|
public void delete() {
|
||||||
synchronized (messageTable) {
|
synchronized (messageTable) {
|
||||||
messageTable.clear();
|
messageTable.clear();
|
||||||
|
getMessageStoreStatistics().reset();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public int getMessageCount() {
|
|
||||||
return messageTable.size();
|
|
||||||
}
|
|
||||||
|
|
||||||
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
|
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
|
||||||
synchronized (messageTable) {
|
synchronized (messageTable) {
|
||||||
boolean pastLackBatch = lastBatchId == null;
|
boolean pastLackBatch = lastBatchId == null;
|
||||||
@ -161,8 +162,34 @@ public class MemoryMessageStore extends AbstractMessageStore {
|
|||||||
|
|
||||||
public void updateMessage(Message message) {
|
public void updateMessage(Message message) {
|
||||||
synchronized (messageTable) {
|
synchronized (messageTable) {
|
||||||
|
Message original = messageTable.get(message.getMessageId());
|
||||||
|
|
||||||
|
//if can't be found then increment count, else remove old size
|
||||||
|
if (original == null) {
|
||||||
|
getMessageStoreStatistics().getMessageCount().increment();
|
||||||
|
} else {
|
||||||
|
getMessageStoreStatistics().getMessageSize().addSize(-original.getSize());
|
||||||
|
}
|
||||||
messageTable.put(message.getMessageId(), message);
|
messageTable.put(message.getMessageId(), message);
|
||||||
|
getMessageStoreStatistics().getMessageSize().addSize(message.getSize());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void recoverMessageStoreStatistics() throws IOException {
|
||||||
|
synchronized (messageTable) {
|
||||||
|
long size = 0;
|
||||||
|
int count = 0;
|
||||||
|
for (Iterator<Message> iter = messageTable.values().iterator(); iter
|
||||||
|
.hasNext();) {
|
||||||
|
Message msg = iter.next();
|
||||||
|
size += msg.getSize();
|
||||||
|
}
|
||||||
|
|
||||||
|
getMessageStoreStatistics().reset();
|
||||||
|
getMessageStoreStatistics().getMessageCount().setCount(count);
|
||||||
|
getMessageStoreStatistics().getMessageSize().setTotalSize(size);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -67,6 +67,23 @@ public class SizeStatisticImpl extends StatisticImpl {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reset the total size to the new value
|
||||||
|
*
|
||||||
|
* @param size
|
||||||
|
*/
|
||||||
|
public synchronized void setTotalSize(long size) {
|
||||||
|
count++;
|
||||||
|
totalSize = size;
|
||||||
|
if (size > maxSize) {
|
||||||
|
maxSize = size;
|
||||||
|
}
|
||||||
|
if (size < minSize || minSize == 0) {
|
||||||
|
minSize = size;
|
||||||
|
}
|
||||||
|
updateSampleTime();
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the maximum size of any step
|
* @return the maximum size of any step
|
||||||
*/
|
*/
|
||||||
|
@ -304,6 +304,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
public int getMessageCount() throws IOException {
|
public int getMessageCount() throws IOException {
|
||||||
int result = 0;
|
int result = 0;
|
||||||
TransactionContext c = persistenceAdapter.getTransactionContext();
|
TransactionContext c = persistenceAdapter.getTransactionContext();
|
||||||
@ -401,4 +402,5 @@ public class JDBCMessageStore extends AbstractMessageStore {
|
|||||||
public String toString() {
|
public String toString() {
|
||||||
return destination.getPhysicalName() + ",pendingSize:" + pendingAdditions.size();
|
return destination.getPhysicalName() + ",pendingSize:" + pendingAdditions.size();
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -48,8 +48,8 @@ import org.slf4j.LoggerFactory;
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* A MessageStore that uses a Journal to store it's messages.
|
* A MessageStore that uses a Journal to store it's messages.
|
||||||
*
|
*
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public class JournalMessageStore extends AbstractMessageStore {
|
public class JournalMessageStore extends AbstractMessageStore {
|
||||||
|
|
||||||
@ -79,7 +79,7 @@ public class JournalMessageStore extends AbstractMessageStore {
|
|||||||
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
|
this.transactionTemplate = new TransactionTemplate(adapter, new ConnectionContext(new NonCachedMessageEvaluationContext()));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
public void setMemoryUsage(MemoryUsage memoryUsage) {
|
public void setMemoryUsage(MemoryUsage memoryUsage) {
|
||||||
this.memoryUsage=memoryUsage;
|
this.memoryUsage=memoryUsage;
|
||||||
longTermStore.setMemoryUsage(memoryUsage);
|
longTermStore.setMemoryUsage(memoryUsage);
|
||||||
@ -323,7 +323,7 @@ public class JournalMessageStore extends AbstractMessageStore {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
public Message getMessage(MessageId identity) throws IOException {
|
public Message getMessage(MessageId identity) throws IOException {
|
||||||
Message answer = null;
|
Message answer = null;
|
||||||
@ -348,7 +348,7 @@ public class JournalMessageStore extends AbstractMessageStore {
|
|||||||
* Replays the checkpointStore first as those messages are the oldest ones,
|
* Replays the checkpointStore first as those messages are the oldest ones,
|
||||||
* then messages are replayed from the transaction log and then the cache is
|
* then messages are replayed from the transaction log and then the cache is
|
||||||
* updated.
|
* updated.
|
||||||
*
|
*
|
||||||
* @param listener
|
* @param listener
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
@ -404,6 +404,11 @@ public class JournalMessageStore extends AbstractMessageStore {
|
|||||||
return longTermStore.getMessageCount();
|
return longTermStore.getMessageCount();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getMessageSize() throws IOException {
|
||||||
|
peristenceAdapter.checkpoint(true, true);
|
||||||
|
return longTermStore.getMessageSize();
|
||||||
|
}
|
||||||
|
|
||||||
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
|
public void recoverNextMessages(int maxReturned, MessageRecoveryListener listener) throws Exception {
|
||||||
peristenceAdapter.checkpoint(true, true);
|
peristenceAdapter.checkpoint(true, true);
|
||||||
longTermStore.recoverNextMessages(maxReturned, listener);
|
longTermStore.recoverNextMessages(maxReturned, listener);
|
||||||
|
@ -61,6 +61,7 @@ import org.apache.activemq.store.IndexListener;
|
|||||||
import org.apache.activemq.store.ListenableFuture;
|
import org.apache.activemq.store.ListenableFuture;
|
||||||
import org.apache.activemq.store.MessageRecoveryListener;
|
import org.apache.activemq.store.MessageRecoveryListener;
|
||||||
import org.apache.activemq.store.MessageStore;
|
import org.apache.activemq.store.MessageStore;
|
||||||
|
import org.apache.activemq.store.MessageStoreStatistics;
|
||||||
import org.apache.activemq.store.PersistenceAdapter;
|
import org.apache.activemq.store.PersistenceAdapter;
|
||||||
import org.apache.activemq.store.TopicMessageStore;
|
import org.apache.activemq.store.TopicMessageStore;
|
||||||
import org.apache.activemq.store.TransactionIdTransformer;
|
import org.apache.activemq.store.TransactionIdTransformer;
|
||||||
@ -503,34 +504,6 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||||||
return loadMessage(location);
|
return loadMessage(location);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getMessageCount() throws IOException {
|
|
||||||
try {
|
|
||||||
lockAsyncJobQueue();
|
|
||||||
indexLock.writeLock().lock();
|
|
||||||
try {
|
|
||||||
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>() {
|
|
||||||
@Override
|
|
||||||
public Integer execute(Transaction tx) throws IOException {
|
|
||||||
// Iterate through all index entries to get a count
|
|
||||||
// of messages in the destination.
|
|
||||||
StoredDestination sd = getStoredDestination(dest, tx);
|
|
||||||
int rc = 0;
|
|
||||||
for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
|
|
||||||
iterator.next();
|
|
||||||
rc++;
|
|
||||||
}
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
} finally {
|
|
||||||
indexLock.writeLock().unlock();
|
|
||||||
}
|
|
||||||
} finally {
|
|
||||||
unlockAsyncJobQueue();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isEmpty() throws IOException {
|
public boolean isEmpty() throws IOException {
|
||||||
indexLock.writeLock().lock();
|
indexLock.writeLock().lock();
|
||||||
@ -716,6 +689,38 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||||||
public String toString(){
|
public String toString(){
|
||||||
return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest));
|
return "permits:" + this.localDestinationSemaphore.availablePermits() + ",sd=" + storedDestinations.get(key(dest));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void recoverMessageStoreStatistics() throws IOException {
|
||||||
|
try {
|
||||||
|
MessageStoreStatistics recoveredStatistics;
|
||||||
|
lockAsyncJobQueue();
|
||||||
|
indexLock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
recoveredStatistics = pageFile.tx().execute(new Transaction.CallableClosure<MessageStoreStatistics, IOException>() {
|
||||||
|
@Override
|
||||||
|
public MessageStoreStatistics execute(Transaction tx) throws IOException {
|
||||||
|
MessageStoreStatistics statistics = new MessageStoreStatistics();
|
||||||
|
|
||||||
|
// Iterate through all index entries to get the size of each message
|
||||||
|
StoredDestination sd = getStoredDestination(dest, tx);
|
||||||
|
for (Iterator<Entry<Location, Long>> iterator = sd.locationIndex.iterator(tx); iterator.hasNext();) {
|
||||||
|
int locationSize = iterator.next().getKey().getSize();
|
||||||
|
statistics.getMessageCount().increment();
|
||||||
|
statistics.getMessageSize().addSize(locationSize > 0 ? locationSize : 0);
|
||||||
|
}
|
||||||
|
return statistics;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
getMessageStoreStatistics().getMessageCount().setCount(recoveredStatistics.getMessageCount().getCount());
|
||||||
|
getMessageStoreStatistics().getMessageSize().setTotalSize(recoveredStatistics.getMessageSize().getTotalSize());
|
||||||
|
} finally {
|
||||||
|
indexLock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
unlockAsyncJobQueue();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
|
class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
|
||||||
@ -993,12 +998,16 @@ public class KahaDBStore extends MessageDatabase implements PersistenceAdapter {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
|
public MessageStore createQueueMessageStore(ActiveMQQueue destination) throws IOException {
|
||||||
return this.transactionStore.proxy(new KahaDBMessageStore(destination));
|
MessageStore store = this.transactionStore.proxy(new KahaDBMessageStore(destination));
|
||||||
|
storeCache.put(key(convert(destination)), store);
|
||||||
|
return store;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
|
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
|
||||||
return this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
|
TopicMessageStore store = this.transactionStore.proxy(new KahaDBTopicMessageStore(destination));
|
||||||
|
storeCache.put(key(convert(destination)), store);
|
||||||
|
return store;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -46,6 +46,7 @@ import java.util.Set;
|
|||||||
import java.util.SortedSet;
|
import java.util.SortedSet;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
import java.util.TreeSet;
|
import java.util.TreeSet;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
@ -53,10 +54,15 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
|
|||||||
import org.apache.activemq.ActiveMQMessageAuditNoSync;
|
import org.apache.activemq.ActiveMQMessageAuditNoSync;
|
||||||
import org.apache.activemq.broker.BrokerService;
|
import org.apache.activemq.broker.BrokerService;
|
||||||
import org.apache.activemq.broker.BrokerServiceAware;
|
import org.apache.activemq.broker.BrokerServiceAware;
|
||||||
|
import org.apache.activemq.broker.region.Destination;
|
||||||
|
import org.apache.activemq.broker.region.Queue;
|
||||||
|
import org.apache.activemq.broker.region.Topic;
|
||||||
import org.apache.activemq.command.MessageAck;
|
import org.apache.activemq.command.MessageAck;
|
||||||
import org.apache.activemq.command.TransactionId;
|
import org.apache.activemq.command.TransactionId;
|
||||||
import org.apache.activemq.openwire.OpenWireFormat;
|
import org.apache.activemq.openwire.OpenWireFormat;
|
||||||
import org.apache.activemq.protobuf.Buffer;
|
import org.apache.activemq.protobuf.Buffer;
|
||||||
|
import org.apache.activemq.store.MessageStore;
|
||||||
|
import org.apache.activemq.store.MessageStoreStatistics;
|
||||||
import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
|
import org.apache.activemq.store.kahadb.data.KahaAckMessageFileMapCommand;
|
||||||
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
|
import org.apache.activemq.store.kahadb.data.KahaAddMessageCommand;
|
||||||
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
|
import org.apache.activemq.store.kahadb.data.KahaCommitCommand;
|
||||||
@ -113,7 +119,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||||||
static final int OPEN_STATE = 2;
|
static final int OPEN_STATE = 2;
|
||||||
static final long NOT_ACKED = -1;
|
static final long NOT_ACKED = -1;
|
||||||
|
|
||||||
static final int VERSION = 5;
|
static final int VERSION = 6;
|
||||||
|
|
||||||
protected class Metadata {
|
protected class Metadata {
|
||||||
protected Page<Metadata> page;
|
protected Page<Metadata> page;
|
||||||
@ -738,7 +744,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||||||
long undoCounter=0;
|
long undoCounter=0;
|
||||||
|
|
||||||
// Go through all the destinations to see if they have messages past the lastAppendLocation
|
// Go through all the destinations to see if they have messages past the lastAppendLocation
|
||||||
for (StoredDestination sd : storedDestinations.values()) {
|
for (String key : storedDestinations.keySet()) {
|
||||||
|
StoredDestination sd = storedDestinations.get(key);
|
||||||
|
|
||||||
final ArrayList<Long> matches = new ArrayList<Long>();
|
final ArrayList<Long> matches = new ArrayList<Long>();
|
||||||
// Find all the Locations that are >= than the last Append Location.
|
// Find all the Locations that are >= than the last Append Location.
|
||||||
@ -755,6 +762,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||||||
sd.messageIdIndex.remove(tx, keys.messageId);
|
sd.messageIdIndex.remove(tx, keys.messageId);
|
||||||
metadata.producerSequenceIdTracker.rollback(keys.messageId);
|
metadata.producerSequenceIdTracker.rollback(keys.messageId);
|
||||||
undoCounter++;
|
undoCounter++;
|
||||||
|
decrementAndSubSizeToStoreStat(key, keys.location.getSize());
|
||||||
// TODO: do we need to modify the ack positions for the pub sub case?
|
// TODO: do we need to modify the ack positions for the pub sub case?
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -858,6 +866,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||||||
sd.messageIdIndex.remove(tx, keys.messageId);
|
sd.messageIdIndex.remove(tx, keys.messageId);
|
||||||
LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
|
LOG.info("[" + sdEntry.getKey() + "] dropped: " + keys.messageId + " at corrupt location: " + keys.location);
|
||||||
undoCounter++;
|
undoCounter++;
|
||||||
|
decrementAndSubSizeToStoreStat(sdEntry.getKey(), keys.location.getSize());
|
||||||
// TODO: do we need to modify the ack positions for the pub sub case?
|
// TODO: do we need to modify the ack positions for the pub sub case?
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -1312,6 +1321,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||||||
if (previous == null) {
|
if (previous == null) {
|
||||||
previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
|
previous = sd.messageIdIndex.put(tx, command.getMessageId(), id);
|
||||||
if (previous == null) {
|
if (previous == null) {
|
||||||
|
incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
|
||||||
sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
|
sd.orderIndex.put(tx, priority, id, new MessageKeys(command.getMessageId(), location));
|
||||||
if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
|
if (sd.subscriptions != null && !sd.subscriptions.isEmpty(tx)) {
|
||||||
addAckLocationForNewMessage(tx, sd, id);
|
addAckLocationForNewMessage(tx, sd, id);
|
||||||
@ -1337,7 +1347,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||||||
}
|
}
|
||||||
// record this id in any event, initial send or recovery
|
// record this id in any event, initial send or recovery
|
||||||
metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
|
metadata.producerSequenceIdTracker.isDuplicate(command.getMessageId());
|
||||||
return id;
|
|
||||||
|
return id;
|
||||||
}
|
}
|
||||||
|
|
||||||
void trackPendingAdd(KahaDestination destination, Long seq) {
|
void trackPendingAdd(KahaDestination destination, Long seq) {
|
||||||
@ -1367,9 +1378,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||||||
new MessageKeys(command.getMessageId(), location)
|
new MessageKeys(command.getMessageId(), location)
|
||||||
);
|
);
|
||||||
sd.locationIndex.put(tx, location, id);
|
sd.locationIndex.put(tx, location, id);
|
||||||
|
incrementAndAddSizeToStoreStat(command.getDestination(), location.getSize());
|
||||||
// on first update previous is original location, on recovery/replay it may be the updated location
|
// on first update previous is original location, on recovery/replay it may be the updated location
|
||||||
if(previousKeys != null && !previousKeys.location.equals(location)) {
|
if(previousKeys != null && !previousKeys.location.equals(location)) {
|
||||||
sd.locationIndex.remove(tx, previousKeys.location);
|
sd.locationIndex.remove(tx, previousKeys.location);
|
||||||
|
decrementAndSubSizeToStoreStat(command.getDestination(), previousKeys.location.getSize());
|
||||||
}
|
}
|
||||||
metadata.lastUpdate = location;
|
metadata.lastUpdate = location;
|
||||||
} else {
|
} else {
|
||||||
@ -1387,6 +1400,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||||||
MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
|
MessageKeys keys = sd.orderIndex.remove(tx, sequenceId);
|
||||||
if (keys != null) {
|
if (keys != null) {
|
||||||
sd.locationIndex.remove(tx, keys.location);
|
sd.locationIndex.remove(tx, keys.location);
|
||||||
|
decrementAndSubSizeToStoreStat(command.getDestination(), keys.location.getSize());
|
||||||
recordAckMessageReferenceLocation(ackLocation, keys.location);
|
recordAckMessageReferenceLocation(ackLocation, keys.location);
|
||||||
metadata.lastUpdate = ackLocation;
|
metadata.lastUpdate = ackLocation;
|
||||||
} else if (LOG.isDebugEnabled()) {
|
} else if (LOG.isDebugEnabled()) {
|
||||||
@ -1414,7 +1428,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||||||
recordAckMessageReferenceLocation(ackLocation, keys.location);
|
recordAckMessageReferenceLocation(ackLocation, keys.location);
|
||||||
}
|
}
|
||||||
// The following method handles deleting un-referenced messages.
|
// The following method handles deleting un-referenced messages.
|
||||||
removeAckLocation(tx, sd, subscriptionKey, sequence);
|
removeAckLocation(command, tx, sd, subscriptionKey, sequence);
|
||||||
metadata.lastUpdate = ackLocation;
|
metadata.lastUpdate = ackLocation;
|
||||||
} else if (LOG.isDebugEnabled()) {
|
} else if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
|
LOG.debug("no message sequence exists for id: " + command.getMessageId() + " and sub: " + command.getSubscriptionKey());
|
||||||
@ -1470,6 +1484,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||||||
String key = key(command.getDestination());
|
String key = key(command.getDestination());
|
||||||
storedDestinations.remove(key);
|
storedDestinations.remove(key);
|
||||||
metadata.destinations.remove(tx, key);
|
metadata.destinations.remove(tx, key);
|
||||||
|
clearStoreStats(command.getDestination());
|
||||||
|
storeCache.remove(key(command.getDestination()));
|
||||||
}
|
}
|
||||||
|
|
||||||
void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
|
void updateIndex(Transaction tx, KahaSubscriptionCommand command, Location location) throws IOException {
|
||||||
@ -1494,13 +1510,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||||||
sd.subLocations.remove(tx, subscriptionKey);
|
sd.subLocations.remove(tx, subscriptionKey);
|
||||||
sd.subscriptionAcks.remove(tx, subscriptionKey);
|
sd.subscriptionAcks.remove(tx, subscriptionKey);
|
||||||
sd.subscriptionCache.remove(subscriptionKey);
|
sd.subscriptionCache.remove(subscriptionKey);
|
||||||
removeAckLocationsForSub(tx, sd, subscriptionKey);
|
removeAckLocationsForSub(command, tx, sd, subscriptionKey);
|
||||||
|
|
||||||
if (sd.subscriptions.isEmpty(tx)) {
|
if (sd.subscriptions.isEmpty(tx)) {
|
||||||
// remove the stored destination
|
// remove the stored destination
|
||||||
KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand();
|
KahaRemoveDestinationCommand removeDestinationCommand = new KahaRemoveDestinationCommand();
|
||||||
removeDestinationCommand.setDestination(command.getDestination());
|
removeDestinationCommand.setDestination(command.getDestination());
|
||||||
updateIndex(tx, removeDestinationCommand, null);
|
updateIndex(tx, removeDestinationCommand, null);
|
||||||
|
clearStoreStats(command.getDestination());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1879,6 +1896,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
class StoredDestination {
|
class StoredDestination {
|
||||||
|
|
||||||
MessageOrderIndex orderIndex = new MessageOrderIndex();
|
MessageOrderIndex orderIndex = new MessageOrderIndex();
|
||||||
@ -1912,6 +1930,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||||||
|
|
||||||
protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
|
protected class StoredDestinationMarshaller extends VariableMarshaller<StoredDestination> {
|
||||||
|
|
||||||
|
final MessageKeysMarshaller messageKeysMarshaller = new MessageKeysMarshaller();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public StoredDestination readPayload(final DataInput dataIn) throws IOException {
|
public StoredDestination readPayload(final DataInput dataIn) throws IOException {
|
||||||
final StoredDestination value = new StoredDestination();
|
final StoredDestination value = new StoredDestination();
|
||||||
@ -1996,12 +2016,12 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||||||
public void execute(Transaction tx) throws IOException {
|
public void execute(Transaction tx) throws IOException {
|
||||||
value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
|
value.orderIndex.lowPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
|
||||||
value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
|
value.orderIndex.lowPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
|
||||||
value.orderIndex.lowPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
|
value.orderIndex.lowPriorityIndex.setValueMarshaller(messageKeysMarshaller);
|
||||||
value.orderIndex.lowPriorityIndex.load(tx);
|
value.orderIndex.lowPriorityIndex.load(tx);
|
||||||
|
|
||||||
value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
|
value.orderIndex.highPriorityIndex = new BTreeIndex<Long, MessageKeys>(pageFile, tx.allocate());
|
||||||
value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
|
value.orderIndex.highPriorityIndex.setKeyMarshaller(LongMarshaller.INSTANCE);
|
||||||
value.orderIndex.highPriorityIndex.setValueMarshaller(MessageKeysMarshaller.INSTANCE);
|
value.orderIndex.highPriorityIndex.setValueMarshaller(messageKeysMarshaller);
|
||||||
value.orderIndex.highPriorityIndex.load(tx);
|
value.orderIndex.highPriorityIndex.load(tx);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
@ -2100,7 +2120,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||||||
// Figure out the next key using the last entry in the destination.
|
// Figure out the next key using the last entry in the destination.
|
||||||
rc.orderIndex.configureLast(tx);
|
rc.orderIndex.configureLast(tx);
|
||||||
|
|
||||||
rc.locationIndex.setKeyMarshaller(org.apache.activemq.store.kahadb.disk.util.LocationMarshaller.INSTANCE);
|
rc.locationIndex.setKeyMarshaller(new LocationSizeMarshaller());
|
||||||
rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
|
rc.locationIndex.setValueMarshaller(LongMarshaller.INSTANCE);
|
||||||
rc.locationIndex.load(tx);
|
rc.locationIndex.load(tx);
|
||||||
|
|
||||||
@ -2202,6 +2222,133 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||||||
return rc;
|
return rc;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Clear the counter for the destination, if one exists.
|
||||||
|
*
|
||||||
|
* @param kahaDestination
|
||||||
|
*/
|
||||||
|
protected void clearStoreStats(KahaDestination kahaDestination) {
|
||||||
|
MessageStoreStatistics storeStats = getStoreStats(key(kahaDestination));
|
||||||
|
if (storeStats != null) {
|
||||||
|
storeStats.reset();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update MessageStoreStatistics
|
||||||
|
*
|
||||||
|
* @param kahaDestination
|
||||||
|
* @param size
|
||||||
|
*/
|
||||||
|
protected void incrementAndAddSizeToStoreStat(KahaDestination kahaDestination, long size) {
|
||||||
|
incrementAndAddSizeToStoreStat(key(kahaDestination), size);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void incrementAndAddSizeToStoreStat(String kahaDestKey, long size) {
|
||||||
|
MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
|
||||||
|
if (storeStats != null) {
|
||||||
|
storeStats.getMessageCount().increment();
|
||||||
|
if (size > 0) {
|
||||||
|
storeStats.getMessageSize().addSize(size);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void decrementAndSubSizeToStoreStat(KahaDestination kahaDestination, long size) {
|
||||||
|
decrementAndSubSizeToStoreStat(key(kahaDestination), size);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void decrementAndSubSizeToStoreStat(String kahaDestKey, long size) {
|
||||||
|
MessageStoreStatistics storeStats = getStoreStats(kahaDestKey);
|
||||||
|
if (storeStats != null) {
|
||||||
|
storeStats.getMessageCount().decrement();
|
||||||
|
if (size > 0) {
|
||||||
|
storeStats.getMessageSize().addSize(-size);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This is a map to cache DestinationStatistics for a specific
|
||||||
|
* KahaDestination key
|
||||||
|
*/
|
||||||
|
protected final Map<String, MessageStore> storeCache =
|
||||||
|
new ConcurrentHashMap<String, MessageStore>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Locate the storeMessageSize counter for this KahaDestination
|
||||||
|
* @param kahaDestination
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
protected MessageStoreStatistics getStoreStats(String kahaDestKey) {
|
||||||
|
MessageStoreStatistics storeStats = null;
|
||||||
|
try {
|
||||||
|
MessageStore messageStore = storeCache.get(kahaDestKey);
|
||||||
|
if (messageStore != null) {
|
||||||
|
storeStats = messageStore.getMessageStoreStatistics();
|
||||||
|
}
|
||||||
|
} catch (Exception e1) {
|
||||||
|
LOG.error("Getting size counter of destination failed", e1);
|
||||||
|
}
|
||||||
|
|
||||||
|
return storeStats;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Determine whether this Destination matches the DestinationType
|
||||||
|
*
|
||||||
|
* @param destination
|
||||||
|
* @param type
|
||||||
|
* @return
|
||||||
|
*/
|
||||||
|
protected boolean matchType(Destination destination,
|
||||||
|
KahaDestination.DestinationType type) {
|
||||||
|
if (destination instanceof Topic
|
||||||
|
&& type.equals(KahaDestination.DestinationType.TOPIC)) {
|
||||||
|
return true;
|
||||||
|
} else if (destination instanceof Queue
|
||||||
|
&& type.equals(KahaDestination.DestinationType.QUEUE)) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
class LocationSizeMarshaller implements Marshaller<Location> {
|
||||||
|
|
||||||
|
public LocationSizeMarshaller() {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
public Location readPayload(DataInput dataIn) throws IOException {
|
||||||
|
Location rc = new Location();
|
||||||
|
rc.setDataFileId(dataIn.readInt());
|
||||||
|
rc.setOffset(dataIn.readInt());
|
||||||
|
if (metadata.version >= 6) {
|
||||||
|
rc.setSize(dataIn.readInt());
|
||||||
|
}
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void writePayload(Location object, DataOutput dataOut)
|
||||||
|
throws IOException {
|
||||||
|
dataOut.writeInt(object.getDataFileId());
|
||||||
|
dataOut.writeInt(object.getOffset());
|
||||||
|
dataOut.writeInt(object.getSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getFixedSize() {
|
||||||
|
return 12;
|
||||||
|
}
|
||||||
|
|
||||||
|
public Location deepCopy(Location source) {
|
||||||
|
return new Location(source);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isDeepCopySupported() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
|
private void addAckLocation(Transaction tx, StoredDestination sd, Long messageSequence, String subscriptionKey) throws IOException {
|
||||||
SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
|
SequenceSet sequences = sd.ackPositions.get(tx, subscriptionKey);
|
||||||
if (sequences == null) {
|
if (sequences == null) {
|
||||||
@ -2269,7 +2416,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void removeAckLocationsForSub(Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
|
private void removeAckLocationsForSub(KahaSubscriptionCommand command,
|
||||||
|
Transaction tx, StoredDestination sd, String subscriptionKey) throws IOException {
|
||||||
if (!sd.ackPositions.isEmpty(tx)) {
|
if (!sd.ackPositions.isEmpty(tx)) {
|
||||||
SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
|
SequenceSet sequences = sd.ackPositions.remove(tx, subscriptionKey);
|
||||||
if (sequences == null || sequences.isEmpty()) {
|
if (sequences == null || sequences.isEmpty()) {
|
||||||
@ -2302,6 +2450,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||||||
sd.locationIndex.remove(tx, entry.getValue().location);
|
sd.locationIndex.remove(tx, entry.getValue().location);
|
||||||
sd.messageIdIndex.remove(tx, entry.getValue().messageId);
|
sd.messageIdIndex.remove(tx, entry.getValue().messageId);
|
||||||
sd.orderIndex.remove(tx, entry.getKey());
|
sd.orderIndex.remove(tx, entry.getKey());
|
||||||
|
decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2314,7 +2463,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||||||
* @param messageSequence
|
* @param messageSequence
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
private void removeAckLocation(Transaction tx, StoredDestination sd, String subscriptionKey, Long messageSequence) throws IOException {
|
private void removeAckLocation(KahaRemoveMessageCommand command,
|
||||||
|
Transaction tx, StoredDestination sd, String subscriptionKey,
|
||||||
|
Long messageSequence) throws IOException {
|
||||||
// Remove the sub from the previous location set..
|
// Remove the sub from the previous location set..
|
||||||
if (messageSequence != null) {
|
if (messageSequence != null) {
|
||||||
SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
|
SequenceSet range = sd.ackPositions.get(tx, subscriptionKey);
|
||||||
@ -2347,6 +2498,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||||||
sd.locationIndex.remove(tx, entry.getValue().location);
|
sd.locationIndex.remove(tx, entry.getValue().location);
|
||||||
sd.messageIdIndex.remove(tx, entry.getValue().messageId);
|
sd.messageIdIndex.remove(tx, entry.getValue().messageId);
|
||||||
sd.orderIndex.remove(tx, entry.getKey());
|
sd.orderIndex.remove(tx, entry.getKey());
|
||||||
|
decrementAndSubSizeToStoreStat(command.getDestination(), entry.getValue().location.getSize());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -197,25 +197,6 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
|
|||||||
return msg;
|
return msg;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getMessageCount() throws IOException {
|
|
||||||
synchronized(indexMutex) {
|
|
||||||
return pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
|
|
||||||
@Override
|
|
||||||
public Integer execute(Transaction tx) throws IOException {
|
|
||||||
// Iterate through all index entries to get a count of messages in the destination.
|
|
||||||
StoredDestination sd = getStoredDestination(dest, tx);
|
|
||||||
int rc=0;
|
|
||||||
for (Iterator<Entry<String, Long>> iterator = sd.messageIdIndex.iterator(tx); iterator.hasNext();) {
|
|
||||||
iterator.next();
|
|
||||||
rc++;
|
|
||||||
}
|
|
||||||
return rc;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void recover(final MessageRecoveryListener listener) throws Exception {
|
public void recover(final MessageRecoveryListener listener) throws Exception {
|
||||||
synchronized(indexMutex) {
|
synchronized(indexMutex) {
|
||||||
@ -297,6 +278,27 @@ public class TempKahaDBStore extends TempMessageDatabase implements PersistenceA
|
|||||||
public void stop() throws Exception {
|
public void stop() throws Exception {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void recoverMessageStoreStatistics() throws IOException {
|
||||||
|
int count = 0;
|
||||||
|
synchronized(indexMutex) {
|
||||||
|
count = pageFile.tx().execute(new Transaction.CallableClosure<Integer, IOException>(){
|
||||||
|
@Override
|
||||||
|
public Integer execute(Transaction tx) throws IOException {
|
||||||
|
// Iterate through all index entries to get a count of messages in the destination.
|
||||||
|
StoredDestination sd = getStoredDestination(dest, tx);
|
||||||
|
int rc=0;
|
||||||
|
for (Iterator<Entry<String, Long>> iterator = sd.messageIdIndex.iterator(tx); iterator.hasNext();) {
|
||||||
|
iterator.next();
|
||||||
|
rc++;
|
||||||
|
}
|
||||||
|
return rc;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
getMessageStoreStatistics().getMessageCount().setCount(count);
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
|
class KahaDBTopicMessageStore extends KahaDBMessageStore implements TopicMessageStore {
|
||||||
|
@ -22,8 +22,13 @@ import java.io.IOException;
|
|||||||
import org.apache.activemq.store.kahadb.disk.journal.Location;
|
import org.apache.activemq.store.kahadb.disk.journal.Location;
|
||||||
|
|
||||||
public class LocationMarshaller implements Marshaller<Location> {
|
public class LocationMarshaller implements Marshaller<Location> {
|
||||||
|
|
||||||
public final static LocationMarshaller INSTANCE = new LocationMarshaller();
|
public final static LocationMarshaller INSTANCE = new LocationMarshaller();
|
||||||
|
|
||||||
|
public LocationMarshaller () {
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
public Location readPayload(DataInput dataIn) throws IOException {
|
public Location readPayload(DataInput dataIn) throws IOException {
|
||||||
Location rc = new Location();
|
Location rc = new Location();
|
||||||
rc.setDataFileId(dataIn.readInt());
|
rc.setDataFileId(dataIn.readInt());
|
||||||
|
@ -834,7 +834,7 @@ class LevelDBStore extends LockableServiceSupport with BrokerServiceAware with P
|
|||||||
cursorPosition = cursorResetPosition
|
cursorPosition = cursorResetPosition
|
||||||
}
|
}
|
||||||
|
|
||||||
def getMessageCount: Int = {
|
override def getMessageCount: Int = {
|
||||||
return db.collectionSize(key).toInt
|
return db.collectionSize(key).toInt
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -490,11 +490,6 @@ public class StoreQueueCursorOrderTest {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
public int getMessageCount() throws IOException {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void resetBatching() {
|
public void resetBatching() {
|
||||||
|
|
||||||
@ -513,5 +508,10 @@ public class StoreQueueCursorOrderTest {
|
|||||||
batch.incrementAndGet();
|
batch.incrementAndGet();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void recoverMessageStoreStatistics() throws IOException {
|
||||||
|
this.getMessageStoreStatistics().reset();
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
266
activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
Normal file
266
activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeStatTest.java
Normal file
@ -0,0 +1,266 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.store;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
import javax.jms.BytesMessage;
|
||||||
|
import javax.jms.Connection;
|
||||||
|
import javax.jms.DeliveryMode;
|
||||||
|
import javax.jms.JMSException;
|
||||||
|
import javax.jms.MessageConsumer;
|
||||||
|
import javax.jms.MessageProducer;
|
||||||
|
import javax.jms.Queue;
|
||||||
|
import javax.jms.QueueSession;
|
||||||
|
import javax.jms.Session;
|
||||||
|
import javax.jms.Topic;
|
||||||
|
import javax.jms.TopicSession;
|
||||||
|
|
||||||
|
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.TransportConnector;
|
||||||
|
import org.apache.activemq.broker.region.Destination;
|
||||||
|
import org.apache.activemq.command.ActiveMQDestination;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.command.ActiveMQTopic;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test checks that KahaDB properly sets the new storeMessageSize statistic.
|
||||||
|
*
|
||||||
|
* AMQ-5748
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public abstract class AbstractMessageStoreSizeStatTest {
|
||||||
|
protected static final Logger LOG = LoggerFactory
|
||||||
|
.getLogger(AbstractMessageStoreSizeStatTest.class);
|
||||||
|
|
||||||
|
|
||||||
|
protected BrokerService broker;
|
||||||
|
protected URI brokerConnectURI;
|
||||||
|
protected String defaultQueueName = "test.queue";
|
||||||
|
protected static int messageSize = 1000;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void startBroker() throws Exception {
|
||||||
|
setUpBroker(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void setUpBroker(boolean clearDataDir) throws Exception {
|
||||||
|
|
||||||
|
broker = new BrokerService();
|
||||||
|
this.initPersistence(broker);
|
||||||
|
//set up a transport
|
||||||
|
TransportConnector connector = broker
|
||||||
|
.addConnector(new TransportConnector());
|
||||||
|
connector.setUri(new URI("tcp://0.0.0.0:0"));
|
||||||
|
connector.setName("tcp");
|
||||||
|
|
||||||
|
broker.start();
|
||||||
|
broker.waitUntilStarted();
|
||||||
|
brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void stopBroker() throws Exception {
|
||||||
|
broker.stop();
|
||||||
|
broker.waitUntilStopped();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract void initPersistence(BrokerService brokerService) throws IOException;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMessageSize() throws Exception {
|
||||||
|
Destination dest = publishTestMessages(200);
|
||||||
|
verifyStats(dest, 200, 200 * messageSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMessageSizeAfterConsumption() throws Exception {
|
||||||
|
|
||||||
|
Destination dest = publishTestMessages(200);
|
||||||
|
verifyStats(dest, 200, 200 * messageSize);
|
||||||
|
|
||||||
|
consumeTestMessages();
|
||||||
|
Thread.sleep(3000);
|
||||||
|
verifyStats(dest, 0, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMessageSizeDurable() throws Exception {
|
||||||
|
|
||||||
|
Destination dest = publishTestMessagesDurable();
|
||||||
|
|
||||||
|
//verify the count and size
|
||||||
|
verifyStats(dest, 200, 200 * messageSize);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMessageSizeAfterDestinationDeletion() throws Exception {
|
||||||
|
Destination dest = publishTestMessages(200);
|
||||||
|
verifyStats(dest, 200, 200 * messageSize);
|
||||||
|
|
||||||
|
//check that the size is 0 after deletion
|
||||||
|
broker.removeDestination(dest.getActiveMQDestination());
|
||||||
|
verifyStats(dest, 0, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void verifyStats(Destination dest, int count, long minimumSize) throws Exception {
|
||||||
|
MessageStore messageStore = dest.getMessageStore();
|
||||||
|
MessageStoreStatistics storeStats = dest.getMessageStore().getMessageStoreStatistics();
|
||||||
|
assertEquals(messageStore.getMessageCount(), count);
|
||||||
|
assertEquals(messageStore.getMessageCount(),
|
||||||
|
storeStats.getMessageCount().getCount());
|
||||||
|
assertEquals(messageStore.getMessageSize(),
|
||||||
|
messageStore.getMessageStoreStatistics().getMessageSize().getTotalSize());
|
||||||
|
if (count > 0) {
|
||||||
|
assertTrue(storeStats.getMessageSize().getTotalSize() > minimumSize);
|
||||||
|
} else {
|
||||||
|
assertEquals(storeStats.getMessageSize().getTotalSize(), 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Generate random 1 megabyte messages
|
||||||
|
* @param session
|
||||||
|
* @return
|
||||||
|
* @throws JMSException
|
||||||
|
*/
|
||||||
|
protected BytesMessage createMessage(Session session) throws JMSException {
|
||||||
|
final BytesMessage message = session.createBytesMessage();
|
||||||
|
final byte[] data = new byte[messageSize];
|
||||||
|
final Random rng = new Random();
|
||||||
|
rng.nextBytes(data);
|
||||||
|
message.writeBytes(data);
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
protected Destination publishTestMessages(int count) throws Exception {
|
||||||
|
return publishTestMessages(count, defaultQueueName);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Destination publishTestMessages(int count, String queueName) throws Exception {
|
||||||
|
// create a new queue
|
||||||
|
final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
|
||||||
|
queueName);
|
||||||
|
|
||||||
|
Destination dest = broker.getDestination(activeMqQueue);
|
||||||
|
|
||||||
|
// Start the connection
|
||||||
|
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI)
|
||||||
|
.createConnection();
|
||||||
|
connection.setClientID("clientId" + queueName);
|
||||||
|
connection.start();
|
||||||
|
Session session = connection.createSession(false,
|
||||||
|
QueueSession.AUTO_ACKNOWLEDGE);
|
||||||
|
Queue queue = session.createQueue(queueName);
|
||||||
|
|
||||||
|
try {
|
||||||
|
// publish a bunch of non-persistent messages to fill up the temp
|
||||||
|
// store
|
||||||
|
MessageProducer prod = session.createProducer(queue);
|
||||||
|
prod.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
|
for (int i = 0; i < count; i++) {
|
||||||
|
prod.send(createMessage(session));
|
||||||
|
}
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
connection.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
return dest;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Destination consumeTestMessages() throws Exception {
|
||||||
|
return consumeTestMessages(defaultQueueName);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Destination consumeTestMessages(String queueName) throws Exception {
|
||||||
|
// create a new queue
|
||||||
|
final ActiveMQDestination activeMqQueue = new ActiveMQQueue(
|
||||||
|
queueName);
|
||||||
|
|
||||||
|
Destination dest = broker.getDestination(activeMqQueue);
|
||||||
|
|
||||||
|
// Start the connection
|
||||||
|
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI)
|
||||||
|
.createConnection();
|
||||||
|
connection.setClientID("clientId2" + queueName);
|
||||||
|
connection.start();
|
||||||
|
Session session = connection.createSession(false,
|
||||||
|
QueueSession.AUTO_ACKNOWLEDGE);
|
||||||
|
Queue queue = session.createQueue(queueName);
|
||||||
|
|
||||||
|
try {
|
||||||
|
MessageConsumer consumer = session.createConsumer(queue);
|
||||||
|
for (int i = 0; i < 200; i++) {
|
||||||
|
consumer.receive();
|
||||||
|
}
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
connection.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
return dest;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected Destination publishTestMessagesDurable() throws Exception {
|
||||||
|
// create a new queue
|
||||||
|
final ActiveMQDestination activeMqTopic = new ActiveMQTopic(
|
||||||
|
"test.topic");
|
||||||
|
|
||||||
|
Destination dest = broker.getDestination(activeMqTopic);
|
||||||
|
|
||||||
|
// Start the connection
|
||||||
|
Connection connection = new ActiveMQConnectionFactory(brokerConnectURI)
|
||||||
|
.createConnection();
|
||||||
|
connection.setClientID("clientId");
|
||||||
|
connection.start();
|
||||||
|
Session session = connection.createSession(false,
|
||||||
|
TopicSession.AUTO_ACKNOWLEDGE);
|
||||||
|
Topic topic = session.createTopic("test.topic");
|
||||||
|
session.createDurableSubscriber(topic, "sub1");
|
||||||
|
|
||||||
|
try {
|
||||||
|
// publish a bunch of non-persistent messages to fill up the temp
|
||||||
|
// store
|
||||||
|
MessageProducer prod = session.createProducer(topic);
|
||||||
|
prod.setDeliveryMode(DeliveryMode.PERSISTENT);
|
||||||
|
for (int i = 0; i < 200; i++) {
|
||||||
|
prod.send(createMessage(session));
|
||||||
|
}
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
connection.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
return dest;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
98
activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeTest.java
Normal file
98
activemq-unit-tests/src/test/java/org/apache/activemq/store/AbstractMessageStoreSizeTest.java
Normal file
@ -0,0 +1,98 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.store;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.ConnectionContext;
|
||||||
|
import org.apache.activemq.command.ActiveMQMessage;
|
||||||
|
import org.apache.activemq.command.ActiveMQQueue;
|
||||||
|
import org.apache.activemq.command.MessageId;
|
||||||
|
import org.apache.activemq.command.ProducerId;
|
||||||
|
import org.apache.activemq.util.ByteSequence;
|
||||||
|
import org.apache.activemq.util.IdGenerator;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test is for AMQ-5748 to verify that {@link MessageStore} implements correctly
|
||||||
|
* compute the size of the messages in the store.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public abstract class AbstractMessageStoreSizeTest {
|
||||||
|
|
||||||
|
protected static final IdGenerator id = new IdGenerator();
|
||||||
|
protected ActiveMQQueue destination = new ActiveMQQueue("Test");
|
||||||
|
protected ProducerId producerId = new ProducerId("1.1.1");
|
||||||
|
protected static final int MESSAGE_COUNT = 20;
|
||||||
|
protected static String dataDirectory = "target/test-amq-5748/datadb";
|
||||||
|
protected static int testMessageSize = 1000;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void init() throws Exception {
|
||||||
|
this.initStore();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void destroy() throws Exception {
|
||||||
|
this.destroyStore();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract void initStore() throws Exception;
|
||||||
|
|
||||||
|
|
||||||
|
protected abstract void destroyStore() throws Exception;
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method tests that the message size exists after writing a bunch of messages to the store.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMessageSize() throws Exception {
|
||||||
|
writeMessages();
|
||||||
|
long messageSize = getMessageStore().getMessageSize();
|
||||||
|
assertTrue(getMessageStore().getMessageCount() == 20);
|
||||||
|
assertTrue(messageSize > 20 * testMessageSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Write random byte messages to the store for testing.
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
protected void writeMessages() throws Exception {
|
||||||
|
final ConnectionContext context = new ConnectionContext();
|
||||||
|
|
||||||
|
for (int i = 0; i < MESSAGE_COUNT; i++) {
|
||||||
|
ActiveMQMessage message = new ActiveMQMessage();
|
||||||
|
final byte[] data = new byte[testMessageSize];
|
||||||
|
final Random rng = new Random();
|
||||||
|
rng.nextBytes(data);
|
||||||
|
message.setContent(new ByteSequence(data));
|
||||||
|
message.setDestination(destination);
|
||||||
|
message.setMessageId(new MessageId(id.generateId() + ":1", i));
|
||||||
|
getMessageStore().addMessage(context, message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected abstract MessageStore getMessageStore();
|
||||||
|
}
|
147
activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java
Normal file
147
activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/AbstractKahaDBMessageStoreSizeTest.java
Normal file
@ -0,0 +1,147 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.store.kahadb;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
|
||||||
|
import org.apache.activemq.store.AbstractMessageStoreSizeTest;
|
||||||
|
import org.apache.activemq.store.MessageStore;
|
||||||
|
import org.apache.activemq.store.PersistenceAdapter;
|
||||||
|
import org.apache.activemq.store.kahadb.disk.util.LocationMarshaller;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.commons.io.filefilter.TrueFileFilter;
|
||||||
|
import org.apache.commons.io.filefilter.WildcardFileFilter;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test is for AMQ-5748 to verify that {@link MessageStore} implements correctly
|
||||||
|
* compute the size of the messages in the store.
|
||||||
|
*
|
||||||
|
* For KahaDB specifically, the size was not being stored in in the index ({@link LocationMarshaller}). LocationMarshaller
|
||||||
|
* has been updated to include an option to include the size in the serialized value. This way the message
|
||||||
|
* size will be persisted in the index and be available between broker restarts without needing to rebuild the index.
|
||||||
|
* Note that the KahaDB version has been incremented from 5 to 6 because the index will need to be rebuild when a version
|
||||||
|
* 5 index is detected since it will be detected as corrupt.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public abstract class AbstractKahaDBMessageStoreSizeTest extends AbstractMessageStoreSizeTest {
|
||||||
|
|
||||||
|
MessageStore messageStore;
|
||||||
|
PersistenceAdapter store;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void initStore() throws Exception {
|
||||||
|
createStore(true, dataDirectory);
|
||||||
|
}
|
||||||
|
|
||||||
|
abstract protected void createStore(boolean deleteAllMessages, String directory) throws Exception;
|
||||||
|
|
||||||
|
abstract protected String getVersion5Dir();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroyStore() throws Exception {
|
||||||
|
if (store != null) {
|
||||||
|
store.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method tests that the message sizes exist for all messages that exist after messages are recovered
|
||||||
|
* off of disk.
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMessageSizeStoreRecovery() throws Exception {
|
||||||
|
writeMessages();
|
||||||
|
store.stop();
|
||||||
|
|
||||||
|
createStore(false, dataDirectory);
|
||||||
|
writeMessages();
|
||||||
|
long messageSize = messageStore.getMessageSize();
|
||||||
|
assertEquals(40, messageStore.getMessageCount());
|
||||||
|
assertTrue(messageSize > 40 * testMessageSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method tests that a version 5 store with an old index still works but returns 0 for messgage sizes.
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMessageSizeStoreRecoveryVersion5() throws Exception {
|
||||||
|
store.stop();
|
||||||
|
|
||||||
|
//Copy over an existing version 5 store with messages
|
||||||
|
File dataDir = new File(dataDirectory);
|
||||||
|
if (dataDir.exists())
|
||||||
|
FileUtils.deleteDirectory(new File(dataDirectory));
|
||||||
|
FileUtils.copyDirectory(new File(getVersion5Dir()),
|
||||||
|
dataDir);
|
||||||
|
|
||||||
|
//reload store
|
||||||
|
createStore(false, dataDirectory);
|
||||||
|
|
||||||
|
//make sure size is 0
|
||||||
|
long messageSize = messageStore.getMessageSize();
|
||||||
|
assertTrue(messageStore.getMessageCount() == 20);
|
||||||
|
assertTrue(messageSize == 0);
|
||||||
|
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method tests that a version 5 store with existing messages will correctly be recovered and converted
|
||||||
|
* to version 6. After index deletion, the index will be rebuilt and will include message sizes.
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMessageSizeStoreRecoveryVersion5RebuildIndex() throws Exception {
|
||||||
|
store.stop();
|
||||||
|
|
||||||
|
//Copy over an existing version 5 store with messages
|
||||||
|
File dataDir = new File(dataDirectory);
|
||||||
|
if (dataDir.exists())
|
||||||
|
FileUtils.deleteDirectory(new File(dataDirectory));
|
||||||
|
FileUtils.copyDirectory(new File(getVersion5Dir()),
|
||||||
|
dataDir);
|
||||||
|
for (File index : FileUtils.listFiles(new File(dataDirectory), new WildcardFileFilter("*.data"), TrueFileFilter.INSTANCE)) {
|
||||||
|
FileUtils.deleteQuietly(index);
|
||||||
|
}
|
||||||
|
|
||||||
|
//append more messages...at this point the index should be rebuilt
|
||||||
|
createStore(false, dataDirectory);
|
||||||
|
writeMessages();
|
||||||
|
|
||||||
|
//after writing new messages to the existing store, make sure the index is rebuilt and size is correct
|
||||||
|
long messageSize = messageStore.getMessageSize();
|
||||||
|
assertTrue(messageStore.getMessageCount() == 40);
|
||||||
|
assertTrue(messageSize > 40 * testMessageSize);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected MessageStore getMessageStore() {
|
||||||
|
return messageStore;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
82
activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java
Normal file
82
activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeStatTest.java
Normal file
@ -0,0 +1,82 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.store.kahadb;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.region.Destination;
|
||||||
|
import org.apache.activemq.store.AbstractMessageStoreSizeStatTest;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test checks that KahaDB properly sets the new storeMessageSize
|
||||||
|
* statistic.
|
||||||
|
*
|
||||||
|
* AMQ-5748
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class KahaDBMessageStoreSizeStatTest extends
|
||||||
|
AbstractMessageStoreSizeStatTest {
|
||||||
|
protected static final Logger LOG = LoggerFactory
|
||||||
|
.getLogger(KahaDBMessageStoreSizeStatTest.class);
|
||||||
|
|
||||||
|
File dataFileDir = new File("target/test-amq-5748/stat-datadb");
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setUpBroker(boolean clearDataDir) throws Exception {
|
||||||
|
if (clearDataDir && dataFileDir.exists())
|
||||||
|
FileUtils.cleanDirectory(dataFileDir);
|
||||||
|
super.setUpBroker(clearDataDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void initPersistence(BrokerService brokerService)
|
||||||
|
throws IOException {
|
||||||
|
broker.setPersistent(true);
|
||||||
|
broker.setDataDirectoryFile(dataFileDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that the the counter restores size and works after restart and more
|
||||||
|
* messages are published
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMessageSizeAfterRestartAndPublish() throws Exception {
|
||||||
|
|
||||||
|
Destination dest = publishTestMessages(200);
|
||||||
|
|
||||||
|
// verify the count and size
|
||||||
|
verifyStats(dest, 200, 200 * messageSize);
|
||||||
|
|
||||||
|
// stop, restart broker and publish more messages
|
||||||
|
stopBroker();
|
||||||
|
this.setUpBroker(false);
|
||||||
|
dest = publishTestMessages(200);
|
||||||
|
|
||||||
|
// verify the count and size
|
||||||
|
verifyStats(dest, 400, 400 * messageSize);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
46
activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java
Normal file
46
activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/KahaDBMessageStoreSizeTest.java
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.store.kahadb;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
|
||||||
|
import org.apache.activemq.store.MessageStore;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test is for AMQ-5748 to verify that {@link MessageStore} implements correctly
|
||||||
|
* compute the size of the messages in the KahaDB Store.
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class KahaDBMessageStoreSizeTest extends AbstractKahaDBMessageStoreSizeTest {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void createStore(boolean deleteAllMessages, String directory) throws Exception {
|
||||||
|
KahaDBStore kahaDBStore = new KahaDBStore();
|
||||||
|
store = kahaDBStore;
|
||||||
|
kahaDBStore.setJournalMaxFileLength(1024 * 512);
|
||||||
|
kahaDBStore.setDeleteAllMessages(deleteAllMessages);
|
||||||
|
kahaDBStore.setDirectory(new File(directory));
|
||||||
|
kahaDBStore.start();
|
||||||
|
messageStore = store.createQueueMessageStore(destination);
|
||||||
|
messageStore.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getVersion5Dir() {
|
||||||
|
return "src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5";
|
||||||
|
}
|
||||||
|
}
|
134
activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java
Normal file
134
activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeStatTest.java
Normal file
@ -0,0 +1,134 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.store.kahadb;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.broker.region.Destination;
|
||||||
|
import org.apache.activemq.store.AbstractMessageStoreSizeStatTest;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test checks that KahaDB properly sets the new storeMessageSize
|
||||||
|
* statistic.
|
||||||
|
*
|
||||||
|
* AMQ-5748
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class MultiKahaDBMessageStoreSizeStatTest extends
|
||||||
|
AbstractMessageStoreSizeStatTest {
|
||||||
|
protected static final Logger LOG = LoggerFactory
|
||||||
|
.getLogger(MultiKahaDBMessageStoreSizeStatTest.class);
|
||||||
|
|
||||||
|
File dataFileDir = new File("target/test-amq-5748/stat-datadb");
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setUpBroker(boolean clearDataDir) throws Exception {
|
||||||
|
if (clearDataDir && dataFileDir.exists())
|
||||||
|
FileUtils.cleanDirectory(dataFileDir);
|
||||||
|
super.setUpBroker(clearDataDir);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void initPersistence(BrokerService brokerService)
|
||||||
|
throws IOException {
|
||||||
|
broker.setPersistent(true);
|
||||||
|
|
||||||
|
//setup multi-kaha adapter
|
||||||
|
MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter();
|
||||||
|
persistenceAdapter.setDirectory(dataFileDir);
|
||||||
|
|
||||||
|
KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter();
|
||||||
|
kahaStore.setJournalMaxFileLength(1024 * 512);
|
||||||
|
|
||||||
|
//set up a store per destination
|
||||||
|
FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
|
||||||
|
filtered.setPersistenceAdapter(kahaStore);
|
||||||
|
filtered.setPerDestination(true);
|
||||||
|
List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
|
||||||
|
stores.add(filtered);
|
||||||
|
|
||||||
|
persistenceAdapter.setFilteredPersistenceAdapters(stores);
|
||||||
|
broker.setPersistenceAdapter(persistenceAdapter);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that the the counter restores size and works after restart and more
|
||||||
|
* messages are published
|
||||||
|
*
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testMessageSizeAfterRestartAndPublish() throws Exception {
|
||||||
|
|
||||||
|
Destination dest = publishTestMessages(200);
|
||||||
|
|
||||||
|
// verify the count and size
|
||||||
|
verifyStats(dest, 200, 200 * messageSize);
|
||||||
|
|
||||||
|
// stop, restart broker and publish more messages
|
||||||
|
stopBroker();
|
||||||
|
this.setUpBroker(false);
|
||||||
|
dest = publishTestMessages(200);
|
||||||
|
|
||||||
|
// verify the count and size
|
||||||
|
verifyStats(dest, 400, 400 * messageSize);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMessageSizeAfterRestartAndPublishMultiQueue() throws Exception {
|
||||||
|
|
||||||
|
Destination dest = publishTestMessages(200);
|
||||||
|
|
||||||
|
// verify the count and size
|
||||||
|
verifyStats(dest, 200, 200 * messageSize);
|
||||||
|
assertTrue(broker.getPersistenceAdapter().size() > 200 * messageSize);
|
||||||
|
|
||||||
|
Destination dest2 = publishTestMessages(200, "test.queue2");
|
||||||
|
|
||||||
|
// verify the count and size
|
||||||
|
verifyStats(dest2, 200, 200 * messageSize);
|
||||||
|
assertTrue(broker.getPersistenceAdapter().size() > 400 * messageSize);
|
||||||
|
|
||||||
|
// stop, restart broker and publish more messages
|
||||||
|
stopBroker();
|
||||||
|
this.setUpBroker(false);
|
||||||
|
dest = publishTestMessages(200);
|
||||||
|
dest2 = publishTestMessages(200, "test.queue2");
|
||||||
|
|
||||||
|
// verify the count and size after publishing messages
|
||||||
|
verifyStats(dest, 400, 400 * messageSize);
|
||||||
|
verifyStats(dest2, 400, 400 * messageSize);
|
||||||
|
|
||||||
|
System.out.println(broker.getPersistenceAdapter().size());
|
||||||
|
assertTrue(broker.getPersistenceAdapter().size() > 800 * messageSize);
|
||||||
|
assertTrue(broker.getPersistenceAdapter().size() >=
|
||||||
|
(dest.getMessageStore().getMessageSize() + dest2.getMessageStore().getMessageSize()));
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
68
activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeTest.java
Normal file
68
activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MultiKahaDBMessageStoreSizeTest.java
Normal file
@ -0,0 +1,68 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.store.kahadb;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.activemq.store.MessageStore;
|
||||||
|
import org.apache.commons.io.FileUtils;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test is for AMQ-5748 to verify that {@link MessageStore} implements correctly
|
||||||
|
* compute the size of the messages in the store.
|
||||||
|
*
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class MultiKahaDBMessageStoreSizeTest extends AbstractKahaDBMessageStoreSizeTest {
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void createStore(boolean deleteAllMessages, String directory) throws Exception {
|
||||||
|
MultiKahaDBPersistenceAdapter multiStore = new MultiKahaDBPersistenceAdapter();
|
||||||
|
|
||||||
|
store = multiStore;
|
||||||
|
File fileDir = new File(directory);
|
||||||
|
|
||||||
|
if (deleteAllMessages && fileDir.exists()) {
|
||||||
|
FileUtils.cleanDirectory(new File(directory));
|
||||||
|
}
|
||||||
|
|
||||||
|
KahaDBPersistenceAdapter localStore = new KahaDBPersistenceAdapter();
|
||||||
|
localStore.setJournalMaxFileLength(1024 * 512);
|
||||||
|
localStore.setDirectory(new File(directory));
|
||||||
|
|
||||||
|
FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter();
|
||||||
|
filtered.setPersistenceAdapter(localStore);
|
||||||
|
filtered.setPerDestination(true);
|
||||||
|
List<FilteredKahaDBPersistenceAdapter> stores = new ArrayList<>();
|
||||||
|
stores.add(filtered);
|
||||||
|
|
||||||
|
multiStore.setFilteredPersistenceAdapters(stores);
|
||||||
|
multiStore.setDirectory(fileDir);
|
||||||
|
multiStore.start();
|
||||||
|
messageStore = store.createQueueMessageStore(destination);
|
||||||
|
messageStore.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected String getVersion5Dir() {
|
||||||
|
return "src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5";
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
45
activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java
Normal file
45
activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeStatTest.java
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.store.memory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.activemq.broker.BrokerService;
|
||||||
|
import org.apache.activemq.store.AbstractMessageStoreSizeStatTest;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This test checks that KahaDB properly sets the new storeMessageSize statistic.
|
||||||
|
*
|
||||||
|
* AMQ-5748
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class MemoryMessageStoreSizeStatTest extends AbstractMessageStoreSizeStatTest {
|
||||||
|
protected static final Logger LOG = LoggerFactory
|
||||||
|
.getLogger(MemoryMessageStoreSizeStatTest.class);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void initPersistence(BrokerService brokerService) throws IOException {
|
||||||
|
broker.setPersistent(false);
|
||||||
|
broker.setPersistenceAdapter(new MemoryPersistenceAdapter());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
}
|
45
activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeTest.java
Normal file
45
activemq-unit-tests/src/test/java/org/apache/activemq/store/memory/MemoryMessageStoreSizeTest.java
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.activemq.store.memory;
|
||||||
|
|
||||||
|
import org.apache.activemq.store.AbstractMessageStoreSizeTest;
|
||||||
|
import org.apache.activemq.store.MessageStore;
|
||||||
|
|
||||||
|
public class MemoryMessageStoreSizeTest extends AbstractMessageStoreSizeTest {
|
||||||
|
|
||||||
|
MemoryMessageStore messageStore;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void initStore() throws Exception {
|
||||||
|
messageStore = new MemoryMessageStore(destination);
|
||||||
|
messageStore.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void destroyStore() throws Exception {
|
||||||
|
if (messageStore != null) {
|
||||||
|
messageStore.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected MessageStore getMessageStore() {
|
||||||
|
return messageStore;
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
BIN
activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db-1.log
Normal file
BIN
activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db-1.log
Normal file
Binary file not shown.
BIN
activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db.data
Normal file
BIN
activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db.data
Normal file
Binary file not shown.
BIN
activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db.redo
Normal file
BIN
activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MessageStoreTest/version5/db.redo
Normal file
Binary file not shown.
BIN
activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db-1.log
Normal file
BIN
activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db-1.log
Normal file
Binary file not shown.
BIN
activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db.data
Normal file
BIN
activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db.data
Normal file
Binary file not shown.
BIN
activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db.redo
Normal file
BIN
activemq-unit-tests/src/test/resources/org/apache/activemq/store/kahadb/MultiKahaMessageStoreTest/version5/queue#3a#2f#2fTest/db.redo
Normal file
Binary file not shown.
Loading…
x
Reference in New Issue
Block a user