This commit is contained in:
Justin Bertram 2018-12-14 15:10:30 -06:00
commit 0acd706987
6 changed files with 46 additions and 74 deletions

View File

@ -21,11 +21,11 @@ import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.channels.GatheringByteChannel; import java.nio.channels.GatheringByteChannel;
import java.nio.channels.ScatteringByteChannel; import java.nio.channels.ScatteringByteChannel;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
@ -1149,8 +1149,6 @@ public class LargeMessageControllerImpl implements LargeMessageController {
private final File cachedFile; private final File cachedFile;
private volatile RandomAccessFile cachedRAFile;
private volatile FileChannel cachedChannel; private volatile FileChannel cachedChannel;
private synchronized void readCache(final long position) { private synchronized void readCache(final long position) {
@ -1158,7 +1156,7 @@ public class LargeMessageControllerImpl implements LargeMessageController {
try { try {
if (position < readCachePositionStart || position > readCachePositionEnd) { if (position < readCachePositionStart || position > readCachePositionEnd) {
checkOpen(); final FileChannel cachedChannel = checkOpen();
if (position > cachedChannel.size()) { if (position > cachedChannel.size()) {
throw new ArrayIndexOutOfBoundsException("position > " + cachedChannel.size()); throw new ArrayIndexOutOfBoundsException("position > " + cachedChannel.size());
@ -1192,7 +1190,7 @@ public class LargeMessageControllerImpl implements LargeMessageController {
} }
public void cachePackage(final byte[] body) throws Exception { public void cachePackage(final byte[] body) throws Exception {
checkOpen(); final FileChannel cachedChannel = checkOpen();
cachedChannel.position(cachedChannel.size()); cachedChannel.position(cachedChannel.size());
cachedChannel.write(ByteBuffer.wrap(body)); cachedChannel.write(ByteBuffer.wrap(body));
@ -1203,33 +1201,25 @@ public class LargeMessageControllerImpl implements LargeMessageController {
/** /**
* @throws FileNotFoundException * @throws FileNotFoundException
*/ */
public void checkOpen() throws FileNotFoundException { private FileChannel checkOpen() throws IOException {
if (cachedFile != null || !cachedChannel.isOpen()) { FileChannel channel = cachedChannel;
cachedRAFile = new RandomAccessFile(cachedFile, "rw"); if (cachedFile != null || !channel.isOpen()) {
channel = FileChannel.open(cachedFile.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
cachedChannel = cachedRAFile.getChannel(); cachedChannel = channel;
} }
return channel;
} }
public void close() { public void close() {
FileChannel cachedChannel = this.cachedChannel;
if (cachedChannel != null && cachedChannel.isOpen()) { if (cachedChannel != null && cachedChannel.isOpen()) {
this.cachedChannel = null;
try { try {
cachedChannel.close(); cachedChannel.close();
} catch (Exception e) { } catch (Exception e) {
ActiveMQClientLogger.LOGGER.errorClosingCache(e); ActiveMQClientLogger.LOGGER.errorClosingCache(e);
} }
cachedChannel = null;
} }
if (cachedRAFile != null) {
try {
cachedRAFile.close();
} catch (Exception e) {
ActiveMQClientLogger.LOGGER.errorClosingCache(e);
}
cachedRAFile = null;
}
} }
@Override @Override

View File

@ -20,6 +20,7 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.file.Files;
import java.util.List; import java.util.List;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
@ -91,7 +92,10 @@ public abstract class AbstractSequentialFile implements SequentialFile {
close(); close();
} }
if (file.exists() && !file.delete()) { try {
Files.deleteIfExists(file.toPath());
} catch (Throwable t) {
logger.trace("Fine error while deleting file", t);
ActiveMQJournalLogger.LOGGER.errorDeletingFile(this); ActiveMQJournalLogger.LOGGER.errorDeletingFile(this);
} }
} }

View File

@ -19,6 +19,10 @@ package org.apache.activemq.artemis.core.io;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -135,4 +139,24 @@ public interface SequentialFile {
* Returns a native File of the file underlying this sequential file. * Returns a native File of the file underlying this sequential file.
*/ */
File getJavaFile(); File getJavaFile();
static long appendTo(Path src, Path dst) throws IOException {
try (FileChannel srcChannel = FileChannel.open(src, StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
FileLock srcLock = srcChannel.lock()) {
final long readableBytes = srcChannel.size();
if (readableBytes > 0) {
try (FileChannel dstChannel = FileChannel.open(dst, StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE);
FileLock dstLock = dstChannel.lock()) {
final long oldLength = dstChannel.size();
final long transferred = dstChannel.transferFrom(srcChannel, oldLength, readableBytes);
if (transferred != readableBytes) {
dstChannel.truncate(oldLength);
throw new IOException("copied less then expected");
}
return transferred;
}
}
return 0;
}
}
} }

View File

@ -18,10 +18,7 @@ package org.apache.activemq.artemis.core.io.mapped;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBuf;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer; import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@ -416,21 +413,7 @@ final class MappedSequentialFile implements SequentialFile {
if (dstFile.isOpen()) { if (dstFile.isOpen()) {
throw new IllegalArgumentException("dstFile must be closed too"); throw new IllegalArgumentException("dstFile must be closed too");
} }
try (RandomAccessFile src = new RandomAccessFile(file, "rw"); FileChannel srcChannel = src.getChannel(); FileLock srcLock = srcChannel.lock()) { SequentialFile.appendTo(file.toPath(), dstFile.getJavaFile().toPath());
final long readableBytes = srcChannel.size();
if (readableBytes > 0) {
try (RandomAccessFile dst = new RandomAccessFile(dstFile.getJavaFile(), "rw"); FileChannel dstChannel = dst.getChannel(); FileLock dstLock = dstChannel.lock()) {
final long oldLength = dst.length();
final long newLength = oldLength + readableBytes;
dst.setLength(newLength);
final long transferred = dstChannel.transferFrom(srcChannel, oldLength, readableBytes);
if (transferred != readableBytes) {
dstChannel.truncate(oldLength);
throw new IOException("copied less then expected");
}
}
}
}
} }
@Override @Override

View File

@ -18,11 +18,10 @@ package org.apache.activemq.artemis.core.io.nio;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.channels.FileLock; import java.nio.file.StandardOpenOption;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import org.apache.activemq.artemis.api.core.ActiveMQException; import org.apache.activemq.artemis.api.core.ActiveMQException;
@ -41,8 +40,6 @@ public class NIOSequentialFile extends AbstractSequentialFile {
private FileChannel channel; private FileChannel channel;
private RandomAccessFile rfile;
private final int maxIO; private final int maxIO;
public NIOSequentialFile(final SequentialFileFactory factory, public NIOSequentialFile(final SequentialFileFactory factory,
@ -76,9 +73,7 @@ public class NIOSequentialFile extends AbstractSequentialFile {
@Override @Override
public void open(final int maxIO, final boolean useExecutor) throws IOException { public void open(final int maxIO, final boolean useExecutor) throws IOException {
try { try {
rfile = new RandomAccessFile(getFile(), "rw"); channel = FileChannel.open(getFile().toPath(), StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ);
channel = rfile.getChannel();
fileSize = channel.size(); fileSize = channel.size();
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
@ -133,10 +128,6 @@ public class NIOSequentialFile extends AbstractSequentialFile {
if (channel != null) { if (channel != null) {
channel.close(); channel.close();
} }
if (rfile != null) {
rfile.close();
}
} catch (ClosedChannelException e) { } catch (ClosedChannelException e) {
throw e; throw e;
} catch (IOException e) { } catch (IOException e) {
@ -145,8 +136,6 @@ public class NIOSequentialFile extends AbstractSequentialFile {
} }
channel = null; channel = null;
rfile = null;
notifyAll(); notifyAll();
} }
@ -333,24 +322,6 @@ public class NIOSequentialFile extends AbstractSequentialFile {
if (dstFile.isOpen()) { if (dstFile.isOpen()) {
throw new IllegalArgumentException("dstFile must be closed too"); throw new IllegalArgumentException("dstFile must be closed too");
} }
try (RandomAccessFile src = new RandomAccessFile(getFile(), "rw"); SequentialFile.appendTo(getFile().toPath(), dstFile.getJavaFile().toPath());
FileChannel srcChannel = src.getChannel();
FileLock srcLock = srcChannel.lock()) {
final long readableBytes = srcChannel.size();
if (readableBytes > 0) {
try (RandomAccessFile dst = new RandomAccessFile(dstFile.getJavaFile(), "rw");
FileChannel dstChannel = dst.getChannel();
FileLock dstLock = dstChannel.lock()) {
final long oldLength = dst.length();
final long newLength = oldLength + readableBytes;
dst.setLength(newLength);
final long transferred = dstChannel.transferFrom(srcChannel, oldLength, readableBytes);
if (transferred != readableBytes) {
dstChannel.truncate(oldLength);
throw new IOException("copied less then expected");
}
}
}
}
} }
} }

View File

@ -18,10 +18,10 @@ package org.apache.activemq.artemis.core.paging.impl;
import java.io.File; import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer; import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel; import java.nio.channels.FileChannel;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
@ -160,7 +160,7 @@ public final class Page implements Comparable<Page> {
} }
private static MappedByteBuffer mapFileForRead(File file, int fileSize) { private static MappedByteBuffer mapFileForRead(File file, int fileSize) {
try (RandomAccessFile raf = new RandomAccessFile(file, "rw"); FileChannel channel = raf.getChannel()) { try (FileChannel channel = FileChannel.open(file.toPath(), StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
return channel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize); return channel.map(FileChannel.MapMode.READ_ONLY, 0, fileSize);
} catch (Exception e) { } catch (Exception e) {
throw new IllegalStateException(e); throw new IllegalStateException(e);