HDDS-797. If DN is started before SCM, it does not register. Contributed by Hanisha Koneru.
This commit is contained in:
parent
15df2e7a75
commit
c8ca1747c0
|
@ -64,50 +64,57 @@ public class VersionEndpointTask implements
|
||||||
public EndpointStateMachine.EndPointStates call() throws Exception {
|
public EndpointStateMachine.EndPointStates call() throws Exception {
|
||||||
rpcEndPoint.lock();
|
rpcEndPoint.lock();
|
||||||
try{
|
try{
|
||||||
SCMVersionResponseProto versionResponse =
|
if (rpcEndPoint.getState().equals(
|
||||||
rpcEndPoint.getEndPoint().getVersion(null);
|
EndpointStateMachine.EndPointStates.GETVERSION)) {
|
||||||
VersionResponse response = VersionResponse.getFromProtobuf(
|
SCMVersionResponseProto versionResponse =
|
||||||
versionResponse);
|
rpcEndPoint.getEndPoint().getVersion(null);
|
||||||
rpcEndPoint.setVersion(response);
|
VersionResponse response = VersionResponse.getFromProtobuf(
|
||||||
|
versionResponse);
|
||||||
|
rpcEndPoint.setVersion(response);
|
||||||
|
|
||||||
String scmId = response.getValue(OzoneConsts.SCM_ID);
|
String scmId = response.getValue(OzoneConsts.SCM_ID);
|
||||||
String clusterId = response.getValue(OzoneConsts.CLUSTER_ID);
|
String clusterId = response.getValue(OzoneConsts.CLUSTER_ID);
|
||||||
|
|
||||||
// Check volumes
|
// Check volumes
|
||||||
VolumeSet volumeSet = ozoneContainer.getVolumeSet();
|
VolumeSet volumeSet = ozoneContainer.getVolumeSet();
|
||||||
volumeSet.writeLock();
|
volumeSet.writeLock();
|
||||||
try {
|
try {
|
||||||
Map<String, HddsVolume> volumeMap = volumeSet.getVolumeMap();
|
Map<String, HddsVolume> volumeMap = volumeSet.getVolumeMap();
|
||||||
|
|
||||||
Preconditions.checkNotNull(scmId, "Reply from SCM: scmId cannot be " +
|
Preconditions.checkNotNull(scmId, "Reply from SCM: scmId cannot be " +
|
||||||
"null");
|
"null");
|
||||||
Preconditions.checkNotNull(clusterId, "Reply from SCM: clusterId " +
|
Preconditions.checkNotNull(clusterId, "Reply from SCM: clusterId " +
|
||||||
"cannot be null");
|
"cannot be null");
|
||||||
|
|
||||||
// If version file does not exist create version file and also set scmId
|
// If version file does not exist create version file and also set scmId
|
||||||
for (Map.Entry<String, HddsVolume> entry : volumeMap.entrySet()) {
|
|
||||||
HddsVolume hddsVolume = entry.getValue();
|
for (Map.Entry<String, HddsVolume> entry : volumeMap.entrySet()) {
|
||||||
boolean result = HddsVolumeUtil.checkVolume(hddsVolume, scmId,
|
HddsVolume hddsVolume = entry.getValue();
|
||||||
clusterId, LOG);
|
boolean result = HddsVolumeUtil.checkVolume(hddsVolume, scmId,
|
||||||
if (!result) {
|
clusterId, LOG);
|
||||||
volumeSet.failVolume(hddsVolume.getHddsRootDir().getPath());
|
if (!result) {
|
||||||
|
volumeSet.failVolume(hddsVolume.getHddsRootDir().getPath());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
if (volumeSet.getVolumesList().size() == 0) {
|
||||||
|
// All volumes are in inconsistent state
|
||||||
|
throw new DiskOutOfSpaceException("All configured Volumes are in " +
|
||||||
|
"Inconsistent State");
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
volumeSet.writeUnlock();
|
||||||
}
|
}
|
||||||
if (volumeSet.getVolumesList().size() == 0) {
|
|
||||||
// All volumes are in inconsistent state
|
ozoneContainer.getDispatcher().setScmId(scmId);
|
||||||
throw new DiskOutOfSpaceException("All configured Volumes are in " +
|
|
||||||
"Inconsistent State");
|
EndpointStateMachine.EndPointStates nextState =
|
||||||
}
|
rpcEndPoint.getState().getNextState();
|
||||||
} finally {
|
rpcEndPoint.setState(nextState);
|
||||||
volumeSet.writeUnlock();
|
rpcEndPoint.zeroMissedCount();
|
||||||
|
} else {
|
||||||
|
LOG.debug("Cannot execute GetVersion task as endpoint state machine " +
|
||||||
|
"is in {} state", rpcEndPoint.getState());
|
||||||
}
|
}
|
||||||
|
|
||||||
ozoneContainer.getDispatcher().setScmId(scmId);
|
|
||||||
|
|
||||||
EndpointStateMachine.EndPointStates nextState =
|
|
||||||
rpcEndPoint.getState().getNextState();
|
|
||||||
rpcEndPoint.setState(nextState);
|
|
||||||
rpcEndPoint.zeroMissedCount();
|
|
||||||
} catch (DiskOutOfSpaceException ex) {
|
} catch (DiskOutOfSpaceException ex) {
|
||||||
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN);
|
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN);
|
||||||
} catch(IOException ex) {
|
} catch(IOException ex) {
|
||||||
|
|
|
@ -23,9 +23,13 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
|
||||||
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
|
||||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||||
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
|
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
|
||||||
|
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
|
||||||
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
|
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
||||||
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
|
import org.apache.hadoop.ozone.container.common.statemachine
|
||||||
|
.DatanodeStateMachine;
|
||||||
|
import org.apache.hadoop.ozone.container.common.statemachine
|
||||||
|
.EndpointStateMachine;
|
||||||
import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
|
import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||||
import org.apache.hadoop.hdds.scm.TestUtils;
|
import org.apache.hadoop.hdds.scm.TestUtils;
|
||||||
|
@ -214,4 +218,50 @@ public class TestMiniOzoneCluster {
|
||||||
out.write("malformed".getBytes());
|
out.write("malformed".getBytes());
|
||||||
out.close();
|
out.close();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that a DN can register with SCM even if it was started before the SCM.
|
||||||
|
* @throws Exception
|
||||||
|
*/
|
||||||
|
@Test (timeout = 300_000)
|
||||||
|
public void testDNstartAfterSCM() throws Exception {
|
||||||
|
// Start a cluster with 1 DN
|
||||||
|
cluster = MiniOzoneCluster.newBuilder(conf)
|
||||||
|
.setNumDatanodes(1)
|
||||||
|
.build();
|
||||||
|
cluster.waitForClusterToBeReady();
|
||||||
|
|
||||||
|
// Stop the SCM
|
||||||
|
StorageContainerManager scm = cluster.getStorageContainerManager();
|
||||||
|
scm.stop();
|
||||||
|
|
||||||
|
// Restart DN
|
||||||
|
cluster.restartHddsDatanode(0, false);
|
||||||
|
|
||||||
|
// DN should be in GETVERSION state till the SCM is restarted.
|
||||||
|
// Check DN endpoint state for 20 seconds
|
||||||
|
DatanodeStateMachine dnStateMachine = cluster.getHddsDatanodes().get(0)
|
||||||
|
.getDatanodeStateMachine();
|
||||||
|
for (int i = 0; i < 20; i++) {
|
||||||
|
for (EndpointStateMachine endpoint :
|
||||||
|
dnStateMachine.getConnectionManager().getValues()) {
|
||||||
|
Assert.assertEquals(
|
||||||
|
EndpointStateMachine.EndPointStates.GETVERSION,
|
||||||
|
endpoint.getState());
|
||||||
|
}
|
||||||
|
Thread.sleep(1000);
|
||||||
|
}
|
||||||
|
|
||||||
|
// DN should successfully register with the SCM after SCM is restarted.
|
||||||
|
// Restart the SCM
|
||||||
|
cluster.restartStorageContainerManager();
|
||||||
|
// Wait for DN to register
|
||||||
|
cluster.waitForClusterToBeReady();
|
||||||
|
// DN should be in HEARTBEAT state after registering with the SCM
|
||||||
|
for (EndpointStateMachine endpoint :
|
||||||
|
dnStateMachine.getConnectionManager().getValues()) {
|
||||||
|
Assert.assertEquals(EndpointStateMachine.EndPointStates.HEARTBEAT,
|
||||||
|
endpoint.getState());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue