AMQ-7143 - Temporary transaction file (PageFile) being opened and closed many times, causing poor performance on high latency FS as NFS

This commit is contained in:
Alan Protasio 2019-01-29 14:44:06 -08:00 committed by Christopher L. Shannon (cshannon)
parent ec9daee6c3
commit 9e65435517
3 changed files with 158 additions and 12 deletions

View File

@ -30,6 +30,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
@ -144,7 +145,7 @@ public class PageFile {
// Persistent settings stored in the page file.
private MetaData metaData;
private final ArrayList<File> tmpFilesForRemoval = new ArrayList<File>();
private final HashMap<File, RandomAccessFile> tmpFilesForRemoval = new HashMap<>();
private boolean useLFRUEviction = false;
private float LFUEvictionFactor = 0.2f;
@ -197,12 +198,18 @@ public class PageFile {
return page;
}
public byte[] getDiskBound() throws IOException {
public byte[] getDiskBound(HashMap<File, RandomAccessFile> tmpFiles) throws IOException {
if (diskBound == null && diskBoundLocation != -1) {
diskBound = new byte[length];
try(RandomAccessFile file = new RandomAccessFile(tmpFile, "r")) {
if (tmpFiles.containsKey(tmpFile) && tmpFiles.get(tmpFile).getChannel().isOpen()) {
RandomAccessFile file = tmpFiles.get(tmpFile);
file.seek(diskBoundLocation);
file.read(diskBound);
} else {
try (RandomAccessFile file = new RandomAccessFile(tmpFile, "r")) {
file.seek(diskBoundLocation);
file.read(diskBound);
}
}
diskBoundLocation = -1;
}
@ -1144,12 +1151,12 @@ public class PageFile {
for (PageWrite w : batch) {
try {
checksum.update(w.getDiskBound(), 0, pageSize);
checksum.update(w.getDiskBound(tmpFilesForRemoval), 0, pageSize);
} catch (Throwable t) {
throw IOExceptionSupport.create("Cannot create recovery file. Reason: " + t, t);
}
recoveryFile.writeLong(w.page.getPageId());
recoveryFile.write(w.getDiskBound(), 0, pageSize);
recoveryFile.write(w.getDiskBound(tmpFilesForRemoval), 0, pageSize);
}
// Can we shrink the recovery buffer??
@ -1176,7 +1183,7 @@ public class PageFile {
for (PageWrite w : batch) {
writeFile.seek(toOffset(w.page.getPageId()));
writeFile.write(w.getDiskBound(), 0, pageSize);
writeFile.write(w.getDiskBound(tmpFilesForRemoval), 0, pageSize);
w.done();
}
@ -1197,7 +1204,8 @@ public class PageFile {
// the write cache.
if (w.isDone()) {
writes.remove(w.page.getPageId());
if (w.tmpFile != null && tmpFilesForRemoval.contains(w.tmpFile)) {
if (w.tmpFile != null && tmpFilesForRemoval.containsKey(w.tmpFile)) {
tmpFilesForRemoval.get(w.tmpFile).close();
if (!w.tmpFile.delete()) {
throw new IOException("Can't delete temporary KahaDB transaction file:" + w.tmpFile);
}
@ -1213,8 +1221,12 @@ public class PageFile {
}
}
public void removeTmpFile(File file) {
tmpFilesForRemoval.add(file);
public void removeTmpFile(File file, RandomAccessFile randomAccessFile) throws IOException {
if (!tmpFilesForRemoval.containsKey(file)) {
tmpFilesForRemoval.put(file, randomAccessFile);
} else {
randomAccessFile.close();
}
}
private long recoveryFileSizeForPages(int pageCount) {

View File

@ -656,8 +656,7 @@ public class Transaction implements Iterable<Page> {
public void commit() throws IOException {
if( writeTransactionId!=-1 ) {
if (tmpFile != null) {
tmpFile.close();
pageFile.removeTmpFile(getTempFile());
pageFile.removeTmpFile(getTempFile(), tmpFile);
tmpFile = null;
txFile = null;
}
@ -683,7 +682,7 @@ public class Transaction implements Iterable<Page> {
if( writeTransactionId!=-1 ) {
if (tmpFile != null) {
tmpFile.close();
pageFile.removeTmpFile(getTempFile());
getTempFile().delete();
tmpFile = null;
txFile = null;
}

View File

@ -0,0 +1,135 @@
package org.apache.activemq.store.kahadb.disk.page;
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import junit.framework.TestCase;
import org.apache.activemq.store.kahadb.disk.util.Marshaller;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
public class TransactionTest extends TestCase {
private static long NUMBER_OF_BYTES = 10485760L;
static class TransactionTestMarshaller implements Marshaller<List<Byte>> {
public static TransactionTestMarshaller INSTANCE = new TransactionTestMarshaller();
@Override
public void writePayload(final List<Byte> object, final DataOutput dataOut) throws IOException {
for (Byte b : object) {
dataOut.write(b);
}
}
@Override
public List<Byte> readPayload(final DataInput dataIn) throws IOException {
List<Byte> result = new ArrayList<>();
for (int i = 0; i < NUMBER_OF_BYTES; i++) {
result.add(dataIn.readByte());
}
return result;
}
@Override
public int getFixedSize() {
return 0;
}
@Override
public boolean isDeepCopySupported() {
return false;
}
@Override
public List<Byte> deepCopy(final List<Byte> source) {
return new ArrayList<>(source);
}
}
public void testDeleteTempFileWhenRollback() throws IOException {
PageFile pf = new PageFile(new File("target/test-data"), getName());
pf.delete();
pf.setEnablePageCaching(false);
pf.load();
System.setProperty("maxKahaDBTxSize", "" + (1024*1024));
Transaction tx = pf.tx();
Page<List<Byte>> page = tx.allocate();
page.set(getBytes());
tx.store(page, TransactionTestMarshaller.INSTANCE, true);
File tempFile = tx.getTempFile();
assertTrue(tempFile.exists());
tx.rollback();
pf.flush();
assertFalse(tempFile.exists());
}
public void testHugeTransaction() throws IOException {
PageFile pf = new PageFile(new File("target/test-data"), getName());
pf.delete();
pf.setEnablePageCaching(false);
pf.load();
System.setProperty("maxKahaDBTxSize", "" + (1024*1024));
Transaction tx = pf.tx();
Page<List<Byte>> page = tx.allocate();
List<Byte> bytes = getBytes();
page.set(bytes);
tx.store(page, TransactionTestMarshaller.INSTANCE, true);
tx.commit();
pf.flush();
tx = pf.tx();
page = tx.load(page.getPageId(), TransactionTestMarshaller.INSTANCE);
for (int i = 0; i < NUMBER_OF_BYTES; i++) {
assertEquals(bytes.get(i), page.get().get(i));
}
}
private List<Byte> getBytes() {
List<Byte> bytes = new ArrayList<>();
byte b = 0;
for (int i = 0; i < NUMBER_OF_BYTES; i++) {
bytes.add(b++);
}
return bytes;
}
}