HDDS-249. Fail if multiple SCM IDs on the DataNode and add SCM ID check after version request. Contributed by Bharat Viswanadham.
parent 993ec026d1
commit 9fa9e301b0

@@ -23,10 +23,14 @@ import org.apache.hadoop.hdds.protocol.proto
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.statemachine
    .EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.utils.HddsVolumeUtil;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.VersionResponse;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;

@@ -37,6 +41,8 @@ import java.util.concurrent.Callable;
 */
public class VersionEndpointTask implements
    Callable<EndpointStateMachine.EndPointStates> {
  public static final Logger LOG = LoggerFactory.getLogger(VersionEndpointTask
      .class);
  private final EndpointStateMachine rpcEndPoint;
  private final Configuration configuration;
  private final OzoneContainer ozoneContainer;

@@ -71,21 +77,32 @@ public class VersionEndpointTask implements

      Preconditions.checkNotNull(scmId, "Reply from SCM: scmId cannot be " +
          "null");
      Preconditions.checkNotNull(scmId, "Reply from SCM: clusterId cannot be" +
          " null");
      Preconditions.checkNotNull(clusterId, "Reply from SCM: clusterId " +
          "cannot be null");

      // If version file does not exist create version file and also set scmId
      for (Map.Entry<String, HddsVolume> entry : volumeMap.entrySet()) {
        HddsVolume hddsVolume = entry.getValue();
        hddsVolume.format(clusterId);
        ozoneContainer.getDispatcher().setScmId(scmId);
        boolean result = HddsVolumeUtil.checkVolume(hddsVolume, scmId,
            clusterId, LOG);
        if (!result) {
          volumeSet.failVolume(hddsVolume.getHddsRootDir().getPath());
        }
      }
      if (volumeSet.getVolumesList().size() == 0) {
        // All volumes are inconsistent state
        throw new DiskOutOfSpaceException("All configured Volumes are in " +
            "Inconsistent State");
      }
      ozoneContainer.getDispatcher().setScmId(scmId);

      EndpointStateMachine.EndPointStates nextState =
          rpcEndPoint.getState().getNextState();
      rpcEndPoint.setState(nextState);
      rpcEndPoint.zeroMissedCount();
    } catch (IOException ex) {
    } catch (DiskOutOfSpaceException ex) {
      rpcEndPoint.setState(EndpointStateMachine.EndPointStates.SHUTDOWN);
    } catch(IOException ex) {
      rpcEndPoint.logIfNeeded(ex);
    } finally {
      rpcEndPoint.unlock();
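The hunks above rework VersionEndpointTask: the copy-pasted null check on scmId that carried the clusterId message is replaced with a real check on clusterId, each HddsVolume is verified against the scmId returned by the version request via HddsVolumeUtil.checkVolume, volumes that fail the check are failed in the VolumeSet, and if no usable volume remains the task throws DiskOutOfSpaceException, which a new catch clause maps to the SHUTDOWN endpoint state. The following is a minimal, self-contained sketch of that control flow; the Volume class, the state enum, and the checkVolume stand-in are invented here for illustration and are not the Ozone API.

import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the real Ozone types, for illustration only.
class VersionFlowSketch {
  enum EndPointState { REGISTER, SHUTDOWN }

  static class Volume {
    final String scmDirName;                // name of the scm directory on this volume, null if none yet
    Volume(String scmDirName) { this.scmDirName = scmDirName; }
  }

  // Stand-in for HddsVolumeUtil.checkVolume: consistent only if the volume's scm
  // directory matches the scmId the SCM just returned (or the volume is still empty).
  static boolean checkVolume(Volume v, String scmId) {
    return v.scmDirName == null || v.scmDirName.equals(scmId);
  }

  // Mirrors the new call() logic: fail mismatched volumes, shut down when none remain.
  static EndPointState handleVersionResponse(List<Volume> volumes, String scmId) {
    List<Volume> healthy = new ArrayList<>();
    for (Volume v : volumes) {
      if (checkVolume(v, scmId)) {
        healthy.add(v);                     // volume agrees with the SCM's scmId
      }                                     // otherwise the real code calls volumeSet.failVolume(...)
    }
    if (healthy.isEmpty()) {
      return EndPointState.SHUTDOWN;        // all volumes inconsistent: endpoint shuts down
    }
    return EndPointState.REGISTER;          // version call worked, move to the next state
  }

  public static void main(String[] args) {
    List<Volume> volumes = List.of(new Volume("scm-A"), new Volume("scm-A"));
    System.out.println(handleVersionResponse(volumes, "scm-A"));   // REGISTER
    System.out.println(handleVersionResponse(volumes, "scm-B"));   // SHUTDOWN
  }
}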
@@ -25,8 +25,10 @@ import org.apache.hadoop.ozone.common.InconsistentStorageStateException;
import org.apache.hadoop.ozone.container.common.DataNodeLayoutVersion;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;

import java.io.File;
import java.io.IOException;
import java.util.Properties;
import java.util.UUID;

@@ -160,4 +162,58 @@ public final class HddsVolumeUtil {
    }
    return value;
  }

  /**
   * Check Volume is consistent state or not.
   * @param hddsVolume
   * @param scmId
   * @param clusterId
   * @param logger
   * @return true - if volume is in consistent state, otherwise false.
   */
  public static boolean checkVolume(HddsVolume hddsVolume, String scmId, String
      clusterId, Logger logger) {
    File hddsRoot = hddsVolume.getHddsRootDir();
    String volumeRoot = hddsRoot.getPath();
    File scmDir = new File(hddsRoot, scmId);

    try {
      hddsVolume.format(clusterId);
    } catch (IOException ex) {
      logger.error("Error during formatting volume {}, exception is {}",
          volumeRoot, ex);
      return false;
    }

    File[] hddsFiles = hddsRoot.listFiles();

    if(hddsFiles == null) {
      // This is the case for IOException, where listFiles returns null.
      // So, we fail the volume.
      return false;
    } else if (hddsFiles.length == 1) {
      // DN started for first time or this is a newly added volume.
      // So we create scm directory.
      if (!scmDir.mkdir()) {
        logger.error("Unable to create scmDir {}", scmDir);
        return false;
      }
      return true;
    } else if(hddsFiles.length == 2) {
      // The files should be Version and SCM directory
      if (scmDir.exists()) {
        return true;
      } else {
        logger.error("Volume {} is in Inconsistent state, expected scm " +
            "directory {} does not exist", volumeRoot, scmDir
            .getAbsolutePath());
        return false;
      }
    } else {
      // The hdds root dir should always have 2 files. One is Version file
      // and other is SCM directory.
      return false;
    }

  }
}
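The new HddsVolumeUtil.checkVolume above encodes the on-disk rule behind this change: after formatting, the hdds root must contain exactly the version file plus one directory named after the SCM ID. One entry means a fresh or newly added volume, so the scm directory is created; two entries are fine only if the expected scm directory exists; a null listing (IO error) or anything beyond two entries fails the volume. Below is a standalone sketch of the same decision table on plain java.io.File; the "VERSION" file name in the demo is an assumption used only to simulate a formatted volume.

import java.io.File;
import java.nio.file.Files;

// Standalone sketch of the checkVolume decision table; not the Ozone implementation.
final class VolumeLayoutCheck {

  /**
   * @param hddsRoot root directory of the volume, assumed to already contain its version file
   * @param scmId    SCM id returned by the version handshake
   * @return true if the volume layout is consistent with this scmId
   */
  static boolean checkVolume(File hddsRoot, String scmId) {
    File scmDir = new File(hddsRoot, scmId);
    File[] entries = hddsRoot.listFiles();

    if (entries == null) {
      // listFiles() returns null on IO errors, so the volume is failed.
      return false;
    } else if (entries.length == 1) {
      // Only the version file: first start or newly added volume, create the scm dir.
      return scmDir.mkdir();
    } else if (entries.length == 2) {
      // Expected layout: version file plus the scm directory for this SCM.
      return scmDir.exists();
    } else {
      // More than two entries means directories left behind by another SCM id.
      return false;
    }
  }

  public static void main(String[] args) throws Exception {
    File root = Files.createTempDirectory("hdds").toFile();
    new File(root, "VERSION").createNewFile();        // simulate hddsVolume.format()
    System.out.println(checkVolume(root, "scm-A"));   // true: fresh volume, scm-A created
    new File(root, "scm-B").mkdir();                  // a second scm directory appears
    System.out.println(checkVolume(root, "scm-A"));   // false: three entries, inconsistent
  }
}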
@@ -25,6 +25,7 @@ import org.apache.hadoop.ozone.common.Storage;
import org.apache.hadoop.ozone.container.common.impl.ContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.common.volume.VolumeSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.ozone.container.common.impl.ContainerDataYaml;

@@ -64,14 +65,16 @@ public class ContainerReader implements Runnable {
  private final ContainerSet containerSet;
  private final OzoneConfiguration config;
  private final File hddsVolumeDir;
  private final VolumeSet volumeSet;

  ContainerReader(HddsVolume volume, ContainerSet cset, OzoneConfiguration
      conf) {
  ContainerReader(VolumeSet volSet, HddsVolume volume, ContainerSet cset,
      OzoneConfiguration conf) {
    Preconditions.checkNotNull(volume);
    this.hddsVolume = volume;
    this.hddsVolumeDir = hddsVolume.getHddsRootDir();
    this.containerSet = cset;
    this.config = conf;
    this.volumeSet = volSet;
  }

  @Override

@@ -97,10 +100,18 @@ public class ContainerReader implements Runnable {
    });

    if (scmDir == null) {
      LOG.error("Volume {} is empty with out metadata and chunks",
      LOG.error("IO error for the volume {}, skipped loading",
          hddsVolumeRootDir);
      volumeSet.failVolume(hddsVolumeRootDir.getPath());
      return;
    }

    if (scmDir.length > 1) {
      LOG.error("Volume {} is in Inconsistent state", hddsVolumeRootDir);
      volumeSet.failVolume(hddsVolumeRootDir.getPath());
      return;
    }

    for (File scmLoc : scmDir) {
      File currentDir = null;
      currentDir = new File(scmLoc, Storage.STORAGE_DIR_CURRENT);

@@ -123,9 +134,8 @@ public class ContainerReader implements Runnable {
            verifyContainerFile(containerName, containerFile,
                checksumFile);
          } else {
            LOG.error(
                "Missing container metadata files for Container: " +
                    "{}", containerName);
            LOG.error("Missing container metadata files for " +
                "Container: {}", containerName);
          }
        } else {
          LOG.error("Missing container metadata directory for " +
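ContainerReader now takes the VolumeSet so it can act on what it finds instead of only logging: a null listing of the hdds root (an IO error) and more than one SCM directory both fail the volume and skip container loading, which is the DataNode-side enforcement of "fail if multiple SCM IDs". A small self-contained sketch of that detection is below; the directory filter and the demo file names are assumptions, not the ContainerReader code.

import java.io.File;
import java.nio.file.Files;

// Sketch of detecting multiple SCM id directories on one volume; simplified,
// the real ContainerReader reports the failure through the Ozone VolumeSet.
final class ScmDirScan {

  /** Returns the single SCM directory, or null if the volume should be failed. */
  static File findSingleScmDir(File hddsVolumeRootDir) {
    // SCM id entries are directories; the version file is skipped by the filter.
    File[] scmDirs = hddsVolumeRootDir.listFiles(File::isDirectory);
    if (scmDirs == null) {
      return null;              // IO error for the volume: skip loading, fail the volume
    }
    if (scmDirs.length > 1) {
      return null;              // more than one SCM id directory: inconsistent state
    }
    return scmDirs.length == 1 ? scmDirs[0] : null;
  }

  public static void main(String[] args) throws Exception {
    File root = Files.createTempDirectory("hdds").toFile();
    new File(root, "VERSION").createNewFile();
    new File(root, "scm-A").mkdir();
    System.out.println(findSingleScmDir(root));   // .../scm-A
    new File(root, "scm-B").mkdir();
    System.out.println(findSingleScmDir(root));   // null, volume would be failed
  }
}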
@@ -106,7 +106,7 @@ public class OzoneContainer {
    while (volumeSetIterator.hasNext()) {
      HddsVolume volume = volumeSetIterator.next();
      File hddsVolumeRootDir = volume.getHddsRootDir();
      Thread thread = new Thread(new ContainerReader(volume,
      Thread thread = new Thread(new ContainerReader(volumeSet, volume,
          containerSet, config));
      thread.start();
      volumeThreads.add(thread);
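The OzoneContainer change is small: the VolumeSet is now handed to every ContainerReader it starts, one reader thread per volume, so each reader can fail its own volume during loading. The sketch below shows that one-thread-per-volume fan-out in isolation; the Reader class is a made-up stand-in for ContainerReader, and the final join is an assumption about how the collected threads are used.

import java.util.ArrayList;
import java.util.List;

// Sketch of the one-reader-thread-per-volume pattern used when loading containers.
final class VolumeReaderFanOut {

  // Stand-in for ContainerReader: it now also receives the volume collection so
  // it could mark its own volume as failed.
  static final class Reader implements Runnable {
    private final List<String> volumeSet;   // simplified "VolumeSet"
    private final String volume;

    Reader(List<String> volumeSet, String volume) {
      this.volumeSet = volumeSet;
      this.volume = volume;
    }

    @Override
    public void run() {
      // The real reader scans the volume and calls volumeSet.failVolume(...) on errors.
      System.out.println("scanning " + volume + " of " + volumeSet.size() + " volumes");
    }
  }

  public static void main(String[] args) throws InterruptedException {
    List<String> volumeSet = List.of("/data/disk1", "/data/disk2");
    List<Thread> volumeThreads = new ArrayList<>();
    for (String volume : volumeSet) {
      Thread thread = new Thread(new Reader(volumeSet, volume));
      thread.start();
      volumeThreads.add(thread);
    }
    for (Thread thread : volumeThreads) {
      thread.join();                        // wait for every reader to finish
    }
  }
}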
@@ -56,6 +56,13 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
  private AtomicInteger heartbeatCount = new AtomicInteger(0);
  private AtomicInteger rpcCount = new AtomicInteger(0);
  private AtomicInteger containerReportsCount = new AtomicInteger(0);
  private String clusterId;
  private String scmId;

  public ScmTestMock() {
    clusterId = UUID.randomUUID().toString();
    scmId = UUID.randomUUID().toString();
  }

  // Map of datanode to containers
  private Map<DatanodeDetails, Map<String, ContainerInfo>> nodeContainers =

@@ -157,8 +164,8 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
    return VersionResponse.newBuilder()
        .setVersion(versionInfo.getVersion())
        .addValue(VersionInfo.DESCRIPTION_KEY, versionInfo.getDescription())
        .addValue(OzoneConsts.SCM_ID, UUID.randomUUID().toString())
        .addValue(OzoneConsts.CLUSTER_ID, UUID.randomUUID().toString())
        .addValue(OzoneConsts.SCM_ID, scmId)
        .addValue(OzoneConsts.CLUSTER_ID, clusterId)
        .build().getProtobufMessage();

  }

@@ -329,4 +336,20 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
  public void addScmCommandRequest(SCMCommandProto scmCmd) {
    scmCommandRequests.add(scmCmd);
  }

  /**
   * Set scmId.
   * @param id
   */
  public void setScmId(String id) {
    this.scmId = id;
  }

  /**
   * Set scmId.
   * @return scmId
   */
  public String getScmId() {
    return scmId;
  }
}
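ScmTestMock used to stamp a freshly generated scmId and clusterId into every version response, so two version calls from the same DataNode could never agree. It now fixes both IDs in its constructor and exposes setScmId/getScmId, letting a test hold the IDs stable and then deliberately switch the SCM ID to trigger the new mismatch handling. A tiny self-contained sketch of that test-double pattern follows; the class and method shapes are illustrative, not the ScmTestMock API.

import java.util.UUID;

// Sketch of a test double that hands out stable IDs instead of fresh random ones.
final class FakeScm {
  private final String clusterId = UUID.randomUUID().toString();
  private String scmId = UUID.randomUUID().toString();

  /** Same IDs on every call, so repeated version requests agree with each other. */
  String[] versionResponse() {
    return new String[] {scmId, clusterId};
  }

  /** Lets a test swap the SCM id to simulate the DataNode hearing from a different SCM. */
  void setScmId(String id) {
    this.scmId = id;
  }

  String getScmId() {
    return scmId;
  }

  public static void main(String[] args) {
    FakeScm scm = new FakeScm();
    String first = scm.versionResponse()[0];
    String second = scm.versionResponse()[0];
    System.out.println(first.equals(second));          // true: the id is stable across calls
    scm.setScmId(UUID.randomUUID().toString());
    System.out.println(first.equals(scm.getScmId()));  // false: the SCM now reports a new id
  }
}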
@@ -19,6 +19,7 @@ package org.apache.hadoop.ozone.container.common;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.ipc.RPC;

@@ -57,9 +58,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.apache.hadoop.hdds.scm.ScmConfigKeys.HDDS_DATANODE_DIR_KEY;
import static org.apache.hadoop.hdds.scm.ScmConfigKeys
    .OZONE_SCM_HEARTBEAT_RPC_TIMEOUT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import static org.junit.Assert.assertTrue;

/**

@@ -68,7 +69,9 @@ import static org.junit.Assert.assertTrue;
public class TestDatanodeStateMachine {
  private static final Logger LOG =
      LoggerFactory.getLogger(TestDatanodeStateMachine.class);
  private final int scmServerCount = 3;
  // Changing it to 1, as current code checks for multiple scm directories,
  // and fail if exists
  private final int scmServerCount = 1;
  private List<String> serverAddresses;
  private List<RPC.Server> scmServers;
  private List<ScmTestMock> mockServers;

@@ -90,7 +93,6 @@ public class TestDatanodeStateMachine {
      String address = "127.0.0.1";
      serverAddresses.add(address + ":" + port);
      ScmTestMock mock = new ScmTestMock();

      scmServers.add(SCMTestUtils.startScmRpcServer(conf, mock,
          new InetSocketAddress(address, port), 10));
      mockServers.add(mock);

@@ -107,7 +109,7 @@ public class TestDatanodeStateMachine {
    }

    File dataDir = new File(testRoot, "data");
    conf.set(DFS_DATANODE_DATA_DIR_KEY, dataDir.getAbsolutePath());
    conf.set(HDDS_DATANODE_DIR_KEY, dataDir.getAbsolutePath());
    if (!dataDir.mkdirs()) {
      LOG.info("Data dir create failed.");
    }

@@ -145,7 +147,7 @@ public class TestDatanodeStateMachine {
    } catch (Exception e) {
      //ignore all execption from the shutdown
    } finally {
      testRoot.delete();
      FileUtil.fullyDelete(testRoot);
    }
  }

@@ -162,7 +164,7 @@ public class TestDatanodeStateMachine {
    stateMachine.startDaemon();
    SCMConnectionManager connectionManager =
        stateMachine.getConnectionManager();
    GenericTestUtils.waitFor(() -> connectionManager.getValues().size() == 3,
    GenericTestUtils.waitFor(() -> connectionManager.getValues().size() == 1,
        1000, 30000);

    stateMachine.stopDaemon();
@@ -70,8 +70,10 @@ import org.apache.hadoop.ozone.container.common.states.endpoint
    .RegisterEndpointTask;
import org.apache.hadoop.ozone.container.common.states.endpoint
    .VersionEndpointTask;
import org.apache.hadoop.ozone.container.common.volume.HddsVolume;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.CommandStatus;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.Time;
import org.junit.AfterClass;

@@ -174,6 +176,53 @@ public class TestEndPoint {
    }
  }

  @Test
  public void testCheckVersionResponse() throws Exception {
    OzoneConfiguration conf = SCMTestUtils.getConf();
    try (EndpointStateMachine rpcEndPoint = createEndpoint(conf,
        serverAddress, 1000)) {
      GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
          .captureLogs(VersionEndpointTask.LOG);
      OzoneContainer ozoneContainer = new OzoneContainer(getDatanodeDetails(),
          conf);
      rpcEndPoint.setState(EndpointStateMachine.EndPointStates.GETVERSION);
      VersionEndpointTask versionTask = new VersionEndpointTask(rpcEndPoint,
          conf, ozoneContainer);
      EndpointStateMachine.EndPointStates newState = versionTask.call();

      // if version call worked the endpoint should automatically move to the
      // next state.
      Assert.assertEquals(EndpointStateMachine.EndPointStates.REGISTER,
          newState);

      // Now rpcEndpoint should remember the version it got from SCM
      Assert.assertNotNull(rpcEndPoint.getVersion());

      // Now change server scmId, so datanode scmId will be
      // different from SCM server response scmId
      String newScmId = UUID.randomUUID().toString();
      scmServerImpl.setScmId(newScmId);
      newState = versionTask.call();
      Assert.assertEquals(EndpointStateMachine.EndPointStates.SHUTDOWN,
          newState);
      List<HddsVolume> volumesList = ozoneContainer.getVolumeSet()
          .getFailedVolumesList();
      Assert.assertTrue(volumesList.size() == 1);
      File expectedScmDir = new File(volumesList.get(0).getHddsRootDir(),
          scmServerImpl.getScmId());
      Assert.assertTrue(logCapturer.getOutput().contains("expected scm " +
          "directory " + expectedScmDir.getAbsolutePath() + " does not " +
          "exist"));
      Assert.assertTrue(ozoneContainer.getVolumeSet().getVolumesList().size()
          == 0);
      Assert.assertTrue(ozoneContainer.getVolumeSet().getFailedVolumesList()
          .size() == 1);

    }
  }
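testCheckVersionResponse above covers the whole path: the first version call succeeds and moves the endpoint to REGISTER, then the mock SCM's ID is changed, the second call fails the only volume, the endpoint lands in SHUTDOWN, and the log carries the "expected scm directory ... does not exist" message. For comparison, here is a compact, self-contained JUnit 4 sketch of the same arrange/act/assert shape against a simplified local check; the checkVolume helper below is a stand-in written for this sketch, not the Ozone VersionEndpointTask.

import org.junit.Assert;
import org.junit.Test;

import java.io.File;
import java.nio.file.Files;

// Self-contained JUnit 4 sketch of the scmId-mismatch scenario (illustrative helper,
// not the real Ozone test fixture).
public class ScmIdMismatchSketchTest {

  private boolean checkVolume(File hddsRoot, String scmId) {
    File scmDir = new File(hddsRoot, scmId);
    File[] entries = hddsRoot.listFiles();
    if (entries == null || entries.length > 2) {
      return false;                               // IO error or leftovers from another SCM
    }
    return entries.length == 1 ? scmDir.mkdir() : scmDir.exists();
  }

  @Test
  public void versionCheckFailsAfterScmIdChanges() throws Exception {
    File hddsRoot = Files.createTempDirectory("hdds").toFile();
    Assert.assertTrue(new File(hddsRoot, "VERSION").createNewFile());

    // First version response: the volume adopts this SCM id and the check passes.
    Assert.assertTrue(checkVolume(hddsRoot, "scm-original"));

    // The SCM id changes on the server side: the same volume is now rejected, which
    // in the real code fails the volume and drives the endpoint to SHUTDOWN.
    Assert.assertFalse(checkVolume(hddsRoot, "scm-changed"));
  }
}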


  @Test
  /**
   * This test makes a call to end point where there is no SCM server. We