HDDS-947. Implement OzoneManager State Machine.

This commit is contained in:
Hanisha Koneru 2019-01-09 10:57:24 -08:00
parent 3420e26ae5
commit 8dd11a1577
10 changed files with 2923 additions and 2726 deletions

View File

@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.client.rpc;
import java.io.IOException;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.ozone.MiniOzoneCluster;
import org.apache.hadoop.ozone.client.OzoneClientFactory;
import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.junit.AfterClass;
import org.junit.BeforeClass;
/**
* This class is to test all the public facing APIs of Ozone Client with an
* active OM Ratis server.
*/
public class TestOzoneRpcClientWithRatis extends TestOzoneRpcClientAbstract {
/**
* Create a MiniOzoneCluster for testing.
* Ozone is made active by setting OZONE_ENABLED = true.
* Ozone OM Ratis server is made active by setting
* OZONE_OM_RATIS_ENABLE = true;
*
* @throws IOException
*/
@BeforeClass
public static void init() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(ScmConfigKeys.OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE, 1);
conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true);
cluster = MiniOzoneCluster.newBuilder(conf)
.setNumDatanodes(10)
.setScmId(SCM_ID)
.build();
cluster.waitForClusterToBeReady();
ozClient = OzoneClientFactory.getRpcClient(conf);
store = ozClient.getObjectStore();
storageContainerLocationClient =
cluster.getStorageContainerLocationClient();
ozoneManager = cluster.getOzoneManager();
}
/**
* Close OzoneClient and shutdown MiniOzoneCluster.
*/
@AfterClass
public static void shutdown() throws IOException {
if(ozClient != null) {
ozClient.close();
}
if (storageContainerLocationClient != null) {
storageContainerLocationClient.close();
}
if (cluster != null) {
cluster.shutdown();
}
}
}

View File

@ -72,10 +72,8 @@ import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisClient; import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisClient;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer; import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisServer;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.OzoneAclInfo;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort; import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.ServicePort;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB; import org.apache.hadoop.ozone.protocolPB.OzoneManagerProtocolServerSideTranslatorPB;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer; import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer;
import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType; import org.apache.hadoop.ozone.security.acl.IAccessAuthorizer.ACLType;
@ -602,8 +600,8 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY,
OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT); OMConfigKeys.OZONE_OM_RATIS_ENABLE_DEFAULT);
if (omRatisEnabled) { if (omRatisEnabled) {
omRatisServer = OzoneManagerRatisServer.newOMRatisServer( omRatisServer = OzoneManagerRatisServer.newOMRatisServer(this, omId,
omId, omRpcAddress.getAddress(), configuration); omNodeRpcAddr.getAddress(), configuration);
omRatisServer.start(); omRatisServer.start();
LOG.info("OzoneManager Ratis server started at port {}", LOG.info("OzoneManager Ratis server started at port {}",
@ -704,22 +702,6 @@ public final class OzoneManager extends ServiceRuntimeInfoImpl
} }
} }
/**
* Validates that the incoming OM request has required parameters.
* TODO: Add more validation checks before writing the request to Ratis log.
* @param omRequest client request to OM
* @throws OMException thrown if required parameters are set to null.
*/
public void validateRequest(OMRequest omRequest) throws OMException {
Type cmdType = omRequest.getCmdType();
if (cmdType == null) {
throw new OMException("CmdType is null", ResultCodes.INVALID_REQUEST);
}
if (omRequest.getClientId() == null) {
throw new OMException("ClientId is null", ResultCodes.INVALID_REQUEST);
}
}
/** /**
* Creates a volume. * Creates a volume.
* *

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.ozone.om.ratis; package org.apache.hadoop.ozone.om.ratis;
import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.InvalidProtocolBufferException;
import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OMConfigKeys;
@ -31,6 +30,7 @@ import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.RaftClient; import org.apache.ratis.client.RaftClient;
import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.conf.RaftProperties;
import org.apache.ratis.grpc.GrpcConfigKeys; import org.apache.ratis.grpc.GrpcConfigKeys;
import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftGroup; import org.apache.ratis.protocol.RaftGroup;
import org.apache.ratis.protocol.RaftPeerId; import org.apache.ratis.protocol.RaftPeerId;
import org.apache.ratis.retry.RetryPolicy; import org.apache.ratis.retry.RetryPolicy;
@ -44,7 +44,6 @@ import org.slf4j.LoggerFactory;
* Ratis helper methods for OM Ratis server and client. * Ratis helper methods for OM Ratis server and client.
*/ */
public final class OMRatisHelper { public final class OMRatisHelper {
private static final Logger LOG = LoggerFactory.getLogger( private static final Logger LOG = LoggerFactory.getLogger(
OMRatisHelper.class); OMRatisHelper.class);
@ -95,9 +94,9 @@ public final class OMRatisHelper {
return OMRequest.parseFrom(bytes); return OMRequest.parseFrom(bytes);
} }
static ByteString convertResponseToByteString(OMResponse response) { static Message convertResponseToMessage(OMResponse response) {
byte[] requestBytes = response.toByteArray(); byte[] requestBytes = response.toByteArray();
return ByteString.copyFrom(requestBytes); return Message.valueOf(ByteString.copyFrom(requestBytes));
} }
static OMResponse convertByteStringToOMResponse(ByteString byteString) static OMResponse convertByteStringToOMResponse(ByteString byteString)
@ -113,10 +112,4 @@ public final class OMRatisHelper {
.setMessage(e.getMessage()) .setMessage(e.getMessage())
.build(); .build();
} }
static <T> CompletableFuture<T> completeExceptionally(Exception e) {
final CompletableFuture<T> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
}
} }

View File

