From d37705c0f153a14ecdcc94b0727607c33a15e662 Mon Sep 17 00:00:00 2001 From: Shashikant Banerjee Date: Wed, 24 Oct 2018 16:04:57 +0530 Subject: [PATCH] HDDS-716. Update ozone to latest ratis snapshot build(0.3.0-aa38160-SNAPSHOT). Contributed by Mukul Kumar Singh. (cherry picked from commit 0891cdda7961f7d0d7debdb8e89b7816f39f7c7b) --- .../server/ratis/ContainerStateMachine.java | 72 ++++++++++++------- hadoop-project/pom.xml | 2 +- 2 files changed, 48 insertions(+), 26 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java index fa9fbf30e27..bcbf93f1587 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java @@ -22,6 +22,7 @@ import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.HddsUtils; import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos; import org.apache.hadoop.ozone.container.common.helpers.BlockData; +import org.apache.ratis.proto.RaftProtos.StateMachineEntryProto; import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.server.RaftServer; @@ -49,7 +50,7 @@ import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.thirdparty.com.google.protobuf.ByteString; import org.apache.ratis.proto.RaftProtos.RoleInfoProto; import org.apache.ratis.proto.RaftProtos.LogEntryProto; -import org.apache.ratis.proto.RaftProtos.SMLogEntryProto; +import org.apache.ratis.proto.RaftProtos.StateMachineLogEntryProto; import org.apache.ratis.statemachine.StateMachineStorage; import org.apache.ratis.statemachine.TransactionContext; import org.apache.ratis.statemachine.impl.BaseStateMachine; @@ -207,7 +208,7 @@ public class ContainerStateMachine extends BaseStateMachine { final ContainerCommandRequestProto proto = getRequestProto(request.getMessage().getContent()); - final SMLogEntryProto log; + final StateMachineLogEntryProto log; if (proto.getCmdType() == Type.WriteChunk) { final WriteChunkRequestProto write = proto.getWriteChunk(); // create the state machine data proto @@ -237,23 +238,39 @@ public class ContainerStateMachine extends BaseStateMachine { .setWriteChunk(commitWriteChunkProto) .build(); - log = SMLogEntryProto.newBuilder() - .setData(commitContainerCommandProto.toByteString()) - .setStateMachineData(dataContainerCommandProto.toByteString()) - .build(); + log = createSMLogEntryProto(request, + commitContainerCommandProto.toByteString(), + dataContainerCommandProto.toByteString()); } else if (proto.getCmdType() == Type.CreateContainer) { - log = SMLogEntryProto.newBuilder() - .setData(request.getMessage().getContent()) - .setStateMachineData(request.getMessage().getContent()) - .build(); + log = createSMLogEntryProto(request, + request.getMessage().getContent(), request.getMessage().getContent()); } else { - log = SMLogEntryProto.newBuilder() - .setData(request.getMessage().getContent()) - .build(); + log = createSMLogEntryProto(request, request.getMessage().getContent(), + null); } return new TransactionContextImpl(this, request, log); } + private StateMachineLogEntryProto createSMLogEntryProto(RaftClientRequest r, + ByteString logData, ByteString smData) { + StateMachineLogEntryProto.Builder builder = + StateMachineLogEntryProto.newBuilder(); + + builder.setCallId(r.getCallId()) + .setClientId(r.getClientId().toByteString()) + .setLogData(logData); + + if (smData != null) { + builder.setStateMachineEntry(StateMachineEntryProto.newBuilder() + .setStateMachineData(smData).build()); + } + return builder.build(); + } + + private ByteString getStateMachineData(StateMachineLogEntryProto entryProto) { + return entryProto.getStateMachineEntry().getStateMachineData(); + } + private ContainerCommandRequestProto getRequestProto(ByteString request) throws InvalidProtocolBufferException { return ContainerCommandRequestProto.parseFrom(request); @@ -315,7 +332,7 @@ public class ContainerStateMachine extends BaseStateMachine { try { metrics.incNumWriteStateMachineOps(); final ContainerCommandRequestProto requestProto = - getRequestProto(entry.getSmLogEntry().getStateMachineData()); + getRequestProto(getStateMachineData(entry.getStateMachineLogEntry())); Type cmdType = requestProto.getCmdType(); switch (cmdType) { case CreateContainer: @@ -345,8 +362,8 @@ public class ContainerStateMachine extends BaseStateMachine { } } - private ByteString readStateMachineData(LogEntryProto entry, - ContainerCommandRequestProto requestProto) { + private ByteString readStateMachineData(ContainerCommandRequestProto + requestProto) { WriteChunkRequestProto writeChunkRequestProto = requestProto.getWriteChunk(); // Assert that store log entry is for COMMIT_DATA, the WRITE_DATA is @@ -361,7 +378,8 @@ public class ContainerStateMachine extends BaseStateMachine { .setChunkData(writeChunkRequestProto.getChunkData()); ContainerCommandRequestProto dataContainerCommandProto = ContainerCommandRequestProto.newBuilder(requestProto) - .setCmdType(Type.ReadChunk).setReadChunk(readChunkRequestProto) + .setCmdType(Type.ReadChunk) + .setReadChunk(readChunkRequestProto) .build(); // read the chunk @@ -376,7 +394,8 @@ public class ContainerStateMachine extends BaseStateMachine { final WriteChunkRequestProto.Builder dataWriteChunkProto = WriteChunkRequestProto.newBuilder(writeChunkRequestProto) // adding the state machine data - .setData(responseProto.getData()).setStage(Stage.WRITE_DATA); + .setData(responseProto.getData()) + .setStage(Stage.WRITE_DATA); ContainerCommandRequestProto.Builder newStateMachineProto = ContainerCommandRequestProto.newBuilder(requestProto) @@ -410,21 +429,20 @@ public class ContainerStateMachine extends BaseStateMachine { @Override public CompletableFuture readStateMachineData( LogEntryProto entry) { - SMLogEntryProto smLogEntryProto = entry.getSmLogEntry(); - if (!smLogEntryProto.getStateMachineData().isEmpty()) { + StateMachineLogEntryProto smLogEntryProto = entry.getStateMachineLogEntry(); + if (!getStateMachineData(smLogEntryProto).isEmpty()) { return CompletableFuture.completedFuture(ByteString.EMPTY); } try { final ContainerCommandRequestProto requestProto = - getRequestProto(entry.getSmLogEntry().getData()); + getRequestProto(entry.getStateMachineLogEntry().getLogData()); // readStateMachineData should only be called for "write" to Ratis. Preconditions.checkArgument(!HddsUtils.isReadOnly(requestProto)); if (requestProto.getCmdType() == Type.WriteChunk) { return CompletableFuture.supplyAsync(() -> - readStateMachineData(entry, requestProto), - chunkExecutor); + readStateMachineData(requestProto), chunkExecutor); } else if (requestProto.getCmdType() == Type.CreateContainer) { return CompletableFuture.completedFuture(requestProto.toByteString()); } else { @@ -462,7 +480,7 @@ public class ContainerStateMachine extends BaseStateMachine { try { metrics.incNumApplyTransactionsOps(); ContainerCommandRequestProto requestProto = - getRequestProto(trx.getSMLogEntry().getData()); + getRequestProto(trx.getStateMachineLogEntry().getLogData()); Type cmdType = requestProto.getCmdType(); CompletableFuture future; if (cmdType == Type.PutBlock) { @@ -490,6 +508,11 @@ public class ContainerStateMachine extends BaseStateMachine { .supplyAsync(() -> runCommand(containerCommandRequestProto), getCommandExecutor(requestProto)); } else { + // Make sure that in write chunk, the user data is not set + if (cmdType == Type.WriteChunk) { + Preconditions.checkArgument(requestProto + .getWriteChunk().getData().isEmpty()); + } future = CompletableFuture.supplyAsync(() -> runCommand(requestProto), getCommandExecutor(requestProto)); } @@ -534,7 +557,6 @@ public class ContainerStateMachine extends BaseStateMachine { @Override public void close() throws IOException { - takeSnapshot(); for (int i = 0; i < numExecutors; i++) { executors[i].shutdown(); } diff --git a/hadoop-project/pom.xml b/hadoop-project/pom.xml index dae075f2789..22c3b35d604 100644 --- a/hadoop-project/pom.xml +++ b/hadoop-project/pom.xml @@ -101,7 +101,7 @@ 1.0.0-M33 - 0.3.0-9b2d7b6-SNAPSHOT + 0.3.0-aa38160-SNAPSHOT 1.0-alpha-1 3.3.1 2.4.12