HDDS-1208. ContainerStateMachine should set chunk data as state machine data for ratis. Contributed by Lokesh Jain.

This commit is contained in:
Lokesh Jain 2019-03-06 17:00:37 +05:30
parent c79f139519
commit 129fd5dd18
1 changed files with 23 additions and 50 deletions

View File

@ -78,7 +78,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
@ -277,7 +276,7 @@ public class ContainerStateMachine extends BaseStateMachine {
public TransactionContext startTransaction(RaftClientRequest request)
throws IOException {
final ContainerCommandRequestProto proto =
getRequestProto(request.getMessage().getContent());
getContainerCommandRequestProto(request.getMessage().getContent());
Preconditions.checkArgument(request.getRaftGroupId().equals(gid));
try (Scope scope = TracingUtil
.importAndCreateScope(proto.getCmdType().name(), proto.getTraceID())) {
@ -294,17 +293,6 @@ public class ContainerStateMachine extends BaseStateMachine {
}
if (proto.getCmdType() == Type.WriteChunk) {
final WriteChunkRequestProto write = proto.getWriteChunk();
// create the state machine data proto
final WriteChunkRequestProto dataWriteChunkProto =
WriteChunkRequestProto
.newBuilder(write)
.build();
ContainerCommandRequestProto dataContainerCommandProto =
ContainerCommandRequestProto
.newBuilder(proto)
.setWriteChunk(dataWriteChunkProto)
.build();
// create the log entry proto
final WriteChunkRequestProto commitWriteChunkProto =
WriteChunkRequestProto.newBuilder()
@ -323,7 +311,7 @@ public class ContainerStateMachine extends BaseStateMachine {
.setClientRequest(request)
.setStateMachine(this)
.setServerRole(RaftPeerRole.LEADER)
.setStateMachineData(dataContainerCommandProto.toByteString())
.setStateMachineData(write.getData())
.setLogData(commitContainerCommandProto.toByteString())
.build();
} else {
@ -341,8 +329,8 @@ public class ContainerStateMachine extends BaseStateMachine {
return entryProto.getStateMachineEntry().getStateMachineData();
}
private ContainerCommandRequestProto getRequestProto(ByteString request)
throws InvalidProtocolBufferException {
private ContainerCommandRequestProto getContainerCommandRequestProto(
ByteString request) throws InvalidProtocolBufferException {
// TODO: We can avoid creating new builder and set pipeline Id if
// the client is already sending the pipeline id, then we just have to
// validate the pipeline Id.
@ -353,7 +341,9 @@ public class ContainerStateMachine extends BaseStateMachine {
private ContainerCommandResponseProto dispatchCommand(
ContainerCommandRequestProto requestProto, DispatcherContext context) {
LOG.trace("dispatch {}", requestProto);
LOG.trace("dispatch {} containerID={} pipelineID={} traceID={}",
requestProto.getCmdType(), requestProto.getContainerID(),
requestProto.getPipelineID(), requestProto.getTraceID());
if (isBlockTokenEnabled) {
try {
// ServerInterceptors intercepts incoming request and creates ugi.
@ -432,8 +422,15 @@ public class ContainerStateMachine extends BaseStateMachine {
public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
try {
metrics.incNumWriteStateMachineOps();
final ContainerCommandRequestProto requestProto =
getRequestProto(getStateMachineData(entry.getStateMachineLogEntry()));
ContainerCommandRequestProto requestProto =
getContainerCommandRequestProto(
entry.getStateMachineLogEntry().getLogData());
WriteChunkRequestProto writeChunk =
WriteChunkRequestProto.newBuilder(requestProto.getWriteChunk())
.setData(getStateMachineData(entry.getStateMachineLogEntry()))
.build();
requestProto = ContainerCommandRequestProto.newBuilder(requestProto)
.setWriteChunk(writeChunk).build();
Type cmdType = requestProto.getCmdType();
// For only writeChunk, there will be writeStateMachineData call.
@ -457,7 +454,7 @@ public class ContainerStateMachine extends BaseStateMachine {
try {
metrics.incNumReadStateMachineOps();
final ContainerCommandRequestProto requestProto =
getRequestProto(request.getContent());
getContainerCommandRequestProto(request.getContent());
return CompletableFuture.completedFuture(runCommand(requestProto, null));
} catch (IOException e) {
metrics.incNumReadStateMachineFails();
@ -507,34 +504,8 @@ public class ContainerStateMachine extends BaseStateMachine {
*/
private ByteString getCachedStateMachineData(Long logIndex, long term,
ContainerCommandRequestProto requestProto) throws ExecutionException {
try {
return reconstructWriteChunkRequest(
stateMachineDataCache.get(logIndex, new Callable<ByteString>() {
@Override
public ByteString call() throws Exception {
return readStateMachineData(requestProto, term, logIndex);
}
}), requestProto);
} catch (ExecutionException e) {
throw e;
}
}
private ByteString reconstructWriteChunkRequest(ByteString data,
ContainerCommandRequestProto requestProto) {
WriteChunkRequestProto writeChunkRequestProto =
requestProto.getWriteChunk();
// reconstruct the write chunk request
final WriteChunkRequestProto.Builder dataWriteChunkProto =
WriteChunkRequestProto.newBuilder(writeChunkRequestProto)
// adding the state machine data
.setData(data);
ContainerCommandRequestProto.Builder newStateMachineProto =
ContainerCommandRequestProto.newBuilder(requestProto)
.setWriteChunk(dataWriteChunkProto);
return newStateMachineProto.build().toByteString();
return stateMachineDataCache.get(logIndex,
() -> readStateMachineData(requestProto, term, logIndex));
}
/**
@ -568,7 +539,8 @@ public class ContainerStateMachine extends BaseStateMachine {
}
try {
final ContainerCommandRequestProto requestProto =
getRequestProto(entry.getStateMachineLogEntry().getLogData());
getContainerCommandRequestProto(
entry.getStateMachineLogEntry().getLogData());
// readStateMachineData should only be called for "write" to Ratis.
Preconditions.checkArgument(!HddsUtils.isReadOnly(requestProto));
if (requestProto.getCmdType() == Type.WriteChunk) {
@ -632,7 +604,8 @@ public class ContainerStateMachine extends BaseStateMachine {
try {
metrics.incNumApplyTransactionsOps();
ContainerCommandRequestProto requestProto =
getRequestProto(trx.getStateMachineLogEntry().getLogData());
getContainerCommandRequestProto(
trx.getStateMachineLogEntry().getLogData());
Type cmdType = requestProto.getCmdType();
// Make sure that in write chunk, the user data is not set
if (cmdType == Type.WriteChunk) {