This commit is contained in:
Martyn Taylor 2015-02-27 10:42:18 +00:00
commit a895bf3344
12 changed files with 331 additions and 81 deletions

View File

@ -380,7 +380,7 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent
void addBytesToLargeMessage(SequentialFile appendFile, long messageID, byte[] bytes) throws Exception;
/**
* Stores the given journalID in the bindingsJournal.
* Stores the id from IDManager.
*
* @param journalID
* @param id
@ -388,6 +388,13 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent
*/
void storeID(long journalID, long id) throws Exception;
/*
Deletes the ID from IDManager.
*/
void deleteID(long journalD) throws Exception;
/**
* Read lock the StorageManager. USE WITH CARE!
* <p/>

View File

@ -16,6 +16,9 @@
*/
package org.apache.activemq.core.persistence.impl.journal;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.api.core.ActiveMQBuffer;
@ -42,6 +45,8 @@ public final class BatchingIDGenerator implements IDGenerator
private final StorageManager storageManager;
private List<Long> cleanupRecords = null;
public BatchingIDGenerator(final long start, final long checkpointSize, final StorageManager storageManager)
{
counter = new AtomicLong(start);
@ -60,8 +65,31 @@ public final class BatchingIDGenerator implements IDGenerator
storeID(recordID, recordID);
}
/**
* A method to cleanup old records after started
*/
public void cleanup()
{
if (cleanupRecords != null)
{
Iterator<Long> iterRecord = cleanupRecords.iterator();
while (iterRecord.hasNext())
{
Long record = iterRecord.next();
if (iterRecord.hasNext())
{
// we don't want to remove the last record
deleteID(record.longValue());
}
}
cleanupRecords.clear(); // help GC
cleanupRecords = null;
}
}
public void loadState(final long journalID, final ActiveMQBuffer buffer)
{
addCleanupRecord(journalID);
IDCounterEncoding encoding = new IDCounterEncoding();
encoding.decode(buffer);
@ -93,10 +121,33 @@ public final class BatchingIDGenerator implements IDGenerator
if (id >= nextID)
{
nextID += checkpointSize;
storeID(counter.incrementAndGet(), nextID);
if (!storageManager.isStarted())
{
// This could happen after the server is stopped
// while notifications are being sent and ID gerated.
// If the ID is intended to the journal you would know soon enough
// so we just ignore this for now
ActiveMQServerLogger.LOGGER.debug("The journalStorageManager is not loaded. " +
"This is probably ok as long as it's a notification being sent after shutdown");
}
else
{
storeID(counter.getAndIncrement(), nextID);
}
}
}
private void addCleanupRecord(long id)
{
if (cleanupRecords == null)
{
cleanupRecords = new LinkedList<>();
}
cleanupRecords.add(id);
}
private void storeID(final long journalID, final long id)
{
try
@ -109,6 +160,18 @@ public final class BatchingIDGenerator implements IDGenerator
}
}
private void deleteID(final long journalID)
{
try
{
storageManager.deleteID(journalID);
}
catch (Exception e)
{
ActiveMQServerLogger.LOGGER.batchingIdError(e);
}
}
public static EncodingSupport createIDEncodingSupport(final long id)
{
return new IDCounterEncoding(id);

View File

@ -1488,6 +1488,21 @@ public class JournalStorageManager implements StorageManager
}
}
@Override
public void deleteID(long journalD) throws Exception
{
readLock();
try
{
bindingsJournal.appendDeleteRecord(journalD, false);
}
finally
{
readUnLock();
}
}
public void deleteAddressSetting(SimpleString addressMatch) throws Exception
{
PersistedAddressSetting oldSetting = mapPersistedAddressSettings.remove(addressMatch);
@ -2200,6 +2215,9 @@ public class JournalStorageManager implements StorageManager
}
}
// This will instruct the IDGenerator to cleanup old records
idGenerator.cleanup();
return bindingsInfo;
}

