Fixing when the storage is stopped for the storage manager

In some cases the ID Generator will be called after the JournalStorage was stopped.
AS a result you could have cases where the ID generator is called and the journal storage is stopped.

Also I added some check to IDs and added some code to cleanup old IDS on the BatchIDManager
This commit is contained in:
Clebert Suconic 2015-02-26 15:12:36 -05:00
parent ba1e685b69
commit f896a394e9
9 changed files with 317 additions and 62 deletions

View File

@ -380,7 +380,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent
void addBytesToLargeMessage(SequentialFile appendFile, long messageID, byte[] bytes) throws Exception; void addBytesToLargeMessage(SequentialFile appendFile, long messageID, byte[] bytes) throws Exception;
/** /**
* Stores the given journalID in the bindingsJournal. * Stores the id from IDManager.
* *
* @param journalID * @param journalID
* @param id * @param id
@ -388,6 +388,13 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent
*/ */
void storeID(long journalID, long id) throws Exception; void storeID(long journalID, long id) throws Exception;
/*
Deletes the ID from IDManager.
*/
void deleteID(long journalD) throws Exception;
/** /**
* Read lock the StorageManager. USE WITH CARE! * Read lock the StorageManager. USE WITH CARE!
* <p/> * <p/>

View File

@ -16,6 +16,9 @@
*/ */
package org.apache.activemq.core.persistence.impl.journal; package org.apache.activemq.core.persistence.impl.journal;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.api.core.ActiveMQBuffer; import org.apache.activemq.api.core.ActiveMQBuffer;
@ -42,6 +45,8 @@ public final class BatchingIDGenerator implements IDGenerator
private final StorageManager storageManager; private final StorageManager storageManager;
private List<Long> cleanupRecords = null;
public BatchingIDGenerator(final long start, final long checkpointSize, final StorageManager storageManager) public BatchingIDGenerator(final long start, final long checkpointSize, final StorageManager storageManager)
{ {
counter = new AtomicLong(start); counter = new AtomicLong(start);
@ -60,8 +65,31 @@ public final class BatchingIDGenerator implements IDGenerator
storeID(recordID, recordID); storeID(recordID, recordID);
} }
/**
* A method to cleanup old records after started
*/
public void cleanup()
{
if (cleanupRecords != null)
{
Iterator<Long> iterRecord = cleanupRecords.iterator();
while (iterRecord.hasNext())
{
Long record = iterRecord.next();
if (iterRecord.hasNext())
{
// we don't want to remove the last record
deleteID(record.longValue());
}
}
cleanupRecords.clear(); // help GC
cleanupRecords = null;
}
}
public void loadState(final long journalID, final ActiveMQBuffer buffer) public void loadState(final long journalID, final ActiveMQBuffer buffer)
{ {
addCleanupRecord(journalID);
IDCounterEncoding encoding = new IDCounterEncoding(); IDCounterEncoding encoding = new IDCounterEncoding();
encoding.decode(buffer); encoding.decode(buffer);
@ -93,8 +121,31 @@ public final class BatchingIDGenerator implements IDGenerator
if (id >= nextID) if (id >= nextID)
{ {
nextID += checkpointSize; nextID += checkpointSize;
storeID(counter.incrementAndGet(), nextID);
if (!storageManager.isStarted())
{
// This could happen after the server is stopped
// while notifications are being sent and ID gerated.
// If the ID is intended to the journal you would know soon enough
// so we just ignore this for now
ActiveMQServerLogger.LOGGER.debug("The journalStorageManager is not loaded. " +
"This is probably ok as long as it's a notification being sent after shutdown");
} }
else
{
storeID(counter.getAndIncrement(), nextID);
}
}
}
private void addCleanupRecord(long id)
{
if (cleanupRecords == null)
{
cleanupRecords = new LinkedList<>();
}
cleanupRecords.add(id);
} }
private void storeID(final long journalID, final long id) private void storeID(final long journalID, final long id)
@ -109,6 +160,18 @@ public final class BatchingIDGenerator implements IDGenerator
} }
} }
private void deleteID(final long journalID)
{
try
{
storageManager.deleteID(journalID);
}
catch (Exception e)
{
ActiveMQServerLogger.LOGGER.batchingIdError(e);
}
}
public static EncodingSupport createIDEncodingSupport(final long id) public static EncodingSupport createIDEncodingSupport(final long id)
{ {
return new IDCounterEncoding(id); return new IDCounterEncoding(id);

View File

@ -1488,6 +1488,21 @@ public class JournalStorageManager implements StorageManager
} }
} }
@Override
public void deleteID(long journalD) throws Exception
{
readLock();
try
{
bindingsJournal.appendDeleteRecord(journalD, false);
}
finally
{
readUnLock();
}
}
public void deleteAddressSetting(SimpleString addressMatch) throws Exception public void deleteAddressSetting(SimpleString addressMatch) throws Exception
{ {
PersistedAddressSetting oldSetting = mapPersistedAddressSettings.remove(addressMatch); PersistedAddressSetting oldSetting = mapPersistedAddressSettings.remove(addressMatch);
@ -2200,6 +2215,9 @@ public class JournalStorageManager implements StorageManager
} }
} }
// This will instruct the IDGenerator to cleanup old records
idGenerator.cleanup();
return bindingsInfo; return bindingsInfo;
} }

