From f8d62a9c4c03a637896bf4f1795176901c4e7235 Mon Sep 17 00:00:00 2001 From: S O'Donnell Date: Mon, 1 Jul 2019 20:04:32 +0200 Subject: [PATCH] HDDS-1258. Fix error propagation for SCM protocol Closes #1001 --- .../hdds/scm/exceptions/SCMException.java | 6 +- .../hdds/scm/exceptions/package-info.java | 4 +- ...ocationProtocolClientSideTranslatorPB.java | 94 +++++------ ...ocationProtocolServerSideTranslatorPB.java | 157 +++++++++--------- .../main/proto/ScmBlockLocationProtocol.proto | 36 +++- .../TestSCMExceptionResultCodes.java | 52 ++++++ 6 files changed, 206 insertions(+), 143 deletions(-) create mode 100644 hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/exceptions/TestSCMExceptionResultCodes.java diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java index ec75eec1f63..db1f82ae411 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/SCMException.java @@ -95,7 +95,7 @@ public class SCMException extends IOException { * Error codes to make it easy to decode these exceptions. */ public enum ResultCodes { - SUCCEESS, + OK, FAILED_TO_LOAD_NODEPOOL, FAILED_TO_FIND_NODE_IN_POOL, FAILED_TO_FIND_HEALTHY_NODES, @@ -120,6 +120,8 @@ public class SCMException extends IOException { NO_SUCH_DATANODE, NO_REPLICA_FOUND, FAILED_TO_FIND_ACTIVE_PIPELINE, - FAILED_TO_INIT_CONTAINER_PLACEMENT_POLICY + FAILED_TO_INIT_CONTAINER_PLACEMENT_POLICY, + FAILED_TO_ALLOCATE_ENOUGH_BLOCKS, + INTERNAL_ERROR } } diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/package-info.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/package-info.java index 26cf4dccfd5..721a32b48e2 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/package-info.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/exceptions/package-info.java @@ -16,4 +16,6 @@ * limitations under the License. */ package org.apache.hadoop.hdds.scm.exceptions; -// Exceptions thrown by SCM. +/** + Exception objects for the SCM Server. 
+ */ diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java index aadf585775d..af53ea1688c 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java @@ -37,9 +37,11 @@ import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.KeyB import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.pipeline.Pipeline; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.tracing.TracingUtil; +import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtocolTranslator; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ozone.common.BlockGroup; @@ -49,6 +51,8 @@ import com.google.common.base.Preconditions; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; +import static org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.Status.OK; + /** * This class is the client-side translator to translate the requests made on * the {@link ScmBlockLocationProtocol} interface to the RPC server @@ -85,6 +89,32 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB .setTraceID(TracingUtil.exportCurrentSpan()); } + /** + * Submits client request to SCM server. + * @param req client request + * @return response from SCM + * @throws IOException thrown if any Protobuf service exception occurs + */ + private SCMBlockLocationResponse submitRequest( + SCMBlockLocationRequest req) throws IOException { + try { + SCMBlockLocationResponse response = + rpcProxy.send(NULL_RPC_CONTROLLER, req); + return response; + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } + + private SCMBlockLocationResponse handleError(SCMBlockLocationResponse resp) + throws SCMException { + if (resp.getStatus() != OK) { + throw new SCMException(resp.getMessage(), + SCMException.ResultCodes.values()[resp.getStatus().ordinal()]); + } + return resp; + } + /** * Asks SCM where a block should be allocated. SCM responds with the * set of datanodes that should be used creating this block. @@ -117,19 +147,10 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB .setAllocateScmBlockRequest(request) .build(); - final AllocateScmBlockResponseProto response; - final SCMBlockLocationResponse wrappedResponse; - try { - wrappedResponse = rpcProxy.send(NULL_RPC_CONTROLLER, wrapper); - response = wrappedResponse.getAllocateScmBlockResponse(); - } catch (ServiceException e) { - throw transformServiceException(e); - } - if (response.getErrorCode() != - AllocateScmBlockResponseProto.Error.success) { - throw new IOException(response.hasErrorMessage() ? 
- response.getErrorMessage() : "Allocate block failed."); - } + final SCMBlockLocationResponse wrappedResponse = + handleError(submitRequest(wrapper)); + final AllocateScmBlockResponseProto response = + wrappedResponse.getAllocateScmBlockResponse(); List blocks = new ArrayList<>(response.getBlocksCount()); for (AllocateBlockResponse resp : response.getBlocksList()) { @@ -166,14 +187,11 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB .setDeleteScmKeyBlocksRequest(request) .build(); - final DeleteScmKeyBlocksResponseProto resp; - final SCMBlockLocationResponse wrappedResponse; - try { - wrappedResponse = rpcProxy.send(NULL_RPC_CONTROLLER, wrapper); - resp = wrappedResponse.getDeleteScmKeyBlocksResponse(); - } catch (ServiceException e) { - throw transformServiceException(e); - } + final SCMBlockLocationResponse wrappedResponse = + handleError(submitRequest(wrapper)); + final DeleteScmKeyBlocksResponseProto resp = + wrappedResponse.getDeleteScmKeyBlocksResponse(); + List results = new ArrayList<>(resp.getResultsCount()); results.addAll(resp.getResultsList().stream().map( @@ -184,30 +202,6 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB return results; } - private IOException transformServiceException( - ServiceException se) throws IOException { - //TODO SCM has no perfect way to return with business exceptions. All - //the exceptions will be mapped to ServiceException. - //ServiceException is handled in a special way in hadoop rpc: the message - //contains the whole stack trace which is not required for the business - //exception. As of now I remove the stack trace (use first line only). - //Long term we need a proper way of the exception propagation. - Throwable cause = se.getCause(); - if (cause == null) { - return new IOException( - new ServiceException(useFirstLine(se.getMessage()), se.getCause())); - } - return new IOException(useFirstLine(cause.getMessage()), cause.getCause()); - } - - private String useFirstLine(String message) { - if (message == null) { - return null; - } else { - return message.split("\n")[0]; - } - } - /** * Gets the cluster Id and Scm Id from SCM. 
* @return ScmInfo @@ -224,13 +218,9 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB .setGetScmInfoRequest(request) .build(); - final SCMBlockLocationResponse wrappedResponse; - try { - wrappedResponse = rpcProxy.send(NULL_RPC_CONTROLLER, wrapper); - resp = wrappedResponse.getGetScmInfoResponse(); - } catch (ServiceException e) { - throw transformServiceException(e); - } + final SCMBlockLocationResponse wrappedResponse = + handleError(submitRequest(wrapper)); + resp = wrappedResponse.getGetScmInfoResponse(); ScmInfo.Builder builder = new ScmInfo.Builder() .setClusterId(resp.getClusterId()) .setScmId(resp.getScmId()); diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java index db1240a5f6d..935d240031a 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java @@ -28,6 +28,7 @@ import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol; import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; import org.apache.hadoop.hdds.scm.protocolPB.ScmBlockLocationProtocolPB; @@ -78,7 +79,6 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB this.impl = impl; } - private SCMBlockLocationResponse.Builder createSCMBlockResponse( ScmBlockLocationProtocolProtos.Type cmdType, String traceID) { @@ -95,100 +95,99 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB SCMBlockLocationResponse.Builder response = createSCMBlockResponse( request.getCmdType(), traceId); + response.setSuccess(true); + response.setStatus(Status.OK); - switch (request.getCmdType()) { - case AllocateScmBlock: - response.setAllocateScmBlockResponse( - allocateScmBlock(traceId, request.getAllocateScmBlockRequest())); - break; - case DeleteScmKeyBlocks: - response.setDeleteScmKeyBlocksResponse( - deleteScmKeyBlocks(traceId, request.getDeleteScmKeyBlocksRequest())); - break; - case GetScmInfo: - response.setGetScmInfoResponse( - getScmInfo(traceId, request.getGetScmInfoRequest())); - break; - default: - throw new ServiceException("Unknown Operation"); + try(Scope scope = TracingUtil + .importAndCreateScope("ScmBlockLocationProtocol."+request.getCmdType(), + request.getTraceID())) { + switch (request.getCmdType()) { + case AllocateScmBlock: + response.setAllocateScmBlockResponse( + allocateScmBlock(request.getAllocateScmBlockRequest())); + break; + case DeleteScmKeyBlocks: + response.setDeleteScmKeyBlocksResponse( + deleteScmKeyBlocks(request.getDeleteScmKeyBlocksRequest())); + break; + case GetScmInfo: + response.setGetScmInfoResponse( + getScmInfo(request.getGetScmInfoRequest())); + break; + default: + // Should never happen + throw new IOException("Unknown Operation "+request.getCmdType()+ + " in ScmBlockLocationProtocol"); + } + } catch (IOException e) { + response.setSuccess(false); + response.setStatus(exceptionToResponseStatus(e)); + if 
(e.getMessage() != null) { + response.setMessage(e.getMessage()); + } } - response.setSuccess(true) - .setStatus(Status.OK); return response.build(); } - public AllocateScmBlockResponseProto allocateScmBlock( - String traceId, AllocateScmBlockRequestProto request) - throws ServiceException { - try(Scope scope = TracingUtil - .importAndCreateScope("ScmBlockLocationProtocol.allocateBlock", - traceId)) { - List allocatedBlocks = - impl.allocateBlock(request.getSize(), - request.getNumBlocks(), request.getType(), - request.getFactor(), request.getOwner(), - ExcludeList.getFromProtoBuf(request.getExcludeList())); - - AllocateScmBlockResponseProto.Builder builder = - AllocateScmBlockResponseProto.newBuilder(); - - if (allocatedBlocks.size() < request.getNumBlocks()) { - return builder - .setErrorCode(AllocateScmBlockResponseProto.Error.unknownFailure) - .build(); - } - - for (AllocatedBlock block : allocatedBlocks) { - builder.addBlocks(AllocateBlockResponse.newBuilder() - .setContainerBlockID(block.getBlockID().getProtobuf()) - .setPipeline(block.getPipeline().getProtobufMessage())); - } - - return builder - .setErrorCode(AllocateScmBlockResponseProto.Error.success) - .build(); - } catch (IOException e) { - throw new ServiceException(e); + private Status exceptionToResponseStatus(IOException ex) { + if (ex instanceof SCMException) { + return Status.values()[((SCMException) ex).getResult().ordinal()]; + } else { + return Status.INTERNAL_ERROR; } } + public AllocateScmBlockResponseProto allocateScmBlock( + AllocateScmBlockRequestProto request) + throws IOException { + List allocatedBlocks = + impl.allocateBlock(request.getSize(), + request.getNumBlocks(), request.getType(), + request.getFactor(), request.getOwner(), + ExcludeList.getFromProtoBuf(request.getExcludeList())); + + AllocateScmBlockResponseProto.Builder builder = + AllocateScmBlockResponseProto.newBuilder(); + + if (allocatedBlocks.size() < request.getNumBlocks()) { + throw new SCMException("Allocated " + allocatedBlocks.size() + + " blocks. 
Requested " + request.getNumBlocks() + " blocks", + SCMException.ResultCodes.FAILED_TO_ALLOCATE_ENOUGH_BLOCKS); + } + for (AllocatedBlock block : allocatedBlocks) { + builder.addBlocks(AllocateBlockResponse.newBuilder() + .setContainerBlockID(block.getBlockID().getProtobuf()) + .setPipeline(block.getPipeline().getProtobufMessage())); + } + + return builder.build(); + } + public DeleteScmKeyBlocksResponseProto deleteScmKeyBlocks( - String traceId, DeleteScmKeyBlocksRequestProto req) - throws ServiceException { + DeleteScmKeyBlocksRequestProto req) + throws IOException { DeleteScmKeyBlocksResponseProto.Builder resp = DeleteScmKeyBlocksResponseProto.newBuilder(); - try(Scope scope = TracingUtil - .importAndCreateScope("ScmBlockLocationProtocol.deleteKeyBlocks", - traceId)) { - List infoList = req.getKeyBlocksList().stream() - .map(BlockGroup::getFromProto).collect(Collectors.toList()); - final List results = - impl.deleteKeyBlocks(infoList); - for (DeleteBlockGroupResult result: results) { - DeleteKeyBlocksResultProto.Builder deleteResult = - DeleteKeyBlocksResultProto - .newBuilder() - .setObjectKey(result.getObjectKey()) - .addAllBlockResults(result.getBlockResultProtoList()); - resp.addResults(deleteResult.build()); - } - } catch (IOException ex) { - throw new ServiceException(ex); + + List infoList = req.getKeyBlocksList().stream() + .map(BlockGroup::getFromProto).collect(Collectors.toList()); + final List results = + impl.deleteKeyBlocks(infoList); + for (DeleteBlockGroupResult result: results) { + DeleteKeyBlocksResultProto.Builder deleteResult = + DeleteKeyBlocksResultProto + .newBuilder() + .setObjectKey(result.getObjectKey()) + .addAllBlockResults(result.getBlockResultProtoList()); + resp.addResults(deleteResult.build()); } return resp.build(); } public HddsProtos.GetScmInfoResponseProto getScmInfo( - String traceId, HddsProtos.GetScmInfoRequestProto req) - throws ServiceException { - ScmInfo scmInfo; - try(Scope scope = TracingUtil - .importAndCreateScope("ScmBlockLocationProtocol.getInfo", - traceId)) { - scmInfo = impl.getScmInfo(); - } catch (IOException ex) { - throw new ServiceException(ex); - } + HddsProtos.GetScmInfoRequestProto req) + throws IOException { + ScmInfo scmInfo = impl.getScmInfo(); return HddsProtos.GetScmInfoResponseProto.newBuilder() .setClusterId(scmInfo.getClusterId()) .setScmId(scmInfo.getScmId()) diff --git a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto index 8222d8b45b1..81144ab9f07 100644 --- a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto @@ -84,7 +84,33 @@ message UserInfo { enum Status { OK = 1; - UNKNOWN = 2; + FAILED_TO_LOAD_NODEPOOL = 2; + FAILED_TO_FIND_NODE_IN_POOL = 3; + FAILED_TO_FIND_HEALTHY_NODES = 4; + FAILED_TO_FIND_NODES_WITH_SPACE = 5; + FAILED_TO_FIND_SUITABLE_NODE = 6; + INVALID_CAPACITY = 7; + INVALID_BLOCK_SIZE = 8; + SAFE_MODE_EXCEPTION = 9; + FAILED_TO_LOAD_OPEN_CONTAINER = 10; + FAILED_TO_ALLOCATE_CONTAINER = 11; + FAILED_TO_CHANGE_CONTAINER_STATE = 12; + FAILED_TO_CHANGE_PIPELINE_STATE = 13; + CONTAINER_EXISTS = 14; + FAILED_TO_FIND_CONTAINER = 15; + FAILED_TO_FIND_CONTAINER_WITH_SPACE = 16; + BLOCK_EXISTS = 17; + FAILED_TO_FIND_BLOCK = 18; + IO_EXCEPTION = 19; + UNEXPECTED_CONTAINER_STATE = 20; + SCM_NOT_INITIALIZED = 21; + DUPLICATE_DATANODE = 22; + NO_SUCH_DATANODE = 23; + NO_REPLICA_FOUND = 24; + FAILED_TO_FIND_ACTIVE_PIPELINE = 25; + 
FAILED_TO_INIT_CONTAINER_PLACEMENT_POLICY = 26;
+  FAILED_TO_ALLOCATE_ENOUGH_BLOCKS = 27;
+  INTERNAL_ERROR = 28;
 }
 
 /**
@@ -156,14 +182,6 @@ message AllocateBlockResponse {
  * Reply from SCM indicating that the container.
  */
 message AllocateScmBlockResponseProto {
-  enum Error {
-    success = 1;
-    errorNotEnoughSpace = 2;
-    errorSizeTooBig = 3;
-    unknownFailure = 4;
-  }
-  required Error errorCode = 1;
-  optional string errorMessage = 2;
   repeated AllocateBlockResponse blocks = 3;
 }
 
diff --git a/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/exceptions/TestSCMExceptionResultCodes.java b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/exceptions/TestSCMExceptionResultCodes.java
new file mode 100644
index 00000000000..b5b4684dda0
--- /dev/null
+++ b/hadoop-hdds/common/src/test/java/org/apache/hadoop/hdds/scm/exceptions/TestSCMExceptionResultCodes.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.exceptions;
+
+import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
+import org.apache.hadoop.hdds.protocol.proto.
+    ScmBlockLocationProtocolProtos.Status;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test Result code mapping between SCMException and the protobuf definitions.
+ */
+public class TestSCMExceptionResultCodes {
+
+  @Test
+  public void codeMapping() {
+    // ResultCode = SCMException definition
+    // Status = protobuf definition
+    Assert.assertEquals(ResultCodes.values().length, Status.values().length);
+    for (int i = 0; i < ResultCodes.values().length; i++) {
+      ResultCodes codeValue = ResultCodes.values()[i];
+      Status protoBufValue = Status.values()[i];
+      Assert.assertTrue(String
+          .format("Protobuf/Enum constant name mismatch %s %s", codeValue,
+              protoBufValue), sameName(codeValue.name(), protoBufValue.name()));
+      ResultCodes converted = ResultCodes.values()[protoBufValue.ordinal()];
+      Assert.assertEquals(codeValue, converted);
+    }
+  }
+
+  private boolean sameName(String codeValue, String protoBufValue) {
+    return codeValue.equals(protoBufValue);
+  }
+
+}
+
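
Usage sketch (illustrative, not part of the patch): with this change, a caller
of ScmBlockLocationProtocolClientSideTranslatorPB receives a typed
SCMException carrying the server's result code, instead of a ServiceException
whose message held the whole remote stack trace. The surrounding variables and
the pre-built "scmClient" proxy below are assumptions for the example:

    // Hypothetical caller; scmClient is an ScmBlockLocationProtocol proxy.
    try {
      scmClient.allocateBlock(blockSize, numBlocks, replicationType,
          replicationFactor, owner, new ExcludeList());
    } catch (SCMException e) {
      // The result code now survives the RPC boundary, so callers can
      // branch on it rather than parsing exception messages.
      if (e.getResult() == ResultCodes.FAILED_TO_ALLOCATE_ENOUGH_BLOCKS) {
        // e.g. back off and retry with a smaller numBlocks
      }
      throw e;
    }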
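
Design note: both translators map between SCMException.ResultCodes and the
protobuf Status enum by declaration index (Java enum ordinal), not by protobuf
field number, so the two enums must declare their constants in the same order;
TestSCMExceptionResultCodes pins that invariant. Condensed from the code
above, the round trip is:

    // Server side: business exception -> wire status, same index.
    Status status = Status.values()[scmException.getResult().ordinal()];

    // Client side: wire status -> business result code, inverse by index.
    SCMException.ResultCodes code = ResultCodes.values()[status.ordinal()];

Any new error code therefore has to be appended at the same position in both
declarations (and given the next free field number in the .proto file).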