HDFS-2874. Edit log should log to shared dirs before local dirs. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1240445 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
969318cfc2
commit
11df1c2561
|
@ -160,3 +160,5 @@ HDFS-2769. HA: When HA is enabled with a shared edits dir, that dir should be
|
||||||
marked required. (atm via eli)
|
marked required. (atm via eli)
|
||||||
|
|
||||||
HDFS-2863. Failures observed if dfs.edits.dir and shared.edits.dir have same directories. (Bikas Saha via atm)
|
HDFS-2863. Failures observed if dfs.edits.dir and shared.edits.dir have same directories. (Bikas Saha via atm)
|
||||||
|
|
||||||
|
HDFS-2874. Edit log should log to shared dirs before local dirs. (todd)
|
||||||
|
|
|
@ -23,6 +23,7 @@ import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -97,9 +98,9 @@ public final class Util {
|
||||||
* @param names collection of strings to convert to URIs
|
* @param names collection of strings to convert to URIs
|
||||||
* @return collection of URIs
|
* @return collection of URIs
|
||||||
*/
|
*/
|
||||||
public static Collection<URI> stringCollectionAsURIs(
|
public static List<URI> stringCollectionAsURIs(
|
||||||
Collection<String> names) {
|
Collection<String> names) {
|
||||||
Collection<URI> uris = new ArrayList<URI>(names.size());
|
List<URI> uris = new ArrayList<URI>(names.size());
|
||||||
for(String name : names) {
|
for(String name : names) {
|
||||||
try {
|
try {
|
||||||
uris.add(stringAsURI(name));
|
uris.add(stringAsURI(name));
|
||||||
|
|
|
@ -126,12 +126,12 @@ public class FSEditLog {
|
||||||
private NNStorage storage;
|
private NNStorage storage;
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
|
|
||||||
private Collection<URI> editsDirs;
|
private List<URI> editsDirs;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The edit directories that are shared between primary and secondary.
|
* The edit directories that are shared between primary and secondary.
|
||||||
*/
|
*/
|
||||||
private Collection<URI> sharedEditsDirs;
|
private List<URI> sharedEditsDirs;
|
||||||
|
|
||||||
private static class TransactionId {
|
private static class TransactionId {
|
||||||
public long txid;
|
public long txid;
|
||||||
|
@ -170,11 +170,11 @@ public class FSEditLog {
|
||||||
* @param storage Storage object used by namenode
|
* @param storage Storage object used by namenode
|
||||||
* @param editsDirs List of journals to use
|
* @param editsDirs List of journals to use
|
||||||
*/
|
*/
|
||||||
FSEditLog(Configuration conf, NNStorage storage, Collection<URI> editsDirs) {
|
FSEditLog(Configuration conf, NNStorage storage, List<URI> editsDirs) {
|
||||||
init(conf, storage, editsDirs);
|
init(conf, storage, editsDirs);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void init(Configuration conf, NNStorage storage, Collection<URI> editsDirs) {
|
private void init(Configuration conf, NNStorage storage, List<URI> editsDirs) {
|
||||||
isSyncRunning = false;
|
isSyncRunning = false;
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
this.storage = storage;
|
this.storage = storage;
|
||||||
|
@ -209,7 +209,7 @@ public class FSEditLog {
|
||||||
state = State.OPEN_FOR_READING;
|
state = State.OPEN_FOR_READING;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initJournals(Collection<URI> dirs) {
|
private void initJournals(List<URI> dirs) {
|
||||||
int minimumRedundantJournals = conf.getInt(
|
int minimumRedundantJournals = conf.getInt(
|
||||||
DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY,
|
DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY,
|
||||||
DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT);
|
DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_DEFAULT);
|
||||||
|
|
|
@ -115,7 +115,7 @@ public class FSImage implements Closeable {
|
||||||
*/
|
*/
|
||||||
protected FSImage(Configuration conf,
|
protected FSImage(Configuration conf,
|
||||||
Collection<URI> imageDirs,
|
Collection<URI> imageDirs,
|
||||||
Collection<URI> editsDirs)
|
List<URI> editsDirs)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
this.conf = conf;
|
this.conf = conf;
|
||||||
|
|
||||||
|
@ -485,7 +485,7 @@ public class FSImage implements Closeable {
|
||||||
void doImportCheckpoint(FSNamesystem target) throws IOException {
|
void doImportCheckpoint(FSNamesystem target) throws IOException {
|
||||||
Collection<URI> checkpointDirs =
|
Collection<URI> checkpointDirs =
|
||||||
FSImage.getCheckpointDirs(conf, null);
|
FSImage.getCheckpointDirs(conf, null);
|
||||||
Collection<URI> checkpointEditsDirs =
|
List<URI> checkpointEditsDirs =
|
||||||
FSImage.getCheckpointEditsDirs(conf, null);
|
FSImage.getCheckpointEditsDirs(conf, null);
|
||||||
|
|
||||||
if (checkpointDirs == null || checkpointDirs.isEmpty()) {
|
if (checkpointDirs == null || checkpointDirs.isEmpty()) {
|
||||||
|
@ -1094,7 +1094,7 @@ public class FSImage implements Closeable {
|
||||||
return Util.stringCollectionAsURIs(dirNames);
|
return Util.stringCollectionAsURIs(dirNames);
|
||||||
}
|
}
|
||||||
|
|
||||||
static Collection<URI> getCheckpointEditsDirs(Configuration conf,
|
static List<URI> getCheckpointEditsDirs(Configuration conf,
|
||||||
String defaultName) {
|
String defaultName) {
|
||||||
Collection<String> dirNames =
|
Collection<String> dirNames =
|
||||||
conf.getStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY);
|
conf.getStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_EDITS_DIR_KEY);
|
||||||
|
|
|
@ -86,6 +86,7 @@ import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
@ -190,10 +191,8 @@ import org.apache.hadoop.util.VersionInfo;
|
||||||
import org.mortbay.util.ajax.JSON;
|
import org.mortbay.util.ajax.JSON;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Joiner;
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
|
||||||
|
|
||||||
/***************************************************
|
/***************************************************
|
||||||
* FSNamesystem does the actual bookkeeping work for the
|
* FSNamesystem does the actual bookkeeping work for the
|
||||||
|
@ -350,7 +349,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
public static FSNamesystem loadFromDisk(Configuration conf)
|
public static FSNamesystem loadFromDisk(Configuration conf)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
Collection<URI> namespaceDirs = FSNamesystem.getNamespaceDirs(conf);
|
Collection<URI> namespaceDirs = FSNamesystem.getNamespaceDirs(conf);
|
||||||
Collection<URI> namespaceEditsDirs =
|
List<URI> namespaceEditsDirs =
|
||||||
FSNamesystem.getNamespaceEditsDirs(conf);
|
FSNamesystem.getNamespaceEditsDirs(conf);
|
||||||
|
|
||||||
if (namespaceDirs.size() == 1) {
|
if (namespaceDirs.size() == 1) {
|
||||||
|
@ -678,28 +677,44 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
return Util.stringCollectionAsURIs(dirNames);
|
return Util.stringCollectionAsURIs(dirNames);
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Collection<URI> getNamespaceEditsDirs(Configuration conf) {
|
/**
|
||||||
Collection<URI> editsDirs = getStorageDirs(conf, DFS_NAMENODE_EDITS_DIR_KEY);
|
* Return an ordered list of edits directories to write to.
|
||||||
editsDirs.addAll(getSharedEditsDirs(conf));
|
* The list is ordered such that all shared edits directories
|
||||||
Set<URI> uniqueEditsDirs = new HashSet<URI>();
|
* are ordered before non-shared directories, and any duplicates
|
||||||
uniqueEditsDirs.addAll(editsDirs);
|
* are removed. The order they are specified in the configuration
|
||||||
if (uniqueEditsDirs.size() != editsDirs.size()) {
|
* is retained.
|
||||||
// clearing and re-initializing editsDirs to preserve Collection semantics
|
*/
|
||||||
// assigning finalEditsDirs to editsDirs would leak Set semantics in the
|
public static List<URI> getNamespaceEditsDirs(Configuration conf) {
|
||||||
// return value and cause unexpected results downstream. eg future addAll
|
// Use a LinkedHashSet so that order is maintained while we de-dup
|
||||||
// calls. Perf is not an issue since these are small lists.
|
// the entries.
|
||||||
editsDirs.clear();
|
LinkedHashSet<URI> editsDirs = new LinkedHashSet<URI>();
|
||||||
editsDirs.addAll(uniqueEditsDirs);
|
|
||||||
LOG.warn("Overlapping entries in " + DFS_NAMENODE_EDITS_DIR_KEY
|
// First add the shared edits dirs. It's critical that the shared dirs
|
||||||
+ " and/or " + DFS_NAMENODE_SHARED_EDITS_DIR_KEY
|
// are added first, since JournalSet syncs them in the order they are listed,
|
||||||
+ ". Using the following entries: " + Joiner.on(',').join(editsDirs));
|
// and we need to make sure all edits are in place in the shared storage
|
||||||
|
// before they are replicated locally. See HDFS-2874.
|
||||||
|
for (URI dir : getSharedEditsDirs(conf)) {
|
||||||
|
if (!editsDirs.add(dir)) {
|
||||||
|
LOG.warn("Edits URI " + dir + " listed multiple times in " +
|
||||||
|
DFS_NAMENODE_SHARED_EDITS_DIR_KEY + ". Ignoring duplicates.");
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Now add the non-shared dirs.
|
||||||
|
for (URI dir : getStorageDirs(conf, DFS_NAMENODE_EDITS_DIR_KEY)) {
|
||||||
|
if (!editsDirs.add(dir)) {
|
||||||
|
LOG.warn("Edits URI " + dir + " listed multiple times in " +
|
||||||
|
DFS_NAMENODE_SHARED_EDITS_DIR_KEY + " and " +
|
||||||
|
DFS_NAMENODE_EDITS_DIR_KEY + ". Ignoring duplicates.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (editsDirs.isEmpty()) {
|
if (editsDirs.isEmpty()) {
|
||||||
// If this is the case, no edit dirs have been explicitly configured.
|
// If this is the case, no edit dirs have been explicitly configured.
|
||||||
// Image dirs are to be used for edits too.
|
// Image dirs are to be used for edits too.
|
||||||
return getNamespaceDirs(conf);
|
return Lists.newArrayList(getNamespaceDirs(conf));
|
||||||
} else {
|
} else {
|
||||||
return editsDirs;
|
return Lists.newArrayList(editsDirs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -708,7 +723,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
* @param conf
|
* @param conf
|
||||||
* @return Collection of edit directories.
|
* @return List of shared edit directories.
|
||||||
*/
|
*/
|
||||||
public static Collection<URI> getSharedEditsDirs(Configuration conf) {
|
public static List<URI> getSharedEditsDirs(Configuration conf) {
|
||||||
// don't use getStorageDirs here, because we want an empty default
|
// don't use getStorageDirs here, because we want an empty default
|
||||||
// rather than the dir in /tmp
|
// rather than the dir in /tmp
|
||||||
Collection<String> dirNames = conf.getTrimmedStringCollection(
|
Collection<String> dirNames = conf.getTrimmedStringCollection(
|
||||||
|
|
|
@ -309,15 +309,27 @@ public class JournalSet implements JournalManager {
|
||||||
*/
|
*/
|
||||||
private void mapJournalsAndReportErrors(
|
private void mapJournalsAndReportErrors(
|
||||||
JournalClosure closure, String status) throws IOException{
|
JournalClosure closure, String status) throws IOException{
|
||||||
|
|
||||||
List<JournalAndStream> badJAS = Lists.newLinkedList();
|
List<JournalAndStream> badJAS = Lists.newLinkedList();
|
||||||
for (JournalAndStream jas : journals) {
|
for (JournalAndStream jas : journals) {
|
||||||
try {
|
try {
|
||||||
closure.apply(jas);
|
closure.apply(jas);
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
|
if (jas.isRequired()) {
|
||||||
|
String msg = "Error: " + status + " failed for required journal ("
|
||||||
|
+ jas + ")";
|
||||||
|
LOG.fatal(msg, t);
|
||||||
|
// If we fail on *any* of the required journals, then we must not
|
||||||
|
// continue on any of the other journals. Abort them to ensure that
|
||||||
|
// retry behavior doesn't allow them to keep going in any way.
|
||||||
|
abortAllJournals();
|
||||||
|
throw new IOException(msg);
|
||||||
|
} else {
|
||||||
LOG.error("Error: " + status + " failed for (journal " + jas + ")", t);
|
LOG.error("Error: " + status + " failed for (journal " + jas + ")", t);
|
||||||
badJAS.add(jas);
|
badJAS.add(jas);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
disableAndReportErrorOnJournals(badJAS);
|
disableAndReportErrorOnJournals(badJAS);
|
||||||
if (!NameNodeResourcePolicy.areResourcesAvailable(journals,
|
if (!NameNodeResourcePolicy.areResourcesAvailable(journals,
|
||||||
minimumRedundantJournals)) {
|
minimumRedundantJournals)) {
|
||||||
|
@ -327,6 +339,17 @@ public class JournalSet implements JournalManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Abort all of the underlying streams.
|
||||||
|
*/
|
||||||
|
private void abortAllJournals() {
|
||||||
|
for (JournalAndStream jas : journals) {
|
||||||
|
if (jas.isActive()) {
|
||||||
|
jas.abort();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An implementation of EditLogOutputStream that applies a requested method on
|
* An implementation of EditLogOutputStream that applies a requested method on
|
||||||
* all the journals that are currently active.
|
* all the journals that are currently active.
|
||||||
|
|
|
@ -661,7 +661,7 @@ public class NameNode {
|
||||||
}
|
}
|
||||||
|
|
||||||
Collection<URI> dirsToFormat = FSNamesystem.getNamespaceDirs(conf);
|
Collection<URI> dirsToFormat = FSNamesystem.getNamespaceDirs(conf);
|
||||||
Collection<URI> editDirsToFormat =
|
List<URI> editDirsToFormat =
|
||||||
FSNamesystem.getNamespaceEditsDirs(conf);
|
FSNamesystem.getNamespaceEditsDirs(conf);
|
||||||
for(Iterator<URI> it = dirsToFormat.iterator(); it.hasNext();) {
|
for(Iterator<URI> it = dirsToFormat.iterator(); it.hasNext();) {
|
||||||
File curDir = new File(it.next().getPath());
|
File curDir = new File(it.next().getPath());
|
||||||
|
|
|
@ -114,7 +114,7 @@ public class SecondaryNameNode implements Runnable {
|
||||||
private String infoBindAddress;
|
private String infoBindAddress;
|
||||||
|
|
||||||
private Collection<URI> checkpointDirs;
|
private Collection<URI> checkpointDirs;
|
||||||
private Collection<URI> checkpointEditsDirs;
|
private List<URI> checkpointEditsDirs;
|
||||||
|
|
||||||
private CheckpointConf checkpointConf;
|
private CheckpointConf checkpointConf;
|
||||||
private FSNamesystem namesystem;
|
private FSNamesystem namesystem;
|
||||||
|
@ -729,7 +729,7 @@ public class SecondaryNameNode implements Runnable {
|
||||||
*/
|
*/
|
||||||
CheckpointStorage(Configuration conf,
|
CheckpointStorage(Configuration conf,
|
||||||
Collection<URI> imageDirs,
|
Collection<URI> imageDirs,
|
||||||
Collection<URI> editsDirs) throws IOException {
|
List<URI> editsDirs) throws IOException {
|
||||||
super(conf, imageDirs, editsDirs);
|
super(conf, imageDirs, editsDirs);
|
||||||
|
|
||||||
// the 2NN never writes edits -- it only downloads them. So
|
// the 2NN never writes edits -- it only downloads them. So
|
||||||
|
|
|
@ -26,6 +26,7 @@ import java.net.URI;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Properties;
|
import java.util.Properties;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
|
@ -47,7 +48,7 @@ public class TestClusterId {
|
||||||
private String getClusterId(Configuration config) throws IOException {
|
private String getClusterId(Configuration config) throws IOException {
|
||||||
// see if cluster id not empty.
|
// see if cluster id not empty.
|
||||||
Collection<URI> dirsToFormat = FSNamesystem.getNamespaceDirs(config);
|
Collection<URI> dirsToFormat = FSNamesystem.getNamespaceDirs(config);
|
||||||
Collection<URI> editsToFormat = FSNamesystem.getNamespaceEditsDirs(config);
|
List<URI> editsToFormat = FSNamesystem.getNamespaceEditsDirs(config);
|
||||||
FSImage fsImage = new FSImage(config, dirsToFormat, editsToFormat);
|
FSImage fsImage = new FSImage(config, dirsToFormat, editsToFormat);
|
||||||
|
|
||||||
Iterator<StorageDirectory> sdit =
|
Iterator<StorageDirectory> sdit =
|
||||||
|
|
|
@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
import org.mockito.verification.VerificationMode;
|
import org.mockito.verification.VerificationMode;
|
||||||
|
|
||||||
public class TestEditLogJournalFailures {
|
public class TestEditLogJournalFailures {
|
||||||
|
@ -144,21 +145,35 @@ public class TestEditLogJournalFailures {
|
||||||
DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
|
DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY);
|
||||||
shutDownMiniCluster();
|
shutDownMiniCluster();
|
||||||
Configuration conf = new HdfsConfiguration();
|
Configuration conf = new HdfsConfiguration();
|
||||||
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY, editsDirs[1]);
|
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_REQUIRED_KEY, editsDirs[0]);
|
||||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY, 0);
|
conf.setInt(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_MINIMUM_KEY, 0);
|
||||||
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY, 0);
|
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_MINIMUM_KEY, 0);
|
||||||
setUpMiniCluster(conf, true);
|
setUpMiniCluster(conf, true);
|
||||||
|
|
||||||
assertTrue(doAnEdit());
|
assertTrue(doAnEdit());
|
||||||
// Invalidate the one required edits journal.
|
// Invalidate the one required edits journal.
|
||||||
invalidateEditsDirAtIndex(1, false, false);
|
invalidateEditsDirAtIndex(0, false, false);
|
||||||
|
JournalAndStream nonRequiredJas = getJournalAndStream(1);
|
||||||
|
EditLogFileOutputStream nonRequiredSpy =
|
||||||
|
spyOnStream(nonRequiredJas);
|
||||||
|
|
||||||
// Make sure runtime.exit(...) hasn't been called at all yet.
|
// Make sure runtime.exit(...) hasn't been called at all yet.
|
||||||
assertExitInvocations(0);
|
assertExitInvocations(0);
|
||||||
|
|
||||||
|
// ..and that the other stream is active.
|
||||||
|
assertTrue(nonRequiredJas.isActive());
|
||||||
|
|
||||||
// This will actually return true in the tests, since the NN will not in
|
// This will actually return true in the tests, since the NN will not in
|
||||||
// fact call Runtime.exit();
|
// fact call Runtime.exit();
|
||||||
doAnEdit();
|
doAnEdit();
|
||||||
|
|
||||||
|
// Since the required directory failed setReadyToFlush, and that
|
||||||
|
// directory was listed prior to the non-required directory,
|
||||||
|
// we should not call setReadyToFlush on the non-required
|
||||||
|
// directory. Regression test for HDFS-2874.
|
||||||
|
Mockito.verify(nonRequiredSpy, Mockito.never()).setReadyToFlush();
|
||||||
|
assertFalse(nonRequiredJas.isActive());
|
||||||
|
|
||||||
// A single failure of a required journal should result in a call to
|
// A single failure of a required journal should result in a call to
|
||||||
// runtime.exit(...).
|
// runtime.exit(...).
|
||||||
assertExitInvocations(atLeast(1));
|
assertExitInvocations(atLeast(1));
|
||||||
|
@ -217,15 +232,10 @@ public class TestEditLogJournalFailures {
|
||||||
* @param index the index of the journal to take offline.
|
* @param index the index of the journal to take offline.
|
||||||
* @return the original <code>EditLogOutputStream</code> of the journal.
|
* @return the original <code>EditLogOutputStream</code> of the journal.
|
||||||
*/
|
*/
|
||||||
private EditLogOutputStream invalidateEditsDirAtIndex(int index,
|
private void invalidateEditsDirAtIndex(int index,
|
||||||
boolean failOnFlush, boolean failOnWrite) throws IOException {
|
boolean failOnFlush, boolean failOnWrite) throws IOException {
|
||||||
FSImage fsimage = cluster.getNamesystem().getFSImage();
|
JournalAndStream jas = getJournalAndStream(index);
|
||||||
FSEditLog editLog = fsimage.getEditLog();
|
EditLogFileOutputStream spyElos = spyOnStream(jas);
|
||||||
|
|
||||||
JournalAndStream jas = editLog.getJournals().get(index);
|
|
||||||
EditLogFileOutputStream elos =
|
|
||||||
(EditLogFileOutputStream) jas.getCurrentStream();
|
|
||||||
EditLogFileOutputStream spyElos = spy(elos);
|
|
||||||
if (failOnWrite) {
|
if (failOnWrite) {
|
||||||
doThrow(new IOException("fail on write()")).when(spyElos).write(
|
doThrow(new IOException("fail on write()")).when(spyElos).write(
|
||||||
(FSEditLogOp) any());
|
(FSEditLogOp) any());
|
||||||
|
@ -237,25 +247,24 @@ public class TestEditLogJournalFailures {
|
||||||
.setReadyToFlush();
|
.setReadyToFlush();
|
||||||
}
|
}
|
||||||
doNothing().when(spyElos).abort();
|
doNothing().when(spyElos).abort();
|
||||||
|
}
|
||||||
|
|
||||||
|
private EditLogFileOutputStream spyOnStream(JournalAndStream jas) {
|
||||||
|
EditLogFileOutputStream elos =
|
||||||
|
(EditLogFileOutputStream) jas.getCurrentStream();
|
||||||
|
EditLogFileOutputStream spyElos = spy(elos);
|
||||||
jas.setCurrentStreamForTests(spyElos);
|
jas.setCurrentStreamForTests(spyElos);
|
||||||
|
return spyElos;
|
||||||
return elos;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Restore the journal at index <code>index</code> with the passed
|
* Pull out one of the JournalAndStream objects from the edit log.
|
||||||
* {@link EditLogOutputStream}.
|
|
||||||
*
|
|
||||||
* @param index index of the journal to restore.
|
|
||||||
* @param elos the {@link EditLogOutputStream} to put at that index.
|
|
||||||
*/
|
*/
|
||||||
private void restoreEditsDirAtIndex(int index, EditLogOutputStream elos) {
|
private JournalAndStream getJournalAndStream(int index) {
|
||||||
FSImage fsimage = cluster.getNamesystem().getFSImage();
|
FSImage fsimage = cluster.getNamesystem().getFSImage();
|
||||||
FSEditLog editLog = fsimage.getEditLog();
|
FSEditLog editLog = fsimage.getEditLog();
|
||||||
|
|
||||||
JournalAndStream jas = editLog.getJournals().get(index);
|
return editLog.getJournals().get(index);
|
||||||
jas.setCurrentStreamForTests(elos);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
import java.net.URISyntaxException;
|
import java.net.URISyntaxException;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -35,6 +36,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
@ -66,6 +68,35 @@ public class TestFailureOfSharedDir {
|
||||||
requiredEditsDirs.contains(bar));
|
requiredEditsDirs.contains(bar));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Make sure that the shared edits dirs are listed before non-shared dirs
|
||||||
|
* when the configuration is parsed. This ensures that the shared journals
|
||||||
|
* are synced before the local ones.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testSharedDirsComeFirstInEditsList() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
URI sharedA = new URI("file:///shared-A");
|
||||||
|
URI sharedB = new URI("file:///shared-B");
|
||||||
|
URI localA = new URI("file:///local-A");
|
||||||
|
URI localB = new URI("file:///local-B");
|
||||||
|
URI localC = new URI("file:///local-C");
|
||||||
|
|
||||||
|
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
|
||||||
|
Joiner.on(",").join(sharedA,sharedB));
|
||||||
|
// List them in reverse order, to make sure they show up in
|
||||||
|
// the order listed, regardless of lexical sort order.
|
||||||
|
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
|
||||||
|
Joiner.on(",").join(localC, localB, localA));
|
||||||
|
List<URI> dirs = FSNamesystem.getNamespaceEditsDirs(conf);
|
||||||
|
assertEquals(
|
||||||
|
"Shared dirs should come first, then local dirs, in the order " +
|
||||||
|
"they were listed in the configuration.",
|
||||||
|
Joiner.on(",").join(sharedA, sharedB, localC, localB, localA),
|
||||||
|
Joiner.on(",").join(dirs));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test that marking the shared edits dir as being "required" causes the NN to
|
* Test that marking the shared edits dir as being "required" causes the NN to
|
||||||
* fail if that dir can't be accessed.
|
* fail if that dir can't be accessed.
|
||||||
|
@ -73,10 +104,8 @@ public class TestFailureOfSharedDir {
|
||||||
@Test
|
@Test
|
||||||
public void testFailureOfSharedDir() throws Exception {
|
public void testFailureOfSharedDir() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
// The shared edits dir will automatically be marked required.
|
|
||||||
URI sharedEditsUri = MiniDFSCluster.formatSharedEditsDir(
|
|
||||||
new File(MiniDFSCluster.getBaseDirectory()), 0, 1);
|
|
||||||
|
|
||||||
|
// The shared edits dir will automatically be marked required.
|
||||||
MiniDFSCluster cluster = null;
|
MiniDFSCluster cluster = null;
|
||||||
try {
|
try {
|
||||||
cluster = new MiniDFSCluster.Builder(conf)
|
cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
@ -84,8 +113,6 @@ public class TestFailureOfSharedDir {
|
||||||
.numDataNodes(0)
|
.numDataNodes(0)
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
assertEquals(sharedEditsUri, cluster.getSharedEditsDir(0, 1));
|
|
||||||
|
|
||||||
cluster.waitActive();
|
cluster.waitActive();
|
||||||
cluster.transitionToActive(0);
|
cluster.transitionToActive(0);
|
||||||
|
|
||||||
|
@ -94,6 +121,7 @@ public class TestFailureOfSharedDir {
|
||||||
assertTrue(fs.mkdirs(new Path("/test1")));
|
assertTrue(fs.mkdirs(new Path("/test1")));
|
||||||
|
|
||||||
// Blow away the shared edits dir.
|
// Blow away the shared edits dir.
|
||||||
|
URI sharedEditsUri = cluster.getSharedEditsDir(0, 1);
|
||||||
FileUtil.fullyDelete(new File(sharedEditsUri));
|
FileUtil.fullyDelete(new File(sharedEditsUri));
|
||||||
|
|
||||||
NameNode nn0 = cluster.getNameNode(0);
|
NameNode nn0 = cluster.getNameNode(0);
|
||||||
|
@ -107,6 +135,19 @@ public class TestFailureOfSharedDir {
|
||||||
ioe);
|
ioe);
|
||||||
LOG.info("Got expected exception", ioe);
|
LOG.info("Got expected exception", ioe);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Check that none of the edits dirs rolled, since the shared edits
|
||||||
|
// dir didn't roll. Regression test for HDFS-2874.
|
||||||
|
for (URI editsUri : cluster.getNameEditsDirs(0)) {
|
||||||
|
if (editsUri.equals(sharedEditsUri)) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
File editsDir = new File(editsUri.getPath());
|
||||||
|
File curDir = new File(editsDir, "current");
|
||||||
|
GenericTestUtils.assertGlobEquals(curDir,
|
||||||
|
"edits_.*",
|
||||||
|
NNStorage.getInProgressEditsFileName(1));
|
||||||
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (cluster != null) {
|
if (cluster != null) {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
|
|
Loading…
Reference in New Issue