View File

@ -631,4 +631,10 @@ public class NullStorageManager implements StorageManager
{ {
// no-op // no-op
} }
@Override
public void deleteID(long journalD) throws Exception
{
}
} }

View File

@ -59,20 +59,23 @@ public class AssertionLoggerHandler extends ExtHandler
} }
} }
public static void assertMessageWasLogged(String assertionMessage, String expectedMessage)
/**
* is there any record matching Level?
* @param level
* @return
*/
public static boolean hasLevel(Level level)
{ {
if (!messages.containsKey(expectedMessage)) for (ExtLogRecord record : messages.values())
{ {
throw new AssertionError(assertionMessage); if (record.getLevel().equals(level))
{
return true;
} }
} }
public static void assertMessageWasLogged(String message) return false;
{
if (!messages.containsKey(message))
{
throw new AssertionError(Arrays.toString(messages.keySet().toArray()));
}
} }
/** /**

View File

@ -1000,25 +1000,56 @@ public abstract class ServiceTestBase extends UnitTestCase
*/ */
protected HashMap<Integer, AtomicInteger> countJournalLivingRecords(Configuration config) throws Exception protected HashMap<Integer, AtomicInteger> countJournalLivingRecords(Configuration config) throws Exception
{ {
final HashMap<Integer, AtomicInteger> recordsType = new HashMap<Integer, AtomicInteger>(); return internalCountJournalLivingRecords(config, true);
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(getJournalDir(), null); }
JournalImpl messagesJournal = new JournalImpl(config.getJournalFileSize(), /**
* This method will load a journal and count the living records
*
* @param config
* @param messageJournal if true -> MessageJournal, false -> BindingsJournal
* @return
* @throws Exception
*/
protected HashMap<Integer, AtomicInteger> internalCountJournalLivingRecords(Configuration config, boolean messageJournal) throws Exception
{
final HashMap<Integer, AtomicInteger> recordsType = new HashMap<Integer, AtomicInteger>();
SequentialFileFactory ff;
JournalImpl journal;
if (messageJournal)
{
ff = new NIOSequentialFileFactory(getJournalDir(), null);
journal = new JournalImpl(config.getJournalFileSize(),
config.getJournalMinFiles(), config.getJournalMinFiles(),
0, 0,
0, 0,
messagesFF, ff,
"activemq-data", "activemq-data",
"amq", "amq",
1); 1);
messagesJournal.start(); }
else
{
ff = new NIOSequentialFileFactory(getBindingsDir(), null);
journal = new JournalImpl(1024 * 1024,
2,
config.getJournalCompactMinFiles(),
config.getJournalCompactPercentage(),
ff,
"activemq-bindings",
"bindings",
1);
}
journal.start();
final List<RecordInfo> committedRecords = new LinkedList<RecordInfo>(); final List<RecordInfo> committedRecords = new LinkedList<RecordInfo>();
final List<PreparedTransactionInfo> preparedTransactions = new LinkedList<PreparedTransactionInfo>(); final List<PreparedTransactionInfo> preparedTransactions = new LinkedList<PreparedTransactionInfo>();
messagesJournal.load(committedRecords, preparedTransactions, null, false); journal.load(committedRecords, preparedTransactions, null, false);
for (RecordInfo info : committedRecords) for (RecordInfo info : committedRecords)
{ {
@ -1033,7 +1064,7 @@ public abstract class ServiceTestBase extends UnitTestCase
} }
messagesJournal.stop(); journal.stop();
return recordsType; return recordsType;
} }

View File

