HDFS-3765. namenode -initializeSharedEdits should be able to initialize all shared storages. Contributed by Vinay and Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1373060 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2012-08-14 20:11:31 +00:00
parent 4a84c9020d
commit d0e5d7e971
6 changed files with 192 additions and 49 deletions
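
In short: before this patch, -initializeSharedEdits copied finalized edit log
segment files byte-for-byte, which only worked when the shared edits dir was
file-based; it now replays edits through a regular FSEditLog opened over the
configured shared journals, so non-file journal managers such as the
BookKeeper journal manager (BKJM) can be initialized too.

A minimal sketch of the entry point the new tests drive (the journal URI below
is invented for illustration; operators normally run
`hdfs namenode -initializeSharedEdits` rather than calling the API directly):

    Configuration conf = new HdfsConfiguration();
    // Hypothetical BKJM shared-edits URI; any configured JournalManager works.
    conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
        "bookkeeper://zk1:2181/hdfsjournal");
    // Returns true if initialization aborted, false if it succeeded.
    boolean aborted = NameNode.initializeSharedEdits(conf);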

ExitUtil.java (org.apache.hadoop.util)

@@ -64,6 +64,15 @@ public final class ExitUtil {
     return firstExitException;
   }
 
+  /**
+   * Reset the tracking of process termination. This is for use
+   * in unit tests where one test in the suite expects an exit
+   * but others do not.
+   */
+  public static void resetFirstExitException() {
+    firstExitException = null;
+  }
+
   /**
    * Terminate the current process. Note that terminate is the *only* method
    * that should be used to terminate the daemon processes.

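The new reset hook pairs with ExitUtil's existing test support: once
System.exit has been disabled, terminate() throws and records an
ExitException, and a later test can now clear that record. A minimal usage
sketch (JUnit assertions assumed; this snippet is illustrative, not part of
the patch):

    ExitUtil.disableSystemExit();       // terminate() now throws instead of exiting
    try {
      ExitUtil.terminate(1, "simulated fatal error");
      fail("terminate() should have thrown");
    } catch (ExitUtil.ExitException ee) {
      // the first exit is recorded as firstExitException
    }
    assertTrue(ExitUtil.terminateCalled());
    ExitUtil.resetFirstExitException(); // new hook: clean slate for the next test
    assertFalse(ExitUtil.terminateCalled());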
CHANGES.txt (hadoop-hdfs)

@@ -198,6 +198,9 @@ Release 2.0.1-alpha - UNRELEASED
     HDFS-3276. initializeSharedEdits should have a -nonInteractive flag (todd)
 
+    HDFS-3765. namenode -initializeSharedEdits should be able to initialize
+    all shared storages. (Vinay and todd via todd)
+
   OPTIMIZATIONS
 
     HDFS-2982. Startup performance suffers when there are many edit log

TestBookKeeperAsHASharedDir.java (org.apache.hadoop.contrib.bkjournal)

@@ -18,6 +18,7 @@
 package org.apache.hadoop.contrib.bkjournal;
 
 import static org.junit.Assert.*;
+
 import org.junit.Test;
 import org.junit.Before;
 import org.junit.After;
@@ -25,6 +26,9 @@ import org.junit.BeforeClass;
 import org.junit.AfterClass;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HAUtil;
@@ -35,12 +39,16 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.ipc.RemoteException;
 
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.ExitUtil.ExitException;
 
 import org.apache.bookkeeper.proto.BookieServer;
@@ -48,7 +56,9 @@ import org.apache.bookkeeper.proto.BookieServer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import java.io.File;
 import java.io.IOException;
+import java.net.URISyntaxException;
 
 /**
  * Integration test to ensure that the BookKeeper JournalManager
@@ -68,6 +78,11 @@ public class TestBookKeeperAsHASharedDir {
     bkutil.start();
   }
 
+  @Before
+  public void clearExitStatus() {
+    ExitUtil.resetFirstExitException();
+  }
+
   @AfterClass
   public static void teardownBookkeeper() throws Exception {
     bkutil.teardown();
@@ -244,4 +259,97 @@ public class TestBookKeeperAsHASharedDir {
       }
     }
   }
+
+  /**
+   * Use the NameNode INITIALIZESHAREDEDITS option to initialize the shared
+   * edits, i.e. copy the edit log segments to the new BKJM shared edits.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testInitializeBKSharedEdits() throws Exception {
+    MiniDFSCluster cluster = null;
+    try {
+      Configuration conf = new Configuration();
+      HAUtil.setAllowStandbyReads(conf, true);
+      conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
+
+      MiniDFSNNTopology topology = MiniDFSNNTopology.simpleHATopology();
+      cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology)
+          .numDataNodes(0).build();
+      cluster.waitActive();
+      // Shutdown and clear the current file-based shared dir.
+      cluster.shutdownNameNodes();
+      File shareddir = new File(cluster.getSharedEditsDir(0, 1));
+      assertTrue("Initial shared edits dir not fully deleted",
+          FileUtil.fullyDelete(shareddir));
+
+      // Check that the namenodes do not start without a shared dir.
+      assertCanNotStartNamenode(cluster, 0);
+      assertCanNotStartNamenode(cluster, 1);
+
+      // Configure bkjm as the new shared edits dir in both namenodes
+      Configuration nn1Conf = cluster.getConfiguration(0);
+      Configuration nn2Conf = cluster.getConfiguration(1);
+      nn1Conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
+          .createJournalURI("/initializeSharedEdits").toString());
+      nn2Conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
+          .createJournalURI("/initializeSharedEdits").toString());
+      BKJMUtil.addJournalManagerDefinition(nn1Conf);
+      BKJMUtil.addJournalManagerDefinition(nn2Conf);
+
+      // Initialize the BKJM shared edits.
+      assertFalse(NameNode.initializeSharedEdits(nn1Conf));
+
+      // NameNode should be able to start and should be in sync with BKJM as
+      // the shared dir
+      assertCanStartHANameNodes(cluster, conf, "/testBKJMInitialize");
+    } finally {
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
+
+  private void assertCanNotStartNamenode(MiniDFSCluster cluster, int nnIndex) {
+    try {
+      cluster.restartNameNode(nnIndex, false);
+      fail("Should not have been able to start NN" + (nnIndex)
+          + " without shared dir");
+    } catch (IOException ioe) {
+      LOG.info("Got expected exception", ioe);
+      GenericTestUtils.assertExceptionContains(
+          "Cannot start an HA namenode with name dirs that need recovery", ioe);
+    }
+  }
+
+  private void assertCanStartHANameNodes(MiniDFSCluster cluster,
+      Configuration conf, String path) throws ServiceFailedException,
+      IOException, URISyntaxException, InterruptedException {
+    // Now should be able to start both NNs. Pass "false" here so that we don't
+    // try to waitActive on all NNs, since the second NN doesn't exist yet.
+    cluster.restartNameNode(0, false);
+    cluster.restartNameNode(1, true);
+
+    // Make sure HA is working.
+    cluster
+        .getNameNode(0)
+        .getRpcServer()
+        .transitionToActive(
+            new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER));
+    FileSystem fs = null;
+    try {
+      Path newPath = new Path(path);
+      fs = HATestUtil.configureFailoverFs(cluster, conf);
+      assertTrue(fs.mkdirs(newPath));
+      HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
+          cluster.getNameNode(1));
+      assertTrue(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
+          newPath.toString(), false).isDir());
+    } finally {
+      if (fs != null) {
+        fs.close();
+      }
+    }
+  }
 }
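
The new test mirrors the operator flow for moving an existing HA cluster onto
BookKeeper-backed shared edits: stop both NameNodes, point
dfs.namenode.shared.edits.dir at the BKJM journal URI on both, run the
initializer once (the test calls NameNode.initializeSharedEdits(nn1Conf); the
CLI equivalent is `hdfs namenode -initializeSharedEdits`), then restart the
NameNodes and verify the standby catches up through the new shared storage.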

NameNode.java (org.apache.hadoop.hdfs.server.namenode)

@@ -18,11 +18,7 @@
 package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
 import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.URI;
 import java.util.ArrayList;
@@ -46,6 +42,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Trash;
 
 import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -53,9 +50,6 @@ import org.apache.hadoop.hdfs.protocol.ClientProtocol;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
-import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
-import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
-import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
 import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
 import org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby;
 import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
@@ -68,8 +62,6 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.ipc.StandbyException;
 import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -88,6 +80,7 @@ import static org.apache.hadoop.util.ExitUtil.terminate;
 import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 /**********************************************************
@@ -816,9 +809,18 @@ public class NameNode {
     String nsId = DFSUtil.getNamenodeNameServiceId(conf);
     String namenodeId = HAUtil.getNameNodeId(conf, nsId);
     initializeGenericKeys(conf, nsId, namenodeId);
+
+    if (conf.get(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY) == null) {
+      LOG.fatal("No shared edits directory configured for namespace " +
+          nsId + " namenode " + namenodeId);
+      return false;
+    }
+
     NNStorage existingStorage = null;
     try {
-      FSNamesystem fsns = FSNamesystem.loadFromDisk(conf,
+      Configuration confWithoutShared = new Configuration(conf);
+      confWithoutShared.unset(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
+      FSNamesystem fsns = FSNamesystem.loadFromDisk(confWithoutShared,
           FSNamesystem.getNamespaceDirs(conf),
           FSNamesystem.getNamespaceEditsDirs(conf, false));
@@ -845,10 +847,8 @@ public class NameNode {
       fsns.getFSImage().getEditLog().initJournalsForWrite();
       fsns.getFSImage().getEditLog().recoverUnclosedStreams();
 
-      if (copyEditLogSegmentsToSharedDir(fsns, sharedEditsDirs,
-          newSharedStorage, conf)) {
-        return true; // aborted
-      }
+      copyEditLogSegmentsToSharedDir(fsns, sharedEditsDirs, newSharedStorage,
+          conf);
     } catch (IOException ioe) {
       LOG.error("Could not initialize shared edits dir", ioe);
       return true; // aborted
@@ -867,43 +867,59 @@ public class NameNode {
     return false; // did not abort
   }
 
-  private static boolean copyEditLogSegmentsToSharedDir(FSNamesystem fsns,
+  private static void copyEditLogSegmentsToSharedDir(FSNamesystem fsns,
       Collection<URI> sharedEditsDirs, NNStorage newSharedStorage,
-      Configuration conf) throws FileNotFoundException, IOException {
+      Configuration conf) throws IOException {
+    Preconditions.checkArgument(!sharedEditsDirs.isEmpty(),
+        "No shared edits specified");
     // Copy edit log segments into the new shared edits dir.
-    for (JournalAndStream jas : fsns.getFSImage().getEditLog().getJournals()) {
-      FileJournalManager fjm = null;
-      if (!(jas.getManager() instanceof FileJournalManager)) {
-        LOG.error("Cannot populate shared edits dir from non-file " +
-            "journal manager: " + jas.getManager());
-        return true; // aborted
-      } else {
-        fjm = (FileJournalManager) jas.getManager();
-      }
-      for (EditLogFile elf : fjm.getLogFiles(fsns.getFSImage()
-          .getMostRecentCheckpointTxId())) {
-        File editLogSegment = elf.getFile();
-        for (URI sharedEditsUri : sharedEditsDirs) {
-          StorageDirectory sharedEditsDir = newSharedStorage
-              .getStorageDirectory(sharedEditsUri);
-          File targetFile = new File(sharedEditsDir.getCurrentDir(),
-              editLogSegment.getName());
-          if (!targetFile.exists()) {
-            InputStream in = null;
-            OutputStream out = null;
-            try {
-              in = new FileInputStream(editLogSegment);
-              out = new AtomicFileOutputStream(targetFile);
-              IOUtils.copyBytes(in, out, conf);
-            } finally {
-              IOUtils.cleanup(LOG, in, out);
-            }
-          }
-        }
-      }
-    }
-    return false; // did not abort
+    List<URI> sharedEditsUris = new ArrayList<URI>(sharedEditsDirs);
+    FSEditLog newSharedEditLog = new FSEditLog(conf, newSharedStorage,
+        sharedEditsUris);
+    newSharedEditLog.initJournalsForWrite();
+    newSharedEditLog.recoverUnclosedStreams();
+
+    FSEditLog sourceEditLog = fsns.getFSImage().editLog;
+
+    long fromTxId = fsns.getFSImage().getMostRecentCheckpointTxId();
+    Collection<EditLogInputStream> streams = sourceEditLog.selectInputStreams(
+        fromTxId+1, 0);
+
+    // Set the nextTxid to the CheckpointTxId+1
+    newSharedEditLog.setNextTxId(fromTxId + 1);
+
+    // Copy all edits after last CheckpointTxId to shared edits dir
+    for (EditLogInputStream stream : streams) {
+      LOG.debug("Beginning to copy stream " + stream + " to shared edits");
+
+      FSEditLogOp op;
+      boolean segmentOpen = false;
+      while ((op = stream.readOp()) != null) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("copying op: " + op);
+        }
+        if (!segmentOpen) {
+          newSharedEditLog.startLogSegment(op.txid, false);
+          segmentOpen = true;
+        }
+
+        newSharedEditLog.logEdit(op);
+
+        if (op.opCode == FSEditLogOpCodes.OP_END_LOG_SEGMENT) {
+          newSharedEditLog.logSync();
+          newSharedEditLog.endCurrentLogSegment(false);
+          LOG.debug("ending log segment because of END_LOG_SEGMENT op in " + stream);
+          segmentOpen = false;
+        }
+      }
+
+      if (segmentOpen) {
+        LOG.debug("ending log segment because of end of stream in " + stream);
+        newSharedEditLog.logSync();
+        newSharedEditLog.endCurrentLogSegment(false);
+        segmentOpen = false;
+      }
+    }
   }
 
   private static boolean finalize(Configuration conf,
       boolean isConfirmationNeeded

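Design note on the rewritten copyEditLogSegmentsToSharedDir: rather than
byte-copying finalized segment files into each shared StorageDirectory (which
assumed a FileJournalManager and aborted otherwise), the method now opens an
FSEditLog over the shared URIs, sets the next transaction id to the most
recent checkpoint txid + 1, and replays every later op, closing a segment at
each OP_END_LOG_SEGMENT. Since this goes through the normal FSEditLog write
path, any pluggable JournalManager configured as the shared dir receives a
correctly segmented copy of the edits, and a failure surfaces as an
IOException instead of a boolean abort code.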
HATestUtil.java (org.apache.hadoop.hdfs.server.namenode.ha)

@@ -65,7 +65,7 @@ public abstract class HATestUtil {
    * @throws CouldNotCatchUpException if the standby doesn't catch up to the
    * active in NN_LAG_TIMEOUT milliseconds
    */
-  static void waitForStandbyToCatchUp(NameNode active,
+  public static void waitForStandbyToCatchUp(NameNode active,
       NameNode standby) throws InterruptedException, IOException, CouldNotCatchUpException {
     long activeTxId = active.getNamesystem().getFSImage().getEditLog()

TestInitializeSharedEdits.java

@@ -158,6 +158,13 @@ public class TestInitializeSharedEdits {
     assertCanStartHaNameNodes("2");
   }
 
+  @Test
+  public void testFailWhenNoSharedEditsSpecified() throws Exception {
+    Configuration confNoShared = new Configuration(conf);
+    confNoShared.unset(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
+    assertFalse(NameNode.initializeSharedEdits(confNoShared, true));
+  }
+
   @Test
   public void testDontOverWriteExistingDir() {
     assertFalse(NameNode.initializeSharedEdits(conf, false));