ARTEMIS-832 Openwire was ignoring data syncs.
I'm also adding the possibility of sync on libaio, and not only relay on write-cache
This commit is contained in:
parent
99a440b0d0
commit
5e5ac0f057
|
@ -213,6 +213,9 @@ public class Create extends InputAbstract {
|
|||
@Option(name = "--no-hornetq-acceptor", description = "Disable the HornetQ specific acceptor.")
|
||||
boolean noHornetQAcceptor;
|
||||
|
||||
@Option(name = "--no-fsync", description = "Disable usage of fdatasync (channel.force(false) from java nio) on the journal")
|
||||
boolean noJournalSync;
|
||||
|
||||
boolean IS_WINDOWS;
|
||||
|
||||
boolean IS_CYGWIN;
|
||||
|
@ -567,6 +570,7 @@ public class Create extends InputAbstract {
|
|||
filters.put("${web.protocol}", "http");
|
||||
filters.put("${extra.web.attributes}", "");
|
||||
}
|
||||
filters.put("${fsync}", String.valueOf(!noJournalSync));
|
||||
filters.put("${user}", System.getProperty("user.name", ""));
|
||||
filters.put("${default.port}", String.valueOf(defaultPort + portOffset));
|
||||
filters.put("${amqp.port}", String.valueOf(AMQP_PORT + portOffset));
|
||||
|
@ -776,7 +780,7 @@ public class Create extends InputAbstract {
|
|||
System.out.println("");
|
||||
System.out.println("Auto tuning journal ...");
|
||||
|
||||
long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, aio);
|
||||
long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, aio);
|
||||
long nanoseconds = SyncCalculation.toNanos(time, writes);
|
||||
double writesPerMillisecond = (double) writes / (double) time;
|
||||
|
||||
|
@ -807,7 +811,7 @@ public class Create extends InputAbstract {
|
|||
// forcing NIO
|
||||
return false;
|
||||
} else if (LibaioContext.isLoaded()) {
|
||||
try (LibaioContext context = new LibaioContext(1, true)) {
|
||||
try (LibaioContext context = new LibaioContext(1, true, true)) {
|
||||
File tmpFile = new File(directory, "validateAIO.bin");
|
||||
boolean supportsLibaio = true;
|
||||
try {
|
||||
|
|
|
@ -46,8 +46,9 @@ public class SyncCalculation {
|
|||
int blocks,
|
||||
int tries,
|
||||
boolean verbose,
|
||||
boolean fsync,
|
||||
boolean aio) throws Exception {
|
||||
SequentialFileFactory factory = newFactory(datafolder, aio);
|
||||
SequentialFileFactory factory = newFactory(datafolder, fsync, aio);
|
||||
SequentialFile file = factory.createSequentialFile("test.tmp");
|
||||
|
||||
try {
|
||||
|
@ -149,9 +150,9 @@ public class SyncCalculation {
|
|||
return timeWait;
|
||||
}
|
||||
|
||||
private static SequentialFileFactory newFactory(File datafolder, boolean aio) {
|
||||
private static SequentialFileFactory newFactory(File datafolder, boolean datasync, boolean aio) {
|
||||
if (aio && LibaioContext.isLoaded()) {
|
||||
SequentialFileFactory factory = new AIOSequentialFileFactory(datafolder, 1);
|
||||
SequentialFileFactory factory = new AIOSequentialFileFactory(datafolder, 1).setDatasync(datasync);
|
||||
factory.start();
|
||||
((AIOSequentialFileFactory) factory).disableBufferReuse();
|
||||
|
||||
|
|
|
@ -45,6 +45,8 @@ under the License.
|
|||
|
||||
<large-messages-directory>${data.dir}/large-messages</large-messages-directory>
|
||||
|
||||
<journal-datasync>${fsync}</journal-datasync>
|
||||
|
||||
<journal-min-files>2</journal-min-files>
|
||||
|
||||
<journal-pool-files>-1</journal-pool-files>
|
||||
|
|
|
@ -129,7 +129,7 @@ public class ArtemisTest {
|
|||
public void testSync() throws Exception {
|
||||
int writes = 20;
|
||||
int tries = 10;
|
||||
long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true);
|
||||
long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true, true);
|
||||
System.out.println();
|
||||
System.out.println("TotalAvg = " + totalAvg);
|
||||
long nanoTime = SyncCalculation.toNanos(totalAvg, writes);
|
||||
|
@ -144,7 +144,7 @@ public class ArtemisTest {
|
|||
Run.setEmbedded(true);
|
||||
//instance1: default using http
|
||||
File instance1 = new File(temporaryFolder.getRoot(), "instance1");
|
||||
Artemis.main("create", instance1.getAbsolutePath(), "--silent");
|
||||
Artemis.main("create", instance1.getAbsolutePath(), "--silent", "--no-fsync");
|
||||
File bootstrapFile = new File(new File(instance1, "etc"), "bootstrap.xml");
|
||||
Assert.assertTrue(bootstrapFile.exists());
|
||||
Document config = parseXml(bootstrapFile);
|
||||
|
@ -163,7 +163,7 @@ public class ArtemisTest {
|
|||
|
||||
//instance2: https
|
||||
File instance2 = new File(temporaryFolder.getRoot(), "instance2");
|
||||
Artemis.main("create", instance2.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1");
|
||||
Artemis.main("create", instance2.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1", "--no-fsync");
|
||||
bootstrapFile = new File(new File(instance2, "etc"), "bootstrap.xml");
|
||||
Assert.assertTrue(bootstrapFile.exists());
|
||||
config = parseXml(bootstrapFile);
|
||||
|
@ -184,7 +184,7 @@ public class ArtemisTest {
|
|||
|
||||
//instance3: https with clientAuth
|
||||
File instance3 = new File(temporaryFolder.getRoot(), "instance3");
|
||||
Artemis.main("create", instance3.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1", "--use-client-auth", "--ssl-trust", "etc/truststore", "--ssl-trust-password", "password2");
|
||||
Artemis.main("create", instance3.getAbsolutePath(), "--silent", "--ssl-key", "etc/keystore", "--ssl-key-password", "password1", "--use-client-auth", "--ssl-trust", "etc/truststore", "--ssl-trust-password", "password2", "--no-fsync");
|
||||
bootstrapFile = new File(new File(instance3, "etc"), "bootstrap.xml");
|
||||
Assert.assertTrue(bootstrapFile.exists());
|
||||
|
||||
|
|
|
@ -130,6 +130,9 @@ public final class ActiveMQDefaultConfiguration {
|
|||
// true means that the server will use the file based journal for persistence.
|
||||
private static boolean DEFAULT_PERSISTENCE_ENABLED = true;
|
||||
|
||||
// true means that the server will sync data files
|
||||
private static boolean DEFAULT_JOURNAL_DATASYNC = true;
|
||||
|
||||
// Maximum number of threads to use for the scheduled thread pool
|
||||
private static int DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE = 5;
|
||||
|
||||
|
@ -456,6 +459,10 @@ public final class ActiveMQDefaultConfiguration {
|
|||
return DEFAULT_PERSISTENCE_ENABLED;
|
||||
}
|
||||
|
||||
public static boolean isDefaultJournalDatasync() {
|
||||
return DEFAULT_JOURNAL_DATASYNC;
|
||||
}
|
||||
|
||||
/**
|
||||
* Maximum number of threads to use for the scheduled thread pool
|
||||
*/
|
||||
|
|
|
@ -93,7 +93,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
|
||||
private final long connectionTTL;
|
||||
|
||||
private final Set<ClientSessionInternal> sessions = new HashSet<>();
|
||||
private final Set<ClientSessionInternal> sessions = new ConcurrentHashSet<>();
|
||||
|
||||
private final Object createSessionLock = new Object();
|
||||
|
||||
|
@ -506,6 +506,7 @@ public class ClientSessionFactoryImpl implements ClientSessionFactoryInternal, C
|
|||
// this is just a debug, since an interrupt is an expected event (in case of a shutdown)
|
||||
logger.debug(e1.getMessage(), e1);
|
||||
} catch (Throwable t) {
|
||||
logger.warn(t.getMessage(), t);
|
||||
//for anything else just close so clients are un blocked
|
||||
close();
|
||||
throw t;
|
||||
|
|
|
@ -60,6 +60,18 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
|
|||
dbDriver = JDBCFileUtils.getDBFileDriver(className, connectionUrl, sqlProvider);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SequentialFileFactory setDatasync(boolean enabled) {
|
||||
|
||||
// noop
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDatasync() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void start() {
|
||||
try {
|
||||
|
|
|
@ -21,9 +21,7 @@ import java.io.IOException;
|
|||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
|
||||
|
@ -58,11 +56,6 @@ public abstract class AbstractSequentialFile implements SequentialFile {
|
|||
*/
|
||||
protected final TimedBufferObserver timedBufferObserver = new LocalBufferObserver();
|
||||
|
||||
/**
|
||||
* Used for asynchronous writes
|
||||
*/
|
||||
protected final Executor writerExecutor;
|
||||
|
||||
/**
|
||||
* @param file
|
||||
* @param directory
|
||||
|
@ -75,7 +68,6 @@ public abstract class AbstractSequentialFile implements SequentialFile {
|
|||
this.file = new File(directory, file);
|
||||
this.directory = directory;
|
||||
this.factory = factory;
|
||||
this.writerExecutor = writerExecutor;
|
||||
}
|
||||
|
||||
// Public --------------------------------------------------------
|
||||
|
@ -166,20 +158,6 @@ public abstract class AbstractSequentialFile implements SequentialFile {
|
|||
*/
|
||||
@Override
|
||||
public synchronized void close() throws IOException, InterruptedException, ActiveMQException {
|
||||
final CountDownLatch donelatch = new CountDownLatch(1);
|
||||
|
||||
if (writerExecutor != null) {
|
||||
writerExecutor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
donelatch.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
while (!donelatch.await(60, TimeUnit.SECONDS)) {
|
||||
ActiveMQJournalLogger.LOGGER.couldNotCompleteTask(new Exception("trace"), file.getName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -52,6 +52,8 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
|
|||
|
||||
protected final int maxIO;
|
||||
|
||||
protected boolean dataSync = true;
|
||||
|
||||
private final IOCriticalErrorListener critialErrorListener;
|
||||
|
||||
/**
|
||||
|
@ -81,6 +83,19 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
|
|||
this.maxIO = maxIO;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public SequentialFileFactory setDatasync(boolean enabled) {
|
||||
this.dataSync = enabled;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDatasync() {
|
||||
return dataSync;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
if (timedBuffer != null) {
|
||||
|
|
|
@ -95,4 +95,8 @@ public interface SequentialFileFactory {
|
|||
void createDirs() throws Exception;
|
||||
|
||||
void flush();
|
||||
|
||||
SequentialFileFactory setDatasync(boolean enabled);
|
||||
|
||||
boolean isDatasync();
|
||||
}
|
||||
|
|
|
@ -97,7 +97,7 @@ public class AIOSequentialFile extends AbstractSequentialFile {
|
|||
|
||||
@Override
|
||||
public SequentialFile cloneFile() {
|
||||
return new AIOSequentialFile(aioFactory, -1, -1, getFile().getParentFile(), getFile().getName(), writerExecutor);
|
||||
return new AIOSequentialFile(aioFactory, -1, -1, getFile().getParentFile(), getFile().getName(), null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -214,11 +214,7 @@ public class AIOSequentialFile extends AbstractSequentialFile {
|
|||
|
||||
AIOSequentialFileFactory.AIOSequentialCallback runnableCallback = getCallback(callback, bytes);
|
||||
runnableCallback.initWrite(positionToWrite, bytesToWrite);
|
||||
if (writerExecutor != null) {
|
||||
writerExecutor.execute(runnableCallback);
|
||||
} else {
|
||||
runnableCallback.run();
|
||||
}
|
||||
runnableCallback.run();
|
||||
}
|
||||
|
||||
AIOSequentialFileFactory.AIOSequentialCallback getCallback(IOCallback originalCallback, ByteBuffer buffer) {
|
||||
|
|
|
@ -211,7 +211,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
|
|||
if (running.compareAndSet(false, true)) {
|
||||
super.start();
|
||||
|
||||
this.libaioContext = new LibaioContext(maxIO, true);
|
||||
this.libaioContext = new LibaioContext(maxIO, true, dataSync);
|
||||
|
||||
this.running.set(true);
|
||||
|
||||
|
|
|
@ -49,12 +49,15 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
private String fileName;
|
||||
private MappedFile mappedFile;
|
||||
private ActiveMQBuffer pooledActiveMQBuffer;
|
||||
private final MappedSequentialFileFactory factory;
|
||||
|
||||
MappedSequentialFile(final File directory,
|
||||
MappedSequentialFile(MappedSequentialFileFactory factory,
|
||||
final File directory,
|
||||
final File file,
|
||||
final long chunkBytes,
|
||||
final long overlapBytes,
|
||||
final IOCriticalErrorListener criticalErrorListener) {
|
||||
this.factory = factory;
|
||||
this.directory = directory;
|
||||
this.file = file;
|
||||
this.absoluteFile = null;
|
||||
|
@ -155,7 +158,7 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
final int readableBytes = writerIndex - readerIndex;
|
||||
if (readableBytes > 0) {
|
||||
this.mappedFile.write(byteBuf, readerIndex, readableBytes);
|
||||
if (sync) {
|
||||
if (factory.isDatasync() && sync) {
|
||||
this.mappedFile.force();
|
||||
}
|
||||
}
|
||||
|
@ -178,7 +181,7 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
final int readableBytes = writerIndex - readerIndex;
|
||||
if (readableBytes > 0) {
|
||||
this.mappedFile.write(byteBuf, readerIndex, readableBytes);
|
||||
if (sync) {
|
||||
if (factory.isDatasync() && sync) {
|
||||
this.mappedFile.force();
|
||||
}
|
||||
}
|
||||
|
@ -209,7 +212,7 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
final int readableBytes = writerIndex - readerIndex;
|
||||
if (readableBytes > 0) {
|
||||
this.mappedFile.write(byteBuf, readerIndex, readableBytes);
|
||||
if (sync) {
|
||||
if (factory.isDatasync() && sync) {
|
||||
this.mappedFile.force();
|
||||
}
|
||||
}
|
||||
|
@ -235,7 +238,7 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
final int readableBytes = writerIndex - readerIndex;
|
||||
if (readableBytes > 0) {
|
||||
this.mappedFile.write(byteBuf, readerIndex, readableBytes);
|
||||
if (sync) {
|
||||
if (factory.isDatasync() && sync) {
|
||||
this.mappedFile.force();
|
||||
}
|
||||
}
|
||||
|
@ -253,7 +256,7 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
final int remaining = limit - position;
|
||||
if (remaining > 0) {
|
||||
this.mappedFile.write(bytes, position, remaining);
|
||||
if (sync) {
|
||||
if (factory.isDatasync() && sync) {
|
||||
this.mappedFile.force();
|
||||
}
|
||||
}
|
||||
|
@ -275,7 +278,7 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
final int remaining = limit - position;
|
||||
if (remaining > 0) {
|
||||
this.mappedFile.write(bytes, position, remaining);
|
||||
if (sync) {
|
||||
if (factory.isDatasync() && sync) {
|
||||
this.mappedFile.force();
|
||||
}
|
||||
}
|
||||
|
@ -381,7 +384,7 @@ final class MappedSequentialFile implements SequentialFile {
|
|||
@Override
|
||||
public SequentialFile cloneFile() {
|
||||
checkIsNotOpen();
|
||||
return new MappedSequentialFile(this.directory, this.file, this.chunkBytes, this.overlapBytes, this.criticalErrorListener);
|
||||
return new MappedSequentialFile(factory, this.directory, this.file, this.chunkBytes, this.overlapBytes, this.criticalErrorListener);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -37,6 +37,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
|||
private final IOCriticalErrorListener criticalErrorListener;
|
||||
private long chunkBytes;
|
||||
private long overlapBytes;
|
||||
private boolean useDataSync;
|
||||
|
||||
public MappedSequentialFileFactory(File directory, IOCriticalErrorListener criticalErrorListener) {
|
||||
this.directory = directory;
|
||||
|
@ -72,7 +73,18 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
|
|||
|
||||
@Override
|
||||
public SequentialFile createSequentialFile(String fileName) {
|
||||
return new MappedSequentialFile(directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener);
|
||||
return new MappedSequentialFile(this, directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SequentialFileFactory setDatasync(boolean enabled) {
|
||||
this.useDataSync = enabled;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDatasync() {
|
||||
return useDataSync;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -23,8 +23,6 @@ import java.nio.ByteBuffer;
|
|||
import java.nio.channels.ClosedChannelException;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
|
@ -35,7 +33,6 @@ import org.apache.activemq.artemis.core.io.IOCallback;
|
|||
import org.apache.activemq.artemis.core.io.SequentialFile;
|
||||
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
|
||||
import org.apache.activemq.artemis.journal.ActiveMQJournalBundle;
|
||||
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
|
||||
|
||||
public final class NIOSequentialFile extends AbstractSequentialFile {
|
||||
|
||||
|
@ -43,11 +40,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
|
|||
|
||||
private RandomAccessFile rfile;
|
||||
|
||||
/**
|
||||
* The write semaphore here is only used when writing asynchronously
|
||||
*/
|
||||
private Semaphore maxIOSemaphore;
|
||||
|
||||
private final int defaultMaxIO;
|
||||
|
||||
private int maxIO;
|
||||
|
@ -99,11 +91,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
|
|||
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
|
||||
throw e;
|
||||
}
|
||||
|
||||
if (writerExecutor != null && useExecutor) {
|
||||
maxIOSemaphore = new Semaphore(maxIO);
|
||||
this.maxIO = maxIO;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -124,6 +111,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
|
|||
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
|
||||
throw e;
|
||||
}
|
||||
channel.force(true);
|
||||
|
||||
fileSize = channel.size();
|
||||
}
|
||||
|
@ -138,13 +126,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
|
|||
public synchronized void close() throws IOException, InterruptedException, ActiveMQException {
|
||||
super.close();
|
||||
|
||||
if (maxIOSemaphore != null) {
|
||||
while (!maxIOSemaphore.tryAcquire(maxIO, 60, TimeUnit.SECONDS)) {
|
||||
ActiveMQJournalLogger.LOGGER.errorClosingFile(getFileName());
|
||||
}
|
||||
}
|
||||
|
||||
maxIOSemaphore = null;
|
||||
try {
|
||||
if (channel != null) {
|
||||
channel.close();
|
||||
|
@ -202,7 +183,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
|
|||
|
||||
@Override
|
||||
public void sync() throws IOException {
|
||||
if (channel != null) {
|
||||
if (factory.isDatasync() && channel != null) {
|
||||
try {
|
||||
channel.force(false);
|
||||
} catch (ClosedChannelException e) {
|
||||
|
@ -250,7 +231,7 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
|
|||
|
||||
@Override
|
||||
public SequentialFile cloneFile() {
|
||||
return new NIOSequentialFile(factory, directory, getFileName(), maxIO, writerExecutor);
|
||||
return new NIOSequentialFile(factory, directory, getFileName(), maxIO, null);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -298,40 +279,12 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
|
|||
|
||||
position.addAndGet(bytes.limit());
|
||||
|
||||
if (maxIOSemaphore == null || callback == null) {
|
||||
// if maxIOSemaphore == null, that means we are not using executors and the writes are synchronous
|
||||
try {
|
||||
doInternalWrite(bytes, sync, callback);
|
||||
} catch (ClosedChannelException e) {
|
||||
throw e;
|
||||
} catch (IOException e) {
|
||||
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
|
||||
}
|
||||
} else {
|
||||
// This is a flow control on writing, just like maxAIO on libaio
|
||||
maxIOSemaphore.acquire();
|
||||
|
||||
writerExecutor.execute(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
try {
|
||||
doInternalWrite(bytes, sync, callback);
|
||||
} catch (ClosedChannelException e) {
|
||||
ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
|
||||
} catch (IOException e) {
|
||||
ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
|
||||
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), NIOSequentialFile.this);
|
||||
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
|
||||
} catch (Throwable e) {
|
||||
ActiveMQJournalLogger.LOGGER.errorSubmittingWrite(e);
|
||||
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), e.getMessage());
|
||||
}
|
||||
} finally {
|
||||
maxIOSemaphore.release();
|
||||
}
|
||||
}
|
||||
});
|
||||
try {
|
||||
doInternalWrite(bytes, sync, callback);
|
||||
} catch (ClosedChannelException e) {
|
||||
throw e;
|
||||
} catch (IOException e) {
|
||||
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Binary file not shown.
|
@ -536,7 +536,7 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_su
|
|||
}
|
||||
|
||||
JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_blockedPoll
|
||||
(JNIEnv * env, jobject thisObject, jobject contextPointer) {
|
||||
(JNIEnv * env, jobject thisObject, jobject contextPointer, jboolean useFdatasync) {
|
||||
|
||||
#ifdef DEBUG
|
||||
fprintf (stdout, "Running blockedPoll\n");
|
||||
|
@ -553,6 +553,8 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_bl
|
|||
|
||||
short running = 1;
|
||||
|
||||
int lastFile = -1;
|
||||
|
||||
while (running) {
|
||||
|
||||
int result = io_getevents(theControl->ioContext, 1, max, theControl->events, 0);
|
||||
|
@ -574,6 +576,8 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_bl
|
|||
fflush(stdout);
|
||||
#endif
|
||||
|
||||
lastFile = -1;
|
||||
|
||||
for (i = 0; i < result; i++)
|
||||
{
|
||||
#ifdef DEBUG
|
||||
|
@ -593,6 +597,11 @@ JNIEXPORT void JNICALL Java_org_apache_activemq_artemis_jlibaio_LibaioContext_bl
|
|||
break;
|
||||
}
|
||||
|
||||
if (useFdatasync && lastFile != iocbp->aio_fildes) {
|
||||
lastFile = iocbp->aio_fildes;
|
||||
fdatasync(lastFile);
|
||||
}
|
||||
|
||||
|
||||
int eventResult = (int)event->res;
|
||||
|
||||
|
|
|
@ -49,7 +49,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
|
|||
* <br>
|
||||
* Or else the native module won't be loaded because of version mismatches
|
||||
*/
|
||||
private static final int EXPECTED_NATIVE_VERSION = 6;
|
||||
private static final int EXPECTED_NATIVE_VERSION = 7;
|
||||
|
||||
private static boolean loaded = false;
|
||||
|
||||
|
@ -146,6 +146,8 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
|
|||
|
||||
final int queueSize;
|
||||
|
||||
final boolean useFdatasync;
|
||||
|
||||
/**
|
||||
* The queue size here will use resources defined on the kernel parameter
|
||||
* <a href="https://www.kernel.org/doc/Documentation/sysctl/fs.txt">fs.aio-max-nr</a> .
|
||||
|
@ -153,11 +155,13 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
|
|||
* @param queueSize the size to be initialize on libaio
|
||||
* io_queue_init which can't be higher than /proc/sys/fs/aio-max-nr.
|
||||
* @param useSemaphore should block on a semaphore avoiding using more submits than what's available.
|
||||
* @param useFdatasync should use fdatasync before calling callbacks.
|
||||
*/
|
||||
public LibaioContext(int queueSize, boolean useSemaphore) {
|
||||
public LibaioContext(int queueSize, boolean useSemaphore, boolean useFdatasync) {
|
||||
try {
|
||||
contexts.incrementAndGet();
|
||||
this.ioContext = newContext(queueSize);
|
||||
this.useFdatasync = useFdatasync;
|
||||
} catch (Exception e) {
|
||||
throw e;
|
||||
}
|
||||
|
@ -349,7 +353,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
|
|||
*/
|
||||
public void poll() {
|
||||
if (!closed.get()) {
|
||||
blockedPoll(ioContext);
|
||||
blockedPoll(ioContext, useFdatasync);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -436,7 +440,7 @@ public class LibaioContext<Callback extends SubmitInfo> implements Closeable {
|
|||
/**
|
||||
* This method will block as long as the context is open.
|
||||
*/
|
||||
native void blockedPoll(ByteBuffer libaioContext);
|
||||
native void blockedPoll(ByteBuffer libaioContext, boolean useFdatasync);
|
||||
|
||||
static native int getNativeVersion();
|
||||
|
||||
|
|
|
@ -54,7 +54,7 @@ public class LibaioTest {
|
|||
parent.mkdirs();
|
||||
|
||||
boolean failed = false;
|
||||
try (LibaioContext control = new LibaioContext<>(1, true); LibaioFile fileDescriptor = control.openFile(file, true)) {
|
||||
try (LibaioContext control = new LibaioContext<>(1, true, true); LibaioFile fileDescriptor = control.openFile(file, true)) {
|
||||
fileDescriptor.fallocate(4 * 1024);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
|
@ -80,7 +80,7 @@ public class LibaioTest {
|
|||
|
||||
@Before
|
||||
public void setUpFactory() {
|
||||
control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, true);
|
||||
control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, true, true);
|
||||
}
|
||||
|
||||
@After
|
||||
|
@ -532,10 +532,10 @@ public class LibaioTest {
|
|||
boolean exceptionThrown = false;
|
||||
|
||||
control.close();
|
||||
control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, false);
|
||||
control = new LibaioContext<>(LIBAIO_QUEUE_SIZE, false, true);
|
||||
try {
|
||||
// There is no space for a queue this huge, the native layer should throw the exception
|
||||
LibaioContext newController = new LibaioContext(Integer.MAX_VALUE, false);
|
||||
LibaioContext newController = new LibaioContext(Integer.MAX_VALUE, false, true);
|
||||
} catch (RuntimeException e) {
|
||||
exceptionThrown = true;
|
||||
}
|
||||
|
@ -630,7 +630,7 @@ public class LibaioTest {
|
|||
|
||||
@Test
|
||||
public void testBlockedCallback() throws Exception {
|
||||
final LibaioContext blockedContext = new LibaioContext(500, true);
|
||||
final LibaioContext blockedContext = new LibaioContext(500, true, true);
|
||||
Thread t = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
|
|
@ -53,7 +53,7 @@ public class OpenCloseContextTest {
|
|||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
System.out.println("#test " + i);
|
||||
final LibaioContext control = new LibaioContext<>(5, true);
|
||||
final LibaioContext control = new LibaioContext<>(5, true, true);
|
||||
Thread t = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
@ -111,7 +111,7 @@ public class OpenCloseContextTest {
|
|||
|
||||
for (int i = 0; i < 10; i++) {
|
||||
System.out.println("#test " + i);
|
||||
final LibaioContext control = new LibaioContext<>(5, true);
|
||||
final LibaioContext control = new LibaioContext<>(5, true, true);
|
||||
Thread t = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
@ -164,9 +164,9 @@ public class OpenCloseContextTest {
|
|||
|
||||
@Test
|
||||
public void testCloseAndStart() throws Exception {
|
||||
final LibaioContext control2 = new LibaioContext<>(5, true);
|
||||
final LibaioContext control2 = new LibaioContext<>(5, true, true);
|
||||
|
||||
final LibaioContext control = new LibaioContext<>(5, true);
|
||||
final LibaioContext control = new LibaioContext<>(5, true, true);
|
||||
control.close();
|
||||
control.poll();
|
||||
|
||||
|
|
|
@ -177,7 +177,7 @@ public class AMQPConnectionCallback implements FailureListener, CloseListener {
|
|||
}
|
||||
|
||||
public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) {
|
||||
return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor);
|
||||
return new AMQPSessionCallback(this, manager, connection, this.connection, closeExecutor, server.newOperationContext());
|
||||
}
|
||||
|
||||
public void sendSASLSupported() {
|
||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
|
|||
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
|
||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
import org.apache.activemq.artemis.core.paging.PagingStore;
|
||||
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||
import org.apache.activemq.artemis.core.server.BindingQueryResult;
|
||||
import org.apache.activemq.artemis.core.server.MessageReference;
|
||||
import org.apache.activemq.artemis.core.server.QueueQueryResult;
|
||||
|
@ -81,6 +82,8 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
|
||||
private ServerSession serverSession;
|
||||
|
||||
private final OperationContext operationContext;
|
||||
|
||||
private AMQPSessionContext protonSession;
|
||||
|
||||
private final Executor closeExecutor;
|
||||
|
@ -91,12 +94,14 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
ProtonProtocolManager manager,
|
||||
AMQPConnectionContext connection,
|
||||
Connection transportConnection,
|
||||
Executor executor) {
|
||||
Executor executor,
|
||||
OperationContext operationContext) {
|
||||
this.protonSPI = protonSPI;
|
||||
this.manager = manager;
|
||||
this.connection = connection;
|
||||
this.transportConnection = transportConnection;
|
||||
this.closeExecutor = executor;
|
||||
this.operationContext = operationContext;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -151,7 +156,7 @@ public class AMQPSessionCallback implements SessionCallback {
|
|||
false, // boolean autoCommitAcks,
|
||||
false, // boolean preAcknowledge,
|
||||
true, //boolean xa,
|
||||
(String) null, this, true);
|
||||
(String) null, this, true, operationContext);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -96,7 +96,8 @@ public class MQTTConnectionManager {
|
|||
String id = UUIDGenerator.getInstance().generateStringUUID();
|
||||
ActiveMQServer server = session.getServer();
|
||||
|
||||
ServerSession serverSession = server.createSession(id, username, password, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS, MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE, MQTTUtil.SESSION_XA, null, session.getSessionCallback(), MQTTUtil.SESSION_AUTO_CREATE_QUEUE);
|
||||
ServerSession serverSession = server.createSession(id, username, password, ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, session.getConnection(), MQTTUtil.SESSION_AUTO_COMMIT_SENDS, MQTTUtil.SESSION_AUTO_COMMIT_ACKS, MQTTUtil.SESSION_PREACKNOWLEDGE, MQTTUtil.SESSION_XA, null,
|
||||
session.getSessionCallback(), MQTTUtil.SESSION_AUTO_CREATE_QUEUE, server.newOperationContext());
|
||||
return (ServerSessionImpl) serverSession;
|
||||
}
|
||||
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.io.IOCallback;
|
||||
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||
import org.apache.activemq.artemis.core.postoffice.Binding;
|
||||
import org.apache.activemq.artemis.core.postoffice.Bindings;
|
||||
|
@ -119,12 +120,15 @@ import org.apache.activemq.state.SessionState;
|
|||
import org.apache.activemq.transport.TransmitCallback;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
import org.apache.activemq.wireformat.WireFormat;
|
||||
import org.jboss.logging.Logger;
|
||||
|
||||
/**
|
||||
* Represents an activemq connection.
|
||||
*/
|
||||
public class OpenWireConnection extends AbstractRemotingConnection implements SecurityAuth {
|
||||
|
||||
private static final Logger logger = Logger.getLogger(OpenWireConnection.class);
|
||||
|
||||
private static final KeepAliveInfo PING = new KeepAliveInfo();
|
||||
|
||||
private final OpenWireProtocolManager protocolManager;
|
||||
|
@ -139,17 +143,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
|
||||
private final AtomicBoolean stopping = new AtomicBoolean(false);
|
||||
|
||||
private boolean inServiceException;
|
||||
|
||||
private final AtomicBoolean asyncException = new AtomicBoolean(false);
|
||||
|
||||
// Clebert: Artemis session has meta-data support, perhaps we could reuse it here
|
||||
private final Map<String, SessionId> sessionIdMap = new ConcurrentHashMap<>();
|
||||
|
||||
private final Map<ConsumerId, AMQConsumerBrokerExchange> consumerExchanges = new ConcurrentHashMap<>();
|
||||
private final Map<ProducerId, AMQProducerBrokerExchange> producerExchanges = new ConcurrentHashMap<>();
|
||||
|
||||
// Clebert TODO: Artemis already stores the Session. Why do we need a different one here
|
||||
private final Map<SessionId, AMQSession> sessions = new ConcurrentHashMap<>();
|
||||
|
||||
private ConnectionState state;
|
||||
|
@ -172,6 +170,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
*/
|
||||
private ServerSession internalSession;
|
||||
|
||||
private final OperationContext operationContext;
|
||||
|
||||
private volatile long lastSent = -1;
|
||||
private ConnectionEntry connectionEntry;
|
||||
private boolean useKeepAlive;
|
||||
|
@ -185,6 +185,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
OpenWireFormat wf) {
|
||||
super(connection, executor);
|
||||
this.server = server;
|
||||
this.operationContext = server.newOperationContext();
|
||||
this.protocolManager = openWireProtocolManager;
|
||||
this.wireFormat = wf;
|
||||
this.useKeepAlive = openWireProtocolManager.isUseKeepAlive();
|
||||
|
@ -201,6 +202,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
return info.getUserName();
|
||||
}
|
||||
|
||||
|
||||
public OperationContext getOperationContext() {
|
||||
return operationContext;
|
||||
}
|
||||
|
||||
// SecurityAuth implementation
|
||||
@Override
|
||||
public RemotingConnection getRemotingConnection() {
|
||||
|
@ -239,6 +245,8 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
super.bufferReceived(connectionID, buffer);
|
||||
try {
|
||||
|
||||
recoverOperationContext();
|
||||
|
||||
Command command = (Command) wireFormat.unmarshal(buffer);
|
||||
|
||||
boolean responseRequired = command.isResponseRequired();
|
||||
|
@ -285,17 +293,38 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
}
|
||||
}
|
||||
|
||||
// TODO: response through operation-context
|
||||
|
||||
if (response != null && !protocolManager.isStopping()) {
|
||||
response.setCorrelationId(commandId);
|
||||
dispatchSync(response);
|
||||
}
|
||||
sendAsyncResponse(commandId, response);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
ActiveMQServerLogger.LOGGER.debug(e);
|
||||
|
||||
sendException(e);
|
||||
} finally {
|
||||
clearupOperationContext();
|
||||
}
|
||||
}
|
||||
|
||||
/** It will send the response through the operation context, as soon as everything is confirmed on disk */
|
||||
private void sendAsyncResponse(final int commandId, final Response response) throws Exception {
|
||||
if (response != null) {
|
||||
operationContext.executeOnCompletion(new IOCallback() {
|
||||
@Override
|
||||
public void done() {
|
||||
if (!protocolManager.isStopping()) {
|
||||
try {
|
||||
response.setCorrelationId(commandId);
|
||||
dispatchSync(response);
|
||||
} catch (Exception e) {
|
||||
sendException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onError(int errorCode, String errorMessage) {
|
||||
sendException(new IOException(errorCode + "-" + errorMessage));
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -626,7 +655,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
}
|
||||
|
||||
private void createInternalSession(ConnectionInfo info) throws Exception {
|
||||
internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true);
|
||||
internalSession = server.createSession(UUIDGenerator.getInstance().generateStringUUID(), context.getUserName(), info.getPassword(), -1, this, true, false, false, false, null, null, true, operationContext);
|
||||
}
|
||||
|
||||
//raise the refCount of context
|
||||
|
@ -1083,7 +1112,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
public Response processBeginTransaction(TransactionInfo info) throws Exception {
|
||||
final TransactionId txID = info.getTransactionId();
|
||||
|
||||
setOperationContext(null);
|
||||
try {
|
||||
internalSession.resetTX(null);
|
||||
if (txID.isXATransaction()) {
|
||||
|
@ -1101,7 +1129,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
}
|
||||
} finally {
|
||||
internalSession.resetTX(null);
|
||||
clearOpeartionContext();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -1118,12 +1145,7 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
|
||||
AMQSession session = (AMQSession) tx.getProtocolData();
|
||||
|
||||
setOperationContext(session);
|
||||
try {
|
||||
tx.commit(onePhase);
|
||||
} finally {
|
||||
clearOpeartionContext();
|
||||
}
|
||||
tx.commit(onePhase);
|
||||
|
||||
return null;
|
||||
}
|
||||
|
@ -1137,21 +1159,16 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
public Response processForgetTransaction(TransactionInfo info) throws Exception {
|
||||
TransactionId txID = info.getTransactionId();
|
||||
|
||||
setOperationContext(null);
|
||||
try {
|
||||
if (txID.isXATransaction()) {
|
||||
try {
|
||||
Xid xid = OpenWireUtil.toXID(info.getTransactionId());
|
||||
internalSession.xaForget(xid);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
throw e;
|
||||
}
|
||||
} else {
|
||||
txMap.remove(txID);
|
||||
if (txID.isXATransaction()) {
|
||||
try {
|
||||
Xid xid = OpenWireUtil.toXID(info.getTransactionId());
|
||||
internalSession.xaForget(xid);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
throw e;
|
||||
}
|
||||
} finally {
|
||||
clearOpeartionContext();
|
||||
} else {
|
||||
txMap.remove(txID);
|
||||
}
|
||||
|
||||
return null;
|
||||
|
@ -1161,7 +1178,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
public Response processPrepareTransaction(TransactionInfo info) throws Exception {
|
||||
TransactionId txID = info.getTransactionId();
|
||||
|
||||
setOperationContext(null);
|
||||
try {
|
||||
if (txID.isXATransaction()) {
|
||||
try {
|
||||
|
@ -1177,7 +1193,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
}
|
||||
} finally {
|
||||
internalSession.resetTX(null);
|
||||
clearOpeartionContext();
|
||||
}
|
||||
|
||||
return new IntegerResponse(XAResource.XA_RDONLY);
|
||||
|
@ -1187,7 +1202,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
public Response processEndTransaction(TransactionInfo info) throws Exception {
|
||||
TransactionId txID = info.getTransactionId();
|
||||
|
||||
setOperationContext(null);
|
||||
if (txID.isXATransaction()) {
|
||||
try {
|
||||
Transaction tx = lookupTX(txID, null);
|
||||
|
@ -1204,7 +1218,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
}
|
||||
} else {
|
||||
txMap.remove(txID);
|
||||
clearOpeartionContext();
|
||||
}
|
||||
|
||||
return null;
|
||||
|
@ -1267,13 +1280,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
|
||||
Transaction tx = lookupTX(messageSend.getTransactionId(), session);
|
||||
|
||||
setOperationContext(session);
|
||||
session.getCoreSession().resetTX(tx);
|
||||
try {
|
||||
session.send(producerInfo, messageSend, sendProducerAck);
|
||||
} finally {
|
||||
session.getCoreSession().resetTX(null);
|
||||
clearOpeartionContext();
|
||||
}
|
||||
|
||||
return null;
|
||||
|
@ -1283,7 +1294,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
public Response processMessageAck(MessageAck ack) throws Exception {
|
||||
AMQSession session = getSession(ack.getConsumerId().getParentId());
|
||||
Transaction tx = lookupTX(ack.getTransactionId(), session);
|
||||
setOperationContext(session);
|
||||
session.getCoreSession().resetTX(tx);
|
||||
|
||||
try {
|
||||
|
@ -1291,7 +1301,6 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
consumerBrokerExchange.acknowledge(ack);
|
||||
} finally {
|
||||
session.getCoreSession().resetTX(null);
|
||||
clearOpeartionContext();
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -1367,17 +1376,11 @@ public class OpenWireConnection extends AbstractRemotingConnection implements Se
|
|||
|
||||
}
|
||||
|
||||
private void setOperationContext(AMQSession session) {
|
||||
OperationContext ctx;
|
||||
if (session == null) {
|
||||
ctx = this.internalSession.getSessionContext();
|
||||
} else {
|
||||
ctx = session.getCoreSession().getSessionContext();
|
||||
}
|
||||
server.getStorageManager().setContext(ctx);
|
||||
private void recoverOperationContext() {
|
||||
server.getStorageManager().setContext(this.operationContext);
|
||||
}
|
||||
|
||||
private void clearOpeartionContext() {
|
||||
private void clearupOperationContext() {
|
||||
server.getStorageManager().clearContext();
|
||||
}
|
||||
|
||||
|
|
|
@ -107,7 +107,7 @@ public class AMQSession implements SessionCallback {
|
|||
// now
|
||||
|
||||
try {
|
||||
coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true);
|
||||
coreSession = server.createSession(name, username, password, minLargeMessageSize, connection, true, false, false, false, null, this, true, connection.getOperationContext());
|
||||
|
||||
long sessionId = sessInfo.getSessionId().getValue();
|
||||
if (sessionId == -1) {
|
||||
|
@ -290,8 +290,6 @@ public class AMQSession implements SessionCallback {
|
|||
} else {
|
||||
final Connection transportConnection = connection.getTransportConnection();
|
||||
|
||||
// new Exception("Setting to false").printStackTrace();
|
||||
|
||||
if (transportConnection == null) {
|
||||
// I don't think this could happen, but just in case, avoiding races
|
||||
runnable = null;
|
||||
|
|
|
@ -230,7 +230,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
|
|||
if (stompSession == null) {
|
||||
stompSession = new StompSession(connection, this, server.getStorageManager().newContext(server.getExecutorFactory().getExecutor()));
|
||||
String name = UUIDGenerator.getInstance().generateStringUUID();
|
||||
ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, true, false, false, false, null, stompSession, true);
|
||||
ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, true, false, false, false, null, stompSession, true, server.newOperationContext());
|
||||
stompSession.setServerSession(session);
|
||||
sessions.put(connection.getID(), stompSession);
|
||||
}
|
||||
|
@ -243,7 +243,7 @@ public class StompProtocolManager extends AbstractProtocolManager<StompFrame, St
|
|||
if (stompSession == null) {
|
||||
stompSession = new StompSession(connection, this, server.getStorageManager().newContext(executor));
|
||||
String name = UUIDGenerator.getInstance().generateStringUUID();
|
||||
ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, false, false, false, false, null, stompSession, true);
|
||||
ServerSession session = server.createSession(name, connection.getLogin(), connection.getPasscode(), ActiveMQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, connection, false, false, false, false, null, stompSession, true, server.newOperationContext());
|
||||
stompSession.setServerSession(session);
|
||||
transactedSessions.put(txID, stompSession);
|
||||
}
|
||||
|
|
|
@ -78,6 +78,23 @@ public interface Configuration {
|
|||
*/
|
||||
Configuration setPersistenceEnabled(boolean enable);
|
||||
|
||||
/**
|
||||
* Should use fdatasync on journal files.
|
||||
*
|
||||
* @see <a href="http://man7.org/linux/man-pages/man2/fdatasync.2.html">fdatasync</a>
|
||||
*
|
||||
* @return a boolean
|
||||
*/
|
||||
boolean isJournalDatasync();
|
||||
|
||||
/**
|
||||
* documented at {@link #isJournalDatasync()} ()}
|
||||
*
|
||||
* @param enable
|
||||
* @return this
|
||||
*/
|
||||
Configuration setJournalDatasync(boolean enable);
|
||||
|
||||
/**
|
||||
* @return usernames mapped to ResourceLimitSettings
|
||||
*/
|
||||
|
|
|
@ -77,6 +77,8 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
|||
|
||||
private boolean persistenceEnabled = ActiveMQDefaultConfiguration.isDefaultPersistenceEnabled();
|
||||
|
||||
private boolean journalDatasync = ActiveMQDefaultConfiguration.isDefaultJournalDatasync();
|
||||
|
||||
protected long fileDeploymentScanPeriod = ActiveMQDefaultConfiguration.getDefaultFileDeployerScanPeriod();
|
||||
|
||||
private boolean persistDeliveryCountBeforeDelivery = ActiveMQDefaultConfiguration.isDefaultPersistDeliveryCountBeforeDelivery();
|
||||
|
@ -297,6 +299,17 @@ public class ConfigurationImpl implements Configuration, Serializable {
|
|||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isJournalDatasync() {
|
||||
return journalDatasync;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ConfigurationImpl setJournalDatasync(boolean enable) {
|
||||
journalDatasync = enable;
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getFileDeployerScanPeriod() {
|
||||
return fileDeploymentScanPeriod;
|
||||
|
|
|
@ -486,6 +486,8 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
|
|||
}
|
||||
}
|
||||
|
||||
config.setJournalDatasync(getBoolean(e, "journal-datasync", config.isJournalDatasync()));
|
||||
|
||||
config.setJournalSyncTransactional(getBoolean(e, "journal-sync-transactional", config.isJournalSyncTransactional()));
|
||||
|
||||
config.setJournalSyncNonTransactional(getBoolean(e, "journal-sync-non-transactional", config.isJournalSyncNonTransactional()));
|
||||
|
|
|
@ -118,6 +118,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
|||
}
|
||||
|
||||
bindingsFF = new NIOSequentialFileFactory(config.getBindingsLocation(), criticalErrorListener, config.getJournalMaxIO_NIO());
|
||||
bindingsFF.setDatasync(config.isJournalDatasync());
|
||||
|
||||
Journal localBindings = new JournalImpl(ioExecutors, 1024 * 1024, 2, config.getJournalCompactMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactPercentage(), bindingsFF, "activemq-bindings", "bindings", 1, 0);
|
||||
|
||||
|
@ -135,6 +136,9 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
|
|||
throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType());
|
||||
}
|
||||
|
||||
journalFF.setDatasync(config.isJournalDatasync());
|
||||
|
||||
|
||||
Journal localMessage = new JournalImpl(ioExecutors, config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO() : config.getJournalMaxIO_NIO(), 0);
|
||||
|
||||
messageJournal = localMessage;
|
||||
|
|
|
@ -20,6 +20,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
|
|||
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
|
||||
import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
|
||||
import org.apache.activemq.artemis.api.core.SimpleString;
|
||||
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||
import org.apache.activemq.artemis.core.protocol.core.Channel;
|
||||
import org.apache.activemq.artemis.core.protocol.core.ChannelHandler;
|
||||
import org.apache.activemq.artemis.core.protocol.core.CoreRemotingConnection;
|
||||
|
@ -150,7 +151,9 @@ public class ActiveMQPacketHandler implements ChannelHandler {
|
|||
activeMQPrincipal = connection.getDefaultActiveMQPrincipal();
|
||||
}
|
||||
|
||||
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true);
|
||||
OperationContext sessionOperationContext = server.newOperationContext();
|
||||
|
||||
ServerSession session = server.createSession(request.getName(), activeMQPrincipal == null ? request.getUsername() : activeMQPrincipal.getUserName(), activeMQPrincipal == null ? request.getPassword() : activeMQPrincipal.getPassword(), request.getMinLargeMessageSize(), connection, request.isAutoCommitSends(), request.isAutoCommitAcks(), request.isPreAcknowledge(), request.isXA(), request.getDefaultAddress(), new CoreSessionCallback(request.getName(), protocolManager, channel, connection), true, sessionOperationContext);
|
||||
|
||||
ServerSessionPacketHandler handler = new ServerSessionPacketHandler(session, server.getStorageManager(), channel);
|
||||
channel.setHandler(handler);
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.activemq.artemis.core.config.Configuration;
|
|||
import org.apache.activemq.artemis.core.config.DivertConfiguration;
|
||||
import org.apache.activemq.artemis.core.management.impl.ActiveMQServerControlImpl;
|
||||
import org.apache.activemq.artemis.core.paging.PagingManager;
|
||||
import org.apache.activemq.artemis.core.persistence.OperationContext;
|
||||
import org.apache.activemq.artemis.core.persistence.StorageManager;
|
||||
import org.apache.activemq.artemis.core.postoffice.PostOffice;
|
||||
import org.apache.activemq.artemis.core.remoting.server.RemotingService;
|
||||
|
@ -180,7 +181,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
|
|||
boolean xa,
|
||||
String defaultAddress,
|
||||
SessionCallback callback,
|
||||
boolean autoCreateQueues) throws Exception;
|
||||
boolean autoCreateQueues,
|
||||
OperationContext context) throws Exception;
|
||||
|
||||
SecurityStore getSecurityStore();
|
||||
|
||||
|
@ -192,6 +194,8 @@ public interface ActiveMQServer extends ActiveMQComponent {
|
|||
|
||||
HierarchicalRepository<AddressSettings> getAddressSettingsRepository();
|
||||
|
||||
OperationContext newOperationContext();
|
||||
|
||||
int getConnectionCount();
|
||||
|
||||
long getTotalConnectionCount();
|
||||
|
|
|
@ -423,6 +423,11 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
return manager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public OperationContext newOperationContext() {
|
||||
return getStorageManager().newContext(getExecutorFactory().getExecutor());
|
||||
}
|
||||
|
||||
@Override
|
||||
public final synchronized void start() throws Exception {
|
||||
if (state != SERVER_STATE.STOPPED) {
|
||||
|
@ -1188,7 +1193,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
final boolean xa,
|
||||
final String defaultAddress,
|
||||
final SessionCallback callback,
|
||||
final boolean autoCreateQueues) throws Exception {
|
||||
final boolean autoCreateQueues,
|
||||
final OperationContext context) throws Exception {
|
||||
String validatedUser = "";
|
||||
|
||||
if (securityStore != null) {
|
||||
|
@ -1201,7 +1207,6 @@ public class ActiveMQServerImpl implements ActiveMQServer {
|
|||
|
||||
checkSessionLimit(validatedUser);
|
||||
|
||||
final OperationContext context = storageManager.newContext(getExecutorFactory().getExecutor());
|
||||
final ServerSessionImpl session = internalCreateSession(name, username, password, validatedUser, minLargeMessageSize, connection, autoCommitSends, autoCommitAcks, preAcknowledge, xa, defaultAddress, callback, context, autoCreateQueues);
|
||||
|
||||
sessions.put(name, session);
|
||||
|
|
|
@ -46,6 +46,14 @@
|
|||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="journal-datasync" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
that means the server will use fdatasync to confirm writes on the disk.
|
||||
</xsd:documentation>
|
||||
</xsd:annotation>
|
||||
</xsd:element>
|
||||
|
||||
<xsd:element name="persistence-enabled" type="xsd:boolean" default="true" maxOccurs="1" minOccurs="0">
|
||||
<xsd:annotation>
|
||||
<xsd:documentation>
|
||||
|
|
|
@ -354,6 +354,8 @@ public class FileConfigurationTest extends ConfigurationImplTest {
|
|||
assertEquals(1234567, conf.getGlobalMaxSize());
|
||||
assertEquals(37, conf.getMaxDiskUsage());
|
||||
assertEquals(123, conf.getDiskScanPeriod());
|
||||
|
||||
assertEquals(false, conf.isJournalDatasync());
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -450,7 +450,7 @@ public abstract class ActiveMQTestBase extends Assert {
|
|||
* @throws Exception
|
||||
*/
|
||||
protected ConfigurationImpl createBasicConfig(final int serverID) {
|
||||
ConfigurationImpl configuration = new ConfigurationImpl().setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(100 * 1024).setJournalType(getDefaultJournalType()).setJournalDirectory(getJournalDir(serverID, false)).setBindingsDirectory(getBindingsDir(serverID, false)).setPagingDirectory(getPageDir(serverID, false)).setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)).setJournalCompactMinFiles(0).setJournalCompactPercentage(0).setClusterPassword(CLUSTER_PASSWORD);
|
||||
ConfigurationImpl configuration = new ConfigurationImpl().setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(100 * 1024).setJournalType(getDefaultJournalType()).setJournalDirectory(getJournalDir(serverID, false)).setBindingsDirectory(getBindingsDir(serverID, false)).setPagingDirectory(getPageDir(serverID, false)).setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)).setJournalCompactMinFiles(0).setJournalCompactPercentage(0).setClusterPassword(CLUSTER_PASSWORD).setJournalDatasync(false);
|
||||
|
||||
return configuration;
|
||||
}
|
||||
|
|
|
@ -49,6 +49,7 @@
|
|||
<message-expiry-scan-period>10111213</message-expiry-scan-period>
|
||||
<message-expiry-thread-priority>8</message-expiry-thread-priority>
|
||||
<id-cache-size>127</id-cache-size>
|
||||
<journal-datasync>false</journal-datasync>
|
||||
<persist-id-cache>true</persist-id-cache>
|
||||
<populate-validated-user>true</populate-validated-user>
|
||||
<connection-ttl-check-interval>98765</connection-ttl-check-interval>
|
||||
|
|
|
@ -62,6 +62,7 @@ Name | Description
|
|||
[journal-sync-non-transactional](persistence.md) | if true wait for non transaction data to be synced to the journal before returning response to client. Default=true
|
||||
[journal-sync-transactional](persistence.md) | if true wait for transaction data to be synchronized to the journal before returning response to client. Default=true
|
||||
[journal-type](persistence.md) | the type of journal to use. Default=ASYNCIO
|
||||
[journal-datasync](persistence.md) | It will use fsync on journal operations. Default=true.
|
||||
[large-messages-directory](large-messages.md "Configuring the server") | the directory to store large messages. Default=data/largemessages
|
||||
[management-address](management.md "Configuring Core Management") | the name of the management address to send management messages to. It is prefixed with "jms.queue" so that JMS clients can send messages to it. Default=jms.queue.activemq.management
|
||||
[management-notification-address](management.md "Configuring The Core Management Notification Address") | the name of the address that consumers bind to receive management notifications. Default=activemq.notifications
|
||||
|
|
|
@ -299,6 +299,10 @@ The message journal is configured using the following attributes in
|
|||
|
||||
The default for this parameter is `30`
|
||||
|
||||
- `journal-datasync` (default: true)
|
||||
|
||||
This will disable the use of fdatasync on journal writes.
|
||||
|
||||
### An important note on disabling disk write cache.
|
||||
|
||||
> **Warning**
|
||||
|
|
|
@ -0,0 +1,235 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.activemq.artemis.tests.integration.persistence;
|
||||
|
||||
import javax.jms.Connection;
|
||||
import javax.jms.ConnectionFactory;
|
||||
import javax.jms.Message;
|
||||
import javax.jms.MessageConsumer;
|
||||
import javax.jms.MessageProducer;
|
||||
import javax.jms.Queue;
|
||||
import javax.jms.Session;
|
||||
import java.io.File;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.activemq.artemis.core.server.ActiveMQServer;
|
||||
import org.apache.activemq.artemis.core.server.JournalType;
|
||||
import org.apache.activemq.artemis.jlibaio.LibaioContext;
|
||||
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
|
||||
import org.apache.activemq.artemis.jms.server.impl.JMSServerManagerImpl;
|
||||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.apache.qpid.jms.JmsConnectionFactory;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
import org.junit.runners.Parameterized;
|
||||
|
||||
@RunWith(Parameterized.class)
|
||||
public class SyncSendTest extends ActiveMQTestBase {
|
||||
|
||||
private static long totalRecordTime = -1;
|
||||
private static final int RECORDS = 300;
|
||||
private static final int MEASURE_RECORDS = 100;
|
||||
private static final int WRAMP_UP = 100;
|
||||
|
||||
@Parameterized.Parameters(name = "storage={0}, protocol={1}")
|
||||
public static Collection getParameters() {
|
||||
Object[] storages = new Object[]{"libaio", "nio", "null"};
|
||||
Object[] protocols = new Object[]{"core", "openwire", "amqp"};
|
||||
|
||||
ArrayList<Object[]> objects = new ArrayList<>();
|
||||
for (Object s : storages) {
|
||||
if (s.equals("libaio") && !LibaioContext.isLoaded()) {
|
||||
continue;
|
||||
}
|
||||
for (Object p : protocols) {
|
||||
objects.add(new Object[]{s, p});
|
||||
}
|
||||
}
|
||||
|
||||
return objects;
|
||||
}
|
||||
|
||||
private final String storage;
|
||||
private final String protocol;
|
||||
|
||||
public SyncSendTest(String storage, String protocol) {
|
||||
this.storage = storage;
|
||||
this.protocol = protocol;
|
||||
}
|
||||
|
||||
ActiveMQServer server;
|
||||
JMSServerManagerImpl jms;
|
||||
|
||||
@Override
|
||||
public void setUp() throws Exception {
|
||||
super.setUp();
|
||||
|
||||
if (storage.equals("null")) {
|
||||
server = createServer(false, true);
|
||||
} else {
|
||||
server = createServer(true, true);
|
||||
}
|
||||
|
||||
jms = new JMSServerManagerImpl(server);
|
||||
|
||||
if (storage.equals("libaio")) {
|
||||
server.getConfiguration().setJournalType(JournalType.ASYNCIO);
|
||||
} else {
|
||||
server.getConfiguration().setJournalType(JournalType.NIO);
|
||||
|
||||
}
|
||||
jms.start();
|
||||
}
|
||||
|
||||
private long getTimePerSync() throws Exception {
|
||||
|
||||
if (storage.equals("null")) {
|
||||
return 0;
|
||||
}
|
||||
if (totalRecordTime < 0) {
|
||||
File measureFile = temporaryFolder.newFile();
|
||||
|
||||
System.out.println("File::" + measureFile);
|
||||
|
||||
RandomAccessFile rfile = new RandomAccessFile(measureFile, "rw");
|
||||
FileChannel channel = rfile.getChannel();
|
||||
|
||||
channel.position(0);
|
||||
|
||||
ByteBuffer buffer = ByteBuffer.allocate(10);
|
||||
buffer.put(new byte[10]);
|
||||
buffer.position(0);
|
||||
|
||||
Assert.assertEquals(10, channel.write(buffer));
|
||||
channel.force(true);
|
||||
|
||||
long time = System.nanoTime();
|
||||
|
||||
for (int i = 0; i < MEASURE_RECORDS + WRAMP_UP; i++) {
|
||||
if (i == WRAMP_UP) {
|
||||
time = System.nanoTime();
|
||||
}
|
||||
channel.position(0);
|
||||
buffer.position(0);
|
||||
buffer.putInt(i);
|
||||
buffer.position(0);
|
||||
Assert.assertEquals(10, channel.write(buffer));
|
||||
channel.force(false);
|
||||
}
|
||||
|
||||
long timeEnd = System.nanoTime();
|
||||
|
||||
totalRecordTime = ((timeEnd - time) / MEASURE_RECORDS) * RECORDS;
|
||||
|
||||
System.out.println("total time = " + totalRecordTime);
|
||||
|
||||
}
|
||||
return totalRecordTime;
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSendConsumeAudoACK() throws Exception {
|
||||
|
||||
long recordTime = getTimePerSync();
|
||||
|
||||
jms.createQueue(true, "queue", null, true, null);
|
||||
|
||||
ConnectionFactory factory = newCF();
|
||||
|
||||
Connection connection = factory.createConnection();
|
||||
try {
|
||||
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
|
||||
|
||||
Queue queue;
|
||||
if (protocol.equals("amqp")) {
|
||||
queue = session.createQueue("jms.queue.queue");
|
||||
} else {
|
||||
queue = session.createQueue("queue");
|
||||
}
|
||||
MessageProducer producer = session.createProducer(queue);
|
||||
|
||||
long start = System.nanoTime();
|
||||
|
||||
for (int i = 0; i < (RECORDS + WRAMP_UP); i++) {
|
||||
if (i == WRAMP_UP) {
|
||||
start = System.nanoTime(); // wramp up
|
||||
}
|
||||
producer.send(session.createMessage());
|
||||
}
|
||||
|
||||
long end = System.nanoTime();
|
||||
|
||||
System.out.println("end - start = " + (end - start) + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(end - start));
|
||||
System.out.println("RECORD TIME = " + recordTime + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(recordTime));
|
||||
|
||||
if ((end - start) < recordTime) {
|
||||
Assert.fail("Messages are being sent too fast! Faster than the disk would be able to sync!");
|
||||
}
|
||||
|
||||
connection.start();
|
||||
MessageConsumer consumer = session.createConsumer(queue);
|
||||
|
||||
for (int i = 0; i < (RECORDS + WRAMP_UP); i++) {
|
||||
if (i == WRAMP_UP) {
|
||||
start = System.nanoTime(); // wramp up
|
||||
}
|
||||
Message msg = consumer.receive(5000);
|
||||
Assert.assertNotNull(msg);
|
||||
}
|
||||
|
||||
end = System.nanoTime();
|
||||
|
||||
System.out.println("end - start = " + (end - start) + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(end - start));
|
||||
System.out.println("RECORD TIME = " + recordTime + " milliseconds = " + TimeUnit.NANOSECONDS.toMillis(recordTime));
|
||||
|
||||
// There's no way to sync on ack for AMQP
|
||||
if (!protocol.equals("amqp") && (end - start) < recordTime) {
|
||||
Assert.fail("Messages are being acked too fast! Faster than the disk would be able to sync!");
|
||||
}
|
||||
} finally {
|
||||
connection.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// this will set ack as synchronous, to make sure we make proper measures against the sync on disk
|
||||
private ConnectionFactory newCF() {
|
||||
if (protocol.equals("core")) {
|
||||
ConnectionFactory factory = new ActiveMQConnectionFactory();
|
||||
((ActiveMQConnectionFactory) factory).setBlockOnAcknowledge(true);
|
||||
return factory;
|
||||
} else if (protocol.equals("amqp")) {
|
||||
final JmsConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:61616");
|
||||
factory.setForceAsyncAcks(true);
|
||||
return factory;
|
||||
} else {
|
||||
org.apache.activemq.ActiveMQConnectionFactory cf = new org.apache.activemq.ActiveMQConnectionFactory("tcp://localhost:61616?wireFormat.cacheEnabled=true");
|
||||
cf.setSendAcksAsync(false);
|
||||
return cf;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
|
@ -35,6 +35,7 @@ import org.apache.activemq.artemis.integration.vertx.VertxOutgoingConnectorServi
|
|||
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.vertx.java.core.Handler;
|
||||
import org.vertx.java.core.Vertx;
|
||||
|
@ -48,8 +49,10 @@ import org.vertx.java.spi.cluster.impl.hazelcast.HazelcastClusterManagerFactory;
|
|||
|
||||
/**
|
||||
* This class tests the basics of ActiveMQ
|
||||
* vertx integration
|
||||
* vertx inte
|
||||
* gration
|
||||
*/
|
||||
@Ignore
|
||||
public class ActiveMQVertxUnitTest extends ActiveMQTestBase {
|
||||
|
||||
private PlatformManager vertxManager;
|
||||
|
|
|
@ -59,6 +59,16 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
|
|||
this(1, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public SequentialFileFactory setDatasync(boolean enabled) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isDatasync() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getMaxIO() {
|
||||
return 1;
|
||||
|
|
Loading…
Reference in New Issue