diff --git a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java index 0c497f6438..35b296cc88 100644 --- a/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java +++ b/activemq-core/src/main/java/org/apache/activemq/store/kahadb/MessageDatabase.java @@ -63,14 +63,7 @@ import org.apache.kahadb.journal.Location; import org.apache.kahadb.page.Page; import org.apache.kahadb.page.PageFile; import org.apache.kahadb.page.Transaction; -import org.apache.kahadb.util.ByteSequence; -import org.apache.kahadb.util.DataByteArrayInputStream; -import org.apache.kahadb.util.DataByteArrayOutputStream; -import org.apache.kahadb.util.LockFile; -import org.apache.kahadb.util.LongMarshaller; -import org.apache.kahadb.util.Marshaller; -import org.apache.kahadb.util.StringMarshaller; -import org.apache.kahadb.util.VariableMarshaller; +import org.apache.kahadb.util.*; public class MessageDatabase { @@ -155,6 +148,8 @@ public class MessageDatabase { protected AtomicBoolean started = new AtomicBoolean(); protected AtomicBoolean opened = new AtomicBoolean(); private LockFile lockFile; + private boolean ignoreMissingJournalfiles = false; + private int indexCacheSize = 100; public MessageDatabase() { } @@ -218,24 +213,6 @@ public class MessageDatabase { * @throws IOException */ public void open() throws IOException { - File lockFileName = new File(directory, "lock"); - lockFile = new LockFile(lockFileName, true); - if (failIfDatabaseIsLocked) { - lockFile.lock(); - } else { - while (true) { - try { - lockFile.lock(); - break; - } catch (IOException e) { - LOG.info("Database "+lockFileName+" is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000) + " seconds for the database to be unlocked. Reason: " + e); - try { - Thread.sleep(DATABASE_LOCKED_WAIT_DELAY); - } catch (InterruptedException e1) { - } - } - } - } if( opened.compareAndSet(false, true) ) { getJournal().start(); @@ -271,24 +248,45 @@ public class MessageDatabase { recover(); } } - + + private void lock() throws IOException { + if( lockFile == null ) { + File lockFileName = new File(directory, "lock"); + lockFile = new LockFile(lockFileName, true); + if (failIfDatabaseIsLocked) { + lockFile.lock(); + } else { + while (true) { + try { + lockFile.lock(); + break; + } catch (IOException e) { + LOG.info("Database "+lockFileName+" is locked... waiting " + (DATABASE_LOCKED_WAIT_DELAY / 1000) + " seconds for the database to be unlocked. Reason: " + e); + try { + Thread.sleep(DATABASE_LOCKED_WAIT_DELAY); + } catch (InterruptedException e1) { + } + } + } + } + } + } + public void load() throws IOException { synchronized (indexMutex) { + lock(); + if (deleteAllMessages) { + getJournal().start(); + getJournal().delete(); + getJournal().close(); + journal = null; + getPageFile().delete(); + LOG.info("Persistence store purged."); + deleteAllMessages = false; + } + open(); - - if (deleteAllMessages) { - journal.delete(); - - pageFile.unload(); - pageFile.delete(); - metadata = new Metadata(); - - LOG.info("Persistence store purged."); - deleteAllMessages = false; - - loadPageFile(); - } store(new KahaTraceCommand().setMessage("LOADED " + new Date())); } @@ -348,7 +346,6 @@ public class MessageDatabase { * * @throws IOException * @throws IOException - * @throws InvalidLocationException * @throws IllegalStateException */ private void recover() throws IllegalStateException, IOException { @@ -406,6 +403,75 @@ public class MessageDatabase { // TODO: do we need to modify the ack positions for the pub sub case? } } + + + // Lets be extra paranoid here and verify that all the datafiles being referenced + // by the indexes still exists. + + final SequenceSet ss = new SequenceSet(); + for (StoredDestination sd : storedDestinations.values()) { + // Use a visitor to cut down the number of pages that we load + sd.locationIndex.visit(tx, new BTreeVisitor() { + int last=-1; + + public boolean isInterestedInKeysBetween(Location first, Location second) { + if( first==null ) { + return !ss.contains(0, second.getDataFileId()); + } else if( second==null ) { + return true; + } else { + return !ss.contains(first.getDataFileId(), second.getDataFileId()); + } + } + + public void visit(List keys, List values) { + for (Location l : keys) { + int fileId = l.getDataFileId(); + if( last != fileId ) { + ss.add(fileId); + last = fileId; + } + } + } + + }); + } + HashSet missingJournalFiles = new HashSet(); + while( !ss.isEmpty() ) { + missingJournalFiles.add( (int)ss.removeFirst() ); + } + missingJournalFiles.removeAll( journal.getFileMap().keySet() ); + + if( !missingJournalFiles.isEmpty() ) { + if( ignoreMissingJournalfiles ) { + + for (StoredDestination sd : storedDestinations.values()) { + + final ArrayList matches = new ArrayList(); + for (Integer missing : missingJournalFiles) { + sd.locationIndex.visit(tx, new BTreeVisitor.BetweenVisitor(new Location(missing,0), new Location(missing+1,0)) { + @Override + protected void matched(Location key, Long value) { + matches.add(value); + } + }); + } + + + for (Long sequenceId : matches) { + MessageKeys keys = sd.orderIndex.remove(tx, sequenceId); + sd.locationIndex.remove(tx, keys.location); + sd.messageIdIndex.remove(tx, keys.messageId); + undoCounter++; + // TODO: do we need to modify the ack positions for the pub sub case? + } + } + + } else { + throw new IOException("Detected missing journal files: "+missingJournalFiles); + } + } + long end = System.currentTimeMillis(); if( undoCounter > 0 ) { // The rolledback operations are basically in flight journal writes. To avoid getting these the end user @@ -1263,6 +1329,7 @@ public class MessageDatabase { PageFile index = new PageFile(directory, "db"); index.setEnableWriteThread(isEnableIndexWriteAsync()); index.setWriteBatchSize(getIndexWriteBatchSize()); + index.setPageCacheSize(indexCacheSize); return index; } @@ -1358,4 +1425,20 @@ public class MessageDatabase { public void setFailIfDatabaseIsLocked(boolean failIfDatabaseIsLocked) { this.failIfDatabaseIsLocked = failIfDatabaseIsLocked; } + + public boolean isIgnoreMissingJournalfiles() { + return ignoreMissingJournalfiles; + } + + public void setIgnoreMissingJournalfiles(boolean ignoreMissingJournalfiles) { + this.ignoreMissingJournalfiles = ignoreMissingJournalfiles; + } + + public int getIndexCacheSize() { + return indexCacheSize; + } + + public void setIndexCacheSize(int indexCacheSize) { + this.indexCacheSize = indexCacheSize; + } } diff --git a/activemq-core/src/main/resources/activemq.xsd b/activemq-core/src/main/resources/activemq.xsd index 91bc6fbb2f..1390b9a674 100644 --- a/activemq-core/src/main/resources/activemq.xsd +++ b/activemq-core/src/main/resources/activemq.xsd @@ -2883,6 +2883,8 @@ false so that messages actually reside long term in the JDBC database. + + diff --git a/activemq-core/src/main/resources/activemq.xsd.html b/activemq-core/src/main/resources/activemq.xsd.html index 598df803a6..82fad2ec97 100644 --- a/activemq-core/src/main/resources/activemq.xsd.html +++ b/activemq-core/src/main/resources/activemq.xsd.html @@ -976,6 +976,8 @@ false so that messages actually reside long term in the JDBC database. indexWriteBatchSizexs:integer enableIndexWriteAsyncxs:boolean enableJournalDiskSyncsxs:boolean + ignoreMissingJournalfilesxs:boolean + indexCacheSizexs:integer diff --git a/activemq-core/src/main/resources/activemq.xsd.wiki b/activemq-core/src/main/resources/activemq.xsd.wiki index 6f5990261a..621e8e31db 100644 --- a/activemq-core/src/main/resources/activemq.xsd.wiki +++ b/activemq-core/src/main/resources/activemq.xsd.wiki @@ -1247,6 +1247,8 @@ h4. Properties | indexWriteBatchSize | _int_ | {html}{html} | | enableIndexWriteAsync | _boolean_ | {html}{html} | | enableJournalDiskSyncs | _boolean_ | {html}{html} | + | ignoreMissingJournalfiles | _boolean_ | {html}{html} | + | indexCacheSize | _int_ | {html}{html} | | size | _java.util.concurrent.atomic.AtomicLong_ | {html}{html} | | usageManager | _[org.apache.activemq.usage.SystemUsage|#org.apache.activemq.usage.SystemUsage-types]_ | {html}{html} | diff --git a/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java b/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java new file mode 100644 index 0000000000..3cc0233c10 --- /dev/null +++ b/activemq-core/src/test/java/org/apache/activemq/store/kahadb/KahaDBTest.java @@ -0,0 +1,147 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.store.kahadb; + +import junit.framework.TestCase; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.command.ActiveMQQueue; + +import javax.jms.*; +import java.io.File; +import java.io.IOException; + +/** + * @author chirino + */ +public class KahaDBTest extends TestCase { + + protected BrokerService createBroker(KahaDBStore kaha) throws Exception { + + BrokerService broker = new BrokerService(); + broker.setUseJmx(false); + broker.setPersistenceAdapter(kaha); + broker.start(); + return broker; + + } + + private KahaDBStore createStore(boolean delete) throws IOException { + KahaDBStore kaha = new KahaDBStore(); + kaha.setDirectory(new File("target/activemq-data/kahadb")); + if( delete ) { + kaha.deleteAllMessages(); + } + return kaha; + } + + public void testIgnoreMissingJournalfilesOptionSetFalse() throws Exception { + KahaDBStore kaha = createStore(true); + kaha.setJournalMaxFileLength(1024*100); + assertFalse(kaha.isIgnoreMissingJournalfiles()); + BrokerService broker = createBroker(kaha); + sendMessages(1000); + broker.stop(); + + // Delete some journal files.. + assertExistsAndDelete(new File(kaha.getDirectory(), "db-4.log")); + assertExistsAndDelete(new File(kaha.getDirectory(), "db-8.log")); + + kaha = createStore(false); + kaha.setJournalMaxFileLength(1024*100); + assertFalse(kaha.isIgnoreMissingJournalfiles()); + try { + broker = createBroker(kaha); + fail("expected IOException"); + } catch (IOException e) { + assertTrue( e.getMessage().startsWith("Detected missing journal files") ); + } + + } + + + public void testIgnoreMissingJournalfilesOptionSetTrue() throws Exception { + KahaDBStore kaha = createStore(true); + kaha.setJournalMaxFileLength(1024*100); + assertFalse(kaha.isIgnoreMissingJournalfiles()); + BrokerService broker = createBroker(kaha); + sendMessages(1000); + broker.stop(); + + // Delete some journal files.. + assertExistsAndDelete(new File(kaha.getDirectory(), "db-4.log")); + assertExistsAndDelete(new File(kaha.getDirectory(), "db-8.log")); + + kaha = createStore(false); + kaha.setIgnoreMissingJournalfiles(true); + kaha.setJournalMaxFileLength(1024*100); + broker = createBroker(kaha); + + // We know we won't get all the messages but we should get most of them. + int count = receiveMessages(); + assertTrue( count > 800 ); + assertTrue( count < 1000 ); + + broker.stop(); + } + + private void assertExistsAndDelete(File file) { + assertTrue(file.exists()); + file.delete(); + assertFalse(file.exists()); + } + + private void sendMessages(int count) throws JMSException { + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); + Connection connection = cf.createConnection(); + try { + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageProducer producer = session.createProducer(new ActiveMQQueue("TEST")); + for (int i = 0; i < count; i++) { + producer.send(session.createTextMessage(createContent(i))); + } + } finally { + connection.close(); + } + } + + private int receiveMessages() throws JMSException { + int rc=0; + ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("vm://localhost"); + Connection connection = cf.createConnection(); + try { + connection.start(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + MessageConsumer messageConsumer = session.createConsumer(new ActiveMQQueue("TEST")); + while ( messageConsumer.receive(1000) !=null ) { + rc++; + } + return rc; + } finally { + connection.close(); + } + } + + private String createContent(int i) { + StringBuilder sb = new StringBuilder(i+":"); + while( sb.length() < 1024 ) { + sb.append("*"); + } + return sb.toString(); + } + +} \ No newline at end of file diff --git a/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java b/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java index 35bc230a1b..a322898e5a 100644 --- a/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java +++ b/kahadb/src/main/java/org/apache/kahadb/index/BTreeVisitor.java @@ -67,6 +67,32 @@ public interface BTreeVisitor { abstract protected void matched(Key key, Value value); } + abstract class BetweenVisitor, Value> implements BTreeVisitor{ + private final Key first; + private final Key last; + + public BetweenVisitor(Key first, Key last) { + this.first = first; + this.last = last; + } + + public boolean isInterestedInKeysBetween(Key first, Key second) { + return (second==null || second.compareTo(this.first)>=0) + && (first==null || first.compareTo(last)<0); + } + + public void visit(List keys, List values) { + for( int i=0; i < keys.size(); i++) { + Key key = keys.get(i); + if( key.compareTo(first)>=0 && key.compareTo(last)<0 ) { + matched(key, values.get(i)); + } + } + } + + abstract protected void matched(Key key, Value value); + } + abstract class GTEVisitor, Value> implements BTreeVisitor{ final private Key value; diff --git a/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java b/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java index e39bd2b6dc..15dc73f823 100644 --- a/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java +++ b/kahadb/src/main/java/org/apache/kahadb/util/SequenceSet.java @@ -31,7 +31,7 @@ import java.util.NoSuchElementException; * @author chirino */ public class SequenceSet extends LinkedNodeList { - + public static class Marshaller implements org.apache.kahadb.util.Marshaller { public static final Marshaller INSTANCE = new Marshaller(); @@ -254,5 +254,19 @@ public class SequenceSet extends LinkedNodeList { } return rc; } - + + public boolean contains(int first, int last) { + if (isEmpty()) { + return false; + } + Sequence sequence = getHead(); + while (sequence != null) { + if (sequence.first <= first ) { + return last <= sequence.last ; + } + sequence = sequence.getNext(); + } + return false; + } + } \ No newline at end of file
ElementTypeDescription