HDDS-85. Send Container State Info while sending the container report from Datanode to SCM. Contributed by Shashikant Banerjee.
This commit is contained in:
parent
745f203e57
commit
fed2bef647
|
@ -131,6 +131,7 @@ enum Result {
|
||||||
UNCLOSED_CONTAINER_IO = 25;
|
UNCLOSED_CONTAINER_IO = 25;
|
||||||
DELETE_ON_OPEN_CONTAINER = 26;
|
DELETE_ON_OPEN_CONTAINER = 26;
|
||||||
CLOSED_CONTAINER_RETRY = 27;
|
CLOSED_CONTAINER_RETRY = 27;
|
||||||
|
INVALID_CONTAINER_STATE = 28;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -339,6 +339,14 @@ public boolean isValid() {
|
||||||
return !(ContainerLifeCycleState.INVALID == state);
|
return !(ContainerLifeCycleState.INVALID == state);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* checks if the container is closed.
|
||||||
|
* @return - boolean
|
||||||
|
*/
|
||||||
|
public synchronized boolean isClosed() {
|
||||||
|
return ContainerLifeCycleState.CLOSED == state;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Marks this container as closed.
|
* Marks this container as closed.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -26,6 +26,8 @@
|
||||||
import org.apache.hadoop.hdds.scm.container.common.helpers
|
import org.apache.hadoop.hdds.scm.container.common.helpers
|
||||||
.StorageContainerException;
|
.StorageContainerException;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
|
||||||
|
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
|
||||||
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
|
@ -100,6 +102,8 @@
|
||||||
.Result.UNCLOSED_CONTAINER_IO;
|
.Result.UNCLOSED_CONTAINER_IO;
|
||||||
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos
|
||||||
.Result.UNSUPPORTED_REQUEST;
|
.Result.UNSUPPORTED_REQUEST;
|
||||||
|
import static org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos.
|
||||||
|
Result.INVALID_CONTAINER_STATE;
|
||||||
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_EXTENSION;
|
import static org.apache.hadoop.ozone.OzoneConsts.CONTAINER_EXTENSION;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -706,6 +710,39 @@ public boolean isOpen(long containerID) throws StorageContainerException {
|
||||||
return containerData.isOpen();
|
return containerData.isOpen();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns LifeCycle State of the container
|
||||||
|
* @param containerID - Id of the container
|
||||||
|
* @return LifeCycle State of the container
|
||||||
|
* @throws StorageContainerException
|
||||||
|
*/
|
||||||
|
private HddsProtos.LifeCycleState getState(long containerID)
|
||||||
|
throws StorageContainerException {
|
||||||
|
LifeCycleState state;
|
||||||
|
final ContainerData data = containerMap.get(containerID);
|
||||||
|
if (data == null) {
|
||||||
|
throw new StorageContainerException(
|
||||||
|
"Container status not found: " + containerID, CONTAINER_NOT_FOUND);
|
||||||
|
}
|
||||||
|
switch (data.getState()) {
|
||||||
|
case OPEN:
|
||||||
|
state = LifeCycleState.OPEN;
|
||||||
|
break;
|
||||||
|
case CLOSING:
|
||||||
|
state = LifeCycleState.CLOSING;
|
||||||
|
break;
|
||||||
|
case CLOSED:
|
||||||
|
state = LifeCycleState.CLOSED;
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new StorageContainerException(
|
||||||
|
"Invalid Container state found: " + containerID,
|
||||||
|
INVALID_CONTAINER_STATE);
|
||||||
|
}
|
||||||
|
|
||||||
|
return state;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Supports clean shutdown of container.
|
* Supports clean shutdown of container.
|
||||||
*
|
*
|
||||||
|
@ -835,14 +872,14 @@ public SCMNodeReport getNodeReport() throws IOException {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public List<ContainerData> getContainerReports() throws IOException {
|
public List<ContainerData> getClosedContainerReports() throws IOException {
|
||||||
LOG.debug("Starting container report iteration.");
|
LOG.debug("Starting container report iteration.");
|
||||||
// No need for locking since containerMap is a ConcurrentSkipListMap
|
// No need for locking since containerMap is a ConcurrentSkipListMap
|
||||||
// And we can never get the exact state since close might happen
|
// And we can never get the exact state since close might happen
|
||||||
// after we iterate a point.
|
// after we iterate a point.
|
||||||
return containerMap.entrySet().stream()
|
return containerMap.entrySet().stream()
|
||||||
.filter(containerData ->
|
.filter(containerData ->
|
||||||
!containerData.getValue().isOpen())
|
containerData.getValue().isClosed())
|
||||||
.map(containerData -> containerData.getValue())
|
.map(containerData -> containerData.getValue())
|
||||||
.collect(Collectors.toList());
|
.collect(Collectors.toList());
|
||||||
}
|
}
|
||||||
|
@ -870,6 +907,7 @@ public ContainerReportsRequestProto getContainerReport() throws IOException {
|
||||||
.setType(ContainerReportsRequestProto.reportType.fullReport);
|
.setType(ContainerReportsRequestProto.reportType.fullReport);
|
||||||
|
|
||||||
for (ContainerData container: containers) {
|
for (ContainerData container: containers) {
|
||||||
|
long containerId = container.getContainerID();
|
||||||
StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
|
StorageContainerDatanodeProtocolProtos.ContainerInfo.Builder ciBuilder =
|
||||||
StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
|
StorageContainerDatanodeProtocolProtos.ContainerInfo.newBuilder();
|
||||||
ciBuilder.setContainerID(container.getContainerID())
|
ciBuilder.setContainerID(container.getContainerID())
|
||||||
|
@ -879,7 +917,8 @@ public ContainerReportsRequestProto getContainerReport() throws IOException {
|
||||||
.setReadCount(container.getReadCount())
|
.setReadCount(container.getReadCount())
|
||||||
.setWriteCount(container.getWriteCount())
|
.setWriteCount(container.getWriteCount())
|
||||||
.setReadBytes(container.getReadBytes())
|
.setReadBytes(container.getReadBytes())
|
||||||
.setWriteBytes(container.getWriteBytes());
|
.setWriteBytes(container.getWriteBytes())
|
||||||
|
.setState(getState(containerId));
|
||||||
|
|
||||||
crBuilder.addReports(ciBuilder.build());
|
crBuilder.addReports(ciBuilder.build());
|
||||||
}
|
}
|
||||||
|
|
|
@ -185,7 +185,7 @@ void closeContainer(long containerID)
|
||||||
* @return List of all closed containers.
|
* @return List of all closed containers.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
List<ContainerData> getContainerReports() throws IOException;
|
List<ContainerData> getClosedContainerReports() throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Increase pending deletion blocks count number of specified container.
|
* Increase pending deletion blocks count number of specified container.
|
||||||
|
|
|
@ -63,13 +63,13 @@ public void handle(SCMCommand command, OzoneContainer container,
|
||||||
invocationCount++;
|
invocationCount++;
|
||||||
long startTime = Time.monotonicNow();
|
long startTime = Time.monotonicNow();
|
||||||
try {
|
try {
|
||||||
ContainerReportsRequestProto contianerReport =
|
ContainerReportsRequestProto containerReport =
|
||||||
container.getContainerReport();
|
container.getContainerReport();
|
||||||
|
|
||||||
// TODO : We send this report to all SCMs.Check if it is enough only to
|
// TODO : We send this report to all SCMs.Check if it is enough only to
|
||||||
// send to the leader once we have RAFT enabled SCMs.
|
// send to the leader once we have RAFT enabled SCMs.
|
||||||
for (EndpointStateMachine endPoint : connectionManager.getValues()) {
|
for (EndpointStateMachine endPoint : connectionManager.getValues()) {
|
||||||
endPoint.getEndPoint().sendContainerReport(contianerReport);
|
endPoint.getEndPoint().sendContainerReport(containerReport);
|
||||||
}
|
}
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
LOG.error("Unable to process the Container Report command.", ex);
|
LOG.error("Unable to process the Container Report command.", ex);
|
||||||
|
|
|
@ -265,8 +265,8 @@ public ContainerReportsRequestProto getContainerReport() throws IOException {
|
||||||
* @return - List of closed containers.
|
* @return - List of closed containers.
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
public List<ContainerData> getContainerReports() throws IOException {
|
public List<ContainerData> getClosedContainerReports() throws IOException {
|
||||||
return this.manager.getContainerReports();
|
return this.manager.getClosedContainerReports();
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
|
|
@ -307,7 +307,7 @@ public void testGetContainerReports() throws Exception{
|
||||||
}
|
}
|
||||||
|
|
||||||
// The container report only returns reports of closed containers.
|
// The container report only returns reports of closed containers.
|
||||||
List<ContainerData> reports = containerManager.getContainerReports();
|
List<ContainerData> reports = containerManager.getClosedContainerReports();
|
||||||
Assert.assertEquals(4, reports.size());
|
Assert.assertEquals(4, reports.size());
|
||||||
for(ContainerData report : reports) {
|
for(ContainerData report : reports) {
|
||||||
long actualContainerID = report.getContainerID();
|
long actualContainerID = report.getContainerID();
|
||||||
|
|
Loading…
Reference in New Issue