HDDS-2072. Make StorageContainerLocationProtocolService message based

Contributed by Elek, Marton.
Anu Engineer 2019-10-02 16:15:31 -07:00
parent 1303255aee
commit 4c24f2434d
8 changed files with 629 additions and 531 deletions
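The client-facing StorageContainerLocationProtocolService is collapsed into a single message-based submitRequest RPC: every call is wrapped in a ScmContainerLocationRequest/ScmContainerLocationResponse envelope tagged with a command Type, the server side dispatches through OzoneProtocolMessageDispatcher (adding per-message-type metrics and tracing), and a new insight point is registered for the protocol.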

View File

@@ -16,64 +16,57 @@
*/
package org.apache.hadoop.hdds.scm.protocolPB;
import com.google.common.base.Preconditions;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.GetScmInfoResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ActivatePipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ClosePipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ContainerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.DeactivatePipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitSafeModeRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ForceExitSafeModeResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationRequest;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationRequest.Builder;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationResponse;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.Type;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.ContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.ContainerResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.GetContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.GetContainerResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.NodeQueryRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.NodeQueryResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.PipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.PipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos.SCMListContainerResponseProto;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import com.google.common.base.Preconditions;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
/**
* This class is the client-side translator to translate the requests made on
@@ -101,14 +94,35 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
this.rpcProxy = rpcProxy;
}
/**
* Helper method to wrap the request and send the message.
*/
private ScmContainerLocationResponse submitRequest(
StorageContainerLocationProtocolProtos.Type type,
Consumer<Builder> builderConsumer) throws IOException {
final ScmContainerLocationResponse response;
try {
Builder builder = ScmContainerLocationRequest.newBuilder()
.setCmdType(type)
.setTraceID(TracingUtil.exportCurrentSpan());
builderConsumer.accept(builder);
ScmContainerLocationRequest wrapper = builder.build();
response = rpcProxy.submitRequest(NULL_RPC_CONTROLLER, wrapper);
} catch (ServiceException ex) {
throw ProtobufHelper.getRemoteException(ex);
}
return response;
}
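Every converted method below follows this shape: build the payload, pick the matching Type, set the one corresponding envelope field, and unwrap the matching response field. A minimal sketch using the in-safe-mode call (mirroring inSafeMode() further down):
InSafeModeRequestProto request = InSafeModeRequestProto.getDefaultInstance();
boolean inSafeMode = submitRequest(Type.InSafeMode,
    builder -> builder.setInSafeModeRequest(request))
    .getInSafeModeResponse().getInSafeMode();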
/**
* Asks SCM where a container should be allocated. SCM responds with the set
* of datanodes that should be used for creating this container. Ozone/SCM only
* supports replication factor of either 1 or 3.
* @param type - Replication Type
*
* @param type - Replication Type
* @param factor - Replication Count
* @return
* @throws IOException
*/
@Override
public ContainerWithPipeline allocateContainer(
@@ -122,12 +136,11 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
.setOwner(owner)
.build();
final ContainerResponseProto response;
try {
response = rpcProxy.allocateContainer(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
ContainerResponseProto response =
submitRequest(Type.AllocateContainer,
builder -> builder.setContainerRequest(request))
.getContainerResponse();
//TODO should be migrated to use the top level status structure.
if (response.getErrorCode() != ContainerResponseProto.Error.success) {
throw new IOException(response.hasErrorMessage() ?
response.getErrorMessage() : "Allocate container failed.");
@@ -144,13 +157,12 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
.setContainerID(containerID)
.setTraceID(TracingUtil.exportCurrentSpan())
.build();
try {
GetContainerResponseProto response =
rpcProxy.getContainer(NULL_RPC_CONTROLLER, request);
return ContainerInfo.fromProtobuf(response.getContainerInfo());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
ScmContainerLocationResponse response =
submitRequest(Type.GetContainer,
(builder) -> builder.setGetContainerRequest(request));
return ContainerInfo
.fromProtobuf(response.getGetContainerResponse().getContainerInfo());
}
/**
@@ -164,14 +176,15 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
GetContainerWithPipelineRequestProto.newBuilder()
.setTraceID(TracingUtil.exportCurrentSpan())
.setContainerID(containerID).build();
try {
GetContainerWithPipelineResponseProto response =
rpcProxy.getContainerWithPipeline(NULL_RPC_CONTROLLER, request);
return ContainerWithPipeline.fromProtobuf(
response.getContainerWithPipeline());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
ScmContainerLocationResponse response =
submitRequest(Type.GetContainerWithPipeline,
(builder) -> builder.setGetContainerWithPipelineRequest(request));
return ContainerWithPipeline.fromProtobuf(
response.getGetContainerWithPipelineResponse()
.getContainerWithPipeline());
}
/**
@@ -191,26 +204,22 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
builder.setTraceID(TracingUtil.exportCurrentSpan());
SCMListContainerRequestProto request = builder.build();
try {
SCMListContainerResponseProto response =
rpcProxy.listContainer(NULL_RPC_CONTROLLER, request);
List<ContainerInfo> containerList = new ArrayList<>();
for (HddsProtos.ContainerInfoProto containerInfoProto : response
.getContainersList()) {
containerList.add(ContainerInfo.fromProtobuf(containerInfoProto));
}
return containerList;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
SCMListContainerResponseProto response =
submitRequest(Type.ListContainer,
builder1 -> builder1.setScmListContainerRequest(request))
.getScmListContainerResponse();
List<ContainerInfo> containerList = new ArrayList<>();
for (HddsProtos.ContainerInfoProto containerInfoProto : response
.getContainersList()) {
containerList.add(ContainerInfo.fromProtobuf(containerInfoProto));
}
return containerList;
}
/**
* Ask SCM to delete a container by its ID. SCM will remove
* the container mapping in its database.
*
* @param containerID
* @throws IOException
*/
@Override
public void deleteContainer(long containerID)
@@ -222,18 +231,13 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
.setTraceID(TracingUtil.exportCurrentSpan())
.setContainerID(containerID)
.build();
try {
rpcProxy.deleteContainer(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
submitRequest(Type.DeleteContainer,
builder -> builder.setScmDeleteContainerRequest(request));
}
/**
* Queries a list of Node Statuses.
*
* @param nodeStatuses
* @return List of Datanodes.
*/
@Override
public List<HddsProtos.Node> queryNode(HddsProtos.NodeState
@@ -246,21 +250,18 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
.setState(nodeStatuses)
.setTraceID(TracingUtil.exportCurrentSpan())
.setScope(queryScope).setPoolName(poolName).build();
try {
NodeQueryResponseProto response =
rpcProxy.queryNode(NULL_RPC_CONTROLLER, request);
return response.getDatanodesList();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
NodeQueryResponseProto response = submitRequest(Type.QueryNode,
builder -> builder.setNodeQueryRequest(request)).getNodeQueryResponse();
return response.getDatanodesList();
}
/**
* Notification from the client that it creates an object on datanodes.
* @param type object type
* @param id object id
* @param op operation type (e.g., create, close, delete)
*
* @param type object type
* @param id object id
* @param op operation type (e.g., create, close, delete)
* @param stage object creation stage : begin/complete
*/
@Override
@@ -278,20 +279,17 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
.setOp(op)
.setStage(stage)
.build();
try {
rpcProxy.notifyObjectStageChange(NULL_RPC_CONTROLLER, request);
} catch(ServiceException e){
throw ProtobufHelper.getRemoteException(e);
}
submitRequest(Type.NotifyObjectStageChange,
builder -> builder.setObjectStageChangeRequest(request));
}
/**
* Creates a replication pipeline of a specified type.
*
* @param replicationType - replication type
* @param factor - factor 1 or 3
* @param nodePool - optional machine list to build a pipeline.
* @throws IOException
* @param factor - factor 1 or 3
* @param nodePool - optional machine list to build a pipeline.
*/
@Override
public Pipeline createReplicationPipeline(HddsProtos.ReplicationType
@@ -303,87 +301,82 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
.setReplicationFactor(factor)
.setReplicationType(replicationType)
.build();
try {
PipelineResponseProto response =
rpcProxy.allocatePipeline(NULL_RPC_CONTROLLER, request);
if (response.getErrorCode() ==
PipelineResponseProto.Error.success) {
Preconditions.checkState(response.hasPipeline(), "With success, " +
"must come a pipeline");
return Pipeline.getFromProtobuf(response.getPipeline());
} else {
String errorMessage = String.format("create replication pipeline " +
"failed. code : %s Message: %s", response.getErrorCode(),
response.hasErrorMessage() ? response.getErrorMessage() : "");
throw new IOException(errorMessage);
}
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
PipelineResponseProto response =
submitRequest(Type.AllocatePipeline,
builder -> builder.setPipelineRequest(request))
.getPipelineResponse();
if (response.getErrorCode() ==
PipelineResponseProto.Error.success) {
Preconditions.checkState(response.hasPipeline(), "With success, " +
"must come a pipeline");
return Pipeline.getFromProtobuf(response.getPipeline());
} else {
String errorMessage = String.format("create replication pipeline " +
"failed. code : %s Message: %s", response.getErrorCode(),
response.hasErrorMessage() ? response.getErrorMessage() : "");
throw new IOException(errorMessage);
}
}
@Override
public List<Pipeline> listPipelines() throws IOException {
try {
ListPipelineRequestProto request = ListPipelineRequestProto
.newBuilder().setTraceID(TracingUtil.exportCurrentSpan())
.build();
ListPipelineResponseProto response = rpcProxy.listPipelines(
NULL_RPC_CONTROLLER, request);
List<Pipeline> list = new ArrayList<>();
for (HddsProtos.Pipeline pipeline : response.getPipelinesList()) {
Pipeline fromProtobuf = Pipeline.getFromProtobuf(pipeline);
list.add(fromProtobuf);
}
return list;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
ListPipelineRequestProto request = ListPipelineRequestProto
.newBuilder().setTraceID(TracingUtil.exportCurrentSpan())
.build();
ListPipelineResponseProto response = submitRequest(Type.ListPipelines,
builder -> builder.setListPipelineRequest(request))
.getListPipelineResponse();
List<Pipeline> list = new ArrayList<>();
for (HddsProtos.Pipeline pipeline : response.getPipelinesList()) {
Pipeline fromProtobuf = Pipeline.getFromProtobuf(pipeline);
list.add(fromProtobuf);
}
return list;
}
@Override
public void activatePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
try {
ActivatePipelineRequestProto request =
ActivatePipelineRequestProto.newBuilder()
.setTraceID(TracingUtil.exportCurrentSpan())
.setPipelineID(pipelineID)
.build();
rpcProxy.activatePipeline(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
ActivatePipelineRequestProto request =
ActivatePipelineRequestProto.newBuilder()
.setTraceID(TracingUtil.exportCurrentSpan())
.setPipelineID(pipelineID)
.build();
submitRequest(Type.ActivatePipeline,
builder -> builder.setActivatePipelineRequest(request));
}
@Override
public void deactivatePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
try {
DeactivatePipelineRequestProto request =
DeactivatePipelineRequestProto.newBuilder()
.setTraceID(TracingUtil.exportCurrentSpan())
.setPipelineID(pipelineID)
.build();
rpcProxy.deactivatePipeline(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
DeactivatePipelineRequestProto request =
DeactivatePipelineRequestProto.newBuilder()
.setTraceID(TracingUtil.exportCurrentSpan())
.setPipelineID(pipelineID)
.build();
submitRequest(Type.DeactivatePipeline,
builder -> builder.setDeactivatePipelineRequest(request));
}
@Override
public void closePipeline(HddsProtos.PipelineID pipelineID)
throws IOException {
try {
ClosePipelineRequestProto request =
ClosePipelineRequestProto.newBuilder()
.setTraceID(TracingUtil.exportCurrentSpan())
.setPipelineID(pipelineID)
.build();
rpcProxy.closePipeline(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
ClosePipelineRequestProto request =
ClosePipelineRequestProto.newBuilder()
.setTraceID(TracingUtil.exportCurrentSpan())
.setPipelineID(pipelineID)
.build();
submitRequest(Type.ClosePipeline,
builder -> builder.setClosePipelineRequest(request));
}
@Override
@@ -392,16 +385,14 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
HddsProtos.GetScmInfoRequestProto.newBuilder()
.setTraceID(TracingUtil.exportCurrentSpan())
.build();
try {
HddsProtos.GetScmInfoResponseProto resp = rpcProxy.getScmInfo(
NULL_RPC_CONTROLLER, request);
ScmInfo.Builder builder = new ScmInfo.Builder()
.setClusterId(resp.getClusterId())
.setScmId(resp.getScmId());
return builder.build();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
GetScmInfoResponseProto resp = submitRequest(Type.GetScmInfo,
builder -> builder.setGetScmInfoRequest(request))
.getGetScmInfoResponse();
ScmInfo.Builder builder = new ScmInfo.Builder()
.setClusterId(resp.getClusterId())
.setScmId(resp.getScmId());
return builder.build();
}
@@ -409,73 +400,67 @@ public final class StorageContainerLocationProtocolClientSideTranslatorPB
* Check if SCM is in safe mode.
*
* @return Returns true if SCM is in safe mode else returns false.
* @throws IOException
*/
@Override
public boolean inSafeMode() throws IOException {
InSafeModeRequestProto request =
InSafeModeRequestProto.getDefaultInstance();
try {
InSafeModeResponseProto resp = rpcProxy.inSafeMode(
NULL_RPC_CONTROLLER, request);
return resp.getInSafeMode();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
return submitRequest(Type.InSafeMode,
builder -> builder.setInSafeModeRequest(request))
.getInSafeModeResponse().getInSafeMode();
}
/**
* Force SCM out of Safe mode.
*
* @return returns true if operation is successful.
* @throws IOException
*/
@Override
public boolean forceExitSafeMode() throws IOException {
ForceExitSafeModeRequestProto request =
ForceExitSafeModeRequestProto.getDefaultInstance();
try {
ForceExitSafeModeResponseProto resp = rpcProxy
.forceExitSafeMode(NULL_RPC_CONTROLLER, request);
return resp.getExitedSafeMode();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
ForceExitSafeModeResponseProto resp =
submitRequest(Type.ForceExitSafeMode,
builder -> builder.setForceExitSafeModeRequest(request))
.getForceExitSafeModeResponse();
return resp.getExitedSafeMode();
}
@Override
public void startReplicationManager() throws IOException {
try {
StartReplicationManagerRequestProto request =
StartReplicationManagerRequestProto.getDefaultInstance();
rpcProxy.startReplicationManager(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
StartReplicationManagerRequestProto request =
StartReplicationManagerRequestProto.getDefaultInstance();
submitRequest(Type.StartReplicationManager,
builder -> builder.setStartReplicationManagerRequest(request));
}
@Override
public void stopReplicationManager() throws IOException {
try {
StopReplicationManagerRequestProto request =
StopReplicationManagerRequestProto.getDefaultInstance();
rpcProxy.stopReplicationManager(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
StopReplicationManagerRequestProto request =
StopReplicationManagerRequestProto.getDefaultInstance();
submitRequest(Type.StopReplicationManager,
builder -> builder.setStopReplicationManagerRequest(request));
}
@Override
public boolean getReplicationManagerStatus() throws IOException {
try {
ReplicationManagerStatusRequestProto request =
ReplicationManagerStatusRequestProto.getDefaultInstance();
ReplicationManagerStatusResponseProto response =
rpcProxy.getReplicationManagerStatus(NULL_RPC_CONTROLLER, request);
return response.getIsRunning();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
ReplicationManagerStatusRequestProto request =
ReplicationManagerStatusRequestProto.getDefaultInstance();
ReplicationManagerStatusResponseProto response =
submitRequest(Type.GetReplicationManagerStatus,
builder -> builder.setReplicationManagerStatusRequest(request))
.getReplicationManagerStatusResponse();
return response.getIsRunning();
}
@Override

View File

@@ -26,7 +26,7 @@ option java_package = "org.apache.hadoop.hdds.protocol.proto";
option java_outer_classname = "ScmBlockLocationProtocolProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.hdds;
package hadoop.hdds.block;
import "hdds.proto";

View File

@@ -26,10 +26,100 @@ option java_package = "org.apache.hadoop.hdds.protocol.proto";
option java_outer_classname = "StorageContainerLocationProtocolProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
package hadoop.hdds;
package hadoop.hdds.container;
import "hdds.proto";
/**
All functions are dispatched as Request/Response messages in Ozone.
If you add new functions, please follow the same pattern.
*/
message ScmContainerLocationRequest {
required Type cmdType = 1; // Type of the command
// A string that identifies this command. We generate the trace ID in the
// Ozone frontend, which allows us to trace that command all over Ozone.
optional string traceID = 2;
optional ContainerRequestProto containerRequest = 6;
optional GetContainerRequestProto getContainerRequest = 7;
optional GetContainerWithPipelineRequestProto getContainerWithPipelineRequest = 8;
optional SCMListContainerRequestProto scmListContainerRequest = 9;
optional SCMDeleteContainerRequestProto scmDeleteContainerRequest = 10;
optional NodeQueryRequestProto nodeQueryRequest = 11;
optional ObjectStageChangeRequestProto objectStageChangeRequest = 12;
optional PipelineRequestProto pipelineRequest = 13;
optional ListPipelineRequestProto listPipelineRequest = 14;
optional ActivatePipelineRequestProto activatePipelineRequest = 15;
optional DeactivatePipelineRequestProto deactivatePipelineRequest = 16;
optional ClosePipelineRequestProto closePipelineRequest = 17;
optional GetScmInfoRequestProto getScmInfoRequest = 18;
optional InSafeModeRequestProto inSafeModeRequest = 19;
optional ForceExitSafeModeRequestProto forceExitSafeModeRequest = 20;
optional StartReplicationManagerRequestProto startReplicationManagerRequest = 21;
optional StopReplicationManagerRequestProto stopReplicationManagerRequest = 22;
optional ReplicationManagerStatusRequestProto replicationManagerStatusRequest = 23;
}
message ScmContainerLocationResponse {
required Type cmdType = 1; // Type of the command
optional string traceID = 2;
optional bool success = 3 [default = true];
optional string message = 4;
required Status status = 5;
optional ContainerResponseProto containerResponse = 6;
optional GetContainerResponseProto getContainerResponse = 7;
optional GetContainerWithPipelineResponseProto getContainerWithPipelineResponse = 8;
optional SCMListContainerResponseProto scmListContainerResponse = 9;
optional SCMDeleteContainerResponseProto scmDeleteContainerResponse = 10;
optional NodeQueryResponseProto nodeQueryResponse = 11;
optional ObjectStageChangeResponseProto objectStageChangeResponse = 12;
optional PipelineResponseProto pipelineResponse = 13;
optional ListPipelineResponseProto listPipelineResponse = 14;
optional ActivatePipelineResponseProto activatePipelineResponse = 15;
optional DeactivatePipelineResponseProto deactivatePipelineResponse = 16;
optional ClosePipelineResponseProto closePipelineResponse = 17;
optional GetScmInfoResponseProto getScmInfoResponse = 18;
optional InSafeModeResponseProto inSafeModeResponse = 19;
optional ForceExitSafeModeResponseProto forceExitSafeModeResponse = 20;
optional StartReplicationManagerResponseProto startReplicationManagerResponse = 21;
optional StopReplicationManagerResponseProto stopReplicationManagerResponse = 22;
optional ReplicationManagerStatusResponseProto replicationManagerStatusResponse = 23;
enum Status {
OK = 1;
CONTAINER_ALREADY_EXISTS = 2;
CONTAINER_IS_MISSING = 3;
}
}
enum Type {
AllocateContainer = 1;
GetContainer = 2;
GetContainerWithPipeline = 3;
ListContainer = 4;
DeleteContainer = 5;
QueryNode = 6;
NotifyObjectStageChange = 7;
AllocatePipeline = 8;
ListPipelines = 9;
ActivatePipeline = 10;
DeactivatePipeline = 11;
ClosePipeline = 12;
GetScmInfo = 13;
InSafeMode = 14;
ForceExitSafeMode = 15;
StartReplicationManager = 16;
StopReplicationManager = 17;
GetReplicationManagerStatus = 18;
}
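For illustration, building one of these envelopes by hand in Java looks like the following; this is what submitRequest in the client-side translator assembles, and the payload field that is set must match cmdType (shown for GetScmInfo):
ScmContainerLocationRequest wrapper = ScmContainerLocationRequest.newBuilder()
    .setCmdType(Type.GetScmInfo)
    .setTraceID("") // normally exported from the active tracing span
    .setGetScmInfoRequest(
        HddsProtos.GetScmInfoRequestProto.getDefaultInstance())
    .build();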
/**
* Request sent to SCM asking where the container should be created.
*/
@@ -235,97 +325,6 @@
* and response messages for details of the RPC calls.
*/
service StorageContainerLocationProtocolService {
rpc submitRequest (ScmContainerLocationRequest) returns (ScmContainerLocationResponse);
/**
* Creates a container entry in SCM.
*/
rpc allocateContainer(ContainerRequestProto) returns (ContainerResponseProto);
/**
* Returns the pipeline for a given container.
*/
rpc getContainer(GetContainerRequestProto) returns (GetContainerResponseProto);
/**
* Returns the pipeline for a given container.
*/
rpc getContainerWithPipeline(GetContainerWithPipelineRequestProto) returns (GetContainerWithPipelineResponseProto);
rpc listContainer(SCMListContainerRequestProto) returns (SCMListContainerResponseProto);
/**
* Deletes a container in SCM.
*/
rpc deleteContainer(SCMDeleteContainerRequestProto) returns (SCMDeleteContainerResponseProto);
/**
* Returns a set of Nodes that meet a criteria.
*/
rpc queryNode(NodeQueryRequestProto) returns (NodeQueryResponseProto);
/**
* Notification from the client when it begins or finishes container or pipeline operations on datanodes.
*/
rpc notifyObjectStageChange(ObjectStageChangeRequestProto) returns (ObjectStageChangeResponseProto);
/*
* Apis that Manage Pipelines.
*
* Pipelines are abstractions offered by SCM and Datanode that allows users
* to create a replication pipeline.
*
* These following APIs allow command line programs like SCM CLI to list
* and manage pipelines.
*/
/**
* Creates a replication pipeline.
*/
rpc allocatePipeline(PipelineRequestProto)
returns (PipelineResponseProto);
/**
* Returns the list of Pipelines managed by SCM.
*/
rpc listPipelines(ListPipelineRequestProto)
returns (ListPipelineResponseProto);
rpc activatePipeline(ActivatePipelineRequestProto)
returns (ActivatePipelineResponseProto);
rpc deactivatePipeline(DeactivatePipelineRequestProto)
returns (DeactivatePipelineResponseProto);
/**
* Closes a pipeline.
*/
rpc closePipeline(ClosePipelineRequestProto)
returns (ClosePipelineResponseProto);
/**
* Returns information about SCM.
*/
rpc getScmInfo(GetScmInfoRequestProto)
returns (GetScmInfoResponseProto);
/**
* Checks if SCM is in SafeMode.
*/
rpc inSafeMode(InSafeModeRequestProto)
returns (InSafeModeResponseProto);
/**
* Returns information about SCM.
*/
rpc forceExitSafeMode(ForceExitSafeModeRequestProto)
returns (ForceExitSafeModeResponseProto);
rpc startReplicationManager(StartReplicationManagerRequestProto)
returns (StartReplicationManagerResponseProto);
rpc stopReplicationManager(StopReplicationManagerRequestProto)
returns (StopReplicationManagerResponseProto);
rpc getReplicationManagerStatus(ReplicationManagerStatusRequestProto)
returns (ReplicationManagerStatusResponseProto);
}

View File

@@ -1,4 +1,3 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
@@ -7,9 +6,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
*
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -42,16 +41,18 @@ import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolPro
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ListPipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.NodeQueryResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ObjectStageChangeResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.PipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMDeleteContainerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.SCMListContainerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationRequest;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationResponse;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ScmContainerLocationResponse.Status;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto;
@@ -61,11 +62,13 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.hdds.server.OzoneProtocolMessageDispatcher;
import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import io.opentracing.Scope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class is the server-side translator that forwards requests received on
@@ -76,288 +79,315 @@ import io.opentracing.Scope;
public final class StorageContainerLocationProtocolServerSideTranslatorPB
implements StorageContainerLocationProtocolPB {
private static final Logger LOG =
LoggerFactory.getLogger(
StorageContainerLocationProtocolServerSideTranslatorPB.class);
private final StorageContainerLocationProtocol impl;
private OzoneProtocolMessageDispatcher<ScmContainerLocationRequest,
ScmContainerLocationResponse>
dispatcher;
/**
* Creates a new StorageContainerLocationProtocolServerSideTranslatorPB.
*
* @param impl {@link StorageContainerLocationProtocol} server implementation
* @param impl {@link StorageContainerLocationProtocol} server
* implementation
* @param protocolMetrics
*/
public StorageContainerLocationProtocolServerSideTranslatorPB(
StorageContainerLocationProtocol impl) throws IOException {
StorageContainerLocationProtocol impl,
ProtocolMessageMetrics protocolMetrics) throws IOException {
this.impl = impl;
this.dispatcher =
new OzoneProtocolMessageDispatcher<>("ScmContainerLocation",
protocolMetrics, LOG);
}
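The dispatcher wraps each incoming message with the shared logging, tracing, and per-message-type metrics handling provided by OzoneProtocolMessageDispatcher, which is why the per-method TracingUtil scopes and try/catch blocks disappear from the handlers below.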
@Override
public ContainerResponseProto allocateContainer(RpcController unused,
ContainerRequestProto request) throws ServiceException {
try (Scope scope = TracingUtil
.importAndCreateScope("allocateContainer", request.getTraceID())) {
ContainerWithPipeline containerWithPipeline = impl
.allocateContainer(request.getReplicationType(),
request.getReplicationFactor(), request.getOwner());
return ContainerResponseProto.newBuilder()
.setContainerWithPipeline(containerWithPipeline.getProtobuf())
.setErrorCode(ContainerResponseProto.Error.success)
.build();
public ScmContainerLocationResponse submitRequest(RpcController controller,
ScmContainerLocationRequest request) throws ServiceException {
return dispatcher
.processRequest(request, this::processRequest, request.getCmdType(),
request.getTraceID());
}
public ScmContainerLocationResponse processRequest(
ScmContainerLocationRequest request) throws ServiceException {
try {
switch (request.getCmdType()) {
case AllocateContainer:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setContainerResponse(
allocateContainer(request.getContainerRequest()))
.build();
case GetContainer:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setGetContainerResponse(
getContainer(request.getGetContainerRequest()))
.build();
case GetContainerWithPipeline:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setGetContainerWithPipelineResponse(getContainerWithPipeline(
request.getGetContainerWithPipelineRequest()))
.build();
case ListContainer:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setScmListContainerResponse(listContainer(
request.getScmListContainerRequest()))
.build();
case DeleteContainer:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setScmDeleteContainerResponse(deleteContainer(
request.getScmDeleteContainerRequest()))
.build();
case QueryNode:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setNodeQueryResponse(queryNode(request.getNodeQueryRequest()))
.build();
case NotifyObjectStageChange:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setObjectStageChangeResponse(notifyObjectStageChange(
request.getObjectStageChangeRequest()))
.build();
case ListPipelines:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setListPipelineResponse(listPipelines(
request.getListPipelineRequest()))
.build();
case ActivatePipeline:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setActivatePipelineResponse(activatePipeline(
request.getActivatePipelineRequest()))
.build();
case DeactivatePipeline:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setDeactivatePipelineResponse(deactivatePipeline(
request.getDeactivatePipelineRequest()))
.build();
case ClosePipeline:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setClosePipelineResponse(closePipeline(
request.getClosePipelineRequest()))
.build();
case GetScmInfo:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setGetScmInfoResponse(getScmInfo(
request.getGetScmInfoRequest()))
.build();
case InSafeMode:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setInSafeModeResponse(inSafeMode(
request.getInSafeModeRequest()))
.build();
case ForceExitSafeMode:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setForceExitSafeModeResponse(forceExitSafeMode(
request.getForceExitSafeModeRequest()))
.build();
case StartReplicationManager:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setStartReplicationManagerResponse(startReplicationManager(
request.getStartReplicationManagerRequest()))
.build();
case StopReplicationManager:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setStopReplicationManagerResponse(stopReplicationManager(
request.getStopReplicationManagerRequest()))
.build();
case GetReplicationManagerStatus:
return ScmContainerLocationResponse.newBuilder()
.setCmdType(request.getCmdType())
.setStatus(Status.OK)
.setReplicationManagerStatusResponse(getReplicationManagerStatus(
request.getReplicationManagerStatusRequest()))
.build();
default:
throw new IllegalArgumentException(
"Unknown command type: " + request.getCmdType());
}
} catch (IOException e) {
throw new ServiceException(e);
}
}
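Each case above rebuilds the same response envelope by hand; a small helper along these lines (hypothetical, not part of this patch) would factor out the shared prefix:
// Hypothetical convenience method, not in the actual change: the common
// cmdType/status prefix that every successful response starts from.
private static ScmContainerLocationResponse.Builder okResponse(
    ScmContainerLocationRequest request) {
  return ScmContainerLocationResponse.newBuilder()
      .setCmdType(request.getCmdType())
      .setStatus(Status.OK);
}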
@Override
public ContainerResponseProto allocateContainer(ContainerRequestProto request)
throws IOException {
ContainerWithPipeline containerWithPipeline = impl
.allocateContainer(request.getReplicationType(),
request.getReplicationFactor(), request.getOwner());
return ContainerResponseProto.newBuilder()
.setContainerWithPipeline(containerWithPipeline.getProtobuf())
.setErrorCode(ContainerResponseProto.Error.success)
.build();
}
public GetContainerResponseProto getContainer(
RpcController controller, GetContainerRequestProto request)
throws ServiceException {
try (Scope scope = TracingUtil
.importAndCreateScope("getContainer", request.getTraceID())) {
ContainerInfo container = impl.getContainer(request.getContainerID());
return GetContainerResponseProto.newBuilder()
.setContainerInfo(container.getProtobuf())
.build();
} catch (IOException e) {
throw new ServiceException(e);
}
GetContainerRequestProto request) throws IOException {
ContainerInfo container = impl.getContainer(request.getContainerID());
return GetContainerResponseProto.newBuilder()
.setContainerInfo(container.getProtobuf())
.build();
}
@Override
public GetContainerWithPipelineResponseProto getContainerWithPipeline(
RpcController controller, GetContainerWithPipelineRequestProto request)
throws ServiceException {
try (Scope scope = TracingUtil
.importAndCreateScope("getContainerWithPipeline",
request.getTraceID())) {
ContainerWithPipeline container = impl
.getContainerWithPipeline(request.getContainerID());
return GetContainerWithPipelineResponseProto.newBuilder()
.setContainerWithPipeline(container.getProtobuf())
.build();
} catch (IOException e) {
throw new ServiceException(e);
}
GetContainerWithPipelineRequestProto request)
throws IOException {
ContainerWithPipeline container = impl
.getContainerWithPipeline(request.getContainerID());
return GetContainerWithPipelineResponseProto.newBuilder()
.setContainerWithPipeline(container.getProtobuf())
.build();
}
@Override
public SCMListContainerResponseProto listContainer(RpcController controller,
SCMListContainerRequestProto request) throws ServiceException {
try (Scope scope = TracingUtil
.importAndCreateScope("listContainer", request.getTraceID())) {
long startContainerID = 0;
int count = -1;
public SCMListContainerResponseProto listContainer(
SCMListContainerRequestProto request) throws IOException {
// Arguments check.
if (request.hasStartContainerID()) {
// Start container ID is given.
startContainerID = request.getStartContainerID();
}
count = request.getCount();
List<ContainerInfo> containerList =
impl.listContainer(startContainerID, count);
SCMListContainerResponseProto.Builder builder =
SCMListContainerResponseProto.newBuilder();
for (ContainerInfo container : containerList) {
builder.addContainers(container.getProtobuf());
}
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
long startContainerID = 0;
int count = -1;
// Arguments check.
if (request.hasStartContainerID()) {
// Start container ID is given.
startContainerID = request.getStartContainerID();
}
count = request.getCount();
List<ContainerInfo> containerList =
impl.listContainer(startContainerID, count);
SCMListContainerResponseProto.Builder builder =
SCMListContainerResponseProto.newBuilder();
for (ContainerInfo container : containerList) {
builder.addContainers(container.getProtobuf());
}
return builder.build();
}
@Override
public SCMDeleteContainerResponseProto deleteContainer(
RpcController controller, SCMDeleteContainerRequestProto request)
throws ServiceException {
try (Scope scope = TracingUtil
.importAndCreateScope("deleteContainer", request.getTraceID())) {
impl.deleteContainer(request.getContainerID());
return SCMDeleteContainerResponseProto.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
}
SCMDeleteContainerRequestProto request)
throws IOException {
impl.deleteContainer(request.getContainerID());
return SCMDeleteContainerResponseProto.newBuilder().build();
}
@Override
public StorageContainerLocationProtocolProtos.NodeQueryResponseProto
queryNode(RpcController controller,
public NodeQueryResponseProto queryNode(
StorageContainerLocationProtocolProtos.NodeQueryRequestProto request)
throws ServiceException {
try (Scope scope = TracingUtil
.importAndCreateScope("queryNode", request.getTraceID())) {
HddsProtos.NodeState nodeState = request.getState();
List<HddsProtos.Node> datanodes = impl.queryNode(nodeState,
request.getScope(), request.getPoolName());
return StorageContainerLocationProtocolProtos
.NodeQueryResponseProto.newBuilder()
.addAllDatanodes(datanodes)
.build();
} catch (Exception e) {
throw new ServiceException(e);
}
throws IOException {
HddsProtos.NodeState nodeState = request.getState();
List<HddsProtos.Node> datanodes = impl.queryNode(nodeState,
request.getScope(), request.getPoolName());
return NodeQueryResponseProto.newBuilder()
.addAllDatanodes(datanodes)
.build();
}
@Override
public ObjectStageChangeResponseProto notifyObjectStageChange(
RpcController controller, ObjectStageChangeRequestProto request)
throws ServiceException {
try (Scope scope = TracingUtil
.importAndCreateScope("notifyObjectStageChange",
request.getTraceID())) {
impl.notifyObjectStageChange(request.getType(), request.getId(),
request.getOp(), request.getStage());
return ObjectStageChangeResponseProto.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
}
ObjectStageChangeRequestProto request)
throws IOException {
impl.notifyObjectStageChange(request.getType(), request.getId(),
request.getOp(), request.getStage());
return ObjectStageChangeResponseProto.newBuilder().build();
}
@Override
public PipelineResponseProto allocatePipeline(
RpcController controller, PipelineRequestProto request)
throws ServiceException {
// TODO : Wiring this up requires one more patch.
return null;
}
@Override
public ListPipelineResponseProto listPipelines(
RpcController controller, ListPipelineRequestProto request)
throws ServiceException {
try (Scope scope = TracingUtil
.importAndCreateScope("listPipelines", request.getTraceID())) {
ListPipelineResponseProto.Builder builder = ListPipelineResponseProto
.newBuilder();
List<Pipeline> pipelines = impl.listPipelines();
for (Pipeline pipeline : pipelines) {
HddsProtos.Pipeline protobufMessage = pipeline.getProtobufMessage();
builder.addPipelines(protobufMessage);
}
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
ListPipelineRequestProto request)
throws IOException {
ListPipelineResponseProto.Builder builder = ListPipelineResponseProto
.newBuilder();
List<Pipeline> pipelines = impl.listPipelines();
for (Pipeline pipeline : pipelines) {
HddsProtos.Pipeline protobufMessage = pipeline.getProtobufMessage();
builder.addPipelines(protobufMessage);
}
return builder.build();
}
@Override
public ActivatePipelineResponseProto activatePipeline(
RpcController controller, ActivatePipelineRequestProto request)
throws ServiceException {
try (Scope ignored = TracingUtil
.importAndCreateScope("activatePipeline", request.getTraceID())) {
impl.activatePipeline(request.getPipelineID());
return ActivatePipelineResponseProto.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
}
ActivatePipelineRequestProto request)
throws IOException {
impl.activatePipeline(request.getPipelineID());
return ActivatePipelineResponseProto.newBuilder().build();
}
@Override
public DeactivatePipelineResponseProto deactivatePipeline(
RpcController controller, DeactivatePipelineRequestProto request)
throws ServiceException {
try (Scope ignored = TracingUtil
.importAndCreateScope("deactivatePipeline", request.getTraceID())) {
impl.deactivatePipeline(request.getPipelineID());
return DeactivatePipelineResponseProto.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
}
DeactivatePipelineRequestProto request)
throws IOException {
impl.deactivatePipeline(request.getPipelineID());
return DeactivatePipelineResponseProto.newBuilder().build();
}
@Override
public ClosePipelineResponseProto closePipeline(
RpcController controller, ClosePipelineRequestProto request)
throws ServiceException {
try (Scope scope = TracingUtil
.importAndCreateScope("closePipeline", request.getTraceID())) {
impl.closePipeline(request.getPipelineID());
return ClosePipelineResponseProto.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
}
throws IOException {
impl.closePipeline(request.getPipelineID());
return ClosePipelineResponseProto.newBuilder().build();
}
@Override
public HddsProtos.GetScmInfoResponseProto getScmInfo(
RpcController controller, HddsProtos.GetScmInfoRequestProto req)
throws ServiceException {
try (Scope scope = TracingUtil
.importAndCreateScope("getScmInfo", req.getTraceID())) {
ScmInfo scmInfo = impl.getScmInfo();
return HddsProtos.GetScmInfoResponseProto.newBuilder()
.setClusterId(scmInfo.getClusterId())
.setScmId(scmInfo.getScmId())
.build();
} catch (IOException ex) {
throw new ServiceException(ex);
}
HddsProtos.GetScmInfoRequestProto req)
throws IOException {
ScmInfo scmInfo = impl.getScmInfo();
return HddsProtos.GetScmInfoResponseProto.newBuilder()
.setClusterId(scmInfo.getClusterId())
.setScmId(scmInfo.getScmId())
.build();
}
@Override
public InSafeModeResponseProto inSafeMode(
RpcController controller,
InSafeModeRequestProto request) throws ServiceException {
try (Scope scope = TracingUtil
.importAndCreateScope("inSafeMode", request.getTraceID())) {
return InSafeModeResponseProto.newBuilder()
.setInSafeMode(impl.inSafeMode()).build();
} catch (IOException ex) {
throw new ServiceException(ex);
}
InSafeModeRequestProto request) throws IOException {
return InSafeModeResponseProto.newBuilder()
.setInSafeMode(impl.inSafeMode()).build();
}
@Override
public ForceExitSafeModeResponseProto forceExitSafeMode(
RpcController controller, ForceExitSafeModeRequestProto request)
throws ServiceException {
try (Scope scope = TracingUtil
.importAndCreateScope("forceExitSafeMode", request.getTraceID())) {
return ForceExitSafeModeResponseProto.newBuilder()
.setExitedSafeMode(impl.forceExitSafeMode()).build();
} catch (IOException ex) {
throw new ServiceException(ex);
}
ForceExitSafeModeRequestProto request)
throws IOException {
return ForceExitSafeModeResponseProto.newBuilder()
.setExitedSafeMode(impl.forceExitSafeMode()).build();
}
@Override
public StartReplicationManagerResponseProto startReplicationManager(
RpcController controller, StartReplicationManagerRequestProto request)
throws ServiceException {
try (Scope ignored = TracingUtil.importAndCreateScope(
"startReplicationManager", request.getTraceID())) {
impl.startReplicationManager();
return StartReplicationManagerResponseProto.newBuilder().build();
} catch (IOException ex) {
throw new ServiceException(ex);
}
StartReplicationManagerRequestProto request)
throws IOException {
impl.startReplicationManager();
return StartReplicationManagerResponseProto.newBuilder().build();
}
@Override
public StopReplicationManagerResponseProto stopReplicationManager(
RpcController controller, StopReplicationManagerRequestProto request)
throws ServiceException {
try (Scope ignored = TracingUtil.importAndCreateScope(
"stopReplicationManager", request.getTraceID())) {
impl.stopReplicationManager();
return StopReplicationManagerResponseProto.newBuilder().build();
} catch (IOException ex) {
throw new ServiceException(ex);
}
StopReplicationManagerRequestProto request)
throws IOException {
impl.stopReplicationManager();
return StopReplicationManagerResponseProto.newBuilder().build();
}
@Override
public ReplicationManagerStatusResponseProto getReplicationManagerStatus(
RpcController controller, ReplicationManagerStatusRequestProto request)
throws ServiceException {
try (Scope ignored = TracingUtil.importAndCreateScope(
"getReplicationManagerStatus", request.getTraceID())) {
return ReplicationManagerStatusResponseProto.newBuilder()
.setIsRunning(impl.getReplicationManagerStatus()).build();
} catch (IOException ex) {
throw new ServiceException(ex);
}
ReplicationManagerStatusRequestProto request)
throws IOException {
return ReplicationManagerStatusResponseProto.newBuilder()
.setIsRunning(impl.getReplicationManagerStatus()).build();
}
}

View File

@@ -61,6 +61,7 @@ import org.apache.hadoop.ozone.audit.AuditMessage;
import org.apache.hadoop.ozone.audit.Auditor;
import org.apache.hadoop.ozone.audit.SCMAction;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocolServerSideTranslatorPB;
import org.apache.hadoop.ozone.protocolPB.ProtocolMessageMetrics;
import org.apache.hadoop.security.UserGroupInformation;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -103,6 +104,7 @@ public class SCMClientProtocolServer implements
private final StorageContainerManager scm;
private final OzoneConfiguration conf;
private SafeModePrecheck safeModePrecheck;
private final ProtocolMessageMetrics protocolMetrics;
public SCMClientProtocolServer(OzoneConfiguration conf,
StorageContainerManager scm) throws IOException {
@@ -115,10 +117,16 @@ public class SCMClientProtocolServer implements
RPC.setProtocolEngine(conf, StorageContainerLocationProtocolPB.class,
ProtobufRpcEngine.class);
protocolMetrics = ProtocolMessageMetrics
.create("ScmContainerLocationProtocol",
"SCM ContainerLocation protocol metrics",
StorageContainerLocationProtocolProtos.Type.values());
// SCM Container Service RPC
BlockingService storageProtoPbService =
newReflectiveBlockingService(
new StorageContainerLocationProtocolServerSideTranslatorPB(this));
new StorageContainerLocationProtocolServerSideTranslatorPB(this,
protocolMetrics));
final InetSocketAddress scmAddress = HddsServerUtil
.getScmClientBindAddress(conf);
@@ -147,6 +155,7 @@ public class SCMClientProtocolServer implements
}
public void start() {
protocolMetrics.register();
LOG.info(
StorageContainerManager.buildRpcServerStartMessage(
"RPC server for Client ", getClientRpcAddress()));
@@ -154,6 +163,7 @@ public class SCMClientProtocolServer implements
}
public void stop() {
protocolMetrics.unregister();
try {
LOG.info("Stopping the RPC server for Client Protocol");
getClientRpcServer().stop();

View File

@@ -31,6 +31,7 @@ import org.apache.hadoop.ozone.insight.scm.EventQueueInsight;
import org.apache.hadoop.ozone.insight.scm.NodeManagerInsight;
import org.apache.hadoop.ozone.insight.scm.ReplicaManagerInsight;
import org.apache.hadoop.ozone.insight.scm.ScmProtocolBlockLocationInsight;
import org.apache.hadoop.ozone.insight.scm.ScmProtocolContainerLocationInsight;
import org.apache.hadoop.ozone.insight.scm.ScmProtocolSecurityInsight;
import org.apache.hadoop.ozone.om.OMConfigKeys;
@@ -89,6 +90,8 @@ public class BaseInsightSubCommand {
insights.put("scm.event-queue", new EventQueueInsight());
insights.put("scm.protocol.block-location",
new ScmProtocolBlockLocationInsight());
insights.put("scm.protocol.container-location",
new ScmProtocolContainerLocationInsight());
insights.put("scm.protocol.security",
new ScmProtocolSecurityInsight());
insights.put("om.key-manager", new KeyManagerInsight());

View File

@@ -23,12 +23,12 @@ import java.util.List;
import java.util.Map;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdds.scm.server.SCMBlockProtocolServer;
import org.apache.hadoop.ozone.insight.BaseInsightPoint;
import org.apache.hadoop.ozone.insight.Component.Type;
import org.apache.hadoop.ozone.insight.LoggerSource;
import org.apache.hadoop.ozone.insight.MetricGroupDisplay;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocolServerSideTranslatorPB;
/**
* Insight metric to check the SCM block location protocol behaviour.

View File

@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.insight.scm;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StorageContainerLocationProtocolService;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocolServerSideTranslatorPB;
import org.apache.hadoop.ozone.insight.BaseInsightPoint;
import org.apache.hadoop.ozone.insight.Component.Type;
import org.apache.hadoop.ozone.insight.LoggerSource;
import org.apache.hadoop.ozone.insight.MetricGroupDisplay;
/**
* Insight metric to check the SCM container location protocol behaviour.
*/
public class ScmProtocolContainerLocationInsight extends BaseInsightPoint {
@Override
public List<LoggerSource> getRelatedLoggers(boolean verbose) {
List<LoggerSource> loggers = new ArrayList<>();
loggers.add(
new LoggerSource(Type.SCM,
StorageContainerLocationProtocolServerSideTranslatorPB.class,
defaultLevel(verbose)));
loggers.add(
new LoggerSource(Type.SCM,
StorageContainerLocationProtocolService.class,
defaultLevel(verbose)));
return loggers;
}
@Override
public List<MetricGroupDisplay> getMetrics() {
List<MetricGroupDisplay> metrics = new ArrayList<>();
Map<String, String> filter = new HashMap<>();
filter.put("servername", "StorageContainerLocationProtocolService");
addRpcMetrics(metrics, Type.SCM, filter);
addProtocolMessageMetrics(metrics, "scm_container_location_protocol",
Type.SCM, StorageContainerLocationProtocolProtos.Type.values());
return metrics;
}
@Override
public String getDescription() {
return "SCM Container location protocol endpoint";
}
}