HDDS-1341. TestContainerReplication#testContainerReplication fails intermittently. Contributed by Elek, Marton. (#862)
This commit is contained in:
parent
d78854b928
commit
79d14d0d42
|
@ -481,4 +481,9 @@ public class DatanodeStateMachine implements Closeable {
|
||||||
public CommandDispatcher getCommandDispatcher() {
|
public CommandDispatcher getCommandDispatcher() {
|
||||||
return commandDispatcher;
|
return commandDispatcher;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public ReplicationSupervisor getSupervisor() {
|
||||||
|
return supervisor;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,9 +18,10 @@
|
||||||
|
|
||||||
package org.apache.hadoop.ozone.container.common.volume;
|
package org.apache.hadoop.ozone.container.common.volume;
|
||||||
|
|
||||||
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.sun.istack.Nullable;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.GetSpaceUsed;
|
import org.apache.hadoop.fs.GetSpaceUsed;
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.util.concurrent.ConcurrentHashMap.KeySetView;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
|
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
|
||||||
|
@ -42,6 +43,7 @@ public class ReplicationSupervisor {
|
||||||
private final ContainerSet containerSet;
|
private final ContainerSet containerSet;
|
||||||
private final ContainerReplicator replicator;
|
private final ContainerReplicator replicator;
|
||||||
private final ThreadPoolExecutor executor;
|
private final ThreadPoolExecutor executor;
|
||||||
|
private final AtomicLong replicationCounter;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A set of container IDs that are currently being downloaded
|
* A set of container IDs that are currently being downloaded
|
||||||
|
@ -56,6 +58,7 @@ public class ReplicationSupervisor {
|
||||||
this.containerSet = containerSet;
|
this.containerSet = containerSet;
|
||||||
this.replicator = replicator;
|
this.replicator = replicator;
|
||||||
this.containersInFlight = ConcurrentHashMap.newKeySet();
|
this.containersInFlight = ConcurrentHashMap.newKeySet();
|
||||||
|
replicationCounter = new AtomicLong();
|
||||||
this.executor = new ThreadPoolExecutor(
|
this.executor = new ThreadPoolExecutor(
|
||||||
0, poolSize, 60, TimeUnit.SECONDS,
|
0, poolSize, 60, TimeUnit.SECONDS,
|
||||||
new LinkedBlockingQueue<>(),
|
new LinkedBlockingQueue<>(),
|
||||||
|
@ -123,7 +126,12 @@ public class ReplicationSupervisor {
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
containersInFlight.remove(task.getContainerId());
|
containersInFlight.remove(task.getContainerId());
|
||||||
|
replicationCounter.incrementAndGet();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getReplicationCounter() {
|
||||||
|
return replicationCounter.get();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -41,11 +41,13 @@ import org.apache.hadoop.ozone.HddsDatanodeService;
|
||||||
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
import org.apache.hadoop.ozone.MiniOzoneCluster;
|
||||||
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
|
import org.apache.hadoop.ozone.container.common.helpers.BlockData;
|
||||||
import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
import org.apache.hadoop.ozone.container.common.interfaces.Container;
|
||||||
|
import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine;
|
||||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
|
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
|
||||||
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
|
import org.apache.hadoop.ozone.container.keyvalue.KeyValueHandler;
|
||||||
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
||||||
import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
|
import org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer;
|
||||||
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
|
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
|
||||||
import static org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer
|
import static org.apache.hadoop.ozone.container.ozoneimpl.TestOzoneContainer
|
||||||
.writeChunkForContainer;
|
.writeChunkForContainer;
|
||||||
|
@ -123,10 +125,16 @@ public class TestContainerReplication {
|
||||||
new ReplicateContainerCommand(containerId,
|
new ReplicateContainerCommand(containerId,
|
||||||
sourcePipelines.getNodes()));
|
sourcePipelines.getNodes()));
|
||||||
|
|
||||||
Thread.sleep(3000);
|
DatanodeStateMachine destinationDatanodeDatanodeStateMachine =
|
||||||
|
destinationDatanode.getDatanodeStateMachine();
|
||||||
|
|
||||||
|
//wait for the replication
|
||||||
|
GenericTestUtils.waitFor(()
|
||||||
|
-> destinationDatanodeDatanodeStateMachine.getSupervisor()
|
||||||
|
.getReplicationCounter() > 0, 1000, 20_000);
|
||||||
|
|
||||||
OzoneContainer ozoneContainer =
|
OzoneContainer ozoneContainer =
|
||||||
destinationDatanode.getDatanodeStateMachine().getContainer();
|
destinationDatanodeDatanodeStateMachine.getContainer();
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue