Updated for https://issues.apache.org/jira/browse/AMQ-5578 adds unit tests, logging, and removes the preallocated batch stuff that snuck in there with commit 45e59e6e83 which was by accident.

This commit is contained in:
Christian Posta 2015-02-19 16:34:32 -07:00
parent 37b1b6a211
commit 023b2ac045
5 changed files with 93 additions and 60 deletions

View File

@ -237,7 +237,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
long cleanupInterval = 30*1000;
int journalMaxFileLength = Journal.DEFAULT_MAX_FILE_LENGTH;
int journalMaxWriteBatchSize = Journal.DEFAULT_MAX_WRITE_BATCH_SIZE;
int preallocationBatchSize = Journal.DEFAULT_PREALLOCATION_BATCH_SIZE;
boolean enableIndexWriteAsync = false;
int setIndexWriteBatchSize = PageFile.DEFAULT_WRITE_BATCH_SIZE;
private String preallocationScope = Journal.PreallocationScope.ENTIRE_JOURNAL.name();
@ -2493,7 +2492,6 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
manager.setPreallocationScope(Journal.PreallocationScope.valueOf(preallocationScope.trim().toUpperCase()));
manager.setPreallocationStrategy(
Journal.PreallocationStrategy.valueOf(preallocationStrategy.trim().toUpperCase()));
manager.setPreallocationBatchSize(preallocationBatchSize);
if (getDirectoryArchive() != null) {
IOHelper.mkdirs(getDirectoryArchive());
manager.setDirectoryArchive(getDirectoryArchive());
@ -3199,11 +3197,4 @@ public abstract class MessageDatabase extends ServiceSupport implements BrokerSe
this.preallocationStrategy = preallocationStrategy;
}
public int getPreallocationBatchSize() {
return preallocationBatchSize;
}
public void setPreallocationBatchSize(int preallocationBatchSize) {
this.preallocationBatchSize = preallocationBatchSize;
}
}

View File

@ -42,7 +42,6 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
protected final Integer dataFileId;
protected volatile int length;
protected final SequenceSet corruptedBlocks = new SequenceSet();
protected long preallocationBatchWindow = 0L;
DataFile(File file, int number) {
this.file = file;
@ -68,7 +67,6 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
public synchronized void incrementLength(int size) {
length += size;
preallocationBatchWindow -= size;
}
@Override
@ -115,38 +113,4 @@ public class DataFile extends LinkedNode<DataFile> implements Comparable<DataFil
return dataFileId;
}
public void preallocateJournalBatch(Journal journal, long newMessageSize) {
if (preallocationBatchWindow - newMessageSize <= 0) {
int preallocationBatchSize = Math.min(journal.getPreallocationBatchSize(),
journal.maxFileLength - length);
doPreallocation(preallocationBatchSize);
preallocationBatchWindow = preallocationBatchSize;
}
}
private void doPreallocation(int size) {
try {
RecoverableRandomAccessFile file = openRandomAccessFile();
FileChannel channel = file.getChannel();
channel.position(length+1);
ByteBuffer buffer = generateAllocation(size);
channel.write(buffer);
channel.force(false);
file.close();
} catch (IOException e) {
LOG.debug("Cannot allocate batch for journal, continue without preallocation of batch...");
}
}
private ByteBuffer generateAllocation(int size) {
ByteBuffer rc = ByteBuffer.allocate(size);
for (int i = 0; i < size; i++) {
rc.put((byte) 0x00);
}
rc.flip();
return rc;
}
}

View File

@ -211,10 +211,6 @@ class DataFileAppender implements FileAppender {
file = journal.rotateWriteFile();
}
// will do batch preallocation on the journal if configured
if (journal.preallocationScope == Journal.PreallocationScope.BATCH) {
file.preallocateJournalBatch(journal, write.location.getSize());
}
nextWriteBatch = newWriteBatch(write, file);
enqueueMutex.notifyAll();

View File

@ -61,7 +61,6 @@ public class Journal {
}
public enum PreallocationScope {
BATCH,
ENTIRE_JOURNAL;
}
@ -87,7 +86,6 @@ public class Journal {
public static final int DEFAULT_CLEANUP_INTERVAL = 1000 * 30;
public static final int PREFERED_DIFF = 1024 * 512;
public static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 1024 * 1024 * 4;
public static final int DEFAULT_PREALLOCATION_BATCH_SIZE = 1024 * 1024 * 1;
private static final Logger LOG = LoggerFactory.getLogger(Journal.class);
@ -123,7 +121,6 @@ public class Journal {
protected PreallocationScope preallocationScope = PreallocationScope.ENTIRE_JOURNAL;
protected PreallocationStrategy preallocationStrategy = PreallocationStrategy.SPARSE_FILE;
protected int preallocationBatchSize = DEFAULT_PREALLOCATION_BATCH_SIZE;
public interface DataFileRemovedListener {
void fileRemoved(DataFile datafile);
@ -183,6 +180,10 @@ public class Journal {
getCurrentWriteFile();
if (preallocationStrategy != PreallocationStrategy.SPARSE_FILE && maxFileLength != DEFAULT_MAX_FILE_LENGTH) {
LOG.warn("You are using a preallocation strategy and journal maxFileLength which should be benchmarked accordingly to not introduce unexpected latencies.");
}
if( lastAppendLocation.get()==null ) {
DataFile df = dataFiles.getTail();
lastAppendLocation.set(recoveryCheck(df));
@ -678,14 +679,6 @@ public class Journal {
this.preallocationScope = preallocationScope;
}
public int getPreallocationBatchSize() {
return preallocationBatchSize;
}
public void setPreallocationBatchSize(int preallocationBatchSize) {
this.preallocationBatchSize = preallocationBatchSize;
}
public File getDirectory() {
return directory;
}

View File

@ -0,0 +1,89 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.activemq.store.kahadb.disk.journal;
import org.apache.activemq.store.kahadb.KahaDBStore;
import org.junit.Test;
import java.io.File;
import java.io.FileInputStream;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.Random;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Created by ceposta
* <a href="http://christianposta.com/blog>http://christianposta.com/blog</a>.
*/
public class PreallocationJournalTest {
@Test
public void testSparseFilePreallocation() throws Exception {
executeTest("sparse_file");
}
@Test
public void testOSCopyPreallocation() throws Exception {
executeTest("os_kernel_copy");
}
@Test
public void testZerosPreallocation() throws Exception {
executeTest("zeros");
}
private void executeTest(String preallocationStrategy)throws Exception {
Random rand = new Random();
int randInt = rand.nextInt(100);
File dataDirectory = new File("./target/activemq-data/kahadb" + randInt);
KahaDBStore store = new KahaDBStore();
store.deleteAllMessages();
store.setDirectory(dataDirectory);
store.setPreallocationStrategy(preallocationStrategy);
store.start();
// time for files to get there.. i know this is a brittle test! need to find
// a better way (callbacks?) to notify when the journal is completely up
TimeUnit.MILLISECONDS.sleep(500);
File journalLog = new File(dataDirectory, "db-1.log");
assertTrue(journalLog.exists());
FileInputStream is = new FileInputStream(journalLog);
FileChannel channel = is.getChannel();
assertEquals(Journal.DEFAULT_MAX_FILE_LENGTH, channel.size());
channel.position(1 * 1024 * 1024 + 1);
ByteBuffer buff = ByteBuffer.allocate(1);
channel.read(buff);
buff.flip();
buff.position(0);
assertEquals(0x00, buff.get());
System.out.println("File size: " + channel.size());
store.stop();
}
}