HDFS-3126. Journal stream from Namenode to BackupNode needs to have timeout. Contributed by Hari Mankude.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1308636 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Suresh Srinivas 2012-04-03 00:55:08 +00:00
parent 4f15b9dfed
commit e449de0526
5 changed files with 34 additions and 11 deletions

View File

@ -109,6 +109,9 @@ Trunk (unreleased changes)
HDFS-3116. Typo in fetchdt error message. (AOE Takashi via atm) HDFS-3116. Typo in fetchdt error message. (AOE Takashi via atm)
HDFS-3126. Journal stream from Namenode to BackupNode needs to have
timeout. (Hari Mankude via suresh)
Release 2.0.0 - UNRELEASED Release 2.0.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -190,7 +190,7 @@ public class NameNodeProxies {
InetSocketAddress address, Configuration conf, UserGroupInformation ugi) InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
throws IOException { throws IOException {
JournalProtocolPB proxy = (JournalProtocolPB) createNameNodeProxy(address, JournalProtocolPB proxy = (JournalProtocolPB) createNameNodeProxy(address,
conf, ugi, JournalProtocolPB.class); conf, ugi, JournalProtocolPB.class, 30000);
return new JournalProtocolTranslatorPB(proxy); return new JournalProtocolTranslatorPB(proxy);
} }
@ -198,7 +198,7 @@ public class NameNodeProxies {
createNNProxyWithRefreshAuthorizationPolicyProtocol(InetSocketAddress address, createNNProxyWithRefreshAuthorizationPolicyProtocol(InetSocketAddress address,
Configuration conf, UserGroupInformation ugi) throws IOException { Configuration conf, UserGroupInformation ugi) throws IOException {
RefreshAuthorizationPolicyProtocolPB proxy = (RefreshAuthorizationPolicyProtocolPB) RefreshAuthorizationPolicyProtocolPB proxy = (RefreshAuthorizationPolicyProtocolPB)
createNameNodeProxy(address, conf, ugi, RefreshAuthorizationPolicyProtocolPB.class); createNameNodeProxy(address, conf, ugi, RefreshAuthorizationPolicyProtocolPB.class, 0);
return new RefreshAuthorizationPolicyProtocolClientSideTranslatorPB(proxy); return new RefreshAuthorizationPolicyProtocolClientSideTranslatorPB(proxy);
} }
@ -206,7 +206,7 @@ public class NameNodeProxies {
createNNProxyWithRefreshUserMappingsProtocol(InetSocketAddress address, createNNProxyWithRefreshUserMappingsProtocol(InetSocketAddress address,
Configuration conf, UserGroupInformation ugi) throws IOException { Configuration conf, UserGroupInformation ugi) throws IOException {
RefreshUserMappingsProtocolPB proxy = (RefreshUserMappingsProtocolPB) RefreshUserMappingsProtocolPB proxy = (RefreshUserMappingsProtocolPB)
createNameNodeProxy(address, conf, ugi, RefreshUserMappingsProtocolPB.class); createNameNodeProxy(address, conf, ugi, RefreshUserMappingsProtocolPB.class, 0);
return new RefreshUserMappingsProtocolClientSideTranslatorPB(proxy); return new RefreshUserMappingsProtocolClientSideTranslatorPB(proxy);
} }
@ -214,7 +214,7 @@ public class NameNodeProxies {
InetSocketAddress address, Configuration conf, UserGroupInformation ugi) InetSocketAddress address, Configuration conf, UserGroupInformation ugi)
throws IOException { throws IOException {
GetUserMappingsProtocolPB proxy = (GetUserMappingsProtocolPB) GetUserMappingsProtocolPB proxy = (GetUserMappingsProtocolPB)
createNameNodeProxy(address, conf, ugi, GetUserMappingsProtocolPB.class); createNameNodeProxy(address, conf, ugi, GetUserMappingsProtocolPB.class, 0);
return new GetUserMappingsProtocolClientSideTranslatorPB(proxy); return new GetUserMappingsProtocolClientSideTranslatorPB(proxy);
} }
@ -222,7 +222,7 @@ public class NameNodeProxies {
InetSocketAddress address, Configuration conf, UserGroupInformation ugi, InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
boolean withRetries) throws IOException { boolean withRetries) throws IOException {
NamenodeProtocolPB proxy = (NamenodeProtocolPB) createNameNodeProxy( NamenodeProtocolPB proxy = (NamenodeProtocolPB) createNameNodeProxy(
address, conf, ugi, NamenodeProtocolPB.class); address, conf, ugi, NamenodeProtocolPB.class, 0);
if (withRetries) { // create the proxy with retries if (withRetries) { // create the proxy with retries
RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200, RetryPolicy timeoutPolicy = RetryPolicies.exponentialBackoffRetry(5, 200,
TimeUnit.MILLISECONDS); TimeUnit.MILLISECONDS);
@ -244,7 +244,7 @@ public class NameNodeProxies {
InetSocketAddress address, Configuration conf, UserGroupInformation ugi, InetSocketAddress address, Configuration conf, UserGroupInformation ugi,
boolean withRetries) throws IOException { boolean withRetries) throws IOException {
ClientNamenodeProtocolPB proxy = (ClientNamenodeProtocolPB) NameNodeProxies ClientNamenodeProtocolPB proxy = (ClientNamenodeProtocolPB) NameNodeProxies
.createNameNodeProxy(address, conf, ugi, ClientNamenodeProtocolPB.class); .createNameNodeProxy(address, conf, ugi, ClientNamenodeProtocolPB.class, 0);
if (withRetries) { // create the proxy with retries if (withRetries) { // create the proxy with retries
RetryPolicy createPolicy = RetryPolicies RetryPolicy createPolicy = RetryPolicies
.retryUpToMaximumCountWithFixedSleep(5, .retryUpToMaximumCountWithFixedSleep(5,
@ -275,11 +275,11 @@ public class NameNodeProxies {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private static Object createNameNodeProxy(InetSocketAddress address, private static Object createNameNodeProxy(InetSocketAddress address,
Configuration conf, UserGroupInformation ugi, Class xface) Configuration conf, UserGroupInformation ugi, Class xface, int rpcTimeout)
throws IOException { throws IOException {
RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class); RPC.setProtocolEngine(conf, xface, ProtobufRpcEngine.class);
Object proxy = RPC.getProxy(xface, RPC.getProtocolVersion(xface), address, Object proxy = RPC.getProxy(xface, RPC.getProtocolVersion(xface), address,
ugi, conf, NetUtils.getDefaultSocketFactory(conf)); ugi, conf, NetUtils.getDefaultSocketFactory(conf), rpcTimeout);
return proxy; return proxy;
} }

View File

@ -46,6 +46,7 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService; import com.google.protobuf.BlockingService;
/** /**
@ -171,6 +172,12 @@ public class BackupNode extends NameNode {
@Override // NameNode @Override // NameNode
public void stop() { public void stop() {
stop(true);
}
@VisibleForTesting
void stop(boolean reportError) {
if(checkpointManager != null) { if(checkpointManager != null) {
// Prevent from starting a new checkpoint. // Prevent from starting a new checkpoint.
// Checkpoints that has already been started may proceed until // Checkpoints that has already been started may proceed until
@ -180,7 +187,10 @@ public class BackupNode extends NameNode {
// ClosedByInterruptException. // ClosedByInterruptException.
checkpointManager.shouldRun = false; checkpointManager.shouldRun = false;
} }
if(namenode != null && getRegistration() != null) {
// reportError is a test hook to simulate backupnode crashing and not
// doing a clean exit w.r.t active namenode
if (reportError && namenode != null && getRegistration() != null) {
// Exclude this node from the list of backup streams on the name-node // Exclude this node from the list of backup streams on the name-node
try { try {
namenode.errorReport(getRegistration(), NamenodeProtocol.FATAL, namenode.errorReport(getRegistration(), NamenodeProtocol.FATAL,

View File

@ -1020,7 +1020,7 @@ public class FSEditLog {
LOG.info("Registering new backup node: " + bnReg); LOG.info("Registering new backup node: " + bnReg);
BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg); BackupJournalManager bjm = new BackupJournalManager(bnReg, nnReg);
journalSet.add(bjm, true); journalSet.add(bjm, false);
} }
synchronized void releaseBackupStream(NamenodeRegistration registration) synchronized void releaseBackupStream(NamenodeRegistration registration)

View File

@ -185,6 +185,16 @@ public class TestBackupNode {
testBNInSync(cluster, backup, 4); testBNInSync(cluster, backup, 4);
assertNotNull(backup.getNamesystem().getFileInfo("/edit-while-bn-down", false)); assertNotNull(backup.getNamesystem().getFileInfo("/edit-while-bn-down", false));
// Trigger an unclean shutdown of the backup node. Backup node will not
// unregister from the active when this is done simulating a node crash.
backup.stop(false);
// do some edits on the active. This should go through without failing.
// This will verify that active is still up and can add entries to
// master editlog.
assertTrue(fileSys.mkdirs(new Path("/edit-while-bn-down-2")));
} finally { } finally {
LOG.info("Shutting down..."); LOG.info("Shutting down...");
if (backup != null) backup.stop(); if (backup != null) backup.stop();