diff --git a/activemq-server/src/main/java/org/apache/activemq/core/persistence/StorageManager.java b/activemq-server/src/main/java/org/apache/activemq/core/persistence/StorageManager.java index c074ce26bf..e07fe064e0 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/persistence/StorageManager.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/persistence/StorageManager.java @@ -380,7 +380,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent void addBytesToLargeMessage(SequentialFile appendFile, long messageID, byte[] bytes) throws Exception; /** - * Stores the given journalID in the bindingsJournal. + * Stores the id from IDManager. * * @param journalID * @param id @@ -388,6 +388,13 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent */ void storeID(long journalID, long id) throws Exception; + + /* + Deletes the ID from IDManager. + */ + void deleteID(long journalD) throws Exception; + + /** * Read lock the StorageManager. USE WITH CARE! *

diff --git a/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/journal/BatchingIDGenerator.java b/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/journal/BatchingIDGenerator.java index c2a9040f28..9c8bc214ac 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/journal/BatchingIDGenerator.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/journal/BatchingIDGenerator.java @@ -16,6 +16,9 @@ */ package org.apache.activemq.core.persistence.impl.journal; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.api.core.ActiveMQBuffer; @@ -42,6 +45,8 @@ public final class BatchingIDGenerator implements IDGenerator private final StorageManager storageManager; + private List cleanupRecords = null; + public BatchingIDGenerator(final long start, final long checkpointSize, final StorageManager storageManager) { counter = new AtomicLong(start); @@ -60,8 +65,31 @@ public final class BatchingIDGenerator implements IDGenerator storeID(recordID, recordID); } + /** + * A method to cleanup old records after started + */ + public void cleanup() + { + if (cleanupRecords != null) + { + Iterator iterRecord = cleanupRecords.iterator(); + while (iterRecord.hasNext()) + { + Long record = iterRecord.next(); + if (iterRecord.hasNext()) + { + // we don't want to remove the last record + deleteID(record.longValue()); + } + } + cleanupRecords.clear(); // help GC + cleanupRecords = null; + } + } + public void loadState(final long journalID, final ActiveMQBuffer buffer) { + addCleanupRecord(journalID); IDCounterEncoding encoding = new IDCounterEncoding(); encoding.decode(buffer); @@ -93,10 +121,33 @@ public final class BatchingIDGenerator implements IDGenerator if (id >= nextID) { nextID += checkpointSize; - storeID(counter.incrementAndGet(), nextID); + + if (!storageManager.isStarted()) + { + // This could happen after the server is stopped + // while notifications are being sent and ID gerated. + // If the ID is intended to the journal you would know soon enough + // so we just ignore this for now + ActiveMQServerLogger.LOGGER.debug("The journalStorageManager is not loaded. " + + "This is probably ok as long as it's a notification being sent after shutdown"); + } + else + { + storeID(counter.getAndIncrement(), nextID); + } } } + private void addCleanupRecord(long id) + { + if (cleanupRecords == null) + { + cleanupRecords = new LinkedList<>(); + } + + cleanupRecords.add(id); + } + private void storeID(final long journalID, final long id) { try @@ -109,6 +160,18 @@ public final class BatchingIDGenerator implements IDGenerator } } + private void deleteID(final long journalID) + { + try + { + storageManager.deleteID(journalID); + } + catch (Exception e) + { + ActiveMQServerLogger.LOGGER.batchingIdError(e); + } + } + public static EncodingSupport createIDEncodingSupport(final long id) { return new IDCounterEncoding(id); diff --git a/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/journal/JournalStorageManager.java b/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/journal/JournalStorageManager.java index 82b135c70b..78570fe561 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/journal/JournalStorageManager.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/journal/JournalStorageManager.java @@ -1488,6 +1488,21 @@ public class JournalStorageManager implements StorageManager } } + @Override + public void deleteID(long journalD) throws Exception + { + readLock(); + try + { + bindingsJournal.appendDeleteRecord(journalD, false); + } + finally + { + readUnLock(); + } + } + + public void deleteAddressSetting(SimpleString addressMatch) throws Exception { PersistedAddressSetting oldSetting = mapPersistedAddressSettings.remove(addressMatch); @@ -2200,6 +2215,9 @@ public class JournalStorageManager implements StorageManager } } + // This will instruct the IDGenerator to cleanup old records + idGenerator.cleanup(); + return bindingsInfo; } diff --git a/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/nullpm/NullStorageManager.java b/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/nullpm/NullStorageManager.java index 4fc0c02a3e..b9b04c9e57 100644 --- a/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/nullpm/NullStorageManager.java +++ b/activemq-server/src/main/java/org/apache/activemq/core/persistence/impl/nullpm/NullStorageManager.java @@ -631,4 +631,10 @@ public class NullStorageManager implements StorageManager { // no-op } + + @Override + public void deleteID(long journalD) throws Exception + { + + } } diff --git a/activemq-server/src/test/java/org/apache/activemq/tests/logging/AssertionLoggerHandler.java b/activemq-server/src/test/java/org/apache/activemq/tests/logging/AssertionLoggerHandler.java index c819f6446f..8188495d1c 100644 --- a/activemq-server/src/test/java/org/apache/activemq/tests/logging/AssertionLoggerHandler.java +++ b/activemq-server/src/test/java/org/apache/activemq/tests/logging/AssertionLoggerHandler.java @@ -59,20 +59,23 @@ public class AssertionLoggerHandler extends ExtHandler } } - public static void assertMessageWasLogged(String assertionMessage, String expectedMessage) - { - if (!messages.containsKey(expectedMessage)) - { - throw new AssertionError(assertionMessage); - } - } - public static void assertMessageWasLogged(String message) + /** + * is there any record matching Level? + * @param level + * @return + */ + public static boolean hasLevel(Level level) { - if (!messages.containsKey(message)) + for (ExtLogRecord record : messages.values()) { - throw new AssertionError(Arrays.toString(messages.keySet().toArray())); + if (record.getLevel().equals(level)) + { + return true; + } } + + return false; } /** diff --git a/activemq-server/src/test/java/org/apache/activemq/tests/util/ServiceTestBase.java b/activemq-server/src/test/java/org/apache/activemq/tests/util/ServiceTestBase.java index dd6b9453c0..f6ffbc70cf 100644 --- a/activemq-server/src/test/java/org/apache/activemq/tests/util/ServiceTestBase.java +++ b/activemq-server/src/test/java/org/apache/activemq/tests/util/ServiceTestBase.java @@ -1000,25 +1000,56 @@ public abstract class ServiceTestBase extends UnitTestCase */ protected HashMap countJournalLivingRecords(Configuration config) throws Exception { - final HashMap recordsType = new HashMap(); - SequentialFileFactory messagesFF = new NIOSequentialFileFactory(getJournalDir(), null); + return internalCountJournalLivingRecords(config, true); + } - JournalImpl messagesJournal = new JournalImpl(config.getJournalFileSize(), - config.getJournalMinFiles(), - 0, - 0, - messagesFF, - "activemq-data", - "amq", - 1); - messagesJournal.start(); + /** + * This method will load a journal and count the living records + * + * @param config + * @param messageJournal if true -> MessageJournal, false -> BindingsJournal + * @return + * @throws Exception + */ + protected HashMap internalCountJournalLivingRecords(Configuration config, boolean messageJournal) throws Exception + { + final HashMap recordsType = new HashMap(); + SequentialFileFactory ff; + + JournalImpl journal; + + if (messageJournal) + { + ff = new NIOSequentialFileFactory(getJournalDir(), null); + journal = new JournalImpl(config.getJournalFileSize(), + config.getJournalMinFiles(), + 0, + 0, + ff, + "activemq-data", + "amq", + 1); + } + else + { + ff = new NIOSequentialFileFactory(getBindingsDir(), null); + journal = new JournalImpl(1024 * 1024, + 2, + config.getJournalCompactMinFiles(), + config.getJournalCompactPercentage(), + ff, + "activemq-bindings", + "bindings", + 1); + } + journal.start(); final List committedRecords = new LinkedList(); final List preparedTransactions = new LinkedList(); - messagesJournal.load(committedRecords, preparedTransactions, null, false); + journal.load(committedRecords, preparedTransactions, null, false); for (RecordInfo info : committedRecords) { @@ -1033,7 +1064,7 @@ public abstract class ServiceTestBase extends UnitTestCase } - messagesJournal.stop(); + journal.stop(); return recordsType; } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java index 071e78288c..fb564407bf 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/client/HangConsumerTest.java @@ -477,45 +477,59 @@ public class HangConsumerTest extends ServiceTestBase @Test public void testDuplicateDestinationsOnTopic() throws Exception { - for (int i = 0; i < 5; i++) + try { - if (server.locateQueue(SimpleString.toSimpleString("jms.topic.tt")) == null) + for (int i = 0; i < 5; i++) { - server.createQueue(SimpleString.toSimpleString("jms.topic.tt"), SimpleString.toSimpleString("jms.topic.tt"), SimpleString.toSimpleString(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), true, false); - } - - server.stop(); - - SequentialFileFactory messagesFF = new NIOSequentialFileFactory(getBindingsDir(), null); - - JournalImpl messagesJournal = new JournalImpl(1024 * 1024, - 2, - 0, - 0, - messagesFF, - "activemq-bindings", - "bindings", - 1); - - messagesJournal.start(); - - LinkedList infos = new LinkedList(); - - messagesJournal.load(infos, null, null); - - int bindings = 0; - for (RecordInfo info: infos) - { - if (info.getUserRecordType() == JournalRecordIds.QUEUE_BINDING_RECORD) + if (server.locateQueue(SimpleString.toSimpleString("jms.topic.tt")) == null) { - bindings++; + server.createQueue(SimpleString.toSimpleString("jms.topic.tt"), SimpleString.toSimpleString("jms.topic.tt"), SimpleString.toSimpleString(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), true, false); } - } - assertEquals(1, bindings); - System.out.println("Bindings: " + bindings); - messagesJournal.stop(); - if (i < 4) server.start(); + server.stop(); + + SequentialFileFactory messagesFF = new NIOSequentialFileFactory(getBindingsDir(), null); + + JournalImpl messagesJournal = new JournalImpl(1024 * 1024, + 2, + 0, + 0, + messagesFF, + "activemq-bindings", + "bindings", + 1); + + messagesJournal.start(); + + LinkedList infos = new LinkedList(); + + messagesJournal.load(infos, null, null); + + int bindings = 0; + for (RecordInfo info : infos) + { + System.out.println("info: " + info); + if (info.getUserRecordType() == JournalRecordIds.QUEUE_BINDING_RECORD) + { + bindings++; + } + } + assertEquals(1, bindings); + + System.out.println("Bindings: " + bindings); + messagesJournal.stop(); + if (i < 4) server.start(); + } + } + finally + { + try + { + server.stop(); + } + catch (Throwable ignored) + { + } } } diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/server/SimpleStartStopTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/server/SimpleStartStopTest.java new file mode 100644 index 0000000000..265cb166f2 --- /dev/null +++ b/tests/integration-tests/src/test/java/org/apache/activemq/tests/integration/server/SimpleStartStopTest.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.activemq.tests.integration.server; + +import java.util.HashMap; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.activemq.core.persistence.impl.journal.JournalRecordIds; +import org.apache.activemq.core.server.ActiveMQServer; +import org.apache.activemq.tests.logging.AssertionLoggerHandler; +import org.apache.activemq.tests.util.ServiceTestBase; +import org.jboss.logmanager.Level; +import org.junit.Test; + +/** + * @author clebertsuconic + */ + +public class SimpleStartStopTest extends ServiceTestBase +{ + + /** + * Start / stopping the server shouldn't generate any errors. + * Also it shouldn't bloat the journal with lots of IDs (it should do some cleanup when possible) + *

+ * This is also validating that the same server could be restarted after stopped + * + * @throws Exception + */ + @Test + public void testStartStopAndCleanupIDs() throws Exception + { + AssertionLoggerHandler.clear(); + AssertionLoggerHandler.startCapture(); + try + { + ActiveMQServer server = null; + + for (int i = 0; i < 50; i++) + { + server = createServer(true, false); + server.start(); + server.stop(false); + } + + // There shouldn't be any error from starting / stopping the server + assertFalse("There shouldn't be any error for just starting / stopping the server", + AssertionLoggerHandler.hasLevel(Level.ERROR)); + assertFalse(AssertionLoggerHandler.findText("AMQ224008")); + + + HashMap records = this.internalCountJournalLivingRecords(server.getConfiguration(), false); + + + AtomicInteger recordCount = records.get((int) JournalRecordIds.ID_COUNTER_RECORD); + + assertNotNull(recordCount); + + // The server should remove old IDs from the journal + assertTrue("The server should cleanup after IDs on the bindings record. It left " + recordCount + + " ids on the journal", recordCount.intValue() < 5); + + System.out.println("RecordCount::" + recordCount); + + + server.start(); + + + records = this.internalCountJournalLivingRecords(server.getConfiguration(), false); + + + recordCount = records.get((int) JournalRecordIds.ID_COUNTER_RECORD); + + assertNotNull(recordCount); + + System.out.println("Record count with server started: " + recordCount); + + assertTrue("If this is zero it means we are removing too many records", recordCount.intValue() != 0); + + server.stop(); + + } + finally + { + AssertionLoggerHandler.stopCapture(); + } + } +} diff --git a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java index adcc28f5ce..c7fd23ccfd 100644 --- a/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java +++ b/tests/unit-tests/src/test/java/org/apache/activemq/tests/unit/core/persistence/impl/BatchIDGeneratorUnitTest.java @@ -46,7 +46,7 @@ public class BatchIDGeneratorUnitTest extends UnitTestCase public void testSequence() throws Exception { NIOSequentialFileFactory factory = new NIOSequentialFileFactory(getTestDir()); - Journal journal = new JournalImpl(10 * 1024, 2, 0, 0, factory, "test-data", "tst", 1); + Journal journal = new JournalImpl(10 * 1024, 2, 0, 0, factory, "activemq-bindings", "bindings", 1); journal.start(); @@ -135,7 +135,7 @@ public class BatchIDGeneratorUnitTest extends UnitTestCase Assert.assertEquals(0, tx.size()); - Assert.assertTrue(records.size() > 0); + Assert.assertTrue("Contains " + records.size(), records.size() > 0); for (RecordInfo record : records) { @@ -149,7 +149,7 @@ public class BatchIDGeneratorUnitTest extends UnitTestCase private StorageManager getJournalStorageManager(final Journal bindingsJournal) { - return new NullStorageManager() + NullStorageManager storageManager = new NullStorageManager() { @Override public synchronized void storeID(long journalID, long id) throws Exception @@ -158,5 +158,15 @@ public class BatchIDGeneratorUnitTest extends UnitTestCase BatchingIDGenerator.createIDEncodingSupport(id), true); } }; + + try + { + storageManager.start(); + } + catch (Throwable ignored) + { + } + + return storageManager; } }