diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java index 1a326726177..12b591210ac 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/ScmBlockLocationProtocol.java @@ -48,14 +48,18 @@ public interface ScmBlockLocationProtocol extends Closeable { * Asks SCM where a block should be allocated. SCM responds with the * set of datanodes that should be used creating this block. * @param size - size of the block. + * @param numBlocks - number of blocks. + * @param type - replication type of the blocks. + * @param factor - replication factor of the blocks. + * @param owner - owner of the blocks. * @param excludeList List of datanodes/containers to exclude during block * allocation. * @return allocated block accessing info (key, pipeline). * @throws IOException */ - AllocatedBlock allocateBlock(long size, ReplicationType type, - ReplicationFactor factor, String owner, ExcludeList excludeList) - throws IOException; + List<AllocatedBlock> allocateBlock(long size, int numBlocks, + ReplicationType type, ReplicationFactor factor, String owner, + ExcludeList excludeList) throws IOException; /** * Delete blocks for a set of object keys. 
diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java index 96276f501c1..71f9831bc10 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocolPB/ScmBlockLocationProtocolClientSideTranslatorPB.java @@ -25,6 +25,7 @@ import java.util.stream.Collectors; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hdds.client.ContainerBlockID; import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateBlockResponse; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockRequestProto; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto; import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto; @@ -75,11 +76,15 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB * Asks SCM where a block should be allocated. SCM responds with the * set of datanodes that should be used creating this block. * @param size - size of the block. + * @param num - number of blocks. + * @param type - replication type of the blocks. + * @param factor - replication factor of the blocks. + * @param excludeList - exclude list while allocating blocks. * @return allocated block accessing info (key, pipeline). 
* @throws IOException */ @Override - public AllocatedBlock allocateBlock(long size, + public List allocateBlock(long size, int num, HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor, String owner, ExcludeList excludeList) throws IOException { Preconditions.checkArgument(size > 0, "block size must be greater than 0"); @@ -87,6 +92,7 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB AllocateScmBlockRequestProto request = AllocateScmBlockRequestProto.newBuilder() .setSize(size) + .setNumBlocks(num) .setType(type) .setFactor(factor) .setOwner(owner) @@ -104,11 +110,17 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB throw new IOException(response.hasErrorMessage() ? response.getErrorMessage() : "Allocate block failed."); } - AllocatedBlock.Builder builder = new AllocatedBlock.Builder() - .setContainerBlockID( - ContainerBlockID.getFromProtobuf(response.getContainerBlockID())) - .setPipeline(Pipeline.getFromProtobuf(response.getPipeline())); - return builder.build(); + + List blocks = new ArrayList<>(response.getBlocksCount()); + for (AllocateBlockResponse resp : response.getBlocksList()) { + AllocatedBlock.Builder builder = new AllocatedBlock.Builder() + .setContainerBlockID( + ContainerBlockID.getFromProtobuf(resp.getContainerBlockID())) + .setPipeline(Pipeline.getFromProtobuf(resp.getPipeline())); + blocks.add(builder.build()); + } + + return blocks; } /** diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java index cd40f7c0918..15eba9d3ef0 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/OzoneConfigKeys.java @@ -198,10 +198,10 @@ public final class OzoneConfigKeys { public static final String OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT = "300s"; // 300s for default - public static final 
String OZONE_KEY_PREALLOCATION_MAXSIZE = - "ozone.key.preallocation.maxsize"; - public static final long OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT - = 128 * OzoneConsts.MB; + public static final String OZONE_KEY_PREALLOCATION_BLOCKS_MAX = + "ozone.key.preallocation.max.blocks"; + public static final int OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT + = 64; public static final String OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER = "ozone.block.deleting.limit.per.task"; diff --git a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java index 0d434c172a5..9ff6879573f 100644 --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/ozone/protocolPB/ScmBlockLocationProtocolServerSideTranslatorPB.java @@ -22,6 +22,8 @@ import com.google.protobuf.ServiceException; import io.opentracing.Scope; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos + .AllocateBlockResponse; import org.apache.hadoop.hdds.scm.ScmInfo; import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock; import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList; @@ -76,22 +78,30 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB try (Scope scope = TracingUtil .importAndCreateScope("ScmBlockLocationProtocol.allocateBlock", request.getTraceID())) { - AllocatedBlock allocatedBlock = - impl.allocateBlock(request.getSize(), request.getType(), + List allocatedBlocks = + impl.allocateBlock(request.getSize(), + request.getNumBlocks(), request.getType(), request.getFactor(), request.getOwner(), ExcludeList.getFromProtoBuf(request.getExcludeList())); - if (allocatedBlock != null) { - return 
- AllocateScmBlockResponseProto.newBuilder() - .setContainerBlockID(allocatedBlock.getBlockID().getProtobuf()) - .setPipeline(allocatedBlock.getPipeline().getProtobufMessage()) - .setErrorCode(AllocateScmBlockResponseProto.Error.success) - .build(); - } else { - return AllocateScmBlockResponseProto.newBuilder() + + AllocateScmBlockResponseProto.Builder builder = + AllocateScmBlockResponseProto.newBuilder(); + + if (allocatedBlocks.size() < request.getNumBlocks()) { + return builder .setErrorCode(AllocateScmBlockResponseProto.Error.unknownFailure) .build(); } + + for (AllocatedBlock block : allocatedBlocks) { + builder.addBlocks(AllocateBlockResponse.newBuilder() + .setContainerBlockID(block.getBlockID().getProtobuf()) + .setPipeline(block.getPipeline().getProtobufMessage())); + } + + return builder + .setErrorCode(AllocateScmBlockResponseProto.Error.success) + .build(); } catch (IOException e) { throw new ServiceException(e); } diff --git a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto index 85fbca600c4..c1390894057 100644 --- a/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto +++ b/hadoop-hdds/common/src/main/proto/ScmBlockLocationProtocol.proto @@ -38,12 +38,12 @@ import "hdds.proto"; */ message AllocateScmBlockRequestProto { required uint64 size = 1; - required ReplicationType type = 2; - required hadoop.hdds.ReplicationFactor factor = 3; - required string owner = 4; - optional string traceID = 5; - optional ExcludeListProto excludeList = 6; - + required uint32 numBlocks = 2; + required ReplicationType type = 3; + required hadoop.hdds.ReplicationFactor factor = 4; + required string owner = 5; + optional string traceID = 6; + optional ExcludeListProto excludeList = 7; } /** @@ -96,6 +96,11 @@ message DeleteScmBlockResult { required BlockID blockID = 2; } +message AllocateBlockResponse { + optional ContainerBlockID containerBlockID = 1; + optional 
hadoop.hdds.Pipeline pipeline = 2; +} + /** * Reply from SCM indicating that the container. */ @@ -107,9 +112,8 @@ message AllocateScmBlockResponseProto { unknownFailure = 4; } required Error errorCode = 1; - optional ContainerBlockID containerBlockID = 2; - optional hadoop.hdds.Pipeline pipeline = 3; - optional string errorMessage = 4; + optional string errorMessage = 2; + repeated AllocateBlockResponse blocks = 3; } /** diff --git a/hadoop-hdds/common/src/main/resources/ozone-default.xml b/hadoop-hdds/common/src/main/resources/ozone-default.xml index a4f49e762b3..49c81b92de5 100644 --- a/hadoop-hdds/common/src/main/resources/ozone-default.xml +++ b/hadoop-hdds/common/src/main/resources/ozone-default.xml @@ -1089,13 +1089,14 @@ - ozone.key.preallocation.maxsize - 134217728 + ozone.key.preallocation.max.blocks + 64 OZONE, OM, PERFORMANCE - When a new key write request is sent to OM, if a size is requested, at most - 128MB of size is allocated at request time. If client needs more space for the - write, separate block allocation requests will be made. + While allocating blocks from OM, this configuration limits the maximum + number of blocks being allocated. This configuration ensures that the + allocated block response does not exceed rpc payload limit. If client needs + more space for the write, separate block allocation requests will be made. 
diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java index 52cb0454dd7..b8f89a31e84 100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMBlockProtocolServer.java @@ -156,7 +156,7 @@ public class SCMBlockProtocolServer implements } @Override - public AllocatedBlock allocateBlock(long size, + public List allocateBlock(long size, int num, HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor, String owner, ExcludeList excludeList) throws IOException { Map auditMap = Maps.newHashMap(); @@ -164,10 +164,17 @@ public class SCMBlockProtocolServer implements auditMap.put("type", type.name()); auditMap.put("factor", factor.name()); auditMap.put("owner", owner); + List blocks = new ArrayList<>(num); boolean auditSuccess = true; try { - return scm.getScmBlockManager() - .allocateBlock(size, type, factor, owner, excludeList); + for (int i = 0; i < num; i++) { + AllocatedBlock block = scm.getScmBlockManager() + .allocateBlock(size, type, factor, owner, excludeList); + if (block != null) { + blocks.add(block); + } + } + return blocks; } catch (Exception ex) { auditSuccess = false; AUDIT.logWriteFailure( diff --git a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java index 6500a5ec186..fbbb10490c7 100644 --- a/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/main/java/org/apache/hadoop/ozone/om/KeyManagerImpl.java @@ -85,8 +85,8 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVI import static 
org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE; -import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX; +import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE; import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT; import static org.apache.hadoop.ozone.OzoneConsts.OM_MULTIPART_MIN_SIZE; @@ -110,7 +110,7 @@ public class KeyManagerImpl implements KeyManager { private final long scmBlockSize; private final boolean useRatis; - private final long preallocateMax; + private final int preallocateBlocksMax; private final String omId; private final OzoneBlockTokenSecretManager secretManager; private final boolean grpcBlockTokenEnabled; @@ -136,9 +136,9 @@ public class KeyManagerImpl implements KeyManager { StorageUnit.BYTES); this.useRatis = conf.getBoolean(DFS_CONTAINER_RATIS_ENABLED_KEY, DFS_CONTAINER_RATIS_ENABLED_DEFAULT); - this.preallocateMax = conf.getLong( - OZONE_KEY_PREALLOCATION_MAXSIZE, - OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT); + this.preallocateBlocksMax = conf.getInt( + OZONE_KEY_PREALLOCATION_BLOCKS_MAX, + OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT); this.omId = omId; start(conf); this.secretManager = secretManager; @@ -251,36 +251,45 @@ public class KeyManagerImpl implements KeyManager { return locationInfos.get(0); } + /** + * This method avoids multiple rpc calls to SCM by allocating multiple blocks + * in one rpc call. 
+ * @param keyInfo - key info for key to be allocated. + * @param excludeList exclude list while allocating blocks. + * @param requestedSize requested size to be allocated. + * @return list of {@link OmKeyLocationInfo} for the newly + * allocated blocks. + * @throws IOException + */ private List<OmKeyLocationInfo> allocateBlock(OmKeyInfo keyInfo, ExcludeList excludeList, long requestedSize) throws IOException { - int numBlocks = (int) ((requestedSize - 1) / scmBlockSize + 1); + int numBlocks = Math.min((int) ((requestedSize - 1) / scmBlockSize + 1), + preallocateBlocksMax); List<OmKeyLocationInfo> locationInfos = new ArrayList<>(numBlocks); - while (requestedSize > 0) { - long allocateSize = Math.min(requestedSize, scmBlockSize); - AllocatedBlock allocatedBlock; - try { - allocatedBlock = scmBlockClient - .allocateBlock(allocateSize, keyInfo.getType(), keyInfo.getFactor(), - omId, excludeList); - } catch (SCMException ex) { - if (ex.getResult() - .equals(SCMException.ResultCodes.CHILL_MODE_EXCEPTION)) { - throw new OMException(ex.getMessage(), ResultCodes.SCM_IN_CHILL_MODE); - } - throw ex; + String remoteUser = getRemoteUser().getShortUserName(); + List<AllocatedBlock> allocatedBlocks; + try { + allocatedBlocks = scmBlockClient + .allocateBlock(scmBlockSize, numBlocks, keyInfo.getType(), + keyInfo.getFactor(), omId, excludeList); + } catch (SCMException ex) { + if (ex.getResult() + .equals(SCMException.ResultCodes.CHILL_MODE_EXCEPTION)) { + throw new OMException(ex.getMessage(), ResultCodes.SCM_IN_CHILL_MODE); } + throw ex; + } + for (AllocatedBlock allocatedBlock : allocatedBlocks) { OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder() .setBlockID(new BlockID(allocatedBlock.getBlockID())) .setLength(scmBlockSize) .setOffset(0); if (grpcBlockTokenEnabled) { - String remoteUser = getRemoteUser().getShortUserName(); builder.setToken(secretManager .generateToken(remoteUser, allocatedBlock.getBlockID().toString(), getAclForUser(remoteUser), scmBlockSize)); } locationInfos.add(builder.build()); - requestedSize -= 
allocateSize; } return locationInfos; } @@ -339,7 +348,6 @@ public class KeyManagerImpl implements KeyManager { ReplicationFactor factor = args.getFactor(); ReplicationType type = args.getType(); long currentTime = Time.monotonicNowNanos(); - long requestedSize = Math.min(preallocateMax, args.getDataSize()); OmKeyInfo keyInfo; String openKey; long openVersion; @@ -457,9 +465,9 @@ public class KeyManagerImpl implements KeyManager { // the point, if client needs more blocks, client can always call // allocateBlock. But if requested size is not 0, OM will preallocate // some blocks and piggyback to client, to save RPC calls. - if (requestedSize > 0) { + if (args.getDataSize() > 0) { List locationInfos = - allocateBlock(keyInfo, new ExcludeList(), requestedSize); + allocateBlock(keyInfo, new ExcludeList(), args.getDataSize()); keyInfo.appendNewBlocks(locationInfos); } metadataManager.getOpenKeyTable().put(openKey, keyInfo); diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java index 6ff927bf591..e546d79cb70 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/ScmBlockLocationTestIngClient.java @@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.UUID; @@ -114,7 +115,7 @@ public class ScmBlockLocationTestIngClient implements ScmBlockLocationProtocol { * @throws IOException */ @Override - public AllocatedBlock allocateBlock(long size, + public List allocateBlock(long size, int num, HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor, String owner, ExcludeList excludeList) throws IOException { DatanodeDetails datanodeDetails = 
TestUtils.randomDatanodeDetails(); @@ -125,7 +126,7 @@ public class ScmBlockLocationTestIngClient implements ScmBlockLocationProtocol { new AllocatedBlock.Builder() .setContainerBlockID(new ContainerBlockID(containerID, localID)) .setPipeline(pipeline); - return abb.build(); + return Collections.singletonList(abb.build()); } private Pipeline createPipeline(DatanodeDetails datanode) { diff --git a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java index a76d052b416..d5c17b50a19 100644 --- a/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java +++ b/hadoop-ozone/ozone-manager/src/test/java/org/apache/hadoop/ozone/om/TestKeyManagerImpl.java @@ -86,13 +86,14 @@ public class TestKeyManagerImpl { scmBlockSize = (long) conf .getStorageSize(OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT, StorageUnit.BYTES); - conf.setLong(OZONE_KEY_PREALLOCATION_MAXSIZE, scmBlockSize * 10); + conf.setLong(OZONE_KEY_PREALLOCATION_BLOCKS_MAX, 10); keyManager = new KeyManagerImpl(scm.getBlockProtocolServer(), metadataManager, conf, "om1", null); Mockito.when(mockScmBlockLocationProtocol - .allocateBlock(Mockito.anyLong(), Mockito.any(ReplicationType.class), + .allocateBlock(Mockito.anyLong(), Mockito.anyInt(), + Mockito.any(ReplicationType.class), Mockito.any(ReplicationFactor.class), Mockito.anyString(), Mockito.any(ExcludeList.class))).thenThrow( new SCMException("ChillModePrecheck failed for allocateBlock",