This closes #262

This commit is contained in:
Clebert Suconic 2015-12-10 17:32:25 -05:00
commit 8fa5bb2a07
77 changed files with 1394 additions and 403 deletions

View File

@ -53,7 +53,7 @@ public final class CompactJournal extends LockAbstract {
final IOCriticalErrorListener listener) throws Exception {
NIOSequentialFileFactory nio = new NIOSequentialFileFactory(directory, listener, 1);
JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
JournalImpl journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
journal.start();

View File

@ -107,7 +107,7 @@ public class DecodeJournal extends LockAbstract {
NIOSequentialFileFactory nio = new NIOSequentialFileFactory(new File(directory), null, 1);
JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
JournalImpl journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
if (journal.orderFiles().size() != 0) {
throw new IllegalStateException("Import needs to create a brand new journal");

View File

@ -101,7 +101,7 @@ public class EncodeJournal extends LockAbstract {
final PrintStream out) throws Exception {
NIOSequentialFileFactory nio = new NIOSequentialFileFactory(new File(directory), null, 1);
JournalImpl journal = new JournalImpl(fileSize, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
JournalImpl journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, nio, journalPrefix, journalSuffix, 1);
List<JournalFile> files = journal.orderFiles();

View File

@ -310,7 +310,7 @@ public final class XmlDataExporter extends LockAbstract {
private void getJmsBindings() throws Exception {
SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation(), 1);
Journal jmsJournal = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1);
Journal jmsJournal = new JournalImpl(1024 * 1024, 2, 2, config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1);
jmsJournal.start();

View File

@ -43,7 +43,9 @@ under the License.
<large-messages-directory>${data.dir}/large-messages</large-messages-directory>
<journal-min-files>10</journal-min-files>
<journal-min-files>2</journal-min-files>
<journal-pool-files>-1</journal-pool-files>
${journal-buffer.settings}
${connector-config.settings}
<acceptors>

View File

@ -99,6 +99,7 @@ public final class ActiveMQDefaultConfiguration {
// These defaults are applied depending on whether the journal type
// is NIO or AIO.
private static int DEFAULT_JOURNAL_MAX_IO_AIO = 500;
private static int DEFAULT_JOURNAL_POOL_FILES = -1;
private static int DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO = ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO;
private static int DEFAULT_JOURNAL_BUFFER_SIZE_AIO = ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO;
private static int DEFAULT_JOURNAL_MAX_IO_NIO = 1;
@ -679,6 +680,14 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_JOURNAL_MIN_FILES;
}
/**
* How many journal files can be resued
* @return
*/
public static int getDefaultJournalPoolFiles() {
return DEFAULT_JOURNAL_POOL_FILES;
}
/**
* The percentage of live data on which we consider compacting the journal
*/

View File

@ -148,8 +148,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
private String liveNodeID;
private Set<ConnectionLifeCycleListener> lifeCycleListeners;
// We need to cache this value here since some listeners may be registered after connectionReadyForWrites was called.
private boolean connectionReadyForWrites;
@ -222,8 +220,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
confirmationWindowWarning = new ConfirmationWindowWarning(serverLocator.getConfirmationWindowSize() < 0);
lifeCycleListeners = new HashSet<>();
connectionReadyForWrites = true;
}
@ -238,14 +234,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
return newFailoverLock;
}
@Override
public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) {
synchronized (connectionReadyLock) {
lifeCycleListener.connectionReadyForWrites(connection.getTransportConnection().getID(), connectionReadyForWrites);
lifeCycleListeners.add(lifeCycleListener);
}
}
@Override
public void connect(final int initialConnectAttempts,
final boolean failoverOnInitialConnection) throws ActiveMQException {
@ -395,14 +383,6 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
@Override
public void connectionReadyForWrites(final Object connectionID, final boolean ready) {
synchronized (connectionReadyLock) {
if (connectionReadyForWrites != ready) {
connectionReadyForWrites = ready;
for (ConnectionLifeCycleListener lifeCycleListener : lifeCycleListeners) {
lifeCycleListener.connectionReadyForWrites(connectionID, ready);
}
}
}
}
@Override

View File

@ -22,7 +22,6 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.TransportConfiguration;
import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
public interface ClientSessionFactoryInternal extends ClientSessionFactory {
@ -58,6 +57,4 @@ public interface ClientSessionFactoryInternal extends ClientSessionFactory {
ConfirmationWindowWarning getConfirmationWindowWarning();
Lock lockFailover();
void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener);
}

View File

@ -44,8 +44,8 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
import org.apache.activemq.artemis.utils.ConfirmationWindowWarning;
import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl;
@ -408,6 +408,13 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
return createConsumer(SimpleString.toSimpleString(queueName), null, browseOnly);
}
@Override
public boolean isWritable(ReadyListener callback) {
return sessionContext.isWritable(callback);
}
/**
* Note, we DO NOT currently support direct consumers (i.e. consumers where delivery occurs on
* the remoting thread).
@ -695,11 +702,6 @@ public final class ClientSessionImpl implements ClientSessionInternal, FailureLi
return sessionFactory.getLiveNodeId();
}
@Override
public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) {
sessionFactory.addLifeCycleListener(lifeCycleListener);
}
// ClientSessionInternal implementation
// ------------------------------------------------------------

View File

@ -23,8 +23,8 @@ import org.apache.activemq.artemis.api.core.client.ClientConsumer;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
public interface ClientSessionInternal extends ClientSession {
@ -126,5 +126,5 @@ public interface ClientSessionInternal extends ClientSession {
String getNodeId();
void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener);
boolean isWritable(ReadyListener callback);
}

View File

@ -33,8 +33,8 @@ import org.apache.activemq.artemis.api.core.client.SendAcknowledgementHandler;
import org.apache.activemq.artemis.api.core.client.SessionFailureListener;
import org.apache.activemq.artemis.core.client.ActiveMQClientLogger;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ConsumerContext;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
/**
@ -101,8 +101,8 @@ public class DelegatingSession implements ClientSessionInternal {
}
@Override
public void addLifeCycleListener(ConnectionLifeCycleListener lifeCycleListener) {
session.addLifeCycleListener(lifeCycleListener);
public boolean isWritable(ReadyListener callback) {
return session.isWritable(callback);
}
@Override

View File

@ -97,6 +97,7 @@ import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAS
import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.SessionXAStartMessage;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.spi.core.remoting.SessionContext;
import org.apache.activemq.artemis.utils.TokenBucketLimiterImpl;
import org.apache.activemq.artemis.utils.VersionLoader;
@ -238,6 +239,11 @@ public class ActiveMQSessionContext extends SessionContext {
return response.toQueueQuery();
}
@Override
public boolean isWritable(ReadyListener callback) {
return remotingConnection.isWritable(callback);
}
@Override
public ClientConsumerInternal createConsumer(SimpleString queueName,
SimpleString filterString,

View File

@ -18,7 +18,7 @@ package org.apache.activemq.artemis.core.remoting.impl.netty;
import java.net.SocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.Semaphore;
import io.netty.buffer.ByteBuf;
@ -39,7 +39,6 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.artemis.utils.IPV6Util;
public class NettyConnection implements Connection {
@ -65,10 +64,14 @@ public class NettyConnection implements Connection {
private final Semaphore writeLock = new Semaphore(1);
private final Set<ReadyListener> readyListeners = new ConcurrentHashSet<>();
private RemotingConnection protocolConnection;
private boolean ready = true;
/** if {@link #isWritable(ReadyListener)} returns false, we add a callback
* here for when the connection (or Netty Channel) becomes available again. */
private final ConcurrentLinkedDeque<ReadyListener> readyListeners = new ConcurrentLinkedDeque<>();
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@ -96,6 +99,37 @@ public class NettyConnection implements Connection {
}
// Connection implementation ----------------------------
public boolean isWritable(ReadyListener callback) {
synchronized (readyListeners) {
readyListeners.push(callback);
return ready;
}
}
public void fireReady(final boolean ready) {
synchronized (readyListeners) {
this.ready = ready;
if (ready) {
for (;;) {
ReadyListener readyListener = readyListeners.poll();
if (readyListener == null) {
return;
}
try {
readyListener.readyForWriting();
}
catch (Throwable logOnly) {
ActiveMQClientLogger.LOGGER.warn(logOnly.getMessage(), logOnly);
}
}
}
}
}
@Override
public void forceClose() {
if (channel != null) {
@ -323,28 +357,12 @@ public class NettyConnection implements Connection {
return directDeliver;
}
@Override
public void addReadyListener(final ReadyListener listener) {
readyListeners.add(listener);
}
@Override
public void removeReadyListener(final ReadyListener listener) {
readyListeners.remove(listener);
}
//never allow this
@Override
public ActiveMQPrincipal getDefaultActiveMQPrincipal() {
return null;
}
void fireReady(final boolean ready) {
for (ReadyListener listener : readyListeners) {
listener.readyForWriting(ready);
}
}
@Override
public TransportConfiguration getConnectorConfig() {
if (configuration != null) {

View File

@ -936,6 +936,10 @@ public class NettyConnector extends AbstractConnector {
@Override
public void connectionReadyForWrites(Object connectionID, boolean ready) {
NettyConnection connection = (NettyConnection)connections.get(connectionID);
if (connection != null) {
connection.fireReady(ready);
}
listener.connectionReadyForWrites(connectionID, ready);
}

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.client.ActiveMQClientMessageBundle;
import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
public abstract class AbstractRemotingConnection implements RemotingConnection {
@ -50,6 +51,11 @@ public abstract class AbstractRemotingConnection implements RemotingConnection {
return new ArrayList<>(failureListeners);
}
@Override
public boolean isWritable(ReadyListener callback) {
return transportConnection.isWritable(callback);
}
protected void callFailureListeners(final ActiveMQException me, String scaleDownTargetNodeID) {
final List<FailureListener> listenersClone = new ArrayList<>(failureListeners);

View File

@ -24,6 +24,7 @@ import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.spi.core.remoting.BufferHandler;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
/**
* A RemotingConnection is a connection between a client and a server.
@ -181,4 +182,6 @@ public interface RemotingConnection extends BufferHandler {
*/
void flush();
boolean isWritable(ReadyListener callback);
}

View File

@ -39,6 +39,10 @@ public interface Connection {
void setProtocolConnection(RemotingConnection connection);
boolean isWritable(ReadyListener listener);
void fireReady(boolean ready);
/**
* returns the unique id of this wire.
*
@ -104,10 +108,6 @@ public interface Connection {
*/
void checkFlushBatchBuffer();
void addReadyListener(ReadyListener listener);
void removeReadyListener(ReadyListener listener);
/**
* Generates a {@link TransportConfiguration} to be used to connect to the same target this is
* connected to.

View File

@ -18,6 +18,6 @@ package org.apache.activemq.artemis.spi.core.remoting;
public interface ReadyListener {
void readyForWriting(boolean ready);
void readyForWriting();
}

View File

@ -269,4 +269,6 @@ public abstract class SessionContext {
public abstract void cleanup();
public abstract void linkFlowControl(SimpleString address, ClientProducerCreditsImpl clientProducerCredits);
public abstract boolean isWritable(ReadyListener callback);
}

View File

@ -86,7 +86,7 @@ public final class JMSJournalStorageManagerImpl implements JMSStorageManager {
SequentialFileFactory bindingsJMS = new NIOSequentialFileFactory(config.getBindingsLocation(), 1);
Journal localJMS = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1);
Journal localJMS = new JournalImpl(1024 * 1024, 2, config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsJMS, "activemq-jms", "jms", 1);
if (replicator != null) {
jmsJournal = new ReplicatedJournal((byte) 2, localJMS, replicator);

View File

@ -79,9 +79,10 @@ public class AIOSequentialFile extends AbstractSequentialFile {
@Override
public int getAlignment() {
checkOpened();
return aioFile.getBlockSize();
// TODO: get the alignment from the file system, but we have to cache this, we can't call it every time
/* checkOpened();
return aioFile.getBlockSize(); */
return 512;
}
@Override

View File

@ -73,6 +73,8 @@ public class JournalFilesRepository {
private final int minFiles;
private final int poolSize;
private final int fileSize;
private final String filePrefix;
@ -104,7 +106,8 @@ public class JournalFilesRepository {
final int userVersion,
final int maxAIO,
final int fileSize,
final int minFiles) {
final int minFiles,
final int poolSize) {
if (filePrefix == null) {
throw new IllegalArgumentException("filePrefix cannot be null");
}
@ -120,6 +123,7 @@ public class JournalFilesRepository {
this.fileExtension = fileExtension;
this.minFiles = minFiles;
this.fileSize = fileSize;
this.poolSize = poolSize;
this.userVersion = userVersion;
this.journal = journal;
}
@ -358,7 +362,7 @@ public class JournalFilesRepository {
ActiveMQJournalLogger.LOGGER.deletingFile(file);
file.getFile().delete();
}
else if (!checkDelete || (freeFilesCount.get() + dataFiles.size() + 1 + openedFiles.size() < minFiles)) {
else if (!checkDelete || (freeFilesCount.get() + dataFiles.size() + 1 + openedFiles.size() < poolSize) || (poolSize < 0)) {
// Re-initialise it
if (JournalFilesRepository.trace) {
@ -378,7 +382,7 @@ public class JournalFilesRepository {
if (trace) {
ActiveMQJournalLogger.LOGGER.trace("DataFiles.size() = " + dataFiles.size());
ActiveMQJournalLogger.LOGGER.trace("openedFiles.size() = " + openedFiles.size());
ActiveMQJournalLogger.LOGGER.trace("minfiles = " + minFiles);
ActiveMQJournalLogger.LOGGER.trace("minfiles = " + minFiles + ", poolSize = " + poolSize);
ActiveMQJournalLogger.LOGGER.trace("Free Files = " + freeFilesCount.get());
ActiveMQJournalLogger.LOGGER.trace("File " + file +
" being deleted as freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() (" +

View File

@ -40,21 +40,20 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
import org.apache.activemq.artemis.api.core.Pair;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.IOCompletion;
import org.apache.activemq.artemis.core.journal.JournalLoadInformation;
import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.journal.TestableJournal;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
import org.apache.activemq.artemis.core.journal.impl.dataformat.ByteArrayEncoding;
@ -193,7 +192,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
// Lock used during the append of records
// This lock doesn't represent a global lock.
// After a record is appended, the usedFile can't be changed until the positives and negatives are updated
private final ReentrantLock lockAppend = new ReentrantLock();
private final Object lockAppend = new Object();
/**
* We don't lock the journal during the whole compacting operation. During compacting we only
@ -209,23 +208,27 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
private volatile JournalState state = JournalState.STOPPED;
private volatile int compactCount = 0;
private final Reclaimer reclaimer = new Reclaimer();
// Constructors --------------------------------------------------
public JournalImpl(final int fileSize,
final int minFiles,
final int poolSize,
final int compactMinFiles,
final int compactPercentage,
final SequentialFileFactory fileFactory,
final String filePrefix,
final String fileExtension,
final int maxAIO) {
this(fileSize, minFiles, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO, 0);
this(fileSize, minFiles, poolSize, compactMinFiles, compactPercentage, fileFactory, filePrefix, fileExtension, maxAIO, 0);
}
public JournalImpl(final int fileSize,
final int minFiles,
final int poolSize,
final int compactMinFiles,
final int compactPercentage,
final SequentialFileFactory fileFactory,
@ -234,6 +237,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
final int maxAIO,
final int userVersion) {
super(fileFactory.isSupportsCallbacks(), fileSize);
if (fileSize % fileFactory.getAlignment() != 0) {
throw new IllegalArgumentException("Invalid journal-file-size " + fileSize + ", It should be multiple of " +
fileFactory.getAlignment());
@ -257,7 +261,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
this.fileFactory = fileFactory;
filesRepository = new JournalFilesRepository(fileFactory, this, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles);
filesRepository = new JournalFilesRepository(fileFactory, this, filePrefix, fileExtension, userVersion, maxAIO, fileSize, minFiles, poolSize);
this.userVersion = userVersion;
}
@ -715,8 +719,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
callback.storeLineUp();
}
lockAppend.lock();
try {
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(addRecord, false, sync, null, callback);
if (JournalImpl.TRACE_RECORDS) {
@ -729,9 +732,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
records.put(id, new JournalRecord(usedFile, addRecord.getEncodeSize()));
}
finally {
lockAppend.unlock();
}
}
finally {
journalLock.readLock().unlock();
@ -763,8 +763,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
callback.storeLineUp();
}
lockAppend.lock();
try {
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(updateRecord, false, sync, null, callback);
if (JournalImpl.TRACE_RECORDS) {
@ -784,9 +783,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
jrnRecord.addUpdateFile(usedFile, updateRecord.getEncodeSize());
}
}
finally {
lockAppend.unlock();
}
}
finally {
journalLock.readLock().unlock();
@ -821,8 +817,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
callback.storeLineUp();
}
lockAppend.lock();
try {
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(deleteRecord, false, sync, null, callback);
if (JournalImpl.TRACE_RECORDS) {
@ -839,9 +834,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
}
}
finally {
lockAppend.unlock();
}
}
finally {
journalLock.readLock().unlock();
@ -862,8 +854,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
JournalTransaction tx = getTransactionInfo(txID);
lockAppend.lock();
try {
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(addRecord, false, false, tx, null);
if (JournalImpl.TRACE_RECORDS) {
@ -878,9 +869,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.addPositive(usedFile, id, addRecord.getEncodeSize());
}
finally {
lockAppend.unlock();
}
}
finally {
journalLock.readLock().unlock();
@ -911,8 +899,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
JournalTransaction tx = getTransactionInfo(txID);
lockAppend.lock();
try {
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(updateRecordTX, false, false, tx, null);
if (JournalImpl.TRACE_RECORDS) {
@ -927,9 +914,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.addPositive(usedFile, id, updateRecordTX.getEncodeSize());
}
finally {
lockAppend.unlock();
}
}
finally {
journalLock.readLock().unlock();
@ -949,8 +933,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
JournalTransaction tx = getTransactionInfo(txID);
lockAppend.lock();
try {
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(deleteRecordTX, false, false, tx, null);
if (JournalImpl.TRACE_RECORDS) {
@ -963,9 +946,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.addNegative(usedFile, id);
}
finally {
lockAppend.unlock();
}
}
finally {
journalLock.readLock().unlock();
@ -1003,8 +983,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
callback.storeLineUp();
}
lockAppend.lock();
try {
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(prepareRecord, true, sync, tx, callback);
if (JournalImpl.TRACE_RECORDS) {
@ -1013,9 +992,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.prepare(usedFile);
}
finally {
lockAppend.unlock();
}
}
finally {
@ -1053,8 +1029,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
callback.storeLineUp();
}
lockAppend.lock();
try {
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(commitRecord, true, sync, tx, callback);
if (JournalImpl.TRACE_RECORDS) {
@ -1063,9 +1038,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
tx.commit(usedFile);
}
finally {
lockAppend.unlock();
}
}
finally {
@ -1094,15 +1066,11 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
callback.storeLineUp();
}
lockAppend.lock();
try {
synchronized (lockAppend) {
JournalFile usedFile = appendRecord(rollbackRecord, false, sync, tx, callback);
tx.rollback(usedFile);
}
finally {
lockAppend.unlock();
}
}
finally {
@ -1289,11 +1257,17 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
* Note: only synchronized methods on journal are methods responsible for the life-cycle such as
* stop, start records will still come as this is being executed
*/
public synchronized void compact() throws Exception {
if (compactor != null) {
throw new IllegalStateException("There is pending compacting operation");
}
if (ActiveMQJournalLogger.LOGGER.isDebugEnabled()) {
ActiveMQJournalLogger.LOGGER.debug("JournalImpl::compact compacting journal " + (++compactCount));
}
compactorLock.writeLock().lock();
try {
ArrayList<JournalFile> dataFilesToProcess = new ArrayList<>(filesRepository.getDataFilesCount());
@ -2067,14 +2041,10 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
public void forceMoveNextFile() throws Exception {
journalLock.readLock().lock();
try {
lockAppend.lock();
try {
synchronized (lockAppend) {
moveNextFile(false);
debugWait();
}
finally {
lockAppend.unlock();
}
}
finally {
journalLock.readLock().unlock();
@ -2131,9 +2101,7 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
journalLock.writeLock().lock();
try {
lockAppend.lock();
try {
synchronized (lockAppend) {
setJournalState(JournalState.STOPPED);
@ -2172,9 +2140,6 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
currentFile = null;
}
finally {
lockAppend.unlock();
}
}
finally {
journalLock.writeLock().unlock();
@ -2666,33 +2631,31 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
@Override
public void run() {
lockAppend.lock();
try {
synchronized (lockAppend) {
try {
final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 * 1024]);
final ByteArrayEncoding byteEncoder = new ByteArrayEncoding(new byte[128 * 1024]);
JournalInternalRecord blastRecord = new JournalInternalRecord() {
JournalInternalRecord blastRecord = new JournalInternalRecord() {
@Override
public int getEncodeSize() {
return byteEncoder.getEncodeSize();
@Override
public int getEncodeSize() {
return byteEncoder.getEncodeSize();
}
@Override
public void encode(final ActiveMQBuffer buffer) {
byteEncoder.encode(buffer);
}
};
for (int i = 0; i < pages; i++) {
appendRecord(blastRecord, false, false, null, null);
}
@Override
public void encode(final ActiveMQBuffer buffer) {
byteEncoder.encode(buffer);
}
};
for (int i = 0; i < pages; i++) {
appendRecord(blastRecord, false, false, null, null);
}
}
catch (Exception e) {
ActiveMQJournalLogger.LOGGER.failedToPerfBlast(e);
}
finally {
lockAppend.unlock();
catch (Exception e) {
ActiveMQJournalLogger.LOGGER.failedToPerfBlast(e);
}
}
}
}
@ -2863,4 +2826,12 @@ public class JournalImpl extends JournalBase implements TestableJournal, Journal
throw new RuntimeException(e);
}
}
/**
* For tests only
*/
public int getCompactCount() {
return compactCount;
}
}

BIN
artemis-native/bin/libartemis-native-64.so Executable file → Normal file

Binary file not shown.

View File

@ -63,6 +63,9 @@ struct io_control {
int dumbWriteHandler = 0;
char dumbPath[PATH_MAX];
#define ONE_MEGA 1048576l
void * oneMegaBuffer = 0;
jclass submitClass = NULL;
jmethodID errorMethod = NULL;
@ -121,6 +124,12 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
if ((*vm)->GetEnv(vm, (void**) &env, JNI_VERSION_1_6) != JNI_OK) {
return JNI_ERR;
} else {
if (posix_memalign(&oneMegaBuffer, 512, ONE_MEGA) != 0)
{
fprintf(stderr, "Could not allocate the 1 Mega Buffer for initializing files\n");
return JNI_ERR;
}
memset(oneMegaBuffer, 0, ONE_MEGA);
sprintf (dumbPath, "%s/artemisJLHandler_XXXXXX", P_tmpdir);
dumbWriteHandler = mkstemp (dumbPath);
@ -219,6 +228,8 @@ void JNI_OnUnload(JavaVM* vm, void* reserved) {
} else {
closeDumbHandlers();
free(oneMegaBuffer);
// delete global references so the GC can collect them
if (runtimeExceptionClass != NULL) {
(*env)->DeleteGlobalRef(env, runtimeExceptionClass);
@ -757,17 +768,34 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_fa
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_fill
(JNIEnv * env, jclass clazz, jint fd, jlong size)
{
void * preAllocBuffer = 0;
if (posix_memalign(&preAllocBuffer, 512, size) != 0)
int i;
int blocks = size / ONE_MEGA;
int rest = size % ONE_MEGA;
#ifdef DEBUG
fprintf (stderr, "blocks = %d, rest=%d\n", blocks, rest);
#endif
lseek (fd, 0, SEEK_SET);
for (i = 0; i < blocks; i++)
{
throwOutOfMemoryError(env);
return;
if (write(fd, oneMegaBuffer, ONE_MEGA) < 0)
{
throwIOException(env, "Cannot initialize file");
return;
}
}
if (rest != 0l)
{
if (write(fd, oneMegaBuffer, rest) < 0)
{
throwIOException(env, "Cannot initialize file");
return;
}
}
memset(preAllocBuffer, 0, size);
lseek (fd, 0, SEEK_SET);
write(fd, preAllocBuffer, size);
lseek (fd, 0, SEEK_SET);
free (preAllocBuffer);
}
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_memsetBuffer

View File

@ -49,7 +49,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
* <br>
* Or else the native module won't be loaded because of version mismatches
*/
private static final int EXPECTED_NATIVE_VERSION = 3;
private static final int EXPECTED_NATIVE_VERSION = 5;
private static boolean loaded = false;

View File

@ -48,9 +48,6 @@ public final class LibaioFile<Callback extends SubmitInfo> {
return LibaioContext.lock(fd);
}
/**
* {@inheritDoc}
*/
public void close() throws IOException {
open = false;
LibaioContext.close(fd);

View File

@ -93,12 +93,21 @@ public class LibaioTest {
}
@Test
public void testInitAndFallocate() throws Exception {
LibaioFile fileDescriptor = control.openFile(temporaryFolder.newFile("test.bin"), true);
fileDescriptor.fallocate(1024 * 1024);
public void testInitAndFallocate10M() throws Exception {
testInit(10 * 1024 * 1024);
}
ByteBuffer buffer = fileDescriptor.newBuffer(1024 * 1024);
fileDescriptor.read(0, 1024 * 1024, buffer, new TestInfo());
@Test
public void testInitAndFallocate10M100K() throws Exception {
testInit(10 * 1024 * 1024 + 100 * 1024);
}
private void testInit(int size) throws IOException {
LibaioFile fileDescriptor = control.openFile(temporaryFolder.newFile("test.bin"), true);
fileDescriptor.fallocate(size);
ByteBuffer buffer = fileDescriptor.newBuffer(size);
fileDescriptor.read(0, size, buffer, new TestInfo());
TestInfo[] callbacks = new TestInfo[1];
control.poll(callbacks, 1, 1);
@ -108,17 +117,27 @@ public class LibaioTest {
buffer.position(0);
LibaioFile fileDescriptor2 = control.openFile(temporaryFolder.newFile("test2.bin"), true);
fileDescriptor2.fill(1024 * 1024);
fileDescriptor2.read(0, 1024 * 1024, buffer, new TestInfo());
fileDescriptor2.fill(size);
fileDescriptor2.read(0, size, buffer, new TestInfo());
control.poll(callbacks, 1, 1);
for (int i = 0; i < 1024 * 1024; i++) {
for (int i = 0; i < size; i++) {
Assert.assertEquals(0, buffer.get());
}
LibaioContext.freeBuffer(buffer);
}
@Test
public void testInitAndFallocate10K() throws Exception {
testInit(10 * 1024);
}
@Test
public void testInitAndFallocate20K() throws Exception {
testInit(20 * 1024);
}
@Test
public void testSubmitWriteOnTwoFiles() throws Exception {

View File

@ -114,7 +114,7 @@ public class ActiveMQProtonConnectionCallback implements AMQPConnectionCallback
@Override
public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
return new ProtonSessionIntegrationCallback(this, manager, connection);
return new ProtonSessionIntegrationCallback(this, manager, connection, this.connection);
}
}

View File

@ -20,6 +20,8 @@ import java.util.concurrent.Executor;
import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.transport.AmqpError;
import org.apache.qpid.proton.amqp.transport.ErrorCondition;
@ -36,7 +38,6 @@ import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.ServerSession;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ByteUtil;
import org.apache.activemq.artemis.utils.IDGenerator;
import org.apache.activemq.artemis.utils.SimpleIDGenerator;
@ -58,16 +59,26 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
private final AMQPConnectionContext connection;
private final Connection transportConnection;
private ServerSession serverSession;
private AMQPSessionContext protonSession;
public ProtonSessionIntegrationCallback(ActiveMQProtonConnectionCallback protonSPI,
ProtonProtocolManager manager,
AMQPConnectionContext connection) {
AMQPConnectionContext connection,
Connection transportConnection) {
this.protonSPI = protonSPI;
this.manager = manager;
this.connection = connection;
this.transportConnection = transportConnection;
}
@Override
public boolean isWritable(ReadyListener callback) {
return transportConnection.isWritable(callback);
}
@Override
@ -305,16 +316,6 @@ public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, Se
public void closed() {
}
@Override
public void addReadyListener(ReadyListener listener) {
}
@Override
public void removeReadyListener(ReadyListener listener) {
}
@Override
public void disconnect(ServerConsumer consumer, String queueName) {
synchronized (connection.getLock()) {

View File

@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.remoting.CloseListener;
import org.apache.activemq.artemis.core.remoting.FailureListener;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
public class MQTTConnection implements RemotingConnection {
@ -52,6 +53,10 @@ public class MQTTConnection implements RemotingConnection {
this.destroyed = false;
}
public boolean isWritable(ReadyListener callback) {
return transportConnection.isWritable(callback);
}
@Override
public Object getID() {
return transportConnection.getID();

View File

@ -59,7 +59,7 @@ public class MQTTSession {
mqttConnectionManager = new MQTTConnectionManager(this);
mqttPublishManager = new MQTTPublishManager(this);
sessionCallback = new MQTTSessionCallback(this);
sessionCallback = new MQTTSessionCallback(this, connection);
subscriptionManager = new MQTTSubscriptionManager(this);
retainMessageManager = new MQTTRetainMessageManager(this);

View File

@ -25,12 +25,19 @@ import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
public class MQTTSessionCallback implements SessionCallback {
private MQTTSession session;
private final MQTTSession session;
private final MQTTConnection connection;
private MQTTLogger log = MQTTLogger.LOGGER;
public MQTTSessionCallback(MQTTSession session) throws Exception {
public MQTTSessionCallback(MQTTSession session, MQTTConnection connection) throws Exception {
this.session = session;
this.connection = connection;
}
@Override
public boolean isWritable(ReadyListener callback) {
return connection.isWritable(callback);
}
@Override
@ -54,16 +61,6 @@ public class MQTTSessionCallback implements SessionCallback {
return 1;
}
@Override
public void addReadyListener(ReadyListener listener) {
session.getConnection().getTransportConnection().addReadyListener(listener);
}
@Override
public void removeReadyListener(ReadyListener listener) {
session.getConnection().getTransportConnection().removeReadyListener(listener);
}
@Override
public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) {
return sendMessage(message, consumer, deliveryCount);

View File

@ -52,6 +52,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
@ -160,6 +161,9 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor, S
this.creationTime = System.currentTimeMillis();
}
public boolean isWritable(ReadyListener callback) {
return transportConnection.isWritable(callback);
}
// SecurityAuth implementation
@Override

View File

@ -30,6 +30,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
@ -58,7 +59,6 @@ import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.transaction.impl.XidImpl;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.wireformat.WireFormat;
public class AMQSession implements SessionCallback {
@ -147,6 +147,11 @@ public class AMQSession implements SessionCallback {
started.set(true);
}
@Override
public boolean isWritable(ReadyListener callback) {
return connection.isWritable(callback);
}
@Override
public void sendProducerCreditsMessage(int credits, SimpleString address) {
// TODO Auto-generated method stub
@ -186,18 +191,6 @@ public class AMQSession implements SessionCallback {
}
@Override
public void addReadyListener(ReadyListener listener) {
// TODO Auto-generated method stub
}
@Override
public void removeReadyListener(ReadyListener listener) {
// TODO Auto-generated method stub
}
@Override
public boolean hasCredits(ServerConsumer consumerID) {
AMQConsumer amqConsumer = consumers.get(consumerID.getID());

View File

@ -41,6 +41,7 @@ import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.ConfigurationHelper;
import org.apache.activemq.artemis.utils.VersionLoader;
@ -118,6 +119,10 @@ public final class StompConnection implements RemotingConnection {
return frame;
}
public boolean isWritable(ReadyListener callback) {
return transportConnection.isWritable(callback);
}
public boolean hasBytes() {
return frameHandler.hasBytes();
}

View File

@ -73,6 +73,11 @@ public class StompSession implements SessionCallback {
this.consumerCredits = ConfigurationHelper.getIntProperty(TransportConstants.STOMP_CONSUMERS_CREDIT, TransportConstants.STOMP_DEFAULT_CONSUMERS_CREDIT, connection.getAcceptorUsed().getConfiguration());
}
@Override
public boolean isWritable(ReadyListener callback) {
return connection.isWritable(callback);
}
void setServerSession(ServerSession session) {
this.session = session;
}
@ -181,16 +186,6 @@ public class StompSession implements SessionCallback {
public void closed() {
}
@Override
public void addReadyListener(final ReadyListener listener) {
connection.getTransportConnection().addReadyListener(listener);
}
@Override
public void removeReadyListener(final ReadyListener listener) {
connection.getTransportConnection().removeReadyListener(listener);
}
@Override
public void disconnect(ServerConsumer consumerId, String queueName) {
StompSubscription stompSubscription = subscriptions.remove(consumerId.getID());

View File

@ -546,6 +546,13 @@ public interface Configuration {
*/
Configuration setJournalCompactMinFiles(int minFiles);
/** Number of files that would be acceptable to keep on a pool. Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_POOL_SIZE}.*/
int getJournalPoolFiles();
/** Number of files that would be acceptable to keep on a pool. Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_POOL_SIZE}.*/
Configuration setJournalPoolFiles(int poolSize);
/**
* Returns the percentage of live data before compacting the journal. <br>
* Default value is {@link org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration#DEFAULT_JOURNAL_COMPACT_PERCENTAGE}.

View File

@ -151,6 +151,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
protected int journalFileSize = ActiveMQDefaultConfiguration.getDefaultJournalFileSize();
protected int journalPoolFiles = ActiveMQDefaultConfiguration.getDefaultJournalPoolFiles();
protected int journalMinFiles = ActiveMQDefaultConfiguration.getDefaultJournalMinFiles();
// AIO and NIO need different values for these attributes
@ -669,6 +671,18 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this;
}
@Override
public int getJournalPoolFiles() {
return journalPoolFiles;
}
@Override
public Configuration setJournalPoolFiles(int poolSize) {
this.journalPoolFiles = poolSize;
return this;
}
@Override
public int getJournalMinFiles() {
return journalMinFiles;

View File

@ -470,6 +470,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
config.setJournalMinFiles(getInteger(e, "journal-min-files", config.getJournalMinFiles(), Validators.GT_ZERO));
config.setJournalPoolFiles(getInteger(e, "journal-pool-files", config.getJournalPoolFiles(), Validators.MINUS_ONE_OR_GT_ZERO));
config.setJournalCompactMinFiles(getInteger(e, "journal-compact-min-files", config.getJournalCompactMinFiles(), Validators.GE_ZERO));
config.setJournalCompactPercentage(getInteger(e, "journal-compact-percentage", config.getJournalCompactPercentage(), Validators.PERCENTAGE));

View File

@ -106,7 +106,7 @@ public final class DescribeJournal {
SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(bindingsDir, null, 1);
JournalImpl bindings = new JournalImpl(1024 * 1024, 2, -1, 0, bindingsFF, "activemq-bindings", "bindings", 1);
JournalImpl bindings = new JournalImpl(1024 * 1024, 2, 2, -1, 0, bindingsFF, "activemq-bindings", "bindings", 1);
describeJournal(bindingsFF, bindings, bindingsDir);
}
@ -117,7 +117,7 @@ public final class DescribeJournal {
// Will use only default values. The load function should adapt to anything different
ConfigurationImpl defaultValues = new ConfigurationImpl();
JournalImpl messagesJournal = new JournalImpl(defaultValues.getJournalFileSize(), defaultValues.getJournalMinFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
JournalImpl messagesJournal = new JournalImpl(defaultValues.getJournalFileSize(), defaultValues.getJournalMinFiles(), defaultValues.getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
return describeJournal(messagesFF, messagesJournal, messagesDir);
}

View File

@ -231,7 +231,7 @@ public class JournalStorageManager implements StorageManager {
SequentialFileFactory bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO());
Journal localBindings = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings", "bindings", 1);
Journal localBindings = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings", "bindings", 1);
bindingsJournal = localBindings;
originalBindingsJournal = localBindings;
@ -255,7 +255,7 @@ public class JournalStorageManager implements StorageManager {
idGenerator = new BatchingIDGenerator(0, JournalStorageManager.CHECKPOINT_BATCH_SIZE, this);
Journal localMessage = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO() : config.getJournalMaxIO_NIO());
Journal localMessage = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO() : config.getJournalMaxIO_NIO());
messageJournal = localMessage;
originalMessageJournal = localMessage;

View File

@ -611,6 +611,9 @@ public class ServerSessionPacketHandler implements ChannelHandler {
newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
Connection oldTransportConnection = remotingConnection.getTransportConnection();
remotingConnection = newConnection;
remotingConnection.setCloseListeners(closeListeners);
@ -624,6 +627,10 @@ public class ServerSessionPacketHandler implements ChannelHandler {
session.setTransferring(false);
// We do this because the old connection could be out of credits on netty
// this will force anything to resume after the reattach through the ReadyListener callbacks
oldTransportConnection.fireReady(true);
return serverLastReceivedCommandID;
}
}

View File

@ -148,7 +148,8 @@ public class ActiveMQPacketHandler implements ChannelHandler {
activeMQPrincipal = connection.getDefaultActiveMQPrincipal();
}
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel), null, true);
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(),
new CoreSessionCallback(request.getName(), protocolManager, channel, connection), null, true);
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel);
channel.setHandler(handler);

View File

@ -29,6 +29,7 @@ import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.ServerConsumer;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.protocol.SessionCallback;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
@ -38,12 +39,20 @@ public final class CoreSessionCallback implements SessionCallback {
private ProtocolManager protocolManager;
private final RemotingConnection connection;
private String name;
public CoreSessionCallback(String name, ProtocolManager protocolManager, Channel channel) {
public CoreSessionCallback(String name, ProtocolManager protocolManager, Channel channel, RemotingConnection connection) {
this.name = name;
this.protocolManager = protocolManager;
this.channel = channel;
this.connection = connection;
}
@Override
public boolean isWritable(ReadyListener callback) {
return connection.isWritable(callback);
}
@Override
@ -101,16 +110,6 @@ public final class CoreSessionCallback implements SessionCallback {
protocolManager.removeHandler(name);
}
@Override
public void addReadyListener(final ReadyListener listener) {
channel.getConnection().getTransportConnection().addReadyListener(listener);
}
@Override
public void removeReadyListener(final ReadyListener listener) {
channel.getConnection().getTransportConnection().removeReadyListener(listener);
}
@Override
public void disconnect(ServerConsumer consumerId, String queueName) {
if (channel.supports(PacketImpl.DISCONNECT_CONSUMER)) {

View File

@ -101,6 +101,14 @@ public class InVMConnection implements Connection {
// no op
}
public boolean isWritable(ReadyListener listener) {
return true;
}
@Override
public void fireReady(boolean ready) {
}
@Override
public RemotingConnection getProtocolConnection() {
return this.protocolConnection;
@ -230,14 +238,6 @@ public class InVMConnection implements Connection {
return -1;
}
@Override
public void addReadyListener(ReadyListener listener) {
}
@Override
public void removeReadyListener(ReadyListener listener) {
}
@Override
public boolean isUsingProtocolHandling() {
return false;

View File

@ -687,6 +687,8 @@ public class NettyAcceptor implements Acceptor {
if (conn != null) {
conn.fireReady(ready);
}
listener.connectionReadyForWrites(connectionID, ready);
}
}

View File

@ -45,7 +45,6 @@ import org.apache.activemq.artemis.core.client.impl.ServerLocatorInternal;
import org.apache.activemq.artemis.core.filter.Filter;
import org.apache.activemq.artemis.core.message.impl.MessageImpl;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.server.ActiveMQComponent;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.HandleStatus;
import org.apache.activemq.artemis.core.server.LargeServerMessage;
@ -58,8 +57,7 @@ import org.apache.activemq.artemis.core.server.impl.QueueImpl;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationService;
import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.artemis.spi.core.remoting.ConnectionLifeCycleListener;
import org.apache.activemq.artemis.spi.core.remoting.ReadyListener;
import org.apache.activemq.artemis.utils.FutureLatch;
import org.apache.activemq.artemis.utils.ReusableLatch;
import org.apache.activemq.artemis.utils.TypedProperties;
@ -69,7 +67,7 @@ import org.apache.activemq.artemis.utils.UUID;
* A Core BridgeImpl
*/
public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ConnectionLifeCycleListener {
public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowledgementHandler, ReadyListener {
// Constants -----------------------------------------------------
private static final boolean isTrace = ActiveMQServerLogger.LOGGER.isTraceEnabled();
@ -135,8 +133,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
private volatile ClientProducer producer;
private volatile boolean connectionWritable = false;
private volatile boolean started;
private volatile boolean stopping = false;
@ -497,6 +493,11 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
}
@Override
public void readyForWriting() {
queue.deliverAsync();
}
@Override
public HandleStatus handle(final MessageReference ref) throws Exception {
if (filter != null && !filter.match(ref.getMessage())) {
@ -504,7 +505,7 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
synchronized (this) {
if (!active || !connectionWritable) {
if (!active || !session.isWritable(this)) {
if (ActiveMQServerLogger.LOGGER.isDebugEnabled()) {
ActiveMQServerLogger.LOGGER.debug(this + "::Ignoring reference on bridge as it is set to inactive ref=" + ref);
}
@ -555,29 +556,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
}
}
@Override
public void connectionCreated(ActiveMQComponent component, Connection connection, String protocol) {
}
@Override
public void connectionDestroyed(Object connectionID) {
}
@Override
public void connectionException(Object connectionID, ActiveMQException me) {
}
@Override
public void connectionReadyForWrites(Object connectionID, boolean ready) {
connectionWritable = ready;
if (connectionWritable) {
queue.deliverAsync();
}
}
// FailureListener implementation --------------------------------
@Override
@ -891,8 +869,6 @@ public class BridgeImpl implements Bridge, SessionFailureListener, SendAcknowled
session.setSendAcknowledgementHandler(BridgeImpl.this);
session.addLifeCycleListener(BridgeImpl.this);
afterConnect();
active = true;

View File

@ -21,7 +21,6 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReadWriteLock;
@ -129,12 +128,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
private boolean transferring = false;
/* As well as consumer credit based flow control, we also tap into TCP flow control (assuming transport is using TCP)
* This is useful in the case where consumer-window-size = -1, but we don't want to OOM by sending messages ad infinitum to the Netty
* write queue when the TCP buffer is full, e.g. the client is slow or has died.
*/
private final AtomicBoolean writeReady = new AtomicBoolean(true);
private final long creationTime;
private AtomicLong consumerRateCheckTime = new AtomicLong(System.currentTimeMillis());
@ -198,8 +191,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
this.strictUpdateDeliveryCount = strictUpdateDeliveryCount;
this.callback.addReadyListener(this);
this.creationTime = System.currentTimeMillis();
if (browseOnly) {
@ -220,6 +211,11 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
}
@Override
public void readyForWriting() {
promptDelivery();
}
// ServerConsumer implementation
// ----------------------------------------------------------------------
@ -289,7 +285,7 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
// If the consumer is stopped then we don't accept the message, it
// should go back into the
// queue for delivery later.
if (!started || transferring) {
if (!started || transferring || !callback.isWritable(this)) {
return HandleStatus.BUSY;
}
@ -395,8 +391,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
ActiveMQServerLogger.LOGGER.trace("ServerConsumerImpl::" + this + " being closed with failed=" + failed, new Exception("trace"));
}
callback.removeReadyListener(this);
setStarted(false);
LargeMessageDeliverer del = largeMessageDeliverer;
@ -811,18 +805,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
}
}
@Override
public void readyForWriting(final boolean ready) {
if (ready) {
writeReady.set(true);
promptDelivery();
}
else {
writeReady.set(false);
}
}
/**
* To be used on tests only
*/

View File

@ -43,9 +43,7 @@ public interface SessionCallback {
void closed();
void addReadyListener(ReadyListener listener);
void removeReadyListener(ReadyListener listener);
void disconnect(ServerConsumer consumerId, String queueName);
boolean isWritable(ReadyListener callback);
}

View File

@ -593,6 +593,14 @@
</xsd:annotation>
</xsd:element>
<xsd:element name="journal-pool-files" type="xsd:int" default="-1" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
how many journal files to pre-create
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="journal-compact-percentage" type="xsd:int" default="30" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>

View File

@ -1630,7 +1630,7 @@ public abstract class ActiveMQTestBase extends Assert {
try {
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(new File(getJournalDir()), null, 1);
messagesJournal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
messagesJournal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
final List<RecordInfo> committedRecords = new LinkedList<>();
final List<PreparedTransactionInfo> preparedTransactions = new LinkedList<>();
@ -1664,7 +1664,7 @@ public abstract class ActiveMQTestBase extends Assert {
final HashMap<Integer, AtomicInteger> recordsType = new HashMap<>();
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(config.getJournalLocation(), null, 1);
JournalImpl messagesJournal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
JournalImpl messagesJournal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
List<JournalFile> filesToRead = messagesJournal.orderFiles();
for (JournalFile file : filesToRead) {
@ -1701,11 +1701,11 @@ public abstract class ActiveMQTestBase extends Assert {
if (messageJournal) {
ff = new NIOSequentialFileFactory(config.getJournalLocation(), null, 1);
journal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), 0, 0, ff, "activemq-data", "amq", 1);
journal = new JournalImpl(config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), 0, 0, ff, "activemq-data", "amq", 1);
}
else {
ff = new NIOSequentialFileFactory(config.getBindingsLocation(), null, 1);
journal = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), ff, "activemq-bindings", "bindings", 1);
journal = new JournalImpl(1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), ff, "activemq-bindings", "bindings", 1);
}
journal.start();

View File

@ -54,6 +54,7 @@ Name | Description
[journal-file-size](persistence.md) | the size (in bytes) of each journal file. Default=10485760 (10 MB)
[journal-max-io](persistence.md#configuring.message.journal.journal-max-io) | the maximum number of write requests that can be in the AIO queue at any one time. Default is 500 for AIO and 1 for NIO.
[journal-min-files](persistence.md#configuring.message.journal.journal-min-files) | how many journal files to pre-create. Default=2
[journal-pool-files](persistence.md#configuring.message.journal.journal-pool-files) | -1 (default) means no Limit. The system will create as many files as needed however when reclaiming files it will shrink back to the `journal-pool-files`
[journal-sync-non-transactional](persistence.md) | if true wait for non transaction data to be synced to the journal before returning response to client. Default=true
[journal-sync-transactional](persistence.md) | if true wait for transaction data to be synchronized to the journal before returning response to client. Default=true
[journal-type](persistence.md) | the type of journal to use. Default=ASYNCIO

View File

@ -218,6 +218,18 @@ The message journal is configured using the following attributes in
steady state you should tune this number of files to match that
total amount of data.
- `journal-pool-files`
The system will create as many files as needed however when reclaiming files
it will shrink back to the `journal-pool-files`.
The default to this parameter is -1, meaning it will never delete files on the journal once created.
Notice that the system can't grow infinitely as you are still required to use paging for destinations that can
grow indefinitely.
Notice: in case you get too many files you can use [compacting](tools.md).
- `journal-max-io`
Write requests are queued up before being submitted to the system

View File

@ -47,24 +47,31 @@ OPTIONS
For a full list of data tools commands available use:
```
$ ./artemis help data
NAME
artemis data - data tools like (print|exp|imp|exp|encode|decode)
(example ./artemis data print)
artemis data - data tools group
(print|exp|imp|exp|encode|decode|compact) (example ./artemis data print)
SYNOPSIS
artemis data
artemis data decode [--prefix <prefix>] [--directory <directory>]
[--suffix <suffix>] [--file-size <size>]
artemis data encode [--prefix <prefix>] [--directory <directory>]
[--suffix <suffix>] [--file-size <size>]
artemis data exp [--bindings <binding>]
[--large-messages <largeMessges>] [--paging <paging>]
[--journal <journal>]
artemis data imp [--password <password>] [--port <port>] [--host <host>]
[--user <user>] [--transaction]
artemis data print [--bindings <binding>] [--paging <paging>]
[--journal <journal>]
artemis data compact [--broker <brokerConfig>] [--verbose]
[--paging <paging>] [--journal <journal>]
[--large-messages <largeMessges>] [--bindings <binding>]
artemis data decode [--broker <brokerConfig>] [--suffix <suffix>]
[--verbose] [--paging <paging>] [--prefix <prefix>] [--file-size <size>]
[--directory <directory>] --input <input> [--journal <journal>]
[--large-messages <largeMessges>] [--bindings <binding>]
artemis data encode [--directory <directory>] [--broker <brokerConfig>]
[--suffix <suffix>] [--verbose] [--paging <paging>] [--prefix <prefix>]
[--file-size <size>] [--journal <journal>]
[--large-messages <largeMessges>] [--bindings <binding>]
artemis data exp [--broker <brokerConfig>] [--verbose]
[--paging <paging>] [--journal <journal>]
[--large-messages <largeMessges>] [--bindings <binding>]
artemis data imp [--host <host>] [--verbose] [--port <port>]
[--password <password>] [--transaction] --input <input> [--user <user>]
artemis data print [--broker <brokerConfig>] [--verbose]
[--paging <paging>] [--journal <journal>]
[--large-messages <largeMessges>] [--bindings <binding>]
COMMANDS
With no arguments, Display help information
@ -73,73 +80,145 @@ COMMANDS
Print data records information (WARNING: don't use while a
production server is running)
With --bindings option, The folder used for bindings (default
../data/bindings)
With --broker option, This would override the broker configuration
from the bootstrap
With --paging option, The folder used for paging (default
../data/paging)
With --verbose option, Adds more information on the execution
With --paging option, The folder used for paging (default from
broker.xml)
With --journal option, The folder used for messages journal (default
../data/journal)
from broker.xml)
With --large-messages option, The folder used for large-messages
(default from broker.xml)
With --bindings option, The folder used for bindings (default from
broker.xml)
exp
Export all message-data using an XML that could be interpreted by
any system.
With --bindings option, The folder used for bindings (default
../data/bindings)
With --broker option, This would override the broker configuration
from the bootstrap
With --large-messages option, The folder used for large-messages
(default ../data/largemessages)
With --verbose option, Adds more information on the execution
With --paging option, The folder used for paging (default
../data/paging)
With --paging option, The folder used for paging (default from
broker.xml)
With --journal option, The folder used for messages journal (default
../data/journal)
from broker.xml)
With --large-messages option, The folder used for large-messages
(default from broker.xml)
With --bindings option, The folder used for bindings (default from
broker.xml)
imp
Import all message-data using an XML that could be interpreted by
any system.
With --password option, User name used to import the data. (default
null)
With --port option, The port used to import the data (default 61616)
With --host option, The host used to import the data (default
localhost)
With --user option, User name used to import the data. (default
With --verbose option, Adds more information on the execution
With --port option, The port used to import the data (default 61616)
With --password option, User name used to import the data. (default
null)
With --transaction option, If this is set to true you will need a
whole transaction to commit at the end. (default false)
With --input option, The input file name (default=exp.dmp)
With --user option, User name used to import the data. (default
null)
decode
Decode a journal's internal format into a new journal set of files
With --prefix option, The journal prefix (default activemq-datal)
With --directory option, The journal folder (default
../data/journal)
With --broker option, This would override the broker configuration
from the bootstrap
With --suffix option, The journal suffix (default amq)
With --verbose option, Adds more information on the execution
With --paging option, The folder used for paging (default from
broker.xml)
With --prefix option, The journal prefix (default activemq-data)
With --file-size option, The journal size (default 10485760)
With --directory option, The journal folder (default journal folder
from broker.xml)
With --input option, The input file name (default=exp.dmp)
With --journal option, The folder used for messages journal (default
from broker.xml)
With --large-messages option, The folder used for large-messages
(default from broker.xml)
With --bindings option, The folder used for bindings (default from
broker.xml)
encode
Encode a set of journal files into an internal encoded data format
With --prefix option, The journal prefix (default activemq-datal)
With --directory option, The journal folder (default the journal
folder from broker.xml)
With --directory option, The journal folder (default
../data/journal)
With --broker option, This would override the broker configuration
from the bootstrap
With --suffix option, The journal suffix (default amq)
With --verbose option, Adds more information on the execution
With --paging option, The folder used for paging (default from
broker.xml)
With --prefix option, The journal prefix (default activemq-data)
With --file-size option, The journal size (default 10485760)
With --journal option, The folder used for messages journal (default
from broker.xml)
With --large-messages option, The folder used for large-messages
(default from broker.xml)
With --bindings option, The folder used for bindings (default from
broker.xml)
compact
Compacts the journal of a non running server
With --broker option, This would override the broker configuration
from the bootstrap
With --verbose option, Adds more information on the execution
With --paging option, The folder used for paging (default from
broker.xml)
With --journal option, The folder used for messages journal (default
from broker.xml)
With --large-messages option, The folder used for large-messages
(default from broker.xml)
With --bindings option, The folder used for bindings (default from
broker.xml)
```

19
scripts/one-test.sh Executable file
View File

@ -0,0 +1,19 @@
#!/bin/sh
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
mvn -Ptests -DfailIfNoTests=false -Pextra-tests -DskipPerformanceTests=false -Dtest=$1 test

View File

@ -54,7 +54,7 @@ public class ConsumerStuckTest extends ActiveMQTestBase {
@Test
public void testClientStuckTest() throws Exception {
ServerLocator locator = createNettyNonHALocator().setConnectionTTL(1000).setClientFailureCheckPeriod(100).setConsumerWindowSize(10 * 1024 * 1024);
ServerLocator locator = createNettyNonHALocator().setConnectionTTL(1000).setClientFailureCheckPeriod(100).setConsumerWindowSize(10 * 1024 * 1024).setCallTimeout(1000);
ClientSessionFactory sf = locator.createSessionFactory();
((ClientSessionFactoryImpl) sf).stopPingingAfterOne();
@ -146,7 +146,7 @@ public class ConsumerStuckTest extends ActiveMQTestBase {
@Test
public void testClientStuckTestWithDirectDelivery() throws Exception {
ServerLocator locator = createNettyNonHALocator().setConnectionTTL(1000).setClientFailureCheckPeriod(100).setConsumerWindowSize(10 * 1024 * 1024);
ServerLocator locator = createNettyNonHALocator().setConnectionTTL(1000).setClientFailureCheckPeriod(100).setConsumerWindowSize(10 * 1024 * 1024).setCallTimeout(1000);
ClientSessionFactory sf = locator.createSessionFactory();
((ClientSessionFactoryImpl) sf).stopPingingAfterOne();

View File

@ -427,7 +427,7 @@ public class HangConsumerTest extends ActiveMQTestBase {
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(server.getConfiguration().getBindingsLocation(), null, 1);
JournalImpl messagesJournal = new JournalImpl(1024 * 1024, 2, 0, 0, messagesFF, "activemq-bindings", "bindings", 1);
JournalImpl messagesJournal = new JournalImpl(1024 * 1024, 2, 2, 0, 0, messagesFF, "activemq-bindings", "bindings", 1);
messagesJournal.start();
@ -483,6 +483,11 @@ public class HangConsumerTest extends ActiveMQTestBase {
targetCallback.sendProducerCreditsMessage(credits, address);
}
@Override
public boolean isWritable(ReadyListener callback) {
return true;
}
@Override
public void sendProducerCreditsFailMessage(int credits, SimpleString address) {
targetCallback.sendProducerCreditsFailMessage(credits, address);
@ -538,22 +543,6 @@ public class HangConsumerTest extends ActiveMQTestBase {
targetCallback.closed();
}
/* (non-Javadoc)
* @see SessionCallback#addReadyListener(ReadyListener)
*/
@Override
public void addReadyListener(ReadyListener listener) {
targetCallback.addReadyListener(listener);
}
/* (non-Javadoc)
* @see SessionCallback#removeReadyListener(ReadyListener)
*/
@Override
public void removeReadyListener(ReadyListener listener) {
targetCallback.removeReadyListener(listener);
}
@Override
public void disconnect(ServerConsumer consumerId, String queueName) {
//To change body of implemented methods use File | Settings | File Templates.

View File

@ -191,7 +191,7 @@ public class JournalCrashTest extends ActiveMQTestBase {
*/
private void printJournal() throws Exception {
NIOSequentialFileFactory factory = new NIOSequentialFileFactory(new File(getJournalDir()), 100);
JournalImpl journal = new JournalImpl(ActiveMQDefaultConfiguration.getDefaultJournalFileSize(), 2, 0, 0, factory, "activemq-data", "amq", 100);
JournalImpl journal = new JournalImpl(ActiveMQDefaultConfiguration.getDefaultJournalFileSize(), 2, 2, 0, 0, factory, "activemq-data", "amq", 100);
ArrayList<RecordInfo> records = new ArrayList<>();
ArrayList<PreparedTransactionInfo> transactions = new ArrayList<>();

View File

@ -1496,7 +1496,7 @@ public class PagingTest extends ActiveMQTestBase {
List<PreparedTransactionInfo> list = new ArrayList<>();
JournalImpl jrn = new JournalImpl(config.getJournalFileSize(), 2, 0, 0, new NIOSequentialFileFactory(server.getConfiguration().getJournalLocation(), 1), "activemq-data", "amq", 1);
JournalImpl jrn = new JournalImpl(config.getJournalFileSize(), 2, 2, 0, 0, new NIOSequentialFileFactory(server.getConfiguration().getJournalLocation(), 1), "activemq-data", "amq", 1);
jrn.start();
jrn.load(records, list, null);

View File

@ -266,7 +266,7 @@ public class RedeliveryConsumerTest extends ActiveMQTestBase {
server.stop();
JournalImpl journal = new JournalImpl(server.getConfiguration().getJournalFileSize(), 2, 0, 0, new NIOSequentialFileFactory(server.getConfiguration().getJournalLocation(), 1), "activemq-data", "amq", 1);
JournalImpl journal = new JournalImpl(server.getConfiguration().getJournalFileSize(), 2, 2, 0, 0, new NIOSequentialFileFactory(server.getConfiguration().getJournalLocation(), 1), "activemq-data", "amq", 1);
final AtomicInteger updates = new AtomicInteger();

View File

@ -1756,7 +1756,7 @@ public class BridgeTest extends ActiveMQTestBase {
protected Map<Long, AtomicInteger> loadQueues(ActiveMQServer serverToInvestigate) throws Exception {
SequentialFileFactory messagesFF = new NIOSequentialFileFactory(serverToInvestigate.getConfiguration().getJournalLocation(), 1);
JournalImpl messagesJournal = new JournalImpl(serverToInvestigate.getConfiguration().getJournalFileSize(), serverToInvestigate.getConfiguration().getJournalMinFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
JournalImpl messagesJournal = new JournalImpl(serverToInvestigate.getConfiguration().getJournalFileSize(), serverToInvestigate.getConfiguration().getJournalMinFiles(), serverToInvestigate.getConfiguration().getJournalPoolFiles(), 0, 0, messagesFF, "activemq-data", "amq", 1);
List<RecordInfo> records = new LinkedList<>();
List<PreparedTransactionInfo> preparedTransactions = new LinkedList<>();

View File

@ -207,7 +207,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
final byte recordType = (byte) 0;
journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO);
journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO);
journal.start();
@ -486,7 +486,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
final CountDownLatch latchDone = new CountDownLatch(1);
final CountDownLatch latchWait = new CountDownLatch(1);
journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) {
journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) {
@Override
protected SequentialFile createControlFile(final List<JournalFile> files,

View File

@ -318,7 +318,7 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase {
}
public static JournalImpl createJournal(final String journalType, final String journalDir) {
JournalImpl journal = new JournalImpl(10485760, 2, 0, 0, ValidateTransactionHealthTest.getFactory(journalType, journalDir), "journaltst", "tst", 500);
JournalImpl journal = new JournalImpl(10485760, 2, 2, 0, 0, ValidateTransactionHealthTest.getFactory(journalType, journalDir), "journaltst", "tst", 500);
return journal;
}

View File

@ -180,7 +180,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
@Test
public void testSpeedTransactional() throws Exception {
Journal journal = new JournalImpl(10 * 1024 * 1024, 10, 0, 0, getFileFactory(), "activemq-data", "amq", 5000);
Journal journal = new JournalImpl(10 * 1024 * 1024, 10, 10, 0, 0, getFileFactory(), "activemq-data", "amq", 5000);
journal.start();
@ -236,7 +236,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
JournalImplTestUnit.log.debug("num Files=" + numFiles);
Journal journal = new JournalImpl(10 * 1024 * 1024, numFiles, 0, 0, getFileFactory(), "activemq-data", "amq", 5000);
Journal journal = new JournalImpl(10 * 1024 * 1024, numFiles, numFiles, 0, 0, getFileFactory(), "activemq-data", "amq", 5000);
journal.start();
@ -259,7 +259,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
journal.stop();
journal = new JournalImpl(10 * 1024 * 1024, numFiles, 0, 0, getFileFactory(), "activemq-data", "amq", 5000);
journal = new JournalImpl(10 * 1024 * 1024, numFiles, numFiles, 0, 0, getFileFactory(), "activemq-data", "amq", 5000);
journal.start();
journal.load(new ArrayList<RecordInfo>(), null, null);

View File

@ -0,0 +1,440 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.performance.storage;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PageCursorProvider;
import org.apache.activemq.artemis.core.paging.impl.Page;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.replication.ReplicationManager;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.RouteContextList;
import org.apache.activemq.artemis.core.server.ServerMessage;
import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.core.transaction.Transaction;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;
import java.io.File;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class PersistMultiThreadTest extends ActiveMQTestBase {
final String DIRECTORY = "./target/journaltmp";
FakePagingStore fakePagingStore = new FakePagingStore();
@Test
public void testMultipleWrites() throws Exception {
deleteDirectory(new File(DIRECTORY));
ActiveMQServer server = createServer(true);
server.getConfiguration().setJournalCompactMinFiles(ActiveMQDefaultConfiguration.getDefaultJournalCompactMinFiles());
server.getConfiguration().setJournalCompactPercentage(ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage());
server.getConfiguration().setJournalDirectory(DIRECTORY + "/journal");
server.getConfiguration().setBindingsDirectory(DIRECTORY + "/bindings");
server.getConfiguration().setPagingDirectory(DIRECTORY + "/paging");
server.getConfiguration().setLargeMessagesDirectory(DIRECTORY + "/largemessage");
server.getConfiguration().setJournalFileSize(10 * 1024 * 1024);
server.getConfiguration().setJournalMinFiles(2);
server.getConfiguration().setJournalType(JournalType.ASYNCIO);
server.start();
StorageManager storage = server.getStorageManager();
long msgID = storage.generateID();
System.out.println("msgID=" + msgID);
int NUMBER_OF_THREADS = 50;
int NUMBER_OF_MESSAGES = 5000;
MyThread[] threads = new MyThread[NUMBER_OF_THREADS];
final CountDownLatch alignFlag = new CountDownLatch(NUMBER_OF_THREADS);
final CountDownLatch startFlag = new CountDownLatch(1);
final CountDownLatch finishFlag = new CountDownLatch(NUMBER_OF_THREADS);
MyDeleteThread deleteThread = new MyDeleteThread("deleteThread", storage, NUMBER_OF_MESSAGES * NUMBER_OF_THREADS * 10);
deleteThread.start();
for (int i = 0; i < threads.length; i++) {
threads[i] = new MyThread("writer::" + i, storage, NUMBER_OF_MESSAGES, alignFlag, startFlag, finishFlag);
}
for (MyThread t : threads) {
t.start();
}
alignFlag.await();
long startTime = System.currentTimeMillis();
startFlag.countDown();
// I'm using a countDown to avoid measuring time spent on thread context from join.
// i.e. i want to measure as soon as the loops are done
finishFlag.await();
long endtime = System.currentTimeMillis();
System.out.println("Time:: " + (endtime - startTime));
for (MyThread t : threads) {
t.join();
Assert.assertEquals(0, t.errors.get());
}
deleteThread.join();
Assert.assertEquals(0, deleteThread.errors.get());
}
LinkedBlockingDeque<Long> deletes = new LinkedBlockingDeque<>();
class MyThread extends Thread {
final StorageManager storage;
final int numberOfMessages;
final AtomicInteger errors = new AtomicInteger(0);
final CountDownLatch align;
final CountDownLatch start;
final CountDownLatch finish;
MyThread(String name,
StorageManager storage,
int numberOfMessages,
CountDownLatch align,
CountDownLatch start,
CountDownLatch finish) {
super(name);
this.storage = storage;
this.numberOfMessages = numberOfMessages;
this.align = align;
this.start = start;
this.finish = finish;
}
public void run() {
try {
align.countDown();
start.await();
long id = storage.generateID();
long txID = storage.generateID();
// each thread will store a single message that will never be deleted, trying to force compacting to happen
storeMessage(txID, id);
storage.commit(txID);
OperationContext ctx = storage.getContext();
for (int i = 0; i < numberOfMessages; i++) {
txID = storage.generateID();
long[] messageID = new long[10];
for (int msgI = 0; msgI < 10; msgI++) {
id = storage.generateID();
messageID[msgI] = id;
storeMessage(txID, id);
}
storage.commit(txID);
ctx.waitCompletion();
for (long deleteID : messageID) {
deletes.add(deleteID);
}
}
}
catch (Exception e) {
e.printStackTrace();
errors.incrementAndGet();
}
finally {
finish.countDown();
}
}
private void storeMessage(long txID, long id) throws Exception {
ServerMessage message = new ServerMessageImpl(id, 10 * 1024);
message.setPagingStore(fakePagingStore);
message.getBodyBuffer().writeBytes(new byte[104]);
message.putStringProperty("hello", "" + id);
storage.storeMessageTransactional(txID, message);
storage.storeReferenceTransactional(txID, 1, id);
message.decrementRefCount();
}
}
class MyDeleteThread extends Thread {
final StorageManager storage;
final int numberOfMessages;
final AtomicInteger errors = new AtomicInteger(0);
MyDeleteThread(String name, StorageManager storage, int numberOfMessages) {
super(name);
this.storage = storage;
this.numberOfMessages = numberOfMessages;
}
public void run() {
long deletesNr = 0;
try {
for (int i = 0; i < numberOfMessages; i++) {
if (i % 1000 == 0) {
// storage.getContext().waitCompletion();
// deletesNr = 0;
// Thread.sleep(200);
}
deletesNr++;
Long deleteID = deletes.poll(10, TimeUnit.MINUTES);
if (deleteID == null) {
System.err.println("Coudn't poll delete info");
errors.incrementAndGet();
break;
}
storage.storeAcknowledge(1, deleteID);
storage.deleteMessage(deleteID);
}
}
catch (Exception e) {
e.printStackTrace(System.out);
errors.incrementAndGet();
}
finally {
System.err.println("Finished the delete loop!!!! deleted " + deletesNr);
}
}
}
class FakePagingStore implements PagingStore {
@Override
public SimpleString getAddress() {
return null;
}
@Override
public int getNumberOfPages() {
return 0;
}
@Override
public int getCurrentWritingPage() {
return 0;
}
@Override
public SimpleString getStoreName() {
return null;
}
@Override
public File getFolder() {
return null;
}
@Override
public AddressFullMessagePolicy getAddressFullMessagePolicy() {
return null;
}
@Override
public long getFirstPage() {
return 0;
}
@Override
public long getPageSizeBytes() {
return 0;
}
@Override
public long getAddressSize() {
return 0;
}
@Override
public long getMaxSize() {
return 0;
}
@Override
public void applySetting(AddressSettings addressSettings) {
}
@Override
public boolean isPaging() {
return false;
}
@Override
public void sync() throws Exception {
}
@Override
public void ioSync() throws Exception {
}
@Override
public boolean page(ServerMessage message,
Transaction tx,
RouteContextList listCtx,
ReentrantReadWriteLock.ReadLock readLock) throws Exception {
return false;
}
@Override
public Page createPage(int page) throws Exception {
return null;
}
@Override
public boolean checkPageFileExists(int page) throws Exception {
return false;
}
@Override
public PagingManager getPagingManager() {
return null;
}
@Override
public PageCursorProvider getCursorProvider() {
return null;
}
@Override
public void processReload() throws Exception {
}
@Override
public Page depage() throws Exception {
return null;
}
@Override
public void forceAnotherPage() throws Exception {
}
@Override
public Page getCurrentPage() {
return null;
}
@Override
public boolean startPaging() throws Exception {
return false;
}
@Override
public void stopPaging() throws Exception {
}
@Override
public void addSize(int size) {
}
@Override
public boolean checkMemory(Runnable runnable) {
return false;
}
@Override
public boolean lock(long timeout) {
return false;
}
@Override
public void unlock() {
}
@Override
public void flushExecutors() {
}
@Override
public Collection<Integer> getCurrentIds() throws Exception {
return null;
}
@Override
public void sendPages(ReplicationManager replicator, Collection<Integer> pageIds) throws Exception {
}
@Override
public void disableCleanup() {
}
@Override
public void enableCleanup() {
}
@Override
public void start() throws Exception {
}
@Override
public void stop() throws Exception {
}
@Override
public boolean isStarted() {
return false;
}
}
}

View File

@ -0,0 +1,297 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.performance.storage;
import javax.jms.BytesMessage;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import java.io.File;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSClient;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.DefaultConnectionProperties;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;
public class SendReceiveMultiThreadTest extends ActiveMQTestBase {
final String DIRECTORY = "./target/journaltmp";
ConnectionFactory cf;
Destination destination;
AtomicInteger received = new AtomicInteger(0);
AtomicInteger sent = new AtomicInteger(0);
int NUMBER_OF_THREADS = 400;
int NUMBER_OF_MESSAGES = 5000;
CountDownLatch receivedLatch = new CountDownLatch(NUMBER_OF_MESSAGES * NUMBER_OF_THREADS);
@Test
public void testMultipleWrites() throws Exception {
deleteDirectory(new File(DIRECTORY));
ActiveMQServer server = createServer(true);
server.getConfiguration().setJournalFileSize(10 * 1024 * 1024);
server.getConfiguration().setJournalMinFiles(2);
server.getConfiguration().setJournalCompactMinFiles(ActiveMQDefaultConfiguration.getDefaultJournalCompactMinFiles());
server.getConfiguration().setJournalCompactPercentage(ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage());
server.getConfiguration().setJournalType(JournalType.ASYNCIO);
server.getConfiguration().addAcceptorConfiguration("core", DefaultConnectionProperties.DEFAULT_BROKER_BIND_URL);
server.getConfiguration().setJournalDirectory(DIRECTORY + "/journal");
server.getConfiguration().setBindingsDirectory(DIRECTORY + "/bindings");
server.getConfiguration().setPagingDirectory(DIRECTORY + "/paging");
server.getConfiguration().setLargeMessagesDirectory(DIRECTORY + "/largemessage");
server.getConfiguration().setJournalMaxIO_AIO(200);
// TODO Setup Acceptors
server.start();
Queue queue = server.createQueue(SimpleString.toSimpleString("jms.queue.performanceQueue"), SimpleString.toSimpleString("jms.queue.performanceQueue"), null, true, false);
Queue queue2 = server.createQueue(SimpleString.toSimpleString("jms.queue.stationaryQueue"), SimpleString.toSimpleString("jms.queue.stationaryQueue"), null, true, false);
MyThread[] threads = new MyThread[NUMBER_OF_THREADS];
ConsumerThread[] cthreads = new ConsumerThread[NUMBER_OF_THREADS];
final CountDownLatch alignFlag = new CountDownLatch(NUMBER_OF_THREADS);
final CountDownLatch startFlag = new CountDownLatch(1);
final CountDownLatch finishFlag = new CountDownLatch(NUMBER_OF_THREADS);
cf = new ActiveMQConnectionFactory();
Thread slowSending = new Thread() {
public void run() {
Connection conn = null;
try {
conn = cf.createConnection();
Session session = conn.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(ActiveMQJMSClient.createQueue("stationaryQueue"));
conn.start();
MessageConsumer consumer = session.createConsumer(ActiveMQJMSClient.createQueue("stationaryQueue"));
while (true) {
for (int i = 0; i < 10; i++) {
System.out.println("stationed message");
producer.send(session.createTextMessage("stationed"));
session.commit();
Thread.sleep(1000);
}
for (int i = 0; i < 10; i++) {
consumer.receive(5000);
session.commit();
System.out.println("Receiving stationed");
Thread.sleep(1000);
}
}
}
catch (Exception e) {
e.printStackTrace();
}
finally {
try {
conn.close();
}
catch (Exception ignored) {
}
}
}
};
slowSending.start();
destination = ActiveMQJMSClient.createQueue("performanceQueue");
for (int i = 0; i < threads.length; i++) {
threads[i] = new MyThread("sender::" + i, NUMBER_OF_MESSAGES, alignFlag, startFlag, finishFlag);
cthreads[i] = new ConsumerThread(NUMBER_OF_MESSAGES);
}
for (ConsumerThread t : cthreads) {
t.start();
}
for (MyThread t : threads) {
t.start();
}
Assert.assertEquals(NUMBER_OF_THREADS, queue.getConsumerCount());
alignFlag.await();
long startTime = System.currentTimeMillis();
startFlag.countDown();
// I'm using a countDown to avoid measuring time spent on thread context from join.
// i.e. i want to measure as soon as the loops are done
finishFlag.await();
long endtime = System.currentTimeMillis();
receivedLatch.await();
long endTimeConsuming = System.currentTimeMillis();
for (ConsumerThread t : cthreads) {
t.join();
Assert.assertEquals(0, t.errors);
}
for (MyThread t : threads) {
t.join();
Assert.assertEquals(0, t.errors.get());
}
slowSending.interrupt();
slowSending.join();
server.stop();
System.out.println("Time on sending:: " + (endtime - startTime));
System.out.println("Time on consuming:: " + (endTimeConsuming - startTime));
}
class ConsumerThread extends Thread {
final int numberOfMessages;
Connection connection;
Session session;
MessageConsumer consumer;
ConsumerThread(int numberOfMessages) throws Exception {
super("consumerthread");
this.numberOfMessages = numberOfMessages;
connection = cf.createConnection();
session = connection.createSession(true, Session.SESSION_TRANSACTED);
consumer = session.createConsumer(destination);
connection.start();
}
int errors = 0;
public void run() {
try {
for (int i = 0; i < numberOfMessages; i++) {
Message message = consumer.receive(50000);
if (message == null) {
System.err.println("Could not receive message at i = " + numberOfMessages);
errors++;
break;
}
int r = received.incrementAndGet();
if (r % 1000 == 0) {
System.out.println("Received " + r + " messages");
}
if (i % 50 == 0) {
session.commit();
}
receivedLatch.countDown();
}
session.commit();
connection.close();
}
catch (Exception e) {
e.printStackTrace();
errors++;
}
}
}
class MyThread extends Thread {
final int numberOfMessages;
final AtomicInteger errors = new AtomicInteger(0);
final CountDownLatch align;
final CountDownLatch start;
final CountDownLatch finish;
MyThread(String name, int numberOfMessages, CountDownLatch align, CountDownLatch start, CountDownLatch finish) {
super(name);
this.numberOfMessages = numberOfMessages;
this.align = align;
this.start = start;
this.finish = finish;
}
public void run() {
try {
Connection connection = cf.createConnection();
Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
MessageProducer producer = session.createProducer(destination);
align.countDown();
start.await();
for (int i = 0; i < numberOfMessages; i++) {
BytesMessage msg = session.createBytesMessage();
msg.writeBytes(new byte[1024]);
producer.send(msg);
session.commit();
int s = sent.incrementAndGet();
if (s % 1000 == 0) {
System.out.println("Sent " + s);
}
}
connection.close();
System.out.println("Send " + numberOfMessages + " messages on thread " + Thread.currentThread().getName());
}
catch (Exception e) {
e.printStackTrace();
errors.incrementAndGet();
}
finally {
finish.countDown();
}
}
}
}

View File

@ -75,7 +75,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase {
public void testInsertAndLoad() throws Exception {
SequentialFileFactory factory = new AIOSequentialFileFactory(getTestDirfile(), 1000);
JournalImpl impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
JournalImpl impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
impl.start();
@ -91,7 +91,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase {
impl.stop();
factory = new AIOSequentialFileFactory(getTestDirfile(), 1000);
impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
impl.start();
@ -108,7 +108,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase {
impl.stop();
factory = new AIOSequentialFileFactory(getTestDirfile(), 1000);
impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
impl.start();
@ -136,7 +136,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase {
public void testInsertUpdateAndLoad() throws Exception {
SequentialFileFactory factory = new AIOSequentialFileFactory(getTestDirfile(), 1000);
JournalImpl impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
JournalImpl impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
impl.start();
@ -153,7 +153,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase {
impl.stop();
factory = new AIOSequentialFileFactory(getTestDirfile(), 1000);
impl = new JournalImpl(10 * 1024 * 1024, 10, 0, 0, factory, "amq", "amq", 1000);
impl = new JournalImpl(10 * 1024 * 1024, 10, 10, 0, 0, factory, "amq", "amq", 1000);
impl.start();
@ -170,7 +170,7 @@ public class AddAndRemoveStressTest extends ActiveMQTestBase {
impl.stop();
factory = new AIOSequentialFileFactory(getTestDirfile(), 1000);
impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
impl = new JournalImpl(10 * 1024 * 1024, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, AddAndRemoveStressTest.NUMBER_OF_FILES_ON_JOURNAL, 0, 0, factory, "amq", "amq", 1000);
impl.start();

View File

@ -115,7 +115,7 @@ public class JournalCleanupCompactStressTest extends ActiveMQTestBase {
maxAIO = ActiveMQDefaultConfiguration.getDefaultJournalMaxIoNio();
}
journal = new JournalImpl(50 * 1024, 20, 50, ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage(), factory, "activemq-data", "amq", maxAIO) {
journal = new JournalImpl(50 * 1024, 20, 20, 50, ActiveMQDefaultConfiguration.getDefaultJournalCompactPercentage(), factory, "activemq-data", "amq", maxAIO) {
@Override
protected void onCompactLockingTheJournal() throws Exception {
}

View File

@ -98,7 +98,7 @@ public abstract class MixupCompactorTestBase extends JournalImplTestBase {
@Override
public void createJournal() throws Exception {
journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) {
journal = new JournalImpl(fileSize, minFiles, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) {
@Override
public void onCompactDone() {

View File

@ -84,7 +84,7 @@ public class NIOMultiThreadCompactorStressTest extends ActiveMQTestBase {
stopServer();
NIOSequentialFileFactory factory = new NIOSequentialFileFactory(new File(getJournalDir()), 1);
JournalImpl journal = new JournalImpl(ActiveMQDefaultConfiguration.getDefaultJournalFileSize(), 2, 0, 0, factory, "activemq-data", "amq", 100);
JournalImpl journal = new JournalImpl(ActiveMQDefaultConfiguration.getDefaultJournalFileSize(), 2, 2, 0, 0, factory, "activemq-data", "amq", 100);
List<RecordInfo> committedRecords = new ArrayList<>();
List<PreparedTransactionInfo> preparedTransactions = new ArrayList<>();

View File

@ -141,7 +141,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
factory = new FakeSequentialFileFactory(512, true);
try {
journalImpl = new JournalImpl(2000, 2, 0, 0, factory, "tt", "tt", 1000);
journalImpl = new JournalImpl(2000, 2, 2, 0, 0, factory, "tt", "tt", 1000);
Assert.fail("Expected IllegalArgumentException");
}
catch (IllegalArgumentException ignored) {
@ -1201,7 +1201,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
public void testAlignmentOverReload() throws Exception {
factory = new FakeSequentialFileFactory(512, false);
journalImpl = new JournalImpl(512 + 512 * 3, 20, 0, 0, factory, "amq", "amq", 1000);
journalImpl = new JournalImpl(512 + 512 * 3, 20, 20, 0, 0, factory, "amq", "amq", 1000);
journalImpl.start();
@ -1214,7 +1214,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
journalImpl.stop();
journalImpl = new JournalImpl(512 + 1024 + 512, 20, 0, 0, factory, "amq", "amq", 1000);
journalImpl = new JournalImpl(512 + 1024 + 512, 20, 20, 0, 0, factory, "amq", "amq", 1000);
addActiveMQComponent(journalImpl);
journalImpl.start();
journalImpl.load(AlignedJournalImplTest.dummyLoader);
@ -1230,7 +1230,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
journalImpl.stop();
journalImpl = new JournalImpl(512 + 1024 + 512, 20, 0, 0, factory, "amq", "amq", 1000);
journalImpl = new JournalImpl(512 + 1024 + 512, 20, 20, 0, 0, factory, "amq", "amq", 1000);
addActiveMQComponent(journalImpl);
journalImpl.start();
@ -1301,7 +1301,7 @@ public class AlignedJournalImplTest extends ActiveMQTestBase {
journalImpl.stop();
}
journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, 0, 0, factory, "tt", "tt", 1000);
journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, numberOfMinimalFiles, 0, 0, factory, "tt", "tt", 1000);
addActiveMQComponent(journalImpl);
journalImpl.start();

View File

@ -209,7 +209,7 @@ public class JournalAsyncTest extends ActiveMQTestBase {
journalImpl.stop();
}
journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, 0, 0, factory, "tt", "tt", 1000);
journalImpl = new JournalImpl(journalSize, numberOfMinimalFiles, numberOfMinimalFiles, 0, 0, factory, "tt", "tt", 1000);
journalImpl.start();

View File

@ -54,6 +54,8 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
protected int minFiles;
protected int poolSize;
protected int fileSize;
protected boolean sync;
@ -122,7 +124,16 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
// ---------------------------------------------------------------------------------
protected void setup(final int minFreeFiles, final int fileSize, final boolean sync, final int maxAIO) {
this.minFiles = minFreeFiles;
this.poolSize = minFreeFiles;
this.fileSize = fileSize;
this.sync = sync;
this.maxAIO = maxAIO;
}
protected void setup(final int minFreeFiles, final int poolSize, final int fileSize, final boolean sync, final int maxAIO) {
minFiles = minFreeFiles;
this.poolSize = poolSize;
this.fileSize = fileSize;
this.sync = sync;
this.maxAIO = maxAIO;
@ -130,13 +141,14 @@ public abstract class JournalImplTestBase extends ActiveMQTestBase {
protected void setup(final int minFreeFiles, final int fileSize, final boolean sync) {
minFiles = minFreeFiles;
poolSize = minFreeFiles;
this.fileSize = fileSize;
this.sync = sync;
maxAIO = 50;
}
public void createJournal() throws Exception {
journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) {
journal = new JournalImpl(fileSize, minFiles, poolSize, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO) {
@Override
public void onCompactDone() {
latchDone.countDown();

View File

@ -121,7 +121,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
@Test
public void testParams() throws Exception {
try {
new JournalImpl(JournalImpl.MIN_FILE_SIZE - 1, 10, 0, 0, fileFactory, filePrefix, fileExtension, 1);
new JournalImpl(JournalImpl.MIN_FILE_SIZE - 1, 10, 10, 0, 0, fileFactory, filePrefix, fileExtension, 1);
Assert.fail("Should throw exception");
}
@ -130,7 +130,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
}
try {
new JournalImpl(10 * 1024, 1, 0, 0, fileFactory, filePrefix, fileExtension, 1);
new JournalImpl(10 * 1024, 1, 0, 0, 0, fileFactory, filePrefix, fileExtension, 1);
Assert.fail("Should throw exception");
}
@ -139,7 +139,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
}
try {
new JournalImpl(10 * 1024, 10, 0, 0, null, filePrefix, fileExtension, 1);
new JournalImpl(10 * 1024, 10, 0, 0, 0, null, filePrefix, fileExtension, 1);
Assert.fail("Should throw exception");
}
@ -148,7 +148,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
}
try {
new JournalImpl(10 * 1024, 10, 0, 0, fileFactory, null, fileExtension, 1);
new JournalImpl(10 * 1024, 10, 0, 0, 0, fileFactory, null, fileExtension, 1);
Assert.fail("Should throw exception");
}
@ -157,7 +157,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
}
try {
new JournalImpl(10 * 1024, 10, 0, 0, fileFactory, filePrefix, null, 1);
new JournalImpl(10 * 1024, 10, 0, 0, 0, fileFactory, filePrefix, null, 1);
Assert.fail("Should throw exception");
}
@ -166,7 +166,7 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
}
try {
new JournalImpl(10 * 1024, 10, 0, 0, fileFactory, filePrefix, null, 0);
new JournalImpl(10 * 1024, 10, 0, 0, 0, fileFactory, filePrefix, null, 0);
Assert.fail("Should throw exception");
}
@ -567,6 +567,103 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
stopJournal();
}
@Test
public void testOrganicallyGrowNoLimit() throws Exception {
setup(2, -1, 10 * 1024, true, 50);
createJournal();
journal.setAutoReclaim(true);
startJournal();
load();
List<String> files1 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(2, files1.size());
Assert.assertEquals(0, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
// Fill all the files
for (int i = 0; i < 200; i++) {
add(i);
journal.forceMoveNextFile();
}
for (int i = 0; i < 200; i++) {
delete(i);
}
journal.forceMoveNextFile();
journal.checkReclaimStatus();
files1 = fileFactory.listFiles(fileExtension);
Assert.assertTrue(files1.size() > 200);
int numberOfFiles = files1.size();
for (int i = 300; i < 350; i++) {
add(i);
journal.forceMoveNextFile();
}
journal.checkReclaimStatus();
files1 = fileFactory.listFiles(fileExtension);
Assert.assertTrue(files1.size() > 200);
Assert.assertEquals(numberOfFiles, files1.size());
System.out.println("we have " + files1.size() + " files now");
stopJournal();
}
@Test
public void testOrganicallyWithALimit() throws Exception {
setup(2, 5, 10 * 1024, true, 50);
createJournal();
journal.setAutoReclaim(true);
startJournal();
load();
List<String> files1 = fileFactory.listFiles(fileExtension);
Assert.assertEquals(2, files1.size());
Assert.assertEquals(0, journal.getDataFilesCount());
Assert.assertEquals(0, journal.getFreeFilesCount());
Assert.assertEquals(1, journal.getOpenedFilesCount());
Assert.assertEquals(0, journal.getIDMapSize());
// Fill all the files
for (int i = 0; i < 200; i++) {
add(i);
journal.forceMoveNextFile();
}
journal.checkReclaimStatus();
for (int i = 0; i < 200; i++) {
delete(i);
}
journal.forceMoveNextFile();
journal.checkReclaimStatus();
files1 = fileFactory.listFiles(fileExtension);
Assert.assertTrue("supposed to have less than 10 but it had " + files1.size() + " files created", files1.size() < 10);
stopJournal();
}
// Validate the methods that are used on assertions
@Test
public void testCalculations() throws Exception {

View File

@ -39,7 +39,7 @@ public class BatchIDGeneratorUnitTest extends ActiveMQTestBase {
@Test
public void testSequence() throws Exception {
NIOSequentialFileFactory factory = new NIOSequentialFileFactory(new File(getTestDir()), 1);
Journal journal = new JournalImpl(10 * 1024, 2, 0, 0, factory, "activemq-bindings", "bindings", 1);
Journal journal = new JournalImpl(10 * 1024, 2, 2, 0, 0, factory, "activemq-bindings", "bindings", 1);
journal.start();