HDFS-11480. Ozone: TestEndpoint task failure. Contributed by Xiaoyu Yao.
This commit is contained in:
parent
09ad229db9
commit
02c35065f7
|
@ -17,10 +17,13 @@
|
||||||
package org.apache.hadoop.ozone.container.common;
|
package org.apache.hadoop.ozone.container.common;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
|
||||||
import org.apache.hadoop.ozone.container.common.statemachine
|
import org.apache.hadoop.ozone.container.common.statemachine
|
||||||
.EndpointStateMachine;
|
.EndpointStateMachine;
|
||||||
|
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
|
||||||
import org.apache.hadoop.ozone.container.common.states.endpoint
|
import org.apache.hadoop.ozone.container.common.states.endpoint
|
||||||
.HeartbeatEndpointTask;
|
.HeartbeatEndpointTask;
|
||||||
import org.apache.hadoop.ozone.container.common.states.endpoint
|
import org.apache.hadoop.ozone.container.common.states.endpoint
|
||||||
|
@ -31,6 +34,10 @@ import org.apache.hadoop.ozone.protocol.proto
|
||||||
.StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
|
.StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
|
.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
|
||||||
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
.StorageContainerDatanodeProtocolProtos.SCMStorageReport;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
|
.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
|
@ -38,6 +45,7 @@ import org.apache.hadoop.ozone.protocol.proto
|
||||||
import org.apache.hadoop.ozone.protocol.proto
|
import org.apache.hadoop.ozone.protocol.proto
|
||||||
.StorageContainerDatanodeProtocolProtos.Type;
|
.StorageContainerDatanodeProtocolProtos.Type;
|
||||||
import org.apache.hadoop.ozone.scm.VersionInfo;
|
import org.apache.hadoop.ozone.scm.VersionInfo;
|
||||||
|
import org.apache.hadoop.test.PathUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
@ -45,9 +53,12 @@ import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.internal.matchers.LessOrEqual;
|
import org.mockito.internal.matchers.LessOrEqual;
|
||||||
|
|
||||||
|
import java.io.File;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Tests the endpoints.
|
* Tests the endpoints.
|
||||||
*/
|
*/
|
||||||
|
@ -55,6 +66,7 @@ public class TestEndPoint {
|
||||||
private static InetSocketAddress serverAddress;
|
private static InetSocketAddress serverAddress;
|
||||||
private static RPC.Server scmServer;
|
private static RPC.Server scmServer;
|
||||||
private static ScmTestMock scmServerImpl;
|
private static ScmTestMock scmServerImpl;
|
||||||
|
private static File testDir;
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
/**
|
/**
|
||||||
|
@ -169,12 +181,13 @@ public class TestEndPoint {
|
||||||
|
|
||||||
private EndpointStateMachine registerTaskHelper(InetSocketAddress scmAddress,
|
private EndpointStateMachine registerTaskHelper(InetSocketAddress scmAddress,
|
||||||
int rpcTimeout, boolean clearContainerID) throws Exception {
|
int rpcTimeout, boolean clearContainerID) throws Exception {
|
||||||
|
Configuration conf = SCMTestUtils.getConf();
|
||||||
EndpointStateMachine rpcEndPoint =
|
EndpointStateMachine rpcEndPoint =
|
||||||
SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
|
SCMTestUtils.createEndpoint(conf,
|
||||||
scmAddress, rpcTimeout);
|
scmAddress, rpcTimeout);
|
||||||
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.REGISTER);
|
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.REGISTER);
|
||||||
RegisterEndpointTask endpointTask =
|
RegisterEndpointTask endpointTask =
|
||||||
new RegisterEndpointTask(rpcEndPoint, SCMTestUtils.getConf());
|
new RegisterEndpointTask(rpcEndPoint, conf);
|
||||||
if (!clearContainerID) {
|
if (!clearContainerID) {
|
||||||
ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder()
|
ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder()
|
||||||
.setClusterID(UUID.randomUUID().toString())
|
.setClusterID(UUID.randomUUID().toString())
|
||||||
|
@ -236,8 +249,13 @@ public class TestEndPoint {
|
||||||
try (EndpointStateMachine rpcEndPoint =
|
try (EndpointStateMachine rpcEndPoint =
|
||||||
SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
|
SCMTestUtils.createEndpoint(SCMTestUtils.getConf(),
|
||||||
serverAddress, 1000)) {
|
serverAddress, 1000)) {
|
||||||
|
SCMNodeReport.Builder nrb = SCMNodeReport.newBuilder();
|
||||||
|
SCMStorageReport.Builder srb = SCMStorageReport.newBuilder();
|
||||||
|
srb.setStorageUuid(UUID.randomUUID().toString());
|
||||||
|
srb.setCapacity(2000).setScmUsed(500).setRemaining(1500).build();
|
||||||
|
nrb.addStorageReport(srb);
|
||||||
SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint()
|
SCMHeartbeatResponseProto responseProto = rpcEndPoint.getEndPoint()
|
||||||
.sendHeartbeat(dataNode, null);
|
.sendHeartbeat(dataNode, nrb.build());
|
||||||
Assert.assertNotNull(responseProto);
|
Assert.assertNotNull(responseProto);
|
||||||
Assert.assertEquals(1, responseProto.getCommandsCount());
|
Assert.assertEquals(1, responseProto.getCommandsCount());
|
||||||
Assert.assertNotNull(responseProto.getCommandsList().get(0));
|
Assert.assertNotNull(responseProto.getCommandsList().get(0));
|
||||||
|
@ -248,16 +266,24 @@ public class TestEndPoint {
|
||||||
|
|
||||||
private EndpointStateMachine heartbeatTaskHelper(InetSocketAddress scmAddress,
|
private EndpointStateMachine heartbeatTaskHelper(InetSocketAddress scmAddress,
|
||||||
int rpcTimeout) throws Exception {
|
int rpcTimeout) throws Exception {
|
||||||
|
Configuration conf = SCMTestUtils.getConf();
|
||||||
EndpointStateMachine rpcEndPoint = SCMTestUtils.createEndpoint(
|
EndpointStateMachine rpcEndPoint = SCMTestUtils.createEndpoint(
|
||||||
SCMTestUtils.getConf(),
|
conf, scmAddress, rpcTimeout);
|
||||||
scmAddress, rpcTimeout);
|
|
||||||
ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder()
|
ContainerNodeIDProto containerNodeID = ContainerNodeIDProto.newBuilder()
|
||||||
.setClusterID(UUID.randomUUID().toString())
|
.setClusterID(UUID.randomUUID().toString())
|
||||||
.setDatanodeID(SCMTestUtils.getDatanodeID().getProtoBufMessage())
|
.setDatanodeID(SCMTestUtils.getDatanodeID().getProtoBufMessage())
|
||||||
.build();
|
.build();
|
||||||
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.HEARTBEAT);
|
rpcEndPoint.setState(EndpointStateMachine.EndPointStates.HEARTBEAT);
|
||||||
|
|
||||||
|
// Create a datanode state machine for stateConext used by endpoint task
|
||||||
|
conf.set(DFS_DATANODE_DATA_DIR_KEY, testDir.getAbsolutePath());
|
||||||
|
final DatanodeStateMachine stateMachine = new DatanodeStateMachine(conf);
|
||||||
|
final StateContext stateContext = new StateContext(conf,
|
||||||
|
DatanodeStateMachine.DatanodeStates.RUNNING,
|
||||||
|
stateMachine);
|
||||||
|
|
||||||
HeartbeatEndpointTask endpointTask =
|
HeartbeatEndpointTask endpointTask =
|
||||||
new HeartbeatEndpointTask(rpcEndPoint, SCMTestUtils.getConf(), null);
|
new HeartbeatEndpointTask(rpcEndPoint, conf, stateContext);
|
||||||
endpointTask.setContainerNodeIDProto(containerNodeID);
|
endpointTask.setContainerNodeIDProto(containerNodeID);
|
||||||
endpointTask.call();
|
endpointTask.call();
|
||||||
Assert.assertNotNull(endpointTask.getContainerNodeIDProto());
|
Assert.assertNotNull(endpointTask.getContainerNodeIDProto());
|
||||||
|
@ -302,6 +328,7 @@ public class TestEndPoint {
|
||||||
if (scmServer != null) {
|
if (scmServer != null) {
|
||||||
scmServer.stop();
|
scmServer.stop();
|
||||||
}
|
}
|
||||||
|
FileUtil.fullyDelete(testDir);
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
|
@ -310,5 +337,6 @@ public class TestEndPoint {
|
||||||
scmServerImpl = new ScmTestMock();
|
scmServerImpl = new ScmTestMock();
|
||||||
scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(),
|
scmServer = SCMTestUtils.startScmRpcServer(SCMTestUtils.getConf(),
|
||||||
scmServerImpl, serverAddress, 10);
|
scmServerImpl, serverAddress, 10);
|
||||||
|
testDir = PathUtils.getTestDir(TestEndPoint.class);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue