HDFS-11094. Send back HAState along with NamespaceInfo during a versionRequest as an optional parameter. Contributed by Eric Badger

(cherry picked from commit 012d28a1fa)
This commit is contained in:
Mingliang Liu 2016-12-15 14:54:40 -08:00
parent aec0d06a34
commit 059d33bb8b
9 changed files with 160 additions and 46 deletions

View File

@ -40,7 +40,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeComm
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeRegistrationProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.NNHAStatusHeartbeatProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
@ -60,6 +59,7 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeCommandPro
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeRegistrationProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeRegistrationProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeRegistrationProto.NamenodeRoleProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamenodeRegistrationProto.NamenodeRoleProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamespaceInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NamespaceInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.NNHAStatusHeartbeatProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.RecoveringBlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.RecoveringBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.RemoteEditLogManifestProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.RemoteEditLogManifestProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.RemoteEditLogProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsServerProtos.RemoteEditLogProto;
@ -315,7 +315,8 @@ public class PBHelper {
StorageInfoProto storage = info.getStorageInfo(); StorageInfoProto storage = info.getStorageInfo();
return new NamespaceInfo(storage.getNamespceID(), storage.getClusterID(), return new NamespaceInfo(storage.getNamespceID(), storage.getClusterID(),
info.getBlockPoolID(), storage.getCTime(), info.getBuildVersion(), info.getBlockPoolID(), storage.getCTime(), info.getBuildVersion(),
info.getSoftwareVersion(), info.getCapabilities()); info.getSoftwareVersion(), info.getCapabilities(),
convert(info.getState()));
} }
public static NamenodeCommand convert(NamenodeCommandProto cmd) { public static NamenodeCommand convert(NamenodeCommandProto cmd) {
@ -701,44 +702,64 @@ public class PBHelper {
} }
public static NamespaceInfoProto convert(NamespaceInfo info) { public static NamespaceInfoProto convert(NamespaceInfo info) {
return NamespaceInfoProto.newBuilder() NamespaceInfoProto.Builder builder = NamespaceInfoProto.newBuilder();
.setBlockPoolID(info.getBlockPoolID()) builder.setBlockPoolID(info.getBlockPoolID())
.setBuildVersion(info.getBuildVersion()) .setBuildVersion(info.getBuildVersion())
.setUnused(0) .setUnused(0)
.setStorageInfo(PBHelper.convert((StorageInfo)info)) .setStorageInfo(PBHelper.convert((StorageInfo)info))
.setSoftwareVersion(info.getSoftwareVersion()) .setSoftwareVersion(info.getSoftwareVersion())
.setCapabilities(info.getCapabilities()) .setCapabilities(info.getCapabilities());
.build(); HAServiceState state = info.getState();
if(state != null) {
builder.setState(convert(info.getState()));
}
return builder.build();
} }
public static HAServiceState convert(NNHAStatusHeartbeatProto.State s) {
if (s == null) {
return null;
}
switch (s) {
case ACTIVE:
return HAServiceState.ACTIVE;
case STANDBY:
return HAServiceState.STANDBY;
default:
throw new IllegalArgumentException("Unexpected HAServiceStateProto:"
+ s);
}
}
public static NNHAStatusHeartbeatProto.State convert(HAServiceState s) {
if (s == null) {
return null;
}
switch (s) {
case ACTIVE:
return NNHAStatusHeartbeatProto.State.ACTIVE;
case STANDBY:
return NNHAStatusHeartbeatProto.State.STANDBY;
default:
throw new IllegalArgumentException("Unexpected HAServiceState:"
+ s);
}
}
public static NNHAStatusHeartbeat convert(NNHAStatusHeartbeatProto s) { public static NNHAStatusHeartbeat convert(NNHAStatusHeartbeatProto s) {
if (s == null) return null; if (s == null) {
switch (s.getState()) { return null;
case ACTIVE:
return new NNHAStatusHeartbeat(HAServiceState.ACTIVE, s.getTxid());
case STANDBY:
return new NNHAStatusHeartbeat(HAServiceState.STANDBY, s.getTxid());
default:
throw new IllegalArgumentException("Unexpected NNHAStatusHeartbeat.State:" + s.getState());
} }
return new NNHAStatusHeartbeat(convert(s.getState()), s.getTxid());
} }
public static NNHAStatusHeartbeatProto convert(NNHAStatusHeartbeat hb) { public static NNHAStatusHeartbeatProto convert(NNHAStatusHeartbeat hb) {
if (hb == null) return null; if (hb == null) {
NNHAStatusHeartbeatProto.Builder builder = return null;
NNHAStatusHeartbeatProto.newBuilder();
switch (hb.getState()) {
case ACTIVE:
builder.setState(NNHAStatusHeartbeatProto.State.ACTIVE);
break;
case STANDBY:
builder.setState(NNHAStatusHeartbeatProto.State.STANDBY);
break;
default:
throw new IllegalArgumentException("Unexpected NNHAStatusHeartbeat.State:" +
hb.getState());
} }
NNHAStatusHeartbeatProto.Builder builder =
NNHAStatusHeartbeatProto.newBuilder();
builder.setState(convert(hb.getState()));
builder.setTxid(hb.getTxId()); builder.setTxid(hb.getTxId());
return builder.build(); return builder.build();
} }

View File

@ -305,8 +305,16 @@ class BPOfferService {
* verifies that this namespace matches (eg to prevent a misconfiguration * verifies that this namespace matches (eg to prevent a misconfiguration
* where a StandbyNode from a different cluster is specified) * where a StandbyNode from a different cluster is specified)
*/ */
void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException { void verifyAndSetNamespaceInfo(BPServiceActor actor, NamespaceInfo nsInfo)
throws IOException {
writeLock(); writeLock();
if(nsInfo.getState() == HAServiceState.ACTIVE
&& bpServiceToActive == null) {
LOG.info("Acknowledging ACTIVE Namenode during handshake" + actor);
bpServiceToActive = actor;
}
try { try {
if (this.bpNSInfo == null) { if (this.bpNSInfo == null) {
this.bpNSInfo = nsInfo; this.bpNSInfo = nsInfo;

View File

@ -259,11 +259,11 @@ class BPServiceActor implements Runnable {
// First phase of the handshake with NN - get the namespace // First phase of the handshake with NN - get the namespace
// info. // info.
NamespaceInfo nsInfo = retrieveNamespaceInfo(); NamespaceInfo nsInfo = retrieveNamespaceInfo();
// Verify that this matches the other NN in this HA pair. // Verify that this matches the other NN in this HA pair.
// This also initializes our block pool in the DN if we are // This also initializes our block pool in the DN if we are
// the first NN connection for this BP. // the first NN connection for this BP.
bpos.verifyAndSetNamespaceInfo(nsInfo); bpos.verifyAndSetNamespaceInfo(this, nsInfo);
// Second phase of the handshake with the NN. // Second phase of the handshake with the NN.
register(nsInfo); register(nsInfo);

View File

@ -1596,7 +1596,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
NamespaceInfo unprotectedGetNamespaceInfo() { NamespaceInfo unprotectedGetNamespaceInfo() {
return new NamespaceInfo(getFSImage().getStorage().getNamespaceID(), return new NamespaceInfo(getFSImage().getStorage().getNamespaceID(),
getClusterId(), getBlockPoolId(), getClusterId(), getBlockPoolId(),
getFSImage().getStorage().getCTime()); getFSImage().getStorage().getCTime(), getState());
} }
/** /**
@ -5215,12 +5215,16 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
return 0; return 0;
} }
} }
@Metric @Metric
public int getBlockCapacity() { public int getBlockCapacity() {
return blockManager.getCapacity(); return blockManager.getCapacity();
} }
public HAServiceState getState() {
return haContext == null ? null : haContext.getState().getServiceState();
}
@Override // FSNamesystemMBean @Override // FSNamesystemMBean
public String getFSState() { public String getFSState() {
return isInSafeMode() ? "safeMode" : "Operational"; return isInSafeMode() ? "safeMode" : "Operational";

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Storage; import org.apache.hadoop.hdfs.server.common.Storage;
import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.common.StorageInfo;
@ -44,6 +45,7 @@ public class NamespaceInfo extends StorageInfo {
String blockPoolID = ""; // id of the block pool String blockPoolID = ""; // id of the block pool
String softwareVersion; String softwareVersion;
long capabilities; long capabilities;
HAServiceState state;
// only authoritative on the server-side to determine advertisement to // only authoritative on the server-side to determine advertisement to
// clients. enum will update the supported values // clients. enum will update the supported values
@ -88,6 +90,14 @@ public class NamespaceInfo extends StorageInfo {
CAPABILITIES_SUPPORTED); CAPABILITIES_SUPPORTED);
} }
public NamespaceInfo(int nsID, String clusterID, String bpID,
long cT, String buildVersion, String softwareVersion,
long capabilities, HAServiceState st) {
this(nsID, clusterID, bpID, cT, buildVersion, softwareVersion,
capabilities);
this.state = st;
}
// for use by server and/or client // for use by server and/or client
public NamespaceInfo(int nsID, String clusterID, String bpID, public NamespaceInfo(int nsID, String clusterID, String bpID,
long cT, String buildVersion, String softwareVersion, long cT, String buildVersion, String softwareVersion,
@ -105,6 +115,13 @@ public class NamespaceInfo extends StorageInfo {
this(nsID, clusterID, bpID, cT, Storage.getBuildVersion(), this(nsID, clusterID, bpID, cT, Storage.getBuildVersion(),
VersionInfo.getVersion()); VersionInfo.getVersion());
} }
public NamespaceInfo(int nsID, String clusterID, String bpID,
long cT, HAServiceState st) {
this(nsID, clusterID, bpID, cT, Storage.getBuildVersion(),
VersionInfo.getVersion());
this.state = st;
}
public long getCapabilities() { public long getCapabilities() {
return capabilities; return capabilities;
@ -115,6 +132,11 @@ public class NamespaceInfo extends StorageInfo {
this.capabilities = capabilities; this.capabilities = capabilities;
} }
@VisibleForTesting
public void setState(HAServiceState state) {
this.state = state;
}
public boolean isCapabilitySupported(Capability capability) { public boolean isCapabilitySupported(Capability capability) {
Preconditions.checkArgument(capability != Capability.UNKNOWN, Preconditions.checkArgument(capability != Capability.UNKNOWN,
"cannot test for unknown capability"); "cannot test for unknown capability");
@ -134,6 +156,10 @@ public class NamespaceInfo extends StorageInfo {
return softwareVersion; return softwareVersion;
} }
public HAServiceState getState() {
return state;
}
@Override @Override
public String toString(){ public String toString(){
return super.toString() + ";bpid=" + blockPoolID; return super.toString() + ";bpid=" + blockPoolID;

View File

@ -198,19 +198,6 @@ message HeartbeatRequestProto {
optional bool requestFullBlockReportLease = 9 [ default = false ]; optional bool requestFullBlockReportLease = 9 [ default = false ];
} }
/**
* state - State the NN is in when returning response to the DN
* txid - Highest transaction ID this NN has seen
*/
message NNHAStatusHeartbeatProto {
enum State {
ACTIVE = 0;
STANDBY = 1;
}
required State state = 1;
required uint64 txid = 2;
}
/** /**
* cmds - Commands from namenode to datanode. * cmds - Commands from namenode to datanode.
* haStatus - Status (from an HA perspective) of the NN sending this response * haStatus - Status (from an HA perspective) of the NN sending this response

View File

@ -32,6 +32,7 @@ option java_generate_equals_and_hash = true;
package hadoop.hdfs; package hadoop.hdfs;
import "hdfs.proto"; import "hdfs.proto";
import "HAServiceProtocol.proto";
/** /**
* A list of storage IDs. * A list of storage IDs.
@ -103,6 +104,7 @@ message NamespaceInfoProto {
required StorageInfoProto storageInfo = 4;// Node information required StorageInfoProto storageInfo = 4;// Node information
required string softwareVersion = 5; // Software version number (e.g. 2.0.0) required string softwareVersion = 5; // Software version number (e.g. 2.0.0)
optional uint64 capabilities = 6 [default = 0]; // feature flags optional uint64 capabilities = 6 [default = 0]; // feature flags
optional NNHAStatusHeartbeatProto.State state = 7;
} }
/** /**
@ -198,4 +200,17 @@ message NamenodeRegistrationProto {
} }
required StorageInfoProto storageInfo = 3; // Node information required StorageInfoProto storageInfo = 3; // Node information
optional NamenodeRoleProto role = 4 [default = NAMENODE]; // Namenode role optional NamenodeRoleProto role = 4 [default = NAMENODE]; // Namenode role
} }
/**
* state - State the NN is in when returning response to the DN
* txid - Highest transaction ID this NN has seen
*/
message NNHAStatusHeartbeatProto {
enum State {
ACTIVE = 0;
STANDBY = 1;
}
required State state = 1;
required uint64 txid = 2;
}

View File

@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull; import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertSame; import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
@ -799,4 +800,35 @@ public class TestBPOfferService {
} }
return -1; return -1;
} }
/*
*
*/
@Test
public void testNNHAStateUpdateFromVersionRequest() throws Exception {
final BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
Mockito.doReturn(true).when(mockDn).areHeartbeatsDisabledForTests();
BPServiceActor actor = bpos.getBPServiceActors().get(0);
bpos.start();
waitForInitialization(bpos);
// Should start with neither NN as active.
assertNull(bpos.getActiveNN());
// getNamespaceInfo() will not include HAServiceState
NamespaceInfo nsInfo = mockNN1.versionRequest();
bpos.verifyAndSetNamespaceInfo(actor, nsInfo);
assertNull(bpos.getActiveNN());
// Change mock so getNamespaceInfo() will include HAServiceState
Mockito.doReturn(new NamespaceInfo(1, FAKE_CLUSTERID, FAKE_BPID, 0,
HAServiceState.ACTIVE)).when(mockNN1).versionRequest();
// Update the bpos NamespaceInfo
nsInfo = mockNN1.versionRequest();
bpos.verifyAndSetNamespaceInfo(actor, nsInfo);
assertNotNull(bpos.getActiveNN());
}
} }

View File

@ -33,6 +33,7 @@ import java.util.Collection;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -44,6 +45,7 @@ import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
import org.apache.hadoop.hdfs.server.namenode.ha.HAState; import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger; import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.junit.After; import org.junit.After;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito; import org.mockito.Mockito;
@ -155,6 +157,25 @@ public class TestFSNamesystem {
+ "safemode 2nd time", bm.isPopulatingReplQueues()); + "safemode 2nd time", bm.isPopulatingReplQueues());
} }
@Test
public void testHAStateInNamespaceInfo() throws IOException {
Configuration conf = new Configuration();
FSEditLog fsEditLog = Mockito.mock(FSEditLog.class);
FSImage fsImage = Mockito.mock(FSImage.class);
Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog);
NNStorage nnStorage = Mockito.mock(NNStorage.class);
Mockito.when(fsImage.getStorage()).thenReturn(nnStorage);
FSNamesystem fsNamesystem = new FSNamesystem(conf, fsImage);
FSNamesystem fsn = Mockito.spy(fsNamesystem);
Mockito.when(fsn.getState()).thenReturn(
HAServiceProtocol.HAServiceState.ACTIVE);
NamespaceInfo nsInfo = fsn.unprotectedGetNamespaceInfo();
assertNotNull(nsInfo.getState());
}
@Test @Test
public void testReset() throws Exception { public void testReset() throws Exception {
Configuration conf = new Configuration(); Configuration conf = new Configuration();