This closes #877

This commit is contained in:
jbertram 2016-11-02 13:17:12 -05:00
commit 28645b1b83
43 changed files with 505 additions and 185 deletions

View File

@ -213,6 +213,9 @@ public class Create extends InputAbstract {
@Option(name = "--no-hornetq-acceptor", description = "Disable the HornetQ specific acceptor.") @Option(name = "--no-hornetq-acceptor", description = "Disable the HornetQ specific acceptor.")
boolean noHornetQAcceptor; boolean noHornetQAcceptor;
@Option(name = "--no-fsync", description = "Disable usage of fdatasync (channel.force(false) from java nio) on the journal")
boolean noJournalSync;
boolean IS_WINDOWS; boolean IS_WINDOWS;
boolean IS_CYGWIN; boolean IS_CYGWIN;
@ -567,6 +570,7 @@ public class Create extends InputAbstract {
filters.put("${web.protocol}", "http"); filters.put("${web.protocol}", "http");
filters.put("${extra.web.attributes}", ""); filters.put("${extra.web.attributes}", "");
} }
filters.put("${fsync}", String.valueOf(!noJournalSync));
filters.put("${user}", System.getProperty("user.name", "")); filters.put("${user}", System.getProperty("user.name", ""));
filters.put("${default.port}", String.valueOf(defaultPort + portOffset)); filters.put("${default.port}", String.valueOf(defaultPort + portOffset));
filters.put("${amqp.port}", String.valueOf(AMQP_PORT + portOffset)); filters.put("${amqp.port}", String.valueOf(AMQP_PORT + portOffset));
@ -776,7 +780,7 @@ public class Create extends InputAbstract {
System.out.println(""); System.out.println("");
System.out.println("Auto tuning journal ..."); System.out.println("Auto tuning journal ...");
long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, aio); long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, aio);
long nanoseconds = SyncCalculation.toNanos(time, writes); long nanoseconds = SyncCalculation.toNanos(time, writes);
double writesPerMillisecond = (double) writes / (double) time; double writesPerMillisecond = (double) writes / (double) time;
@ -807,7 +811,7 @@ public class Create extends InputAbstract {
// forcing NIO // forcing NIO
return false; return false;
} else if (LibaioContext.isLoaded()) { } else if (LibaioContext.isLoaded()) {
try (LibaioContext context = new LibaioContext(1, true)) { try (LibaioContext context = new LibaioContext(1, true, true)) {
File tmpFile = new File(directory, "validateAIO.bin"); File tmpFile = new File(directory, "validateAIO.bin");
boolean supportsLibaio = true; boolean supportsLibaio = true;
try { try {

View File

@ -46,8 +46,9 @@ public class SyncCalculation {
int blocks, int blocks,
int tries, int tries,
boolean verbose, boolean verbose,
boolean fsync,
boolean aio) throws Exception { boolean aio) throws Exception {
SequentialFileFactory factory = newFactory(datafolder, aio); SequentialFileFactory factory = newFactory(datafolder, fsync, aio);
SequentialFile file = factory.createSequentialFile("test.tmp"); SequentialFile file = factory.createSequentialFile("test.tmp");
try { try {
@ -149,9 +150,9 @@ public class SyncCalculation {
return timeWait; return timeWait;
} }
private static SequentialFileFactory newFactory(File datafolder, boolean aio) { private static SequentialFileFactory newFactory(File datafolder, boolean datasync, boolean aio) {
if (aio && LibaioContext.isLoaded()) { if (aio && LibaioContext.isLoaded()) {
SequentialFileFactory factory = new AIOSequentialFileFactory(datafolder, 1); SequentialFileFactory factory = new AIOSequentialFileFactory(datafolder, 1).setDatasync(datasync);
factory.start(); factory.start();
((AIOSequentialFileFactory) factory).disableBufferReuse(); ((AIOSequentialFileFactory) factory).disableBufferReuse();

View File

@ -45,6 +45,8 @@ under the License.
<large-messages-directory>${data.dir}/large-messages</large-messages-directory> <large-messages-directory>${data.dir}/large-messages</large-messages-directory>
<journal-datasync>${fsync}</journal-datasync>
<journal-min-files>2</journal-min-files> <journal-min-files>2</journal-min-files>
<journal-pool-files>-1</journal-pool-files> <journal-pool-files>-1</journal-pool-files>

View File

@ -129,7 +129,7 @@ public class ArtemisTest {
public void testSync() throws Exception { public void testSync() throws Exception {
int writes = 20; int writes = 20;
int tries = 10; int tries = 10;
long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true); long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true, true);
System.out.println(); System.out.println();
System.out.println("TotalAvg = " + totalAvg); System.out.println("TotalAvg = " + totalAvg);
long nanoTime = SyncCalculation.toNanos(totalAvg, writes); long nanoTime = SyncCalculation.toNanos(totalAvg, writes);
@ -144,7 +144,7 @@ public class ArtemisTest {
Run.setEmbedded(true); Run.setEmbedded(true);
//instance1: default using http //instance1: default using http
File instance1 = new File(temporaryFolder.getRoot(), "instance1"); File instance1 = new File(temporaryFolder.getRoot(), "instance1");
Artemis.main("create", instance1.getAbsolutePath(), "--silent"); Artemis.main("create", instance1.getAbsolutePath(), "--silent", "--no-fsync");
File bootstrapFile = new File(new File(instance1, "etc"), "bootstrap.xml"); File bootstrapFile = new File(new File(instance1, "etc"), "bootstrap.xml");
Assert.assertTrue(bootstrapFile.exists()); Assert.assertTrue(bootstrapFile.exists());
Document config = parseXml(bootstrapFile); Document config = parseXml(bootstrapFile);
@ -163,7 +163,7 @@ public class ArtemisTest {
//instance2: https //instance2: https
File instance2 = new File(temporaryFolder.getRoot(), "instance2"); File instance2 = new File(temporaryFolder.getRoot(), "instance2");
Artemis.main("create", instance2.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1"); Artemis.main("create", instance2.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1", "--no-fsync");
bootstrapFile = new File(new File(instance2, "etc"), "bootstrap.xml"); bootstrapFile = new File(new File(instance2, "etc"), "bootstrap.xml");
Assert.assertTrue(bootstrapFile.exists()); Assert.assertTrue(bootstrapFile.exists());
config = parseXml(bootstrapFile); config = parseXml(bootstrapFile);
@ -184,7 +184,7 @@ public class ArtemisTest {
//instance3: https with clientAuth //instance3: https with clientAuth
File instance3 = new File(temporaryFolder.getRoot(), "instance3"); File instance3 = new File(temporaryFolder.getRoot(), "instance3");
Artemis.main("create", instance3.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1", "--use-client-auth", "--ssl-trust", "etc/truststore", "--ssl-trust-password", "password2"); Artemis.main("create", instance3.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1", "--use-client-auth", "--ssl-trust", "etc/truststore", "--ssl-trust-password", "password2", "--no-fsync");
bootstrapFile = new File(new File(instance3, "etc"), "bootstrap.xml"); bootstrapFile = new File(new File(instance3, "etc"), "bootstrap.xml");
Assert.assertTrue(bootstrapFile.exists()); Assert.assertTrue(bootstrapFile.exists());

View File

@ -130,6 +130,9 @@ public final class ActiveMQDefaultConfiguration {
// true means that the server will use the file based journal for persistence. // true means that the server will use the file based journal for persistence.
private static boolean DEFAULT_PERSISTENCE_ENABLED = true; private static boolean DEFAULT_PERSISTENCE_ENABLED = true;
// true means that the server will sync data files
private static boolean DEFAULT_JOURNAL_DATASYNC = true;
// Maximum number of threads to use for the scheduled thread pool // Maximum number of threads to use for the scheduled thread pool
private static int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 5; private static int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 5;
@ -456,6 +459,10 @@ public final class ActiveMQDefaultConfiguration {
return DEFAULT_PERSISTENCE_ENABLED; return DEFAULT_PERSISTENCE_ENABLED;
} }
public static boolean isDefaultJournalDatasync() {
return DEFAULT_JOURNAL_DATASYNC;
}
/** /**
* Maximum number of threads to use for the scheduled thread pool * Maximum number of threads to use for the scheduled thread pool
*/ */

View File

@ -93,7 +93,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
private final long connectionTTL; private final long connectionTTL;
private final Set<ClientSessionInternal> sessions = new HashSet<>(); private final Set<ClientSessionInternal> sessions = new ConcurrentHashSet<>();
private final Object createSessionLock = new Object(); private final Object createSessionLock = new Object();
@ -506,6 +506,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
// this is just a debug, since an interrupt is an expected event (in case of a shutdown) // this is just a debug, since an interrupt is an expected event (in case of a shutdown)
logger.debug(e1.getMessage(), e1); logger.debug(e1.getMessage(), e1);
} catch (Throwable t) { } catch (Throwable t) {
logger.warn(t.getMessage(), t);
//for anything else just close so clients are un blocked //for anything else just close so clients are un blocked
close(); close();
throw t; throw t;

View File

@ -60,6 +60,18 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider); dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider);
} }
@Override
public SequentialFileFactory setDatasync(boolean enabled) {
// noop
return this;
}
@Override
public boolean isDatasync() {
return false;
}
@Override @Override
public synchronized void start() { public synchronized void start() {
try { try {

View File

@ -21,9 +21,7 @@ import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@ -58,11 +56,6 @@ public abstract class AbstractSequentialFile implements SequentialFile {
*/ */
protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver(); protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver();
/**
* Used for asynchronous writes
*/
protected final Executor writerExecutor;
/** /**
* @param file * @param file
* @param directory * @param directory
@ -75,7 +68,6 @@ public abstract class AbstractSequentialFile implements SequentialFile {
this.file = new File(directory, file); this.file = new File(directory, file);
this.directory = directory; this.directory = directory;
this.factory = factory; this.factory = factory;
this.writerExecutor = writerExecutor;
} }
// Public -------------------------------------------------------- // Public --------------------------------------------------------
@ -166,20 +158,6 @@ public abstract class AbstractSequentialFile implements SequentialFile {
*/ */
@Override @Override
public synchronized void close() throws IOException, InterruptedException, ActiveMQException { public synchronized void close() throws IOException, InterruptedException, ActiveMQException {
final CountDownLatch donelatch = new CountDownLatch(1);
if (writerExecutor != null) {
writerExecutor.execute(new Runnable() {
@Override
public void run() {
donelatch.countDown();
}
});
while (!donelatch.await(60, TimeUnit.SECONDS)) {
ActiveMQJournalLogger.LOGGER.couldNotCompleteTask(new Exception("trace"), file.getName());
}
}
} }
@Override @Override

View File

@ -52,6 +52,8 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
protected final int maxIO; protected final int maxIO;
protected boolean dataSync = true;
private final IOCriticalErrorListener critialErrorListener; private final IOCriticalErrorListener critialErrorListener;
/** /**
@ -81,6 +83,19 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
this.maxIO = maxIO; this.maxIO = maxIO;
} }
@Override
public SequentialFileFactory setDatasync(boolean enabled) {
this.dataSync = enabled;
return this;
}
@Override
public boolean isDatasync() {
return dataSync;
}
@Override @Override
public void stop() { public void stop() {
if (timedBuffer != null) { if (timedBuffer != null) {

View File

@ -95,4 +95,8 @@ public interface SequentialFileFactory {
void createDirs() throws Exception; void createDirs() throws Exception;
void flush(); void flush();
SequentialFileFactory setDatasync(boolean enabled);
boolean isDatasync();
} }

View File

@ -97,7 +97,7 @@ public class AIOSequentialFile extends AbstractSequentialFile {
@Override @Override
public SequentialFile cloneFile() { public SequentialFile cloneFile() {
return new AIOSequentialFile(aioFactory, -1, -1, getFile().getParentFile(), getFile().getName(), writerExecutor); return new AIOSequentialFile(aioFactory, -1, -1, getFile().getParentFile(), getFile().getName(), null);
} }
@Override @Override
@ -214,11 +214,7 @@ public class AIOSequentialFile extends AbstractSequentialFile {
AIOSequentialFileFactory.AIOSequentialCallback runnableCallback = getCallback(callback, bytes); AIOSequentialFileFactory.AIOSequentialCallback runnableCallback = getCallback(callback, bytes);
runnableCallback.initWrite(positionToWrite, bytesToWrite); runnableCallback.initWrite(positionToWrite, bytesToWrite);
if (writerExecutor != null) { runnableCallback.run();
writerExecutor.execute(runnableCallback);
} else {
runnableCallback.run();
}
} }
AIOSequentialFileFactory.AIOSequentialCallback getCallback(IOCallback originalCallback, ByteBuffer buffer) { AIOSequentialFileFactory.AIOSequentialCallback getCallback(IOCallback originalCallback, ByteBuffer buffer) {

View File

@ -211,7 +211,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
if (running.compareAndSet(false, true)) { if (running.compareAndSet(false, true)) {
super.start(); super.start();
this.libaioContext = new LibaioContext(maxIO, true); this.libaioContext = new LibaioContext(maxIO, true, dataSync);
this.running.set(true); this.running.set(true);

View File

@ -49,12 +49,15 @@ final class MappedSequentialFile implements SequentialFile {
private String fileName; private String fileName;
private MappedFile mappedFile; private MappedFile mappedFile;
private ActiveMQBuffer pooledActiveMQBuffer; private ActiveMQBuffer pooledActiveMQBuffer;
private final MappedSequentialFileFactory factory;
MappedSequentialFile(final File directory, MappedSequentialFile(MappedSequentialFileFactory factory,
final File directory,
final File file, final File file,
final long chunkBytes, final long chunkBytes,
final long overlapBytes, final long overlapBytes,
final IOCriticalErrorListener criticalErrorListener) { final IOCriticalErrorListener criticalErrorListener) {
this.factory = factory;
this.directory = directory; this.directory = directory;
this.file = file; this.file = file;
this.absoluteFile = null; this.absoluteFile = null;
@ -155,7 +158,7 @@ final class MappedSequentialFile implements SequentialFile {
final int readableBytes = writerIndex - readerIndex; final int readableBytes = writerIndex - readerIndex;
if (readableBytes > 0) { if (readableBytes > 0) {
this.mappedFile.write(byteBuf, readerIndex, readableBytes); this.mappedFile.write(byteBuf, readerIndex, readableBytes);
if (sync) { if (factory.isDatasync() && sync) {
this.mappedFile.force(); this.mappedFile.force();
} }
} }
@ -178,7 +181,7 @@ final class MappedSequentialFile implements SequentialFile {
final int readableBytes = writerIndex - readerIndex; final int readableBytes = writerIndex - readerIndex;
if (readableBytes > 0) { if (readableBytes > 0) {
this.mappedFile.write(byteBuf, readerIndex, readableBytes); this.mappedFile.write(byteBuf, readerIndex, readableBytes);
if (sync) { if (factory.isDatasync() && sync) {
this.mappedFile.force(); this.mappedFile.force();
} }
} }
@ -209,7 +212,7 @@ final class MappedSequentialFile implements SequentialFile {
final int readableBytes = writerIndex - readerIndex; final int readableBytes = writerIndex - readerIndex;
if (readableBytes > 0) { if (readableBytes > 0) {
this.mappedFile.write(byteBuf, readerIndex, readableBytes); this.mappedFile.write(byteBuf, readerIndex, readableBytes);
if (sync) { if (factory.isDatasync() && sync) {
this.mappedFile.force(); this.mappedFile.force();
} }
} }
@ -235,7 +238,7 @@ final class MappedSequentialFile implements SequentialFile {
final int readableBytes = writerIndex - readerIndex; final int readableBytes = writerIndex - readerIndex;
if (readableBytes > 0) { if (readableBytes > 0) {
this.mappedFile.write(byteBuf, readerIndex, readableBytes); this.mappedFile.write(byteBuf, readerIndex, readableBytes);
if (sync) { if (factory.isDatasync() && sync) {
this.mappedFile.force(); this.mappedFile.force();
} }
} }
@ -253,7 +256,7 @@ final class MappedSequentialFile implements SequentialFile {
final int remaining = limit - position; final int remaining = limit - position;
if (remaining > 0) { if (remaining > 0) {
this.mappedFile.write(bytes, position, remaining); this.mappedFile.write(bytes, position, remaining);
if (sync) { if (factory.isDatasync() && sync) {
this.mappedFile.force(); this.mappedFile.force();
} }
} }
@ -275,7 +278,7 @@ final class MappedSequentialFile implements SequentialFile {
final int remaining = limit - position; final int remaining = limit - position;
if (remaining > 0) { if (remaining > 0) {
this.mappedFile.write(bytes, position, remaining); this.mappedFile.write(bytes, position, remaining);
if (sync) { if (factory.isDatasync() && sync) {
this.mappedFile.force(); this.mappedFile.force();
} }
} }
@ -381,7 +384,7 @@ final class MappedSequentialFile implements SequentialFile {
@Override @Override
public SequentialFile cloneFile() { public SequentialFile cloneFile() {
checkIsNotOpen(); checkIsNotOpen();
return new MappedSequentialFile(this.directory, this.file, this.chunkBytes, this.overlapBytes, this.criticalErrorListener); return new MappedSequentialFile(factory, this.directory, this.file, this.chunkBytes, this.overlapBytes, this.criticalErrorListener);
} }
@Override @Override

View File

@ -37,6 +37,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
private final IOCriticalErrorListener criticalErrorListener; private final IOCriticalErrorListener criticalErrorListener;
private long chunkBytes; private long chunkBytes;
private long overlapBytes; private long overlapBytes;
private boolean useDataSync;
public MappedSequentialFileFactory(File directory, IOCriticalErrorListener criticalErrorListener) { public MappedSequentialFileFactory(File directory, IOCriticalErrorListener criticalErrorListener) {
this.directory = directory; this.directory = directory;
@ -72,7 +73,18 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
@Override @Override
public SequentialFile createSequentialFile(String fileName) { public SequentialFile createSequentialFile(String fileName) {
return new MappedSequentialFile(directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener); return new MappedSequentialFile(this, directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener);
}
@Override
public SequentialFileFactory setDatasync(boolean enabled) {
this.useDataSync = enabled;
return this;
}
@Override
public boolean isDatasync() {
return useDataSync;
} }
@Override @Override

View File

@ -23,8 +23,6 @@ import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
@ -35,7 +33,6 @@ import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.journal.ActiveMQJournalBundle; import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
public final class NIOSequentialFile extends AbstractSequentialFile { public final class NIOSequentialFile extends AbstractSequentialFile {
@ -43,11 +40,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
private RandomAccessFile rfile; private RandomAccessFile rfile;
/**
* The write semaphore here is only used when writing asynchronously
*/
private Semaphore maxIOSemaphore;
private final int defaultMaxIO; private final int defaultMaxIO;
private int maxIO; private int maxIO;
@ -99,11 +91,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
throw e; throw e;
} }
if (writerExecutor != null && useExecutor) {
maxIOSemaphore = new Semaphore(maxIO);
this.maxIO = maxIO;
}
} }
@Override @Override
@ -124,6 +111,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
throw e; throw e;
} }
channel.force(true);
fileSize = channel.size(); fileSize = channel.size();
} }
@ -138,13 +126,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
public synchronized void close() throws IOException, InterruptedException, ActiveMQException { public synchronized void close() throws IOException, InterruptedException, ActiveMQException {
super.close(); super.close();
if (maxIOSemaphore != null) {
while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS)) {
ActiveMQJournalLogger.LOGGER.errorClosingFile(getFileName());
}
}
maxIOSemaphore = null;
try { try {
if (channel != null) { if (channel != null) {
channel.close(); channel.close();
@ -202,7 +183,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
@Override @Override
public void sync() throws IOException { public void sync() throws IOException {
if (channel != null) { if (factory.isDatasync() && channel != null) {
try { try {
channel.force(false); channel.force(false);
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
@ -250,7 +231,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
@Override @Override
public SequentialFile cloneFile() { public SequentialFile cloneFile() {
return new NIOSequentialFile(factory, directory, getFileName(), maxIO, writerExecutor); return new NIOSequentialFile(factory, directory, getFileName(), maxIO, null);
} }
@Override @Override
@ -298,40 +279,12 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
position.addAndGet(bytes.limit()); position.addAndGet(bytes.limit());
if (maxIOSemaphore == null || callback == null) { try {
// if maxIOSemaphore == null, that means we are not using executors and the writes are synchronous doInternalWrite(bytes, sync, callback);
try { } catch (ClosedChannelException e) {
doInternalWrite(bytes, sync, callback); throw e;
} catch (ClosedChannelException e) { } catch (IOException e) {
throw e; factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
} catch (IOException e) {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
}
} else {
// This is a flow control on writing, just like maxAIO on libaio
maxIOSemaphore.acquire();
writerExecutor.execute(new Runnable() {
@Override
public void run() {
try {
try {
doInternalWrite(bytes, sync, callback);
} catch (ClosedChannelException e) {
ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
} catch (IOException e) {
ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), NIOSequentialFile.this);
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
} catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
}
} finally {
maxIOSemaphore.release();
}
}
});
} }
} }

BIN
artemis-native/bin/libartemis-native-64.so Normal file → Executable file

Binary file not shown.

View File

@ -536,7 +536,7 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_su
} }
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_blockedPoll JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_blockedPoll
(JNIEnv * env, jobject thisObject, jobject contextPointer) { (JNIEnv * env, jobject thisObject, jobject contextPointer, jboolean useFdatasync) {
#ifdef DEBUG #ifdef DEBUG
fprintf (stdout, "Running blockedPoll\n"); fprintf (stdout, "Running blockedPoll\n");
@ -553,6 +553,8 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_bl
short running = 1; short running = 1;
int lastFile = -1;
while (running) { while (running) {
int result = io_getevents(theControl->ioContext, 1, max, theControl->events, 0); int result = io_getevents(theControl->ioContext, 1, max, theControl->events, 0);
@ -574,6 +576,8 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_bl
fflush(stdout); fflush(stdout);
#endif #endif
lastFile = -1;
for (i = 0; i < result; i++) for (i = 0; i < result; i++)
{ {
#ifdef DEBUG #ifdef DEBUG
@ -593,6 +597,11 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_bl
break; break;
} }
if (useFdatasync && lastFile != iocbp->aio_fildes) {
lastFile = iocbp->aio_fildes;
fdatasync(lastFile);
}
int eventResult = (int)event->res; int eventResult = (int)event->res;

View File

@ -49,7 +49,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
* <br> * <br>
* Or else the native module won't be loaded because of version mismatches * Or else the native module won't be loaded because of version mismatches
*/ */
private static final int EXPECTED_NATIVE_VERSION = 6; private static final int EXPECTED_NATIVE_VERSION = 7;
private static boolean loaded = false; private static boolean loaded = false;
@ -146,6 +146,8 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
final int queueSize; final int queueSize;
final boolean useFdatasync;
/** /**
* The queue size here will use resources defined on the kernel parameter * The queue size here will use resources defined on the kernel parameter
* <a href="https://www.kernel.org/doc/Documentation/sysctl/fs.txt">fs.aio-max-nr</a> . * <a href="https://www.kernel.org/doc/Documentation/sysctl/fs.txt">fs.aio-max-nr</a> .
@ -153,11 +155,13 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
* @param queueSize the size to be initialize on libaio * @param queueSize the size to be initialize on libaio
* io_queue_init which can't be higher than /proc/sys/fs/aio-max-nr. * io_queue_init which can't be higher than /proc/sys/fs/aio-max-nr.
* @param useSemaphore should block on a semaphore avoiding using more submits than what's available. * @param useSemaphore should block on a semaphore avoiding using more submits than what's available.
* @param useFdatasync should use fdatasync before calling callbacks.
*/ */
public LibaioContext(int queueSize, boolean useSemaphore) { public LibaioContext(int queueSize, boolean useSemaphore, boolean useFdatasync) {
try { try {
contexts.incrementAndGet(); contexts.incrementAndGet();
this.ioContext = newContext(queueSize); this.ioContext = newContext(queueSize);
this.useFdatasync = useFdatasync;
} catch (Exception e) { } catch (Exception e) {
throw e; throw e;
} }
@ -349,7 +353,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
*/ */
public void poll() { public void poll() {
if (!closed.get()) { if (!closed.get()) {
blockedPoll(ioContext); blockedPoll(ioContext, useFdatasync);
} }
} }
@ -436,7 +440,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
/** /**
* This method will block as long as the context is open. * This method will block as long as the context is open.
*/ */
native void blockedPoll(ByteBuffer libaioContext); native void blockedPoll(ByteBuffer libaioContext, boolean useFdatasync);
static native int getNativeVersion(); static native int getNativeVersion();

View File

@ -54,7 +54,7 @@ public class LibaioTest {
parent.mkdirs(); parent.mkdirs();
boolean failed = false; boolean failed = false;
try (LibaioContext control = new LibaioContext<>(1, true); LibaioFile fileDescriptor = control.openFile(file, true)) { try (LibaioContext control = new LibaioContext<>(1, true, true); LibaioFile fileDescriptor = control.openFile(file, true)) {
fileDescriptor.fallocate(4 * 1024); fileDescriptor.fallocate(4 * 1024);
} catch (Exception e) { } catch (Exception e) {
e.printStackTrace(); e.printStackTrace();
@ -80,7 +80,7 @@ public class LibaioTest {
@Before @Before
public void setUpFactory() { public void setUpFactory() {
control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, true); control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, true, true);
} }
@After @After
@ -532,10 +532,10 @@ public class LibaioTest {
boolean exceptionThrown = false; boolean exceptionThrown = false;
control.close(); control.close();
control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, false); control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, false, true);
try { try {
// There is no space for a queue this huge, the native layer should throw the exception // There is no space for a queue this huge, the native layer should throw the exception
LibaioContext newController = new LibaioContext(Integer.MAX_VALUE, false); LibaioContext newController = new LibaioContext(Integer.MAX_VALUE, false, true);
} catch (RuntimeException e) { } catch (RuntimeException e) {
exceptionThrown = true; exceptionThrown = true;
} }
@ -630,7 +630,7 @@ public class LibaioTest {
@Test @Test
public void testBlockedCallback() throws Exception { public void testBlockedCallback() throws Exception {
final LibaioContext blockedContext = new LibaioContext(500, true); final LibaioContext blockedContext = new LibaioContext(500, true, true);
Thread t = new Thread() { Thread t = new Thread() {
@Override @Override
public void run() { public void run() {

View File

@ -53,7 +53,7 @@ public class OpenCloseContextTest {
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
System.out.println("#test " + i); System.out.println("#test " + i);
final LibaioContext control = new LibaioContext<>(5, true); final LibaioContext control = new LibaioContext<>(5, true, true);
Thread t = new Thread() { Thread t = new Thread() {
@Override @Override
public void run() { public void run() {
@ -111,7 +111,7 @@ public class OpenCloseContextTest {
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
System.out.println("#test " + i); System.out.println("#test " + i);
final LibaioContext control = new LibaioContext<>(5, true); final LibaioContext control = new LibaioContext<>(5, true, true);
Thread t = new Thread() { Thread t = new Thread() {
@Override @Override
public void run() { public void run() {
@ -164,9 +164,9 @@ public class OpenCloseContextTest {
@Test @Test
public void testCloseAndStart() throws Exception { public void testCloseAndStart() throws Exception {
final LibaioContext control2 = new LibaioContext<>(5, true); final LibaioContext control2 = new LibaioContext<>(5, true, true);
final LibaioContext control = new LibaioContext<>(5, true); final LibaioContext control = new LibaioContext<>(5, true, true);
control.close(); control.close();
control.poll(); control.poll();

View File

@ -177,7 +177,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
} }
public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) { public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor); return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor, server.newOperationContext());
} }
public void sendSASLSupported() { public void sendSASLSupported() {

View File

@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient; import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.paging.PagingStore; import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.server.BindingQueryResult; import org.apache.activemq.artemis.core.server.BindingQueryResult;
import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.server.MessageReference;
import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.server.QueueQueryResult;
@ -81,6 +82,8 @@ public class AMQPSessionCallback implements SessionCallback {
private ServerSession serverSession; private ServerSession serverSession;
private final OperationContext operationContext;
private AMQPSessionContext protonSession; private AMQPSessionContext protonSession;
private final Executor closeExecutor; private final Executor closeExecutor;
@ -91,12 +94,14 @@ public class AMQPSessionCallback implements SessionCallback {
ProtonProtocolManager manager, ProtonProtocolManager manager,
AMQPConnectionContext connection, AMQPConnectionContext connection,
Connection transportConnection, Connection transportConnection,
Executor executor) { Executor executor,
OperationContext operationContext) {
this.protonSPI = protonSPI; this.protonSPI = protonSPI;
this.manager = manager; this.manager = manager;
this.connection = connection; this.connection = connection;
this.transportConnection = transportConnection; this.transportConnection = transportConnection;
this.closeExecutor = executor; this.closeExecutor = executor;
this.operationContext = operationContext;
} }
@Override @Override
@ -151,7 +156,7 @@ public class AMQPSessionCallback implements SessionCallback {
false, // boolean autoCommitAcks, false, // boolean autoCommitAcks,
false, // boolean preAcknowledge, false, // boolean preAcknowledge,
true, //boolean xa, true, //boolean xa,
(String) null, this, true); (String) null, this, true, operationContext);
} }
@Override @Override

View File

@ -96,7 +96,8 @@ public class MQTTConnectionManager {
String id = UUIDGenerator.getInstance().generateStringUUID(); String id = UUIDGenerator.getInstance().generateStringUUID();
ActiveMQServer server = session.getServer(); ActiveMQServer server = session.getServer();
ServerSession serverSession = server.createSession(id, username, password, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS, MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE, MQTTUtil.SESSION_XA, null, session.getSessionCallback(), MQTTUtil.SESSION_AUTO_CREATE_QUEUE); ServerSession serverSession = server.createSession(id, username, password, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS, MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE, MQTTUtil.SESSION_XA, null,
session.getSessionCallback(), MQTTUtil.SESSION_AUTO_CREATE_QUEUE, server.newOperationContext());
return (ServerSessionImpl) serverSession; return (ServerSessionImpl) serverSession;
} }

View File

@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException; import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException; import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.persistence.OperationContext; import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.artemis.core.postoffice.Binding;
import org.apache.activemq.artemis.core.postoffice.Bindings; import org.apache.activemq.artemis.core.postoffice.Bindings;
@ -119,12 +120,15 @@ import org.apache.activemq.state.SessionState;
import org.apache.activemq.transport.TransmitCallback; import org.apache.activemq.transport.TransmitCallback;
import org.apache.activemq.util.ByteSequence; import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.wireformat.WireFormat; import org.apache.activemq.wireformat.WireFormat;
import org.jboss.logging.Logger;
/** /**
* Represents an activemq connection. * Represents an activemq connection.
*/ */
public class OpenWireConnection extends AbstractRemotingConnection implements SecurityAuth { public class OpenWireConnection extends AbstractRemotingConnection implements SecurityAuth {
private static final Logger logger = Logger.getLogger(OpenWireConnection.class);
private static final KeepAliveInfo PING = new KeepAliveInfo(); private static final KeepAliveInfo PING = new KeepAliveInfo();
private final OpenWireProtocolManager protocolManager; private final OpenWireProtocolManager protocolManager;
@ -139,17 +143,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
private final AtomicBoolean stopping = new AtomicBoolean(false); private final AtomicBoolean stopping = new AtomicBoolean(false);
private boolean inServiceException;
private final AtomicBoolean asyncException = new AtomicBoolean(false);
// Clebert: Artemis session has meta-data support, perhaps we could reuse it here
private final Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<>(); private final Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<>();
private final Map<ConsumerId, AMQConsumerBrokerExchange> consumerExchanges = new ConcurrentHashMap<>(); private final Map<ConsumerId, AMQConsumerBrokerExchange> consumerExchanges = new ConcurrentHashMap<>();
private final Map<ProducerId, AMQProducerBrokerExchange> producerExchanges = new ConcurrentHashMap<>(); private final Map<ProducerId, AMQProducerBrokerExchange> producerExchanges = new ConcurrentHashMap<>();
// Clebert TODO: Artemis already stores the Session. Why do we need a different one here
private final Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<>(); private final Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<>();
private ConnectionState state; private ConnectionState state;
@ -172,6 +170,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
*/ */
private ServerSession internalSession; private ServerSession internalSession;
private final OperationContext operationContext;
private volatile long lastSent = -1; private volatile long lastSent = -1;
private ConnectionEntry connectionEntry; private ConnectionEntry connectionEntry;
private boolean useKeepAlive; private boolean useKeepAlive;
@ -185,6 +185,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
OpenWireFormat wf) { OpenWireFormat wf) {
super(connection, executor); super(connection, executor);
this.server = server; this.server = server;
this.operationContext = server.newOperationContext();
this.protocolManager = openWireProtocolManager; this.protocolManager = openWireProtocolManager;
this.wireFormat = wf; this.wireFormat = wf;
this.useKeepAlive = openWireProtocolManager.isUseKeepAlive(); this.useKeepAlive = openWireProtocolManager.isUseKeepAlive();
@ -201,6 +202,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
return info.getUserName(); return info.getUserName();
} }
public OperationContext getOperationContext() {
return operationContext;
}
// SecurityAuth implementation // SecurityAuth implementation
@Override @Override
public RemotingConnection getRemotingConnection() { public RemotingConnection getRemotingConnection() {
@ -239,6 +245,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
super.bufferReceived(connectionID, buffer); super.bufferReceived(connectionID, buffer);
try { try {
recoverOperationContext();
Command command = (Command) wireFormat.unmarshal(buffer); Command command = (Command) wireFormat.unmarshal(buffer);
boolean responseRequired = command.isResponseRequired(); boolean responseRequired = command.isResponseRequired();
@ -285,17 +293,38 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
} }
} }
// TODO: response through operation-context sendAsyncResponse(commandId, response);
if (response != null && !protocolManager.isStopping()) {
response.setCorrelationId(commandId);
dispatchSync(response);
}
} }
} catch (Exception e) { } catch (Exception e) {
ActiveMQServerLogger.LOGGER.debug(e); ActiveMQServerLogger.LOGGER.debug(e);
sendException(e); sendException(e);
} finally {
clearupOperationContext();
}
}
/** It will send the response through the operation context, as soon as everything is confirmed on disk */
private void sendAsyncResponse(final int commandId, final Response response) throws Exception {
if (response != null) {
operationContext.executeOnCompletion(new IOCallback() {
@Override
public void done() {
if (!protocolManager.isStopping()) {
try {
response.setCorrelationId(commandId);
dispatchSync(response);
} catch (Exception e) {
sendException(e);
}
}
}
@Override
public void onError(int errorCode, String errorMessage) {
sendException(new IOException(errorCode + "-" + errorMessage));
}
});
} }
} }
@ -626,7 +655,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
} }
private void createInternalSession(ConnectionInfo info) throws Exception { private void createInternalSession(ConnectionInfo info) throws Exception {
internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true); internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext);
} }
//raise the refCount of context //raise the refCount of context
@ -1083,7 +1112,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processBeginTransaction(TransactionInfo info) throws Exception { public Response processBeginTransaction(TransactionInfo info) throws Exception {
final TransactionId txID = info.getTransactionId(); final TransactionId txID = info.getTransactionId();
setOperationContext(null);
try { try {
internalSession.resetTX(null); internalSession.resetTX(null);
if (txID.isXATransaction()) { if (txID.isXATransaction()) {
@ -1101,7 +1129,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
} }
} finally { } finally {
internalSession.resetTX(null); internalSession.resetTX(null);
clearOpeartionContext();
} }
return null; return null;
} }
@ -1118,12 +1145,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
AMQSession session = (AMQSession) tx.getProtocolData(); AMQSession session = (AMQSession) tx.getProtocolData();
setOperationContext(session); tx.commit(onePhase);
try {
tx.commit(onePhase);
} finally {
clearOpeartionContext();
}
return null; return null;
} }
@ -1137,21 +1159,16 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processForgetTransaction(TransactionInfo info) throws Exception { public Response processForgetTransaction(TransactionInfo info) throws Exception {
TransactionId txID = info.getTransactionId(); TransactionId txID = info.getTransactionId();
setOperationContext(null); if (txID.isXATransaction()) {
try { try {
if (txID.isXATransaction()) { Xid xid = OpenWireUtil.toXID(info.getTransactionId());
try { internalSession.xaForget(xid);
Xid xid = OpenWireUtil.toXID(info.getTransactionId()); } catch (Exception e) {
internalSession.xaForget(xid); e.printStackTrace();
} catch (Exception e) { throw e;
e.printStackTrace();
throw e;
}
} else {
txMap.remove(txID);
} }
} finally { } else {
clearOpeartionContext(); txMap.remove(txID);
} }
return null; return null;
@ -1161,7 +1178,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processPrepareTransaction(TransactionInfo info) throws Exception { public Response processPrepareTransaction(TransactionInfo info) throws Exception {
TransactionId txID = info.getTransactionId(); TransactionId txID = info.getTransactionId();
setOperationContext(null);
try { try {
if (txID.isXATransaction()) { if (txID.isXATransaction()) {
try { try {
@ -1177,7 +1193,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
} }
} finally { } finally {
internalSession.resetTX(null); internalSession.resetTX(null);
clearOpeartionContext();
} }
return new IntegerResponse(XAResource.XA_RDONLY); return new IntegerResponse(XAResource.XA_RDONLY);
@ -1187,7 +1202,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processEndTransaction(TransactionInfo info) throws Exception { public Response processEndTransaction(TransactionInfo info) throws Exception {
TransactionId txID = info.getTransactionId(); TransactionId txID = info.getTransactionId();
setOperationContext(null);
if (txID.isXATransaction()) { if (txID.isXATransaction()) {
try { try {
Transaction tx = lookupTX(txID, null); Transaction tx = lookupTX(txID, null);
@ -1204,7 +1218,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
} }
} else { } else {
txMap.remove(txID); txMap.remove(txID);
clearOpeartionContext();
} }
return null; return null;
@ -1267,13 +1280,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
Transaction tx = lookupTX(messageSend.getTransactionId(), session); Transaction tx = lookupTX(messageSend.getTransactionId(), session);
setOperationContext(session);
session.getCoreSession().resetTX(tx); session.getCoreSession().resetTX(tx);
try { try {
session.send(producerInfo, messageSend, sendProducerAck); session.send(producerInfo, messageSend, sendProducerAck);
} finally { } finally {
session.getCoreSession().resetTX(null); session.getCoreSession().resetTX(null);
clearOpeartionContext();
} }
return null; return null;
@ -1283,7 +1294,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
public Response processMessageAck(MessageAck ack) throws Exception { public Response processMessageAck(MessageAck ack) throws Exception {
AMQSession session = getSession(ack.getConsumerId().getParentId()); AMQSession session = getSession(ack.getConsumerId().getParentId());
Transaction tx = lookupTX(ack.getTransactionId(), session); Transaction tx = lookupTX(ack.getTransactionId(), session);
setOperationContext(session);
session.getCoreSession().resetTX(tx); session.getCoreSession().resetTX(tx);
try { try {
@ -1291,7 +1301,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
consumerBrokerExchange.acknowledge(ack); consumerBrokerExchange.acknowledge(ack);
} finally { } finally {
session.getCoreSession().resetTX(null); session.getCoreSession().resetTX(null);
clearOpeartionContext();
} }
return null; return null;
} }
@ -1367,17 +1376,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
} }
private void setOperationContext(AMQSession session) { private void recoverOperationContext() {
OperationContext ctx; server.getStorageManager().setContext(this.operationContext);
if (session == null) {
ctx = this.internalSession.getSessionContext();
} else {
ctx = session.getCoreSession().getSessionContext();
}
server.getStorageManager().setContext(ctx);
} }
private void clearOpeartionContext() { private void clearupOperationContext() {
server.getStorageManager().clearContext(); server.getStorageManager().clearContext();
} }

View File

@ -107,7 +107,7 @@ public class AMQSession implements SessionCallback {
// now // now
try { try {
coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true); coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true, connection.getOperationContext());
long sessionId = sessInfo.getSessionId().getValue(); long sessionId = sessInfo.getSessionId().getValue();
if (sessionId == -1) { if (sessionId == -1) {
@ -290,8 +290,6 @@ public class AMQSession implements SessionCallback {
} else { } else {
final Connection transportConnection = connection.getTransportConnection(); final Connection transportConnection = connection.getTransportConnection();
// new Exception("Setting to false").printStackTrace();
if (transportConnection == null) { if (transportConnection == null) {
// I don't think this could happen, but just in case, avoiding races // I don't think this could happen, but just in case, avoiding races
runnable = null; runnable = null;

View File

@ -230,7 +230,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
if (stompSession == null) { if (stompSession == null) {
stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor())); stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
String name = UUIDGenerator.getInstance().generateStringUUID(); String name = UUIDGenerator.getInstance().generateStringUUID();
ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, true, false, false, false, null, stompSession, true); ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, true, false, false, false, null, stompSession, true, server.newOperationContext());
stompSession.setServerSession(session); stompSession.setServerSession(session);
sessions.put(connection.getID(), stompSession); sessions.put(connection.getID(), stompSession);
} }
@ -243,7 +243,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
if (stompSession == null) { if (stompSession == null) {
stompSession = new StompSession(connection, this, server.getStorageManager().newContext(executor)); stompSession = new StompSession(connection, this, server.getStorageManager().newContext(executor));
String name = UUIDGenerator.getInstance().generateStringUUID(); String name = UUIDGenerator.getInstance().generateStringUUID();
ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, false, false, false, false, null, stompSession, true); ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, false, false, false, false, null, stompSession, true, server.newOperationContext());
stompSession.setServerSession(session); stompSession.setServerSession(session);
transactedSessions.put(txID, stompSession); transactedSessions.put(txID, stompSession);
} }

View File

@ -78,6 +78,23 @@ public interface Configuration {
*/ */
Configuration setPersistenceEnabled(boolean enable); Configuration setPersistenceEnabled(boolean enable);
/**
* Should use fdatasync on journal files.
*
* @see <a href="http://man7.org/linux/man-pages/man2/fdatasync.2.html">fdatasync</a>
*
* @return a boolean
*/
boolean isJournalDatasync();
/**
* documented at {@link #isJournalDatasync()} ()}
*
* @param enable
* @return this
*/
Configuration setJournalDatasync(boolean enable);
/** /**
* @return usernames mapped to ResourceLimitSettings * @return usernames mapped to ResourceLimitSettings
*/ */

View File

@ -77,6 +77,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
private boolean persistenceEnabled = ActiveMQDefaultConfiguration.isDefaultPersistenceEnabled(); private boolean persistenceEnabled = ActiveMQDefaultConfiguration.isDefaultPersistenceEnabled();
private boolean journalDatasync = ActiveMQDefaultConfiguration.isDefaultJournalDatasync();
protected long fileDeploymentScanPeriod = ActiveMQDefaultConfiguration.getDefaultFileDeployerScanPeriod(); protected long fileDeploymentScanPeriod = ActiveMQDefaultConfiguration.getDefaultFileDeployerScanPeriod();
private boolean persistDeliveryCountBeforeDelivery = ActiveMQDefaultConfiguration.isDefaultPersistDeliveryCountBeforeDelivery(); private boolean persistDeliveryCountBeforeDelivery = ActiveMQDefaultConfiguration.isDefaultPersistDeliveryCountBeforeDelivery();
@ -297,6 +299,17 @@ public class ConfigurationImpl implements Configuration, Serializable {
return this; return this;
} }
@Override
public boolean isJournalDatasync() {
return journalDatasync;
}
@Override
public ConfigurationImpl setJournalDatasync(boolean enable) {
journalDatasync = enable;
return this;
}
@Override @Override
public long getFileDeployerScanPeriod() { public long getFileDeployerScanPeriod() {
return fileDeploymentScanPeriod; return fileDeploymentScanPeriod;

View File

@ -486,6 +486,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
} }
} }
config.setJournalDatasync(getBoolean(e, "journal-datasync", config.isJournalDatasync()));
config.setJournalSyncTransactional(getBoolean(e, "journal-sync-transactional", config.isJournalSyncTransactional())); config.setJournalSyncTransactional(getBoolean(e, "journal-sync-transactional", config.isJournalSyncTransactional()));
config.setJournalSyncNonTransactional(getBoolean(e, "journal-sync-non-transactional", config.isJournalSyncNonTransactional())); config.setJournalSyncNonTransactional(getBoolean(e, "journal-sync-non-transactional", config.isJournalSyncNonTransactional()));

View File

@ -118,6 +118,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
} }
bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO()); bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO());
bindingsFF.setDatasync(config.isJournalDatasync());
Journal localBindings = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings", "bindings", 1, 0); Journal localBindings = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings", "bindings", 1, 0);
@ -135,6 +136,9 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType()); throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType());
} }
journalFF.setDatasync(config.isJournalDatasync());
Journal localMessage = new JournalImpl(ioExecutors, config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO() : config.getJournalMaxIO_NIO(), 0); Journal localMessage = new JournalImpl(ioExecutors, config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO() : config.getJournalMaxIO_NIO(), 0);
messageJournal = localMessage; messageJournal = localMessage;

View File

@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException; import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.protocol.core.Channel; import org.apache.activemq.artemis.core.protocol.core.Channel;
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler; import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection; import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
@ -150,7 +151,9 @@ public class ActiveMQPacketHandler implements ChannelHandler {
activeMQPrincipal = connection.getDefaultActiveMQPrincipal(); activeMQPrincipal = connection.getDefaultActiveMQPrincipal();
} }
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true); OperationContext sessionOperationContext = server.newOperationContext();
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext);
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel); ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel);
channel.setHandler(handler); channel.setHandler(handler);

View File

@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.DivertConfiguration; import org.apache.activemq.artemis.core.config.DivertConfiguration;
import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl; import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl;
import org.apache.activemq.artemis.core.paging.PagingManager; import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.StorageManager; import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.postoffice.PostOffice; import org.apache.activemq.artemis.core.postoffice.PostOffice;
import org.apache.activemq.artemis.core.remoting.server.RemotingService; import org.apache.activemq.artemis.core.remoting.server.RemotingService;
@ -180,7 +181,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
boolean xa, boolean xa,
String defaultAddress, String defaultAddress,
SessionCallback callback, SessionCallback callback,
boolean autoCreateQueues) throws Exception; boolean autoCreateQueues,
OperationContext context) throws Exception;
SecurityStore getSecurityStore(); SecurityStore getSecurityStore();
@ -192,6 +194,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
HierarchicalRepository<AddressSettings> getAddressSettingsRepository(); HierarchicalRepository<AddressSettings> getAddressSettingsRepository();
OperationContext newOperationContext();
int getConnectionCount(); int getConnectionCount();
long getTotalConnectionCount(); long getTotalConnectionCount();

View File

@ -423,6 +423,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
return manager; return manager;
} }
@Override
public OperationContext newOperationContext() {
return getStorageManager().newContext(getExecutorFactory().getExecutor());
}
@Override @Override
public final synchronized void start() throws Exception { public final synchronized void start() throws Exception {
if (state != SERVER_STATE.STOPPED) { if (state != SERVER_STATE.STOPPED) {
@ -1188,7 +1193,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
final boolean xa, final boolean xa,
final String defaultAddress, final String defaultAddress,
final SessionCallback callback, final SessionCallback callback,
final boolean autoCreateQueues) throws Exception { final boolean autoCreateQueues,
final OperationContext context) throws Exception {
String validatedUser = ""; String validatedUser = "";
if (securityStore != null) { if (securityStore != null) {
@ -1201,7 +1207,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
checkSessionLimit(validatedUser); checkSessionLimit(validatedUser);
final OperationContext context = storageManager.newContext(getExecutorFactory().getExecutor());
final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues); final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues);
sessions.put(name, session); sessions.put(name, session);

View File

@ -46,6 +46,14 @@
</xsd:annotation> </xsd:annotation>
</xsd:element> </xsd:element>
<xsd:element name="journal-datasync" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
<xsd:annotation>
<xsd:documentation>
that means the server will use fdatasync to confirm writes on the disk.
</xsd:documentation>
</xsd:annotation>
</xsd:element>
<xsd:element name="persistence-enabled" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0"> <xsd:element name="persistence-enabled" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
<xsd:annotation> <xsd:annotation>
<xsd:documentation> <xsd:documentation>

View File

@ -354,6 +354,8 @@ public class FileConfigurationTest extends ConfigurationImplTest {
assertEquals(1234567, conf.getGlobalMaxSize()); assertEquals(1234567, conf.getGlobalMaxSize());
assertEquals(37, conf.getMaxDiskUsage()); assertEquals(37, conf.getMaxDiskUsage());
assertEquals(123, conf.getDiskScanPeriod()); assertEquals(123, conf.getDiskScanPeriod());
assertEquals(false, conf.isJournalDatasync());
} }
@Test @Test

View File

@ -450,7 +450,7 @@ public abstract class ActiveMQTestBase extends Assert {
* @throws Exception * @throws Exception
*/ */
protected ConfigurationImpl createBasicConfig(final int serverID) { protected ConfigurationImpl createBasicConfig(final int serverID) {
ConfigurationImpl configuration = new ConfigurationImpl().setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(100 * 1024).setJournalType(getDefaultJournalType()).setJournalDirectory(getJournalDir(serverID, false)).setBindingsDirectory(getBindingsDir(serverID, false)).setPagingDirectory(getPageDir(serverID, false)).setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)).setJournalCompactMinFiles(0).setJournalCompactPercentage(0).setClusterPassword(CLUSTER_PASSWORD); ConfigurationImpl configuration = new ConfigurationImpl().setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(100 * 1024).setJournalType(getDefaultJournalType()).setJournalDirectory(getJournalDir(serverID, false)).setBindingsDirectory(getBindingsDir(serverID, false)).setPagingDirectory(getPageDir(serverID, false)).setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)).setJournalCompactMinFiles(0).setJournalCompactPercentage(0).setClusterPassword(CLUSTER_PASSWORD).setJournalDatasync(false);
return configuration; return configuration;
} }

View File

@ -49,6 +49,7 @@
<message-expiry-scan-period>10111213</message-expiry-scan-period> <message-expiry-scan-period>10111213</message-expiry-scan-period>
<message-expiry-thread-priority>8</message-expiry-thread-priority> <message-expiry-thread-priority>8</message-expiry-thread-priority>
<id-cache-size>127</id-cache-size> <id-cache-size>127</id-cache-size>
<journal-datasync>false</journal-datasync>
<persist-id-cache>true</persist-id-cache> <persist-id-cache>true</persist-id-cache>
<populate-validated-user>true</populate-validated-user> <populate-validated-user>true</populate-validated-user>
<connection-ttl-check-interval>98765</connection-ttl-check-interval> <connection-ttl-check-interval>98765</connection-ttl-check-interval>

View File

@ -62,6 +62,7 @@ Name | Description
[journal-sync-non-transactional](persistence.md) | if true wait for non transaction data to be synced to the journal before returning response to client. Default=true [journal-sync-non-transactional](persistence.md) | if true wait for non transaction data to be synced to the journal before returning response to client. Default=true
[journal-sync-transactional](persistence.md) | if true wait for transaction data to be synchronized to the journal before returning response to client. Default=true [journal-sync-transactional](persistence.md) | if true wait for transaction data to be synchronized to the journal before returning response to client. Default=true
[journal-type](persistence.md) | the type of journal to use. Default=ASYNCIO [journal-type](persistence.md) | the type of journal to use. Default=ASYNCIO
[journal-datasync](persistence.md) | It will use fsync on journal operations. Default=true.
[large-messages-directory](large-messages.md "Configuring the server") | the directory to store large messages. Default=data/largemessages [large-messages-directory](large-messages.md "Configuring the server") | the directory to store large messages. Default=data/largemessages
[management-address](management.md "Configuring Core Management") | the name of the management address to send management messages to. It is prefixed with "jms.queue" so that JMS clients can send messages to it. Default=jms.queue.activemq.management [management-address](management.md "Configuring Core Management") | the name of the management address to send management messages to. It is prefixed with "jms.queue" so that JMS clients can send messages to it. Default=jms.queue.activemq.management
[management-notification-address](management.md "Configuring The Core Management Notification Address") | the name of the address that consumers bind to receive management notifications. Default=activemq.notifications [management-notification-address](management.md "Configuring The Core Management Notification Address") | the name of the address that consumers bind to receive management notifications. Default=activemq.notifications

View File

@ -298,6 +298,10 @@ The message journal is configured using the following attributes in
data files on the journal data files on the journal
The default for this parameter is `30` The default for this parameter is `30`
- `journal-datasync` (default: true)
This will disable the use of fdatasync on journal writes.
### An important note on disabling disk write cache. ### An important note on disabling disk write cache.

View File

@ -0,0 +1,235 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.persistence;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Queue;
import javax.jms.Session;
import java.io.File;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.apache.qpid.jms.JmsConnectionFactory;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@RunWith(Parameterized.class)
public class SyncSendTest extends ActiveMQTestBase {
private static long totalRecordTime = -1;
private static final int RECORDS = 300;
private static final int MEASURE_RECORDS = 100;
private static final int WRAMP_UP = 100;
@Parameterized.Parameters(name = "storage={0}, protocol={1}")
public static Collection getParameters() {
Object[] storages = new Object[]{"libaio", "nio", "null"};
Object[] protocols = new Object[]{"core", "openwire", "amqp"};
ArrayList<Object[]> objects = new ArrayList<>();
for (Object s : storages) {
if (s.equals("libaio") && !LibaioContext.isLoaded()) {
continue;
}
for (Object p : protocols) {
objects.add(new Object[]{s, p});
}
}
return objects;
}
private final String storage;
private final String protocol;
public SyncSendTest(String storage, String protocol) {
this.storage = storage;
this.protocol = protocol;
}
ActiveMQServer server;
JMSServerManagerImpl jms;
@Override
public void setUp() throws Exception {
super.setUp();
if (storage.equals("null")) {
server = createServer(false, true);
} else {
server = createServer(true, true);
}
jms = new JMSServerManagerImpl(server);
if (storage.equals("libaio")) {
server.getConfiguration().setJournalType(JournalType.ASYNCIO);
} else {
server.getConfiguration().setJournalType(JournalType.NIO);
}
jms.start();
}
private long getTimePerSync() throws Exception {
if (storage.equals("null")) {
return 0;
}
if (totalRecordTime < 0) {
File measureFile = temporaryFolder.newFile();
System.out.println("File::" + measureFile);
RandomAccessFile rfile = new RandomAccessFile(measureFile, "rw");
FileChannel channel = rfile.getChannel();
channel.position(0);
ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put(new byte[10]);
buffer.position(0);
Assert.assertEquals(10, channel.write(buffer));
channel.force(true);
long time = System.nanoTime();
for (int i = 0; i < MEASURE_RECORDS + WRAMP_UP; i++) {
if (i == WRAMP_UP) {
time = System.nanoTime();
}
channel.position(0);
buffer.position(0);
buffer.putInt(i);
buffer.position(0);
Assert.assertEquals(10, channel.write(buffer));
channel.force(false);
}
long timeEnd = System.nanoTime();
totalRecordTime = ((timeEnd - time) / MEASURE_RECORDS) * RECORDS;
System.out.println("total time = " + totalRecordTime);
}
return totalRecordTime;
}
@Test
public void testSendConsumeAudoACK() throws Exception {
long recordTime = getTimePerSync();
jms.createQueue(true, "queue", null, true, null);
ConnectionFactory factory = newCF();
Connection connection = factory.createConnection();
try {
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue;
if (protocol.equals("amqp")) {
queue = session.createQueue("jms.queue.queue");
} else {
queue = session.createQueue("queue");
}
MessageProducer producer = session.createProducer(queue);
long start = System.nanoTime();
for (int i = 0; i < (RECORDS + WRAMP_UP); i++) {
if (i == WRAMP_UP) {
start = System.nanoTime(); // wramp up
}
producer.send(session.createMessage());
}
long end = System.nanoTime();
System.out.println("end - start = " + (end - start) + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(end - start));
System.out.println("RECORD TIME = " + recordTime + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(recordTime));
if ((end - start) < recordTime) {
Assert.fail("Messages are being sent too fast! Faster than the disk would be able to sync!");
}
connection.start();
MessageConsumer consumer = session.createConsumer(queue);
for (int i = 0; i < (RECORDS + WRAMP_UP); i++) {
if (i == WRAMP_UP) {
start = System.nanoTime(); // wramp up
}
Message msg = consumer.receive(5000);
Assert.assertNotNull(msg);
}
end = System.nanoTime();
System.out.println("end - start = " + (end - start) + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(end - start));
System.out.println("RECORD TIME = " + recordTime + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(recordTime));
// There's no way to sync on ack for AMQP
if (!protocol.equals("amqp") && (end - start) < recordTime) {
Assert.fail("Messages are being acked too fast! Faster than the disk would be able to sync!");
}
} finally {
connection.close();
}
}
// this will set ack as synchronous, to make sure we make proper measures against the sync on disk
private ConnectionFactory newCF() {
if (protocol.equals("core")) {
ConnectionFactory factory = new ActiveMQConnectionFactory();
((ActiveMQConnectionFactory) factory).setBlockOnAcknowledge(true);
return factory;
} else if (protocol.equals("amqp")) {
final JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
factory.setForceAsyncAcks(true);
return factory;
} else {
org.apache.activemq.ActiveMQConnectionFactory cf = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.cacheEnabled=true");
cf.setSendAcksAsync(false);
return cf;
}
}
}

View File

@ -35,6 +35,7 @@ import org.apache.activemq.artemis.integration.vertx.VertxOutgoingConnectorServi
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase; import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.vertx.java.core.Handler; import org.vertx.java.core.Handler;
import org.vertx.java.core.Vertx; import org.vertx.java.core.Vertx;
@ -48,8 +49,10 @@ import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory;
/** /**
* This class tests the basics of ActiveMQ * This class tests the basics of ActiveMQ
* vertx integration * vertx inte
* gration
*/ */
@Ignore
public class ActiveMQVertxUnitTest extends ActiveMQTestBase { public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
private PlatformManager vertxManager; private PlatformManager vertxManager;

View File

@ -59,6 +59,16 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
this(1, false); this(1, false);
} }
@Override
public SequentialFileFactory setDatasync(boolean enabled) {
return null;
}
@Override
public boolean isDatasync() {
return false;
}
@Override @Override
public int getMaxIO() { public int getMaxIO() {
return 1; return 1;