The JDBCMessageStore now checks hasSpace() when running a message
recovery listener to prevent going past the max configured page size
This commit is contained in:
Christopher L. Shannon (cshannon) 2016-08-26 13:46:42 -04:00
parent 2b1cda1964
commit b9fad53fc6
2 changed files with 189 additions and 5 deletions

View File

@ -39,7 +39,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
*
*/
public class JDBCMessageStore extends AbstractMessageStore {
@ -98,6 +98,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
}
}
@Override
public void addMessage(final ConnectionContext context, final Message message) throws IOException {
MessageId messageId = message.getMessageId();
if (audit != null && audit.isDuplicate(message)) {
@ -133,6 +134,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
pendingAdditions.add(sequence);
c.onCompletion(new Runnable() {
@Override
public void run() {
// jdbc close or jms commit - while futureOrSequenceLong==null ordered
// work will remain pending on the Queue
@ -207,6 +209,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
}
}
@Override
public Message getMessage(MessageId messageId) throws IOException {
// Get a connection and pull the message out of the DB
TransactionContext c = persistenceAdapter.getTransactionContext();
@ -245,6 +248,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
}
}
@Override
public void removeMessage(ConnectionContext context, MessageAck ack) throws IOException {
long seq = ack.getLastMessageId().getFutureOrSequenceLong() != null ?
@ -263,6 +267,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
}
}
@Override
public void recover(final MessageRecoveryListener listener) throws Exception {
// Get all the Message ids out of the database.
@ -270,14 +275,30 @@ public class JDBCMessageStore extends AbstractMessageStore {
try {
c = persistenceAdapter.getTransactionContext();
adapter.doRecover(c, destination, new JDBCMessageRecoveryListener() {
@Override
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
return listener.recoverMessage(msg);
if (listener.hasSpace()) {
Message msg = (Message) wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
return listener.recoverMessage(msg);
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Message recovery limit reached for MessageRecoveryListener");
}
return false;
}
}
@Override
public boolean recoverMessageReference(String reference) throws Exception {
return listener.recoverMessageReference(new MessageId(reference));
if (listener.hasSpace()) {
return listener.recoverMessageReference(new MessageId(reference));
} else {
if (LOG.isTraceEnabled()) {
LOG.trace("Message recovery limit reached for MessageRecoveryListener");
}
return false;
}
}
});
} catch (SQLException e) {
@ -291,6 +312,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
/**
* @see org.apache.activemq.store.MessageStore#removeAllMessages(ConnectionContext)
*/
@Override
public void removeAllMessages(ConnectionContext context) throws IOException {
// Get a connection and remove the message from the DB
TransactionContext c = persistenceAdapter.getTransactionContext(context);
@ -328,6 +350,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
* @see org.apache.activemq.store.MessageStore#recoverNextMessages(int,
* org.apache.activemq.store.MessageRecoveryListener)
*/
@Override
public void recoverNextMessages(int maxReturned, final MessageRecoveryListener listener) throws Exception {
TransactionContext c = persistenceAdapter.getTransactionContext();
try {
@ -337,6 +360,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
adapter.doRecoverNextMessages(c, destination, perPriorityLastRecovered, minPendingSequeunceId(),
maxReturned, isPrioritizedMessages(), new JDBCMessageRecoveryListener() {
@Override
public boolean recoverMessage(long sequenceId, byte[] data) throws Exception {
Message msg = (Message)wireFormat.unmarshal(new ByteSequence(data));
msg.getMessageId().setBrokerSequenceId(sequenceId);
@ -346,6 +370,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
return true;
}
@Override
public boolean recoverMessageReference(String reference) throws Exception {
if (listener.hasSpace()) {
listener.recoverMessageReference(new MessageId(reference));
@ -370,6 +395,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
/**
* @see org.apache.activemq.store.MessageStore#resetBatching()
*/
@Override
public void resetBatching() {
if (LOG.isTraceEnabled()) {
LOG.trace(this + " resetBatching. last recovered: " + Arrays.toString(perPriorityLastRecovered));
@ -401,6 +427,7 @@ public class JDBCMessageStore extends AbstractMessageStore {
}
@Override
public void setPrioritizedMessages(boolean prioritizedMessages) {
super.setPrioritizedMessages(prioritizedMessages);
}

View File

@ -0,0 +1,157 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.jdbc;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import javax.jms.Connection;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageId;
import org.apache.activemq.store.MessageRecoveryListener;
import org.apache.activemq.store.ProxyTopicMessageStore;
import org.apache.activemq.store.TopicMessageStore;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
public class JDBCPersistenceAdapterExpiredMessageTest {
@Rule
public TemporaryFolder dataFileDir = new TemporaryFolder(new File("target"));
protected BrokerService brokerService;
private AtomicBoolean hasSpaceCalled = new AtomicBoolean();
private int expireSize = 5;
@Before
public void setUp() throws Exception {
hasSpaceCalled.set(false);
brokerService = new BrokerService();
//Wrap the adapter and listener to set a flag to make sure we are calling hasSpace()
//during expiration
JDBCPersistenceAdapter jdbc = new JDBCPersistenceAdapter() {
@Override
public TopicMessageStore createTopicMessageStore(ActiveMQTopic destination) throws IOException {
ProxyTopicMessageStore proxy = new ProxyTopicMessageStore(super.createTopicMessageStore(destination)) {
@Override
public void recover(final MessageRecoveryListener listener) throws Exception {
MessageRecoveryListener delegate = new MessageRecoveryListener() {
@Override
public boolean recoverMessageReference(MessageId ref) throws Exception {
return listener.recoverMessageReference(ref);
}
@Override
public boolean recoverMessage(Message message) throws Exception {
return listener.recoverMessage(message);
}
@Override
public boolean isDuplicate(MessageId ref) {
return listener.isDuplicate(ref);
}
@Override
public boolean hasSpace() {
hasSpaceCalled.set(true);
return listener.hasSpace();
}
};
super.recover(delegate);
}
};
return proxy;
}
};
brokerService.setSchedulerSupport(false);
brokerService.setDataDirectoryFile(dataFileDir.getRoot());
brokerService.setPersistenceAdapter(jdbc);
brokerService.setDeleteAllMessagesOnStartup(true);
PolicyMap policyMap = new PolicyMap();
PolicyEntry defaultEntry = new PolicyEntry();
defaultEntry.setExpireMessagesPeriod(5000);
defaultEntry.setMaxExpirePageSize(expireSize);
policyMap.setDefaultEntry(defaultEntry);
brokerService.setDestinationPolicy(policyMap);
brokerService.start();
}
@After
public void stop() throws Exception {
if (brokerService != null) {
brokerService.stop();
brokerService.waitUntilStopped();
}
}
@Test
public void testMaxExpirePageSize() throws Exception {
final ActiveMQTopic topic = new ActiveMQTopic("test.topic");
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
factory.setClientID("clientId");;
Connection conn = factory.createConnection();
conn.start();
Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
TopicSubscriber sub = sess.createDurableSubscriber(topic, "sub1");
sub.close();
MessageProducer producer = sess.createProducer(topic);
producer.setTimeToLive(1000);
for (int i = 0; i < 50; i++) {
producer.send(sess.createTextMessage("test message: " + i));
}
//There should be exactly 5 messages expired because the limit was hit and it stopped
//The expire messages period is 5 seconds which should give enough time for this assertion
//to pass before expiring more messages
assertTrue(Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
long expired = brokerService.getDestination(topic).getDestinationStatistics().getExpired().getCount();
return expired == expireSize && hasSpaceCalled.get();
}
}, 15000, 1000));
}
}