@ -35,6 +35,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.StorageUnit; import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.scm.HddsServerUtil; import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.ozone.om.OMConfigKeys; import org.apache.hadoop.ozone.om.OMConfigKeys;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.ratis.RaftConfigKeys; import org.apache.ratis.RaftConfigKeys;
import org.apache.ratis.client.RaftClientConfigKeys; import org.apache.ratis.client.RaftClientConfigKeys;
import org.apache.ratis.conf.RaftProperties; import org.apache.ratis.conf.RaftProperties;
@ -68,6 +69,7 @@ public final class OzoneManagerRatisServer {
private final RaftGroupId raftGroupId; private final RaftGroupId raftGroupId;
private final RaftGroup raftGroup; private final RaftGroup raftGroup;
private final RaftPeerId raftPeerId; private final RaftPeerId raftPeerId;
private final OzoneManagerProtocol ozoneManager;
private static final AtomicLong CALL_ID_COUNTER = new AtomicLong(); private static final AtomicLong CALL_ID_COUNTER = new AtomicLong();
@ -75,9 +77,10 @@ public final class OzoneManagerRatisServer {
return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE; return CALL_ID_COUNTER.getAndIncrement() & Long.MAX_VALUE;
} }
private OzoneManagerRatisServer(String omId, InetAddress addr, int port, private OzoneManagerRatisServer(OzoneManagerProtocol om, String omId,
Configuration conf) throws IOException { InetAddress addr, int port, Configuration conf) throws IOException {
Objects.requireNonNull(omId, "omId == null"); Objects.requireNonNull(omId, "omId is null");
this.ozoneManager = om;
this.port = port; this.port = port;
this.omRatisAddress = new InetSocketAddress(addr.getHostAddress(), port); this.omRatisAddress = new InetSocketAddress(addr.getHostAddress(), port);
RaftProperties serverProperties = newRaftProperties(conf); RaftProperties serverProperties = newRaftProperties(conf);
@ -98,8 +101,9 @@ public final class OzoneManagerRatisServer {
.build(); .build();
} }
public static OzoneManagerRatisServer newOMRatisServer(String omId, public static OzoneManagerRatisServer newOMRatisServer(
InetAddress omAddress, Configuration ozoneConf) throws IOException { OzoneManagerProtocol om, String omId, InetAddress omAddress,
Configuration ozoneConf) throws IOException {
int localPort = ozoneConf.getInt( int localPort = ozoneConf.getInt(
OMConfigKeys.OZONE_OM_RATIS_PORT_KEY, OMConfigKeys.OZONE_OM_RATIS_PORT_KEY,
OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT); OMConfigKeys.OZONE_OM_RATIS_PORT_DEFAULT);
@ -120,7 +124,8 @@ public final class OzoneManagerRatisServer {
+ "fallback to use default port {}", localPort, e); + "fallback to use default port {}", localPort, e);
} }
} }
return new OzoneManagerRatisServer(omId, omAddress, localPort, ozoneConf); return new OzoneManagerRatisServer(om, omId, omAddress, localPort,
ozoneConf);
} }
public RaftGroup getRaftGroup() { public RaftGroup getRaftGroup() {
@ -128,11 +133,10 @@ public final class OzoneManagerRatisServer {
} }
/** /**
* Return a dummy StateMachine. * Returns OzoneManager StateMachine.
* TODO: Implement a state machine on OM.
*/ */
private BaseStateMachine getStateMachine(RaftGroupId gid) { private BaseStateMachine getStateMachine(RaftGroupId gid) {
return new OzoneManagerStateMachine(null); return new OzoneManagerStateMachine(ozoneManager);
} }
public void start() throws IOException { public void start() throws IOException {
@ -199,8 +203,7 @@ public final class OzoneManagerRatisServer {
SizeInBytes.valueOf(raftSegmentPreallocatedSize)); SizeInBytes.valueOf(raftSegmentPreallocatedSize));
// For grpc set the maximum message size // For grpc set the maximum message size
// TODO: calculate the max message size based on the max size of a // TODO: calculate the optimal max message size
// PutSmallFileRequest's file size limit
GrpcConfigKeys.setMessageSizeMax(properties, GrpcConfigKeys.setMessageSizeMax(properties,
SizeInBytes.valueOf(logAppenderQueueByteLimit)); SizeInBytes.valueOf(logAppenderQueueByteLimit));
@ -263,11 +266,6 @@ public final class OzoneManagerRatisServer {
// TODO: set max write buffer size // TODO: set max write buffer size
/**
* TODO: when state machine is implemented, enable StateMachineData sync
* and set sync timeout and number of sync retries.
*/
/** /**
* TODO: set following ratis leader election related configs when * TODO: set following ratis leader election related configs when
* replicated ratis server is implemented. * replicated ratis server is implemented.

View File

@ -17,14 +17,22 @@
package org.apache.hadoop.ozone.om.ratis; package org.apache.hadoop.ozone.om.ratis;
import com.google.protobuf.InvalidProtocolBufferException; import com.google.common.base.Preconditions;
import com.google.protobuf.ServiceException;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.ozone.container.common.transport.server.ratis import org.apache.hadoop.ozone.container.common.transport.server.ratis
.ContainerStateMachine; .ContainerStateMachine;
import org.apache.hadoop.ozone.om.OzoneManager; import org.apache.hadoop.ozone.om.OzoneManager;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.*; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse;
import org.apache.hadoop.ozone.protocolPB.OzoneManagerRequestHandler;
import org.apache.ratis.proto.RaftProtos;
import org.apache.ratis.protocol.Message; import org.apache.ratis.protocol.Message;
import org.apache.ratis.protocol.RaftClientRequest;
import org.apache.ratis.protocol.RaftGroupId; import org.apache.ratis.protocol.RaftGroupId;
import org.apache.ratis.server.RaftServer; import org.apache.ratis.server.RaftServer;
import org.apache.ratis.server.storage.RaftStorage; import org.apache.ratis.server.storage.RaftStorage;
@ -46,11 +54,11 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
LoggerFactory.getLogger(ContainerStateMachine.class); LoggerFactory.getLogger(ContainerStateMachine.class);
private final SimpleStateMachineStorage storage = private final SimpleStateMachineStorage storage =
new SimpleStateMachineStorage(); new SimpleStateMachineStorage();
private final OzoneManager ozoneManager; private final OzoneManagerRequestHandler handler;
private RaftGroupId raftGroupId;
public OzoneManagerStateMachine(OzoneManager om) { public OzoneManagerStateMachine(OzoneManagerProtocol om) {
// OzoneManager is required when implementing StateMachine this.handler = new OzoneManagerRequestHandler(om);
this.ozoneManager = om;
} }
/** /**
@ -62,29 +70,88 @@ public class OzoneManagerStateMachine extends BaseStateMachine {
RaftServer server, RaftGroupId id, RaftStorage raftStorage) RaftServer server, RaftGroupId id, RaftStorage raftStorage)
throws IOException { throws IOException {
super.initialize(server, id, raftStorage); super.initialize(server, id, raftStorage);
this.raftGroupId = id;
storage.init(raftStorage); storage.init(raftStorage);
} }
/**
* Validate/pre-process the incoming update request in the state machine.
* @return the content to be written to the log entry. Null means the request
* should be rejected.
* @throws IOException thrown by the state machine while validating
*/
public TransactionContext startTransaction(
RaftClientRequest raftClientRequest) throws IOException {
ByteString messageContent = raftClientRequest.getMessage().getContent();
OMRequest omRequest = OMRatisHelper.convertByteStringToOMRequest(
messageContent);
Preconditions.checkArgument(raftClientRequest.getRaftGroupId().equals(
raftGroupId));
try {
handler.validateRequest(omRequest);
} catch (IOException ioe) {
TransactionContext ctxt = TransactionContext.newBuilder()
.setClientRequest(raftClientRequest)
.setStateMachine(this)
.setServerRole(RaftProtos.RaftPeerRole.LEADER)
.build();
ctxt.setException(ioe);
return ctxt;
}
return TransactionContext.newBuilder()
.setClientRequest(raftClientRequest)
.setStateMachine(this)
.setServerRole(RaftProtos.RaftPeerRole.LEADER)
.setLogData(messageContent)
.build();
}
/* /*
* Apply a committed log entry to the state machine. This function * Apply a committed log entry to the state machine.
* currently returns a dummy message.
* TODO: Apply transaction to OM state machine
*/ */
@Override @Override
public CompletableFuture<Message> applyTransaction(TransactionContext trx) { public CompletableFuture<Message> applyTransaction(TransactionContext trx) {
String errorMessage;
ByteString logData = trx.getStateMachineLogEntry().getLogData();
try { try {
OMRequest omRequest = OMRatisHelper.convertByteStringToOMRequest(logData); OMRequest request = OMRatisHelper.convertByteStringToOMRequest(
LOG.debug("Received request: cmdType={} traceID={} ", trx.getStateMachineLogEntry().getLogData());
omRequest.getCmdType(), omRequest.getTraceID()); CompletableFuture<Message> future = CompletableFuture
errorMessage = "Dummy response from Ratis server for command type: " + .supplyAsync(() -> runCommand(request));
omRequest.getCmdType(); return future;
} catch (InvalidProtocolBufferException e) { } catch (IOException e) {
errorMessage = e.getMessage(); return completeExceptionally(e);
} }
// TODO: When State Machine is implemented, send the actual response back
return OMRatisHelper.completeExceptionally(new IOException(errorMessage));
} }
/**
* Query the state machine. The request must be read-only.
*/
@Override
public CompletableFuture<Message> query(Message request) {
try {
OMRequest omRequest = OMRatisHelper.convertByteStringToOMRequest(
request.getContent());
return CompletableFuture.completedFuture(runCommand(omRequest));
} catch (IOException e) {
return completeExceptionally(e);
}
}
/**
* Submits request to OM and returns the response Message.
* @param request OMRequest
* @return response from OM
* @throws ServiceException
*/
private Message runCommand(OMRequest request) {
OMResponse response = handler.handle(request);
return OMRatisHelper.convertResponseToMessage(response);
}
private static <T> CompletableFuture<T> completeExceptionally(Exception e) {
final CompletableFuture<T> future = new CompletableFuture<>();
future.completeExceptionally(e);
return future;
}
} }

View File

@ -16,153 +16,20 @@
*/ */
package org.apache.hadoop.ozone.protocolPB; package org.apache.hadoop.ozone.protocolPB;
import com.google.common.collect.Lists;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol; import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB; import org.apache.hadoop.ozone.om.protocolPB.OzoneManagerProtocolPB;
import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisClient; import org.apache.hadoop.ozone.om.ratis.OzoneManagerRatisClient;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.AllocateBlockRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.AllocateBlockResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CheckVolumeAccessRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CheckVolumeAccessResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CommitKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CommitKeyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CreateBucketRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CreateBucketResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CreateKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CreateKeyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CreateVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CreateVolumeResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.DeleteBucketRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.DeleteBucketResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.DeleteKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.DeleteKeyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.DeleteVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.DeleteVolumeResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.InfoBucketRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.InfoBucketResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.InfoVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.InfoVolumeResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.ListBucketsRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.ListBucketsResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.ListKeysRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.ListKeysResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.ListVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.ListVolumeResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.LookupKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.LookupKeyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.
MultipartUploadAbortRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartUploadAbortResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartCommitUploadPartRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartCommitUploadPartResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartUploadCompleteRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartUploadCompleteResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartInfoInitiateRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartInfoInitiateResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest; .OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse; .OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Part;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.RenameKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.RenameKeyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.S3BucketInfoRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.S3BucketInfoResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.S3CreateBucketRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.S3CreateBucketResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.S3DeleteBucketRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.S3DeleteBucketResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.S3ListBucketsResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.S3ListBucketsRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.ServiceListRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.ServiceListResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.SetBucketPropertyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.SetBucketPropertyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.SetVolumePropertyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.SetVolumePropertyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.Status;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.Type;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
import java.util.TreeMap;
import java.util.stream.Collectors;
/** /**
* This class is the server-side translator that forwards requests received on * This class is the server-side translator that forwards requests received on
* {@link OzoneManagerProtocolPB} * {@link OzoneManagerProtocolPB}
@ -172,8 +39,8 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
OzoneManagerProtocolPB { OzoneManagerProtocolPB {
private static final Logger LOG = LoggerFactory private static final Logger LOG = LoggerFactory
.getLogger(OzoneManagerProtocolServerSideTranslatorPB.class); .getLogger(OzoneManagerProtocolServerSideTranslatorPB.class);
private final OzoneManagerProtocol impl;
private final OzoneManagerRatisClient omRatisClient; private final OzoneManagerRatisClient omRatisClient;
private final OzoneManagerRequestHandler handler;
private final boolean isRatisEnabled; private final boolean isRatisEnabled;
/** /**
@ -184,7 +51,7 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
public OzoneManagerProtocolServerSideTranslatorPB( public OzoneManagerProtocolServerSideTranslatorPB(
OzoneManagerProtocol impl, OzoneManagerRatisClient ratisClient, OzoneManagerProtocol impl, OzoneManagerRatisClient ratisClient,
boolean enableRatis) { boolean enableRatis) {
this.impl = impl; handler = new OzoneManagerRequestHandler(impl);
this.omRatisClient = ratisClient; this.omRatisClient = ratisClient;
this.isRatisEnabled = enableRatis; this.isRatisEnabled = enableRatis;
} }
@ -200,7 +67,7 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
if (isRatisEnabled) { if (isRatisEnabled) {
return submitRequestToRatis(request); return submitRequestToRatis(request);
} else { } else {
return submitRequestToOM(request); return submitRequestDirectlyToOM(request);
} }
} }
@ -214,738 +81,7 @@ public class OzoneManagerProtocolServerSideTranslatorPB implements
/** /**
* Submits request directly to OM. * Submits request directly to OM.
*/ */
@SuppressWarnings("methodlength") private OMResponse submitRequestDirectlyToOM(OMRequest request) {
private OMResponse submitRequestToOM(OMRequest request) return handler.handle(request);
throws ServiceException {
Type cmdType = request.getCmdType();
OMResponse.Builder responseBuilder = OMResponse.newBuilder()
.setCmdType(cmdType);
switch (cmdType) {
case CreateVolume:
CreateVolumeResponse createVolumeResponse = createVolume(
request.getCreateVolumeRequest());
responseBuilder.setCreateVolumeResponse(createVolumeResponse);
break;
case SetVolumeProperty:
SetVolumePropertyResponse setVolumePropertyResponse = setVolumeProperty(
request.getSetVolumePropertyRequest());
responseBuilder.setSetVolumePropertyResponse(setVolumePropertyResponse);
break;
case CheckVolumeAccess:
CheckVolumeAccessResponse checkVolumeAccessResponse = checkVolumeAccess(
request.getCheckVolumeAccessRequest());
responseBuilder.setCheckVolumeAccessResponse(checkVolumeAccessResponse);
break;
case InfoVolume:
InfoVolumeResponse infoVolumeResponse = infoVolume(
request.getInfoVolumeRequest());
responseBuilder.setInfoVolumeResponse(infoVolumeResponse);
break;
case DeleteVolume:
DeleteVolumeResponse deleteVolumeResponse = deleteVolume(
request.getDeleteVolumeRequest());
responseBuilder.setDeleteVolumeResponse(deleteVolumeResponse);
break;
case ListVolume:
ListVolumeResponse listVolumeResponse = listVolumes(
request.getListVolumeRequest());
responseBuilder.setListVolumeResponse(listVolumeResponse);
break;
case CreateBucket:
CreateBucketResponse createBucketResponse = createBucket(
request.getCreateBucketRequest());
responseBuilder.setCreateBucketResponse(createBucketResponse);
break;
case InfoBucket:
InfoBucketResponse infoBucketResponse = infoBucket(
request.getInfoBucketRequest());
responseBuilder.setInfoBucketResponse(infoBucketResponse);
break;
case SetBucketProperty:
SetBucketPropertyResponse setBucketPropertyResponse = setBucketProperty(
request.getSetBucketPropertyRequest());
responseBuilder.setSetBucketPropertyResponse(setBucketPropertyResponse);
break;
case DeleteBucket:
DeleteBucketResponse deleteBucketResponse = deleteBucket(
request.getDeleteBucketRequest());
responseBuilder.setDeleteBucketResponse(deleteBucketResponse);
break;
case ListBuckets:
ListBucketsResponse listBucketsResponse = listBuckets(
request.getListBucketsRequest());
responseBuilder.setListBucketsResponse(listBucketsResponse);
break;
case CreateKey:
CreateKeyResponse createKeyResponse = createKey(
request.getCreateKeyRequest());
responseBuilder.setCreateKeyResponse(createKeyResponse);
break;
case LookupKey:
LookupKeyResponse lookupKeyResponse = lookupKey(
request.getLookupKeyRequest());
responseBuilder.setLookupKeyResponse(lookupKeyResponse);
break;
case RenameKey:
RenameKeyResponse renameKeyResponse = renameKey(
request.getRenameKeyRequest());
responseBuilder.setRenameKeyResponse(renameKeyResponse);
break;
case DeleteKey:
DeleteKeyResponse deleteKeyResponse = deleteKey(
request.getDeleteKeyRequest());
responseBuilder.setDeleteKeyResponse(deleteKeyResponse);
break;
case ListKeys:
ListKeysResponse listKeysResponse = listKeys(
request.getListKeysRequest());
responseBuilder.setListKeysResponse(listKeysResponse);
break;
case CommitKey:
CommitKeyResponse commitKeyResponse = commitKey(
request.getCommitKeyRequest());
responseBuilder.setCommitKeyResponse(commitKeyResponse);
break;
case AllocateBlock:
AllocateBlockResponse allocateBlockResponse = allocateBlock(
request.getAllocateBlockRequest());
responseBuilder.setAllocateBlockResponse(allocateBlockResponse);
break;
case CreateS3Bucket:
S3CreateBucketResponse s3CreateBucketResponse = createS3Bucket(
request.getCreateS3BucketRequest());
responseBuilder.setCreateS3BucketResponse(s3CreateBucketResponse);
break;
case DeleteS3Bucket:
S3DeleteBucketResponse s3DeleteBucketResponse = deleteS3Bucket(
request.getDeleteS3BucketRequest());
responseBuilder.setDeleteS3BucketResponse(s3DeleteBucketResponse);
break;
case InfoS3Bucket:
S3BucketInfoResponse s3BucketInfoResponse = getS3Bucketinfo(
request.getInfoS3BucketRequest());
responseBuilder.setInfoS3BucketResponse(s3BucketInfoResponse);
break;
case ListS3Buckets:
S3ListBucketsResponse s3ListBucketsResponse = listS3Buckets(
request.getListS3BucketsRequest());
responseBuilder.setListS3BucketsResponse(s3ListBucketsResponse);
break;
case InitiateMultiPartUpload:
MultipartInfoInitiateResponse multipartInfoInitiateResponse =
initiateMultiPartUpload(request.getInitiateMultiPartUploadRequest());
responseBuilder.setInitiateMultiPartUploadResponse(
multipartInfoInitiateResponse);
break;
case CommitMultiPartUpload:
MultipartCommitUploadPartResponse commitUploadPartResponse =
commitMultipartUploadPart(request.getCommitMultiPartUploadRequest());
responseBuilder.setCommitMultiPartUploadResponse(
commitUploadPartResponse);
break;
case CompleteMultiPartUpload:
MultipartUploadCompleteResponse completeMultipartUploadResponse =
completeMultipartUpload(
request.getCompleteMultiPartUploadRequest());
responseBuilder.setCompleteMultiPartUploadResponse(
completeMultipartUploadResponse);
break;
case AbortMultiPartUpload:
MultipartUploadAbortResponse multipartUploadAbortResponse =
abortMultipartUpload(request.getAbortMultiPartUploadRequest());
responseBuilder.setAbortMultiPartUploadResponse(
multipartUploadAbortResponse);
break;
case ServiceList:
ServiceListResponse serviceListResponse = getServiceList(
request.getServiceListRequest());
responseBuilder.setServiceListResponse(serviceListResponse);
break;
default:
responseBuilder.setSuccess(false);
responseBuilder.setMessage("Unrecognized Command Type: " + cmdType);
break;
}
return responseBuilder.build();
} }
// Convert and exception to corresponding status code
private Status exceptionToResponseStatus(IOException ex) {
if (ex instanceof OMException) {
OMException omException = (OMException)ex;
switch (omException.getResult()) {
case FAILED_VOLUME_ALREADY_EXISTS:
return Status.VOLUME_ALREADY_EXISTS;
case FAILED_TOO_MANY_USER_VOLUMES:
return Status.USER_TOO_MANY_VOLUMES;
case FAILED_VOLUME_NOT_FOUND:
return Status.VOLUME_NOT_FOUND;
case FAILED_VOLUME_NOT_EMPTY:
return Status.VOLUME_NOT_EMPTY;
case FAILED_USER_NOT_FOUND:
return Status.USER_NOT_FOUND;
case FAILED_BUCKET_ALREADY_EXISTS:
return Status.BUCKET_ALREADY_EXISTS;
case FAILED_BUCKET_NOT_FOUND:
return Status.BUCKET_NOT_FOUND;
case FAILED_BUCKET_NOT_EMPTY:
return Status.BUCKET_NOT_EMPTY;
case FAILED_KEY_ALREADY_EXISTS:
return Status.KEY_ALREADY_EXISTS;
case FAILED_KEY_NOT_FOUND:
return Status.KEY_NOT_FOUND;
case FAILED_INVALID_KEY_NAME:
return Status.INVALID_KEY_NAME;
case FAILED_KEY_ALLOCATION:
return Status.KEY_ALLOCATION_ERROR;
case FAILED_KEY_DELETION:
return Status.KEY_DELETION_ERROR;
case FAILED_KEY_RENAME:
return Status.KEY_RENAME_ERROR;
case FAILED_METADATA_ERROR:
return Status.METADATA_ERROR;
case OM_NOT_INITIALIZED:
return Status.OM_NOT_INITIALIZED;
case SCM_VERSION_MISMATCH_ERROR:
return Status.SCM_VERSION_MISMATCH_ERROR;
case S3_BUCKET_ALREADY_EXISTS:
return Status.S3_BUCKET_ALREADY_EXISTS;
case S3_BUCKET_NOT_FOUND:
return Status.S3_BUCKET_NOT_FOUND;
case INITIATE_MULTIPART_UPLOAD_FAILED:
return Status.INITIATE_MULTIPART_UPLOAD_ERROR;
case NO_SUCH_MULTIPART_UPLOAD:
return Status.NO_SUCH_MULTIPART_UPLOAD_ERROR;
case UPLOAD_PART_FAILED:
return Status.MULTIPART_UPLOAD_PARTFILE_ERROR;
case COMPLETE_MULTIPART_UPLOAD_FAILED:
return Status.COMPLETE_MULTIPART_UPLOAD_ERROR;
case MISMATCH_MULTIPART_LIST:
return Status.MISMATCH_MULTIPART_LIST;
case MISSING_UPLOAD_PARTS:
return Status.MISSING_UPLOAD_PARTS;
case ENTITY_TOO_SMALL:
return Status.ENTITY_TOO_SMALL;
case ABORT_MULTIPART_UPLOAD_FAILED:
return Status.ABORT_MULTIPART_UPLOAD_FAILED;
default:
return Status.INTERNAL_ERROR;
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Unknown error occurs", ex);
}
return Status.INTERNAL_ERROR;
}
}
private CreateVolumeResponse createVolume(CreateVolumeRequest request) {
CreateVolumeResponse.Builder resp = CreateVolumeResponse.newBuilder();
resp.setStatus(Status.OK);
try {
impl.createVolume(OmVolumeArgs.getFromProtobuf(request.getVolumeInfo()));
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private SetVolumePropertyResponse setVolumeProperty(
SetVolumePropertyRequest request) {
SetVolumePropertyResponse.Builder resp =
SetVolumePropertyResponse.newBuilder();
resp.setStatus(Status.OK);
String volume = request.getVolumeName();
try {
if (request.hasQuotaInBytes()) {
long quota = request.getQuotaInBytes();
impl.setQuota(volume, quota);
} else {
String owner = request.getOwnerName();
impl.setOwner(volume, owner);
}
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private CheckVolumeAccessResponse checkVolumeAccess(
CheckVolumeAccessRequest request) {
CheckVolumeAccessResponse.Builder resp =
CheckVolumeAccessResponse.newBuilder();
resp.setStatus(Status.OK);
try {
boolean access = impl.checkVolumeAccess(request.getVolumeName(),
request.getUserAcl());
// if no access, set the response status as access denied
if (!access) {
resp.setStatus(Status.ACCESS_DENIED);
}
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private InfoVolumeResponse infoVolume(InfoVolumeRequest request) {
InfoVolumeResponse.Builder resp = InfoVolumeResponse.newBuilder();
resp.setStatus(Status.OK);
String volume = request.getVolumeName();
try {
OmVolumeArgs ret = impl.getVolumeInfo(volume);
resp.setVolumeInfo(ret.getProtobuf());
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private DeleteVolumeResponse deleteVolume(DeleteVolumeRequest request) {
DeleteVolumeResponse.Builder resp = DeleteVolumeResponse.newBuilder();
resp.setStatus(Status.OK);
try {
impl.deleteVolume(request.getVolumeName());
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private ListVolumeResponse listVolumes(ListVolumeRequest request)
throws ServiceException {
ListVolumeResponse.Builder resp = ListVolumeResponse.newBuilder();
List<OmVolumeArgs> result = Lists.newArrayList();
try {
if (request.getScope()
== ListVolumeRequest.Scope.VOLUMES_BY_USER) {
result = impl.listVolumeByUser(request.getUserName(),
request.getPrefix(), request.getPrevKey(), request.getMaxKeys());
} else if (request.getScope()
== ListVolumeRequest.Scope.VOLUMES_BY_CLUSTER) {
result = impl.listAllVolumes(request.getPrefix(), request.getPrevKey(),
request.getMaxKeys());
}
if (result == null) {
throw new ServiceException("Failed to get volumes for given scope "
+ request.getScope());
}
result.forEach(item -> resp.addVolumeInfo(item.getProtobuf()));
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private CreateBucketResponse createBucket(CreateBucketRequest request) {
CreateBucketResponse.Builder resp =
CreateBucketResponse.newBuilder();
try {
impl.createBucket(OmBucketInfo.getFromProtobuf(
request.getBucketInfo()));
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private InfoBucketResponse infoBucket(InfoBucketRequest request) {
InfoBucketResponse.Builder resp =
InfoBucketResponse.newBuilder();
try {
OmBucketInfo omBucketInfo = impl.getBucketInfo(
request.getVolumeName(), request.getBucketName());
resp.setStatus(Status.OK);
resp.setBucketInfo(omBucketInfo.getProtobuf());
} catch(IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private CreateKeyResponse createKey(CreateKeyRequest request) {
CreateKeyResponse.Builder resp =
CreateKeyResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
HddsProtos.ReplicationType type =
keyArgs.hasType()? keyArgs.getType() : null;
HddsProtos.ReplicationFactor factor =
keyArgs.hasFactor()? keyArgs.getFactor() : null;
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setDataSize(keyArgs.getDataSize())
.setType(type)
.setFactor(factor)
.setIsMultipartKey(keyArgs.getIsMultipartKey())
.setMultipartUploadID(keyArgs.getMultipartUploadID())
.setMultipartUploadPartNumber(keyArgs.getMultipartNumber())
.build();
if (keyArgs.hasDataSize()) {
omKeyArgs.setDataSize(keyArgs.getDataSize());
} else {
omKeyArgs.setDataSize(0);
}
OpenKeySession openKey = impl.openKey(omKeyArgs);
resp.setKeyInfo(openKey.getKeyInfo().getProtobuf());
resp.setID(openKey.getId());
resp.setOpenVersion(openKey.getOpenVersion());
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private LookupKeyResponse lookupKey(LookupKeyRequest request) {
LookupKeyResponse.Builder resp =
LookupKeyResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.build();
OmKeyInfo keyInfo = impl.lookupKey(omKeyArgs);
resp.setKeyInfo(keyInfo.getProtobuf());
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private RenameKeyResponse renameKey(RenameKeyRequest request) {
RenameKeyResponse.Builder resp = RenameKeyResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.build();
impl.renameKey(omKeyArgs, request.getToKeyName());
resp.setStatus(Status.OK);
} catch (IOException e){
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private SetBucketPropertyResponse setBucketProperty(
SetBucketPropertyRequest request) {
SetBucketPropertyResponse.Builder resp =
SetBucketPropertyResponse.newBuilder();
try {
impl.setBucketProperty(OmBucketArgs.getFromProtobuf(
request.getBucketArgs()));
resp.setStatus(Status.OK);
} catch(IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private DeleteKeyResponse deleteKey(DeleteKeyRequest request) {
DeleteKeyResponse.Builder resp =
DeleteKeyResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.build();
impl.deleteKey(omKeyArgs);
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private DeleteBucketResponse deleteBucket(DeleteBucketRequest request) {
DeleteBucketResponse.Builder resp = DeleteBucketResponse.newBuilder();
resp.setStatus(Status.OK);
try {
impl.deleteBucket(request.getVolumeName(), request.getBucketName());
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private ListBucketsResponse listBuckets(ListBucketsRequest request) {
ListBucketsResponse.Builder resp =
ListBucketsResponse.newBuilder();
try {
List<OmBucketInfo> buckets = impl.listBuckets(
request.getVolumeName(),
request.getStartKey(),
request.getPrefix(),
request.getCount());
for(OmBucketInfo bucket : buckets) {
resp.addBucketInfo(bucket.getProtobuf());
}
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private ListKeysResponse listKeys(ListKeysRequest request) {
ListKeysResponse.Builder resp =
ListKeysResponse.newBuilder();
try {
List<OmKeyInfo> keys = impl.listKeys(
request.getVolumeName(),
request.getBucketName(),
request.getStartKey(),
request.getPrefix(),
request.getCount());
for(OmKeyInfo key : keys) {
resp.addKeyInfo(key.getProtobuf());
}
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private CommitKeyResponse commitKey(CommitKeyRequest request) {
CommitKeyResponse.Builder resp =
CommitKeyResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
HddsProtos.ReplicationType type =
keyArgs.hasType()? keyArgs.getType() : null;
HddsProtos.ReplicationFactor factor =
keyArgs.hasFactor()? keyArgs.getFactor() : null;
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setLocationInfoList(keyArgs.getKeyLocationsList().stream()
.map(OmKeyLocationInfo::getFromProtobuf)
.collect(Collectors.toList()))
.setType(type)
.setFactor(factor)
.setDataSize(keyArgs.getDataSize())
.build();
impl.commitKey(omKeyArgs, request.getClientID());
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private AllocateBlockResponse allocateBlock(AllocateBlockRequest request) {
AllocateBlockResponse.Builder resp =
AllocateBlockResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.build();
OmKeyLocationInfo newLocation = impl.allocateBlock(omKeyArgs,
request.getClientID());
resp.setKeyLocation(newLocation.getProtobuf());
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private ServiceListResponse getServiceList(ServiceListRequest request) {
ServiceListResponse.Builder resp = ServiceListResponse.newBuilder();
try {
resp.addAllServiceInfo(impl.getServiceList().stream()
.map(ServiceInfo::getProtobuf)
.collect(Collectors.toList()));
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private S3CreateBucketResponse createS3Bucket(S3CreateBucketRequest request) {
S3CreateBucketResponse.Builder resp = S3CreateBucketResponse.newBuilder();
try {
impl.createS3Bucket(request.getUserName(), request.getS3Bucketname());
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private S3DeleteBucketResponse deleteS3Bucket(S3DeleteBucketRequest request) {
S3DeleteBucketResponse.Builder resp = S3DeleteBucketResponse.newBuilder();
try {
impl.deleteS3Bucket(request.getS3BucketName());
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private S3BucketInfoResponse getS3Bucketinfo(S3BucketInfoRequest request) {
S3BucketInfoResponse.Builder resp = S3BucketInfoResponse.newBuilder();
try {
resp.setOzoneMapping(
impl.getOzoneBucketMapping(request.getS3BucketName()));
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private S3ListBucketsResponse listS3Buckets(S3ListBucketsRequest request) {
S3ListBucketsResponse.Builder resp = S3ListBucketsResponse.newBuilder();
try {
List<OmBucketInfo> buckets = impl.listS3Buckets(
request.getUserName(),
request.getStartKey(),
request.getPrefix(),
request.getCount());
for(OmBucketInfo bucket : buckets) {
resp.addBucketInfo(bucket.getProtobuf());
}
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private MultipartInfoInitiateResponse initiateMultiPartUpload(
MultipartInfoInitiateRequest request) {
MultipartInfoInitiateResponse.Builder resp = MultipartInfoInitiateResponse
.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setType(keyArgs.getType())
.setFactor(keyArgs.getFactor())
.build();
OmMultipartInfo multipartInfo = impl.initiateMultipartUpload(omKeyArgs);
resp.setVolumeName(multipartInfo.getVolumeName());
resp.setBucketName(multipartInfo.getBucketName());
resp.setKeyName(multipartInfo.getKeyName());
resp.setMultipartUploadID(multipartInfo.getUploadID());
resp.setStatus(Status.OK);
} catch (IOException ex) {
resp.setStatus(exceptionToResponseStatus(ex));
}
return resp.build();
}
private MultipartCommitUploadPartResponse commitMultipartUploadPart(
MultipartCommitUploadPartRequest request) {
MultipartCommitUploadPartResponse.Builder resp =
MultipartCommitUploadPartResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setMultipartUploadID(keyArgs.getMultipartUploadID())
.setIsMultipartKey(keyArgs.getIsMultipartKey())
.setMultipartUploadPartNumber(keyArgs.getMultipartNumber())
.setDataSize(keyArgs.getDataSize())
.setLocationInfoList(keyArgs.getKeyLocationsList().stream()
.map(OmKeyLocationInfo::getFromProtobuf)
.collect(Collectors.toList()))
.build();
OmMultipartCommitUploadPartInfo commitUploadPartInfo =
impl.commitMultipartUploadPart(omKeyArgs, request.getClientID());
resp.setPartName(commitUploadPartInfo.getPartName());
resp.setStatus(Status.OK);
} catch (IOException ex) {
resp.setStatus(exceptionToResponseStatus(ex));
}
return resp.build();
}
private MultipartUploadCompleteResponse completeMultipartUpload(
MultipartUploadCompleteRequest request) {
MultipartUploadCompleteResponse.Builder response =
MultipartUploadCompleteResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
List<Part> partsList = request.getPartsListList();
TreeMap<Integer, String> partsMap = new TreeMap<>();
for (Part part : partsList) {
partsMap.put(part.getPartNumber(), part.getPartName());
}
OmMultipartUploadList omMultipartUploadList =
new OmMultipartUploadList(partsMap);
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setMultipartUploadID(keyArgs.getMultipartUploadID())
.build();
OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo = impl
.completeMultipartUpload(omKeyArgs, omMultipartUploadList);
response.setVolume(omMultipartUploadCompleteInfo.getVolume())
.setBucket(omMultipartUploadCompleteInfo.getBucket())
.setKey(omMultipartUploadCompleteInfo.getKey())
.setHash(omMultipartUploadCompleteInfo.getHash());
response.setStatus(Status.OK);
} catch (IOException ex) {
response.setStatus(exceptionToResponseStatus(ex));
}
return response.build();
}
private MultipartUploadAbortResponse abortMultipartUpload(
MultipartUploadAbortRequest multipartUploadAbortRequest) {
MultipartUploadAbortResponse.Builder response =
MultipartUploadAbortResponse.newBuilder();
try {
KeyArgs keyArgs = multipartUploadAbortRequest.getKeyArgs();
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setMultipartUploadID(keyArgs.getMultipartUploadID())
.build();
impl.abortMultipartUpload(omKeyArgs);
response.setStatus(Status.OK);
} catch (IOException ex) {
response.setStatus(exceptionToResponseStatus(ex));
}
return response.build();
}
} }

View File

@ -0,0 +1,904 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.protocolPB;
import com.google.common.collect.Lists;
import java.io.IOException;
import java.util.List;
import java.util.TreeMap;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.ozone.om.exceptions.OMException;
import org.apache.hadoop.ozone.om.helpers.OmBucketArgs;
import org.apache.hadoop.ozone.om.helpers.OmBucketInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyArgs;
import org.apache.hadoop.ozone.om.helpers.OmKeyInfo;
import org.apache.hadoop.ozone.om.helpers.OmKeyLocationInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartCommitUploadPartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadCompleteInfo;
import org.apache.hadoop.ozone.om.helpers.OmMultipartUploadList;
import org.apache.hadoop.ozone.om.helpers.OmVolumeArgs;
import org.apache.hadoop.ozone.om.helpers.OpenKeySession;
import org.apache.hadoop.ozone.om.helpers.ServiceInfo;
import org.apache.hadoop.ozone.om.protocol.OzoneManagerProtocol;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.AllocateBlockRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.AllocateBlockResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CheckVolumeAccessRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CheckVolumeAccessResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CommitKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CommitKeyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CreateBucketRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CreateBucketResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CreateKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CreateKeyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CreateVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.CreateVolumeResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.DeleteBucketRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.DeleteBucketResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.DeleteKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.DeleteKeyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.DeleteVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.DeleteVolumeResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.InfoBucketRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.InfoBucketResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.InfoVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.InfoVolumeResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.KeyArgs;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.ListBucketsRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.ListBucketsResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.ListKeysRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.ListKeysResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.ListVolumeRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.ListVolumeResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.LookupKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.LookupKeyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartCommitUploadPartRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartCommitUploadPartResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartInfoInitiateRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartInfoInitiateResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartUploadAbortRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartUploadAbortResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartUploadCompleteRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.MultipartUploadCompleteResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.OMResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.Part;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.RenameKeyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.RenameKeyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.S3BucketInfoRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.S3BucketInfoResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.S3CreateBucketRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.S3CreateBucketResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.S3DeleteBucketRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.S3DeleteBucketResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.S3ListBucketsRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.S3ListBucketsResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.ServiceListRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.ServiceListResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.SetBucketPropertyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.SetBucketPropertyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.SetVolumePropertyRequest;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos
.SetVolumePropertyResponse;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Status;
import org.apache.hadoop.ozone.protocol.proto.OzoneManagerProtocolProtos.Type;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Command Handler for OM requests. OM State Machine calls this handler for
* deserializing the client request and sending it to OM.
*/
public class OzoneManagerRequestHandler {
static final Logger LOG =
LoggerFactory.getLogger(OzoneManagerRequestHandler.class);
private final OzoneManagerProtocol impl;
public OzoneManagerRequestHandler(OzoneManagerProtocol om) {
this.impl = om;
}
public OMResponse handle(OMRequest request) {
LOG.debug("Received OMRequest: {}, ", request);
Type cmdType = request.getCmdType();
OMResponse.Builder responseBuilder = OMResponse.newBuilder()
.setCmdType(cmdType);
switch (cmdType) {
case CreateVolume:
CreateVolumeResponse createVolumeResponse = createVolume(
request.getCreateVolumeRequest());
responseBuilder.setCreateVolumeResponse(createVolumeResponse);
break;
case SetVolumeProperty:
SetVolumePropertyResponse setVolumePropertyResponse = setVolumeProperty(
request.getSetVolumePropertyRequest());
responseBuilder.setSetVolumePropertyResponse(setVolumePropertyResponse);
break;
case CheckVolumeAccess:
CheckVolumeAccessResponse checkVolumeAccessResponse = checkVolumeAccess(
request.getCheckVolumeAccessRequest());
responseBuilder.setCheckVolumeAccessResponse(checkVolumeAccessResponse);
break;
case InfoVolume:
InfoVolumeResponse infoVolumeResponse = infoVolume(
request.getInfoVolumeRequest());
responseBuilder.setInfoVolumeResponse(infoVolumeResponse);
break;
case DeleteVolume:
DeleteVolumeResponse deleteVolumeResponse = deleteVolume(
request.getDeleteVolumeRequest());
responseBuilder.setDeleteVolumeResponse(deleteVolumeResponse);
break;
case ListVolume:
ListVolumeResponse listVolumeResponse = listVolumes(
request.getListVolumeRequest());
responseBuilder.setListVolumeResponse(listVolumeResponse);
break;
case CreateBucket:
CreateBucketResponse createBucketResponse = createBucket(
request.getCreateBucketRequest());
responseBuilder.setCreateBucketResponse(createBucketResponse);
break;
case InfoBucket:
InfoBucketResponse infoBucketResponse = infoBucket(
request.getInfoBucketRequest());
responseBuilder.setInfoBucketResponse(infoBucketResponse);
break;
case SetBucketProperty:
SetBucketPropertyResponse setBucketPropertyResponse = setBucketProperty(
request.getSetBucketPropertyRequest());
responseBuilder.setSetBucketPropertyResponse(setBucketPropertyResponse);
break;
case DeleteBucket:
DeleteBucketResponse deleteBucketResponse = deleteBucket(
request.getDeleteBucketRequest());
responseBuilder.setDeleteBucketResponse(deleteBucketResponse);
break;
case ListBuckets:
ListBucketsResponse listBucketsResponse = listBuckets(
request.getListBucketsRequest());
responseBuilder.setListBucketsResponse(listBucketsResponse);
break;
case CreateKey:
CreateKeyResponse createKeyResponse = createKey(
request.getCreateKeyRequest());
responseBuilder.setCreateKeyResponse(createKeyResponse);
break;
case LookupKey:
LookupKeyResponse lookupKeyResponse = lookupKey(
request.getLookupKeyRequest());
responseBuilder.setLookupKeyResponse(lookupKeyResponse);
break;
case RenameKey:
RenameKeyResponse renameKeyResponse = renameKey(
request.getRenameKeyRequest());
responseBuilder.setRenameKeyResponse(renameKeyResponse);
break;
case DeleteKey:
DeleteKeyResponse deleteKeyResponse = deleteKey(
request.getDeleteKeyRequest());
responseBuilder.setDeleteKeyResponse(deleteKeyResponse);
break;
case ListKeys:
ListKeysResponse listKeysResponse = listKeys(
request.getListKeysRequest());
responseBuilder.setListKeysResponse(listKeysResponse);
break;
case CommitKey:
CommitKeyResponse commitKeyResponse = commitKey(
request.getCommitKeyRequest());
responseBuilder.setCommitKeyResponse(commitKeyResponse);
break;
case AllocateBlock:
AllocateBlockResponse allocateBlockResponse = allocateBlock(
request.getAllocateBlockRequest());
responseBuilder.setAllocateBlockResponse(allocateBlockResponse);
break;
case CreateS3Bucket:
S3CreateBucketResponse s3CreateBucketResponse = createS3Bucket(
request.getCreateS3BucketRequest());
responseBuilder.setCreateS3BucketResponse(s3CreateBucketResponse);
break;
case DeleteS3Bucket:
S3DeleteBucketResponse s3DeleteBucketResponse = deleteS3Bucket(
request.getDeleteS3BucketRequest());
responseBuilder.setDeleteS3BucketResponse(s3DeleteBucketResponse);
break;
case InfoS3Bucket:
S3BucketInfoResponse s3BucketInfoResponse = getS3Bucketinfo(
request.getInfoS3BucketRequest());
responseBuilder.setInfoS3BucketResponse(s3BucketInfoResponse);
break;
case ListS3Buckets:
S3ListBucketsResponse s3ListBucketsResponse = listS3Buckets(
request.getListS3BucketsRequest());
responseBuilder.setListS3BucketsResponse(s3ListBucketsResponse);
break;
case InitiateMultiPartUpload:
MultipartInfoInitiateResponse multipartInfoInitiateResponse =
initiateMultiPartUpload(request.getInitiateMultiPartUploadRequest());
responseBuilder.setInitiateMultiPartUploadResponse(
multipartInfoInitiateResponse);
break;
case CommitMultiPartUpload:
MultipartCommitUploadPartResponse commitUploadPartResponse =
commitMultipartUploadPart(request.getCommitMultiPartUploadRequest());
responseBuilder.setCommitMultiPartUploadResponse(
commitUploadPartResponse);
break;
case CompleteMultiPartUpload:
MultipartUploadCompleteResponse completeMultiPartUploadResponse =
completeMultipartUpload(request.getCompleteMultiPartUploadRequest());
responseBuilder.setCompleteMultiPartUploadResponse(
completeMultiPartUploadResponse);
break;
case AbortMultiPartUpload:
MultipartUploadAbortResponse abortMultiPartAbortResponse =
abortMultipartUpload(request.getAbortMultiPartUploadRequest());
responseBuilder.setAbortMultiPartUploadResponse(
abortMultiPartAbortResponse);
break;
case ServiceList:
ServiceListResponse serviceListResponse = getServiceList(
request.getServiceListRequest());
responseBuilder.setServiceListResponse(serviceListResponse);
break;
default:
responseBuilder.setSuccess(false);
responseBuilder.setMessage("Unrecognized Command Type: " + cmdType);
break;
}
return responseBuilder.build();
}
// Convert and exception to corresponding status code
private Status exceptionToResponseStatus(IOException ex) {
if (ex instanceof OMException) {
OMException omException = (OMException)ex;
switch (omException.getResult()) {
case FAILED_VOLUME_ALREADY_EXISTS:
return Status.VOLUME_ALREADY_EXISTS;
case FAILED_TOO_MANY_USER_VOLUMES:
return Status.USER_TOO_MANY_VOLUMES;
case FAILED_VOLUME_NOT_FOUND:
return Status.VOLUME_NOT_FOUND;
case FAILED_VOLUME_NOT_EMPTY:
return Status.VOLUME_NOT_EMPTY;
case FAILED_USER_NOT_FOUND:
return Status.USER_NOT_FOUND;
case FAILED_BUCKET_ALREADY_EXISTS:
return Status.BUCKET_ALREADY_EXISTS;
case FAILED_BUCKET_NOT_FOUND:
return Status.BUCKET_NOT_FOUND;
case FAILED_BUCKET_NOT_EMPTY:
return Status.BUCKET_NOT_EMPTY;
case FAILED_KEY_ALREADY_EXISTS:
return Status.KEY_ALREADY_EXISTS;
case FAILED_KEY_NOT_FOUND:
return Status.KEY_NOT_FOUND;
case FAILED_INVALID_KEY_NAME:
return Status.INVALID_KEY_NAME;
case FAILED_KEY_ALLOCATION:
return Status.KEY_ALLOCATION_ERROR;
case FAILED_KEY_DELETION:
return Status.KEY_DELETION_ERROR;
case FAILED_KEY_RENAME:
return Status.KEY_RENAME_ERROR;
case FAILED_METADATA_ERROR:
return Status.METADATA_ERROR;
case OM_NOT_INITIALIZED:
return Status.OM_NOT_INITIALIZED;
case SCM_VERSION_MISMATCH_ERROR:
return Status.SCM_VERSION_MISMATCH_ERROR;
case S3_BUCKET_ALREADY_EXISTS:
return Status.S3_BUCKET_ALREADY_EXISTS;
case S3_BUCKET_NOT_FOUND:
return Status.S3_BUCKET_NOT_FOUND;
case INITIATE_MULTIPART_UPLOAD_FAILED:
return Status.INITIATE_MULTIPART_UPLOAD_ERROR;
case NO_SUCH_MULTIPART_UPLOAD:
return Status.NO_SUCH_MULTIPART_UPLOAD_ERROR;
case UPLOAD_PART_FAILED:
return Status.MULTIPART_UPLOAD_PARTFILE_ERROR;
default:
return Status.INTERNAL_ERROR;
}
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Unknown error occurs", ex);
}
return Status.INTERNAL_ERROR;
}
}
/**
* Validates that the incoming OM request has required parameters.
* TODO: Add more validation checks before writing the request to Ratis log.
* @param omRequest client request to OM
* @throws OMException thrown if required parameters are set to null.
*/
public void validateRequest(OMRequest omRequest) throws OMException {
Type cmdType = omRequest.getCmdType();
if (cmdType == null) {
throw new OMException("CmdType is null",
OMException.ResultCodes.INVALID_REQUEST);
}
if (omRequest.getClientId() == null) {
throw new OMException("ClientId is null",
OMException.ResultCodes.INVALID_REQUEST);
}
}
private CreateVolumeResponse createVolume(CreateVolumeRequest request) {
CreateVolumeResponse.Builder resp = CreateVolumeResponse.newBuilder();
resp.setStatus(Status.OK);
try {
impl.createVolume(OmVolumeArgs.getFromProtobuf(request.getVolumeInfo()));
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private SetVolumePropertyResponse setVolumeProperty(
SetVolumePropertyRequest request) {
SetVolumePropertyResponse.Builder resp =
SetVolumePropertyResponse.newBuilder();
resp.setStatus(Status.OK);
String volume = request.getVolumeName();
try {
if (request.hasQuotaInBytes()) {
long quota = request.getQuotaInBytes();
impl.setQuota(volume, quota);
} else {
String owner = request.getOwnerName();
impl.setOwner(volume, owner);
}
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private CheckVolumeAccessResponse checkVolumeAccess(
CheckVolumeAccessRequest request) {
CheckVolumeAccessResponse.Builder resp =
CheckVolumeAccessResponse.newBuilder();
resp.setStatus(Status.OK);
try {
boolean access = impl.checkVolumeAccess(request.getVolumeName(),
request.getUserAcl());
// if no access, set the response status as access denied
if (!access) {
resp.setStatus(Status.ACCESS_DENIED);
}
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private InfoVolumeResponse infoVolume(InfoVolumeRequest request) {
InfoVolumeResponse.Builder resp = InfoVolumeResponse.newBuilder();
resp.setStatus(Status.OK);
String volume = request.getVolumeName();
try {
OmVolumeArgs ret = impl.getVolumeInfo(volume);
resp.setVolumeInfo(ret.getProtobuf());
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private DeleteVolumeResponse deleteVolume(DeleteVolumeRequest request) {
DeleteVolumeResponse.Builder resp = DeleteVolumeResponse.newBuilder();
resp.setStatus(Status.OK);
try {
impl.deleteVolume(request.getVolumeName());
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private ListVolumeResponse listVolumes(ListVolumeRequest request) {
ListVolumeResponse.Builder resp = ListVolumeResponse.newBuilder();
List<OmVolumeArgs> result = Lists.newArrayList();
try {
if (request.getScope()
== ListVolumeRequest.Scope.VOLUMES_BY_USER) {
result = impl.listVolumeByUser(request.getUserName(),
request.getPrefix(), request.getPrevKey(), request.getMaxKeys());
} else if (request.getScope()
== ListVolumeRequest.Scope.VOLUMES_BY_CLUSTER) {
result = impl.listAllVolumes(request.getPrefix(), request.getPrevKey(),
request.getMaxKeys());
}
result.forEach(item -> resp.addVolumeInfo(item.getProtobuf()));
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private CreateBucketResponse createBucket(CreateBucketRequest request) {
CreateBucketResponse.Builder resp =
CreateBucketResponse.newBuilder();
try {
impl.createBucket(OmBucketInfo.getFromProtobuf(
request.getBucketInfo()));
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private InfoBucketResponse infoBucket(InfoBucketRequest request) {
InfoBucketResponse.Builder resp =
InfoBucketResponse.newBuilder();
try {
OmBucketInfo omBucketInfo = impl.getBucketInfo(
request.getVolumeName(), request.getBucketName());
resp.setStatus(Status.OK);
resp.setBucketInfo(omBucketInfo.getProtobuf());
} catch(IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private CreateKeyResponse createKey(CreateKeyRequest request) {
CreateKeyResponse.Builder resp =
CreateKeyResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
HddsProtos.ReplicationType type =
keyArgs.hasType()? keyArgs.getType() : null;
HddsProtos.ReplicationFactor factor =
keyArgs.hasFactor()? keyArgs.getFactor() : null;
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setDataSize(keyArgs.getDataSize())
.setType(type)
.setFactor(factor)
.setIsMultipartKey(keyArgs.getIsMultipartKey())
.setMultipartUploadID(keyArgs.getMultipartUploadID())
.setMultipartUploadPartNumber(keyArgs.getMultipartNumber())
.build();
if (keyArgs.hasDataSize()) {
omKeyArgs.setDataSize(keyArgs.getDataSize());
} else {
omKeyArgs.setDataSize(0);
}
OpenKeySession openKey = impl.openKey(omKeyArgs);
resp.setKeyInfo(openKey.getKeyInfo().getProtobuf());
resp.setID(openKey.getId());
resp.setOpenVersion(openKey.getOpenVersion());
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private LookupKeyResponse lookupKey(LookupKeyRequest request) {
LookupKeyResponse.Builder resp =
LookupKeyResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.build();
OmKeyInfo keyInfo = impl.lookupKey(omKeyArgs);
resp.setKeyInfo(keyInfo.getProtobuf());
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private RenameKeyResponse renameKey(RenameKeyRequest request) {
RenameKeyResponse.Builder resp = RenameKeyResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.build();
impl.renameKey(omKeyArgs, request.getToKeyName());
resp.setStatus(Status.OK);
} catch (IOException e){
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private SetBucketPropertyResponse setBucketProperty(
SetBucketPropertyRequest request) {
SetBucketPropertyResponse.Builder resp =
SetBucketPropertyResponse.newBuilder();
try {
impl.setBucketProperty(OmBucketArgs.getFromProtobuf(
request.getBucketArgs()));
resp.setStatus(Status.OK);
} catch(IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private DeleteKeyResponse deleteKey(DeleteKeyRequest request) {
DeleteKeyResponse.Builder resp =
DeleteKeyResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.build();
impl.deleteKey(omKeyArgs);
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private DeleteBucketResponse deleteBucket(DeleteBucketRequest request) {
DeleteBucketResponse.Builder resp = DeleteBucketResponse.newBuilder();
resp.setStatus(Status.OK);
try {
impl.deleteBucket(request.getVolumeName(), request.getBucketName());
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private ListBucketsResponse listBuckets(ListBucketsRequest request) {
ListBucketsResponse.Builder resp =
ListBucketsResponse.newBuilder();
try {
List<OmBucketInfo> buckets = impl.listBuckets(
request.getVolumeName(),
request.getStartKey(),
request.getPrefix(),
request.getCount());
for(OmBucketInfo bucket : buckets) {
resp.addBucketInfo(bucket.getProtobuf());
}
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private ListKeysResponse listKeys(ListKeysRequest request) {
ListKeysResponse.Builder resp =
ListKeysResponse.newBuilder();
try {
List<OmKeyInfo> keys = impl.listKeys(
request.getVolumeName(),
request.getBucketName(),
request.getStartKey(),
request.getPrefix(),
request.getCount());
for(OmKeyInfo key : keys) {
resp.addKeyInfo(key.getProtobuf());
}
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private CommitKeyResponse commitKey(CommitKeyRequest request) {
CommitKeyResponse.Builder resp =
CommitKeyResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
HddsProtos.ReplicationType type =
keyArgs.hasType()? keyArgs.getType() : null;
HddsProtos.ReplicationFactor factor =
keyArgs.hasFactor()? keyArgs.getFactor() : null;
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setLocationInfoList(keyArgs.getKeyLocationsList().stream()
.map(OmKeyLocationInfo::getFromProtobuf)
.collect(Collectors.toList()))
.setType(type)
.setFactor(factor)
.setDataSize(keyArgs.getDataSize())
.build();
impl.commitKey(omKeyArgs, request.getClientID());
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private AllocateBlockResponse allocateBlock(AllocateBlockRequest request) {
AllocateBlockResponse.Builder resp =
AllocateBlockResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.build();
OmKeyLocationInfo newLocation = impl.allocateBlock(omKeyArgs,
request.getClientID());
resp.setKeyLocation(newLocation.getProtobuf());
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private ServiceListResponse getServiceList(ServiceListRequest request) {
ServiceListResponse.Builder resp = ServiceListResponse.newBuilder();
try {
resp.addAllServiceInfo(impl.getServiceList().stream()
.map(ServiceInfo::getProtobuf)
.collect(Collectors.toList()));
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private S3CreateBucketResponse createS3Bucket(S3CreateBucketRequest request) {
S3CreateBucketResponse.Builder resp = S3CreateBucketResponse.newBuilder();
try {
impl.createS3Bucket(request.getUserName(), request.getS3Bucketname());
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private S3DeleteBucketResponse deleteS3Bucket(S3DeleteBucketRequest request) {
S3DeleteBucketResponse.Builder resp = S3DeleteBucketResponse.newBuilder();
try {
impl.deleteS3Bucket(request.getS3BucketName());
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private S3BucketInfoResponse getS3Bucketinfo(S3BucketInfoRequest request) {
S3BucketInfoResponse.Builder resp = S3BucketInfoResponse.newBuilder();
try {
resp.setOzoneMapping(
impl.getOzoneBucketMapping(request.getS3BucketName()));
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private S3ListBucketsResponse listS3Buckets(S3ListBucketsRequest request) {
S3ListBucketsResponse.Builder resp = S3ListBucketsResponse.newBuilder();
try {
List<OmBucketInfo> buckets = impl.listS3Buckets(
request.getUserName(),
request.getStartKey(),
request.getPrefix(),
request.getCount());
for(OmBucketInfo bucket : buckets) {
resp.addBucketInfo(bucket.getProtobuf());
}
resp.setStatus(Status.OK);
} catch (IOException e) {
resp.setStatus(exceptionToResponseStatus(e));
}
return resp.build();
}
private MultipartInfoInitiateResponse initiateMultiPartUpload(
MultipartInfoInitiateRequest request) {
MultipartInfoInitiateResponse.Builder resp = MultipartInfoInitiateResponse
.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setType(keyArgs.getType())
.setFactor(keyArgs.getFactor())
.build();
OmMultipartInfo multipartInfo = impl.initiateMultipartUpload(omKeyArgs);
resp.setVolumeName(multipartInfo.getVolumeName());
resp.setBucketName(multipartInfo.getBucketName());
resp.setKeyName(multipartInfo.getKeyName());
resp.setMultipartUploadID(multipartInfo.getUploadID());
resp.setStatus(Status.OK);
} catch (IOException ex) {
resp.setStatus(exceptionToResponseStatus(ex));
}
return resp.build();
}
private MultipartCommitUploadPartResponse commitMultipartUploadPart(
MultipartCommitUploadPartRequest request) {
MultipartCommitUploadPartResponse.Builder resp =
MultipartCommitUploadPartResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setMultipartUploadID(keyArgs.getMultipartUploadID())
.setIsMultipartKey(keyArgs.getIsMultipartKey())
.setMultipartUploadPartNumber(keyArgs.getMultipartNumber())
.build();
OmMultipartCommitUploadPartInfo commitUploadPartInfo =
impl.commitMultipartUploadPart(omKeyArgs, request.getClientID());
resp.setPartName(commitUploadPartInfo.getPartName());
resp.setStatus(Status.OK);
} catch (IOException ex) {
resp.setStatus(exceptionToResponseStatus(ex));
}
return resp.build();
}
private MultipartUploadCompleteResponse completeMultipartUpload(
MultipartUploadCompleteRequest request) {
MultipartUploadCompleteResponse.Builder response =
MultipartUploadCompleteResponse.newBuilder();
try {
KeyArgs keyArgs = request.getKeyArgs();
List<Part> partsList = request.getPartsListList();
TreeMap<Integer, String> partsMap = new TreeMap<>();
for (Part part : partsList) {
partsMap.put(part.getPartNumber(), part.getPartName());
}
OmMultipartUploadList omMultipartUploadList =
new OmMultipartUploadList(partsMap);
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setMultipartUploadID(keyArgs.getMultipartUploadID())
.build();
OmMultipartUploadCompleteInfo omMultipartUploadCompleteInfo = impl
.completeMultipartUpload(omKeyArgs, omMultipartUploadList);
response.setVolume(omMultipartUploadCompleteInfo.getVolume())
.setBucket(omMultipartUploadCompleteInfo.getBucket())
.setKey(omMultipartUploadCompleteInfo.getKey())
.setHash(omMultipartUploadCompleteInfo.getHash());
response.setStatus(Status.OK);
} catch (IOException ex) {
response.setStatus(exceptionToResponseStatus(ex));
}
return response.build();
}
private MultipartUploadAbortResponse abortMultipartUpload(
MultipartUploadAbortRequest multipartUploadAbortRequest) {
MultipartUploadAbortResponse.Builder response =
MultipartUploadAbortResponse.newBuilder();
try {
KeyArgs keyArgs = multipartUploadAbortRequest.getKeyArgs();
OmKeyArgs omKeyArgs = new OmKeyArgs.Builder()
.setVolumeName(keyArgs.getVolumeName())
.setBucketName(keyArgs.getBucketName())
.setKeyName(keyArgs.getKeyName())
.setMultipartUploadID(keyArgs.getMultipartUploadID())
.build();
impl.abortMultipartUpload(omKeyArgs);
response.setStatus(Status.OK);
} catch (IOException ex) {
response.setStatus(exceptionToResponseStatus(ex));
}
return response.build();
}
}

View File

@ -64,7 +64,7 @@ public class TestOzoneManagerRatisServer {
conf.setTimeDuration( conf.setTimeDuration(
OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY, OMConfigKeys.OZONE_OM_LEADER_ELECTION_MINIMUM_TIMEOUT_DURATION_KEY,
LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS); LEADER_ELECTION_TIMEOUT, TimeUnit.MILLISECONDS);
omRatisServer = OzoneManagerRatisServer.newOMRatisServer(omID, omRatisServer = OzoneManagerRatisServer.newOMRatisServer(null, omID,
InetAddress.getLocalHost(), conf); InetAddress.getLocalHost(), conf);
omRatisServer.start(); omRatisServer.start();
omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(omID, omRatisClient = OzoneManagerRatisClient.newOzoneManagerRatisClient(omID,
@ -101,7 +101,6 @@ public class TestOzoneManagerRatisServer {
public void testSubmitRatisRequest() throws Exception { public void testSubmitRatisRequest() throws Exception {
// Wait for leader election // Wait for leader election
Thread.sleep(LEADER_ELECTION_TIMEOUT * 2); Thread.sleep(LEADER_ELECTION_TIMEOUT * 2);
OMRequest request = OMRequest.newBuilder() OMRequest request = OMRequest.newBuilder()
.setCmdType(OzoneManagerProtocolProtos.Type.CreateVolume) .setCmdType(OzoneManagerProtocolProtos.Type.CreateVolume)
.setClientId(clientId) .setClientId(clientId)
@ -109,12 +108,9 @@ public class TestOzoneManagerRatisServer {
OMResponse response = omRatisClient.sendCommand(request); OMResponse response = omRatisClient.sendCommand(request);
// Since the state machine is not implemented yet, we should get the Assert.assertEquals(OzoneManagerProtocolProtos.Type.CreateVolume,
// configured dummy message from Ratis. response.getCmdType());
Assert.assertEquals(false, response.getSuccess()); Assert.assertEquals(false, response.getSuccess());
Assert.assertTrue(response.getMessage().contains("Dummy response from " +
"Ratis server for command type: " +
OzoneManagerProtocolProtos.Type.CreateVolume));
Assert.assertEquals(false, response.hasCreateVolumeResponse()); Assert.assertEquals(false, response.hasCreateVolumeResponse());
} }