Fix check for durable sub with no pending messages during checkpoint
cleanup.
(cherry picked from commit 193f6be687)
This commit is contained in:
Timothy Bish 2016-01-18 17:43:18 -05:00
parent 57b65dc8ef
commit 7d3a71a4df
2 changed files with 295 additions and 2 deletions

View File

@ -853,7 +853,8 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
missingPredicates.add(new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, dataFile.getLength()), new Location(id + 1, 0)));
Sequence seq = dataFile.getCorruptedBlocks().getHead();
while (seq != null) {
BTreeVisitor.BetweenVisitor visitor = new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
BTreeVisitor.BetweenVisitor<Location, Long> visitor =
new BTreeVisitor.BetweenVisitor<Location, Long>(new Location(id, (int) seq.getFirst()), new Location(id, (int) seq.getLast() + 1));
missingPredicates.add(visitor);
knownCorruption.add(visitor);
seq = seq.getNext();
@ -1707,7 +1708,9 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
// When pending is size one that is the next message Id meaning there
// are no pending messages currently.
if (pendingAcks == null || pendingAcks.size() <= 1) {
if (pendingAcks == null || pendingAcks.isEmpty() ||
(pendingAcks.size() == 1 && pendingAcks.getTail().range() == 1)) {
if (LOG.isTraceEnabled()) {
LOG.trace("Found candidate for rewrite: {} from file {}", entry.getKey(), dataFileId);
}

View File

@ -0,0 +1,290 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.bugs;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Random;
import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TopicSubscriber;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.store.kahadb.MessageDatabase;
import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.filefilter.TrueFileFilter;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.log4j.Level;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* This class is to show that a durable can lose messages after index deletion.
*/
public class AMQ6131Test {
protected BrokerService broker;
protected URI brokerConnectURI;
@Before
public void startBroker() throws Exception {
org.apache.log4j.Logger.getLogger(MessageDatabase.class).setLevel(Level.TRACE);
setUpBroker(true);
}
protected void setUpBroker(boolean clearDataDir) throws Exception {
broker = new BrokerService();
broker.setPersistent(true);
broker.setDeleteAllMessagesOnStartup(clearDataDir);
// set up a transport
TransportConnector connector = broker.addConnector(new TransportConnector());
connector.setUri(new URI("tcp://0.0.0.0:0"));
connector.setName("tcp");
broker.start();
broker.waitUntilStarted();
brokerConnectURI = broker.getConnectorByName("tcp").getConnectUri();
}
@After
public void stopBroker() throws Exception {
broker.stop();
broker.waitUntilStopped();
}
protected BrokerService getBroker() {
return this.broker;
}
public File getPersistentDir() throws IOException {
return getBroker().getPersistenceAdapter().getDirectory();
}
@Test(timeout = 300000)
public void testDurableWithOnePendingAfterRestartAndIndexRecovery() throws Exception {
final File persistentDir = getPersistentDir();
broker.getBroker().addDestination(broker.getAdminConnectionContext(), new ActiveMQTopic("durable.sub"), false);
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.brokerConnectURI);
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.setClientID("myId");
connection.start();
final Session jmsSession = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
TopicSubscriber durable = jmsSession.createDurableSubscriber(new ActiveMQTopic("durable.sub"), "sub");
final MessageProducer producer = jmsSession.createProducer(new ActiveMQTopic("durable.sub"));
final int original = new ArrayList<File>(FileUtils.listFiles(persistentDir, new WildcardFileFilter("*.log"), TrueFileFilter.INSTANCE)).size();
// 100k messages
final byte[] data = new byte[100000];
final Random random = new Random();
random.nextBytes(data);
// run test with enough messages to create a second journal file
final AtomicInteger messageCount = new AtomicInteger();
assertTrue("Should have added a journal file", Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
final ActiveMQBytesMessage message = new ActiveMQBytesMessage();
message.setContent(new ByteSequence(data));
for (int i = 0; i < 100; i++) {
producer.send(message);
messageCount.getAndIncrement();
}
return new ArrayList<File>(FileUtils.listFiles(persistentDir, new WildcardFileFilter("*.log"), TrueFileFilter.INSTANCE)).size() > original;
}
}));
// Consume all but 1 message
for (int i = 0; i < messageCount.get() - 1; i++) {
durable.receive();
}
durable.close();
// wait until a journal file has been GC'd after receiving messages
assertTrue("Subscription should go inactive", Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return broker.getAdminView().getInactiveDurableTopicSubscribers().length == 1;
}
}));
// force a GC of unneeded journal files
getBroker().getPersistenceAdapter().checkpoint(true);
// wait until a journal file has been GC'd after receiving messages
assertFalse("Should not have garbage collected", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return new ArrayList<File>(FileUtils.listFiles(persistentDir, new WildcardFileFilter("*.log"), TrueFileFilter.INSTANCE)).size() == original;
}
}, 5000, 500));
// stop the broker so we can blow away the index
getBroker().stop();
getBroker().waitUntilStopped();
// delete the index so that the durables are gone from the index
// The test passes if you take out this delete section
for (File index : FileUtils.listFiles(persistentDir, new WildcardFileFilter("db.*"), TrueFileFilter.INSTANCE)) {
FileUtils.deleteQuietly(index);
}
stopBroker();
setUpBroker(false);
assertEquals(1, broker.getAdminView().getInactiveDurableTopicSubscribers().length);
assertEquals(0, broker.getAdminView().getDurableTopicSubscribers().length);
ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory(this.brokerConnectURI);
ActiveMQConnection connection2 = (ActiveMQConnection) connectionFactory2.createConnection();
connection2.setClientID("myId");
connection2.start();
final Session jmsSession2 = connection2.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
TopicSubscriber durable2 = jmsSession2.createDurableSubscriber(new ActiveMQTopic("durable.sub"), "sub");
assertEquals(0, broker.getAdminView().getInactiveDurableTopicSubscribers().length);
assertEquals(1, broker.getAdminView().getDurableTopicSubscribers().length);
assertNotNull(durable2.receive(5000));
}
@Test(timeout = 300000)
public void testDurableWithNoMessageAfterRestartAndIndexRecovery() throws Exception {
final File persistentDir = getPersistentDir();
broker.getBroker().addDestination(broker.getAdminConnectionContext(), new ActiveMQTopic("durable.sub"), false);
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(this.brokerConnectURI);
ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
connection.setClientID("myId");
connection.start();
final Session jmsSession = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
TopicSubscriber durable = jmsSession.createDurableSubscriber(new ActiveMQTopic("durable.sub"), "sub");
final MessageProducer producer = jmsSession.createProducer(new ActiveMQTopic("durable.sub"));
final int original = new ArrayList<File>(FileUtils.listFiles(persistentDir, new WildcardFileFilter("*.log"), TrueFileFilter.INSTANCE)).size();
// 100k messages
final byte[] data = new byte[100000];
final Random random = new Random();
random.nextBytes(data);
// run test with enough messages to create a second journal file
final AtomicInteger messageCount = new AtomicInteger();
assertTrue("Should have added a journal file", Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
final ActiveMQBytesMessage message = new ActiveMQBytesMessage();
message.setContent(new ByteSequence(data));
for (int i = 0; i < 100; i++) {
producer.send(message);
messageCount.getAndIncrement();
}
return new ArrayList<File>(FileUtils.listFiles(persistentDir, new WildcardFileFilter("*.log"), TrueFileFilter.INSTANCE)).size() > original;
}
}));
// Consume all messages
for (int i = 0; i < messageCount.get(); i++) {
durable.receive();
}
durable.close();
assertTrue("Subscription should go inactive", Wait.waitFor(new Condition() {
@Override
public boolean isSatisified() throws Exception {
return broker.getAdminView().getInactiveDurableTopicSubscribers().length == 1;
}
}));
// force a GC of unneeded journal files
getBroker().getPersistenceAdapter().checkpoint(true);
// wait until a journal file has been GC'd after receiving messages
assertTrue("Should have garbage collected", Wait.waitFor(new Wait.Condition() {
@Override
public boolean isSatisified() throws Exception {
return new ArrayList<File>(FileUtils.listFiles(persistentDir, new WildcardFileFilter("*.log"), TrueFileFilter.INSTANCE)).size() == original;
}
}));
// stop the broker so we can blow away the index
getBroker().stop();
getBroker().waitUntilStopped();
// delete the index so that the durables are gone from the index
// The test passes if you take out this delete section
for (File index : FileUtils.listFiles(persistentDir, new WildcardFileFilter("db.*"), TrueFileFilter.INSTANCE)) {
FileUtils.deleteQuietly(index);
}
stopBroker();
setUpBroker(false);
assertEquals(1, broker.getAdminView().getInactiveDurableTopicSubscribers().length);
assertEquals(0, broker.getAdminView().getDurableTopicSubscribers().length);
ActiveMQConnectionFactory connectionFactory2 = new ActiveMQConnectionFactory(this.brokerConnectURI);
ActiveMQConnection connection2 = (ActiveMQConnection) connectionFactory2.createConnection();
connection2.setClientID("myId");
connection2.start();
final Session jmsSession2 = connection2.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
TopicSubscriber durable2 = jmsSession2.createDurableSubscriber(new ActiveMQTopic("durable.sub"), "sub");
assertEquals(0, broker.getAdminView().getInactiveDurableTopicSubscribers().length);
assertEquals(1, broker.getAdminView().getDurableTopicSubscribers().length);
assertNull(durable2.receive(500));
}
}