HDDS-302. Fix javadoc and add implementation details in ContainerStateMachine. Contributed by Shashikant Banerjee.
This commit is contained in:
parent
3108d27edd
commit
952dc2fd55
|
@ -19,6 +19,7 @@
|
||||||
package org.apache.hadoop.hdds.scm;
|
package org.apache.hadoop.hdds.scm;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hadoop.hdds.HddsUtils;
|
||||||
import org.apache.ratis.shaded.com.google.protobuf
|
import org.apache.ratis.shaded.com.google.protobuf
|
||||||
.InvalidProtocolBufferException;
|
.InvalidProtocolBufferException;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -183,34 +184,9 @@ public final class XceiverClientRatis extends XceiverClientSpi {
|
||||||
return Objects.requireNonNull(client.get(), "client is null");
|
return Objects.requireNonNull(client.get(), "client is null");
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isReadOnly(ContainerCommandRequestProto proto) {
|
|
||||||
switch (proto.getCmdType()) {
|
|
||||||
case ReadContainer:
|
|
||||||
case ReadChunk:
|
|
||||||
case ListKey:
|
|
||||||
case GetKey:
|
|
||||||
case GetSmallFile:
|
|
||||||
case ListContainer:
|
|
||||||
case ListChunk:
|
|
||||||
return true;
|
|
||||||
case CloseContainer:
|
|
||||||
case WriteChunk:
|
|
||||||
case UpdateContainer:
|
|
||||||
case CompactChunk:
|
|
||||||
case CreateContainer:
|
|
||||||
case DeleteChunk:
|
|
||||||
case DeleteContainer:
|
|
||||||
case DeleteKey:
|
|
||||||
case PutKey:
|
|
||||||
case PutSmallFile:
|
|
||||||
default:
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private RaftClientReply sendRequest(ContainerCommandRequestProto request)
|
private RaftClientReply sendRequest(ContainerCommandRequestProto request)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
boolean isReadOnlyRequest = isReadOnly(request);
|
boolean isReadOnlyRequest = HddsUtils.isReadOnly(request);
|
||||||
ByteString byteString = request.toByteString();
|
ByteString byteString = request.toByteString();
|
||||||
LOG.debug("sendCommand {} {}", isReadOnlyRequest, request);
|
LOG.debug("sendCommand {} {}", isReadOnlyRequest, request);
|
||||||
final RaftClientReply reply = isReadOnlyRequest ?
|
final RaftClientReply reply = isReadOnlyRequest ?
|
||||||
|
@ -222,7 +198,7 @@ public final class XceiverClientRatis extends XceiverClientSpi {
|
||||||
|
|
||||||
private CompletableFuture<RaftClientReply> sendRequestAsync(
|
private CompletableFuture<RaftClientReply> sendRequestAsync(
|
||||||
ContainerCommandRequestProto request) throws IOException {
|
ContainerCommandRequestProto request) throws IOException {
|
||||||
boolean isReadOnlyRequest = isReadOnly(request);
|
boolean isReadOnlyRequest = HddsUtils.isReadOnly(request);
|
||||||
ByteString byteString = request.toByteString();
|
ByteString byteString = request.toByteString();
|
||||||
LOG.debug("sendCommandAsync {} {}", isReadOnlyRequest, request);
|
LOG.debug("sendCommandAsync {} {}", isReadOnlyRequest, request);
|
||||||
return isReadOnlyRequest ? getClient().sendReadOnlyAsync(() -> byteString) :
|
return isReadOnlyRequest ? getClient().sendReadOnlyAsync(() -> byteString) :
|
||||||
|
|
|
@ -24,6 +24,7 @@ import com.google.common.net.HostAndPort;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
|
||||||
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
|
||||||
import org.apache.hadoop.net.DNS;
|
import org.apache.hadoop.net.DNS;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
|
@ -315,4 +316,36 @@ public final class HddsUtils {
|
||||||
return name;
|
return name;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Checks if the container command is read only or not.
|
||||||
|
* @param proto ContainerCommand Request proto
|
||||||
|
* @return True if its readOnly , false otherwise.
|
||||||
|
*/
|
||||||
|
public static boolean isReadOnly(
|
||||||
|
ContainerProtos.ContainerCommandRequestProto proto) {
|
||||||
|
switch (proto.getCmdType()) {
|
||||||
|
case ReadContainer:
|
||||||
|
case ReadChunk:
|
||||||
|
case ListKey:
|
||||||
|
case GetKey:
|
||||||
|
case GetSmallFile:
|
||||||
|
case ListContainer:
|
||||||
|
case ListChunk:
|
||||||
|
case GetCommittedBlockLength:
|
||||||
|
return true;
|
||||||
|
case CloseContainer:
|
||||||
|
case WriteChunk:
|
||||||
|
case UpdateContainer:
|
||||||
|
case CompactChunk:
|
||||||
|
case CreateContainer:
|
||||||
|
case DeleteChunk:
|
||||||
|
case DeleteContainer:
|
||||||
|
case DeleteKey:
|
||||||
|
case PutKey:
|
||||||
|
case PutSmallFile:
|
||||||
|
default:
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -57,7 +57,7 @@ import java.util.concurrent.ThreadPoolExecutor;
|
||||||
* requests.
|
* requests.
|
||||||
*
|
*
|
||||||
* Read only requests are classified in
|
* Read only requests are classified in
|
||||||
* {@link org.apache.hadoop.hdds.scm.XceiverClientRatis#isReadOnly}
|
* {@link org.apache.hadoop.hdds.HddsUtils#isReadOnly}
|
||||||
* and these readonly requests are replied from the {@link #query(Message)}.
|
* and these readonly requests are replied from the {@link #query(Message)}.
|
||||||
*
|
*
|
||||||
* The write requests can be divided into requests with user data
|
* The write requests can be divided into requests with user data
|
||||||
|
@ -84,6 +84,11 @@ import java.util.concurrent.ThreadPoolExecutor;
|
||||||
* 2) Write chunk commit operation is executed after write chunk state machine
|
* 2) Write chunk commit operation is executed after write chunk state machine
|
||||||
* operation. This will ensure that commit operation is sync'd with the state
|
* operation. This will ensure that commit operation is sync'd with the state
|
||||||
* machine operation.
|
* machine operation.
|
||||||
|
*
|
||||||
|
* Synchronization between {@link #writeStateMachineData} and
|
||||||
|
* {@link #applyTransaction} need to be enforced in the StateMachine
|
||||||
|
* implementation. For example, synchronization between writeChunk and
|
||||||
|
* createContainer in {@link ContainerStateMachine}.
|
||||||
* */
|
* */
|
||||||
public class ContainerStateMachine extends BaseStateMachine {
|
public class ContainerStateMachine extends BaseStateMachine {
|
||||||
static final Logger LOG = LoggerFactory.getLogger(
|
static final Logger LOG = LoggerFactory.getLogger(
|
||||||
|
@ -213,6 +218,10 @@ public class ContainerStateMachine extends BaseStateMachine {
|
||||||
return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
|
return CompletableFuture.completedFuture(() -> ByteString.EMPTY);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* writeStateMachineData calls are not synchronized with each other
|
||||||
|
* and also with applyTransaction.
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
|
public CompletableFuture<Message> writeStateMachineData(LogEntryProto entry) {
|
||||||
try {
|
try {
|
||||||
|
@ -244,6 +253,9 @@ public class ContainerStateMachine extends BaseStateMachine {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* ApplyTransaction calls in Ratis are sequential.
|
||||||
|
*/
|
||||||
@Override
|
@Override
|
||||||
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
|
public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
|
||||||
try {
|
try {
|
||||||
|
|
Loading…
Reference in New Issue