View File

@ -631,4 +631,10 @@ public class NullStorageManager implements StorageManager
{
// no-op
}
@Override
public void deleteID(long journalD) throws Exception
{
}
}

View File

@ -59,20 +59,23 @@ public class AssertionLoggerHandler extends ExtHandler
}
}
public static void assertMessageWasLogged(String assertionMessage, String expectedMessage)
{
if (!messages.containsKey(expectedMessage))
{
throw new AssertionError(assertionMessage);
}
}
public static void assertMessageWasLogged(String message)
/**
* is there any record matching Level?
* @param level
* @return
*/
public static boolean hasLevel(Level level)
{
if (!messages.containsKey(message))
for (ExtLogRecord record : messages.values())
{
throw new AssertionError(Arrays.toString(messages.keySet().toArray()));
if (record.getLevel().equals(level))
{
return true;
}
}
return false;
}
/**

View File

@ -1000,25 +1000,56 @@ public abstract class ServiceTestBase extends UnitTestCase
*/
protected HashMap<Integer, AtomicInteger> countJournalLivingRecords(Configuration config) throws Exception
{
final HashMap<Integer, AtomicInteger> recordsType = new HashMap<Integer, AtomicInteger>();
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(getJournalDir(), null);
return internalCountJournalLivingRecords(config, true);
}
JournalImpl messagesJournal = new JournalImpl(config.getJournalFileSize(),
config.getJournalMinFiles(),
0,
0,
messagesFF,
"activemq-data",
"amq",
1);
messagesJournal.start();
/**
* This method will load a journal and count the living records
*
* @param config
* @param messageJournal if true -> MessageJournal, false -> BindingsJournal
* @return
* @throws Exception
*/
protected HashMap<Integer, AtomicInteger> internalCountJournalLivingRecords(Configuration config, boolean messageJournal) throws Exception
{
final HashMap<Integer, AtomicInteger> recordsType = new HashMap<Integer, AtomicInteger>();
SequentialFileFactory ff;
JournalImpl journal;
if (messageJournal)
{
ff = new NIOSequentialFileFactory(getJournalDir(), null);
journal = new JournalImpl(config.getJournalFileSize(),
config.getJournalMinFiles(),
0,
0,
ff,
"activemq-data",
"amq",
1);
}
else
{
ff = new NIOSequentialFileFactory(getBindingsDir(), null);
journal = new JournalImpl(1024 * 1024,
2,
config.getJournalCompactMinFiles(),
config.getJournalCompactPercentage(),
ff,
"activemq-bindings",
"bindings",
1);
}
journal.start();
final List<RecordInfo> committedRecords = new LinkedList<RecordInfo>();
final List<PreparedTransactionInfo> preparedTransactions = new LinkedList<PreparedTransactionInfo>();
messagesJournal.load(committedRecords, preparedTransactions, null, false);
journal.load(committedRecords, preparedTransactions, null, false);
for (RecordInfo info : committedRecords)
{
@ -1033,7 +1064,7 @@ public abstract class ServiceTestBase extends UnitTestCase
}
messagesJournal.stop();
journal.stop();
return recordsType;
}

View File

@ -162,11 +162,6 @@ It is possible to limit which protocols are supported by using the
<connector name="netty">tcp://localhost:61617?protocols=CORE,AMQP</connector>
> **Note**
>
> The `protocol` parameter is now deprecated
## Configuring Netty TCP
Netty TCP is a simple unencrypted TCP sockets based transport. Netty TCP
@ -196,18 +191,6 @@ Netty for simple TCP:
> The `host` and `port` parameters are only used in the core API, in
> XML configuration these are set in the URI host and port.
- `useNio`. If this is `true` then Java non blocking NIO will be
used. If set to `false` then old blocking Java IO will be used.
If you require the server to handle many concurrent connections, we
highly recommend that you use non blocking Java NIO. Java NIO does
not maintain a thread per connection so can scale to many more
concurrent connections than with old blocking IO. If you don't
require the server to handle many concurrent connections, you might
get slightly better performance by using old (blocking) IO. The
default value for this property is `false` on the server side and
`false` on the client side.
- `host`. This specifies the host name or IP address to connect to
(when configuring a connector) or to listen on (when configuring an
acceptor). The default value for this property is `localhost`. When

