From 1a67318fe97b1339ea1fbbd74cc7232c46e8f27c Mon Sep 17 00:00:00 2001 From: gtully Date: Tue, 17 Jan 2017 14:05:04 +0000 Subject: [PATCH] [AMQ-6567] ensure gc file removal/move is completed after index updates to avoid dangling index referenced on partial failure. Fix and test (cherry picked from commit 20522394cc747e64bd9f87e2e0b64d886c4dec62) --- .../store/kahadb/MessageDatabase.java | 17 +- .../store/kahadb/JournalArchiveTest.java | 221 ++++++++++++++++++ 2 files changed, 230 insertions(+), 8 deletions(-) create mode 100644 activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalArchiveTest.java diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 4c6ec36ac2..2db07f1d61 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -1686,12 +1686,14 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe try { this.indexLock.writeLock().lock(); try { - pageFile.tx().execute(new Transaction.Closure() { + Set filesToGc = pageFile.tx().execute(new Transaction.CallableClosure, IOException>() { @Override - public void execute(Transaction tx) throws IOException { - checkpointUpdate(tx, cleanup); + public Set execute(Transaction tx) throws IOException { + return checkpointUpdate(tx, cleanup); } }); + // after the index update such that partial removal does not leave dangling references in the index. + journal.removeDataFiles(filesToGc); } finally { this.indexLock.writeLock().unlock(); } @@ -1705,7 +1707,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe * @param tx * @throws IOException */ - void checkpointUpdate(Transaction tx, boolean cleanup) throws IOException { + Set checkpointUpdate(Transaction tx, boolean cleanup) throws IOException { MDC.put("activemq.persistenceDir", getDirectory().getName()); LOG.debug("Checkpoint started."); @@ -1720,10 +1722,11 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe tx.store(metadata.page, metadataMarshaller, true); pageFile.flush(); + final TreeSet gcCandidateSet = new TreeSet<>(); if (cleanup) { final TreeSet completeFileSet = new TreeSet<>(journal.getFileMap().keySet()); - final TreeSet gcCandidateSet = new TreeSet<>(completeFileSet); + gcCandidateSet.addAll(completeFileSet); if (LOG.isTraceEnabled()) { LOG.trace("Last update: " + lastUpdate + ", full gc candidates set: " + gcCandidateSet); @@ -1895,7 +1898,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe if (!gcCandidateSet.isEmpty()) { LOG.debug("Cleanup removing the data files: {}", gcCandidateSet); - journal.removeDataFiles(gcCandidateSet); for (Integer candidate : gcCandidateSet) { for (Set ackFiles : metadata.ackMessageFileMap.values()) { ackMessageFileMapMod |= ackFiles.remove(candidate); @@ -1941,6 +1943,7 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe MDC.remove("activemq.persistenceDir"); LOG.debug("Checkpoint done."); + return gcCandidateSet; } private final class AckCompactionRunner implements Runnable { @@ -2701,8 +2704,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe /** * Locate the storeMessageSize counter for this KahaDestination - * @param kahaDestination - * @return */ protected MessageStoreStatistics getStoreStats(String kahaDestKey) { MessageStoreStatistics storeStats = null; diff --git a/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalArchiveTest.java b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalArchiveTest.java new file mode 100644 index 0000000000..35750a6ee7 --- /dev/null +++ b/activemq-kahadb-store/src/test/java/org/apache/activemq/store/kahadb/JournalArchiveTest.java @@ -0,0 +1,221 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.region.policy.PolicyEntry; +import org.apache.activemq.broker.region.policy.PolicyMap; +import org.apache.activemq.command.ActiveMQQueue; +import org.apache.activemq.store.kahadb.disk.journal.DataFile; +import org.junit.After; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.io.File; +import java.io.IOException; +import java.security.Permission; +import java.util.Collection; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.apache.activemq.store.kahadb.JournalCorruptionEofIndexRecoveryTest.drain; +import static org.apache.activemq.store.kahadb.disk.journal.Journal.DEFAULT_ARCHIVE_DIRECTORY; +import static org.junit.Assert.*; + +public class JournalArchiveTest { + + private static final Logger LOG = LoggerFactory.getLogger(JournalArchiveTest.class); + + private final String KAHADB_DIRECTORY = "target/activemq-data/"; + private final String payload = new String(new byte[1024]); + + private BrokerService broker = null; + private final Destination destination = new ActiveMQQueue("Test"); + private KahaDBPersistenceAdapter adapter; + + protected void startBroker() throws Exception { + doStartBroker(true); + } + + protected void restartBroker() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + + doStartBroker(false); + } + + private void doStartBroker(boolean delete) throws Exception { + doCreateBroker(delete); + LOG.info("Starting broker.."); + broker.start(); + } + + private void doCreateBroker(boolean delete) throws Exception { + + broker = new BrokerService(); + broker.setDeleteAllMessagesOnStartup(delete); + broker.setPersistent(true); + broker.setUseJmx(true); + broker.setDataDirectory(KAHADB_DIRECTORY); + + PolicyMap policyMap = new PolicyMap(); + PolicyEntry policyEntry = new PolicyEntry(); + policyEntry.setUseCache(false); + policyMap.setDefaultEntry(policyEntry); + broker.setDestinationPolicy(policyMap); + + configurePersistence(broker); + } + + protected void configurePersistence(BrokerService brokerService) throws Exception { + adapter = (KahaDBPersistenceAdapter) brokerService.getPersistenceAdapter(); + + // ensure there are a bunch of data files but multiple entries in each + adapter.setJournalMaxFileLength(1024 * 20); + + // speed up the test case, checkpoint an cleanup early and often + adapter.setCheckpointInterval(2000); + adapter.setCleanupInterval(2000); + + adapter.setCheckForCorruptJournalFiles(true); + + adapter.setArchiveDataLogs(true); + } + + @After + public void tearDown() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + + @Test + public void testRecoveryOnArchiveFailure() throws Exception { + final AtomicInteger atomicInteger = new AtomicInteger(); + + System.setSecurityManager(new SecurityManager() { + public void checkPermission(Permission perm) {} + public void checkPermission(Permission perm, Object context) {} + + public void checkWrite(String file) { + if (file.contains(DEFAULT_ARCHIVE_DIRECTORY) && atomicInteger.incrementAndGet() > 4) { + throw new SecurityException("No Perms to write to archive times:" + atomicInteger.get()); + } + } + }); + startBroker(); + + int sent = produceMessagesToConsumeMultipleDataFiles(50); + + int numFilesAfterSend = getNumberOfJournalFiles(); + LOG.info("Num journal files: " + numFilesAfterSend); + + assertTrue("more than x files: " + numFilesAfterSend, numFilesAfterSend > 4); + + final CountDownLatch gotShutdown = new CountDownLatch(1); + broker.addShutdownHook(new Runnable() { + @Override + public void run() { + gotShutdown.countDown(); + } + }); + + int received = tryConsume(destination, sent); + assertEquals("all message received", sent, received); + assertTrue("broker got shutdown on page in error", gotShutdown.await(10, TimeUnit.SECONDS)); + + // no restrictions + System.setSecurityManager(null); + + int numFilesAfterRestart = 0; + try { + // ensure we can restart after failure to archive + doStartBroker(false); + numFilesAfterRestart = getNumberOfJournalFiles(); + LOG.info("Num journal files before gc: " + numFilesAfterRestart); + + // force gc + ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().checkpoint(true); + + } catch (Exception error) { + LOG.error("Failed to restart!", error); + fail("Failed to restart after failure to archive"); + } + int numFilesAfterGC = getNumberOfJournalFiles(); + LOG.info("Num journal files after restart nd gc: " + numFilesAfterGC); + assertTrue("Gc has happened", numFilesAfterGC < numFilesAfterRestart); + assertTrue("Gc has worked", numFilesAfterGC < 4); + + File archiveDirectory = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getDirectoryArchive(); + assertEquals("verify files in archive dir", numFilesAfterSend, archiveDirectory.listFiles().length); + } + + + private int getNumberOfJournalFiles() throws IOException { + Collection files = ((KahaDBPersistenceAdapter) broker.getPersistenceAdapter()).getStore().getJournal().getFileMap().values(); + int reality = 0; + for (DataFile file : files) { + if (file != null) { + reality++; + } + } + return reality; + } + + private int produceMessages(Destination destination, int numToSend) throws Exception { + int sent = 0; + Connection connection = new ActiveMQConnectionFactory(broker.getVmConnectorURI()).createConnection(); + connection.start(); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(destination); + for (int i = 0; i < numToSend; i++) { + producer.send(createMessage(session, i)); + sent++; + } + } finally { + connection.close(); + } + + return sent; + } + + private int tryConsume(Destination destination, int numToGet) throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(broker.getVmConnectorURI()); + return drain(cf, destination, numToGet); + } + + private int produceMessagesToConsumeMultipleDataFiles(int numToSend) throws Exception { + return produceMessages(destination, numToSend); + } + + private Message createMessage(Session session, int i) throws Exception { + return session.createTextMessage(payload + "::" + i); + } +}