mirror of https://github.com/apache/activemq.git
Remove unnecessary zero init block as the java spec requires the allocations to do this already. Fix some warnings.
This commit is contained in:
parent
5d6d42ce97
commit
e89b7a57f1
|
@ -16,28 +16,45 @@
|
|||
*/
|
||||
package org.apache.activemq.store.kahadb.disk.journal;
|
||||
|
||||
import java.io.*;
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FilenameFilter;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.io.UnsupportedEncodingException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.FileChannel;
|
||||
import java.util.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.Timer;
|
||||
import java.util.TimerTask;
|
||||
import java.util.TreeMap;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.zip.Adler32;
|
||||
import java.util.zip.Checksum;
|
||||
|
||||
import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
|
||||
import org.apache.activemq.store.kahadb.disk.util.SequenceSet;
|
||||
import org.apache.activemq.util.*;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.activemq.store.kahadb.disk.util.LinkedNodeList;
|
||||
import org.apache.activemq.store.kahadb.disk.util.SchedulerTimerTask;
|
||||
import org.apache.activemq.store.kahadb.disk.util.Sequence;
|
||||
import org.apache.activemq.util.ByteSequence;
|
||||
import org.apache.activemq.util.DataByteArrayInputStream;
|
||||
import org.apache.activemq.util.DataByteArrayOutputStream;
|
||||
import org.apache.activemq.util.IOHelper;
|
||||
import org.apache.activemq.util.RecoverableRandomAccessFile;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* Manages DataFiles
|
||||
*
|
||||
*
|
||||
*/
|
||||
public class Journal {
|
||||
public static final String CALLER_BUFFER_APPENDER = "org.apache.kahadb.journal.CALLER_BUFFER_APPENDER";
|
||||
|
@ -88,8 +105,7 @@ public class Journal {
|
|||
}
|
||||
|
||||
private static byte[] createBatchControlRecordHeader() {
|
||||
try {
|
||||
DataByteArrayOutputStream os = new DataByteArrayOutputStream();
|
||||
try (DataByteArrayOutputStream os = new DataByteArrayOutputStream();) {
|
||||
os.writeInt(BATCH_CONTROL_RECORD_SIZE);
|
||||
os.writeByte(BATCH_CONTROL_RECORD_TYPE);
|
||||
os.write(BATCH_CONTROL_RECORD_MAGIC);
|
||||
|
@ -163,6 +179,7 @@ public class Journal {
|
|||
appender = callerBufferAppender ? new CallerBufferingDataFileAppender(this) : new DataFileAppender(this);
|
||||
|
||||
File[] files = directory.listFiles(new FilenameFilter() {
|
||||
@Override
|
||||
public boolean accept(File dir, String n) {
|
||||
return dir.equals(directory) && n.startsWith(filePrefix) && n.endsWith(fileSuffix);
|
||||
}
|
||||
|
@ -217,12 +234,13 @@ public class Journal {
|
|||
totalLength.addAndGet(lastAppendLocation.get().getOffset() - maxFileLength);
|
||||
}
|
||||
|
||||
|
||||
cleanupTask = new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
cleanup();
|
||||
}
|
||||
};
|
||||
|
||||
this.timer = new Timer("KahaDB Scheduler", true);
|
||||
TimerTask task = new SchedulerTimerTask(cleanupTask);
|
||||
this.timer.scheduleAtFixedRate(task, DEFAULT_CLEANUP_INTERVAL,DEFAULT_CLEANUP_INTERVAL);
|
||||
|
@ -230,21 +248,18 @@ public class Journal {
|
|||
LOG.trace("Startup took: "+(end-start)+" ms");
|
||||
}
|
||||
|
||||
|
||||
public void preallocateEntireJournalDataFile(RecoverableRandomAccessFile file) {
|
||||
|
||||
if (PreallocationScope.ENTIRE_JOURNAL == preallocationScope) {
|
||||
|
||||
if (PreallocationStrategy.OS_KERNEL_COPY == preallocationStrategy) {
|
||||
doPreallocationKernelCopy(file);
|
||||
|
||||
}else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
|
||||
} else if (PreallocationStrategy.ZEROS == preallocationStrategy) {
|
||||
doPreallocationZeros(file);
|
||||
}
|
||||
else {
|
||||
} else {
|
||||
doPreallocationSparseFile(file);
|
||||
}
|
||||
}else {
|
||||
} else {
|
||||
LOG.info("Using journal preallocation scope of batch allocation");
|
||||
}
|
||||
}
|
||||
|
@ -260,10 +275,7 @@ public class Journal {
|
|||
|
||||
private void doPreallocationZeros(RecoverableRandomAccessFile file) {
|
||||
ByteBuffer buffer = ByteBuffer.allocate(maxFileLength);
|
||||
for (int i = 0; i < maxFileLength; i++) {
|
||||
buffer.put((byte) 0x00);
|
||||
}
|
||||
buffer.flip();
|
||||
buffer.limit(maxFileLength);
|
||||
|
||||
try {
|
||||
FileChannel channel = file.getChannel();
|
||||
|
@ -385,49 +397,48 @@ public class Journal {
|
|||
}
|
||||
}
|
||||
|
||||
|
||||
public int checkBatchRecord(DataFileAccessor reader, int offset) throws IOException {
|
||||
byte controlRecord[] = new byte[BATCH_CONTROL_RECORD_SIZE];
|
||||
DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);
|
||||
|
||||
reader.readFully(offset, controlRecord);
|
||||
try (DataByteArrayInputStream controlIs = new DataByteArrayInputStream(controlRecord);) {
|
||||
|
||||
// Assert that it's a batch record.
|
||||
for( int i=0; i < BATCH_CONTROL_RECORD_HEADER.length; i++ ) {
|
||||
if( controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i] ) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
reader.readFully(offset, controlRecord);
|
||||
|
||||
int size = controlIs.readInt();
|
||||
if( size > MAX_BATCH_SIZE ) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if( isChecksum() ) {
|
||||
|
||||
long expectedChecksum = controlIs.readLong();
|
||||
if( expectedChecksum == 0 ) {
|
||||
// Checksuming was not enabled when the record was stored.
|
||||
// we can't validate the record :(
|
||||
return size;
|
||||
// Assert that it's a batch record.
|
||||
for( int i=0; i < BATCH_CONTROL_RECORD_HEADER.length; i++ ) {
|
||||
if( controlIs.readByte() != BATCH_CONTROL_RECORD_HEADER[i] ) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
byte data[] = new byte[size];
|
||||
reader.readFully(offset+BATCH_CONTROL_RECORD_SIZE, data);
|
||||
|
||||
Checksum checksum = new Adler32();
|
||||
checksum.update(data, 0, data.length);
|
||||
|
||||
if( expectedChecksum!=checksum.getValue() ) {
|
||||
int size = controlIs.readInt();
|
||||
if( size > MAX_BATCH_SIZE ) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
if( isChecksum() ) {
|
||||
|
||||
long expectedChecksum = controlIs.readLong();
|
||||
if( expectedChecksum == 0 ) {
|
||||
// Checksuming was not enabled when the record was stored.
|
||||
// we can't validate the record :(
|
||||
return size;
|
||||
}
|
||||
|
||||
byte data[] = new byte[size];
|
||||
reader.readFully(offset+BATCH_CONTROL_RECORD_SIZE, data);
|
||||
|
||||
Checksum checksum = new Adler32();
|
||||
checksum.update(data, 0, data.length);
|
||||
|
||||
if( expectedChecksum!=checksum.getValue() ) {
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
return size;
|
||||
}
|
||||
return size;
|
||||
}
|
||||
|
||||
|
||||
void addToTotalLength(int size) {
|
||||
totalLength.addAndGet(size);
|
||||
}
|
||||
|
@ -784,6 +795,7 @@ public class Journal {
|
|||
public void setReplicationTarget(ReplicationTarget replicationTarget) {
|
||||
this.replicationTarget = replicationTarget;
|
||||
}
|
||||
|
||||
public ReplicationTarget getReplicationTarget() {
|
||||
return replicationTarget;
|
||||
}
|
||||
|
@ -869,10 +881,12 @@ public class Journal {
|
|||
hash = (int)(file ^ offset);
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return hash;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj instanceof WriteKey) {
|
||||
WriteKey di = (WriteKey)obj;
|
||||
|
|
Loading…
Reference in New Issue