View File

@ -6,6 +6,10 @@ The function of the bridge is to consume messages from a source queue or
topic, and send them to a target queue or topic, typically on a
different server.
> *Notice:*
> The JMS Bridge is not intended as a replacement for transformation and more expert systems such as Camel.
> The JMS Bridge may be useful for fast transfers as this chapter covers, but keep in mind that more complex scenarios requiring transformations will require you to use a more advanced transformation system that will play on use cases that will go beyond ActiveMQ.
The source and target servers do not have to be in the same cluster
which makes bridging suitable for reliably sending messages from one
cluster to another, for instance across a WAN, and where the connection

View File

@ -25,8 +25,16 @@ acknowledged up to that point.
Browsers will read through the page-cursor system.
Consumers with selectors will also navigate through the page-files and
it will ignore messages that don't match the criteria.
Consumers with selectors will also navigate through the page-files and it will ignore messages that don't match the criteria.
> *Warning:*
> When you have a queue, and consumers filtering the queue with a very restrictive selector you may get into a situation where you won't be able to read more data from paging until you consume messages from the queue.
>
> Example: in one consumer you make a selector as 'color="red"'
> but you only have one color red 1 millions messages after blue, you won't be able to consume red until you consume blue ones.
>
> This is different to browsing as we will "browse" the entire queue looking for messages and while we "depage" messages while feeding the queue.
## Configuration

View File

