diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java index 9d5a7781b03..47c249200d0 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java @@ -16,6 +16,7 @@ */ package org.apache.hadoop.ozone.container.common.statemachine; +import com.google.common.base.Preconditions; import com.google.protobuf.GeneratedMessage; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -43,6 +44,7 @@ import org.slf4j.LoggerFactory; import java.util.LinkedList; import java.util.List; import java.util.Queue; +import java.util.ArrayList; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; @@ -50,7 +52,6 @@ import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -import java.util.stream.Collectors; import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT; @@ -206,9 +207,18 @@ public class StateContext { * @return List */ public List getReports(int maxLimit) { + List reportList = new ArrayList<>(); synchronized (reports) { - return reports.parallelStream().limit(maxLimit) - .collect(Collectors.toList()); + if (!reports.isEmpty()) { + int size = reports.size(); + int limit = size > maxLimit ? maxLimit : size; + for (int count = 0; count < limit; count++) { + GeneratedMessage report = reports.poll(); + Preconditions.checkNotNull(report); + reportList.add(report); + } + } + return reportList; } } @@ -254,9 +264,20 @@ public class StateContext { * @return List */ public List getPendingContainerAction(int maxLimit) { + List containerActionList = new ArrayList<>(); synchronized (containerActions) { - return containerActions.parallelStream().limit(maxLimit) - .collect(Collectors.toList()); + if (!containerActions.isEmpty()) { + int size = containerActions.size(); + int limit = size > maxLimit ? maxLimit : size; + for (int count = 0; count < limit; count++) { + // we need to remove the action from the containerAction queue + // as well + ContainerAction action = containerActions.poll(); + Preconditions.checkNotNull(action); + containerActionList.add(action); + } + } + return containerActionList; } } @@ -295,9 +316,16 @@ public class StateContext { * @return List */ public List getPendingPipelineAction(int maxLimit) { + List pipelineActionList = new ArrayList<>(); synchronized (pipelineActions) { - return pipelineActions.parallelStream().limit(maxLimit) - .collect(Collectors.toList()); + if (!pipelineActions.isEmpty()) { + int size = pipelineActions.size(); + int limit = size > maxLimit ? maxLimit : size; + for (int count = 0; count < limit; count++) { + pipelineActionList.add(pipelineActions.poll()); + } + } + return pipelineActionList; } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java index 030a35788fb..d4e13ee8443 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java @@ -28,6 +28,7 @@ import org.apache.hadoop.ozone.container.common.statemachine.StateContext; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.SCMCommand; import org.apache.hadoop.util.Time; +import org.apache.ratis.protocol.NotLeaderException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,9 +90,21 @@ public class CloseContainerCommandHandler implements CommandHandler { // submit the close container request for the XceiverServer to handle container.submitContainerRequest( request.build(), replicationType, pipelineID); - cmdExecuted = true; } catch (Exception e) { - LOG.error("Can't close container " + containerID, e); + if (e instanceof NotLeaderException) { + // If the particular datanode is not the Ratis leader, the close + // container command will not be executed by the follower but will be + // executed by Ratis stateMachine transactions via leader to follower. + // There can also be case where the datanode is in candidate state. + // In these situations, NotLeaderException is thrown. Remove the status + // from cmdStatus Map here so that it will be retried only by SCM if the + // leader could not not close the container after a certain time. + context.removeCommandStatus(containerID); + LOG.info(e.getLocalizedMessage()); + } else { + LOG.error("Can't close container " + containerID, e); + cmdExecuted = false; + } } finally { updateCommandStatus(context, command, cmdExecuted, LOG); long endTime = Time.monotonicNow(); diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java index 83e742c171d..c51da98fa83 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/XceiverServerGrpc.java @@ -21,12 +21,15 @@ package org.apache.hadoop.ozone.container.common.transport.server; import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos .ContainerCommandRequestProto; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReport; import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; +import org.apache.hadoop.hdds.scm.container.common.helpers. + StorageContainerException; import org.apache.hadoop.ozone.OzoneConfigKeys; import org.apache.hadoop.ozone.container.common.interfaces.ContainerDispatcher; @@ -128,8 +131,13 @@ public final class XceiverServerGrpc implements XceiverServerSpi { @Override public void submitRequest(ContainerCommandRequestProto request, - HddsProtos.PipelineID pipelineID) { - storageContainer.dispatch(request); + HddsProtos.PipelineID pipelineID) throws IOException { + ContainerProtos.ContainerCommandResponseProto response = + storageContainer.dispatch(request); + if (response.getResult() != ContainerProtos.Result.SUCCESS) { + throw new StorageContainerException(response.getMessage(), + response.getResult()); + } } @Override diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java index d88995b3d9e..c2ef504c988 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/XceiverServerRatis.java @@ -74,7 +74,6 @@ import java.util.List; import java.util.Objects; import java.util.UUID; import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -319,39 +318,35 @@ public final class XceiverServerRatis implements XceiverServerSpi { return server; } - private void processReply(RaftClientReply reply) { - + private void processReply(RaftClientReply reply) throws IOException { // NotLeader exception is thrown only when the raft server to which the // request is submitted is not the leader. The request will be rejected - // and will eventually be executed once the request comnes via the leader + // and will eventually be executed once the request comes via the leader // node. NotLeaderException notLeaderException = reply.getNotLeaderException(); if (notLeaderException != null) { - LOG.info(reply.getNotLeaderException().getLocalizedMessage()); + throw notLeaderException; } StateMachineException stateMachineException = reply.getStateMachineException(); if (stateMachineException != null) { - // In case the request could not be completed, StateMachine Exception - // will be thrown. For now, Just log the message. - // If the container could not be closed, SCM will come to know - // via containerReports. CloseContainer should be re tried via SCM. - LOG.error(stateMachineException.getLocalizedMessage()); + throw stateMachineException; } } @Override - public void submitRequest( - ContainerCommandRequestProto request, HddsProtos.PipelineID pipelineID) - throws IOException { - // ReplicationLevel.MAJORITY ensures the transactions corresponding to - // the request here are applied on all the raft servers. + public void submitRequest(ContainerCommandRequestProto request, + HddsProtos.PipelineID pipelineID) throws IOException { + RaftClientReply reply; RaftClientRequest raftClientRequest = createRaftClientRequest(request, pipelineID, RaftClientRequest.writeRequestType(replicationLevel)); - CompletableFuture reply = - server.submitClientRequestAsync(raftClientRequest); - reply.thenAccept(this::processReply); + try { + reply = server.submitClientRequestAsync(raftClientRequest).get(); + } catch (Exception e) { + throw new IOException(e.getMessage(), e); + } + processReply(reply); } private RaftClientRequest createRaftClientRequest( diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java index ebacf756fcf..da587722656 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/OzoneContainer.java @@ -23,7 +23,8 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.PipelineID; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; @@ -46,12 +47,14 @@ import org.slf4j.LoggerFactory; import java.io.*; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; +import java.util.Map; import static org.apache.hadoop.ozone.OzoneConsts.INVALID_PORT; /** - * Ozone main class sets up the network server and initializes the container + * Ozone main class sets up the network servers and initializes the container * layer. */ public class OzoneContainer { @@ -64,7 +67,7 @@ public class OzoneContainer { private final OzoneConfiguration config; private final VolumeSet volumeSet; private final ContainerSet containerSet; - private final XceiverServerSpi[] server; + private final Map servers; /** * Construct OzoneContainer object. @@ -82,14 +85,13 @@ public class OzoneContainer { buildContainerSet(); hddsDispatcher = new HddsDispatcher(config, containerSet, volumeSet, context); - server = new XceiverServerSpi[]{ - new XceiverServerGrpc(datanodeDetails, this.config, this - .hddsDispatcher, createReplicationService()), - XceiverServerRatis.newXceiverServerRatis(datanodeDetails, this - .config, hddsDispatcher, context) - }; - - + servers = new HashMap<>(); + servers.put(ReplicationType.STAND_ALONE, + new XceiverServerGrpc(datanodeDetails, config, hddsDispatcher, + createReplicationService())); + servers.put(ReplicationType.RATIS, XceiverServerRatis + .newXceiverServerRatis(datanodeDetails, config, hddsDispatcher, + context)); } private GrpcReplicationService createReplicationService() { @@ -133,7 +135,7 @@ public class OzoneContainer { */ public void start() throws IOException { LOG.info("Attempting to start container services."); - for (XceiverServerSpi serverinstance : server) { + for (XceiverServerSpi serverinstance : servers.values()) { serverinstance.start(); } hddsDispatcher.init(); @@ -145,7 +147,7 @@ public class OzoneContainer { public void stop() { //TODO: at end of container IO integration work. LOG.info("Attempting to stop container services."); - for(XceiverServerSpi serverinstance: server) { + for(XceiverServerSpi serverinstance: servers.values()) { serverinstance.stop(); } hddsDispatcher.shutdown(); @@ -169,7 +171,7 @@ public class OzoneContainer { public PipelineReportsProto getPipelineReport() { PipelineReportsProto.Builder pipelineReportsProto = PipelineReportsProto.newBuilder(); - for (XceiverServerSpi serverInstance : server) { + for (XceiverServerSpi serverInstance : servers.values()) { pipelineReportsProto .addAllPipelineReport(serverInstance.getPipelineReport()); } @@ -181,82 +183,38 @@ public class OzoneContainer { * @param request * @param replicationType * @param pipelineID - * @throws IOException */ public void submitContainerRequest( ContainerProtos.ContainerCommandRequestProto request, - HddsProtos.ReplicationType replicationType, - HddsProtos.PipelineID pipelineID) throws IOException { - XceiverServerSpi serverInstance; - long containerId = getContainerIdForCmd(request); - if (replicationType == HddsProtos.ReplicationType.RATIS) { - serverInstance = getRatisSerer(); - Preconditions.checkNotNull(serverInstance); - serverInstance.submitRequest(request, pipelineID); - LOG.info("submitting {} request over RATIS server for container {}", - request.getCmdType(), containerId); - } else { - serverInstance = getStandaAloneSerer(); - Preconditions.checkNotNull(serverInstance); - getStandaAloneSerer().submitRequest(request, pipelineID); - LOG.info( - "submitting {} request over STAND_ALONE server for container {}", - request.getCmdType(), containerId); + ReplicationType replicationType, + PipelineID pipelineID) throws IOException { + if (containerSet.getContainer(request.getContainerID()) + .getContainerData().isClosed()) { + LOG.debug("Container {} is already closed", request.getContainerID()); + // It might happen that the where the first attempt of closing the + // container failed with NOT_LEADER_EXCEPTION. In such cases, SCM will + // retry to check the container got really closed via Ratis. + // In such cases of the retry attempt, if the container is already closed + // via Ratis, we should just return. } - + LOG.info("submitting {} request over {} server for container {}", + request.getCmdType(), replicationType, request.getContainerID()); + Preconditions.checkState(servers.containsKey(replicationType)); + servers.get(replicationType).submitRequest(request, pipelineID); } - private long getContainerIdForCmd( - ContainerProtos.ContainerCommandRequestProto request) - throws IllegalArgumentException { - ContainerProtos.Type type = request.getCmdType(); - switch (type) { - case CloseContainer: - return request.getContainerID(); - // Right now, we handle only closeContainer via queuing it over the - // over the XceiVerServer. For all other commands we throw Illegal - // argument exception here. Will need to extend the switch cases - // in case we want add another commands here. - default: - throw new IllegalArgumentException("Cmd " + request.getCmdType() - + " not supported over HearBeat Response"); - } - } - - private XceiverServerSpi getRatisSerer() { - for (XceiverServerSpi serverInstance : server) { - if (serverInstance instanceof XceiverServerRatis) { - return serverInstance; - } - } - return null; - } - - private XceiverServerSpi getStandaAloneSerer() { - for (XceiverServerSpi serverInstance : server) { - if (!(serverInstance instanceof XceiverServerRatis)) { - return serverInstance; - } - } - return null; - } - - private int getPortbyType(HddsProtos.ReplicationType replicationType) { - for (XceiverServerSpi serverinstance : server) { - if (serverinstance.getServerType() == replicationType) { - return serverinstance.getIPCPort(); - } - } - return INVALID_PORT; + private int getPortByType(ReplicationType replicationType) { + return servers.containsKey(replicationType) ? + servers.get(replicationType).getIPCPort() : INVALID_PORT; } /** - * Returns the container server IPC port. + * Returns the container servers IPC port. * - * @return Container server IPC port. + * @return Container servers IPC port. */ public int getContainerServerPort() { - return getPortbyType(HddsProtos.ReplicationType.STAND_ALONE); + return getPortByType(ReplicationType.STAND_ALONE); } /** @@ -265,7 +223,7 @@ public class OzoneContainer { * @return Ratis port. */ public int getRatisContainerServerPort() { - return getPortbyType(HddsProtos.ReplicationType.RATIS); + return getPortByType(ReplicationType.RATIS); } /** diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java index aaa5f112a3d..c2c20a4742f 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java @@ -30,25 +30,13 @@ import org.apache.hadoop.hdds.scm.container.common.helpers.PipelineID; public class CloseContainerCommand extends SCMCommand { - private long containerID; private HddsProtos.ReplicationType replicationType; private PipelineID pipelineID; public CloseContainerCommand(long containerID, HddsProtos.ReplicationType replicationType, PipelineID pipelineID) { - super(); - this.containerID = containerID; - this.replicationType = replicationType; - this.pipelineID = pipelineID; - } - - // Should be called only for protobuf conversion - private CloseContainerCommand(long containerID, - HddsProtos.ReplicationType replicationType, - PipelineID pipelineID, long id) { - super(id); - this.containerID = containerID; + super(containerID); this.replicationType = replicationType; this.pipelineID = pipelineID; } @@ -75,7 +63,7 @@ public class CloseContainerCommand public CloseContainerCommandProto getProto() { return CloseContainerCommandProto.newBuilder() - .setContainerID(containerID) + .setContainerID(getId()) .setCmdId(getId()) .setReplicationType(replicationType) .setPipelineID(pipelineID.getProtobuf()) @@ -85,13 +73,12 @@ public class CloseContainerCommand public static CloseContainerCommand getFromProtobuf( CloseContainerCommandProto closeContainerProto) { Preconditions.checkNotNull(closeContainerProto); - return new CloseContainerCommand(closeContainerProto.getContainerID(), + return new CloseContainerCommand(closeContainerProto.getCmdId(), closeContainerProto.getReplicationType(), - PipelineID.getFromProtobuf(closeContainerProto.getPipelineID()), - closeContainerProto.getCmdId()); + PipelineID.getFromProtobuf(closeContainerProto.getPipelineID())); } public long getContainerID() { - return containerID; + return getId(); } } diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java index b94ce4fcb25..7baecc4b1f8 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java @@ -17,10 +17,10 @@ package org.apache.hadoop.hdds.scm.container; import java.io.IOException; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; +import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline; import org.apache.hadoop.hdds.server.events.EventHandler; import org.apache.hadoop.hdds.server.events.EventPublisher; import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload; @@ -57,7 +57,7 @@ public class CloseContainerEventHandler implements EventHandler { LOG.info("Close container Event triggered for container : {}", containerID.getId()); - ContainerWithPipeline containerWithPipeline = null; + ContainerWithPipeline containerWithPipeline; ContainerInfo info; try { containerWithPipeline = @@ -74,42 +74,66 @@ public class CloseContainerEventHandler implements EventHandler { return; } - if (info.getState() == HddsProtos.LifeCycleState.OPEN) { - for (DatanodeDetails datanode : - containerWithPipeline.getPipeline().getMachines()) { - CommandForDatanode closeContainerCommand = new CommandForDatanode<>( - datanode.getUuid(), - new CloseContainerCommand(containerID.getId(), - info.getReplicationType(), info.getPipelineID())); - publisher.fireEvent(DATANODE_COMMAND, closeContainerCommand); - publisher.fireEvent(CLOSE_CONTAINER_RETRYABLE_REQ, new - CloseContainerRetryableReq(containerID)); - } - try { - // Finalize event will make sure the state of the container transitions - // from OPEN to CLOSING in containerStateManager. - containerManager.updateContainerState(containerID.getId(), - HddsProtos.LifeCycleEvent.FINALIZE); - } catch (IOException ex) { - LOG.error("Failed to update the container state to FINALIZE for" - + "container : {}" + containerID, ex); - } - } else if (info.getState() == HddsProtos.LifeCycleState.ALLOCATED) { - try { - // Create event will make sure the state of the container transitions - // from OPEN to CREATING in containerStateManager, this will move - // the container out of active allocation path. + HddsProtos.LifeCycleState state = info.getState(); + try { + switch (state) { + case ALLOCATED: + // We cannot close a container in ALLOCATED state, moving the + // container to CREATING state, this should eventually + // timeout and the container will be moved to DELETING state. + LOG.debug("Closing container {} in {} state", containerID, state); containerManager.updateContainerState(containerID.getId(), HddsProtos.LifeCycleEvent.CREATE); - } catch (IOException ex) { - LOG.error("Failed to update the container state to CREATE for" - + "container:{}" + containerID, ex); + break; + case CREATING: + // We cannot close a container in CREATING state, it will eventually + // timeout and moved to DELETING state. + LOG.debug("Closing container {} in {} state", containerID, state); + break; + case OPEN: + containerManager.updateContainerState(containerID.getId(), + HddsProtos.LifeCycleEvent.FINALIZE); + fireCloseContainerEvents(containerWithPipeline, info, publisher); + break; + case CLOSING: + fireCloseContainerEvents(containerWithPipeline, info, publisher); + break; + case CLOSED: + case DELETING: + case DELETED: + LOG.info( + "container with id : {} is in {} state and need not be closed.", + containerID.getId(), info.getState()); + break; + default: + throw new IOException( + "Invalid container state for container " + containerID); } - } else { - LOG.info("container with id : {} is in {} state and need not be closed.", - containerID.getId(), info.getState()); + } catch (IOException ex) { + LOG.error("Failed to update the container state for" + "container : {}" + + containerID, ex); } + } + private void fireCloseContainerEvents( + ContainerWithPipeline containerWithPipeline, ContainerInfo info, + EventPublisher publisher) { + ContainerID containerID = info.containerID(); + // fire events. + CloseContainerCommand closeContainerCommand = + new CloseContainerCommand(containerID.getId(), + info.getReplicationType(), info.getPipelineID()); + + Pipeline pipeline = containerWithPipeline.getPipeline(); + pipeline.getMachines().stream().map( + datanode -> new CommandForDatanode<>(datanode.getUuid(), + closeContainerCommand)).forEach((command) -> { + publisher.fireEvent(DATANODE_COMMAND, command); + }); + publisher.fireEvent(CLOSE_CONTAINER_RETRYABLE_REQ, + new CloseContainerRetryableReq(containerID)); + LOG.trace("Issuing {} on Pipeline {} for container", closeContainerCommand, + pipeline, containerID); } /** diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java index ed9c54dad3f..8c52847fbbc 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java @@ -132,7 +132,6 @@ public class TestCloseContainerByPipeline { // Make sure the closeContainerCommandHandler is Invoked Assert.assertTrue( closeContainerHandler.getInvocationCount() > lastInvocationCount); - } @Test @@ -190,6 +189,7 @@ public class TestCloseContainerByPipeline { Assert.assertFalse((logCapturer.getOutput().contains( "submitting CloseContainer request over RATIS server for container " + containerID))); + logCapturer.stopCapturing(); } @Test @@ -239,13 +239,14 @@ public class TestCloseContainerByPipeline { Assert.assertTrue(isContainerClosed(cluster, containerID, datanodeDetails)); } + // Make sure it was really closed via Ratis not STAND_ALONE server Assert.assertFalse(logCapturer.getOutput().contains( "submitting CloseContainer request over STAND_ALONE " + "server for container " + containerID)); - // Make sure it was really closed via StandAlone not Ratis server Assert.assertTrue((logCapturer.getOutput().contains( "submitting CloseContainer request over RATIS server for container " + containerID))); + logCapturer.stopCapturing(); } private Boolean isContainerClosed(MiniOzoneCluster cluster, long containerID,