From 0a29533ed7ef49798a3fb4fa8eed7fb78d0a868a Mon Sep 17 00:00:00 2001 From: gtully Date: Fri, 25 Nov 2016 16:44:28 +0000 Subject: [PATCH] [AMQ-6520] respect kahadb indexDirectory for perDestination mKahaDB - fix and test --- .../kahadb/KahaDBPersistenceAdapter.java | 2 +- .../kahadb/MultiKahaDBPersistenceAdapter.java | 36 +++- .../kahadb/MKahaDBIndexLocationTest.java | 163 ++++++++++++++++++ 3 files changed, 198 insertions(+), 3 deletions(-) create mode 100644 activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBIndexLocationTest.java diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java index 5eef7500b6..fd77e49166 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/KahaDBPersistenceAdapter.java @@ -779,7 +779,7 @@ public class KahaDBPersistenceAdapter extends LockableServiceSupport implements @Override public String toString() { String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET"; - return "KahaDBPersistenceAdapter[" + path + "]"; + return "KahaDBPersistenceAdapter[" + path + (getIndexDirectory() != null ? ",Index:" + getIndexDirectory().getAbsolutePath() : "") + "]"; } @Override diff --git a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java index 68f0ed66cc..223afb9b43 100644 --- a/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java +++ b/activemq-kahadb-store/src/main/java/org/apache/activemq/store/kahadb/MultiKahaDBPersistenceAdapter.java @@ -64,6 +64,8 @@ import org.apache.activemq.util.ServiceStopper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.activemq.store.kahadb.MessageDatabase.DEFAULT_DIRECTORY; + /** * An implementation of {@link org.apache.activemq.store.PersistenceAdapter} that supports * distribution of destinations across multiple kahaDB persistence adapters @@ -150,7 +152,7 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem destinationMap.setEntries(entries); } - private String nameFromDestinationFilter(ActiveMQDestination destination) { + public static String nameFromDestinationFilter(ActiveMQDestination destination) { if (destination.getQualifiedName().length() > IOHelper.getMaxFileNameLength()) { LOG.warn("Destination name is longer than 'MaximumFileNameLength' system property, " + "potential problem with recovery can result from name truncation."); @@ -242,6 +244,20 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem } transactionStore.deleteAllMessages(); IOHelper.deleteChildren(getDirectory()); + for (Object o : destinationMap.get(new AnyDestination(new ActiveMQDestination[]{new ActiveMQQueue(">"), new ActiveMQTopic(">")}))) { + if (o instanceof FilteredKahaDBPersistenceAdapter) { + FilteredKahaDBPersistenceAdapter filteredKahaDBPersistenceAdapter = (FilteredKahaDBPersistenceAdapter) o; + if (filteredKahaDBPersistenceAdapter.getPersistenceAdapter().getDirectory() != DEFAULT_DIRECTORY) { + IOHelper.deleteChildren(filteredKahaDBPersistenceAdapter.getPersistenceAdapter().getDirectory()); + } + if (filteredKahaDBPersistenceAdapter.getPersistenceAdapter() instanceof KahaDBPersistenceAdapter) { + KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) filteredKahaDBPersistenceAdapter.getPersistenceAdapter(); + if (kahaDBPersistenceAdapter.getIndexDirectory() != null) { + IOHelper.deleteChildren(kahaDBPersistenceAdapter.getIndexDirectory()); + } + } + } + } } @Override @@ -394,12 +410,28 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem PersistenceAdapter adapter = kahaDBFromTemplate(template); configureAdapter(adapter); configureDirectory(adapter, destinationName); + configureIndexDirectory(adapter, template, destinationName); return adapter; } + private void configureIndexDirectory(PersistenceAdapter adapter, PersistenceAdapter template, String destinationName) { + if (template instanceof KahaDBPersistenceAdapter) { + KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) template; + if (kahaDBPersistenceAdapter.getIndexDirectory() != null) { + if (adapter instanceof KahaDBPersistenceAdapter) { + File directory = kahaDBPersistenceAdapter.getIndexDirectory(); + if (destinationName != null) { + directory = new File(directory, destinationName); + } + ((KahaDBPersistenceAdapter)adapter).setIndexDirectory(directory); + } + } + } + } + private void configureDirectory(PersistenceAdapter adapter, String fileName) { File directory = null; - File defaultDir = MessageDatabase.DEFAULT_DIRECTORY; + File defaultDir = DEFAULT_DIRECTORY; try { defaultDir = adapter.getClass().newInstance().getDirectory(); } catch (Exception e) { diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBIndexLocationTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBIndexLocationTest.java new file mode 100644 index 0000000000..a7ecc0c61a --- /dev/null +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/store/kahadb/MKahaDBIndexLocationTest.java @@ -0,0 +1,163 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.command.ActiveMQQueue; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.jms.Connection; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import java.io.File; +import java.io.FilenameFilter; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.activemq.store.kahadb.MultiKahaDBPersistenceAdapter.nameFromDestinationFilter; +import static org.junit.Assert.*; + +public class MKahaDBIndexLocationTest { + + private static final Logger LOG = LoggerFactory.getLogger(MKahaDBIndexLocationTest.class); + + private BrokerService broker; + + private final File testDataDir = new File("target/activemq-data/ConfigIndexDir"); + private final File kahaDataDir = new File(testDataDir, "log"); + private final File kahaIndexDir = new File(testDataDir, "index"); + private final ActiveMQQueue queue = new ActiveMQQueue("Qq"); + + @Before + public void startBroker() throws Exception { + createBroker(); + broker.setDeleteAllMessagesOnStartup(true); + broker.start(); + broker.waitUntilStarted(); + } + + @After + public void stopBroker() throws Exception { + if (broker != null) { + broker.stop(); + broker.waitUntilStopped(); + } + } + + private void createBroker() throws Exception { + broker = new BrokerService(); + + // setup multi-kaha adapter + MultiKahaDBPersistenceAdapter persistenceAdapter = new MultiKahaDBPersistenceAdapter(); + persistenceAdapter.setDirectory(kahaDataDir); + + KahaDBPersistenceAdapter kahaStore = new KahaDBPersistenceAdapter(); + kahaStore.setJournalMaxFileLength(1024 * 512); + kahaStore.setIndexDirectory(kahaIndexDir); + + // set up a store per destination + FilteredKahaDBPersistenceAdapter filtered = new FilteredKahaDBPersistenceAdapter(); + filtered.setPersistenceAdapter(kahaStore); + filtered.setPerDestination(true); + List stores = new ArrayList<>(); + stores.add(filtered); + + persistenceAdapter.setFilteredPersistenceAdapters(stores); + broker.setPersistenceAdapter(persistenceAdapter); + + broker.setUseJmx(false); + broker.setAdvisorySupport(false); + broker.setSchedulerSupport(false); + broker.setPersistenceAdapter(persistenceAdapter); + } + + @Test + public void testIndexDirExists() throws Exception { + + produceMessages(); + + LOG.info("Index dir is configured as: {}", kahaIndexDir); + assertTrue(kahaDataDir.exists()); + assertTrue(kahaIndexDir.exists()); + + + String destName = nameFromDestinationFilter(queue); + String[] index = new File(kahaIndexDir, destName).list(new FilenameFilter() { + + @Override + public boolean accept(File dir, String name) { + LOG.info("Testing index filename: {}", name); + return name.endsWith("data") || name.endsWith("redo"); + } + }); + + String[] journal = new File(kahaDataDir, destName).list(new FilenameFilter() { + + @Override + public boolean accept(File dir, String name) { + LOG.info("Testing log filename: {}", name); + return name.endsWith("log") || name.equals("lock"); + } + }); + + + // Should be db.data and db.redo and nothing else. + assertNotNull(index); + assertEquals(2, index.length); + + // Should contain the initial log for the journal + assertNotNull(journal); + assertEquals(1, journal.length); + + stopBroker(); + createBroker(); + broker.start(); + broker.waitUntilStarted(); + + consume(); + } + + private void consume() throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false"); + Connection connection = cf.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer consumer = session.createConsumer(queue); + for (int i = 0; i < 5; ++i) { + assertNotNull("message[" + i + "]", consumer.receive(4000)); + } + connection.close(); + } + + private void produceMessages() throws Exception { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost?create=false"); + Connection connection = cf.createConnection(); + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(queue); + for (int i = 0; i < 5; ++i) { + producer.send(session.createTextMessage("test:" + i)); + } + connection.close(); + } +}