HDDS-1888. Add containers to node2container map in SCM as part of ICR processing.
Signed-off-by: Nanda kumar <nanda@apache.org>
This commit is contained in:
parent
00b5a27864
commit
397a5633af
|
@ -20,8 +20,11 @@ package org.apache.hadoop.hdds.scm.container;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
|
||||||
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos
|
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos
|
||||||
.ContainerReplicaProto;
|
.ContainerReplicaProto;
|
||||||
|
import org.apache.hadoop.hdds.scm.node.NodeManager;
|
||||||
|
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
|
||||||
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
|
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
|
||||||
.IncrementalContainerReportFromDatanode;
|
.IncrementalContainerReportFromDatanode;
|
||||||
import org.apache.hadoop.hdds.server.events.EventHandler;
|
import org.apache.hadoop.hdds.server.events.EventHandler;
|
||||||
|
@ -39,9 +42,13 @@ public class IncrementalContainerReportHandler extends
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(
|
private static final Logger LOG = LoggerFactory.getLogger(
|
||||||
IncrementalContainerReportHandler.class);
|
IncrementalContainerReportHandler.class);
|
||||||
|
|
||||||
|
private final NodeManager nodeManager;
|
||||||
|
|
||||||
public IncrementalContainerReportHandler(
|
public IncrementalContainerReportHandler(
|
||||||
|
final NodeManager nodeManager,
|
||||||
final ContainerManager containerManager) {
|
final ContainerManager containerManager) {
|
||||||
super(containerManager, LOG);
|
super(containerManager, LOG);
|
||||||
|
this.nodeManager = nodeManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -53,9 +60,16 @@ public class IncrementalContainerReportHandler extends
|
||||||
for (ContainerReplicaProto replicaProto :
|
for (ContainerReplicaProto replicaProto :
|
||||||
report.getReport().getReportList()) {
|
report.getReport().getReportList()) {
|
||||||
try {
|
try {
|
||||||
processContainerReplica(report.getDatanodeDetails(), replicaProto);
|
final DatanodeDetails dd = report.getDatanodeDetails();
|
||||||
|
final ContainerID id = ContainerID.valueof(
|
||||||
|
replicaProto.getContainerID());
|
||||||
|
nodeManager.addContainer(dd, id);
|
||||||
|
processContainerReplica(dd, replicaProto);
|
||||||
} catch (ContainerNotFoundException e) {
|
} catch (ContainerNotFoundException e) {
|
||||||
LOG.warn("Container {} not found!", replicaProto.getContainerID());
|
LOG.warn("Container {} not found!", replicaProto.getContainerID());
|
||||||
|
} catch (NodeNotFoundException ex) {
|
||||||
|
LOG.error("Received ICR from unknown datanode {} {}",
|
||||||
|
report.getDatanodeDetails(), ex);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Exception while processing ICR for container {}",
|
LOG.error("Exception while processing ICR for container {}",
|
||||||
replicaProto.getContainerID());
|
replicaProto.getContainerID());
|
||||||
|
|
|
@ -129,6 +129,17 @@ public interface NodeManager extends StorageContainerNodeProtocol,
|
||||||
*/
|
*/
|
||||||
void removePipeline(Pipeline pipeline);
|
void removePipeline(Pipeline pipeline);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds the given container to the specified datanode.
|
||||||
|
*
|
||||||
|
* @param datanodeDetails - DatanodeDetails
|
||||||
|
* @param containerId - containerID
|
||||||
|
* @throws NodeNotFoundException - if datanode is not known. For new datanode
|
||||||
|
* use addDatanodeInContainerMap call.
|
||||||
|
*/
|
||||||
|
void addContainer(DatanodeDetails datanodeDetails,
|
||||||
|
ContainerID containerId) throws NodeNotFoundException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remaps datanode to containers mapping to the new set of containers.
|
* Remaps datanode to containers mapping to the new set of containers.
|
||||||
* @param datanodeDetails - DatanodeDetails
|
* @param datanodeDetails - DatanodeDetails
|
||||||
|
|
|
@ -456,6 +456,21 @@ public class NodeStateManager implements Runnable, Closeable {
|
||||||
public void removePipeline(Pipeline pipeline) {
|
public void removePipeline(Pipeline pipeline) {
|
||||||
node2PipelineMap.removePipeline(pipeline);
|
node2PipelineMap.removePipeline(pipeline);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds the given container to the specified datanode.
|
||||||
|
*
|
||||||
|
* @param uuid - datanode uuid
|
||||||
|
* @param containerId - containerID
|
||||||
|
* @throws NodeNotFoundException - if datanode is not known. For new datanode
|
||||||
|
* use addDatanodeInContainerMap call.
|
||||||
|
*/
|
||||||
|
public void addContainer(final UUID uuid,
|
||||||
|
final ContainerID containerId)
|
||||||
|
throws NodeNotFoundException {
|
||||||
|
nodeStateMap.addContainer(uuid, containerId);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update set of containers available on a datanode.
|
* Update set of containers available on a datanode.
|
||||||
* @param uuid - DatanodeID
|
* @param uuid - DatanodeID
|
||||||
|
|
|
@ -500,6 +500,13 @@ public class SCMNodeManager implements NodeManager {
|
||||||
nodeStateManager.removePipeline(pipeline);
|
nodeStateManager.removePipeline(pipeline);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addContainer(final DatanodeDetails datanodeDetails,
|
||||||
|
final ContainerID containerId)
|
||||||
|
throws NodeNotFoundException {
|
||||||
|
nodeStateManager.addContainer(datanodeDetails.getUuid(), containerId);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update set of containers available on a datanode.
|
* Update set of containers available on a datanode.
|
||||||
* @param datanodeDetails - DatanodeID
|
* @param datanodeDetails - DatanodeID
|
||||||
|
|
|
@ -108,6 +108,7 @@ public class NodeStateMap {
|
||||||
NodeState newState)throws NodeNotFoundException {
|
NodeState newState)throws NodeNotFoundException {
|
||||||
lock.writeLock().lock();
|
lock.writeLock().lock();
|
||||||
try {
|
try {
|
||||||
|
checkIfNodeExist(nodeId);
|
||||||
if (stateMap.get(currentState).remove(nodeId)) {
|
if (stateMap.get(currentState).remove(nodeId)) {
|
||||||
stateMap.get(newState).add(nodeId);
|
stateMap.get(newState).add(nodeId);
|
||||||
} else {
|
} else {
|
||||||
|
@ -131,10 +132,8 @@ public class NodeStateMap {
|
||||||
public DatanodeInfo getNodeInfo(UUID uuid) throws NodeNotFoundException {
|
public DatanodeInfo getNodeInfo(UUID uuid) throws NodeNotFoundException {
|
||||||
lock.readLock().lock();
|
lock.readLock().lock();
|
||||||
try {
|
try {
|
||||||
if (nodeMap.containsKey(uuid)) {
|
checkIfNodeExist(uuid);
|
||||||
return nodeMap.get(uuid);
|
return nodeMap.get(uuid);
|
||||||
}
|
|
||||||
throw new NodeNotFoundException("Node UUID: " + uuid);
|
|
||||||
} finally {
|
} finally {
|
||||||
lock.readLock().unlock();
|
lock.readLock().unlock();
|
||||||
}
|
}
|
||||||
|
@ -213,41 +212,70 @@ public class NodeStateMap {
|
||||||
public NodeState getNodeState(UUID uuid) throws NodeNotFoundException {
|
public NodeState getNodeState(UUID uuid) throws NodeNotFoundException {
|
||||||
lock.readLock().lock();
|
lock.readLock().lock();
|
||||||
try {
|
try {
|
||||||
|
checkIfNodeExist(uuid);
|
||||||
for (Map.Entry<NodeState, Set<UUID>> entry : stateMap.entrySet()) {
|
for (Map.Entry<NodeState, Set<UUID>> entry : stateMap.entrySet()) {
|
||||||
if (entry.getValue().contains(uuid)) {
|
if (entry.getValue().contains(uuid)) {
|
||||||
return entry.getKey();
|
return entry.getKey();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
throw new NodeNotFoundException("Node UUID: " + uuid);
|
throw new NodeNotFoundException("Node not found in node state map." +
|
||||||
|
" UUID: " + uuid);
|
||||||
} finally {
|
} finally {
|
||||||
lock.readLock().unlock();
|
lock.readLock().unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds the given container to the specified datanode.
|
||||||
|
*
|
||||||
|
* @param uuid - datanode uuid
|
||||||
|
* @param containerId - containerID
|
||||||
|
* @throws NodeNotFoundException - if datanode is not known. For new datanode
|
||||||
|
* use addDatanodeInContainerMap call.
|
||||||
|
*/
|
||||||
|
public void addContainer(final UUID uuid,
|
||||||
|
final ContainerID containerId)
|
||||||
|
throws NodeNotFoundException {
|
||||||
|
lock.writeLock().lock();
|
||||||
|
try {
|
||||||
|
checkIfNodeExist(uuid);
|
||||||
|
nodeToContainer.get(uuid).add(containerId);
|
||||||
|
} finally {
|
||||||
|
lock.writeLock().unlock();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
public void setContainers(UUID uuid, Set<ContainerID> containers)
|
public void setContainers(UUID uuid, Set<ContainerID> containers)
|
||||||
throws NodeNotFoundException{
|
throws NodeNotFoundException{
|
||||||
if (!nodeToContainer.containsKey(uuid)) {
|
lock.writeLock().lock();
|
||||||
throw new NodeNotFoundException("Node UUID: " + uuid);
|
try {
|
||||||
|
checkIfNodeExist(uuid);
|
||||||
|
nodeToContainer.put(uuid, containers);
|
||||||
|
} finally {
|
||||||
|
lock.writeLock().unlock();
|
||||||
}
|
}
|
||||||
nodeToContainer.put(uuid, containers);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public Set<ContainerID> getContainers(UUID uuid)
|
public Set<ContainerID> getContainers(UUID uuid)
|
||||||
throws NodeNotFoundException {
|
throws NodeNotFoundException {
|
||||||
Set<ContainerID> containers = nodeToContainer.get(uuid);
|
lock.readLock().lock();
|
||||||
if (containers == null) {
|
try {
|
||||||
throw new NodeNotFoundException("Node UUID: " + uuid);
|
checkIfNodeExist(uuid);
|
||||||
|
return Collections.unmodifiableSet(nodeToContainer.get(uuid));
|
||||||
|
} finally {
|
||||||
|
lock.readLock().unlock();
|
||||||
}
|
}
|
||||||
return Collections.unmodifiableSet(containers);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
public void removeContainer(UUID uuid, ContainerID containerID) throws
|
public void removeContainer(UUID uuid, ContainerID containerID) throws
|
||||||
NodeNotFoundException {
|
NodeNotFoundException {
|
||||||
Set<ContainerID> containers = nodeToContainer.get(uuid);
|
lock.writeLock().lock();
|
||||||
if (containers == null) {
|
try {
|
||||||
throw new NodeNotFoundException("Node UUID: " + uuid);
|
checkIfNodeExist(uuid);
|
||||||
|
nodeToContainer.get(uuid).remove(containerID);
|
||||||
|
} finally {
|
||||||
|
lock.writeLock().unlock();
|
||||||
}
|
}
|
||||||
containers.remove(containerID);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -269,4 +297,16 @@ public class NodeStateMap {
|
||||||
}
|
}
|
||||||
return builder.toString();
|
return builder.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Throws NodeNotFoundException if the Node for given id doesn't exist.
|
||||||
|
*
|
||||||
|
* @param uuid Node UUID
|
||||||
|
* @throws NodeNotFoundException If the node is missing.
|
||||||
|
*/
|
||||||
|
private void checkIfNodeExist(UUID uuid) throws NodeNotFoundException {
|
||||||
|
if (!nodeToContainer.containsKey(uuid)) {
|
||||||
|
throw new NodeNotFoundException("Node UUID: " + uuid);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -299,7 +299,8 @@ public final class StorageContainerManager extends ServiceRuntimeInfoImpl
|
||||||
new ContainerReportHandler(scmNodeManager, containerManager);
|
new ContainerReportHandler(scmNodeManager, containerManager);
|
||||||
|
|
||||||
IncrementalContainerReportHandler incrementalContainerReportHandler =
|
IncrementalContainerReportHandler incrementalContainerReportHandler =
|
||||||
new IncrementalContainerReportHandler(containerManager);
|
new IncrementalContainerReportHandler(
|
||||||
|
scmNodeManager, containerManager);
|
||||||
|
|
||||||
PipelineActionHandler pipelineActionHandler =
|
PipelineActionHandler pipelineActionHandler =
|
||||||
new PipelineActionHandler(pipelineManager, conf);
|
new PipelineActionHandler(pipelineManager, conf);
|
||||||
|
|
|
@ -267,6 +267,19 @@ public class MockNodeManager implements NodeManager {
|
||||||
node2PipelineMap.removePipeline(pipeline);
|
node2PipelineMap.removePipeline(pipeline);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addContainer(DatanodeDetails dd,
|
||||||
|
ContainerID containerId)
|
||||||
|
throws NodeNotFoundException {
|
||||||
|
try {
|
||||||
|
Set<ContainerID> set = node2ContainerMap.getContainers(dd.getUuid());
|
||||||
|
set.add(containerId);
|
||||||
|
node2ContainerMap.setContainersForDatanode(dd.getUuid(), set);
|
||||||
|
} catch (SCMException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void addDatanodeCommand(UUID dnId, SCMCommand command) {
|
public void addDatanodeCommand(UUID dnId, SCMCommand command) {
|
||||||
if(commandMap.containsKey(dnId)) {
|
if(commandMap.containsKey(dnId)) {
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.hdds.protocol.proto
|
||||||
import org.apache.hadoop.hdds.protocol.proto
|
import org.apache.hadoop.hdds.protocol.proto
|
||||||
.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
|
.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
|
||||||
import org.apache.hadoop.hdds.scm.TestUtils;
|
import org.apache.hadoop.hdds.scm.TestUtils;
|
||||||
|
import org.apache.hadoop.hdds.scm.node.NodeManager;
|
||||||
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
|
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
|
||||||
.IncrementalContainerReportFromDatanode;
|
.IncrementalContainerReportFromDatanode;
|
||||||
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
import org.apache.hadoop.hdds.server.events.EventPublisher;
|
||||||
|
@ -47,6 +48,7 @@ import static org.apache.hadoop.hdds.scm.TestUtils.getReplicas;
|
||||||
*/
|
*/
|
||||||
public class TestIncrementalContainerReportHandler {
|
public class TestIncrementalContainerReportHandler {
|
||||||
|
|
||||||
|
private NodeManager nodeManager;
|
||||||
private ContainerManager containerManager;
|
private ContainerManager containerManager;
|
||||||
private ContainerStateManager containerStateManager;
|
private ContainerStateManager containerStateManager;
|
||||||
private EventPublisher publisher;
|
private EventPublisher publisher;
|
||||||
|
@ -55,6 +57,7 @@ public class TestIncrementalContainerReportHandler {
|
||||||
public void setup() throws IOException {
|
public void setup() throws IOException {
|
||||||
final Configuration conf = new OzoneConfiguration();
|
final Configuration conf = new OzoneConfiguration();
|
||||||
this.containerManager = Mockito.mock(ContainerManager.class);
|
this.containerManager = Mockito.mock(ContainerManager.class);
|
||||||
|
this.nodeManager = Mockito.mock(NodeManager.class);
|
||||||
this.containerStateManager = new ContainerStateManager(conf);
|
this.containerStateManager = new ContainerStateManager(conf);
|
||||||
this.publisher = Mockito.mock(EventPublisher.class);
|
this.publisher = Mockito.mock(EventPublisher.class);
|
||||||
|
|
||||||
|
@ -88,7 +91,7 @@ public class TestIncrementalContainerReportHandler {
|
||||||
@Test
|
@Test
|
||||||
public void testClosingToClosed() throws IOException {
|
public void testClosingToClosed() throws IOException {
|
||||||
final IncrementalContainerReportHandler reportHandler =
|
final IncrementalContainerReportHandler reportHandler =
|
||||||
new IncrementalContainerReportHandler(containerManager);
|
new IncrementalContainerReportHandler(nodeManager, containerManager);
|
||||||
final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
|
final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
|
||||||
final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails();
|
final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails();
|
||||||
final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails();
|
final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails();
|
||||||
|
@ -122,7 +125,7 @@ public class TestIncrementalContainerReportHandler {
|
||||||
@Test
|
@Test
|
||||||
public void testClosingToQuasiClosed() throws IOException {
|
public void testClosingToQuasiClosed() throws IOException {
|
||||||
final IncrementalContainerReportHandler reportHandler =
|
final IncrementalContainerReportHandler reportHandler =
|
||||||
new IncrementalContainerReportHandler(containerManager);
|
new IncrementalContainerReportHandler(nodeManager, containerManager);
|
||||||
final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
|
final ContainerInfo container = getContainer(LifeCycleState.CLOSING);
|
||||||
final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails();
|
final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails();
|
||||||
final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails();
|
final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails();
|
||||||
|
@ -157,7 +160,7 @@ public class TestIncrementalContainerReportHandler {
|
||||||
@Test
|
@Test
|
||||||
public void testQuasiClosedToClosed() throws IOException {
|
public void testQuasiClosedToClosed() throws IOException {
|
||||||
final IncrementalContainerReportHandler reportHandler =
|
final IncrementalContainerReportHandler reportHandler =
|
||||||
new IncrementalContainerReportHandler(containerManager);
|
new IncrementalContainerReportHandler(nodeManager, containerManager);
|
||||||
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
|
final ContainerInfo container = getContainer(LifeCycleState.QUASI_CLOSED);
|
||||||
final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails();
|
final DatanodeDetails datanodeOne = TestUtils.randomDatanodeDetails();
|
||||||
final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails();
|
final DatanodeDetails datanodeTwo = TestUtils.randomDatanodeDetails();
|
||||||
|
|
|
@ -182,6 +182,13 @@ public class ReplicationNodeManagerMock implements NodeManager {
|
||||||
throw new UnsupportedOperationException("Not yet implemented");
|
throw new UnsupportedOperationException("Not yet implemented");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void addContainer(DatanodeDetails datanodeDetails,
|
||||||
|
ContainerID containerId)
|
||||||
|
throws NodeNotFoundException {
|
||||||
|
throw new UnsupportedOperationException("Not yet implemented");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Update set of containers available on a datanode.
|
* Update set of containers available on a datanode.
|
||||||
* @param uuid - DatanodeID
|
* @param uuid - DatanodeID
|
||||||
|
|
Loading…
Reference in New Issue