This closes #985

This commit is contained in:
Martyn Taylor 2017-02-03 14:29:37 +00:00
commit 03a999e39d
46 changed files with 1972 additions and 266 deletions

View File

@ -50,7 +50,7 @@ import org.apache.activemq.artemis.cli.commands.tools.DecodeJournal;
import org.apache.activemq.artemis.cli.commands.tools.EncodeJournal; import org.apache.activemq.artemis.cli.commands.tools.EncodeJournal;
import org.apache.activemq.artemis.cli.commands.tools.HelpData; import org.apache.activemq.artemis.cli.commands.tools.HelpData;
import org.apache.activemq.artemis.cli.commands.tools.PrintData; import org.apache.activemq.artemis.cli.commands.tools.PrintData;
import org.apache.activemq.artemis.cli.commands.tools.SyncRecalc; import org.apache.activemq.artemis.cli.commands.tools.PerfJournal;
import org.apache.activemq.artemis.cli.commands.tools.XmlDataExporter; import org.apache.activemq.artemis.cli.commands.tools.XmlDataExporter;
import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter; import org.apache.activemq.artemis.cli.commands.tools.XmlDataImporter;
import org.apache.activemq.artemis.cli.commands.user.AddUser; import org.apache.activemq.artemis.cli.commands.user.AddUser;
@ -163,7 +163,7 @@ public class Artemis {
withDefaultCommand(HelpData.class).withCommands(PrintData.class, XmlDataExporter.class, XmlDataImporter.class, DecodeJournal.class, EncodeJournal.class, CompactJournal.class); withDefaultCommand(HelpData.class).withCommands(PrintData.class, XmlDataExporter.class, XmlDataImporter.class, DecodeJournal.class, EncodeJournal.class, CompactJournal.class);
builder.withGroup("user").withDescription("default file-based user management (add|rm|list|reset) (example ./artemis user list)"). builder.withGroup("user").withDescription("default file-based user management (add|rm|list|reset) (example ./artemis user list)").
withDefaultCommand(HelpUser.class).withCommands(ListUser.class, AddUser.class, RemoveUser.class, ResetUser.class); withDefaultCommand(HelpUser.class).withCommands(ListUser.class, AddUser.class, RemoveUser.class, ResetUser.class);
builder = builder.withCommands(Run.class, Stop.class, Kill.class, SyncRecalc.class); builder = builder.withCommands(Run.class, Stop.class, Kill.class, PerfJournal.class);
} else { } else {
builder.withGroup("data").withDescription("data tools group (print) (example ./artemis data print)"). builder.withGroup("data").withDescription("data tools group (print) (example ./artemis data print)").
withDefaultCommand(HelpData.class).withCommands(PrintData.class); withDefaultCommand(HelpData.class).withCommands(PrintData.class);

View File

@ -40,6 +40,7 @@ import io.airlift.airline.Option;
import org.apache.activemq.artemis.cli.CLIException; import org.apache.activemq.artemis.cli.CLIException;
import org.apache.activemq.artemis.cli.commands.util.HashUtil; import org.apache.activemq.artemis.cli.commands.util.HashUtil;
import org.apache.activemq.artemis.cli.commands.util.SyncCalculation; import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType; import org.apache.activemq.artemis.core.server.cluster.impl.MessageLoadBalancingType;
import org.apache.activemq.artemis.jlibaio.LibaioContext; import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.jlibaio.LibaioFile; import org.apache.activemq.artemis.jlibaio.LibaioFile;
@ -209,11 +210,17 @@ public class Create extends InputAbstract {
@Option(name = "--addresses", description = "comma separated list of addresses ") @Option(name = "--addresses", description = "comma separated list of addresses ")
String addresses; String addresses;
@Option(name = "--aio", description = "Force aio journal on the configuration regardless of the library being available or not.") @Option(name = "--aio", description = "sets the journal as asyncio.")
boolean forceLibaio; boolean aio;
@Option(name = "--nio", description = "Force nio journal on the configuration regardless of the library being available or not.") @Option(name = "--nio", description = "sets the journal as nio.")
boolean forceNIO; boolean nio;
@Option(name = "--mapped", description = "Sets the journal as mapped.")
boolean mapped;
// this is used by the setupJournalType method
private JournalType journalType;
@Option(name = "--disable-persistence", description = "Disable message persistence to the journal") @Option(name = "--disable-persistence", description = "Disable message persistence to the journal")
boolean disablePersistence; boolean disablePersistence;
@ -558,15 +565,13 @@ public class Create extends InputAbstract {
throw new RuntimeException(String.format("The path '%s' is not writable.", directory)); throw new RuntimeException(String.format("The path '%s' is not writable.", directory));
} }
} }
public Object run(ActionContext context) throws Exception { public Object run(ActionContext context) throws Exception {
if (forceLibaio && forceNIO) {
throw new RuntimeException("You can't specify --nio and --aio in the same execution.");
}
IS_WINDOWS = System.getProperty("os.name").toLowerCase().trim().startsWith("win"); IS_WINDOWS = System.getProperty("os.name").toLowerCase().trim().startsWith("win");
IS_CYGWIN = IS_WINDOWS && "cygwin".equals(System.getenv("OSTYPE")); IS_CYGWIN = IS_WINDOWS && "cygwin".equals(System.getenv("OSTYPE"));
setupJournalType();
// requireLogin should set alloAnonymous=false, to avoid user's questions // requireLogin should set alloAnonymous=false, to avoid user's questions
if (requireLogin != null && requireLogin.booleanValue()) { if (requireLogin != null && requireLogin.booleanValue()) {
allowAnonymous = Boolean.FALSE; allowAnonymous = Boolean.FALSE;
@ -603,15 +608,7 @@ public class Create extends InputAbstract {
filters.put("${shared-store.settings}", ""); filters.put("${shared-store.settings}", "");
} }
boolean aio; filters.put("${journal.settings}", journalType.name());
if (IS_WINDOWS || !supportsLibaio()) {
aio = false;
filters.put("${journal.settings}", "NIO");
} else {
aio = true;
filters.put("${journal.settings}", "ASYNCIO");
}
if (sslKey != null) { if (sslKey != null) {
filters.put("${web.protocol}", "https"); filters.put("${web.protocol}", "https");
@ -761,7 +758,7 @@ public class Create extends InputAbstract {
filters.put("${auto-create}", isAutoCreate() ? "true" : "false"); filters.put("${auto-create}", isAutoCreate() ? "true" : "false");
performAutoTune(filters, aio, dataFolder); performAutoTune(filters, journalType, dataFolder);
write(ETC_BROKER_XML, filters, false); write(ETC_BROKER_XML, filters, false);
write(ETC_ARTEMIS_USERS_PROPERTIES, filters, false); write(ETC_ARTEMIS_USERS_PROPERTIES, filters, false);
@ -802,6 +799,40 @@ public class Create extends InputAbstract {
return null; return null;
} }
private void setupJournalType() {
int countJournalTypes = countBoolean(aio, nio, mapped);
if (countJournalTypes > 1) {
throw new RuntimeException("You can only select one journal type (--nio | --aio | --mapped).");
}
if (countJournalTypes == 0) {
if (supportsLibaio()) {
aio = true;
} else {
nio = true;
}
}
if (aio) {
journalType = JournalType.ASYNCIO;
} else if (nio) {
journalType = JournalType.NIO;
} else if (mapped) {
journalType = JournalType.MAPPED;
}
}
private static int countBoolean(boolean...b) {
int count = 0;
for (boolean itemB : b) {
if (itemB) count++;
}
return count;
}
private String getLogManager() throws IOException { private String getLogManager() throws IOException {
String logManager = ""; String logManager = "";
File dir = new File(path(getHome().toString(), false) + "/lib"); File dir = new File(path(getHome().toString(), false) + "/lib");
@ -858,7 +889,7 @@ public class Create extends InputAbstract {
filters.put("${address-queue.settings}", writer.toString()); filters.put("${address-queue.settings}", writer.toString());
} }
private void performAutoTune(HashMap<String, String> filters, boolean aio, File dataFolder) { private void performAutoTune(HashMap<String, String> filters, JournalType journalType, File dataFolder) {
if (noAutoTune) { if (noAutoTune) {
filters.put("${journal-buffer.settings}", ""); filters.put("${journal-buffer.settings}", "");
} else { } else {
@ -867,7 +898,7 @@ public class Create extends InputAbstract {
System.out.println(""); System.out.println("");
System.out.println("Auto tuning journal ..."); System.out.println("Auto tuning journal ...");
long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, aio); long time = SyncCalculation.syncTest(dataFolder, 4096, writes, 5, verbose, !noJournalSync, journalType);
long nanoseconds = SyncCalculation.toNanos(time, writes, verbose); long nanoseconds = SyncCalculation.toNanos(time, writes, verbose);
double writesPerMillisecond = (double) writes / (double) time; double writesPerMillisecond = (double) writes / (double) time;
@ -891,13 +922,10 @@ public class Create extends InputAbstract {
} }
public boolean supportsLibaio() { public boolean supportsLibaio() {
if (forceLibaio) { if (IS_WINDOWS) {
// forcing libaio
return true;
} else if (forceNIO) {
// forcing NIO
return false; return false;
} else if (LibaioContext.isLoaded()) { }
if (LibaioContext.isLoaded()) {
try (LibaioContext context = new LibaioContext(1, true, true)) { try (LibaioContext context = new LibaioContext(1, true, true)) {
File tmpFile = new File(directory, "validateAIO.bin"); File tmpFile = new File(directory, "validateAIO.bin");
boolean supportsLibaio = true; boolean supportsLibaio = true;

View File

@ -20,13 +20,35 @@ package org.apache.activemq.artemis.cli.commands.tools;
import java.text.DecimalFormat; import java.text.DecimalFormat;
import io.airlift.airline.Command; import io.airlift.airline.Command;
import io.airlift.airline.Option;
import org.apache.activemq.artemis.cli.commands.ActionContext; import org.apache.activemq.artemis.cli.commands.ActionContext;
import org.apache.activemq.artemis.cli.commands.util.SyncCalculation; import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
import org.apache.activemq.artemis.core.config.impl.FileConfiguration; import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.JournalType;
@Command(name = "sync", description = "Calculates the journal-buffer-timeout you should use with the current data folder") @Command(name = "perf-journal", description = "Calculates the journal-buffer-timeout you should use with the current data folder")
public class SyncRecalc extends LockAbstract { public class PerfJournal extends LockAbstract {
@Option(name = "--block-size", description = "The block size for each write (default 4096)")
public int size = 4 * 1024;
@Option(name = "--writes", description = "The number of writes to be performed (default 250)")
public int writes = 250;
@Option(name = "--tries", description = "The number of tries for the test (default 5)")
public int tries = 5;
@Option(name = "--no-sync", description = "Disable sync")
public boolean nosyncs = false;
@Option(name = "--sync", description = "Enable syncs")
public boolean syncs = false;
@Option(name = "--journal-type", description = "Journal Type to be used (default from broker.xml)")
public String journalType = null;
@Override @Override
public Object execute(ActionContext context) throws Exception { public Object execute(ActionContext context) throws Exception {
@ -34,11 +56,26 @@ public class SyncRecalc extends LockAbstract {
FileConfiguration fileConfiguration = getFileConfiguration(); FileConfiguration fileConfiguration = getFileConfiguration();
int writes = 250; if (nosyncs) {
fileConfiguration.setJournalDatasync(false);
} else if (syncs) {
fileConfiguration.setJournalDatasync(true);
}
if (journalType != null) {
fileConfiguration.setJournalType(JournalType.getType(journalType));
}
System.out.println(""); System.out.println("");
System.out.println("Auto tuning journal ..."); System.out.println("Auto tuning journal ...");
long time = SyncCalculation.syncTest(fileConfiguration.getJournalLocation(), 4096, writes, 5, verbose, fileConfiguration.isJournalDatasync(), fileConfiguration.getJournalType() == JournalType.ASYNCIO); System.out.println("Performing " + tries + " tests writing " + writes + " blocks of " + size + " on each test, sync=" + fileConfiguration.isJournalDatasync() + " with journalType = " + fileConfiguration.getJournalType());
fileConfiguration.getJournalLocation().mkdirs();
long time = SyncCalculation.syncTest(fileConfiguration.getJournalLocation(), size, writes, tries, verbose, fileConfiguration.isJournalDatasync(), fileConfiguration.getJournalType());
long nanosecondsWait = SyncCalculation.toNanos(time, writes, verbose); long nanosecondsWait = SyncCalculation.toNanos(time, writes, verbose);
double writesPerMillisecond = (double) writes / (double) time; double writesPerMillisecond = (double) writes / (double) time;

View File

@ -24,10 +24,14 @@ import java.text.DecimalFormat;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.jlibaio.LibaioContext; import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.utils.ReusableLatch; import org.apache.activemq.artemis.utils.ReusableLatch;
@ -47,8 +51,8 @@ public class SyncCalculation {
int tries, int tries,
boolean verbose, boolean verbose,
boolean fsync, boolean fsync,
boolean aio) throws Exception { JournalType journalType) throws Exception {
SequentialFileFactory factory = newFactory(datafolder, fsync, aio); SequentialFileFactory factory = newFactory(datafolder, fsync, journalType, blockSize * blocks);
if (verbose) { if (verbose) {
System.out.println("Using " + factory.getClass().getName() + " to calculate sync times"); System.out.println("Using " + factory.getClass().getName() + " to calculate sync times");
@ -61,6 +65,8 @@ public class SyncCalculation {
file.fill(blockSize * blocks); file.fill(blockSize * blocks);
file.close();
long[] result = new long[tries]; long[] result = new long[tries];
byte[] block = new byte[blockSize]; byte[] block = new byte[blockSize];
@ -94,6 +100,7 @@ public class SyncCalculation {
System.out.println("**************************************************"); System.out.println("**************************************************");
System.out.println(ntry + " of " + tries + " calculation"); System.out.println(ntry + " of " + tries + " calculation");
} }
file.open();
file.position(0); file.position(0);
long start = System.currentTimeMillis(); long start = System.currentTimeMillis();
for (int i = 0; i < blocks; i++) { for (int i = 0; i < blocks; i++) {
@ -115,6 +122,7 @@ public class SyncCalculation {
System.out.println("bufferTimeout = " + toNanos(result[ntry], blocks, verbose)); System.out.println("bufferTimeout = " + toNanos(result[ntry], blocks, verbose));
System.out.println("**************************************************"); System.out.println("**************************************************");
} }
file.close();
} }
factory.releaseDirectBuffer(bufferBlock); factory.releaseDirectBuffer(bufferBlock);
@ -162,17 +170,36 @@ public class SyncCalculation {
return timeWait; return timeWait;
} }
private static SequentialFileFactory newFactory(File datafolder, boolean datasync, boolean aio) { private static SequentialFileFactory newFactory(File datafolder, boolean datasync, JournalType journalType, int fileSize) {
if (aio && LibaioContext.isLoaded()) { SequentialFileFactory factory;
SequentialFileFactory factory = new AIOSequentialFileFactory(datafolder, 1).setDatasync(datasync);
factory.start();
((AIOSequentialFileFactory) factory).disableBufferReuse();
return factory; if (journalType == JournalType.ASYNCIO && !LibaioContext.isLoaded()) {
} else { journalType = JournalType.NIO;
SequentialFileFactory factory = new NIOSequentialFileFactory(datafolder, 1); }
factory.start();
return factory; switch (journalType) {
case NIO:
factory = new NIOSequentialFileFactory(datafolder, 1);
factory.start();
return factory;
case ASYNCIO:
factory = new AIOSequentialFileFactory(datafolder, 1).setDatasync(datasync);
factory.start();
((AIOSequentialFileFactory) factory).disableBufferReuse();
return factory;
case MAPPED:
factory = new MappedSequentialFileFactory(datafolder, new IOCriticalErrorListener() {
@Override
public void onIOException(Throwable code, String message, SequentialFile file) {
}
}, true).chunkBytes(fileSize).overlapBytes(0).setDatasync(datasync);
factory.start();
return factory;
default:
throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(journalType);
} }
} }
} }

View File

@ -47,6 +47,7 @@ import org.apache.activemq.artemis.cli.commands.util.SyncCalculation;
import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl; import org.apache.activemq.artemis.core.client.impl.ServerLocatorImpl;
import org.apache.activemq.artemis.core.config.FileDeploymentManager; import org.apache.activemq.artemis.core.config.FileDeploymentManager;
import org.apache.activemq.artemis.core.config.impl.FileConfiguration; import org.apache.activemq.artemis.core.config.impl.FileConfiguration;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.jlibaio.LibaioContext; import org.apache.activemq.artemis.jlibaio.LibaioContext;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory; import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQDestination; import org.apache.activemq.artemis.jms.client.ActiveMQDestination;
@ -114,7 +115,7 @@ public class ArtemisTest extends CliTestBase {
public void testSync() throws Exception { public void testSync() throws Exception {
int writes = 20; int writes = 20;
int tries = 10; int tries = 10;
long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true, true); long totalAvg = SyncCalculation.syncTest(temporaryFolder.getRoot(), 4096, writes, tries, true, true, JournalType.NIO);
System.out.println(); System.out.println();
System.out.println("TotalAvg = " + totalAvg); System.out.println("TotalAvg = " + totalAvg);
long nanoTime = SyncCalculation.toNanos(totalAvg, writes, false); long nanoTime = SyncCalculation.toNanos(totalAvg, writes, false);
@ -130,6 +131,19 @@ public class ArtemisTest extends CliTestBase {
Artemis.main("create", instance1.getAbsolutePath(), "--silent", "--no-fsync"); Artemis.main("create", instance1.getAbsolutePath(), "--silent", "--no-fsync");
} }
@Test
public void testSimpleCreateMapped() throws Throwable {
try {
//instance1: default using http
File instance1 = new File(temporaryFolder.getRoot(), "instance1");
Artemis.main("create", instance1.getAbsolutePath(), "--silent", "--mapped");
} catch (Throwable e) {
e.printStackTrace();
throw e;
}
}
@Test @Test
public void testWebConfig() throws Exception { public void testWebConfig() throws Exception {
setupAuth(); setupAuth();
@ -500,6 +514,20 @@ public class ArtemisTest extends CliTestBase {
} }
@Test
public void testPerfJournal() throws Exception {
File instanceFolder = temporaryFolder.newFolder("server1");
setupAuth(instanceFolder);
Run.setEmbedded(true);
Artemis.main("create", instanceFolder.getAbsolutePath(), "--force", "--silent", "--no-web", "--no-autotune", "--require-login");
System.setProperty("artemis.instance", instanceFolder.getAbsolutePath());
Artemis.main("perf-journal", "--journal-type", "NIO", "--writes", "5000", "--tries", "50", "--verbose");
}
public void testSimpleRun(String folderName) throws Exception { public void testSimpleRun(String folderName) throws Exception {
File instanceFolder = temporaryFolder.newFolder(folderName); File instanceFolder = temporaryFolder.newFolder(folderName);

View File

@ -118,11 +118,6 @@ public class JDBCSequentialFile implements SequentialFile {
return writePosition + size <= dbDriver.getMaxSize(); return writePosition + size <= dbDriver.getMaxSize();
} }
@Override
public int getAlignment() throws Exception {
return 0;
}
@Override @Override
public int calculateBlockStart(int position) throws Exception { public int calculateBlockStart(int position) throws Exception {
return 0; return 0;

View File

@ -176,6 +176,12 @@ public class JDBCSequentialFileFactory implements SequentialFileFactory, ActiveM
return 1; return 1;
} }
@Override
public JDBCSequentialFileFactory setAlignment(int alignment) {
// no op
return this;
}
@Override @Override
public int calculateBlockSize(final int bytes) { public int calculateBlockSize(final int bytes) {
return bytes; return bytes;

View File

@ -74,8 +74,6 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
private final Executor completeExecutor; private final Executor completeExecutor;
private final Object journalLock = new Object();
private final ScheduledExecutorService scheduledExecutorService; private final ScheduledExecutorService scheduledExecutorService;
// Track Tx Records // Track Tx Records
@ -135,11 +133,9 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
@Override @Override
public synchronized void stop() throws SQLException { public synchronized void stop() throws SQLException {
if (started) { if (started) {
synchronized (journalLock) { sync();
sync(); started = false;
started = false; super.stop();
super.stop();
}
} }
} }
@ -305,7 +301,7 @@ public class JDBCJournalImpl extends AbstractJDBCDriver implements Journal {
record.setIoCompletion(callback); record.setIoCompletion(callback);
} }
synchronized (journalLock) { synchronized (this) {
if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) { if (record.isTransactional() || record.getRecordType() == JDBCJournalRecord.PREPARE_RECORD) {
addTxRecord(record); addTxRecord(record);
} }

View File

@ -18,6 +18,7 @@ package org.apache.activemq.artemis.jms.persistence.impl.journal;
import java.io.File; import java.io.File;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
@ -78,8 +79,9 @@ public final class JMSJournalStorageManagerImpl implements JMSStorageManager {
final IDGenerator idGenerator, final IDGenerator idGenerator,
final Configuration config, final Configuration config,
final ReplicationManager replicator) { final ReplicationManager replicator) {
if (config.getJournalType() != JournalType.NIO && config.getJournalType() != JournalType.ASYNCIO) { final EnumSet<JournalType> supportedJournalTypes = EnumSet.allOf(JournalType.class);
throw new IllegalArgumentException("Only NIO and AsyncIO are supported journals"); if (!supportedJournalTypes.contains(config.getJournalType())) {
throw new IllegalArgumentException("Only " + supportedJournalTypes + " are supported Journal types");
} }
this.config = config; this.config = config;

View File

@ -0,0 +1,371 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.netty.buffer;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel;
import io.netty.util.internal.PlatformDependent;
/**
* A NIO direct {@link ByteBuffer} wrapper.
* Only ByteBuffer's manipulation operations are supported.
* Is best suited only for encoding/decoding purposes.
*/
public final class UnpooledUnsafeDirectByteBufWrapper extends AbstractReferenceCountedByteBuf {
private ByteBuffer buffer;
private long memoryAddress;
/**
* Creates a new direct buffer by wrapping the specified initial buffer.
*/
public UnpooledUnsafeDirectByteBufWrapper() {
super(0);
this.buffer = null;
this.memoryAddress = 0L;
}
public void wrap(ByteBuffer buffer, int srcIndex, int length) {
if (buffer != null) {
this.buffer = buffer;
this.memoryAddress = PlatformDependent.directBufferAddress(buffer) + srcIndex;
clear();
maxCapacity(length);
} else {
reset();
}
}
public void reset() {
this.buffer = null;
this.memoryAddress = 0L;
clear();
maxCapacity(0);
}
@Override
public boolean isDirect() {
return true;
}
@Override
public int capacity() {
return maxCapacity();
}
@Override
public ByteBuf capacity(int newCapacity) {
if (newCapacity != maxCapacity()) {
throw new IllegalArgumentException("can't set a capacity different from the max allowed one");
}
return this;
}
@Override
public ByteBufAllocator alloc() {
return null;
}
@Override
public ByteOrder order() {
return ByteOrder.BIG_ENDIAN;
}
@Override
public boolean hasArray() {
return false;
}
@Override
public byte[] array() {
throw new UnsupportedOperationException("direct buffer");
}
@Override
public int arrayOffset() {
throw new UnsupportedOperationException("direct buffer");
}
@Override
public boolean hasMemoryAddress() {
return true;
}
@Override
public long memoryAddress() {
return memoryAddress;
}
@Override
protected byte _getByte(int index) {
return UnsafeByteBufUtil.getByte(addr(index));
}
@Override
protected short _getShort(int index) {
return UnsafeByteBufUtil.getShort(addr(index));
}
@Override
protected short _getShortLE(int index) {
return UnsafeByteBufUtil.getShortLE(addr(index));
}
@Override
protected int _getUnsignedMedium(int index) {
return UnsafeByteBufUtil.getUnsignedMedium(addr(index));
}
@Override
protected int _getUnsignedMediumLE(int index) {
return UnsafeByteBufUtil.getUnsignedMediumLE(addr(index));
}
@Override
protected int _getInt(int index) {
return UnsafeByteBufUtil.getInt(addr(index));
}
@Override
protected int _getIntLE(int index) {
return UnsafeByteBufUtil.getIntLE(addr(index));
}
@Override
protected long _getLong(int index) {
return UnsafeByteBufUtil.getLong(addr(index));
}
@Override
protected long _getLongLE(int index) {
return UnsafeByteBufUtil.getLongLE(addr(index));
}
@Override
public ByteBuf getBytes(int index, ByteBuf dst, int dstIndex, int length) {
UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length);
return this;
}
@Override
public ByteBuf getBytes(int index, byte[] dst, int dstIndex, int length) {
UnsafeByteBufUtil.getBytes(this, addr(index), index, dst, dstIndex, length);
return this;
}
@Override
public ByteBuf getBytes(int index, ByteBuffer dst) {
UnsafeByteBufUtil.getBytes(this, addr(index), index, dst);
return this;
}
@Override
public ByteBuf readBytes(ByteBuffer dst) {
int length = dst.remaining();
checkReadableBytes(length);
getBytes(readerIndex, dst);
readerIndex += length;
return this;
}
@Override
protected void _setByte(int index, int value) {
UnsafeByteBufUtil.setByte(addr(index), value);
}
@Override
protected void _setShort(int index, int value) {
UnsafeByteBufUtil.setShort(addr(index), value);
}
@Override
protected void _setShortLE(int index, int value) {
UnsafeByteBufUtil.setShortLE(addr(index), value);
}
@Override
protected void _setMedium(int index, int value) {
UnsafeByteBufUtil.setMedium(addr(index), value);
}
@Override
protected void _setMediumLE(int index, int value) {
UnsafeByteBufUtil.setMediumLE(addr(index), value);
}
@Override
protected void _setInt(int index, int value) {
UnsafeByteBufUtil.setInt(addr(index), value);
}
@Override
protected void _setIntLE(int index, int value) {
UnsafeByteBufUtil.setIntLE(addr(index), value);
}
@Override
protected void _setLong(int index, long value) {
UnsafeByteBufUtil.setLong(addr(index), value);
}
@Override
protected void _setLongLE(int index, long value) {
UnsafeByteBufUtil.setLongLE(addr(index), value);
}
@Override
public ByteBuf setBytes(int index, ByteBuf src, int srcIndex, int length) {
UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
return this;
}
@Override
public ByteBuf setBytes(int index, byte[] src, int srcIndex, int length) {
UnsafeByteBufUtil.setBytes(this, addr(index), index, src, srcIndex, length);
return this;
}
@Override
public ByteBuf setBytes(int index, ByteBuffer src) {
UnsafeByteBufUtil.setBytes(this, addr(index), index, src);
return this;
}
@Override
@Deprecated
public ByteBuf getBytes(int index, OutputStream out, int length) throws IOException {
throw new UnsupportedOperationException("unsupported!");
}
@Override
@Deprecated
public int getBytes(int index, GatheringByteChannel out, int length) throws IOException {
throw new UnsupportedOperationException("unsupported!");
}
@Override
@Deprecated
public int getBytes(int index, FileChannel out, long position, int length) throws IOException {
throw new UnsupportedOperationException("unsupported!");
}
@Override
@Deprecated
public int readBytes(GatheringByteChannel out, int length) throws IOException {
throw new UnsupportedOperationException("unsupported!");
}
@Override
@Deprecated
public int readBytes(FileChannel out, long position, int length) throws IOException {
throw new UnsupportedOperationException("unsupported!");
}
@Override
@Deprecated
public int setBytes(int index, InputStream in, int length) throws IOException {
throw new UnsupportedOperationException("unsupported!");
}
@Override
@Deprecated
public int setBytes(int index, ScatteringByteChannel in, int length) throws IOException {
throw new UnsupportedOperationException("unsupported!");
}
@Override
@Deprecated
public int setBytes(int index, FileChannel in, long position, int length) throws IOException {
throw new UnsupportedOperationException("unsupported!");
}
@Override
public int nioBufferCount() {
return 1;
}
@Override
@Deprecated
public ByteBuffer[] nioBuffers(int index, int length) {
throw new UnsupportedOperationException("unsupported!");
}
@Override
@Deprecated
public ByteBuf copy(int index, int length) {
throw new UnsupportedOperationException("unsupported!");
}
@Override
@Deprecated
public ByteBuffer internalNioBuffer(int index, int length) {
throw new UnsupportedOperationException("cannot access directly the wrapped buffer!");
}
@Override
@Deprecated
public ByteBuffer nioBuffer(int index, int length) {
throw new UnsupportedOperationException("unsupported!");
}
@Override
@Deprecated
protected void deallocate() {
//NO_OP
}
@Override
public ByteBuf unwrap() {
return null;
}
private long addr(int index) {
return memoryAddress + index;
}
@Override
@Deprecated
protected SwappedByteBuf newSwappedByteBuf() {
throw new UnsupportedOperationException("unsupported!");
}
@Override
public ByteBuf setZero(int index, int length) {
UnsafeByteBufUtil.setZero(this, addr(index), index, length);
return this;
}
@Override
public ByteBuf writeZero(int length) {
ensureWritable(length);
int wIndex = writerIndex;
setZero(wIndex, length);
writerIndex = wIndex + length;
return this;
}
}

View File

@ -54,6 +54,8 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
protected boolean dataSync = true; protected boolean dataSync = true;
protected volatile int alignment = -1;
private final IOCriticalErrorListener critialErrorListener; private final IOCriticalErrorListener critialErrorListener;
/** /**
@ -83,6 +85,20 @@ public abstract class AbstractSequentialFileFactory implements SequentialFileFac
this.maxIO = maxIO; this.maxIO = maxIO;
} }
@Override
public int getAlignment() {
if (alignment < 0) {
alignment = 1;
}
return alignment;
}
@Override
public AbstractSequentialFileFactory setAlignment(int alignment) {
this.alignment = alignment;
return this;
}
@Override @Override
public SequentialFileFactory setDatasync(boolean enabled) { public SequentialFileFactory setDatasync(boolean enabled) {

View File

@ -43,8 +43,6 @@ public interface SequentialFile {
boolean fits(int size); boolean fits(int size);
int getAlignment() throws Exception;
int calculateBlockStart(int position) throws Exception; int calculateBlockStart(int position) throws Exception;
String getFileName(); String getFileName();

View File

@ -79,6 +79,8 @@ public interface SequentialFileFactory {
int getAlignment(); int getAlignment();
SequentialFileFactory setAlignment(int alignment);
int calculateBlockSize(int bytes); int calculateBlockSize(int bytes);
File getDirectory(); File getDirectory();

View File

@ -78,17 +78,9 @@ public class AIOSequentialFile extends AbstractSequentialFile {
return opened; return opened;
} }
@Override
public int getAlignment() {
// TODO: get the alignment from the file system, but we have to cache this, we can't call it every time
/* checkOpened();
return aioFile.getBlockSize(); */
return 512;
}
@Override @Override
public int calculateBlockStart(final int position) { public int calculateBlockStart(final int position) {
int alignment = getAlignment(); int alignment = factory.getAlignment();
int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment; int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment;

View File

@ -143,13 +143,13 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
@Override @Override
public ByteBuffer allocateDirectBuffer(final int size) { public ByteBuffer allocateDirectBuffer(final int size) {
int blocks = size / 512; int blocks = size / getAlignment();
if (size % 512 != 0) { if (size % getAlignment() != 0) {
blocks++; blocks++;
} }
// The buffer on AIO has to be a multiple of 512 // The buffer on AIO has to be a multiple of getAlignment()
ByteBuffer buffer = LibaioContext.newAlignedBuffer(blocks * 512, 512); ByteBuffer buffer = LibaioContext.newAlignedBuffer(blocks * getAlignment(), getAlignment());
buffer.limit(size); buffer.limit(size);
@ -163,8 +163,8 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
@Override @Override
public ByteBuffer newBuffer(int size) { public ByteBuffer newBuffer(int size) {
if (size % 512 != 0) { if (size % getAlignment() != 0) {
size = (size / 512 + 1) * 512; size = (size / getAlignment() + 1) * getAlignment();
} }
return buffersControl.newBuffer(size); return buffersControl.newBuffer(size);
@ -178,7 +178,26 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
@Override @Override
public int getAlignment() { public int getAlignment() {
return 512; if (alignment < 0) {
File checkFile = null;
try {
journalDir.mkdirs();
checkFile = File.createTempFile("journalCheck", ".tmp", journalDir);
checkFile.mkdirs();
checkFile.createNewFile();
alignment = LibaioContext.getBlockSize(checkFile);
} catch (Throwable e) {
logger.warn(e.getMessage(), e);
alignment = 512;
} finally {
if (checkFile != null) {
checkFile.delete();
}
}
}
return alignment;
} }
// For tests only // For tests only
@ -399,7 +418,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
// if a buffer is bigger than the configured-bufferSize, we just create a new // if a buffer is bigger than the configured-bufferSize, we just create a new
// buffer. // buffer.
if (size > bufferSize) { if (size > bufferSize) {
return LibaioContext.newAlignedBuffer(size, 512); return LibaioContext.newAlignedBuffer(size, getAlignment());
} else { } else {
// We need to allocate buffers following the rules of the storage // We need to allocate buffers following the rules of the storage
// being used (AIO/NIO) // being used (AIO/NIO)
@ -410,7 +429,7 @@ public final class AIOSequentialFileFactory extends AbstractSequentialFileFactor
if (buffer == null) { if (buffer == null) {
// if empty create a new one. // if empty create a new one.
buffer = LibaioContext.newAlignedBuffer(size, 512); buffer = LibaioContext.newAlignedBuffer(size, getAlignment());
buffer.limit(alignedSize); buffer.limit(alignedSize);
} else { } else {

View File

@ -20,18 +20,19 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.BufferUnderflowException; import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.MappedByteBuffer; import java.nio.MappedByteBuffer;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.UnpooledUnsafeDirectByteBufWrapper;
import io.netty.util.internal.PlatformDependent; import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
final class MappedFile implements AutoCloseable { final class MappedFile implements AutoCloseable {
private static final ByteBuffer ZERO_PAGE = ByteBuffer.allocateDirect(MappedByteBufferCache.PAGE_SIZE).order(ByteOrder.nativeOrder());
private final MappedByteBufferCache cache; private final MappedByteBufferCache cache;
private final int zerosMaxPage; private final UnpooledUnsafeDirectByteBufWrapper byteBufWrapper;
private final ChannelBufferWrapper channelBufferWrapper;
private MappedByteBuffer lastMapped; private MappedByteBuffer lastMapped;
private long lastMappedStart; private long lastMappedStart;
private long lastMappedLimit; private long lastMappedLimit;
@ -45,7 +46,8 @@ final class MappedFile implements AutoCloseable {
this.lastMappedLimit = -1; this.lastMappedLimit = -1;
this.position = 0; this.position = 0;
this.length = this.cache.fileSize(); this.length = this.cache.fileSize();
this.zerosMaxPage = Math.min(ZERO_PAGE.capacity(), (int) Math.min(Integer.MAX_VALUE, cache.overlapBytes())); this.byteBufWrapper = new UnpooledUnsafeDirectByteBufWrapper();
this.channelBufferWrapper = new ChannelBufferWrapper(this.byteBufWrapper, false);
} }
public static MappedFile of(File file, long chunckSize, long overlapSize) throws IOException { public static MappedFile of(File file, long chunckSize, long overlapSize) throws IOException {
@ -58,29 +60,33 @@ final class MappedFile implements AutoCloseable {
private int checkOffset(long offset, int bytes) throws BufferUnderflowException, IOException { private int checkOffset(long offset, int bytes) throws BufferUnderflowException, IOException {
if (!MappedByteBufferCache.inside(offset, lastMappedStart, lastMappedLimit)) { if (!MappedByteBufferCache.inside(offset, lastMappedStart, lastMappedLimit)) {
try { return updateOffset(offset, bytes);
final int index = cache.indexFor(offset);
final long mappedPosition = cache.mappedPositionFor(index);
final long mappedLimit = cache.mappedLimitFor(mappedPosition);
if (offset + bytes > mappedLimit) {
throw new IOException("mapping overflow!");
}
lastMapped = cache.acquireMappedByteBuffer(index);
lastMappedStart = mappedPosition;
lastMappedLimit = mappedLimit;
final int bufferPosition = (int) (offset - mappedPosition);
return bufferPosition;
} catch (IllegalStateException e) {
throw new IOException(e);
} catch (IllegalArgumentException e) {
throw new BufferUnderflowException();
}
} else { } else {
final int bufferPosition = (int) (offset - lastMappedStart); final int bufferPosition = (int) (offset - lastMappedStart);
return bufferPosition; return bufferPosition;
} }
} }
private int updateOffset(long offset, int bytes) throws BufferUnderflowException, IOException {
try {
final int index = cache.indexFor(offset);
final long mappedPosition = cache.mappedPositionFor(index);
final long mappedLimit = cache.mappedLimitFor(mappedPosition);
if (offset + bytes > mappedLimit) {
throw new IOException("mapping overflow!");
}
lastMapped = cache.acquireMappedByteBuffer(index);
lastMappedStart = mappedPosition;
lastMappedLimit = mappedLimit;
final int bufferPosition = (int) (offset - mappedPosition);
return bufferPosition;
} catch (IllegalStateException e) {
throw new IOException(e);
} catch (IllegalArgumentException e) {
throw new BufferUnderflowException();
}
}
public void force() { public void force() {
if (lastMapped != null) { if (lastMapped != null) {
lastMapped.force(); lastMapped.force();
@ -179,6 +185,26 @@ final class MappedFile implements AutoCloseable {
return read; return read;
} }
/**
* Writes an encoded sequence of bytes to this file from the given buffer.
* <p>
* <p> Bytes are written starting at this file's current position,
*/
public void write(EncodingSupport encodingSupport) throws IOException {
final int encodedSize = encodingSupport.getEncodeSize();
final int bufferPosition = checkOffset(position, encodedSize);
this.byteBufWrapper.wrap(this.lastMapped, bufferPosition, encodedSize);
try {
encodingSupport.encode(this.channelBufferWrapper);
} finally {
this.byteBufWrapper.reset();
}
position += encodedSize;
if (position > this.length) {
this.length = position;
}
}
/** /**
* Writes a sequence of bytes to this file from the given buffer. * Writes a sequence of bytes to this file from the given buffer.
* <p> * <p>
@ -273,21 +299,20 @@ final class MappedFile implements AutoCloseable {
* <p> Bytes are written starting at this file's current position, * <p> Bytes are written starting at this file's current position,
*/ */
public void zeros(long offset, int count) throws IOException { public void zeros(long offset, int count) throws IOException {
final long targetOffset = offset + count; while (count > 0) {
final int zerosBulkCopies = count / zerosMaxPage; //do not need to validate the bytes count
final long srcAddress = PlatformDependent.directBufferAddress(ZERO_PAGE); final int bufferPosition = checkOffset(offset, 0);
for (int i = 0; i < zerosBulkCopies; i++) { final int endZerosPosition = (int)Math.min((long)bufferPosition + count, lastMapped.capacity());
final int bufferPosition = checkOffset(offset, zerosMaxPage); final int zeros = endZerosPosition - bufferPosition;
final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition; final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
PlatformDependent.copyMemory(srcAddress, destAddress, zerosMaxPage); PlatformDependent.setMemory(destAddress, zeros, (byte) 0);
offset += zerosMaxPage; offset += zeros;
count -= zeros;
//TODO need to call force on each write?
//this.force();
} }
final int remainingToBeZeroes = (int) (targetOffset - offset); if (offset > this.length) {
final int bufferPosition = checkOffset(offset, remainingToBeZeroes); this.length = offset;
final long destAddress = PlatformDependent.directBufferAddress(lastMapped) + bufferPosition;
PlatformDependent.copyMemory(srcAddress, destAddress, remainingToBeZeroes);
if (targetOffset > this.length) {
this.length = targetOffset;
} }
} }

View File

@ -20,16 +20,13 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile; import java.io.RandomAccessFile;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.channels.FileLock; import java.nio.channels.FileLock;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType; import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException; import org.apache.activemq.artemis.api.core.ActiveMQIOErrorException;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.io.IOCallback; import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
@ -44,12 +41,11 @@ final class MappedSequentialFile implements SequentialFile {
private final long chunkBytes; private final long chunkBytes;
private final long overlapBytes; private final long overlapBytes;
private final IOCriticalErrorListener criticalErrorListener; private final IOCriticalErrorListener criticalErrorListener;
private final MappedSequentialFileFactory factory;
private File file; private File file;
private File absoluteFile; private File absoluteFile;
private String fileName; private String fileName;
private MappedFile mappedFile; private MappedFile mappedFile;
private ActiveMQBuffer pooledActiveMQBuffer;
private final MappedSequentialFileFactory factory;
MappedSequentialFile(MappedSequentialFileFactory factory, MappedSequentialFile(MappedSequentialFileFactory factory,
final File directory, final File directory,
@ -65,19 +61,24 @@ final class MappedSequentialFile implements SequentialFile {
this.chunkBytes = chunkBytes; this.chunkBytes = chunkBytes;
this.overlapBytes = overlapBytes; this.overlapBytes = overlapBytes;
this.mappedFile = null; this.mappedFile = null;
this.pooledActiveMQBuffer = null;
this.criticalErrorListener = criticalErrorListener; this.criticalErrorListener = criticalErrorListener;
} }
private void checkIsOpen() { private void checkIsOpen() {
if (!isOpen()) { if (!isOpen()) {
throw new IllegalStateException("must be open!"); throw new IllegalStateException("File not opened!");
}
}
private void checkIsOpen(IOCallback callback) {
if (!isOpen()) {
callback.onError(ActiveMQExceptionType.IO_ERROR.getCode(), "File not opened");
} }
} }
private void checkIsNotOpen() { private void checkIsNotOpen() {
if (isOpen()) { if (isOpen()) {
throw new IllegalStateException("must be closed!"); throw new IllegalStateException("File opened!");
} }
} }
@ -101,7 +102,6 @@ final class MappedSequentialFile implements SequentialFile {
@Override @Override
public void open(int maxIO, boolean useExecutor) throws IOException { public void open(int maxIO, boolean useExecutor) throws IOException {
//ignore maxIO e useExecutor //ignore maxIO e useExecutor
ActiveMQJournalLogger.LOGGER.warn("ignoring maxIO and useExecutor unsupported parameters!");
this.open(); this.open();
} }
@ -113,11 +113,6 @@ final class MappedSequentialFile implements SequentialFile {
return hasRemaining; return hasRemaining;
} }
@Override
public int getAlignment() {
return 0;
}
@Override @Override
public int calculateBlockStart(int position) { public int calculateBlockStart(int position) {
return position; return position;
@ -139,7 +134,7 @@ final class MappedSequentialFile implements SequentialFile {
@Override @Override
public void delete() { public void delete() {
checkIsNotOpen(); close();
if (file.exists() && !file.delete()) { if (file.exists() && !file.delete()) {
ActiveMQJournalLogger.LOGGER.errorDeletingFile(this); ActiveMQJournalLogger.LOGGER.errorDeletingFile(this);
} }
@ -147,10 +142,10 @@ final class MappedSequentialFile implements SequentialFile {
@Override @Override
public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws IOException { public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws IOException {
checkIsOpen();
if (callback == null) { if (callback == null) {
throw new NullPointerException("callback parameter need to be set"); throw new NullPointerException("callback parameter need to be set");
} }
checkIsOpen(callback);
try { try {
final ByteBuf byteBuf = bytes.byteBuf(); final ByteBuf byteBuf = bytes.byteBuf();
final int writerIndex = byteBuf.writerIndex(); final int writerIndex = byteBuf.writerIndex();
@ -187,34 +182,16 @@ final class MappedSequentialFile implements SequentialFile {
} }
} }
private ActiveMQBuffer acquiresActiveMQBufferWithAtLeast(int size) {
if (this.pooledActiveMQBuffer == null || this.pooledActiveMQBuffer.capacity() < size) {
this.pooledActiveMQBuffer = new ChannelBufferWrapper(Unpooled.directBuffer(size, size).order(ByteOrder.nativeOrder()));
} else {
this.pooledActiveMQBuffer.clear();
}
return pooledActiveMQBuffer;
}
@Override @Override
public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws IOException { public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws IOException {
checkIsOpen();
if (callback == null) { if (callback == null) {
throw new NullPointerException("callback parameter need to be set"); throw new NullPointerException("callback parameter need to be set");
} }
checkIsOpen(callback);
try { try {
final int encodedSize = bytes.getEncodeSize(); this.mappedFile.write(bytes);
final ActiveMQBuffer outBuffer = acquiresActiveMQBufferWithAtLeast(encodedSize); if (factory.isDatasync() && sync) {
bytes.encode(outBuffer); this.mappedFile.force();
final ByteBuf byteBuf = outBuffer.byteBuf();
final int writerIndex = byteBuf.writerIndex();
final int readerIndex = byteBuf.readerIndex();
final int readableBytes = writerIndex - readerIndex;
if (readableBytes > 0) {
this.mappedFile.write(byteBuf, readerIndex, readableBytes);
if (factory.isDatasync() && sync) {
this.mappedFile.force();
}
} }
callback.done(); callback.done();
} catch (IOException e) { } catch (IOException e) {
@ -229,33 +206,26 @@ final class MappedSequentialFile implements SequentialFile {
@Override @Override
public void write(EncodingSupport bytes, boolean sync) throws IOException { public void write(EncodingSupport bytes, boolean sync) throws IOException {
checkIsOpen(); checkIsOpen();
final int encodedSize = bytes.getEncodeSize(); this.mappedFile.write(bytes);
final ActiveMQBuffer outBuffer = acquiresActiveMQBufferWithAtLeast(encodedSize); if (factory.isDatasync() && sync) {
bytes.encode(outBuffer); this.mappedFile.force();
final ByteBuf byteBuf = outBuffer.byteBuf();
final int writerIndex = byteBuf.writerIndex();
final int readerIndex = byteBuf.readerIndex();
final int readableBytes = writerIndex - readerIndex;
if (readableBytes > 0) {
this.mappedFile.write(byteBuf, readerIndex, readableBytes);
if (factory.isDatasync() && sync) {
this.mappedFile.force();
}
} }
} }
@Override @Override
public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) { public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
checkIsOpen();
if (callback == null) { if (callback == null) {
throw new NullPointerException("callback parameter need to be set"); throw new NullPointerException("callback parameter need to be set");
} }
checkIsOpen(callback);
try { try {
final int position = bytes.position(); final int position = bytes.position();
final int limit = bytes.limit(); final int limit = bytes.limit();
final int remaining = limit - position; final int remaining = limit - position;
if (remaining > 0) { if (remaining > 0) {
this.mappedFile.write(bytes, position, remaining); this.mappedFile.write(bytes, position, remaining);
final int newPosition = position + remaining;
bytes.position(newPosition);
if (factory.isDatasync() && sync) { if (factory.isDatasync() && sync) {
this.mappedFile.force(); this.mappedFile.force();
} }
@ -278,6 +248,8 @@ final class MappedSequentialFile implements SequentialFile {
final int remaining = limit - position; final int remaining = limit - position;
if (remaining > 0) { if (remaining > 0) {
this.mappedFile.write(bytes, position, remaining); this.mappedFile.write(bytes, position, remaining);
final int newPosition = position + remaining;
bytes.position(newPosition);
if (factory.isDatasync() && sync) { if (factory.isDatasync() && sync) {
this.mappedFile.force(); this.mappedFile.force();
} }
@ -286,10 +258,10 @@ final class MappedSequentialFile implements SequentialFile {
@Override @Override
public int read(ByteBuffer bytes, IOCallback callback) throws IOException { public int read(ByteBuffer bytes, IOCallback callback) throws IOException {
checkIsOpen();
if (callback == null) { if (callback == null) {
throw new NullPointerException("callback parameter need to be set"); throw new NullPointerException("callback parameter need to be set");
} }
checkIsOpen(callback);
try { try {
final int position = bytes.position(); final int position = bytes.position();
final int limit = bytes.limit(); final int limit = bytes.limit();
@ -301,8 +273,10 @@ final class MappedSequentialFile implements SequentialFile {
bytes.flip(); bytes.flip();
callback.done(); callback.done();
return bytesRead; return bytesRead;
} else {
callback.done();
return 0;
} }
return 0;
} catch (IOException e) { } catch (IOException e) {
if (this.criticalErrorListener != null) { if (this.criticalErrorListener != null) {
this.criticalErrorListener.onIOException(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this); this.criticalErrorListener.onIOException(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
@ -365,7 +339,14 @@ final class MappedSequentialFile implements SequentialFile {
@Override @Override
public void renameTo(String newFileName) throws Exception { public void renameTo(String newFileName) throws Exception {
checkIsNotOpen(); try {
close();
} catch (Exception e) {
if (e instanceof IOException) {
factory.onIOError(new ActiveMQIOErrorException(e.getMessage(), e), e.getMessage(), this);
}
throw e;
}
if (this.fileName == null) { if (this.fileName == null) {
this.fileName = this.file.getName(); this.fileName = this.file.getName();
} }
@ -393,14 +374,10 @@ final class MappedSequentialFile implements SequentialFile {
if (dstFile.isOpen()) { if (dstFile.isOpen()) {
throw new IllegalArgumentException("dstFile must be closed too"); throw new IllegalArgumentException("dstFile must be closed too");
} }
try (RandomAccessFile src = new RandomAccessFile(file, "rw"); try (RandomAccessFile src = new RandomAccessFile(file, "rw"); FileChannel srcChannel = src.getChannel(); FileLock srcLock = srcChannel.lock()) {
FileChannel srcChannel = src.getChannel();
FileLock srcLock = srcChannel.lock()) {
final long readableBytes = srcChannel.size(); final long readableBytes = srcChannel.size();
if (readableBytes > 0) { if (readableBytes > 0) {
try (RandomAccessFile dst = new RandomAccessFile(dstFile.getJavaFile(), "rw"); try (RandomAccessFile dst = new RandomAccessFile(dstFile.getJavaFile(), "rw"); FileChannel dstChannel = dst.getChannel(); FileLock dstLock = dstChannel.lock()) {
FileChannel dstChannel = dst.getChannel();
FileLock dstLock = dstChannel.lock()) {
final long oldLength = dst.length(); final long oldLength = dst.length();
final long newLength = oldLength + readableBytes; final long newLength = oldLength + readableBytes;
dst.setLength(newLength); dst.setLength(newLength);

View File

@ -20,7 +20,6 @@ import java.io.File;
import java.io.FilenameFilter; import java.io.FilenameFilter;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
@ -29,30 +28,42 @@ import io.netty.util.internal.PlatformDependent;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener; import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
public final class MappedSequentialFileFactory implements SequentialFileFactory { public final class MappedSequentialFileFactory implements SequentialFileFactory {
private static long DEFAULT_BLOCK_SIZE = 64L << 20; private static long DEFAULT_BLOCK_SIZE = 64L << 20;
private final File directory; private final File directory;
private final IOCriticalErrorListener criticalErrorListener; private final IOCriticalErrorListener criticalErrorListener;
private final TimedBuffer timedBuffer;
private long chunkBytes; private long chunkBytes;
private long overlapBytes; private long overlapBytes;
private boolean useDataSync; private boolean useDataSync;
private boolean supportCallbacks;
public MappedSequentialFileFactory(File directory, IOCriticalErrorListener criticalErrorListener) { protected volatile int alignment = -1;
public MappedSequentialFileFactory(File directory,
IOCriticalErrorListener criticalErrorListener,
boolean supportCallbacks) {
this.directory = directory; this.directory = directory;
this.criticalErrorListener = criticalErrorListener; this.criticalErrorListener = criticalErrorListener;
this.chunkBytes = DEFAULT_BLOCK_SIZE; this.chunkBytes = DEFAULT_BLOCK_SIZE;
this.overlapBytes = DEFAULT_BLOCK_SIZE / 4; this.overlapBytes = DEFAULT_BLOCK_SIZE / 4;
this.useDataSync = true;
this.timedBuffer = null;
this.supportCallbacks = supportCallbacks;
}
public MappedSequentialFileFactory(File directory, IOCriticalErrorListener criticalErrorListener) {
this(directory, criticalErrorListener, false);
} }
public MappedSequentialFileFactory(File directory) { public MappedSequentialFileFactory(File directory) {
this.directory = directory; this(directory, null);
this.criticalErrorListener = null;
this.chunkBytes = DEFAULT_BLOCK_SIZE;
this.overlapBytes = DEFAULT_BLOCK_SIZE / 4;
} }
public long chunkBytes() { public long chunkBytes() {
return chunkBytes; return chunkBytes;
} }
@ -73,7 +84,12 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
@Override @Override
public SequentialFile createSequentialFile(String fileName) { public SequentialFile createSequentialFile(String fileName) {
return new MappedSequentialFile(this, directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener); final MappedSequentialFile mappedSequentialFile = new MappedSequentialFile(this, directory, new File(directory, fileName), chunkBytes, overlapBytes, criticalErrorListener);
if (this.timedBuffer == null) {
return mappedSequentialFile;
} else {
return new TimedSequentialFile(this, mappedSequentialFile);
}
} }
@Override @Override
@ -89,17 +105,12 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
@Override @Override
public int getMaxIO() { public int getMaxIO() {
return 0; return 1;
} }
@Override @Override
public List<String> listFiles(final String extension) throws Exception { public List<String> listFiles(final String extension) throws Exception {
final FilenameFilter extensionFilter = new FilenameFilter() { final FilenameFilter extensionFilter = (file, name) -> name.endsWith("." + extension);
@Override
public boolean accept(final File file, final String name) {
return name.endsWith("." + extension);
}
};
final String[] fileNames = directory.list(extensionFilter); final String[] fileNames = directory.list(extensionFilter);
if (fileNames == null) { if (fileNames == null) {
return Collections.EMPTY_LIST; return Collections.EMPTY_LIST;
@ -109,7 +120,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
@Override @Override
public boolean isSupportsCallbacks() { public boolean isSupportsCallbacks() {
return false; return this.supportCallbacks;
} }
@Override @Override
@ -121,7 +132,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
@Override @Override
public ByteBuffer allocateDirectBuffer(final int size) { public ByteBuffer allocateDirectBuffer(final int size) {
return ByteBuffer.allocateDirect(size).order(ByteOrder.nativeOrder()); return ByteBuffer.allocateDirect(size);
} }
@Override @Override
@ -131,7 +142,7 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
@Override @Override
public ByteBuffer newBuffer(final int size) { public ByteBuffer newBuffer(final int size) {
return ByteBuffer.allocateDirect(size).order(ByteOrder.nativeOrder()); return ByteBuffer.allocate(size);
} }
@Override @Override
@ -143,17 +154,23 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
@Override @Override
public void activateBuffer(SequentialFile file) { public void activateBuffer(SequentialFile file) {
if (timedBuffer != null) {
file.setTimedBuffer(timedBuffer);
}
} }
@Override @Override
public void deactivateBuffer() { public void deactivateBuffer() {
if (timedBuffer != null) {
// When moving to a new file, we need to make sure any pending buffer will be transferred to the buffer
timedBuffer.flush();
timedBuffer.setObserver(null);
}
} }
@Override @Override
public ByteBuffer wrapBuffer(final byte[] bytes) { public ByteBuffer wrapBuffer(final byte[] bytes) {
return ByteBuffer.wrap(bytes).order(ByteOrder.nativeOrder()); return ByteBuffer.wrap(bytes);
} }
@Override @Override
@ -161,6 +178,12 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
return 1; return 1;
} }
@Override
public MappedSequentialFileFactory setAlignment(int alignment) {
this.alignment = alignment;
return this;
}
@Override @Override
public int calculateBlockSize(int bytes) { public int calculateBlockSize(int bytes) {
return bytes; return bytes;
@ -173,7 +196,6 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
@Override @Override
public void clearBuffer(final ByteBuffer buffer) { public void clearBuffer(final ByteBuffer buffer) {
buffer.clear();
if (buffer.isDirect()) { if (buffer.isDirect()) {
BytesUtils.zerosDirect(buffer); BytesUtils.zerosDirect(buffer);
} else if (buffer.hasArray()) { } else if (buffer.hasArray()) {
@ -187,16 +209,21 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
buffer.put(i, (byte) 0); buffer.put(i, (byte) 0);
} }
} }
buffer.rewind();
} }
@Override @Override
public void start() { public void start() {
if (timedBuffer != null) {
timedBuffer.start();
}
} }
@Override @Override
public void stop() { public void stop() {
if (timedBuffer != null) {
timedBuffer.stop();
}
} }
@Override @Override
@ -209,6 +236,8 @@ public final class MappedSequentialFileFactory implements SequentialFileFactory
@Override @Override
public void flush() { public void flush() {
if (timedBuffer != null) {
timedBuffer.flush();
}
} }
} }

View File

@ -0,0 +1,377 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.io.mapped;
import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.core.io.DummyCallback;
import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.buffer.TimedBuffer;
import org.apache.activemq.artemis.core.io.buffer.TimedBufferObserver;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.journal.ActiveMQJournalLogger;
final class TimedSequentialFile implements SequentialFile {
private final SequentialFileFactory factory;
private final SequentialFile sequentialFile;
private final LocalBufferObserver observer;
private final ThreadLocal<ResettableIOCallback> callbackPool;
private TimedBuffer timedBuffer;
TimedSequentialFile(SequentialFileFactory factory, SequentialFile sequentialFile) {
this.sequentialFile = sequentialFile;
this.factory = factory;
this.observer = new LocalBufferObserver();
this.callbackPool = ThreadLocal.withInitial(ResettableIOCallback::new);
}
@Override
public boolean isOpen() {
return this.sequentialFile.isOpen();
}
@Override
public boolean exists() {
return this.sequentialFile.exists();
}
@Override
public void open() throws Exception {
this.sequentialFile.open();
}
@Override
public void open(int maxIO, boolean useExecutor) throws Exception {
this.sequentialFile.open(maxIO, useExecutor);
}
@Override
public boolean fits(int size) {
if (timedBuffer == null) {
return this.sequentialFile.fits(size);
} else {
return timedBuffer.checkSize(size);
}
}
@Override
public int calculateBlockStart(int position) throws Exception {
return this.sequentialFile.calculateBlockStart(position);
}
@Override
public String getFileName() {
return this.sequentialFile.getFileName();
}
@Override
public void fill(int size) throws Exception {
this.sequentialFile.fill(size);
}
@Override
public void delete() throws IOException, InterruptedException, ActiveMQException {
this.sequentialFile.delete();
}
@Override
public void write(ActiveMQBuffer bytes, boolean sync, IOCallback callback) throws Exception {
if (this.timedBuffer != null) {
this.timedBuffer.addBytes(bytes, sync, callback);
} else {
this.sequentialFile.write(bytes, sync, callback);
}
}
@Override
public void write(ActiveMQBuffer bytes, boolean sync) throws Exception {
if (sync) {
if (this.timedBuffer != null) {
final ResettableIOCallback callback = callbackPool.get();
try {
this.timedBuffer.addBytes(bytes, true, callback);
callback.waitCompletion();
} finally {
callback.reset();
}
} else {
this.sequentialFile.write(bytes, true);
}
} else {
if (this.timedBuffer != null) {
this.timedBuffer.addBytes(bytes, false, DummyCallback.getInstance());
} else {
this.sequentialFile.write(bytes, false);
}
}
}
@Override
public void write(EncodingSupport bytes, boolean sync, IOCallback callback) throws Exception {
if (this.timedBuffer != null) {
this.timedBuffer.addBytes(bytes, sync, callback);
} else {
this.sequentialFile.write(bytes, sync, callback);
}
}
@Override
public void write(EncodingSupport bytes, boolean sync) throws Exception {
if (sync) {
if (this.timedBuffer != null) {
final ResettableIOCallback callback = callbackPool.get();
try {
this.timedBuffer.addBytes(bytes, true, callback);
callback.waitCompletion();
} finally {
callback.reset();
}
} else {
this.sequentialFile.write(bytes, true);
}
} else {
if (this.timedBuffer != null) {
this.timedBuffer.addBytes(bytes, false, DummyCallback.getInstance());
} else {
this.sequentialFile.write(bytes, false);
}
}
}
@Override
public void writeDirect(ByteBuffer bytes, boolean sync, IOCallback callback) {
this.sequentialFile.writeDirect(bytes, sync, callback);
}
@Override
public void writeDirect(ByteBuffer bytes, boolean sync) throws Exception {
this.sequentialFile.writeDirect(bytes, sync);
}
@Override
public int read(ByteBuffer bytes, IOCallback callback) throws Exception {
return this.sequentialFile.read(bytes, callback);
}
@Override
public int read(ByteBuffer bytes) throws Exception {
return this.sequentialFile.read(bytes);
}
@Override
public void position(long pos) throws IOException {
this.sequentialFile.position(pos);
}
@Override
public long position() {
return this.sequentialFile.position();
}
@Override
public void close() throws Exception {
this.sequentialFile.close();
}
@Override
public void sync() throws IOException {
this.sequentialFile.sync();
}
@Override
public long size() throws Exception {
return this.sequentialFile.size();
}
@Override
public void renameTo(String newFileName) throws Exception {
this.sequentialFile.renameTo(newFileName);
}
@Override
public SequentialFile cloneFile() {
return new TimedSequentialFile(factory, this.sequentialFile.cloneFile());
}
@Override
public void copyTo(SequentialFile newFileName) throws Exception {
this.sequentialFile.copyTo(newFileName);
}
@Override
public void setTimedBuffer(TimedBuffer buffer) {
if (this.timedBuffer != null) {
this.timedBuffer.setObserver(null);
}
this.timedBuffer = buffer;
if (buffer != null) {
buffer.setObserver(this.observer);
}
}
@Override
public File getJavaFile() {
return this.sequentialFile.getJavaFile();
}
private static final class ResettableIOCallback implements IOCallback {
private final CyclicBarrier cyclicBarrier;
private int errorCode;
private String errorMessage;
ResettableIOCallback() {
this.cyclicBarrier = new CyclicBarrier(2);
}
public void waitCompletion() throws InterruptedException, ActiveMQException, BrokenBarrierException {
this.cyclicBarrier.await();
if (this.errorMessage != null) {
throw ActiveMQExceptionType.createException(this.errorCode, this.errorMessage);
}
}
public void reset() {
this.errorCode = 0;
this.errorMessage = null;
}
@Override
public void done() {
try {
this.cyclicBarrier.await();
} catch (BrokenBarrierException | InterruptedException e) {
throw new IllegalStateException(e);
}
}
@Override
public void onError(int errorCode, String errorMessage) {
try {
this.errorCode = errorCode;
this.errorMessage = errorMessage;
this.cyclicBarrier.await();
} catch (BrokenBarrierException | InterruptedException e) {
throw new IllegalStateException(e);
}
}
}
private static final class DelegateCallback implements IOCallback {
final List<IOCallback> delegates;
private DelegateCallback() {
this.delegates = new ArrayList<>();
}
public List<IOCallback> delegates() {
return this.delegates;
}
@Override
public void done() {
final int size = delegates.size();
for (int i = 0; i < size; i++) {
try {
final IOCallback callback = delegates.get(i);
callback.done();
} catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.errorCompletingCallback(e);
}
}
}
@Override
public void onError(final int errorCode, final String errorMessage) {
for (IOCallback callback : delegates) {
try {
callback.onError(errorCode, errorMessage);
} catch (Throwable e) {
ActiveMQJournalLogger.LOGGER.errorCallingErrorCallback(e);
}
}
}
}
private final class LocalBufferObserver implements TimedBufferObserver {
private final ThreadLocal<DelegateCallback> callbacksPool = ThreadLocal.withInitial(DelegateCallback::new);
@Override
public void flushBuffer(final ByteBuffer buffer, final boolean requestedSync, final List<IOCallback> callbacks) {
buffer.flip();
if (buffer.limit() == 0) {
//if there are no bytes to flush, can release the callbacks
final int size = callbacks.size();
for (int i = 0; i < size; i++) {
callbacks.get(i).done();
}
} else {
final DelegateCallback delegateCallback = callbacksPool.get();
final int size = callbacks.size();
final List<IOCallback> delegates = delegateCallback.delegates();
for (int i = 0; i < size; i++) {
delegates.add(callbacks.get(i));
}
try {
sequentialFile.writeDirect(buffer, requestedSync, delegateCallback);
} finally {
delegates.clear();
}
}
}
@Override
public ByteBuffer newBuffer(final int size, final int limit) {
final int alignedSize = factory.calculateBlockSize(size);
final int alignedLimit = factory.calculateBlockSize(limit);
final ByteBuffer buffer = factory.newBuffer(alignedSize);
buffer.limit(alignedLimit);
return buffer;
}
@Override
public int getRemainingBytes() {
try {
final int remaining = (int) Math.min(sequentialFile.size() - sequentialFile.position(), Integer.MAX_VALUE);
return remaining;
} catch (Exception e) {
throw new IllegalStateException(e);
}
}
@Override
public String toString() {
return "TimedBufferObserver on file (" + getFileName() + ")";
}
}
}

View File

@ -53,11 +53,6 @@ public final class NIOSequentialFile extends AbstractSequentialFile {
defaultMaxIO = maxIO; defaultMaxIO = maxIO;
} }
@Override
public int getAlignment() {
return 1;
}
@Override @Override
public int calculateBlockStart(final int position) { public int calculateBlockStart(final int position) {
return position; return position;

View File

@ -0,0 +1,208 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.io;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.LockSupport;
import java.util.stream.Stream;
import io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueue;
import org.apache.activemq.artemis.ArtemisConstants;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.journal.impl.SimpleWaitIOCallback;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
/**
* To benchmark Type.Aio you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
*/
public class JournalTptBenchmark {
public static void main(String[] args) throws Exception {
final boolean useDefaultIoExecutor = true;
final int fileSize = 1024 * 1024;
final boolean dataSync = true;
final Type type = Type.Mapped;
final int tests = 5;
final int warmup = 20_000;
final int measurements = 20_000;
final int msgSize = 100;
final byte[] msgContent = new byte[msgSize];
Arrays.fill(msgContent, (byte) 1);
final int totalMessages = (measurements * tests + warmup);
final File tmpDirectory = new File("./");
//using the default configuration when the broker starts!
final SequentialFileFactory factory;
switch (type) {
case Mapped:
final MappedSequentialFileFactory mappedFactory = new MappedSequentialFileFactory(tmpDirectory, null, true);
factory = mappedFactory.chunkBytes(fileSize).overlapBytes(0).setDatasync(dataSync);
break;
case Nio:
factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync);
break;
case Aio:
factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null).setDatasync(dataSync);
//disable it when using directly the same buffer: ((AIOSequentialFileFactory)factory).disableBufferReuse();
if (!LibaioContext.isLoaded()) {
throw new IllegalStateException("lib AIO not loaded!");
}
break;
default:
throw new AssertionError("unsupported case");
}
int numFiles = (int) (totalMessages * factory.calculateBlockSize(msgSize)) / fileSize;
if (numFiles < 2) {
numFiles = 2;
}
ExecutorService service = null;
final Journal journal;
if (useDefaultIoExecutor) {
journal = new JournalImpl(fileSize, numFiles, numFiles, Integer.MAX_VALUE, 100, factory, "activemq-data", "amq", factory.getMaxIO());
journal.start();
} else {
final ArrayList<MpscArrayQueue<Runnable>> tasks = new ArrayList<>();
service = Executors.newSingleThreadExecutor();
journal = new JournalImpl(() -> new Executor() {
private final MpscArrayQueue<Runnable> taskQueue = new MpscArrayQueue<>(1024);
{
tasks.add(taskQueue);
}
@Override
public void execute(Runnable command) {
while (!taskQueue.offer(command)) {
LockSupport.parkNanos(1L);
}
}
}, fileSize, numFiles, numFiles, Integer.MAX_VALUE, 100, factory, "activemq-data", "amq", factory.getMaxIO(), 0);
journal.start();
service.execute(() -> {
final int size = tasks.size();
final int capacity = 1024;
while (!Thread.currentThread().isInterrupted()) {
for (int i = 0; i < size; i++) {
final MpscArrayQueue<Runnable> runnables = tasks.get(i);
for (int j = 0; j < capacity; j++) {
final Runnable task = runnables.poll();
if (task == null) {
break;
}
try {
task.run();
} catch (Throwable t) {
System.err.println(t);
}
}
}
}
});
}
try {
journal.load(new ArrayList<RecordInfo>(), null, null);
} catch (Exception e) {
throw new RuntimeException(e);
}
try {
final EncodingSupport encodingSupport = new EncodingSupport() {
@Override
public int getEncodeSize() {
return msgSize;
}
@Override
public void encode(ActiveMQBuffer buffer) {
final int writerIndex = buffer.writerIndex();
buffer.setBytes(writerIndex, msgContent);
buffer.writerIndex(writerIndex + msgSize);
}
@Override
public void decode(ActiveMQBuffer buffer) {
}
};
long id = 1;
{
final long elapsed = writeMeasurements(id, journal, encodingSupport, warmup);
id += warmup;
System.out.println("warmup:" + (measurements * 1000_000_000L) / elapsed + " ops/sec");
}
for (int t = 0; t < tests; t++) {
final long elapsed = writeMeasurements(id, journal, encodingSupport, measurements);
System.out.println((measurements * 1000_000_000L) / elapsed + " ops/sec");
id += warmup;
}
} finally {
journal.stop();
if (service != null) {
service.shutdown();
}
final File[] fileToDeletes = tmpDirectory.listFiles();
System.out.println("Files to deletes" + Arrays.toString(fileToDeletes));
Stream.of(fileToDeletes).forEach(File::delete);
}
}
private static long writeMeasurements(long id,
Journal journal,
EncodingSupport encodingSupport,
int measurements) throws Exception {
System.gc();
TimeUnit.SECONDS.sleep(2);
final long start = System.nanoTime();
for (int i = 0; i < measurements; i++) {
write(id, journal, encodingSupport);
id++;
}
final long elapsed = System.nanoTime() - start;
return elapsed;
}
private static void write(long id, Journal journal, EncodingSupport encodingSupport) throws Exception {
journal.appendAddRecord(id, (byte) 1, encodingSupport, false);
final SimpleWaitIOCallback ioCallback = new SimpleWaitIOCallback();
journal.appendUpdateRecord(id, (byte) 1, encodingSupport, true, ioCallback);
ioCallback.waitCompletion();
}
private enum Type {
Mapped, Nio, Aio
}
}

View File

@ -0,0 +1,203 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.core.io;
import java.io.File;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.LockSupport;
import org.apache.activemq.artemis.ArtemisConstants;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQExceptionType;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.jlibaio.LibaioContext;
/**
* To benchmark Type.Aio you need to define -Djava.library.path=${project-root}/native/src/.libs when calling the JVM
*/
public class SequentialFileTptBenchmark {
private static final FastWaitIOCallback CALLBACK = new FastWaitIOCallback();
public static void main(String[] args) throws Exception {
final boolean dataSync = true;
final boolean writeSync = true;
final Type type = Type.Mapped;
final int tests = 10;
final int warmup = 20_000;
final int measurements = 20_000;
final int msgSize = 100;
final byte[] msgContent = new byte[msgSize];
Arrays.fill(msgContent, (byte) 1);
final File tmpDirectory = new File("./");
//using the default configuration when the broker starts!
final SequentialFileFactory factory;
switch (type) {
case Mapped:
final MappedSequentialFileFactory mappedFactory = new MappedSequentialFileFactory(tmpDirectory, null, true);
final int alignedMessageSize = mappedFactory.calculateBlockSize(msgSize);
final int totalFileSize = Math.max(alignedMessageSize * measurements, alignedMessageSize * warmup);
factory = mappedFactory.chunkBytes(totalFileSize).overlapBytes(0).setDatasync(dataSync);
break;
case Nio:
factory = new NIOSequentialFileFactory(tmpDirectory, true, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_NIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO, 1, false, null).setDatasync(dataSync);
break;
case Aio:
factory = new AIOSequentialFileFactory(tmpDirectory, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 500, false, null).setDatasync(dataSync);
//disable it when using directly the same buffer: ((AIOSequentialFileFactory)factory).disableBufferReuse();
if (!LibaioContext.isLoaded()) {
throw new IllegalStateException("lib AIO not loaded!");
}
break;
default:
throw new AssertionError("unsupported case");
}
factory.start();
try {
final EncodingSupport encodingSupport = new EncodingSupport() {
@Override
public int getEncodeSize() {
return msgSize;
}
@Override
public void encode(ActiveMQBuffer buffer) {
final int writerIndex = buffer.writerIndex();
buffer.setBytes(writerIndex, msgContent);
buffer.writerIndex(writerIndex + msgSize);
}
@Override
public void decode(ActiveMQBuffer buffer) {
}
};
final int alignedMessageSize = factory.calculateBlockSize(msgSize);
final long totalFileSize = Math.max(alignedMessageSize * measurements, alignedMessageSize * warmup);
if (totalFileSize > Integer.MAX_VALUE)
throw new IllegalArgumentException("reduce measurements/warmup");
final int fileSize = (int) totalFileSize;
final SequentialFile sequentialFile = factory.createSequentialFile("seq.dat");
sequentialFile.getJavaFile().delete();
sequentialFile.getJavaFile().deleteOnExit();
sequentialFile.open();
final long startZeros = System.nanoTime();
sequentialFile.fill(fileSize);
final long elapsedZeros = System.nanoTime() - startZeros;
System.out.println("Zeroed " + fileSize + " bytes in " + TimeUnit.NANOSECONDS.toMicros(elapsedZeros) + " us");
try {
{
final long elapsed = writeMeasurements(factory, sequentialFile, encodingSupport, warmup, writeSync);
System.out.println("warmup:" + (measurements * 1000_000_000L) / elapsed + " ops/sec");
}
for (int t = 0; t < tests; t++) {
final long elapsed = writeMeasurements(factory, sequentialFile, encodingSupport, measurements, writeSync);
System.out.println((measurements * 1000_000_000L) / elapsed + " ops/sec");
}
} finally {
sequentialFile.close();
}
} finally {
factory.stop();
}
}
private static long writeMeasurements(SequentialFileFactory sequentialFileFactory,
SequentialFile sequentialFile,
EncodingSupport encodingSupport,
int measurements,
boolean writeSync) throws Exception {
//System.gc();
TimeUnit.SECONDS.sleep(2);
sequentialFileFactory.activateBuffer(sequentialFile);
sequentialFile.position(0);
final long start = System.nanoTime();
for (int i = 0; i < measurements; i++) {
write(sequentialFile, encodingSupport, writeSync);
}
sequentialFileFactory.deactivateBuffer();
final long elapsed = System.nanoTime() - start;
return elapsed;
}
private static void write(SequentialFile sequentialFile,
EncodingSupport encodingSupport,
boolean sync) throws Exception {
//this pattern is necessary to ensure that NIO's TimedBuffer fill flush the buffer and know the real size of it
if (sequentialFile.fits(encodingSupport.getEncodeSize())) {
final FastWaitIOCallback ioCallback = CALLBACK.reset();
sequentialFile.write(encodingSupport, sync, ioCallback);
ioCallback.waitCompletion();
} else {
throw new IllegalStateException("can't happen!");
}
}
private enum Type {
Mapped, Nio, Aio
}
private static final class FastWaitIOCallback implements IOCallback {
private final AtomicBoolean done = new AtomicBoolean(false);
private int errorCode = 0;
private String errorMessage = null;
public FastWaitIOCallback reset() {
errorCode = 0;
errorMessage = null;
done.lazySet(false);
return this;
}
@Override
public void done() {
errorCode = 0;
errorMessage = null;
done.lazySet(true);
}
@Override
public void onError(int errorCode, String errorMessage) {
this.errorCode = errorCode;
this.errorMessage = errorMessage;
done.lazySet(true);
}
public void waitCompletion() throws InterruptedException, ActiveMQException {
final Thread currentThread = Thread.currentThread();
while (!done.get()) {
LockSupport.parkNanos(1L);
if (currentThread.isInterrupted())
throw new InterruptedException();
}
if (errorMessage != null) {
throw ActiveMQExceptionType.createException(errorCode, errorMessage);
}
}
}
}

View File

@ -39,9 +39,7 @@ public final class LibaioFile<Callback extends SubmitInfo> implements AutoClosea
} }
public int getBlockSize() { public int getBlockSize() {
return 512; return LibaioContext.getBlockSizeFD(fd);
// FIXME
//return LibaioContext.getBlockSizeFD(fd);
} }
public boolean lock() { public boolean lock() {

View File

@ -563,7 +563,7 @@ public interface Configuration {
Configuration setJournalDirectory(String dir); Configuration setJournalDirectory(String dir);
/** /**
* Returns the type of journal used by this server (either {@code NIO} or {@code ASYNCIO}). * Returns the type of journal used by this server ({@code NIO}, {@code ASYNCIO} or {@code MAPPED}).
* <br> * <br>
* Default value is ASYNCIO. * Default value is ASYNCIO.
*/ */

View File

@ -16,6 +16,8 @@
*/ */
package org.apache.activemq.artemis.core.config.impl; package org.apache.activemq.artemis.core.config.impl;
import java.util.EnumSet;
import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle; import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType; import org.apache.activemq.artemis.core.server.DivertConfigurationRoutingType;
import org.apache.activemq.artemis.core.server.JournalType; import org.apache.activemq.artemis.core.server.JournalType;
@ -125,7 +127,7 @@ public final class Validators {
@Override @Override
public void validate(final String name, final Object value) { public void validate(final String name, final Object value) {
String val = (String) value; String val = (String) value;
if (val == null || !val.equals(JournalType.NIO.toString()) && !val.equals(JournalType.ASYNCIO.toString())) { if (val == null || !EnumSet.allOf(JournalType.class).contains(JournalType.valueOf(val))) {
throw ActiveMQMessageBundle.BUNDLE.invalidJournalType(val); throw ActiveMQMessageBundle.BUNDLE.invalidJournalType(val);
} }
} }

View File

@ -497,22 +497,19 @@ public final class FileConfigurationParser extends XMLConfigurationUtil {
String s = getString(e, "journal-type", config.getJournalType().toString(), Validators.JOURNAL_TYPE); String s = getString(e, "journal-type", config.getJournalType().toString(), Validators.JOURNAL_TYPE);
if (s.equals(JournalType.NIO.toString())) { config.setJournalType(JournalType.getType(s));
config.setJournalType(JournalType.NIO);
} else if (s.equals(JournalType.ASYNCIO.toString())) { if (config.getJournalType() == JournalType.ASYNCIO) {
// https://jira.jboss.org/jira/browse/HORNETQ-295 // https://jira.jboss.org/jira/browse/HORNETQ-295
// We do the check here to see if AIO is supported so we can use the correct defaults and/or use // We do the check here to see if AIO is supported so we can use the correct defaults and/or use
// correct settings in xml // correct settings in xml
// If we fall back later on these settings can be ignored // If we fall back later on these settings can be ignored
boolean supportsAIO = AIOSequentialFileFactory.isSupported(); boolean supportsAIO = AIOSequentialFileFactory.isSupported();
if (supportsAIO) { if (!supportsAIO) {
config.setJournalType(JournalType.ASYNCIO);
} else {
if (validateAIO) { if (validateAIO) {
ActiveMQServerLogger.LOGGER.AIONotFound(); ActiveMQServerLogger.LOGGER.AIONotFound();
} }
config.setJournalType(JournalType.NIO); config.setJournalType(JournalType.NIO);
} }
} }

View File

@ -21,6 +21,7 @@ import java.io.File;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap; import java.util.HashMap;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -43,6 +44,7 @@ import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile; import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.Journal; import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.impl.JournalFile; import org.apache.activemq.artemis.core.journal.impl.JournalFile;
@ -113,7 +115,7 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
@Override @Override
protected void init(Configuration config, IOCriticalErrorListener criticalErrorListener) { protected void init(Configuration config, IOCriticalErrorListener criticalErrorListener) {
if (config.getJournalType() != JournalType.NIO && config.getJournalType() != JournalType.ASYNCIO) { if (!EnumSet.allOf(JournalType.class).contains(config.getJournalType())) {
throw ActiveMQMessageBundle.BUNDLE.invalidJournal(); throw ActiveMQMessageBundle.BUNDLE.invalidJournal();
} }
@ -125,21 +127,28 @@ public class JournalStorageManager extends AbstractJournalStorageManager {
bindingsJournal = localBindings; bindingsJournal = localBindings;
originalBindingsJournal = localBindings; originalBindingsJournal = localBindings;
if (config.getJournalType() == JournalType.ASYNCIO) { switch (config.getJournalType()) {
ActiveMQServerLogger.LOGGER.journalUseAIO();
journalFF = new AIOSequentialFileFactory(config.getJournalLocation(), config.getJournalBufferSize_AIO(), config.getJournalBufferTimeout_AIO(), config.getJournalMaxIO_AIO(), config.isLogJournalWriteRate(), criticalErrorListener); case NIO:
} else if (config.getJournalType() == JournalType.NIO) { ActiveMQServerLogger.LOGGER.journalUseNIO();
ActiveMQServerLogger.LOGGER.journalUseNIO(); journalFF = new NIOSequentialFileFactory(config.getJournalLocation(), true, config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), config.getJournalMaxIO_NIO(), config.isLogJournalWriteRate(), criticalErrorListener);
journalFF = new NIOSequentialFileFactory(config.getJournalLocation(), true, config.getJournalBufferSize_NIO(), config.getJournalBufferTimeout_NIO(), config.getJournalMaxIO_NIO(), config.isLogJournalWriteRate(), criticalErrorListener); break;
} else { case ASYNCIO:
throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType()); ActiveMQServerLogger.LOGGER.journalUseAIO();
journalFF = new AIOSequentialFileFactory(config.getJournalLocation(), config.getJournalBufferSize_AIO(), config.getJournalBufferTimeout_AIO(), config.getJournalMaxIO_AIO(), config.isLogJournalWriteRate(), criticalErrorListener);
break;
case MAPPED:
ActiveMQServerLogger.LOGGER.journalUseMAPPED();
//the mapped version do not need buffering by default
journalFF = new MappedSequentialFileFactory(config.getJournalLocation(), criticalErrorListener, true).chunkBytes(config.getJournalFileSize()).overlapBytes(0);
break;
default:
throw ActiveMQMessageBundle.BUNDLE.invalidJournalType2(config.getJournalType());
} }
journalFF.setDatasync(config.isJournalDatasync()); journalFF.setDatasync(config.isJournalDatasync());
Journal localMessage = new JournalImpl(ioExecutors, config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", journalFF.getMaxIO(), 0);
Journal localMessage = new JournalImpl(ioExecutors, config.getJournalFileSize(), config.getJournalMinFiles(), config.getJournalPoolFiles(), config.getJournalCompactMinFiles(), config.getJournalCompactPercentage(), journalFF, "activemq-data", "amq", config.getJournalType() == JournalType.ASYNCIO ? config.getJournalMaxIO_AIO() : config.getJournalMaxIO_NIO(), 0);
messageJournal = localMessage; messageJournal = localMessage;
originalMessageJournal = localMessage; originalMessageJournal = localMessage;

View File

@ -1558,8 +1558,12 @@ public interface ActiveMQServerLogger extends BasicLogger {
@Message(id = 224072, value = "Message Counter Sample Period too short: {0}", format = Message.Format.MESSAGE_FORMAT) @Message(id = 224072, value = "Message Counter Sample Period too short: {0}", format = Message.Format.MESSAGE_FORMAT)
void invalidMessageCounterPeriod(long value); void invalidMessageCounterPeriod(long value);
@LogMessage(level = Logger.Level.INFO)
@Message(id = 224073, value = "Using MAPPED Journal", format = Message.Format.MESSAGE_FORMAT)
void journalUseMAPPED();
@LogMessage(level = Logger.Level.ERROR) @LogMessage(level = Logger.Level.ERROR)
@Message(id = 224073, value = "Failed to purge queue {0} on no consumers", format = Message.Format.MESSAGE_FORMAT) @Message(id = 224074, value = "Failed to purge queue {0} on no consumers", format = Message.Format.MESSAGE_FORMAT)
void failedToPurgeQueue(@Cause Exception e, SimpleString bindingName); void failedToPurgeQueue(@Cause Exception e, SimpleString bindingName);
} }

View File

@ -16,6 +16,33 @@
*/ */
package org.apache.activemq.artemis.core.server; package org.apache.activemq.artemis.core.server;
public enum JournalType { public enum JournalType {
NIO, ASYNCIO; NIO, ASYNCIO, MAPPED;
public static final String validValues;
static {
StringBuffer stringBuffer = new StringBuffer();
for (JournalType type : JournalType.values()) {
if (stringBuffer.length() != 0) {
stringBuffer.append(",");
}
stringBuffer.append(type.name());
}
validValues = stringBuffer.toString();
}
public static JournalType getType(String type) {
switch (type) {
case "NIO": return NIO;
case "ASYNCIO" : return ASYNCIO;
case "MAPPED" : return MAPPED;
default: throw new IllegalStateException("Invalid JournalType:" + type + " valid Types: " + validValues);
}
}
} }

View File

@ -584,6 +584,7 @@
<xsd:restriction base="xsd:string"> <xsd:restriction base="xsd:string">
<xsd:enumeration value="ASYNCIO"/> <xsd:enumeration value="ASYNCIO"/>
<xsd:enumeration value="NIO"/> <xsd:enumeration value="NIO"/>
<xsd:enumeration value="MAPPED"/>
</xsd:restriction> </xsd:restriction>
</xsd:simpleType> </xsd:simpleType>
</xsd:element> </xsd:element>

View File

@ -576,6 +576,7 @@
<xsd:restriction base="xsd:string"> <xsd:restriction base="xsd:string">
<xsd:enumeration value="ASYNCIO"/> <xsd:enumeration value="ASYNCIO"/>
<xsd:enumeration value="NIO"/> <xsd:enumeration value="NIO"/>
<xsd:enumeration value="MAPPED"/>
</xsd:restriction> </xsd:restriction>
</xsd:simpleType> </xsd:simpleType>
</xsd:element> </xsd:element>

View File

@ -59,7 +59,8 @@ public class AIOJournalImplTest extends JournalImplTestUnit {
file.mkdir(); file.mkdir();
return new AIOSequentialFileFactory(getTestDirfile(), ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, 1000000, 10, false); // forcing the alignment to be 512, as this test was hard coded around this size.
return new AIOSequentialFileFactory(getTestDirfile(), ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, 1000000, 10, false).setAlignment(512);
} }
@Override @Override

View File

@ -44,7 +44,7 @@ public class AIOSequentialFileFactoryTest extends SequentialFileFactoryTestBase
SequentialFile file = factory.createSequentialFile("filtetmp.log"); SequentialFile file = factory.createSequentialFile("filtetmp.log");
file.open(); file.open();
ByteBuffer buff = factory.newBuffer(10); ByteBuffer buff = factory.newBuffer(10);
Assert.assertEquals(512, buff.limit()); Assert.assertEquals(factory.getAlignment(), buff.limit());
file.close(); file.close();
factory.releaseBuffer(buff); factory.releaseBuffer(buff);
} }

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.journal;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
public class MappedImportExportTest extends NIOImportExportTest {
@Override
protected SequentialFileFactory getFileFactory() throws Exception {
return new MappedSequentialFileFactory(getTestDirfile());
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.journal;
import java.io.File;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
public class MappedJournalCompactTest extends NIOJournalCompactTest {
@Override
protected SequentialFileFactory getFileFactory() throws Exception {
File file = new File(getTestDir());
ActiveMQTestBase.deleteDirectory(file);
file.mkdir();
return new MappedSequentialFileFactory(getTestDirfile());
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.journal;
import java.io.File;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.JournalImplTestUnit;
public class MappedJournalImplTest extends JournalImplTestUnit {
@Override
protected SequentialFileFactory getFileFactory() throws Exception {
File file = new File(getTestDir());
deleteDirectory(file);
file.mkdir();
return new MappedSequentialFileFactory(getTestDirfile());
}
@Override
protected int getAlignment() {
return fileFactory.getAlignment();
}
}

View File

@ -0,0 +1,184 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.artemis.tests.integration.journal;
import java.io.File;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.EncodingSupport;
import org.apache.activemq.artemis.tests.unit.core.journal.impl.SequentialFileFactoryTestBase;
import org.junit.Assert;
import org.junit.Test;
public class MappedSequentialFileFactoryTest extends SequentialFileFactoryTestBase {
@Override
protected SequentialFileFactory createFactory(String folder) {
return new MappedSequentialFileFactory(new File(folder));
}
@Test
public void testInterrupts() throws Throwable {
final EncodingSupport fakeEncoding = new EncodingSupport() {
@Override
public int getEncodeSize() {
return 10;
}
@Override
public void encode(ActiveMQBuffer buffer) {
buffer.writeBytes(new byte[10]);
}
@Override
public void decode(ActiveMQBuffer buffer) {
}
};
final AtomicInteger calls = new AtomicInteger(0);
final MappedSequentialFileFactory factory = new MappedSequentialFileFactory(new File(getTestDir()), (code, message, file) -> {
new Exception("shutdown").printStackTrace();
calls.incrementAndGet();
});
Thread threadOpen = new Thread() {
@Override
public void run() {
try {
Thread.currentThread().interrupt();
SequentialFile file = factory.createSequentialFile("file.txt");
file.open();
} catch (Exception e) {
e.printStackTrace();
}
}
};
threadOpen.start();
threadOpen.join();
Thread threadClose = new Thread() {
@Override
public void run() {
try {
SequentialFile file = factory.createSequentialFile("file.txt");
file.open();
file.write(fakeEncoding, true);
Thread.currentThread().interrupt();
file.close();
} catch (Exception e) {
e.printStackTrace();
}
}
};
threadClose.start();
threadClose.join();
Thread threadWrite = new Thread() {
@Override
public void run() {
try {
SequentialFile file = factory.createSequentialFile("file.txt");
file.open();
Thread.currentThread().interrupt();
file.write(fakeEncoding, true);
file.close();
} catch (Exception e) {
e.printStackTrace();
}
}
};
threadWrite.start();
threadWrite.join();
Thread threadFill = new Thread() {
@Override
public void run() {
try {
SequentialFile file = factory.createSequentialFile("file.txt");
file.open();
Thread.currentThread().interrupt();
file.fill(1024);
file.close();
} catch (Exception e) {
e.printStackTrace();
}
}
};
threadFill.start();
threadFill.join();
Thread threadWriteDirect = new Thread() {
@Override
public void run() {
try {
SequentialFile file = factory.createSequentialFile("file.txt");
file.open();
ByteBuffer buffer = ByteBuffer.allocate(10);
buffer.put(new byte[10]);
Thread.currentThread().interrupt();
file.writeDirect(buffer, true);
file.close();
} catch (Exception e) {
e.printStackTrace();
}
}
};
threadWriteDirect.start();
threadWriteDirect.join();
Thread threadRead = new Thread() {
@Override
public void run() {
try {
SequentialFile file = factory.createSequentialFile("file.txt");
file.open();
file.write(fakeEncoding, true);
file.position(0);
ByteBuffer readBytes = ByteBuffer.allocate(fakeEncoding.getEncodeSize());
Thread.currentThread().interrupt();
file.read(readBytes);
file.close();
} catch (Exception e) {
e.printStackTrace();
}
}
};
threadRead.start();
threadRead.join();
// An interrupt exception shouldn't issue a shutdown
Assert.assertEquals(0, calls.get());
}
}

View File

@ -45,7 +45,7 @@ public class NIOImportExportTest extends JournalImplTestBase {
@Test @Test
public void testExportImport() throws Exception { public void testExportImport() throws Exception {
setup(10, 10 * 1024, true); setup(10, 10 * 4096, true);
createJournal(); createJournal();
@ -99,7 +99,7 @@ public class NIOImportExportTest extends JournalImplTestBase {
@Test @Test
public void testExportImport3() throws Exception { public void testExportImport3() throws Exception {
setup(10, 10 * 1024, true); setup(10, 10 * 4096, true);
createJournal(); createJournal();
@ -162,7 +162,7 @@ public class NIOImportExportTest extends JournalImplTestBase {
@Test @Test
public void testExportImport2() throws Exception { public void testExportImport2() throws Exception {
setup(10, 10 * 1024, true); setup(10, 10 * 4096, true);
createJournal(); createJournal();

View File

@ -479,7 +479,7 @@ public class NIOJournalCompactTest extends JournalImplTestBase {
performNonTransactionalDelete = false; performNonTransactionalDelete = false;
} }
setup(2, 60 * 1024, false); setup(2, 60 * 4096, false);
ArrayList<Long> liveIDs = new ArrayList<>(); ArrayList<Long> liveIDs = new ArrayList<>();

View File

@ -22,8 +22,11 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.activemq.artemis.ArtemisConstants; import org.apache.activemq.artemis.ArtemisConstants;
import org.apache.activemq.artemis.core.io.IOCriticalErrorListener;
import org.apache.activemq.artemis.core.io.SequentialFile;
import org.apache.activemq.artemis.core.io.SequentialFileFactory; import org.apache.activemq.artemis.core.io.SequentialFileFactory;
import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory; import org.apache.activemq.artemis.core.io.aio.AIOSequentialFileFactory;
import org.apache.activemq.artemis.core.io.mapped.MappedSequentialFileFactory;
import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory; import org.apache.activemq.artemis.core.io.nio.NIOSequentialFileFactory;
import org.apache.activemq.artemis.core.journal.LoaderCallback; import org.apache.activemq.artemis.core.journal.LoaderCallback;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo; import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
@ -102,6 +105,27 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase {
internalTest("nio2", getTestDir(), 10000, 0, true, true, 1); internalTest("nio2", getTestDir(), 10000, 0, true, true, 1);
} }
@Test
public void testMMap() throws Exception {
internalTest("mmap", getTestDir(), 10000, 100, true, true, 1);
}
@Test
public void testMMAPHugeTransaction() throws Exception {
internalTest("mmap", getTestDir(), 10000, 10000, true, true, 1);
}
@Test
public void testMMAPOMultiThread() throws Exception {
internalTest("mmap", getTestDir(), 1000, 100, true, true, 10);
}
@Test
public void testMMAPNonTransactional() throws Exception {
internalTest("mmap", getTestDir(), 10000, 0, true, true, 1);
}
// Package protected --------------------------------------------- // Package protected ---------------------------------------------
private void internalTest(final String type, private void internalTest(final String type,
@ -234,7 +258,7 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase {
if (args.length != 5) { if (args.length != 5) {
System.err.println("Use: java -cp <classpath> " + ValidateTransactionHealthTest.class.getCanonicalName() + System.err.println("Use: java -cp <classpath> " + ValidateTransactionHealthTest.class.getCanonicalName() +
" aio|nio <journalDirectory> <NumberOfElements> <TransactionSize> <NumberOfThreads>"); " aio|nio|mmap <journalDirectory> <NumberOfElements> <TransactionSize> <NumberOfThreads>");
System.exit(-1); System.exit(-1);
} }
System.out.println("Running"); System.out.println("Running");
@ -320,15 +344,22 @@ public class ValidateTransactionHealthTest extends ActiveMQTestBase {
} }
public static JournalImpl createJournal(final String journalType, final String journalDir) { public static JournalImpl createJournal(final String journalType, final String journalDir) {
JournalImpl journal = new JournalImpl(10485760, 2, 2, 0, 0, ValidateTransactionHealthTest.getFactory(journalType, journalDir), "journaltst", "tst", 500); JournalImpl journal = new JournalImpl(10485760, 2, 2, 0, 0, ValidateTransactionHealthTest.getFactory(journalType, journalDir, 10485760), "journaltst", "tst", 500);
return journal; return journal;
} }
public static SequentialFileFactory getFactory(final String factoryType, final String directory) { public static SequentialFileFactory getFactory(final String factoryType, final String directory, int fileSize) {
if (factoryType.equals("aio")) { if (factoryType.equals("aio")) {
return new AIOSequentialFileFactory(new File(directory), ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 10, false); return new AIOSequentialFileFactory(new File(directory), ArtemisConstants.DEFAULT_JOURNAL_BUFFER_SIZE_AIO, ArtemisConstants.DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO, 10, false);
} else if (factoryType.equals("nio2")) { } else if (factoryType.equals("nio2")) {
return new NIOSequentialFileFactory(new File(directory), true, 1); return new NIOSequentialFileFactory(new File(directory), true, 1);
} else if (factoryType.equals("mmap")) {
return new MappedSequentialFileFactory(new File(directory), new IOCriticalErrorListener() {
@Override
public void onIOException(Throwable code, String message, SequentialFile file) {
code.printStackTrace();
}
}, true).chunkBytes(fileSize).overlapBytes(0);
} else { } else {
return new NIOSequentialFileFactory(new File(directory), false, 1); return new NIOSequentialFileFactory(new File(directory), false, 1);
} }

View File

@ -43,8 +43,8 @@ import org.junit.runners.Parameterized;
@RunWith(Parameterized.class) @RunWith(Parameterized.class)
public class GlobalPagingTest extends PagingTest { public class GlobalPagingTest extends PagingTest {
public GlobalPagingTest(StoreConfiguration.StoreType storeType) { public GlobalPagingTest(StoreConfiguration.StoreType storeType, boolean mapped) {
super(storeType); super(storeType, mapped);
} }
@Override @Override

View File

@ -73,6 +73,7 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.DescribeJournal
import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds; import org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds;
import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl; import org.apache.activemq.artemis.core.persistence.impl.journal.OperationContextImpl;
import org.apache.activemq.artemis.core.server.ActiveMQServer; import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.server.JournalType;
import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.core.server.Queue;
import org.apache.activemq.artemis.core.server.RoutingType; import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl; import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
@ -109,17 +110,20 @@ public class PagingTest extends ActiveMQTestBase {
protected static final int PAGE_SIZE = 10 * 1024; protected static final int PAGE_SIZE = 10 * 1024;
protected final boolean mapped;
protected final StoreConfiguration.StoreType storeType; protected final StoreConfiguration.StoreType storeType;
static final SimpleString ADDRESS = new SimpleString("SimpleAddress"); static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
public PagingTest(StoreConfiguration.StoreType storeType) { public PagingTest(StoreConfiguration.StoreType storeType, boolean mapped) {
this.storeType = storeType; this.storeType = storeType;
this.mapped = mapped;
} }
@Parameterized.Parameters(name = "storeType={0}") @Parameterized.Parameters(name = "storeType={0}, mapped={1}")
public static Collection<Object[]> data() { public static Collection<Object[]> data() {
Object[][] params = new Object[][]{{StoreConfiguration.StoreType.FILE}, {StoreConfiguration.StoreType.DATABASE}}; Object[][] params = new Object[][]{{StoreConfiguration.StoreType.FILE, false}, {StoreConfiguration.StoreType.FILE, true}, {StoreConfiguration.StoreType.DATABASE, false}};
return Arrays.asList(params); return Arrays.asList(params);
} }
@ -5654,6 +5658,8 @@ public class PagingTest extends ActiveMQTestBase {
Configuration configuration = super.createDefaultInVMConfig().setJournalSyncNonTransactional(false); Configuration configuration = super.createDefaultInVMConfig().setJournalSyncNonTransactional(false);
if (storeType == StoreConfiguration.StoreType.DATABASE) { if (storeType == StoreConfiguration.StoreType.DATABASE) {
setDBStoreType(configuration); setDBStoreType(configuration);
} else if (mapped) {
configuration.setJournalType(JournalType.MAPPED);
} }
return configuration; return configuration;
} }

View File

@ -50,7 +50,7 @@ public class CleanBufferTest extends ActiveMQTestBase {
@Test @Test
public void testCleanOnAIO() { public void testCleanOnAIO() {
if (LibaioContext.isLoaded()) { if (LibaioContext.isLoaded()) {
SequentialFileFactory factory = new AIOSequentialFileFactory(new File("Whatever"), 50); SequentialFileFactory factory = new AIOSequentialFileFactory(new File("./target"), 50);
testBuffer(factory); testBuffer(factory);
} }

View File

@ -39,6 +39,9 @@ public abstract class JournalImplTestUnit extends JournalImplTestBase {
@Override @Override
@After @After
public void tearDown() throws Exception { public void tearDown() throws Exception {
//stop journal first to let it manage its files
stopComponent(journal);
List<String> files = fileFactory.listFiles(fileExtension); List<String> files = fileFactory.listFiles(fileExtension);
for (String file : files) { for (String file : files) {

View File

@ -132,11 +132,11 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase {
try { try {
checkFill(sf, 2048); checkFill(factory, sf, 2048);
checkFill(sf, 512); checkFill(factory, sf, 512);
checkFill(sf, 512 * 4); checkFill(factory, sf, 512 * 4);
} finally { } finally {
sf.close(); sf.close();
} }
@ -226,19 +226,19 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase {
sf.write(bb1, true); sf.write(bb1, true);
long bytesWritten = sf.position() - initialPos; long bytesWritten = sf.position() - initialPos;
Assert.assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesWritten); Assert.assertEquals(calculateRecordSize(bytes1.length, factory.getAlignment()), bytesWritten);
initialPos = sf.position(); initialPos = sf.position();
sf.write(bb2, true); sf.write(bb2, true);
bytesWritten = sf.position() - initialPos; bytesWritten = sf.position() - initialPos;
Assert.assertEquals(calculateRecordSize(bytes2.length, sf.getAlignment()), bytesWritten); Assert.assertEquals(calculateRecordSize(bytes2.length, factory.getAlignment()), bytesWritten);
initialPos = sf.position(); initialPos = sf.position();
sf.write(bb3, true); sf.write(bb3, true);
bytesWritten = sf.position() - initialPos; bytesWritten = sf.position() - initialPos;
Assert.assertEquals(calculateRecordSize(bytes3.length, sf.getAlignment()), bytesWritten); Assert.assertEquals(calculateRecordSize(bytes3.length, factory.getAlignment()), bytesWritten);
sf.position(0); sf.position(0);
@ -247,20 +247,20 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase {
ByteBuffer rb3 = factory.newBuffer(bytes3.length); ByteBuffer rb3 = factory.newBuffer(bytes3.length);
int bytesRead = sf.read(rb1); int bytesRead = sf.read(rb1);
Assert.assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesRead); Assert.assertEquals(calculateRecordSize(bytes1.length, factory.getAlignment()), bytesRead);
for (int i = 0; i < bytes1.length; i++) { for (int i = 0; i < bytes1.length; i++) {
Assert.assertEquals(bytes1[i], rb1.get(i)); Assert.assertEquals(bytes1[i], rb1.get(i));
} }
bytesRead = sf.read(rb2); bytesRead = sf.read(rb2);
Assert.assertEquals(calculateRecordSize(bytes2.length, sf.getAlignment()), bytesRead); Assert.assertEquals(calculateRecordSize(bytes2.length, factory.getAlignment()), bytesRead);
for (int i = 0; i < bytes2.length; i++) { for (int i = 0; i < bytes2.length; i++) {
Assert.assertEquals(bytes2[i], rb2.get(i)); Assert.assertEquals(bytes2[i], rb2.get(i));
} }
bytesRead = sf.read(rb3); bytesRead = sf.read(rb3);
Assert.assertEquals(calculateRecordSize(bytes3.length, sf.getAlignment()), bytesRead); Assert.assertEquals(calculateRecordSize(bytes3.length, factory.getAlignment()), bytesRead);
for (int i = 0; i < bytes3.length; i++) { for (int i = 0; i < bytes3.length; i++) {
Assert.assertEquals(bytes3[i], rb3.get(i)); Assert.assertEquals(bytes3[i], rb3.get(i));
} }
@ -291,19 +291,19 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase {
sf.write(wrapBuffer(bytes1), true); sf.write(wrapBuffer(bytes1), true);
long bytesWritten = sf.position() - initialPos; long bytesWritten = sf.position() - initialPos;
Assert.assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesWritten); Assert.assertEquals(calculateRecordSize(bytes1.length, factory.getAlignment()), bytesWritten);
initialPos = sf.position(); initialPos = sf.position();
sf.write(wrapBuffer(bytes2), true); sf.write(wrapBuffer(bytes2), true);
bytesWritten = sf.position() - initialPos; bytesWritten = sf.position() - initialPos;
Assert.assertEquals(calculateRecordSize(bytes2.length, sf.getAlignment()), bytesWritten); Assert.assertEquals(calculateRecordSize(bytes2.length, factory.getAlignment()), bytesWritten);
initialPos = sf.position(); initialPos = sf.position();
sf.write(wrapBuffer(bytes3), true); sf.write(wrapBuffer(bytes3), true);
bytesWritten = sf.position() - initialPos; bytesWritten = sf.position() - initialPos;
Assert.assertEquals(calculateRecordSize(bytes3.length, sf.getAlignment()), bytesWritten); Assert.assertEquals(calculateRecordSize(bytes3.length, factory.getAlignment()), bytesWritten);
byte[] rbytes1 = new byte[bytes1.length]; byte[] rbytes1 = new byte[bytes1.length];
@ -315,7 +315,7 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase {
ByteBuffer rb2 = factory.newBuffer(rbytes2.length); ByteBuffer rb2 = factory.newBuffer(rbytes2.length);
ByteBuffer rb3 = factory.newBuffer(rbytes3.length); ByteBuffer rb3 = factory.newBuffer(rbytes3.length);
sf.position(calculateRecordSize(bytes1.length, sf.getAlignment()) + calculateRecordSize(bytes2.length, sf.getAlignment())); sf.position(calculateRecordSize(bytes1.length, factory.getAlignment()) + calculateRecordSize(bytes2.length, factory.getAlignment()));
int bytesRead = sf.read(rb3); int bytesRead = sf.read(rb3);
Assert.assertEquals(rb3.limit(), bytesRead); Assert.assertEquals(rb3.limit(), bytesRead);
@ -361,7 +361,7 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase {
sf.write(wrapBuffer(bytes1), true); sf.write(wrapBuffer(bytes1), true);
long bytesWritten = sf.position() - initialPos; long bytesWritten = sf.position() - initialPos;
Assert.assertEquals(calculateRecordSize(bytes1.length, sf.getAlignment()), bytesWritten); Assert.assertEquals(calculateRecordSize(bytes1.length, factory.getAlignment()), bytesWritten);
sf.close(); sf.close();
@ -386,7 +386,7 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase {
return ActiveMQBuffers.wrappedBuffer(bytes); return ActiveMQBuffers.wrappedBuffer(bytes);
} }
protected void checkFill(final SequentialFile file, final int size) throws Exception { protected void checkFill(final SequentialFileFactory factory, final SequentialFile file, final int size) throws Exception {
file.fill(size); file.fill(size);
file.close(); file.close();
@ -399,7 +399,7 @@ public abstract class SequentialFileFactoryTestBase extends ActiveMQTestBase {
int bytesRead = file.read(bb); int bytesRead = file.read(bb);
Assert.assertEquals(calculateRecordSize(size, file.getAlignment()), bytesRead); Assert.assertEquals(calculateRecordSize(size, factory.getAlignment()), bytesRead);
for (int i = 0; i < size; i++) { for (int i = 0; i < size; i++) {
// log.debug(" i is " + i); // log.debug(" i is " + i);

View File

@ -37,7 +37,7 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
private final Map<String, FakeSequentialFile> fileMap = new ConcurrentHashMap<>(); private final Map<String, FakeSequentialFile> fileMap = new ConcurrentHashMap<>();
private final int alignment; private volatile int alignment;
private final boolean supportsCallback; private final boolean supportsCallback;
@ -198,6 +198,12 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
return alignment; return alignment;
} }
@Override
public FakeSequentialFileFactory setAlignment(int alignment) {
this.alignment = alignment;
return this;
}
// Package protected --------------------------------------------- // Package protected ---------------------------------------------
// Protected ----------------------------------------------------- // Protected -----------------------------------------------------
@ -462,11 +468,6 @@ public class FakeSequentialFileFactory implements SequentialFileFactory {
} }
} }
@Override
public int getAlignment() throws Exception {
return alignment;
}
@Override @Override
public int calculateBlockStart(final int position) throws Exception { public int calculateBlockStart(final int position) throws Exception {
int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment; int pos = (position / alignment + (position % alignment != 0 ? 1 : 0)) * alignment;