From 0976f6fc30ed8bb774d823f09c58cea54be05ae7 Mon Sep 17 00:00:00 2001 From: Mukul Kumar Singh Date: Sun, 14 Jul 2019 10:53:51 +0530 Subject: [PATCH] HDDS-1766. ContainerStateMachine is unable to increment lastAppliedTermIndex. Contributed by Mukul Kumar Singh. (#1072) --- .../server/ratis/ContainerStateMachine.java | 39 ++++++++++--------- 1 file changed, 20 insertions(+), 19 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index f3e4391046b..7e4d4817356 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -197,17 +197,16 @@ public class ContainerStateMachine extends BaseStateMachine { if (snapshot == null) { TermIndex empty = TermIndex.newTermIndex(0, RaftLog.INVALID_LOG_INDEX); - LOG.info( - "The snapshot info is null." + "Setting the last applied index to:" - + empty); + LOG.info("{}: The snapshot info is null. Setting the last applied index" + + "to:{}", gid, empty); setLastAppliedTermIndex(empty); - return RaftLog.INVALID_LOG_INDEX; + return empty.getIndex(); } final File snapshotFile = snapshot.getFile().getPath().toFile(); final TermIndex last = SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile); - LOG.info("Setting the last applied index to " + last); + LOG.info("{}: Setting the last applied index to {}", gid, last); setLastAppliedTermIndex(last); // initialize the dispatcher with snapshot so that it build the missing @@ -243,18 +242,20 @@ public class ContainerStateMachine extends BaseStateMachine { @Override public long takeSnapshot() throws IOException { TermIndex ti = getLastAppliedTermIndex(); - LOG.info("Taking snapshot at termIndex:" + ti); + long startTime = Time.monotonicNow(); if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) { final File snapshotFile = storage.getSnapshotFile(ti.getTerm(), ti.getIndex()); - LOG.info("Taking a snapshot to file {}", snapshotFile); + LOG.info("{}: Taking a snapshot at:{} file {}", gid, ti, snapshotFile); try (FileOutputStream fos = new FileOutputStream(snapshotFile)) { persistContainerSet(fos); } catch (IOException ioe) { - LOG.warn("Failed to write snapshot file \"" + snapshotFile - + "\", last applied index=" + ti); + LOG.info("{}: Failed to write snapshot at:{} file {}", gid, ti, + snapshotFile); throw ioe; } + LOG.info("{}: Finished taking a snapshot at:{} file:{} time:{}", + gid, ti, snapshotFile, (Time.monotonicNow() - startTime)); return ti.getIndex(); } return -1; @@ -337,7 +338,7 @@ public class ContainerStateMachine extends BaseStateMachine { private ContainerCommandResponseProto dispatchCommand( ContainerCommandRequestProto requestProto, DispatcherContext context) { - LOG.trace("dispatch {} containerID={} pipelineID={} traceID={}", + LOG.trace("{}: dispatch {} containerID={} pipelineID={} traceID={}", gid, requestProto.getCmdType(), requestProto.getContainerID(), requestProto.getPipelineID(), requestProto.getTraceID()); if (isBlockTokenEnabled) { @@ -355,7 +356,7 @@ public class ContainerStateMachine extends BaseStateMachine { } ContainerCommandResponseProto response = dispatcher.dispatch(requestProto, context); - LOG.trace("response {}", response); + LOG.trace("{}: response {}", gid, response); return response; } @@ -395,18 +396,18 @@ public class ContainerStateMachine extends BaseStateMachine { .supplyAsync(() -> runCommand(requestProto, context), chunkExecutor); writeChunkFutureMap.put(entryIndex, writeChunkFuture); - LOG.debug("writeChunk writeStateMachineData : blockId " + write.getBlockID() - + " logIndex " + entryIndex + " chunkName " + write.getChunkData() - .getChunkName()); + LOG.debug(gid + ": writeChunk writeStateMachineData : blockId " + + write.getBlockID() + " logIndex " + entryIndex + " chunkName " + + write.getChunkData().getChunkName()); // Remove the future once it finishes execution from the // writeChunkFutureMap. writeChunkFuture.thenApply(r -> { metrics.incNumBytesWrittenCount( requestProto.getWriteChunk().getChunkData().getLen()); writeChunkFutureMap.remove(entryIndex); - LOG.debug("writeChunk writeStateMachineData completed: blockId " + write - .getBlockID() + " logIndex " + entryIndex + " chunkName " + write - .getChunkData().getChunkName()); + LOG.debug(gid + ": writeChunk writeStateMachineData completed: blockId" + + write.getBlockID() + " logIndex " + entryIndex + " chunkName " + + write.getChunkData().getChunkName()); return r; }); return writeChunkFuture; @@ -564,12 +565,12 @@ public class ContainerStateMachine extends BaseStateMachine { } } catch (Exception e) { metrics.incNumReadStateMachineFails(); - LOG.error("unable to read stateMachineData:" + e); + LOG.error("{} unable to read stateMachineData:", gid, e); return completeExceptionally(e); } } - private void updateLastApplied() { + private synchronized void updateLastApplied() { Long appliedTerm = null; long appliedIndex = -1; for(long i = getLastAppliedTermIndex().getIndex() + 1;; i++) {