mirror of https://github.com/apache/activemq.git
https://issues.apache.org/jira/browse/AMQ-3637 - NullPointerException while loading node from kahadb during vmcursor replay. Batch recovery such that expiry can be processed periodically which avoids a nested kahadb transaction that modifies. additional test and some additional logging when recovery is taking a long time
git-svn-id: https://svn.apache.org/repos/asf/activemq/trunk@1214888 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
08b8a354ab
commit
41cdadbe2a
|
@ -2224,7 +2224,7 @@ public class BrokerService implements Service {
|
|||
if (ioExceptionHandler != null) {
|
||||
ioExceptionHandler.handle(exception);
|
||||
} else {
|
||||
LOG.info("Ignoring IO exception, " + exception, exception);
|
||||
LOG.info("No IOExceptionHandler registered, ignoring IO exception, " + exception, exception);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
package org.apache.activemq.broker.region;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.AbstractList;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
|
@ -25,6 +24,7 @@ import java.util.Comparator;
|
|||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
@ -231,6 +231,79 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
}
|
||||
}
|
||||
|
||||
class BatchMessageRecoveryListener implements MessageRecoveryListener {
|
||||
final LinkedList<Message> toExpire = new LinkedList<Message>();
|
||||
final double totalMessageCount;
|
||||
int recoveredAccumulator = 0;
|
||||
int currentBatchCount;
|
||||
|
||||
BatchMessageRecoveryListener(int totalMessageCount) {
|
||||
this.totalMessageCount = totalMessageCount;
|
||||
currentBatchCount = recoveredAccumulator;
|
||||
}
|
||||
|
||||
public boolean recoverMessage(Message message) {
|
||||
recoveredAccumulator++;
|
||||
if (LOG.isInfoEnabled() && (recoveredAccumulator % 10000) == 0) {
|
||||
LOG.info("cursor for " + getActiveMQDestination().getQualifiedName() + " has recovered "
|
||||
+ recoveredAccumulator + " messages. " +
|
||||
(int) (recoveredAccumulator * 100 / totalMessageCount) + "% complete");
|
||||
}
|
||||
// Message could have expired while it was being
|
||||
// loaded..
|
||||
if (message.isExpired() && broker.isExpired(message)) {
|
||||
toExpire.add(message);
|
||||
return true;
|
||||
}
|
||||
if (hasSpace()) {
|
||||
message.setRegionDestination(Queue.this);
|
||||
messagesLock.writeLock().lock();
|
||||
try {
|
||||
try {
|
||||
messages.addMessageLast(message);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to add message to cursor", e);
|
||||
}
|
||||
} finally {
|
||||
messagesLock.writeLock().unlock();
|
||||
}
|
||||
destinationStatistics.getMessages().increment();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean recoverMessageReference(MessageId messageReference) throws Exception {
|
||||
throw new RuntimeException("Should not be called.");
|
||||
}
|
||||
|
||||
public boolean hasSpace() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean isDuplicate(MessageId id) {
|
||||
return false;
|
||||
}
|
||||
|
||||
public void reset() {
|
||||
currentBatchCount = recoveredAccumulator;
|
||||
}
|
||||
|
||||
public void processExpired() {
|
||||
for (Message message: toExpire) {
|
||||
messageExpired(createConnectionContext(), createMessageReference(message));
|
||||
// drop message will decrement so counter
|
||||
// balance here
|
||||
destinationStatistics.getMessages().increment();
|
||||
}
|
||||
toExpire.clear();
|
||||
}
|
||||
|
||||
public boolean done() {
|
||||
return currentBatchCount == recoveredAccumulator;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void initialize() throws Exception {
|
||||
if (this.messages == null) {
|
||||
|
@ -263,60 +336,15 @@ public class Queue extends BaseDestination implements Task, UsageListener {
|
|||
messages.setMaxProducersToAudit(getMaxProducersToAudit());
|
||||
messages.setUseCache(isUseCache());
|
||||
messages.setMemoryUsageHighWaterMark(getCursorMemoryHighWaterMark());
|
||||
final int messageCount = store.getMessageCount();
|
||||
if (messages.isRecoveryRequired()) {
|
||||
store.recover(new MessageRecoveryListener() {
|
||||
double totalMessageCount = store.getMessageCount();
|
||||
int recoveredMessageCount = 0;
|
||||
|
||||
public boolean recoverMessage(Message message) {
|
||||
// Message could have expired while it was being
|
||||
// loaded..
|
||||
if ((++recoveredMessageCount % 50000) == 0) {
|
||||
LOG.info("cursor for " + getActiveMQDestination().getQualifiedName() + " has recovered "
|
||||
+ recoveredMessageCount + " messages. " +
|
||||
(int)(recoveredMessageCount*100/totalMessageCount) + "% complete");
|
||||
}
|
||||
if (message.isExpired()) {
|
||||
if (broker.isExpired(message)) {
|
||||
messageExpired(createConnectionContext(), createMessageReference(message));
|
||||
// drop message will decrement so counter
|
||||
// balance here
|
||||
destinationStatistics.getMessages().increment();
|
||||
}
|
||||
return true;
|
||||
}
|
||||
if (hasSpace()) {
|
||||
message.setRegionDestination(Queue.this);
|
||||
messagesLock.writeLock().lock();
|
||||
try{
|
||||
try {
|
||||
messages.addMessageLast(message);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Failed to add message to cursor", e);
|
||||
}
|
||||
}finally {
|
||||
messagesLock.writeLock().unlock();
|
||||
}
|
||||
destinationStatistics.getMessages().increment();
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean recoverMessageReference(MessageId messageReference) throws Exception {
|
||||
throw new RuntimeException("Should not be called.");
|
||||
}
|
||||
|
||||
public boolean hasSpace() {
|
||||
return true;
|
||||
}
|
||||
|
||||
public boolean isDuplicate(MessageId id) {
|
||||
return false;
|
||||
}
|
||||
});
|
||||
BatchMessageRecoveryListener listener = new BatchMessageRecoveryListener(messageCount);
|
||||
do {
|
||||
listener.reset();
|
||||
store.recoverNextMessages(getMaxPageSize(), listener);
|
||||
listener.processExpired();
|
||||
} while (!listener.done());
|
||||
} else {
|
||||
int messageCount = store.getMessageCount();
|
||||
destinationStatistics.getMessages().setCount(messageCount);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -470,6 +470,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
|
|||
process(message, recoveryPosition, lastIndoubtPosition);
|
||||
redoCounter++;
|
||||
recoveryPosition = journal.getNextLocation(recoveryPosition);
|
||||
if (LOG.isInfoEnabled() && redoCounter % 100000 == 0) {
|
||||
LOG.info("@" + recoveryPosition + ", " + redoCounter + " entries recovered ..");
|
||||
}
|
||||
}
|
||||
if (LOG.isInfoEnabled()) {
|
||||
long end = System.currentTimeMillis();
|
||||
|
|
|
@ -0,0 +1,112 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.activemq.store.kahadb;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.DeliveryMode;
|
||||
import javax.jms.Destination;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Session;
|
||||
import org.apache.activemq.ActiveMQConnection;
|
||||
import org.apache.activemq.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.broker.BrokerService;
|
||||
import org.apache.activemq.broker.region.BaseDestination;
|
||||
import org.apache.activemq.broker.region.policy.PolicyEntry;
|
||||
import org.apache.activemq.broker.region.policy.PolicyMap;
|
||||
import org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
|
||||
import org.apache.activemq.command.ActiveMQQueue;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
|
||||
import static junit.framework.Assert.assertNotNull;
|
||||
import static junit.framework.Assert.assertNull;
|
||||
|
||||
public class KahaDBStoreRecoveryExpiryTest {
|
||||
|
||||
private BrokerService broker;
|
||||
private ActiveMQConnection connection;
|
||||
private Destination destination = new ActiveMQQueue("Test");
|
||||
private Session session;
|
||||
|
||||
@Test
|
||||
public void testRestartWitExpired() throws Exception {
|
||||
publishMessages(1, 0);
|
||||
publishMessages(1, 2000);
|
||||
publishMessages(1, 0);
|
||||
restartBroker(3000);
|
||||
consumeMessages(2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRestartWitExpiredLargerThanBatchRecovery() throws Exception {
|
||||
publishMessages(BaseDestination.MAX_PAGE_SIZE + 10, 2000);
|
||||
publishMessages(10, 0);
|
||||
restartBroker(3000);
|
||||
consumeMessages(10);
|
||||
}
|
||||
|
||||
private void consumeMessages(int count) throws Exception {
|
||||
MessageConsumer consumer = session.createConsumer(destination);
|
||||
for (int i=0; i<count; i++) {
|
||||
assertNotNull("got message "+ i, consumer.receive(4000));
|
||||
}
|
||||
assertNull("none left over", consumer.receive(2000));
|
||||
}
|
||||
|
||||
private void restartBroker(int restartDelay) throws Exception {
|
||||
stopBroker();
|
||||
TimeUnit.MILLISECONDS.sleep(restartDelay);
|
||||
startBroker();
|
||||
}
|
||||
|
||||
@After
|
||||
public void stopBroker() throws Exception {
|
||||
broker.stop();
|
||||
broker.waitUntilStopped();
|
||||
}
|
||||
|
||||
private void publishMessages(int count, int expiry) throws Exception {
|
||||
MessageProducer producer = session.createProducer(destination);
|
||||
for (int i=0; i<count; i++) {
|
||||
producer.send(session.createTextMessage(), DeliveryMode.PERSISTENT, 5, expiry);
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void startBroker() throws Exception {
|
||||
broker = new BrokerService();
|
||||
((KahaDBPersistenceAdapter)broker.getPersistenceAdapter()).setIndexCacheSize(0);
|
||||
PolicyMap policyMap = new PolicyMap();
|
||||
PolicyEntry defaultEntry = new PolicyEntry();
|
||||
defaultEntry.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
|
||||
policyMap.setDefaultEntry(defaultEntry);
|
||||
broker.setDestinationPolicy(policyMap);
|
||||
broker.setUseJmx(false);
|
||||
broker.start();
|
||||
|
||||
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("vm://localhost?create=false");
|
||||
connection = (ActiveMQConnection) connectionFactory.createConnection();
|
||||
connection.setWatchTopicAdvisories(false);
|
||||
connection.start();
|
||||
|
||||
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue