diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java index 0119d23fe69..c9eb7024eaf 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java @@ -481,4 +481,9 @@ public class DatanodeStateMachine implements Closeable { public CommandDispatcher getCommandDispatcher() { return commandDispatcher; } + + @VisibleForTesting + public ReplicationSupervisor getSupervisor() { + return supervisor; + } } diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java index 3a2034580ad..4eb16c166c5 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/volume/HddsVolume.java @@ -18,9 +18,10 @@ package org.apache.hadoop.ozone.container.common.volume; +import javax.annotation.Nullable; + import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; -import com.sun.istack.Nullable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.GetSpaceUsed; import org.apache.hadoop.fs.StorageType; diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java index c59d643c47b..7a07c4df71e 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java @@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap.KeySetView; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.ozone.container.common.impl.ContainerSet; @@ -42,6 +43,7 @@ public class ReplicationSupervisor { private final ContainerSet containerSet; private final ContainerReplicator replicator; private final ThreadPoolExecutor executor; + private final AtomicLong replicationCounter; /** * A set of container IDs that are currently being downloaded @@ -56,6 +58,7 @@ public class ReplicationSupervisor { this.containerSet = containerSet; this.replicator = replicator; this.containersInFlight = ConcurrentHashMap.newKeySet(); + replicationCounter = new AtomicLong(); this.executor = new ThreadPoolExecutor( 0, poolSize, 60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(), @@ -123,7 +126,12 @@ public class ReplicationSupervisor { } } finally { containersInFlight.remove(task.getContainerId()); + replicationCounter.incrementAndGet(); } } } + + public long getReplicationCounter() { + return replicationCounter.get(); + } } diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java index 84d6fd5a9f0..ab78705559e 100644 --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java @@ -41,11 +41,13 @@ import org.apache.hadoop.ozone.HddsDatanodeService; import org.apache.hadoop.ozone.MiniOzoneCluster; import org.apache.hadoop.ozone.container.common.helpers.BlockData; import org.apache.hadoop.ozone.container.common.interfaces.Container; +import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler; import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer; import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; +import org.apache.hadoop.test.GenericTestUtils; import static org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer .writeChunkForContainer; @@ -123,10 +125,16 @@ public class TestContainerReplication { new ReplicateContainerCommand(containerId, sourcePipelines.getNodes())); - Thread.sleep(3000); + DatanodeStateMachine destinationDatanodeDatanodeStateMachine = + destinationDatanode.getDatanodeStateMachine(); + + //wait for the replication + GenericTestUtils.waitFor(() + -> destinationDatanodeDatanodeStateMachine.getSupervisor() + .getReplicationCounter() > 0, 1000, 20_000); OzoneContainer ozoneContainer = - destinationDatanode.getDatanodeStateMachine().getContainer(); + destinationDatanodeDatanodeStateMachine.getContainer();