HDDS-544. Unconditional wait findbug warning from ReplicationSupervisor.

Contributed by Arpit Agarwal.
This commit is contained in:
Anu Engineer 2018-10-21 23:18:38 -07:00
parent c9077a9f5a
commit 7db3bb3ac1
3 changed files with 74 additions and 97 deletions

View File

@ -308,7 +308,6 @@ public class DatanodeStateMachine implements Closeable {
public void startDaemon() {
Runnable startStateMachineTask = () -> {
try {
supervisor.start();
start();
LOG.info("Ozone container server started.");
} catch (Exception ex) {

View File

@ -17,11 +17,13 @@
*/
package org.apache.hadoop.ozone.container.replication;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentHashMap.KeySetView;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.replication.ReplicationTask.Status;
@ -37,106 +39,91 @@ public class ReplicationSupervisor {
private static final Logger LOG =
LoggerFactory.getLogger(ReplicationSupervisor.class);
private final Set<Worker> threadPool = new HashSet<>();
private final Map<Long, ReplicationTask> queue = new TreeMap();
private final ContainerSet containerSet;
private final ContainerReplicator replicator;
private final ThreadPoolExecutor executor;
private final int poolSize;
/**
* A set of container IDs that are currently being downloaded
* or queued for download. Tracked so we don't schedule > 1
* concurrent download for the same container.
*/
private final KeySetView<Object, Boolean> containersInFlight;
public ReplicationSupervisor(
ContainerSet containerSet,
ContainerReplicator replicator, int poolSize) {
this.containerSet = containerSet;
this.replicator = replicator;
this.poolSize = poolSize;
this.containersInFlight = ConcurrentHashMap.newKeySet();
this.executor = new ThreadPoolExecutor(
0, poolSize, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<>(),
new ThreadFactoryBuilder().setDaemon(true)
.setNameFormat("ContainerReplicationThread-%d")
.build());
}
public synchronized void addTask(ReplicationTask task) {
queue.putIfAbsent(task.getContainerId(), task);
synchronized (threadPool) {
threadPool.notify();
/**
* Queue an asynchronous download of the given container.
*/
public void addTask(ReplicationTask task) {
if (containersInFlight.add(task.getContainerId())) {
executor.submit(new TaskRunner(task));
}
}
public void start() {
for (int i = 0; i < poolSize; i++) {
Worker worker = new Worker();
Thread thread = new Thread(worker, "ContainerReplication-" + i);
thread.setDaemon(true);
thread.start();
threadPool.add(worker);
}
}
public synchronized ReplicationTask selectTask() {
for (ReplicationTask task : queue.values()) {
if (task.getStatus() == Status.QUEUED) {
if (containerSet.getContainer(task.getContainerId()) == null) {
task.setStatus(Status.DOWNLOADING);
return task;
} else {
LOG.debug("Container {} has already been downloaded.",
task.getContainerId());
queue.remove(task.getContainerId());
}
} else if (task.getStatus() == Status.FAILED) {
LOG.error(
"Container {} can't be downloaded from any of the datanodes.",
task.getContainerId());
queue.remove(task.getContainerId());
} else if (task.getStatus() == Status.DONE) {
queue.remove(task.getContainerId());
LOG.info("Container {} is replicated.", task.getContainerId());
}
}
//no available task.
return null;
}
public void stop() {
for (Worker worker : threadPool) {
worker.stop();
try {
executor.shutdown();
if (!executor.awaitTermination(3, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException ie) {
// Ignore, we don't really care about the failure.
Thread.currentThread().interrupt();
}
}
/**
* Get the number of containers currently being downloaded
* or scheduled for download.
* @return Count of in-flight replications.
*/
@VisibleForTesting
public int getQueueSize() {
return queue.size();
public int getInFlightReplications() {
return containersInFlight.size();
}
private class Worker implements Runnable {
private final class TaskRunner implements Runnable {
private final ReplicationTask task;
private boolean running = true;
private TaskRunner(ReplicationTask task) {
this.task = task;
}
@Override
public void run() {
try {
while (running) {
ReplicationTask task = selectTask();
if (task == null) {
synchronized (threadPool) {
threadPool.wait();
}
} else {
replicator.replicate(task);
}
if (containerSet.getContainer(task.getContainerId()) != null) {
LOG.debug("Container {} has already been downloaded.",
task.getContainerId());
return;
}
} catch (Exception ex) {
LOG.error("Error on doing replication", ex);
try {
Thread.sleep(200);
} catch (InterruptedException e) {
LOG.error("Error on waiting after failed replication task", e);
}
}
}
public void stop() {
running = false;
task.setStatus(Status.DOWNLOADING);
replicator.replicate(task);
if (task.getStatus() == Status.FAILED) {
LOG.error(
"Container {} can't be downloaded from any of the datanodes.",
task.getContainerId());
} else if (task.getStatus() == Status.DONE) {
LOG.info("Container {} is replicated.", task.getContainerId());
}
} finally {
containersInFlight.remove(task.getContainerId());
}
}
}
}

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.ozone.container.common.impl.ContainerSet;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainer;
import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.mockito.Mockito;
@ -41,7 +42,7 @@ public class TestReplicationSupervisor {
private OzoneConfiguration conf = new OzoneConfiguration();
@Test
public void normal() {
public void normal() throws Exception {
//GIVEN
ContainerSet set = new ContainerSet();
@ -54,7 +55,6 @@ public class TestReplicationSupervisor {
.collect(Collectors.toList());
try {
supervisor.start();
//WHEN
supervisor.addTask(new ReplicationTask(1L, datanodes));
supervisor.addTask(new ReplicationTask(1L, datanodes));
@ -62,16 +62,11 @@ public class TestReplicationSupervisor {
supervisor.addTask(new ReplicationTask(2L, datanodes));
supervisor.addTask(new ReplicationTask(2L, datanodes));
supervisor.addTask(new ReplicationTask(3L, datanodes));
try {
Thread.sleep(300);
} catch (InterruptedException e) {
e.printStackTrace();
}
//THEN
System.out.println(replicator.replicated.get(0));
LambdaTestUtils.await(200_000, 1000,
() -> supervisor.getInFlightReplications() == 0);
Assert
.assertEquals(3, replicator.replicated.size());
Assert.assertEquals(3, replicator.replicated.size());
} finally {
supervisor.stop();
@ -79,7 +74,7 @@ public class TestReplicationSupervisor {
}
@Test
public void duplicateMessageAfterAWhile() throws InterruptedException {
public void duplicateMessageAfterAWhile() throws Exception {
//GIVEN
ContainerSet set = new ContainerSet();
@ -92,22 +87,18 @@ public class TestReplicationSupervisor {
.collect(Collectors.toList());
try {
supervisor.start();
//WHEN
supervisor.addTask(new ReplicationTask(1L, datanodes));
Thread.sleep(400);
LambdaTestUtils.await(200_000, 1000,
() -> supervisor.getInFlightReplications() == 0);
supervisor.addTask(new ReplicationTask(1L, datanodes));
Thread.sleep(300);
LambdaTestUtils.await(200_000, 1000,
() -> supervisor.getInFlightReplications() == 0);
//THEN
System.out.println(replicator.replicated.get(0));
Assert
.assertEquals(1, replicator.replicated.size());
//the last item is still in the queue as we cleanup the queue during the
// selection
Assert.assertEquals(1, supervisor.getQueueSize());
Assert.assertEquals(1, replicator.replicated.size());
} finally {
supervisor.stop();