https://issues.apache.org/jira/browse/AMQ-5438 - improve kahadb archive logs. This closes #50

This commit is contained in:
Dejan Bosanac 2014-12-04 14:04:23 +01:00
parent 5d77b395f6
commit 802e527ea4
5 changed files with 52 additions and 25 deletions

View File

@ -20,12 +20,7 @@ import java.io.File;
import java.io.FileFilter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.*;
import javax.transaction.xa.Xid;
@ -518,6 +513,10 @@ public class MultiKahaDBPersistenceAdapter extends LockableServiceSupport implem
return transactionStore.getJournalMaxWriteBatchSize();
}
public List<PersistenceAdapter> getAdapters() {
return Collections.unmodifiableList(adapters);
}
@Override
public String toString() {
String path = getDirectory() != null ? getDirectory().getAbsolutePath() : "DIRECTORY_NOT_SET";

View File

@ -444,4 +444,5 @@ public class MultiKahaDBTransactionStore implements TransactionStore {
}
destination.acknowledge(context, clientId, subscriptionName, messageId, ack);
}
}

View File

@ -20,23 +20,14 @@ import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.TreeMap;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Adler32;
import java.util.zip.Checksum;
import org.apache.activemq.store.kahadb.disk.util.LinkedNode;
import org.apache.activemq.util.IOHelper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.activemq.util.ByteSequence;
@ -95,7 +86,9 @@ public class Journal {
protected final Map<WriteKey, WriteCommand> inflightWrites = new ConcurrentHashMap<WriteKey, WriteCommand>();
protected File directory = new File(DEFAULT_DIRECTORY);
protected File directoryArchive = new File(DEFAULT_ARCHIVE_DIRECTORY);
protected File directoryArchive;
private boolean directoryArchiveOverridden = false;
protected String filePrefix = DEFAULT_FILE_PREFIX;
protected String fileSuffix = DEFAULT_FILE_SUFFIX;
protected boolean started;
@ -121,6 +114,12 @@ public class Journal {
protected boolean enableAsyncDiskSync = true;
private Timer timer;
public interface DataFileRemovedListener {
void fileRemoved(DataFile datafile);
}
private DataFileRemovedListener dataFileRemovedListener;
public synchronized void start() throws IOException {
if (started) {
return;
@ -434,14 +433,30 @@ public class Journal {
totalLength.addAndGet(-dataFile.getLength());
dataFile.unlink();
if (archiveDataLogs) {
dataFile.move(getDirectoryArchive());
LOG.debug("moved data file " + dataFile + " to " + getDirectoryArchive());
File directoryArchive = getDirectoryArchive();
if (directoryArchive.exists()) {
LOG.debug("Archive directory exists: {}", directoryArchive);
} else {
if ( dataFile.delete() ) {
LOG.debug("Discarded data file " + dataFile);
} else {
LOG.warn("Failed to discard data file " + dataFile.getFile());
if (directoryArchive.isAbsolute())
if (LOG.isDebugEnabled()) {
LOG.debug("Archive directory [{}] does not exist - creating it now",
directoryArchive.getAbsolutePath());
}
IOHelper.mkdirs(directoryArchive);
}
LOG.debug("Moving data file {} to {} ", dataFile, directoryArchive.getCanonicalPath());
dataFile.move(directoryArchive);
LOG.debug("Successfully moved data file");
} else {
LOG.debug("Deleting data file: {}", dataFile);
if ( dataFile.delete() ) {
LOG.debug("Discarded data file: {}", dataFile);
} else {
LOG.warn("Failed to discard data file : {}", dataFile.getFile());
}
}
if (dataFileRemovedListener != null) {
dataFileRemovedListener.fileRemoved(dataFile);
}
}
@ -657,10 +672,16 @@ public class Journal {
}
public File getDirectoryArchive() {
if (!directoryArchiveOverridden && (directoryArchive == null)) {
// create the directoryArchive relative to the journal location
directoryArchive = new File(directory.getAbsolutePath() +
File.separator + DEFAULT_ARCHIVE_DIRECTORY);
}
return directoryArchive;
}
public void setDirectoryArchive(File directoryArchive) {
directoryArchiveOverridden = true;
this.directoryArchive = directoryArchive;
}
@ -760,6 +781,10 @@ public class Journal {
return enableAsyncDiskSync;
}
public void setDataFileRemovedListener(DataFileRemovedListener dataFileRemovedListener) {
this.dataFileRemovedListener = dataFileRemovedListener;
}
public static class WriteCommand extends LinkedNode<WriteCommand> {
public final Location location;
public final ByteSequence data;

View File

@ -28,6 +28,8 @@ log4j.rootLogger=INFO, out, stdout
#log4j.logger.org.apache.activemq.store.kahadb=TRACE
#log4j.logger.org.apache.activemq.broker.region.cursors.AbstractStoreCursor=DEBUG
#log4j.logger.org.apache.activemq.store.jdbc.JDBCMessageStore=DEBUG
#log4j.logger.org.apache.activemq.store.kahadb.disk.journal=DEBUG
#log4j.logger.org.apache.activemq.store.kahadb.AbstractKahaDBStore=DEBUG
# CONSOLE appender not used by default
log4j.appender.stdout=org.apache.log4j.ConsoleAppender