Revert "HDFS-11094. Send back HAState along with NamespaceInfo during a versionRequest as an optional parameter. Contributed by Eric Badger"
This reverts commit 8c4680852b
.
This commit is contained in:
parent
401c731872
commit
9f8344db50
|
@ -26,7 +26,7 @@ import com.google.protobuf.ByteString;
|
||||||
|
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos.HAServiceStateProto;
|
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
|
||||||
import org.apache.hadoop.hdfs.DFSUtilClient;
|
import org.apache.hadoop.hdfs.DFSUtilClient;
|
||||||
import org.apache.hadoop.hdfs.protocol.Block;
|
import org.apache.hadoop.hdfs.protocol.Block;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
|
@ -338,8 +338,7 @@ public class PBHelper {
|
||||||
StorageInfoProto storage = info.getStorageInfo();
|
StorageInfoProto storage = info.getStorageInfo();
|
||||||
return new NamespaceInfo(storage.getNamespceID(), storage.getClusterID(),
|
return new NamespaceInfo(storage.getNamespceID(), storage.getClusterID(),
|
||||||
info.getBlockPoolID(), storage.getCTime(), info.getBuildVersion(),
|
info.getBlockPoolID(), storage.getCTime(), info.getBuildVersion(),
|
||||||
info.getSoftwareVersion(), info.getCapabilities(),
|
info.getSoftwareVersion(), info.getCapabilities());
|
||||||
convert(info.getState()));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static NamenodeCommand convert(NamenodeCommandProto cmd) {
|
public static NamenodeCommand convert(NamenodeCommandProto cmd) {
|
||||||
|
@ -745,68 +744,43 @@ public class PBHelper {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static NamespaceInfoProto convert(NamespaceInfo info) {
|
public static NamespaceInfoProto convert(NamespaceInfo info) {
|
||||||
NamespaceInfoProto.Builder builder = NamespaceInfoProto.newBuilder();
|
return NamespaceInfoProto.newBuilder()
|
||||||
builder.setBlockPoolID(info.getBlockPoolID())
|
.setBlockPoolID(info.getBlockPoolID())
|
||||||
.setBuildVersion(info.getBuildVersion())
|
.setBuildVersion(info.getBuildVersion())
|
||||||
.setUnused(0)
|
.setUnused(0)
|
||||||
.setStorageInfo(PBHelper.convert((StorageInfo)info))
|
.setStorageInfo(PBHelper.convert((StorageInfo)info))
|
||||||
.setSoftwareVersion(info.getSoftwareVersion())
|
.setSoftwareVersion(info.getSoftwareVersion())
|
||||||
.setCapabilities(info.getCapabilities());
|
.setCapabilities(info.getCapabilities())
|
||||||
HAServiceState state = info.getState();
|
.build();
|
||||||
if(state != null) {
|
|
||||||
builder.setState(convert(info.getState()));
|
|
||||||
}
|
|
||||||
return builder.build();
|
|
||||||
}
|
|
||||||
|
|
||||||
public static HAServiceState convert(HAServiceStateProto s) {
|
|
||||||
if (s == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
switch (s) {
|
|
||||||
case INITIALIZING:
|
|
||||||
return HAServiceState.INITIALIZING;
|
|
||||||
case ACTIVE:
|
|
||||||
return HAServiceState.ACTIVE;
|
|
||||||
case STANDBY:
|
|
||||||
return HAServiceState.STANDBY;
|
|
||||||
default:
|
|
||||||
throw new IllegalArgumentException("Unexpected HAServiceStateProto:"
|
|
||||||
+ s);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public static HAServiceStateProto convert(HAServiceState s) {
|
|
||||||
if (s == null) {
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
switch (s) {
|
|
||||||
case INITIALIZING:
|
|
||||||
return HAServiceStateProto.INITIALIZING;
|
|
||||||
case ACTIVE:
|
|
||||||
return HAServiceStateProto.ACTIVE;
|
|
||||||
case STANDBY:
|
|
||||||
return HAServiceStateProto.STANDBY;
|
|
||||||
default:
|
|
||||||
throw new IllegalArgumentException("Unexpected HAServiceState:"
|
|
||||||
+ s);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static NNHAStatusHeartbeat convert(NNHAStatusHeartbeatProto s) {
|
public static NNHAStatusHeartbeat convert(NNHAStatusHeartbeatProto s) {
|
||||||
if (s == null) {
|
if (s == null) return null;
|
||||||
return null;
|
switch (s.getState()) {
|
||||||
|
case ACTIVE:
|
||||||
|
return new NNHAStatusHeartbeat(HAServiceState.ACTIVE, s.getTxid());
|
||||||
|
case STANDBY:
|
||||||
|
return new NNHAStatusHeartbeat(HAServiceState.STANDBY, s.getTxid());
|
||||||
|
default:
|
||||||
|
throw new IllegalArgumentException("Unexpected NNHAStatusHeartbeat.State:" + s.getState());
|
||||||
}
|
}
|
||||||
return new NNHAStatusHeartbeat(convert(s.getState()), s.getTxid());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public static NNHAStatusHeartbeatProto convert(NNHAStatusHeartbeat hb) {
|
public static NNHAStatusHeartbeatProto convert(NNHAStatusHeartbeat hb) {
|
||||||
if (hb == null) {
|
if (hb == null) return null;
|
||||||
return null;
|
|
||||||
}
|
|
||||||
NNHAStatusHeartbeatProto.Builder builder =
|
NNHAStatusHeartbeatProto.Builder builder =
|
||||||
NNHAStatusHeartbeatProto.newBuilder();
|
NNHAStatusHeartbeatProto.newBuilder();
|
||||||
builder.setState(convert(hb.getState()));
|
switch (hb.getState()) {
|
||||||
|
case ACTIVE:
|
||||||
|
builder.setState(HAServiceProtocolProtos.HAServiceStateProto.ACTIVE);
|
||||||
|
break;
|
||||||
|
case STANDBY:
|
||||||
|
builder.setState(HAServiceProtocolProtos.HAServiceStateProto.STANDBY);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new IllegalArgumentException("Unexpected NNHAStatusHeartbeat.State:" +
|
||||||
|
hb.getState());
|
||||||
|
}
|
||||||
builder.setTxid(hb.getTxId());
|
builder.setTxid(hb.getTxId());
|
||||||
return builder.build();
|
return builder.build();
|
||||||
}
|
}
|
||||||
|
|
|
@ -307,16 +307,8 @@ class BPOfferService {
|
||||||
* verifies that this namespace matches (eg to prevent a misconfiguration
|
* verifies that this namespace matches (eg to prevent a misconfiguration
|
||||||
* where a StandbyNode from a different cluster is specified)
|
* where a StandbyNode from a different cluster is specified)
|
||||||
*/
|
*/
|
||||||
void verifyAndSetNamespaceInfo(BPServiceActor actor, NamespaceInfo nsInfo)
|
void verifyAndSetNamespaceInfo(NamespaceInfo nsInfo) throws IOException {
|
||||||
throws IOException {
|
|
||||||
writeLock();
|
writeLock();
|
||||||
|
|
||||||
if(nsInfo.getState() == HAServiceState.ACTIVE
|
|
||||||
&& bpServiceToActive == null) {
|
|
||||||
LOG.info("Acknowledging ACTIVE Namenode during handshake" + actor);
|
|
||||||
bpServiceToActive = actor;
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (this.bpNSInfo == null) {
|
if (this.bpNSInfo == null) {
|
||||||
this.bpNSInfo = nsInfo;
|
this.bpNSInfo = nsInfo;
|
||||||
|
|
|
@ -273,7 +273,7 @@ class BPServiceActor implements Runnable {
|
||||||
// Verify that this matches the other NN in this HA pair.
|
// Verify that this matches the other NN in this HA pair.
|
||||||
// This also initializes our block pool in the DN if we are
|
// This also initializes our block pool in the DN if we are
|
||||||
// the first NN connection for this BP.
|
// the first NN connection for this BP.
|
||||||
bpos.verifyAndSetNamespaceInfo(this, nsInfo);
|
bpos.verifyAndSetNamespaceInfo(nsInfo);
|
||||||
|
|
||||||
// Second phase of the handshake with the NN.
|
// Second phase of the handshake with the NN.
|
||||||
register(nsInfo);
|
register(nsInfo);
|
||||||
|
|
|
@ -1594,7 +1594,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
NamespaceInfo unprotectedGetNamespaceInfo() {
|
NamespaceInfo unprotectedGetNamespaceInfo() {
|
||||||
return new NamespaceInfo(getFSImage().getStorage().getNamespaceID(),
|
return new NamespaceInfo(getFSImage().getStorage().getNamespaceID(),
|
||||||
getClusterId(), getBlockPoolId(),
|
getClusterId(), getBlockPoolId(),
|
||||||
getFSImage().getStorage().getCTime(), getState());
|
getFSImage().getStorage().getCTime());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -4537,10 +4537,6 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
return blockManager.getCapacity();
|
return blockManager.getCapacity();
|
||||||
}
|
}
|
||||||
|
|
||||||
public HAServiceState getState() {
|
|
||||||
return haContext == null ? null : haContext.getState().getServiceState();
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override // FSNamesystemMBean
|
@Override // FSNamesystemMBean
|
||||||
public String getFSState() {
|
public String getFSState() {
|
||||||
return isInSafeMode() ? "safeMode" : "Operational";
|
return isInSafeMode() ? "safeMode" : "Operational";
|
||||||
|
|
|
@ -22,7 +22,6 @@ import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||||
|
@ -45,7 +44,6 @@ public class NamespaceInfo extends StorageInfo {
|
||||||
String blockPoolID = ""; // id of the block pool
|
String blockPoolID = ""; // id of the block pool
|
||||||
String softwareVersion;
|
String softwareVersion;
|
||||||
long capabilities;
|
long capabilities;
|
||||||
HAServiceState state;
|
|
||||||
|
|
||||||
// only authoritative on the server-side to determine advertisement to
|
// only authoritative on the server-side to determine advertisement to
|
||||||
// clients. enum will update the supported values
|
// clients. enum will update the supported values
|
||||||
|
@ -90,14 +88,6 @@ public class NamespaceInfo extends StorageInfo {
|
||||||
CAPABILITIES_SUPPORTED);
|
CAPABILITIES_SUPPORTED);
|
||||||
}
|
}
|
||||||
|
|
||||||
public NamespaceInfo(int nsID, String clusterID, String bpID,
|
|
||||||
long cT, String buildVersion, String softwareVersion,
|
|
||||||
long capabilities, HAServiceState st) {
|
|
||||||
this(nsID, clusterID, bpID, cT, buildVersion, softwareVersion,
|
|
||||||
capabilities);
|
|
||||||
this.state = st;
|
|
||||||
}
|
|
||||||
|
|
||||||
// for use by server and/or client
|
// for use by server and/or client
|
||||||
public NamespaceInfo(int nsID, String clusterID, String bpID,
|
public NamespaceInfo(int nsID, String clusterID, String bpID,
|
||||||
long cT, String buildVersion, String softwareVersion,
|
long cT, String buildVersion, String softwareVersion,
|
||||||
|
@ -116,13 +106,6 @@ public class NamespaceInfo extends StorageInfo {
|
||||||
VersionInfo.getVersion());
|
VersionInfo.getVersion());
|
||||||
}
|
}
|
||||||
|
|
||||||
public NamespaceInfo(int nsID, String clusterID, String bpID,
|
|
||||||
long cT, HAServiceState st) {
|
|
||||||
this(nsID, clusterID, bpID, cT, Storage.getBuildVersion(),
|
|
||||||
VersionInfo.getVersion());
|
|
||||||
this.state = st;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getCapabilities() {
|
public long getCapabilities() {
|
||||||
return capabilities;
|
return capabilities;
|
||||||
}
|
}
|
||||||
|
@ -132,11 +115,6 @@ public class NamespaceInfo extends StorageInfo {
|
||||||
this.capabilities = capabilities;
|
this.capabilities = capabilities;
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
|
||||||
public void setState(HAServiceState state) {
|
|
||||||
this.state = state;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isCapabilitySupported(Capability capability) {
|
public boolean isCapabilitySupported(Capability capability) {
|
||||||
Preconditions.checkArgument(capability != Capability.UNKNOWN,
|
Preconditions.checkArgument(capability != Capability.UNKNOWN,
|
||||||
"cannot test for unknown capability");
|
"cannot test for unknown capability");
|
||||||
|
@ -156,10 +134,6 @@ public class NamespaceInfo extends StorageInfo {
|
||||||
return softwareVersion;
|
return softwareVersion;
|
||||||
}
|
}
|
||||||
|
|
||||||
public HAServiceState getState() {
|
|
||||||
return state;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString(){
|
public String toString(){
|
||||||
return super.toString() + ";bpid=" + blockPoolID;
|
return super.toString() + ";bpid=" + blockPoolID;
|
||||||
|
|
|
@ -32,7 +32,6 @@ option java_generate_equals_and_hash = true;
|
||||||
package hadoop.hdfs;
|
package hadoop.hdfs;
|
||||||
|
|
||||||
import "hdfs.proto";
|
import "hdfs.proto";
|
||||||
import "HAServiceProtocol.proto";
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Block access token information
|
* Block access token information
|
||||||
|
@ -102,7 +101,6 @@ message NamespaceInfoProto {
|
||||||
required StorageInfoProto storageInfo = 4;// Node information
|
required StorageInfoProto storageInfo = 4;// Node information
|
||||||
required string softwareVersion = 5; // Software version number (e.g. 2.0.0)
|
required string softwareVersion = 5; // Software version number (e.g. 2.0.0)
|
||||||
optional uint64 capabilities = 6 [default = 0]; // feature flags
|
optional uint64 capabilities = 6 [default = 0]; // feature flags
|
||||||
optional hadoop.common.HAServiceStateProto state = 7;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.server.datanode;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertNotNull;
|
|
||||||
import static org.junit.Assert.assertSame;
|
import static org.junit.Assert.assertSame;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
@ -800,34 +799,4 @@ public class TestBPOfferService {
|
||||||
}
|
}
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
|
||||||
*
|
|
||||||
*/
|
|
||||||
@Test
|
|
||||||
public void testNNHAStateUpdateFromVersionRequest() throws Exception {
|
|
||||||
final BPOfferService bpos = setupBPOSForNNs(mockNN1, mockNN2);
|
|
||||||
BPServiceActor actor = bpos.getBPServiceActors().get(0);
|
|
||||||
bpos.start();
|
|
||||||
waitForInitialization(bpos);
|
|
||||||
// Should start with neither NN as active.
|
|
||||||
assertNull(bpos.getActiveNN());
|
|
||||||
|
|
||||||
// getNamespaceInfo() will not include HAServiceState
|
|
||||||
NamespaceInfo nsInfo = mockNN1.versionRequest();
|
|
||||||
bpos.verifyAndSetNamespaceInfo(actor, nsInfo);
|
|
||||||
|
|
||||||
assertNull(bpos.getActiveNN());
|
|
||||||
|
|
||||||
// Change mock so getNamespaceInfo() will include HAServiceState
|
|
||||||
Mockito.doReturn(new NamespaceInfo(1, FAKE_CLUSTERID, FAKE_BPID, 0,
|
|
||||||
HAServiceState.ACTIVE)).when(mockNN1).versionRequest();
|
|
||||||
|
|
||||||
// Update the bpos NamespaceInfo
|
|
||||||
nsInfo = mockNN1.versionRequest();
|
|
||||||
bpos.verifyAndSetNamespaceInfo(actor, nsInfo);
|
|
||||||
|
|
||||||
assertNotNull(bpos.getActiveNN());
|
|
||||||
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -33,7 +33,6 @@ import java.util.Collection;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
@ -44,7 +43,6 @@ import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
|
import org.apache.hadoop.hdfs.server.namenode.ha.HAState;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
|
import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
|
import org.apache.hadoop.hdfs.server.namenode.top.TopAuditLogger;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
@ -156,25 +154,6 @@ public class TestFSNamesystem {
|
||||||
+ "safemode 2nd time", bm.isPopulatingReplQueues());
|
+ "safemode 2nd time", bm.isPopulatingReplQueues());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testHAStateInNamespaceInfo() throws IOException {
|
|
||||||
Configuration conf = new Configuration();
|
|
||||||
|
|
||||||
FSEditLog fsEditLog = Mockito.mock(FSEditLog.class);
|
|
||||||
FSImage fsImage = Mockito.mock(FSImage.class);
|
|
||||||
Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog);
|
|
||||||
NNStorage nnStorage = Mockito.mock(NNStorage.class);
|
|
||||||
Mockito.when(fsImage.getStorage()).thenReturn(nnStorage);
|
|
||||||
|
|
||||||
FSNamesystem fsNamesystem = new FSNamesystem(conf, fsImage);
|
|
||||||
FSNamesystem fsn = Mockito.spy(fsNamesystem);
|
|
||||||
Mockito.when(fsn.getState()).thenReturn(
|
|
||||||
HAServiceProtocol.HAServiceState.ACTIVE);
|
|
||||||
|
|
||||||
NamespaceInfo nsInfo = fsn.unprotectedGetNamespaceInfo();
|
|
||||||
assertNotNull(nsInfo.getState());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testReset() throws Exception {
|
public void testReset() throws Exception {
|
||||||
Configuration conf = new Configuration();
|
Configuration conf = new Configuration();
|
||||||
|
|
Loading…
Reference in New Issue