@ -477,45 +477,59 @@ public class HangConsumerTest extends ServiceTestBase
@Test
public void testDuplicateDestinationsOnTopic() throws Exception
{
for (int i = 0; i < 5; i++)
try
{
if (server.locateQueue(SimpleString.toSimpleString("jms.topic.tt")) == null)
for (int i = 0; i < 5; i++)
{
server.createQueue(SimpleString.toSimpleString("jms.topic.tt"), SimpleString.toSimpleString("jms.topic.tt"), SimpleString.toSimpleString(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), true, false);
}
server.stop();
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(getBindingsDir(), null);
JournalImpl messagesJournal = new JournalImpl(1024 * 1024,
2,
0,
0,
messagesFF,
"activemq-bindings",
"bindings",
1);
messagesJournal.start();
LinkedList<RecordInfo> infos = new LinkedList<RecordInfo>();
messagesJournal.load(infos, null, null);
int bindings = 0;
for (RecordInfo info: infos)
{
if (info.getUserRecordType() == JournalRecordIds.QUEUE_BINDING_RECORD)
if (server.locateQueue(SimpleString.toSimpleString("jms.topic.tt")) == null)
{
bindings++;
server.createQueue(SimpleString.toSimpleString("jms.topic.tt"), SimpleString.toSimpleString("jms.topic.tt"), SimpleString.toSimpleString(ActiveMQServerImpl.GENERIC_IGNORED_FILTER), true, false);
}
}
assertEquals(1, bindings);
System.out.println("Bindings: " + bindings);
messagesJournal.stop();
if (i < 4) server.start();
server.stop();
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(getBindingsDir(), null);
JournalImpl messagesJournal = new JournalImpl(1024 * 1024,
2,
0,
0,
messagesFF,
"activemq-bindings",
"bindings",
1);
messagesJournal.start();
LinkedList<RecordInfo> infos = new LinkedList<RecordInfo>();
messagesJournal.load(infos, null, null);
int bindings = 0;
for (RecordInfo info : infos)
{
System.out.println("info: " + info);
if (info.getUserRecordType() == JournalRecordIds.QUEUE_BINDING_RECORD)
{
bindings++;
}
}
assertEquals(1, bindings);
System.out.println("Bindings: " + bindings);
messagesJournal.stop();
if (i < 4) server.start();
}
}
finally
{
try
{
server.stop();
}
catch (Throwable ignored)
{
}
}
}

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.tests.integration.server;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.tests.logging.AssertionLoggerHandler;
import org.apache.activemq.tests.util.ServiceTestBase;
import org.jboss.logmanager.Level;
import org.junit.Test;
/**
* @author clebertsuconic
*/
public class SimpleStartStopTest extends ServiceTestBase
{
/**
* Start / stopping the server shouldn't generate any errors.
* Also it shouldn't bloat the journal with lots of IDs (it should do some cleanup when possible)
* <p/>
* This is also validating that the same server could be restarted after stopped
*
* @throws Exception
*/
@Test
public void testStartStopAndCleanupIDs() throws Exception
{
AssertionLoggerHandler.clear();
AssertionLoggerHandler.startCapture();
try
{
ActiveMQServer server = null;
for (int i = 0; i < 50; i++)
{
server = createServer(true, false);
server.start();
server.stop(false);
}
// There shouldn't be any error from starting / stopping the server
assertFalse("There shouldn't be any error for just starting / stopping the server",
AssertionLoggerHandler.hasLevel(Level.ERROR));
assertFalse(AssertionLoggerHandler.findText("AMQ224008"));
HashMap<Integer, AtomicInteger> records = this.internalCountJournalLivingRecords(server.getConfiguration(), false);
AtomicInteger recordCount = records.get((int) JournalRecordIds.ID_COUNTER_RECORD);
assertNotNull(recordCount);
// The server should remove old IDs from the journal
assertTrue("The server should cleanup after IDs on the bindings record. It left " + recordCount +
" ids on the journal", recordCount.intValue() < 5);
System.out.println("RecordCount::" + recordCount);
server.start();
records = this.internalCountJournalLivingRecords(server.getConfiguration(), false);
recordCount = records.get((int) JournalRecordIds.ID_COUNTER_RECORD);
assertNotNull(recordCount);
System.out.println("Record count with server started: " + recordCount);
assertTrue("If this is zero it means we are removing too many records", recordCount.intValue() != 0);
server.stop();
}
finally
{
AssertionLoggerHandler.stopCapture();
}
}
}

View File

@ -46,7 +46,7 @@ public class BatchIDGeneratorUnitTest extends UnitTestCase
public void testSequence() throws Exception
{
NIOSequentialFileFactory factory = new NIOSequentialFileFactory(getTestDir());
Journal journal = new JournalImpl(10 * 1024, 2, 0, 0, factory, "test-data", "tst", 1);
Journal journal = new JournalImpl(10 * 1024, 2, 0, 0, factory, "activemq-bindings", "bindings", 1);
journal.start();
@ -135,7 +135,7 @@ public class BatchIDGeneratorUnitTest extends UnitTestCase
Assert.assertEquals(0, tx.size());
Assert.assertTrue(records.size() > 0);
Assert.assertTrue("Contains " + records.size(), records.size() > 0);
for (RecordInfo record : records)
{
@ -149,7 +149,7 @@ public class BatchIDGeneratorUnitTest extends UnitTestCase
private StorageManager getJournalStorageManager(final Journal bindingsJournal)
{
return new NullStorageManager()
NullStorageManager storageManager = new NullStorageManager()
{
@Override
public synchronized void storeID(long journalID, long id) throws Exception
@ -158,5 +158,15 @@ public class BatchIDGeneratorUnitTest extends UnitTestCase
BatchingIDGenerator.createIDEncodingSupport(id), true);
}
};
try
{
storageManager.start();
}
catch (Throwable ignored)
{
}
return storageManager;
}
}