ozone.acl.authorizer.class
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
index 92806485c44..b93529bdaae 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/OmUtils.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.ozone;
import java.io.File;
+import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collection;
import java.util.Optional;
@@ -28,6 +29,7 @@ import org.apache.hadoop.hdds.server.ServerUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,7 +46,7 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys.OZONE_OM_PORT_DEFAULT;
* communication.
*/
public final class OmUtils {
- private static final Logger LOG = LoggerFactory.getLogger(OmUtils.class);
+ public static final Logger LOG = LoggerFactory.getLogger(OmUtils.class);
private OmUtils() {
}
@@ -133,4 +135,46 @@ public final class OmUtils {
OMConfigKeys.OZONE_OM_DB_DIRS, HddsConfigKeys.OZONE_METADATA_DIRS);
return ServerUtils.getOzoneMetaDirPath(conf);
}
+
+ /**
+ * Checks if the OM request is read only or not.
+ * @param omRequest OMRequest proto
+ * @return True if its readOnly, false otherwise.
+ */
+ public static boolean isReadOnly(
+ OzoneManagerProtocolProtos.OMRequest omRequest) {
+ OzoneManagerProtocolProtos.Type cmdType = omRequest.getCmdType();
+ switch (cmdType) {
+ case CheckVolumeAccess:
+ case InfoVolume:
+ case ListVolume:
+ case InfoBucket:
+ case ListBuckets:
+ case LookupKey:
+ case ListKeys:
+ case InfoS3Bucket:
+ case ListS3Buckets:
+ case ServiceList:
+ return true;
+ case CreateVolume:
+ case SetVolumeProperty:
+ case DeleteVolume:
+ case CreateBucket:
+ case SetBucketProperty:
+ case DeleteBucket:
+ case CreateKey:
+ case RenameKey:
+ case DeleteKey:
+ case CommitKey:
+ case AllocateBlock:
+ case CreateS3Bucket:
+ case DeleteS3Bucket:
+ case InitiateMultiPartUpload:
+ case CommitMultiPartUpload:
+ return false;
+ default:
+ LOG.error("CmdType {} is not categorized as readOnly or not.", cmdType);
+ return false;
+ }
+ }
}
diff --git a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
index 82eca4e8380..3cc4434bf56 100644
--- a/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
+++ b/hadoop-ozone/common/src/main/java/org/apache/hadoop/ozone/om/OMConfigKeys.java
@@ -153,9 +153,26 @@ public final class OMConfigKeys {
= TimeDuration.valueOf(1, TimeUnit.SECONDS);
// OM Ratis client configurations
- public static final String OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_KEY
- = "ozone.om.ratis.client.request.timeout";
+ public static final String OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY
+ = "ozone.om.ratis.client.request.timeout.duration";
public static final TimeDuration
- OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DEFAULT
+ OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
= TimeDuration.valueOf(3000, TimeUnit.MILLISECONDS);
+ public static final String OZONE_OM_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY
+ = "ozone.om.ratis.client.request.max.retries";
+ public static final int OZONE_OM_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT
+ = 180;
+ public static final String OZONE_OM_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY
+ = "ozone.om.ratis.client.request.retry.interval";
+ public static final TimeDuration
+ OZONE_OM_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_DEFAULT
+ = TimeDuration.valueOf(100, TimeUnit.MILLISECONDS);
+
+ // OM Ratis Leader Election configurations
+ public static final String
+ OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY =
+ "ozone.om.leader.election.minimum.timeout.duration";
+ public static final TimeDuration
+ OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT =
+ TimeDuration.valueOf(1, TimeUnit.SECONDS);
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
index a98e38811f9..4fc0813e2f3 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/OzoneManager.java
@@ -68,9 +68,12 @@ import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisClient;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
@@ -134,7 +137,9 @@ import static org.apache.hadoop.ozone.om.OMConfigKeys
.OZONE_OM_METRICS_SAVE_INTERVAL;
import static org.apache.hadoop.ozone.om.OMConfigKeys
.OZONE_OM_METRICS_SAVE_INTERVAL_DEFAULT;
-import static org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneManagerService.newReflectiveBlockingService;
+import static org.apache.hadoop.ozone.protocol.proto
+ .OzoneManagerProtocolProtos.OzoneManagerService
+ .newReflectiveBlockingService;
import static org.apache.hadoop.util.ExitUtil.terminate;
/**
@@ -156,7 +161,9 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
private final OzoneConfiguration configuration;
private RPC.Server omRpcServer;
private InetSocketAddress omRpcAddress;
+ private String omId;
private OzoneManagerRatisServer omRatisServer;
+ private OzoneManagerRatisClient omRatisClient;
private final OMMetadataManager metadataManager;
private final VolumeManager volumeManager;
private final BucketManager bucketManager;
@@ -185,6 +192,7 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
Preconditions.checkNotNull(conf);
configuration = conf;
omStorage = new OMStorage(conf);
+ omId = omStorage.getOmId();
scmBlockClient = getScmBlockClient(configuration);
scmContainerClient = getScmContainerClient(configuration);
if (omStorage.getState() != StorageState.INITIALIZED) {
@@ -584,8 +592,32 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
InetSocketAddress omNodeRpcAddr = getOmAddress(configuration);
int handlerCount = configuration.getInt(OZONE_OM_HANDLER_COUNT_KEY,
OZONE_OM_HANDLER_COUNT_DEFAULT);
+
+ // This is a temporary check. Once fully implemented, all OM state change
+ // should go through Ratis - either standalone (for non-HA) or replicated
+ // (for HA).
+ boolean omRatisEnabled = configuration.getBoolean(
+ OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
+ OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
+ if (omRatisEnabled) {
+ omRatisServer = OzoneManagerRatisServer.newOMRatisServer(
+ omId, omRpcAddress.getAddress(), configuration);
+ omRatisServer.start();
+
+ LOG.info("OzoneManager Ratis server started at port {}",
+ omRatisServer.getServerPort());
+
+ omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(
+ omId, omRatisServer.getRaftGroup(), configuration);
+ omRatisClient.connect();
+ } else {
+ omRatisServer = null;
+ omRatisClient = null;
+ }
+
BlockingService omService = newReflectiveBlockingService(
- new OzoneManagerProtocolServerSideTranslatorPB(this));
+ new OzoneManagerProtocolServerSideTranslatorPB(
+ this, omRatisClient, omRatisEnabled));
omRpcServer = startRpcServer(configuration, omNodeRpcAddr,
OzoneManagerProtocolPB.class, omService,
handlerCount);
@@ -596,23 +628,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
LOG.info(buildRpcServerStartMessage("OzoneManager RPC server",
omRpcAddress));
- boolean omRatisEnabled = configuration.getBoolean(
- OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
- OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
- // This is a temporary check. Once fully implemented, all OM state change
- // should go through Ratis, either standalone (for non-HA) or replicated
- // (for HA).
- if (omRatisEnabled) {
- omRatisServer = OzoneManagerRatisServer.newOMRatisServer(
- omStorage.getOmId(), configuration);
- omRatisServer.start();
-
- LOG.info("OzoneManager Ratis server started at port {}",
- omRatisServer.getServerPort());
- } else {
- omRatisServer = null;
- }
-
DefaultMetricsSystem.initialize("OzoneManager");
metadataManager.start(configuration);
@@ -687,6 +702,22 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
}
}
+ /**
+ * Validates that the incoming OM request has required parameters.
+ * TODO: Add more validation checks before writing the request to Ratis log.
+ * @param omRequest client request to OM
+ * @throws OMException thrown if required parameters are set to null.
+ */
+ public void validateRequest(OMRequest omRequest) throws OMException {
+ Type cmdType = omRequest.getCmdType();
+ if (cmdType == null) {
+ throw new OMException("CmdType is null", ResultCodes.INVALID_REQUEST);
+ }
+ if (omRequest.getClientId() == null) {
+ throw new OMException("ClientId is null", ResultCodes.INVALID_REQUEST);
+ }
+ }
+
/**
* Creates a volume.
*
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
index bdf2fb50bf5..6d93a787051 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/exceptions/OMException.java
@@ -118,6 +118,7 @@ public class OMException extends IOException {
S3_BUCKET_NOT_FOUND,
INITIATE_MULTIPART_UPLOAD_FAILED,
NO_SUCH_MULTIPART_UPLOAD,
- UPLOAD_PART_FAILED;
+ UPLOAD_PART_FAILED,
+ INVALID_REQUEST;
}
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java
new file mode 100644
index 00000000000..73ee517b2c9
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OMRatisHelper.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.ratis;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.StorageUnit;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
+import org.apache.ratis.RaftConfigKeys;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.conf.RaftProperties;
+import org.apache.ratis.grpc.GrpcConfigKeys;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftPeerId;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.SizeInBytes;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Ratis helper methods for OM Ratis server and client.
+ */
+public class OMRatisHelper {
+ private static final Logger LOG = LoggerFactory.getLogger(
+ OMRatisHelper.class);
+
+ private OMRatisHelper() {
+ }
+
+ /**
+ * Creates a new RaftClient object.
+ * @param rpcType Replication Type
+ * @param omId OM id of the client
+ * @param group RaftGroup
+ * @param retryPolicy Retry policy
+ * @return RaftClient object
+ */
+ static RaftClient newRaftClient(RpcType rpcType, String omId, RaftGroup
+ group, RetryPolicy retryPolicy, Configuration conf) {
+ LOG.trace("newRaftClient: {}, leader={}, group={}", rpcType, omId, group);
+ final RaftProperties properties = new RaftProperties();
+ RaftConfigKeys.Rpc.setType(properties, rpcType);
+
+ final int raftSegmentPreallocatedSize = (int) conf.getStorageSize(
+ OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_KEY,
+ OMConfigKeys.OZONE_OM_RATIS_SEGMENT_PREALLOCATED_SIZE_DEFAULT,
+ StorageUnit.BYTES);
+ GrpcConfigKeys.setMessageSizeMax(
+ properties, SizeInBytes.valueOf(raftSegmentPreallocatedSize));
+
+ return RaftClient.newBuilder()
+ .setRaftGroup(group)
+ .setLeaderId(getRaftPeerId(omId))
+ .setProperties(properties)
+ .setRetryPolicy(retryPolicy)
+ .build();
+ }
+
+ static RaftPeerId getRaftPeerId(String omId) {
+ return RaftPeerId.valueOf(omId);
+ }
+
+ static ByteString convertRequestToByteString(OMRequest request) {
+ byte[] requestBytes = request.toByteArray();
+ return ByteString.copyFrom(requestBytes);
+ }
+
+ static OMRequest convertByteStringToOMRequest(ByteString byteString)
+ throws InvalidProtocolBufferException {
+ byte[] bytes = byteString.toByteArray();
+ return OMRequest.parseFrom(bytes);
+ }
+
+ static ByteString convertResponseToByteString(OMResponse response) {
+ byte[] requestBytes = response.toByteArray();
+ return ByteString.copyFrom(requestBytes);
+ }
+
+ static OMResponse convertByteStringToOMResponse(ByteString byteString)
+ throws InvalidProtocolBufferException {
+ byte[] bytes = byteString.toByteArray();
+ return OMResponse.parseFrom(bytes);
+ }
+
+ static OMResponse getErrorResponse(Type cmdType, Exception e) {
+ return OMResponse.newBuilder()
+ .setCmdType(cmdType)
+ .setSuccess(false)
+ .setMessage(e.getMessage())
+ .build();
+ }
+
+ static CompletableFuture completeExceptionally(Exception e) {
+ final CompletableFuture future = new CompletableFuture<>();
+ future.completeExceptionally(e);
+ return future;
+ }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
new file mode 100644
index 00000000000..c18437c8bc7
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisClient.java
@@ -0,0 +1,194 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.ratis;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import com.google.protobuf.InvalidProtocolBufferException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .OMResponse;
+import org.apache.ratis.client.RaftClient;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftClientReply;
+import org.apache.ratis.protocol.RaftGroup;
+import org.apache.ratis.protocol.RaftRetryFailureException;
+import org.apache.ratis.retry.RetryPolicies;
+import org.apache.ratis.retry.RetryPolicy;
+import org.apache.ratis.rpc.RpcType;
+import org.apache.ratis.rpc.SupportedRpcType;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.apache.ratis.util.TimeDuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * OM Ratis client to interact with OM Ratis server endpoint.
+ */
+public final class OzoneManagerRatisClient implements Closeable {
+ static final Logger LOG = LoggerFactory.getLogger(
+ OzoneManagerRatisClient.class);
+
+ private final RaftGroup raftGroup;
+ private final String omID;
+ private final RpcType rpcType;
+ private final AtomicReference client = new AtomicReference<>();
+ private final RetryPolicy retryPolicy;
+ private final Configuration conf;
+
+ private OzoneManagerRatisClient(String omId, RaftGroup raftGroup,
+ RpcType rpcType, RetryPolicy retryPolicy,
+ Configuration config) {
+ this.raftGroup = raftGroup;
+ this.omID = omId;
+ this.rpcType = rpcType;
+ this.retryPolicy = retryPolicy;
+ this.conf = config;
+ }
+
+ public static OzoneManagerRatisClient newOzoneManagerRatisClient(
+ String omId, RaftGroup raftGroup, Configuration conf) {
+ final String rpcType = conf.get(
+ OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_KEY,
+ OMConfigKeys.OZONE_OM_RATIS_RPC_TYPE_DEFAULT);
+
+ final int maxRetryCount = conf.getInt(
+ OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_MAX_RETRIES_KEY,
+ OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_MAX_RETRIES_DEFAULT);
+ final long retryInterval = conf.getTimeDuration(
+ OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_KEY,
+ OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_RETRY_INTERVAL_DEFAULT
+ .toIntExact(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
+ final TimeDuration sleepDuration = TimeDuration.valueOf(
+ retryInterval, TimeUnit.MILLISECONDS);
+ final RetryPolicy retryPolicy = RetryPolicies
+ .retryUpToMaximumCountWithFixedSleep(maxRetryCount, sleepDuration);
+
+ return new OzoneManagerRatisClient(omId, raftGroup,
+ SupportedRpcType.valueOfIgnoreCase(rpcType), retryPolicy, conf);
+ }
+
+ public void connect() {
+ LOG.debug("Connecting to OM Ratis Server GroupId:{} OM:{}",
+ raftGroup.getGroupId().getUuid().toString(), omID);
+
+ // TODO : XceiverClient ratis should pass the config value of
+ // maxOutstandingRequests so as to set the upper bound on max no of async
+ // requests to be handled by raft client
+
+ if (!client.compareAndSet(null, OMRatisHelper.newRaftClient(
+ rpcType, omID, raftGroup, retryPolicy, conf))) {
+ throw new IllegalStateException("Client is already connected.");
+ }
+ }
+
+ @Override
+ public void close() {
+ final RaftClient c = client.getAndSet(null);
+ if (c != null) {
+ closeRaftClient(c);
+ }
+ }
+
+ private void closeRaftClient(RaftClient raftClient) {
+ try {
+ raftClient.close();
+ } catch (IOException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ private RaftClient getClient() {
+ return Objects.requireNonNull(client.get(), "client is null");
+ }
+
+ /**
+ * Sends a given request to server and gets the reply back.
+ * @param request Request
+ * @return Response to the command
+ */
+ public OMResponse sendCommand(OMRequest request) {
+ try {
+ CompletableFuture reply = sendCommandAsync(request);
+ return reply.get();
+ } catch (ExecutionException | InterruptedException e) {
+ LOG.error("Failed to execute command: " + request, e);
+ return OMRatisHelper.getErrorResponse(request.getCmdType(), e);
+ }
+ }
+
+ /**
+ * Sends a given command to server gets a waitable future back.
+ *
+ * @param request Request
+ * @return Response to the command
+ */
+ private CompletableFuture sendCommandAsync(OMRequest request) {
+ CompletableFuture raftClientReply =
+ sendRequestAsync(request);
+
+ CompletableFuture omRatisResponse =
+ raftClientReply.whenComplete((reply, e) -> LOG.debug(
+ "received reply {} for request: cmdType={} traceID={} " +
+ "exception: {}", reply, request.getCmdType(),
+ request.getTraceID(), e))
+ .thenApply(reply -> {
+ try {
+ // we need to handle RaftRetryFailure Exception
+ RaftRetryFailureException raftRetryFailureException =
+ reply.getRetryFailureException();
+ if (raftRetryFailureException != null) {
+ throw new CompletionException(raftRetryFailureException);
+ }
+ OMResponse response = OMRatisHelper
+ .convertByteStringToOMResponse(reply.getMessage()
+ .getContent());
+ return response;
+ } catch (InvalidProtocolBufferException e) {
+ throw new CompletionException(e);
+ }
+ });
+ return omRatisResponse;
+ }
+
+ /**
+ * Submits {@link RaftClient#sendReadOnlyAsync(Message)} request to Ratis
+ * server if the request is readOnly. Otherwise, submits
+ * {@link RaftClient#sendAsync(Message)} request.
+ * @param request OMRequest
+ * @return RaftClient response
+ */
+ private CompletableFuture sendRequestAsync(
+ OMRequest request) {
+ boolean isReadOnlyRequest = OmUtils.isReadOnly(request);
+ ByteString byteString = OMRatisHelper.convertRequestToByteString(request);
+ LOG.debug("sendOMRequestAsync {} {}", isReadOnlyRequest, request);
+ return isReadOnlyRequest ? getClient().sendReadOnlyAsync(() -> byteString) :
+ getClient().sendAsync(() -> byteString);
+ }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
index 6d9801b8c26..f28f2ce5295 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerRatisServer.java
@@ -22,12 +22,14 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import java.io.File;
import java.io.IOException;
+import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.SocketAddress;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit;
@@ -38,7 +40,9 @@ import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.netty.NettyConfigKeys;
+import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.protocol.RaftPeer;
import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.rpc.RpcType;
import org.apache.ratis.rpc.SupportedRpcType;
@@ -59,23 +63,43 @@ public final class OzoneManagerRatisServer {
.getLogger(OzoneManagerRatisServer.class);
private final int port;
+ private final InetSocketAddress omRatisAddress;
private final RaftServer server;
+ private final RaftGroupId raftGroupId;
+ private final RaftGroup raftGroup;
+ private final RaftPeerId raftPeerId;
- private OzoneManagerRatisServer(String omId, int port, Configuration conf)
- throws IOException {
+ private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
+
+ private static long nextCallId() {
+ return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
+ }
+
+ private OzoneManagerRatisServer(String omId, InetAddress addr, int port,
+ Configuration conf) throws IOException {
Objects.requireNonNull(omId, "omId == null");
this.port = port;
+ this.omRatisAddress = new InetSocketAddress(addr.getHostAddress(), port);
RaftProperties serverProperties = newRaftProperties(conf);
+ // TODO: When implementing replicated OM ratis servers, RaftGroupID
+ // should be the same across all the OMs. Add all the OM servers as Raft
+ // Peers.
+ this.raftGroupId = RaftGroupId.randomId();
+ this.raftPeerId = RaftPeerId.getRaftPeerId(omId);
+
+ RaftPeer raftPeer = new RaftPeer(raftPeerId, omRatisAddress);
+ this.raftGroup = RaftGroup.valueOf(raftGroupId, raftPeer);
this.server = RaftServer.newBuilder()
- .setServerId(RaftPeerId.valueOf(omId))
+ .setServerId(this.raftPeerId)
+ .setGroup(this.raftGroup)
.setProperties(serverProperties)
- .setStateMachineRegistry(this::getStateMachine)
+ .setStateMachine(getStateMachine(this.raftGroupId))
.build();
}
public static OzoneManagerRatisServer newOMRatisServer(String omId,
- Configuration ozoneConf) throws IOException {
+ InetAddress omAddress, Configuration ozoneConf) throws IOException {
int localPort = ozoneConf.getInt(
OMConfigKeys.OZONE_OM_RATIS_PORT_KEY,
OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT);
@@ -96,7 +120,11 @@ public final class OzoneManagerRatisServer {
+ "fallback to use default port {}", localPort, e);
}
}
- return new OzoneManagerRatisServer(omId, localPort, ozoneConf);
+ return new OzoneManagerRatisServer(omId, omAddress, localPort, ozoneConf);
+ }
+
+ public RaftGroup getRaftGroup() {
+ return this.raftGroup;
}
/**
@@ -104,7 +132,7 @@ public final class OzoneManagerRatisServer {
* TODO: Implement a state machine on OM.
*/
private BaseStateMachine getStateMachine(RaftGroupId gid) {
- return new BaseStateMachine();
+ return new OzoneManagerStateMachine(null);
}
public void start() throws IOException {
@@ -163,8 +191,8 @@ public final class OzoneManagerRatisServer {
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT,
OMConfigKeys.OZONE_OM_RATIS_LOG_APPENDER_QUEUE_BYTE_LIMIT_DEFAULT,
StorageUnit.BYTES);
- RaftServerConfigKeys.Log.Appender
- .setBufferElementLimit(properties, logAppenderQueueNumElements);
+ RaftServerConfigKeys.Log.Appender.setBufferElementLimit(properties,
+ logAppenderQueueNumElements);
RaftServerConfigKeys.Log.Appender.setBufferByteLimit(properties,
SizeInBytes.valueOf(logAppenderQueueByteLimit));
RaftServerConfigKeys.Log.setPreallocatedSize(properties,
@@ -197,8 +225,8 @@ public final class OzoneManagerRatisServer {
.getDuration(), retryCacheTimeoutUnit);
final TimeDuration retryCacheTimeout = TimeDuration.valueOf(
retryCacheTimeoutDuration, retryCacheTimeoutUnit);
- RaftServerConfigKeys.RetryCache
- .setExpiryTime(properties, retryCacheTimeout);
+ RaftServerConfigKeys.RetryCache.setExpiryTime(properties,
+ retryCacheTimeout);
// Set the server min and max timeout
TimeUnit serverMinTimeoutUnit =
@@ -222,11 +250,11 @@ public final class OzoneManagerRatisServer {
RaftServerConfigKeys.Log.setMaxCachedSegmentNum(properties, 2);
// Set the client request timeout
- TimeUnit clientRequestTimeoutUnit =
- OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DEFAULT.getUnit();
+ TimeUnit clientRequestTimeoutUnit = OMConfigKeys
+ .OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT .getUnit();
long clientRequestTimeoutDuration = conf.getTimeDuration(
- OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_KEY,
- OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DEFAULT
+ OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_KEY,
+ OMConfigKeys.OZONE_OM_RATIS_CLIENT_REQUEST_TIMEOUT_DURATION_DEFAULT
.getDuration(), clientRequestTimeoutUnit);
final TimeDuration clientRequestTimeout = TimeDuration.valueOf(
clientRequestTimeoutDuration, clientRequestTimeoutUnit);
@@ -243,10 +271,24 @@ public final class OzoneManagerRatisServer {
/**
* TODO: set following ratis leader election related configs when
* replicated ratis server is implemented.
- * 1. leader election timeout
- * 2. node failure timeout
- * 3.
+ * 1. node failure timeout
*/
+ // Set the ratis leader election timeout
+ TimeUnit leaderElectionMinTimeoutUnit =
+ OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT
+ .getUnit();
+ long leaderElectionMinTimeoutduration = conf.getTimeDuration(
+ OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
+ OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_DEFAULT
+ .getDuration(), leaderElectionMinTimeoutUnit);
+ final TimeDuration leaderElectionMinTimeout = TimeDuration.valueOf(
+ leaderElectionMinTimeoutduration, leaderElectionMinTimeoutUnit);
+ RaftServerConfigKeys.Rpc.setTimeoutMin(properties,
+ leaderElectionMinTimeout);
+ long leaderElectionMaxTimeout = leaderElectionMinTimeout.toLong(
+ TimeUnit.MILLISECONDS) + 200;
+ RaftServerConfigKeys.Rpc.setTimeoutMax(properties,
+ TimeDuration.valueOf(leaderElectionMaxTimeout, TimeUnit.MILLISECONDS));
/**
* TODO: when ratis snapshots are implemented, set snapshot threshold and
@@ -276,5 +318,4 @@ public final class OzoneManagerRatisServer {
}
return storageDir;
}
-
}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
new file mode 100644
index 00000000000..5ea0b495302
--- /dev/null
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/ratis/OzoneManagerStateMachine.java
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.om.ratis;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import java.io.IOException;
+import java.util.concurrent.CompletableFuture;
+import org.apache.hadoop.ozone.container.common.transport.server.ratis
+ .ContainerStateMachine;
+import org.apache.hadoop.ozone.om.OzoneManager;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.*;
+import org.apache.ratis.protocol.Message;
+import org.apache.ratis.protocol.RaftGroupId;
+import org.apache.ratis.server.RaftServer;
+import org.apache.ratis.server.storage.RaftStorage;
+import org.apache.ratis.statemachine.TransactionContext;
+import org.apache.ratis.statemachine.impl.BaseStateMachine;
+import org.apache.ratis.statemachine.impl.SimpleStateMachineStorage;
+import org.apache.ratis.thirdparty.com.google.protobuf.ByteString;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The OM StateMachine is the state machine for OM Ratis server. It is
+ * responsible for applying ratis committed transactions to
+ * {@link OzoneManager}.
+ */
+public class OzoneManagerStateMachine extends BaseStateMachine {
+
+ static final Logger LOG =
+ LoggerFactory.getLogger(ContainerStateMachine.class);
+ private final SimpleStateMachineStorage storage =
+ new SimpleStateMachineStorage();
+ private final OzoneManager ozoneManager;
+
+ public OzoneManagerStateMachine(OzoneManager om) {
+ // OzoneManager is required when implementing StateMachine
+ this.ozoneManager = om;
+ }
+
+ /**
+ * Initializes the State Machine with the given server, group and storage.
+ * TODO: Load the latest snapshot from the file system.
+ */
+ @Override
+ public void initialize(
+ RaftServer server, RaftGroupId id, RaftStorage raftStorage)
+ throws IOException {
+ super.initialize(server, id, raftStorage);
+ storage.init(raftStorage);
+ }
+
+ /*
+ * Apply a committed log entry to the state machine. This function
+ * currently returns a dummy message.
+ * TODO: Apply transaction to OM state machine
+ */
+ @Override
+ public CompletableFuture applyTransaction(TransactionContext trx) {
+ String errorMessage;
+ ByteString logData = trx.getStateMachineLogEntry().getLogData();
+ try {
+ OMRequest omRequest = OMRatisHelper.convertByteStringToOMRequest(logData);
+ LOG.debug("Received request: cmdType={} traceID={} ",
+ omRequest.getCmdType(), omRequest.getTraceID());
+ errorMessage = "Dummy response from Ratis server for command type: " +
+ omRequest.getCmdType();
+ } catch (InvalidProtocolBufferException e) {
+ errorMessage = e.getMessage();
+ }
+
+ // TODO: When State Machine is implemented, send the actual response back
+ return OMRatisHelper.completeExceptionally(new IOException(errorMessage));
+ }
+}
diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
index 5e33dbf799d..33453ac892f 100644
--- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
+++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/protocolPB/OzoneManagerProtocolServerSideTranslatorPB.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
+import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisClient;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.AllocateBlockRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
@@ -160,6 +161,8 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
private static final Logger LOG = LoggerFactory
.getLogger(OzoneManagerProtocolServerSideTranslatorPB.class);
private final OzoneManagerProtocol impl;
+ private final OzoneManagerRatisClient omRatisClient;
+ private final boolean isRatisEnabled;
/**
* Constructs an instance of the server handler.
@@ -167,8 +170,11 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
* @param impl OzoneManagerProtocolPB
*/
public OzoneManagerProtocolServerSideTranslatorPB(
- OzoneManagerProtocol impl) {
+ OzoneManagerProtocol impl, OzoneManagerRatisClient ratisClient,
+ boolean enableRatis) {
this.impl = impl;
+ this.omRatisClient = ratisClient;
+ this.isRatisEnabled = enableRatis;
}
/**
@@ -179,10 +185,29 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
@Override
public OMResponse submitRequest(RpcController controller,
OMRequest request) throws ServiceException {
- Type cmdType = request.getCmdType();
+ if (isRatisEnabled) {
+ return submitRequestToRatis(request);
+ } else {
+ return submitRequestToOM(request);
+ }
+ }
+ /**
+ * Submits request to OM's Ratis server.
+ */
+ private OMResponse submitRequestToRatis(OMRequest request) {
+ return omRatisClient.sendCommand(request);
+ }
+
+ /**
+ * Submits request directly to OM.
+ */
+ private OMResponse submitRequestToOM(OMRequest request)
+ throws ServiceException {
+ Type cmdType = request.getCmdType();
OMResponse.Builder responseBuilder = OMResponse.newBuilder()
.setCmdType(cmdType);
+
switch (cmdType) {
case CreateVolume:
CreateVolumeResponse createVolumeResponse = createVolume(
@@ -318,7 +343,6 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
}
return responseBuilder.build();
}
-
// Convert and exception to corresponding status code
private Status exceptionToResponseStatus(IOException ex) {
if (ex instanceof OMException) {
diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
index 564deb2e67b..d8915ae09f0 100644
--- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
+++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ratis/TestOzoneManagerRatisServer.java
@@ -18,35 +18,58 @@
package org.apache.hadoop.ozone.om.ratis;
+import java.net.InetAddress;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.UUID;
-import org.apache.hadoop.conf.Configuration;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.ozone.OmUtils;
+import org.apache.hadoop.ozone.om.OMConfigKeys;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .OMRequest;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
+ .OMResponse;
+import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.ratis.util.LifeCycle;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.Assert.assertFalse;
/**
* Test OM Ratis server.
*/
public class TestOzoneManagerRatisServer {
- private Configuration conf;
+ private OzoneConfiguration conf;
private OzoneManagerRatisServer omRatisServer;
+ private OzoneManagerRatisClient omRatisClient;
private String omID;
+ private String clientId = UUID.randomUUID().toString();
+ private static final long LEADER_ELECTION_TIMEOUT = 500L;
@Before
- public void init() {
+ public void init() throws Exception {
conf = new OzoneConfiguration();
omID = UUID.randomUUID().toString();
final String path = GenericTestUtils.getTempPath(omID);
Path metaDirPath = Paths.get(path, "om-meta");
conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, metaDirPath.toString());
+ conf.setTimeDuration(
+ OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
+ LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
+ omRatisServer = OzoneManagerRatisServer.newOMRatisServer(omID,
+ InetAddress.getLocalHost(), conf);
+ omRatisServer.start();
+ omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(omID,
+ omRatisServer.getRaftGroup(), conf);
+ omRatisClient.connect();
}
@After
@@ -54,6 +77,9 @@ public class TestOzoneManagerRatisServer {
if (omRatisServer != null) {
omRatisServer.stop();
}
+ if (omRatisClient != null) {
+ omRatisClient.close();
+ }
}
/**
@@ -61,9 +87,59 @@ public class TestOzoneManagerRatisServer {
*/
@Test
public void testStartOMRatisServer() throws Exception {
- omRatisServer = OzoneManagerRatisServer.newOMRatisServer(omID, conf);
- omRatisServer.start();
Assert.assertEquals("Ratis Server should be in running state",
LifeCycle.State.RUNNING, omRatisServer.getServerState());
}
+
+ /**
+ * Submit any request to OM Ratis server and check that the dummy response
+ * message is received.
+ * TODO: Once state machine is implemented, submitting a request to Ratis
+ * server should result in a valid response.
+ */
+ @Test
+ public void testSubmitRatisRequest() throws Exception {
+ // Wait for leader election
+ Thread.sleep(LEADER_ELECTION_TIMEOUT * 2);
+
+ OMRequest request = OMRequest.newBuilder()
+ .setCmdType(OzoneManagerProtocolProtos.Type.CreateVolume)
+ .setClientId(clientId)
+ .build();
+
+ OMResponse response = omRatisClient.sendCommand(request);
+
+ // Since the state machine is not implemented yet, we should get the
+ // configured dummy message from Ratis.
+ Assert.assertEquals(false, response.getSuccess());
+ Assert.assertTrue(response.getMessage().contains("Dummy response from " +
+ "Ratis server for command type: " +
+ OzoneManagerProtocolProtos.Type.CreateVolume));
+ Assert.assertEquals(false, response.hasCreateVolumeResponse());
+ }
+
+ /**
+ * Test that all of {@link OzoneManagerProtocolProtos.Type} enum values are
+ * categorized in {@link OmUtils#isReadOnly(OMRequest)}.
+ */
+ @Test
+ public void testIsReadOnlyCapturesAllCmdTypeEnums() throws Exception {
+ GenericTestUtils.LogCapturer logCapturer = GenericTestUtils.LogCapturer
+ .captureLogs(LoggerFactory.getLogger(OmUtils.class));
+ String clientId = UUID.randomUUID().toString();
+ OzoneManagerProtocolProtos.Type[] cmdTypes =
+ OzoneManagerProtocolProtos.Type.values();
+
+ for (OzoneManagerProtocolProtos.Type cmdtype : cmdTypes) {
+ OMRequest request = OMRequest.newBuilder()
+ .setCmdType(cmdtype)
+ .setClientId(clientId)
+ .build();
+ OmUtils.isReadOnly(request);
+ assertFalse(cmdtype + "is not categorized in OmUtils#isReadyOnly",
+ logCapturer.getOutput().contains("CmdType " + cmdtype +" is not " +
+ "categorized as readOnly or not."));
+ logCapturer.clearOutput();
+ }
+ }
}