HDDS-824. WriteStateMachineData timesout leading to Datanode crash. Contributed by Mukul Kumar Singh.

This commit is contained in:
Mukul Kumar Singh 2018-11-09 18:15:24 +05:30
parent a5b72cb1b8
commit feb43ecbb4
1 changed files with 11 additions and 29 deletions

View File

@ -116,8 +116,7 @@ public class ContainerStateMachine extends BaseStateMachine {
private final XceiverServerRatis ratisServer; private final XceiverServerRatis ratisServer;
private final ConcurrentHashMap<Long, CompletableFuture<Message>> private final ConcurrentHashMap<Long, CompletableFuture<Message>>
writeChunkFutureMap; writeChunkFutureMap;
private final ConcurrentHashMap<Long, CompletableFuture<Message>> private final ConcurrentHashMap<Long, Message> createContainerResponseMap;
createContainerFutureMap;
private ExecutorService[] executors; private ExecutorService[] executors;
private final int numExecutors; private final int numExecutors;
private final Map<Long, Long> containerCommandCompletionMap; private final Map<Long, Long> containerCommandCompletionMap;
@ -137,7 +136,7 @@ public class ContainerStateMachine extends BaseStateMachine {
this.numExecutors = executors.size(); this.numExecutors = executors.size();
this.executors = executors.toArray(new ExecutorService[numExecutors]); this.executors = executors.toArray(new ExecutorService[numExecutors]);
this.writeChunkFutureMap = new ConcurrentHashMap<>(); this.writeChunkFutureMap = new ConcurrentHashMap<>();
this.createContainerFutureMap = new ConcurrentHashMap<>(); this.createContainerResponseMap = new ConcurrentHashMap<>();
containerCommandCompletionMap = new ConcurrentHashMap<>(); containerCommandCompletionMap = new ConcurrentHashMap<>();
} }
@ -289,17 +288,8 @@ public class ContainerStateMachine extends BaseStateMachine {
private CompletableFuture<Message> handleWriteChunk( private CompletableFuture<Message> handleWriteChunk(
ContainerCommandRequestProto requestProto, long entryIndex) { ContainerCommandRequestProto requestProto, long entryIndex) {
final WriteChunkRequestProto write = requestProto.getWriteChunk(); final WriteChunkRequestProto write = requestProto.getWriteChunk();
long containerID = write.getBlockID().getContainerID(); CompletableFuture<Message> writeChunkFuture = CompletableFuture.supplyAsync(
CompletableFuture<Message> future = () -> runCommand(requestProto), chunkExecutor);
createContainerFutureMap.get(containerID);
CompletableFuture<Message> writeChunkFuture;
if (future != null) {
writeChunkFuture = future.thenApplyAsync(
v -> runCommand(requestProto), chunkExecutor);
} else {
writeChunkFuture = CompletableFuture.supplyAsync(
() -> runCommand(requestProto), chunkExecutor);
}
writeChunkFutureMap.put(entryIndex, writeChunkFuture); writeChunkFutureMap.put(entryIndex, writeChunkFuture);
LOG.debug("writeChunk writeStateMachineData : blockId " + write.getBlockID() LOG.debug("writeChunk writeStateMachineData : blockId " + write.getBlockID()
+ " logIndex " + entryIndex + " chunkName " + write.getChunkData() + " logIndex " + entryIndex + " chunkName " + write.getChunkData()
@ -319,9 +309,9 @@ public class ContainerStateMachine extends BaseStateMachine {
private CompletableFuture<Message> handleCreateContainer( private CompletableFuture<Message> handleCreateContainer(
ContainerCommandRequestProto requestProto) { ContainerCommandRequestProto requestProto) {
long containerID = requestProto.getContainerID(); long containerID = requestProto.getContainerID();
createContainerFutureMap. Message response = runCommand(requestProto);
computeIfAbsent(containerID, k -> new CompletableFuture<>()); createContainerResponseMap.put(containerID, response);
return CompletableFuture.completedFuture(() -> ByteString.EMPTY); return CompletableFuture.completedFuture(response);
} }
/* /*
@ -508,6 +498,10 @@ public class ContainerStateMachine extends BaseStateMachine {
future = CompletableFuture future = CompletableFuture
.supplyAsync(() -> runCommand(containerCommandRequestProto), .supplyAsync(() -> runCommand(containerCommandRequestProto),
getCommandExecutor(requestProto)); getCommandExecutor(requestProto));
} else if (cmdType == Type.CreateContainer) {
long containerID = requestProto.getContainerID();
return CompletableFuture.completedFuture(
createContainerResponseMap.get(containerID));
} else { } else {
// Make sure that in write chunk, the user data is not set // Make sure that in write chunk, the user data is not set
if (cmdType == Type.WriteChunk) { if (cmdType == Type.WriteChunk) {
@ -517,18 +511,6 @@ public class ContainerStateMachine extends BaseStateMachine {
future = CompletableFuture.supplyAsync(() -> runCommand(requestProto), future = CompletableFuture.supplyAsync(() -> runCommand(requestProto),
getCommandExecutor(requestProto)); getCommandExecutor(requestProto));
} }
// Mark the createContainerFuture complete so that writeStateMachineData
// for WriteChunk gets unblocked
if (cmdType == Type.CreateContainer) {
long containerID = requestProto.getContainerID();
future.thenApply(
r -> {
createContainerFutureMap.remove(containerID).complete(null);
LOG.info("create Container Transaction completed for container " +
containerID + " log index " + index);
return r;
});
}
future.thenAccept(m -> { future.thenAccept(m -> {
final Long previous = final Long previous =