@ -476,6 +476,8 @@ public class HangConsumerTest extends ServiceTestBase
*/ */
@Test @Test
public void testDuplicateDestinationsOnTopic() throws Exception public void testDuplicateDestinationsOnTopic() throws Exception
{
try
{ {
for (int i = 0; i < 5; i++) for (int i = 0; i < 5; i++)
{ {
@ -504,8 +506,9 @@ public class HangConsumerTest extends ServiceTestBase
messagesJournal.load(infos, null, null); messagesJournal.load(infos, null, null);
int bindings = 0; int bindings = 0;
for (RecordInfo info: infos) for (RecordInfo info : infos)
{ {
System.out.println("info: " + info);
if (info.getUserRecordType() == JournalRecordIds.QUEUE_BINDING_RECORD) if (info.getUserRecordType() == JournalRecordIds.QUEUE_BINDING_RECORD)
{ {
bindings++; bindings++;
@ -518,6 +521,17 @@ public class HangConsumerTest extends ServiceTestBase
if (i < 4) server.start(); if (i < 4) server.start();
} }
} }
finally
{
try
{
server.stop();
}
catch (Throwable ignored)
{
}
}
}

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tests.integration.server;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.tests.logging.AssertionLoggerHandler;
import org.apache.activemq.tests.util.ServiceTestBase;
import org.jboss.logmanager.Level;
import org.junit.Test;
/**
* @author clebertsuconic
*/
public class SimpleStartStopTest extends ServiceTestBase
{
/**
* Start / stopping the server shouldn't generate any errors.
* Also it shouldn't bloat the journal with lots of IDs (it should do some cleanup when possible)
* <p/>
* This is also validating that the same server could be restarted after stopped
*
* @throws Exception
*/
@Test
public void testStartStopAndCleanupIDs() throws Exception
{
AssertionLoggerHandler.clear();
AssertionLoggerHandler.startCapture();
try
{
ActiveMQServer server = null;
for (int i = 0; i < 50; i++)
{
server = createServer(true, false);
server.start();
server.stop(false);
}
// There shouldn't be any error from starting / stopping the server
assertFalse("There shouldn't be any error for just starting / stopping the server",
AssertionLoggerHandler.hasLevel(Level.ERROR));
assertFalse(AssertionLoggerHandler.findText("AMQ224008"));
HashMap<Integer, AtomicInteger> records = this.internalCountJournalLivingRecords(server.getConfiguration(), false);
AtomicInteger recordCount = records.get((int) JournalRecordIds.ID_COUNTER_RECORD);
assertNotNull(recordCount);
// The server should remove old IDs from the journal
assertTrue("The server should cleanup after IDs on the bindings record. It left " + recordCount +
" ids on the journal", recordCount.intValue() < 5);
System.out.println("RecordCount::" + recordCount);
server.start();
records = this.internalCountJournalLivingRecords(server.getConfiguration(), false);
recordCount = records.get((int) JournalRecordIds.ID_COUNTER_RECORD);
assertNotNull(recordCount);
System.out.println("Record count with server started: " + recordCount);
assertTrue("If this is zero it means we are removing too many records", recordCount.intValue() != 0);
server.stop();
}
finally
{
AssertionLoggerHandler.stopCapture();
}
}
}

View File

@ -46,7 +46,7 @@ public class BatchIDGeneratorUnitTest extends UnitTestCase
public void testSequence() throws Exception public void testSequence() throws Exception
{ {
NIOSequentialFileFactory factory = new NIOSequentialFileFactory(getTestDir()); NIOSequentialFileFactory factory = new NIOSequentialFileFactory(getTestDir());
Journal journal = new JournalImpl(10 * 1024, 2, 0, 0, factory, "test-data", "tst", 1); Journal journal = new JournalImpl(10 * 1024, 2, 0, 0, factory, "activemq-bindings", "bindings", 1);
journal.start(); journal.start();
@ -135,7 +135,7 @@ public class BatchIDGeneratorUnitTest extends UnitTestCase
Assert.assertEquals(0, tx.size()); Assert.assertEquals(0, tx.size());
Assert.assertTrue(records.size() > 0); Assert.assertTrue("Contains " + records.size(), records.size() > 0);
for (RecordInfo record : records) for (RecordInfo record : records)
{ {
@ -149,7 +149,7 @@ public class BatchIDGeneratorUnitTest extends UnitTestCase
private StorageManager getJournalStorageManager(final Journal bindingsJournal) private StorageManager getJournalStorageManager(final Journal bindingsJournal)
{ {
return new NullStorageManager() NullStorageManager storageManager = new NullStorageManager()
{ {
@Override @Override
public synchronized void storeID(long journalID, long id) throws Exception public synchronized void storeID(long journalID, long id) throws Exception
@ -158,5 +158,15 @@ public class BatchIDGeneratorUnitTest extends UnitTestCase
BatchingIDGenerator.createIDEncodingSupport(id), true); BatchingIDGenerator.createIDEncodingSupport(id), true);
} }
}; };
try
{
storageManager.start();
}
catch (Throwable ignored)
{
}
return storageManager;
} }
} }