HDDS-1258. Fix error propagation for SCM protocol

Closes #1001
This commit is contained in:
S O'Donnell 2019-07-01 20:04:32 +02:00 committed by Márton Elek
parent b1dafc3506
commit f8d62a9c4c
No known key found for this signature in database
GPG Key ID: D51EA8F00EE79B28
6 changed files with 206 additions and 143 deletions

View File

@ -95,7 +95,7 @@ public class SCMException extends IOException {
* Error codes to make it easy to decode these exceptions.
*/
public enum ResultCodes {
SUCCEESS,
OK,
FAILED_TO_LOAD_NODEPOOL,
FAILED_TO_FIND_NODE_IN_POOL,
FAILED_TO_FIND_HEALTHY_NODES,
@ -120,6 +120,8 @@ public class SCMException extends IOException {
NO_SUCH_DATANODE,
NO_REPLICA_FOUND,
FAILED_TO_FIND_ACTIVE_PIPELINE,
FAILED_TO_INIT_CONTAINER_PLACEMENT_POLICY
FAILED_TO_INIT_CONTAINER_PLACEMENT_POLICY,
FAILED_TO_ALLOCATE_ENOUGH_BLOCKS,
INTERNAL_ERROR
}
}

View File

@ -16,4 +16,6 @@
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.exceptions;
// Exceptions thrown by SCM.
/**
Exception objects for the SCM Server.
*/

View File

@ -37,9 +37,11 @@ import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.KeyB
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.tracing.TracingUtil;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolTranslator;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ozone.common.BlockGroup;
@ -49,6 +51,8 @@ import com.google.common.base.Preconditions;
import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException;
import static org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Status.OK;
/**
* This class is the client-side translator to translate the requests made on
* the {@link ScmBlockLocationProtocol} interface to the RPC server
@ -85,6 +89,32 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
.setTraceID(TracingUtil.exportCurrentSpan());
}
/**
* Submits client request to SCM server.
* @param req client request
* @return response from SCM
* @throws IOException thrown if any Protobuf service exception occurs
*/
private SCMBlockLocationResponse submitRequest(
SCMBlockLocationRequest req) throws IOException {
try {
SCMBlockLocationResponse response =
rpcProxy.send(NULL_RPC_CONTROLLER, req);
return response;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
private SCMBlockLocationResponse handleError(SCMBlockLocationResponse resp)
throws SCMException {
if (resp.getStatus() != OK) {
throw new SCMException(resp.getMessage(),
SCMException.ResultCodes.values()[resp.getStatus().ordinal()]);
}
return resp;
}
/**
* Asks SCM where a block should be allocated. SCM responds with the
* set of datanodes that should be used creating this block.
@ -117,19 +147,10 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
.setAllocateScmBlockRequest(request)
.build();
final AllocateScmBlockResponseProto response;
final SCMBlockLocationResponse wrappedResponse;
try {
wrappedResponse = rpcProxy.send(NULL_RPC_CONTROLLER, wrapper);
response = wrappedResponse.getAllocateScmBlockResponse();
} catch (ServiceException e) {
throw transformServiceException(e);
}
if (response.getErrorCode() !=
AllocateScmBlockResponseProto.Error.success) {
throw new IOException(response.hasErrorMessage() ?
response.getErrorMessage() : "Allocate block failed.");
}
final SCMBlockLocationResponse wrappedResponse =
handleError(submitRequest(wrapper));
final AllocateScmBlockResponseProto response =
wrappedResponse.getAllocateScmBlockResponse();
List<AllocatedBlock> blocks = new ArrayList<>(response.getBlocksCount());
for (AllocateBlockResponse resp : response.getBlocksList()) {
@ -166,14 +187,11 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
.setDeleteScmKeyBlocksRequest(request)
.build();
final DeleteScmKeyBlocksResponseProto resp;
final SCMBlockLocationResponse wrappedResponse;
try {
wrappedResponse = rpcProxy.send(NULL_RPC_CONTROLLER, wrapper);
resp = wrappedResponse.getDeleteScmKeyBlocksResponse();
} catch (ServiceException e) {
throw transformServiceException(e);
}
final SCMBlockLocationResponse wrappedResponse =
handleError(submitRequest(wrapper));
final DeleteScmKeyBlocksResponseProto resp =
wrappedResponse.getDeleteScmKeyBlocksResponse();
List<DeleteBlockGroupResult> results =
new ArrayList<>(resp.getResultsCount());
results.addAll(resp.getResultsList().stream().map(
@ -184,30 +202,6 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
return results;
}
private IOException transformServiceException(
ServiceException se) throws IOException {
//TODO SCM has no perfect way to return with business exceptions. All
//the exceptions will be mapped to ServiceException.
//ServiceException is handled in a special way in hadoop rpc: the message
//contains the whole stack trace which is not required for the business
//exception. As of now I remove the stack trace (use first line only).
//Long term we need a proper way of the exception propagation.
Throwable cause = se.getCause();
if (cause == null) {
return new IOException(
new ServiceException(useFirstLine(se.getMessage()), se.getCause()));
}
return new IOException(useFirstLine(cause.getMessage()), cause.getCause());
}
private String useFirstLine(String message) {
if (message == null) {
return null;
} else {
return message.split("\n")[0];
}
}
/**
* Gets the cluster Id and Scm Id from SCM.
* @return ScmInfo
@ -224,13 +218,9 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
.setGetScmInfoRequest(request)
.build();
final SCMBlockLocationResponse wrappedResponse;
try {
wrappedResponse = rpcProxy.send(NULL_RPC_CONTROLLER, wrapper);
resp = wrappedResponse.getGetScmInfoResponse();
} catch (ServiceException e) {
throw transformServiceException(e);
}
final SCMBlockLocationResponse wrappedResponse =
handleError(submitRequest(wrapper));
resp = wrappedResponse.getGetScmInfoResponse();
ScmInfo.Builder builder = new ScmInfo.Builder()
.setClusterId(resp.getClusterId())
.setScmId(resp.getScmId());

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB;
@ -78,7 +79,6 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
this.impl = impl;
}
private SCMBlockLocationResponse.Builder createSCMBlockResponse(
ScmBlockLocationProtocolProtos.Type cmdType,
String traceID) {
@ -95,100 +95,99 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
SCMBlockLocationResponse.Builder response = createSCMBlockResponse(
request.getCmdType(),
traceId);
response.setSuccess(true);
response.setStatus(Status.OK);
switch (request.getCmdType()) {
case AllocateScmBlock:
response.setAllocateScmBlockResponse(
allocateScmBlock(traceId, request.getAllocateScmBlockRequest()));
break;
case DeleteScmKeyBlocks:
response.setDeleteScmKeyBlocksResponse(
deleteScmKeyBlocks(traceId, request.getDeleteScmKeyBlocksRequest()));
break;
case GetScmInfo:
response.setGetScmInfoResponse(
getScmInfo(traceId, request.getGetScmInfoRequest()));
break;
default:
throw new ServiceException("Unknown Operation");
try(Scope scope = TracingUtil
.importAndCreateScope("ScmBlockLocationProtocol."+request.getCmdType(),
request.getTraceID())) {
switch (request.getCmdType()) {
case AllocateScmBlock:
response.setAllocateScmBlockResponse(
allocateScmBlock(request.getAllocateScmBlockRequest()));
break;
case DeleteScmKeyBlocks:
response.setDeleteScmKeyBlocksResponse(
deleteScmKeyBlocks(request.getDeleteScmKeyBlocksRequest()));
break;
case GetScmInfo:
response.setGetScmInfoResponse(
getScmInfo(request.getGetScmInfoRequest()));
break;
default:
// Should never happen
throw new IOException("Unknown Operation "+request.getCmdType()+
" in ScmBlockLocationProtocol");
}
} catch (IOException e) {
response.setSuccess(false);
response.setStatus(exceptionToResponseStatus(e));
if (e.getMessage() != null) {
response.setMessage(e.getMessage());
}
}
response.setSuccess(true)
.setStatus(Status.OK);
return response.build();
}
public AllocateScmBlockResponseProto allocateScmBlock(
String traceId, AllocateScmBlockRequestProto request)
throws ServiceException {
try(Scope scope = TracingUtil
.importAndCreateScope("ScmBlockLocationProtocol.allocateBlock",
traceId)) {
List<AllocatedBlock> allocatedBlocks =
impl.allocateBlock(request.getSize(),
request.getNumBlocks(), request.getType(),
request.getFactor(), request.getOwner(),
ExcludeList.getFromProtoBuf(request.getExcludeList()));
AllocateScmBlockResponseProto.Builder builder =
AllocateScmBlockResponseProto.newBuilder();
if (allocatedBlocks.size() < request.getNumBlocks()) {
return builder
.setErrorCode(AllocateScmBlockResponseProto.Error.unknownFailure)
.build();
}
for (AllocatedBlock block : allocatedBlocks) {
builder.addBlocks(AllocateBlockResponse.newBuilder()
.setContainerBlockID(block.getBlockID().getProtobuf())
.setPipeline(block.getPipeline().getProtobufMessage()));
}
return builder
.setErrorCode(AllocateScmBlockResponseProto.Error.success)
.build();
} catch (IOException e) {
throw new ServiceException(e);
private Status exceptionToResponseStatus(IOException ex) {
if (ex instanceof SCMException) {
return Status.values()[((SCMException) ex).getResult().ordinal()];
} else {
return Status.INTERNAL_ERROR;
}
}
public AllocateScmBlockResponseProto allocateScmBlock(
AllocateScmBlockRequestProto request)
throws IOException {
List<AllocatedBlock> allocatedBlocks =
impl.allocateBlock(request.getSize(),
request.getNumBlocks(), request.getType(),
request.getFactor(), request.getOwner(),
ExcludeList.getFromProtoBuf(request.getExcludeList()));
AllocateScmBlockResponseProto.Builder builder =
AllocateScmBlockResponseProto.newBuilder();
if (allocatedBlocks.size() < request.getNumBlocks()) {
throw new SCMException("Allocated " + allocatedBlocks.size() +
" blocks. Requested " + request.getNumBlocks() + " blocks",
SCMException.ResultCodes.FAILED_TO_ALLOCATE_ENOUGH_BLOCKS);
}
for (AllocatedBlock block : allocatedBlocks) {
builder.addBlocks(AllocateBlockResponse.newBuilder()
.setContainerBlockID(block.getBlockID().getProtobuf())
.setPipeline(block.getPipeline().getProtobufMessage()));
}
return builder.build();
}
public DeleteScmKeyBlocksResponseProto deleteScmKeyBlocks(
String traceId, DeleteScmKeyBlocksRequestProto req)
throws ServiceException {
DeleteScmKeyBlocksRequestProto req)
throws IOException {
DeleteScmKeyBlocksResponseProto.Builder resp =
DeleteScmKeyBlocksResponseProto.newBuilder();
try(Scope scope = TracingUtil
.importAndCreateScope("ScmBlockLocationProtocol.deleteKeyBlocks",
traceId)) {
List<BlockGroup> infoList = req.getKeyBlocksList().stream()
.map(BlockGroup::getFromProto).collect(Collectors.toList());
final List<DeleteBlockGroupResult> results =
impl.deleteKeyBlocks(infoList);
for (DeleteBlockGroupResult result: results) {
DeleteKeyBlocksResultProto.Builder deleteResult =
DeleteKeyBlocksResultProto
.newBuilder()
.setObjectKey(result.getObjectKey())
.addAllBlockResults(result.getBlockResultProtoList());
resp.addResults(deleteResult.build());
}
} catch (IOException ex) {
throw new ServiceException(ex);
List<BlockGroup> infoList = req.getKeyBlocksList().stream()
.map(BlockGroup::getFromProto).collect(Collectors.toList());
final List<DeleteBlockGroupResult> results =
impl.deleteKeyBlocks(infoList);
for (DeleteBlockGroupResult result: results) {
DeleteKeyBlocksResultProto.Builder deleteResult =
DeleteKeyBlocksResultProto
.newBuilder()
.setObjectKey(result.getObjectKey())
.addAllBlockResults(result.getBlockResultProtoList());
resp.addResults(deleteResult.build());
}
return resp.build();
}
public HddsProtos.GetScmInfoResponseProto getScmInfo(
String traceId, HddsProtos.GetScmInfoRequestProto req)
throws ServiceException {
ScmInfo scmInfo;
try(Scope scope = TracingUtil
.importAndCreateScope("ScmBlockLocationProtocol.getInfo",
traceId)) {
scmInfo = impl.getScmInfo();
} catch (IOException ex) {
throw new ServiceException(ex);
}
HddsProtos.GetScmInfoRequestProto req)
throws IOException {
ScmInfo scmInfo = impl.getScmInfo();
return HddsProtos.GetScmInfoResponseProto.newBuilder()
.setClusterId(scmInfo.getClusterId())
.setScmId(scmInfo.getScmId())

View File

@ -84,7 +84,33 @@ message UserInfo {
enum Status {
OK = 1;
UNKNOWN = 2;
FAILED_TO_LOAD_NODEPOOL = 2;
FAILED_TO_FIND_NODE_IN_POOL = 3;
FAILED_TO_FIND_HEALTHY_NODES = 4;
FAILED_TO_FIND_NODES_WITH_SPACE = 5;
FAILED_TO_FIND_SUITABLE_NODE = 6;
INVALID_CAPACITY = 7;
INVALID_BLOCK_SIZE = 8;
SAFE_MODE_EXCEPTION = 9;
FAILED_TO_LOAD_OPEN_CONTAINER = 10;
FAILED_TO_ALLOCATE_CONTAINER = 11;
FAILED_TO_CHANGE_CONTAINER_STATE = 12;
FAILED_TO_CHANGE_PIPELINE_STATE = 13;
CONTAINER_EXISTS = 14;
FAILED_TO_FIND_CONTAINER = 15;
FAILED_TO_FIND_CONTAINER_WITH_SPACE = 16;
BLOCK_EXISTS = 17;
FAILED_TO_FIND_BLOCK = 18;
IO_EXCEPTION = 19;
UNEXPECTED_CONTAINER_STATE = 20;
SCM_NOT_INITIALIZED = 21;
DUPLICATE_DATANODE = 22;
NO_SUCH_DATANODE = 23;
NO_REPLICA_FOUND = 24;
FAILED_TO_FIND_ACTIVE_PIPELINE = 25;
FAILED_TO_INIT_CONTAINER_PLACEMENT_POLICY = 26;
FAILED_TO_ALLOCATE_ENOUGH_BLOCKS = 27;
INTERNAL_ERROR = 29;
}
/**
@ -156,14 +182,6 @@ message AllocateBlockResponse {
* Reply from SCM indicating that the container.
*/
message AllocateScmBlockResponseProto {
enum Error {
success = 1;
errorNotEnoughSpace = 2;
errorSizeTooBig = 3;
unknownFailure = 4;
}
required Error errorCode = 1;
optional string errorMessage = 2;
repeated AllocateBlockResponse blocks = 3;
}

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.exceptions;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
import org.apache.hadoop.hdds.protocol.proto.
ScmBlockLocationProtocolProtos.Status;
import org.junit.Assert;
import org.junit.Test;
/**
* Test Result code mappping between SCMException and the protobuf definitions.
*/
public class TestSCMExceptionResultCodes {
@Test
public void codeMapping() {
// ResultCode = SCMException definition
// Status = protobuf definition
Assert.assertEquals(ResultCodes.values().length, Status.values().length);
for (int i = 0; i < ResultCodes.values().length; i++) {
ResultCodes codeValue = ResultCodes.values()[i];
Status protoBufValue = Status.values()[i];
Assert.assertTrue(String
.format("Protobuf/Enum constant name mismatch %s %s", codeValue,
protoBufValue), sameName(codeValue.name(), protoBufValue.name()));
ResultCodes converted = ResultCodes.values()[protoBufValue.ordinal()];
Assert.assertEquals(codeValue, converted);
}
}
private boolean sameName(String codeValue, String protoBufValue) {
return codeValue.equals(protoBufValue);
}
}