HDFS-3678. Edit log files are never being purged from 2NN. Contributed by Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1398604 13f79535-47bb-0310-9956-ffa450edef68
commit 32728e64bb
parent 7a9f21aeaf
Author: Aaron Myers
Date:   2012-10-16 01:31:43 +00:00

8 changed files with 135 additions and 27 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -109,6 +109,8 @@ Release 2.0.3-alpha - Unreleased
     HDFS-4049. Fix hflush performance regression due to nagling delays
     (todd)

+    HDFS-3678. Edit log files are never being purged from 2NN. (atm)
+
 Release 2.0.2-alpha - 2012-09-07

   INCOMPATIBLE CHANGES

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java

@@ -61,7 +61,7 @@ import com.google.common.collect.Lists;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public class FSEditLog {
+public class FSEditLog implements LogsPurgeable {

   static final Log LOG = LogFactory.getLog(FSEditLog.class);

@@ -944,6 +944,7 @@ public class FSEditLog {
   /**
    * Archive any log files that are older than the given txid.
    */
+  @Override
   public synchronized void purgeLogsOlderThan(final long minTxIdToKeep) {
     assert curSegmentTxId == HdfsConstants.INVALID_TXID || // on format this is no-op
       minTxIdToKeep <= curSegmentTxId :

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImage.java

@@ -90,7 +90,7 @@ public class FSImage implements Closeable {

   final private Configuration conf;

-  private final NNStorageRetentionManager archivalManager;
+  protected NNStorageRetentionManager archivalManager;

   /**
    * Construct an FSImage

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/JournalManager.java

@@ -33,7 +33,7 @@ import org.apache.hadoop.classification.InterfaceStability;
  */
 @InterfaceAudience.Private
 @InterfaceStability.Evolving
-public interface JournalManager extends Closeable {
+public interface JournalManager extends Closeable, LogsPurgeable {
   /**
    * Begin writing to a new segment of the log stream, which starts at
    * the given transaction ID.
@@ -64,17 +64,6 @@ public interface JournalManager extends Closeable {
    */
   void setOutputBufferCapacity(int size);

-  /**
-   * The JournalManager may archive/purge any logs for transactions less than
-   * or equal to minImageTxId.
-   *
-   * @param minTxIdToKeep the earliest txid that must be retained after purging
-   *                      old logs
-   * @throws IOException if purging fails
-   */
-  void purgeLogsOlderThan(long minTxIdToKeep)
-    throws IOException;
-
   /**
    * Recover segments which have not been finalized.
    */

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/LogsPurgeable.java (new file)

@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.namenode;
+
+import java.io.IOException;
+
+/**
+ * Interface used to abstract over classes which manage edit logs that may need
+ * to be purged.
+ */
+interface LogsPurgeable {
+
+  /**
+   * Remove all edit logs with transaction IDs lower than the given transaction
+   * ID.
+   *
+   * @param minTxIdToKeep the lowest transaction ID that should be retained
+   * @throws IOException in the event of error
+   */
+  public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException;
+
+}
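This new interface is the crux of the fix: the NN's FSEditLog and JournalManagers, and now the 2NN's checkpoint storage, can all be purged through the same abstraction, so NNStorageRetentionManager no longer needs a live FSEditLog. The following is a minimal, self-contained sketch of that pattern for readers skimming the patch; the class names are simplified stand-ins for illustration, not the real HDFS types.

// Sketch only: simplified stand-ins for LogsPurgeable, FSEditLog, and
// NNStorageRetentionManager, showing the shape of the abstraction.
import java.io.IOException;
import java.util.TreeMap;

interface Purgeable {
  // Remove all log segments whose transaction IDs are below minTxIdToKeep.
  void purgeLogsOlderThan(long minTxIdToKeep) throws IOException;
}

// Stand-in for any log owner (the NN's edit log, or the 2NN's checkpoint
// directories): segments keyed by the last txid they contain.
class InMemoryLogs implements Purgeable {
  private final TreeMap<Long, String> segmentsByLastTxId = new TreeMap<Long, String>();

  void addSegment(long lastTxId, String name) {
    segmentsByLastTxId.put(lastTxId, name);
  }

  @Override
  public void purgeLogsOlderThan(long minTxIdToKeep) {
    // headMap() views every segment ending strictly before the cutoff.
    segmentsByLastTxId.headMap(minTxIdToKeep).clear();
  }
}

// Stand-in for the retention manager: it depends only on the interface,
// which is what lets the same retention logic run on both NN and 2NN.
class RetentionManager {
  private final Purgeable logs;

  RetentionManager(Purgeable logs) {
    this.logs = logs;
  }

  void purgeOldStorage(long minImageTxId, int numExtraEditsToRetain)
      throws IOException {
    // Same cutoff arithmetic as NNStorageRetentionManager.purgeOldStorage().
    long purgeLogsFrom = Math.max(0, minImageTxId + 1 - numExtraEditsToRetain);
    logs.purgeLogsOlderThan(purgeLogsFrom);
  }
}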

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorageRetentionManager.java

@@ -52,12 +52,12 @@ public class NNStorageRetentionManager {
       NNStorageRetentionManager.class);
   private final NNStorage storage;
   private final StoragePurger purger;
-  private final FSEditLog editLog;
+  private final LogsPurgeable purgeableLogs;

   public NNStorageRetentionManager(
       Configuration conf,
       NNStorage storage,
-      FSEditLog editLog,
+      LogsPurgeable purgeableLogs,
       StoragePurger purger) {
     this.numCheckpointsToRetain = conf.getInt(
         DFSConfigKeys.DFS_NAMENODE_NUM_CHECKPOINTS_RETAINED_KEY,
@@ -72,13 +72,13 @@ public class NNStorageRetentionManager {
         " must not be negative");

     this.storage = storage;
-    this.editLog = editLog;
+    this.purgeableLogs = purgeableLogs;
     this.purger = purger;
   }

   public NNStorageRetentionManager(Configuration conf, NNStorage storage,
-      FSEditLog editLog) {
-    this(conf, storage, editLog, new DeletionStoragePurger());
+      LogsPurgeable purgeableLogs) {
+    this(conf, storage, purgeableLogs, new DeletionStoragePurger());
   }

   public void purgeOldStorage() throws IOException {
@@ -95,7 +95,7 @@ public class NNStorageRetentionManager {
     // handy for HA, where a remote node may not have as many
     // new images.
     long purgeLogsFrom = Math.max(0, minImageTxId + 1 - numExtraEditsToRetain);
-    editLog.purgeLogsOlderThan(purgeLogsFrom);
+    purgeableLogs.purgeLogsOlderThan(purgeLogsFrom);
   }

   private void purgeCheckpointsOlderThan(
@@ -103,7 +103,6 @@ public class NNStorageRetentionManager {
       long minTxId) {
     for (FSImageFile image : inspector.getFoundImages()) {
       if (image.getCheckpointTxId() < minTxId) {
-        LOG.info("Purging old image " + image);
         purger.purgeImage(image);
       }
     }
@@ -146,11 +145,13 @@ public class NNStorageRetentionManager {
   static class DeletionStoragePurger implements StoragePurger {
     @Override
     public void purgeLog(EditLogFile log) {
+      LOG.info("Purging old edit log " + log);
       deleteOrWarn(log.getFile());
     }

     @Override
     public void purgeImage(FSImageFile image) {
+      LOG.info("Purging old image " + image);
       deleteOrWarn(image.getFile());
       deleteOrWarn(MD5FileUtils.getDigestFileForFile(image.getFile()));
     }
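A quick worked example of the cutoff arithmetic above: with minImageTxId = 100 and dfs.namenode.num.extra.edits.retained = 10, purgeLogsFrom = max(0, 100 + 1 - 10) = 91, so every edit log segment whose last transaction ID is below 91 is purged, while the ten transactions up to and including the checkpoint txid are kept as a safety margin.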

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java

@@ -58,6 +58,8 @@ import org.apache.hadoop.hdfs.server.common.Storage.StorageState;

 import static org.apache.hadoop.util.ExitUtil.terminate;

+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
+import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
@@ -490,10 +492,6 @@ public class SecondaryNameNode implements Runnable {
     LOG.warn("Checkpoint done. New Image Size: "
         + dstStorage.getFsImageName(txid).length());

-    // Since we've successfully checkpointed, we can remove some old
-    // image files
-    checkpointImage.purgeOldStorage();
-
     return loadImage;
   }

@@ -728,6 +726,34 @@ public class SecondaryNameNode implements Runnable {
   }

   static class CheckpointStorage extends FSImage {
+
+    private static class CheckpointLogPurger implements LogsPurgeable {
+
+      private NNStorage storage;
+      private StoragePurger purger
+          = new NNStorageRetentionManager.DeletionStoragePurger();
+
+      public CheckpointLogPurger(NNStorage storage) {
+        this.storage = storage;
+      }
+
+      @Override
+      public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
+        Iterator<StorageDirectory> iter = storage.dirIterator();
+        while (iter.hasNext()) {
+          StorageDirectory dir = iter.next();
+          List<EditLogFile> editFiles = FileJournalManager.matchEditLogs(
+              dir.getCurrentDir());
+          for (EditLogFile f : editFiles) {
+            if (f.getLastTxId() < minTxIdToKeep) {
+              purger.purgeLog(f);
+            }
+          }
+        }
+      }
+    }
+
     /**
      * Construct a checkpoint image.
      * @param conf Node configuration.
@@ -744,6 +770,11 @@ public class SecondaryNameNode implements Runnable {
       // we shouldn't have any editLog instance. Setting to null
       // makes sure we don't accidentally depend on it.
       editLog = null;
+
+      // Replace the archival manager with one that can actually work on the
+      // 2NN's edits storage.
+      this.archivalManager = new NNStorageRetentionManager(conf, storage,
+          new CheckpointLogPurger(storage));
     }

     /**
@@ -840,6 +871,7 @@ public class SecondaryNameNode implements Runnable {
       }

       Checkpointer.rollForwardByApplyingLogs(manifest, dstImage, dstNamesystem);
+      // The following has the side effect of purging old fsimages/edit logs.
       dstImage.saveFSImageInAllDirs(dstNamesystem, dstImage.getLastAppliedTxId());

       dstStorage.writeAll();
     }
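On disk, the 2NN's purger boils down to matching finalized segment files by name and deleting those that end before the cutoff. Below is a standalone sketch of that behavior, assuming the usual finalized-segment naming edits_<startTxId>-<endTxId> (zero-padded in real HDFS) and leaving in-progress segments untouched; it uses plain java.io rather than the FileJournalManager/StoragePurger machinery in the patch.

// Sketch only: what CheckpointLogPurger.purgeLogsOlderThan() effectively does
// to a single storage directory's current/ dir.
import java.io.File;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class EditsDirPurger {
  // Finalized segments look like edits_<startTxId>-<endTxId>; in-progress
  // segments (edits_inprogress_<txid>) deliberately do not match.
  private static final Pattern EDITS_REGEX =
      Pattern.compile("edits_(\\d+)-(\\d+)");

  static void purgeLogsOlderThan(File currentDir, long minTxIdToKeep) {
    File[] files = currentDir.listFiles();
    if (files == null) {
      return; // not a directory, or listing failed
    }
    for (File f : files) {
      Matcher m = EDITS_REGEX.matcher(f.getName());
      // Mirrors f.getLastTxId() < minTxIdToKeep in the patch: only segments
      // ending strictly before the cutoff are deleted.
      if (m.matches() && Long.parseLong(m.group(2)) < minTxIdToKeep) {
        if (!f.delete()) {
          System.err.println("Could not delete " + f);
        }
      }
    }
  }
}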

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCheckpoint.java

@@ -28,6 +28,7 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;

 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.lang.management.ManagementFactory;
 import java.lang.management.ThreadInfo;
@@ -62,6 +63,7 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.namenode.SecondaryNameNode.CheckpointStorage;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
@@ -1853,6 +1855,50 @@ public class TestCheckpoint {
     }
   }

+  /**
+   * Regression test for HDFS-3678 "Edit log files are never being purged
+   * from 2NN."
+   */
+  @Test
+  public void testSecondaryPurgesEditLogs() throws IOException {
+    MiniDFSCluster cluster = null;
+    SecondaryNameNode secondary = null;
+
+    Configuration conf = new HdfsConfiguration();
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_NUM_EXTRA_EDITS_RETAINED_KEY, 0);
+
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(0)
+          .format(true).build();
+
+      FileSystem fs = cluster.getFileSystem();
+      fs.mkdirs(new Path("/foo"));
+
+      secondary = startSecondaryNameNode(conf);
+
+      // Checkpoint a few times. Doing this will cause a log roll, and thus
+      // several edit log segments on the 2NN.
+      for (int i = 0; i < 5; i++) {
+        secondary.doCheckpoint();
+      }
+
+      // Make sure there are no more edit log files than there should be.
+      List<File> checkpointDirs = getCheckpointCurrentDirs(secondary);
+      for (File checkpointDir : checkpointDirs) {
+        List<EditLogFile> editsFiles = FileJournalManager.matchEditLogs(
+            checkpointDir);
+        assertEquals("Edit log files were not purged from 2NN", 1,
+            editsFiles.size());
+      }
+    } finally {
+      if (secondary != null) {
+        secondary.shutdown();
+      }
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
   /**
    * Regression test for HDFS-3835 - "Long-lived 2NN cannot perform a
    * checkpoint if security is enabled and the NN restarts without outstanding
@@ -2010,7 +2056,7 @@ public class TestCheckpoint {
         ImmutableSet.of("VERSION"));
   }

-  private List<File> getCheckpointCurrentDirs(SecondaryNameNode secondary) {
+  private static List<File> getCheckpointCurrentDirs(SecondaryNameNode secondary) {
     List<File> ret = Lists.newArrayList();
     for (URI u : secondary.getCheckpointDirs()) {
       File checkpointDir = new File(u.getPath());
@@ -2019,7 +2065,7 @@ public class TestCheckpoint {
     return ret;
   }

-  private CheckpointStorage spyOnSecondaryImage(SecondaryNameNode secondary1) {
+  private static CheckpointStorage spyOnSecondaryImage(SecondaryNameNode secondary1) {
     CheckpointStorage spy = Mockito.spy((CheckpointStorage)secondary1.getFSImage());;
     secondary1.setFSImage(spy);
     return spy;