HDDS-2117. ContainerStateMachine#writeStateMachineData times out. (#1430)

This commit is contained in:
bshashikant 2019-09-17 16:49:25 +05:30 committed by Lokesh Jain
parent f3de141787
commit 7f9073132d
2 changed files with 29 additions and 11 deletions

View File

@ -236,9 +236,7 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
if (container2BCSIDMap != null) {
// adds this container to list of containers created in the pipeline
// with initial BCSID recorded as 0.
Preconditions
.checkArgument(!container2BCSIDMap.containsKey(containerID));
container2BCSIDMap.put(containerID, Long.valueOf(0));
container2BCSIDMap.putIfAbsent(containerID, Long.valueOf(0));
}
container = getContainer(containerID);
}
@ -290,6 +288,11 @@ public class HddsDispatcher implements ContainerDispatcher, Auditor {
// state here.
Result result = responseProto.getResult();
if (cmdType == ContainerProtos.Type.CreateContainer
&& result == Result.SUCCESS && dispatcherContext != null) {
Preconditions.checkNotNull(dispatcherContext.getContainer2BCSIDMap());
container2BCSIDMap.putIfAbsent(containerID, Long.valueOf(0));
}
if (!HddsUtils.isReadOnly(msg) && !canIgnoreException(result)) {
// If the container is open/closing and the container operation
// has failed, it should be first marked unhealthy and the initiate the

View File

@ -435,13 +435,20 @@ public class ContainerStateMachine extends BaseStateMachine {
.setStage(DispatcherContext.WriteChunkStage.WRITE_DATA)
.setContainer2BCSIDMap(container2BCSIDMap)
.build();
CompletableFuture<Message> raftFuture = new CompletableFuture<>();
// ensure the write chunk happens asynchronously in writeChunkExecutor pool
// thread.
CompletableFuture<ContainerCommandResponseProto> writeChunkFuture =
CompletableFuture.supplyAsync(() ->
runCommand(requestProto, context), chunkExecutor);
CompletableFuture<Message> raftFuture = new CompletableFuture<>();
CompletableFuture.supplyAsync(() -> {
try {
return runCommand(requestProto, context);
} catch (Exception e) {
LOG.error(gid + ": writeChunk writeStateMachineData failed: blockId"
+ write.getBlockID() + " logIndex " + entryIndex + " chunkName "
+ write.getChunkData().getChunkName() + e);
raftFuture.completeExceptionally(e);
throw e;
}}, chunkExecutor);
writeChunkFutureMap.put(entryIndex, writeChunkFuture);
LOG.debug(gid + ": writeChunk writeStateMachineData : blockId " +
@ -698,7 +705,7 @@ public class ContainerStateMachine extends BaseStateMachine {
.setStage(DispatcherContext.WriteChunkStage.COMMIT_DATA);
}
if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile
|| cmdType == Type.PutBlock) {
|| cmdType == Type.PutBlock || cmdType == Type.CreateContainer) {
builder.setContainer2BCSIDMap(container2BCSIDMap);
}
CompletableFuture<Message> applyTransactionFuture =
@ -706,9 +713,17 @@ public class ContainerStateMachine extends BaseStateMachine {
// Ensure the command gets executed in a separate thread than
// stateMachineUpdater thread which is calling applyTransaction here.
CompletableFuture<ContainerCommandResponseProto> future =
CompletableFuture.supplyAsync(
() -> runCommand(requestProto, builder.build()),
getCommandExecutor(requestProto));
CompletableFuture.supplyAsync(() -> {
try {
return runCommand(requestProto, builder.build());
} catch (Exception e) {
LOG.error("gid {} : ApplyTransaction failed. cmd {} logIndex "
+ "{} exception {}", gid, requestProto.getCmdType(),
index, e);
applyTransactionFuture.completeExceptionally(e);
throw e;
}
}, getCommandExecutor(requestProto));
future.thenApply(r -> {
if (trx.getServerRole() == RaftPeerRole.LEADER) {
long startTime = (long) trx.getStateMachineContext();