HDFS-7929. inotify unable fetch pre-upgrade edit log segments once upgrade starts (Zhe Zhang via Colin P. McCabe)
commit 43b41f2241
parent bee5a6a64a
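Summary: once an upgrade starts, the old current directory is moved aside to previous.tmp, taking the pre-upgrade edit log segments with it, so inotify clients can no longer fetch them. The fix threads the Configuration into NNUpgradeUtil.doPreUpgrade(Configuration, StorageDirectory) and, after current is recreated, copies each edits_* segment from previous.tmp back into current by replaying its ops through an EditLogFileInputStream into a fresh EditLogFileOutputStream. A new test, testPreserveEditLogs, verifies that segment file names survive an upgrade.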
CHANGES.txt
@@ -1202,6 +1202,9 @@ Release 2.7.0 - UNRELEASED
     HDFS-7943. Append cannot handle the last block with length greater than
     the preferred block size. (jing9)
 
+    HDFS-7929. inotify unable fetch pre-upgrade edit log segments once upgrade
+    starts (Zhe Zhang via Colin P. McCabe)
+
   BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
 
     HDFS-7720. Quota by Storage Type API, tools and ClientNameNode
FSImage.java
@@ -406,7 +406,7 @@ public class FSImage implements Closeable {
     for (Iterator<StorageDirectory> it = storage.dirIterator(false); it.hasNext();) {
       StorageDirectory sd = it.next();
       try {
-        NNUpgradeUtil.doPreUpgrade(sd);
+        NNUpgradeUtil.doPreUpgrade(conf, sd);
       } catch (Exception e) {
         LOG.error("Failed to move aside pre-upgrade storage " +
             "in image directory " + sd.getRoot(), e);
FileJournalManager.java
@@ -581,7 +581,7 @@ public class FileJournalManager implements JournalManager {
   public void doPreUpgrade() throws IOException {
     LOG.info("Starting upgrade of edits directory " + sd.getRoot());
     try {
-      NNUpgradeUtil.doPreUpgrade(sd);
+      NNUpgradeUtil.doPreUpgrade(conf, sd);
     } catch (IOException ioe) {
       LOG.error("Failed to move aside pre-upgrade storage " +
           "in image directory " + sd.getRoot(), ioe);
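Note: both call sites above change only their argument list. As the javadoc added below states, the Configuration is needed so doPreUpgrade can construct an EditLogFileOutputStream while copying the old segments forward:

    // before
    NNUpgradeUtil.doPreUpgrade(sd);
    // after: conf is required to create the EditLogFileOutputStream
    NNUpgradeUtil.doPreUpgrade(conf, sd);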
NNUpgradeUtil.java
@@ -18,15 +18,19 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 
 import com.google.common.base.Preconditions;
+import org.apache.hadoop.io.IOUtils;
 
 abstract class NNUpgradeUtil {
 
@@ -100,14 +104,16 @@ abstract class NNUpgradeUtil {
    * doUpgrade will not be called for any JM. The existing current dir is
    * renamed to previous.tmp, and then a new, empty current dir is created.
    *
+   * @param conf configuration for creating {@link EditLogFileOutputStream}
    * @param sd the storage directory to perform the pre-upgrade procedure.
    * @throws IOException in the event of error
    */
-  static void doPreUpgrade(StorageDirectory sd) throws IOException {
+  static void doPreUpgrade(Configuration conf, StorageDirectory sd)
+      throws IOException {
     LOG.info("Starting upgrade of storage directory " + sd.getRoot());
     File curDir = sd.getCurrentDir();
     File prevDir = sd.getPreviousDir();
-    File tmpDir = sd.getPreviousTmp();
+    final File tmpDir = sd.getPreviousTmp();
 
     Preconditions.checkState(curDir.exists(),
         "Current directory must exist for preupgrade.");
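The javadoc above describes the pre-upgrade procedure: rename current to previous.tmp (preserving the old state for rollback), then create a new, empty current. A minimal standalone sketch of that step, with illustrative names rather than NNUpgradeUtil's actual structure:

    import java.io.File;
    import java.io.IOException;

    class PreUpgradeSketch {
      // current -> previous.tmp, then recreate an empty current dir.
      static void moveAsideAndRecreate(File curDir, File tmpDir) throws IOException {
        if (!curDir.renameTo(tmpDir)) {
          throw new IOException("Cannot rename " + curDir + " to " + tmpDir);
        }
        if (!curDir.mkdir()) {
          throw new IOException("Cannot create directory " + curDir);
        }
      }
    }

The hunk below is the substance of the fix: after recreating current, the edit log segments left in previous.tmp are copied back so they remain readable.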
@@ -123,6 +129,35 @@ abstract class NNUpgradeUtil {
     if (!curDir.mkdir()) {
       throw new IOException("Cannot create directory " + curDir);
     }
+
+    List<String> fileNameList = IOUtils.listDirectory(tmpDir, new FilenameFilter() {
+      @Override
+      public boolean accept(File dir, String name) {
+        return dir.equals(tmpDir)
+            && name.startsWith(NNStorage.NameNodeFile.EDITS.getName());
+      }
+    });
+
+    for (String s : fileNameList) {
+      File prevFile = new File(tmpDir, s);
+      Preconditions.checkState(prevFile.canRead(),
+          "Edits log file " + s + " is not readable.");
+      File newFile = new File(curDir, prevFile.getName());
+      Preconditions.checkState(newFile.createNewFile(),
+          "Cannot create new edits log file in " + curDir);
+      EditLogFileInputStream in = new EditLogFileInputStream(prevFile);
+      EditLogFileOutputStream out =
+          new EditLogFileOutputStream(conf, newFile, 512*1024);
+      FSEditLogOp logOp = in.nextValidOp();
+      while (logOp != null) {
+        out.write(logOp);
+        logOp = in.nextOp();
+      }
+      out.setReadyToFlush();
+      out.flushAndSync(true);
+      out.close();
+      in.close();
+    }
   }
 
   /**
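The copy loop selects files by name prefix. Assuming NNStorage.NameNodeFile.EDITS.getName() resolves to the usual "edits" prefix, a standalone equivalent of the filter (the real one additionally checks dir.equals(tmpDir)) behaves like this sketch:

    import java.io.File;
    import java.io.FilenameFilter;

    class EditsPrefixFilter implements FilenameFilter {
      @Override
      public boolean accept(File dir, String name) {
        // Matches finalized and in-progress segments alike.
        return name.startsWith("edits");
      }

      public static void main(String[] args) {
        FilenameFilter f = new EditsPrefixFilter();
        File dir = new File("previous.tmp");
        System.out.println(f.accept(dir, "edits_0000000000000000001-0000000000000000012")); // true
        System.out.println(f.accept(dir, "edits_inprogress_0000000000000000013"));          // true
        System.out.println(f.accept(dir, "fsimage_0000000000000000000"));                   // false
      }
    }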
TestDFSUpgrade.java
@@ -28,7 +28,10 @@ import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.File;
+import java.io.FilenameFilter;
 import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
 import java.util.regex.Pattern;
 
 import org.apache.commons.logging.Log;
@@ -42,7 +45,9 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.TestParallelImageWrite;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.util.StringUtils;
 import org.junit.BeforeClass;
@@ -451,6 +456,49 @@ public class TestDFSUpgrade {
     }
   }
 
+  @Test
+  public void testPreserveEditLogs() throws Exception {
+    conf = new HdfsConfiguration();
+    conf = UpgradeUtilities.initializeStorageStateConf(1, conf);
+    String[] nameNodeDirs = conf.getStrings(DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_DUPLICATE_REPLICA_DELETION, false);
+
+    log("Normal NameNode upgrade", 1);
+    File[] created =
+        UpgradeUtilities.createNameNodeStorageDirs(nameNodeDirs, "current");
+    List<String> beforeUpgrade = new LinkedList<>();
+    for (final File createdDir : created) {
+      List<String> fileNameList =
+          IOUtils.listDirectory(createdDir, EditLogsFilter.INSTANCE);
+      beforeUpgrade.addAll(fileNameList);
+    }
+
+    cluster = createCluster();
+
+    List<String> afterUpgrade = new LinkedList<>();
+    for (final File createdDir : created) {
+      List<String> fileNameList =
+          IOUtils.listDirectory(createdDir, EditLogsFilter.INSTANCE);
+      afterUpgrade.addAll(fileNameList);
+    }
+
+    for (String s : beforeUpgrade) {
+      assertTrue(afterUpgrade.contains(s));
+    }
+
+    cluster.shutdown();
+    UpgradeUtilities.createEmptyDirs(nameNodeDirs);
+  }
+
+  private static enum EditLogsFilter implements FilenameFilter {
+    INSTANCE;
+
+    @Override
+    public boolean accept(File dir, String name) {
+      return name.startsWith(NNStorage.NameNodeFile.EDITS.getName());
+    }
+  }
+
   public static void main(String[] args) throws Exception {
     TestDFSUpgrade t = new TestDFSUpgrade();
     TestDFSUpgrade.initialize();
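The test snapshots the edit log file names in every name dir, upgrades by starting a cluster, then asserts each pre-upgrade name is still present. A hypothetical helper mirroring that final containment check:

    import java.util.Arrays;
    import java.util.List;

    final class UpgradeAssertions {
      // Every name recorded before the upgrade must still exist afterwards.
      static void assertAllPreserved(List<String> before, List<String> after) {
        for (String name : before) {
          if (!after.contains(name)) {
            throw new AssertionError("Edit log segment lost during upgrade: " + name);
          }
        }
      }

      public static void main(String[] args) {
        List<String> before = Arrays.asList("edits_0000000000000000001-0000000000000000012");
        List<String> after = Arrays.asList(
            "edits_0000000000000000001-0000000000000000012",
            "edits_inprogress_0000000000000000013");
        assertAllPreserved(before, after); // passes
      }
    }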