HDFS-7929. inotify unable fetch pre-upgrade edit log segments once upgrade starts (Zhe Zhang via Colin P. McCabe)

(cherry picked from commit 43b41f2241)
This commit is contained in:
Colin Patrick Mccabe 2015-03-18 18:48:54 -07:00
parent 5a5b244648
commit 219eb22c15
5 changed files with 92 additions and 6 deletions

View File

@ -902,6 +902,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7587. Edit log corruption can happen if append fails with a quota
violation. (jing9)
HDFS-7929. inotify unable fetch pre-upgrade edit log segments once upgrade
starts (Zhe Zhang via Colin P. McCabe)
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

View File

@ -406,7 +406,7 @@ public class FSImage implements Closeable {
for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
StorageDirectory sd = it.next();
try {
NNUpgradeUtil.doPreUpgrade(sd);
NNUpgradeUtil.doPreUpgrade(conf, sd);
} catch (Exception e) {
LOG.error("Failed to move aside pre-upgrade storage " +
"in image directory " + sd.getRoot(), e);

View File

@ -585,7 +585,7 @@ public class FileJournalManager implements JournalManager {
public void doPreUpgrade() throws IOException {
LOG.info("Starting upgrade of edits directory " + sd.getRoot());
try {
NNUpgradeUtil.doPreUpgrade(sd);
NNUpgradeUtil.doPreUpgrade(conf, sd);
} catch (IOException ioe) {
LOG.error("Failed to move aside pre-upgrade storage " +
"in image directory " + sd.getRoot(), ioe);

View File

@ -18,15 +18,19 @@
package org.apache.hadoop.hdfs.server.namenode;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import com.google.common.base.Preconditions;
import org.apache.hadoop.io.IOUtils;
abstract class NNUpgradeUtil {
@ -100,14 +104,16 @@ abstract class NNUpgradeUtil {
* doUpgrade will not be called for any JM. The existing current dir is
* renamed to previous.tmp, and then a new, empty current dir is created.
*
* @param conf configuration for creating {@link EditLogFileOutputStream}
* @param sd the storage directory to perform the pre-upgrade procedure.
* @throws IOException in the event of error
*/
static void doPreUpgrade(StorageDirectory sd) throws IOException {
static void doPreUpgrade(Configuration conf, StorageDirectory sd)
throws IOException {
LOG.info("Starting upgrade of storage directory " + sd.getRoot());
File curDir = sd.getCurrentDir();
File prevDir = sd.getPreviousDir();
File tmpDir = sd.getPreviousTmp();
final File tmpDir = sd.getPreviousTmp();
Preconditions.checkState(curDir.exists(),
"Current directory must exist for preupgrade.");
@ -123,6 +129,35 @@ abstract class NNUpgradeUtil {
if (!curDir.mkdir()) {
throw new IOException("Cannot create directory " + curDir);
}
List<String> fileNameList = IOUtils.listDirectory(tmpDir, new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return dir.equals(tmpDir)
&& name.startsWith(NNStorage.NameNodeFile.EDITS.getName());
}
});
for (String s : fileNameList) {
File prevFile = new File(tmpDir, s);
Preconditions.checkState(prevFile.canRead(),
"Edits log file " + s + " is not readable.");
File newFile = new File(curDir, prevFile.getName());
Preconditions.checkState(newFile.createNewFile(),
"Cannot create new edits log file in " + curDir);
EditLogFileInputStream in = new EditLogFileInputStream(prevFile);
EditLogFileOutputStream out =
new EditLogFileOutputStream(conf, newFile, 512*1024);
FSEditLogOp logOp = in.nextValidOp();
while (logOp != null) {
out.write(logOp);
logOp = in.nextOp();
}
out.setReadyToFlush();
out.flushAndSync(true);
out.close();
in.close();
}
}
/**

View File

@ -28,7 +28,10 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.regex.Pattern;
import org.apache.commons.logging.Log;
@ -42,7 +45,9 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.TestParallelImageWrite;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.StringUtils;
import org.junit.BeforeClass;
@ -451,6 +456,49 @@ public class TestDFSUpgrade {
}
}
@Test
public void testPreserveEditLogs() throws Exception {
conf = new HdfsConfiguration();
conf = UpgradeUtilities.initializeStorageStateConf(1, conf);
String[] nameNodeDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION, false);
log("Normal NameNode upgrade", 1);
File[] created =
UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
List<String> beforeUpgrade = new LinkedList<>();
for (final File createdDir : created) {
List<String> fileNameList =
IOUtils.listDirectory(createdDir, EditLogsFilter.INSTANCE);
beforeUpgrade.addAll(fileNameList);
}
cluster = createCluster();
List<String> afterUpgrade = new LinkedList<>();
for (final File createdDir : created) {
List<String> fileNameList =
IOUtils.listDirectory(createdDir, EditLogsFilter.INSTANCE);
afterUpgrade.addAll(fileNameList);
}
for (String s : beforeUpgrade) {
assertTrue(afterUpgrade.contains(s));
}
cluster.shutdown();
UpgradeUtilities.createEmptyDirs(nameNodeDirs);
}
private static enum EditLogsFilter implements FilenameFilter {
INSTANCE;
@Override
public boolean accept(File dir, String name) {
return name.startsWith(NNStorage.NameNodeFile.EDITS.getName());
}
}
public static void main(String[] args) throws Exception {
TestDFSUpgrade t = new TestDFSUpgrade();
TestDFSUpgrade.initialize();