HDDS-1095. OzoneManager#openKey should do multiple block allocations in a single SCM rpc call. Contributed by Mukul Kumar Singh.
(cherry picked from commit daf4660eeb)
@@ -48,14 +48,17 @@ public interface ScmBlockLocationProtocol extends Closeable {
    * Asks SCM where a block should be allocated. SCM responds with the
    * set of datanodes that should be used creating this block.
    * @param size - size of the block.
+   * @param numBlocks - number of blocks.
    * @param type - replication type of the blocks.
    * @param factor - replication factor of the blocks.
    * @param excludeList List of datanodes/containers to exclude during block
    * allocation.
    * @return allocated block accessing info (key, pipeline).
    * @throws IOException
    */
-  AllocatedBlock allocateBlock(long size, ReplicationType type,
-      ReplicationFactor factor, String owner, ExcludeList excludeList)
-      throws IOException;
+  List<AllocatedBlock> allocateBlock(long size, int numBlocks,
+      ReplicationType type, ReplicationFactor factor, String owner,
+      ExcludeList excludeList) throws IOException;
 
   /**
    * Delete blocks for a set of object keys.
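To make the interface change concrete, here is a minimal, hypothetical caller-side sketch (not part of the patch): the class name, the 256 MB block size, the 4-block count, and the "om1" owner are illustrative, and the ScmBlockLocationProtocol import path is assumed from the surrounding tree. It simply shows that one RPC now yields a list of blocks instead of a single one.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;

public final class BatchAllocateExample {

  // Fetch up to numBlocks blocks of blockSize bytes from SCM in one RPC.
  static List<AllocatedBlock> preallocate(ScmBlockLocationProtocol scm,
      long blockSize, int numBlocks, String owner) throws IOException {
    return scm.allocateBlock(blockSize, numBlocks, ReplicationType.RATIS,
        ReplicationFactor.THREE, owner, new ExcludeList());
  }

  // Example: 4 blocks of 256 MB each, i.e. a 1 GB preallocation, in one call.
  static List<AllocatedBlock> oneGigabyte(ScmBlockLocationProtocol scm)
      throws IOException {
    return preallocate(scm, 256L * 1024 * 1024, 4, "om1");
  }
}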
@@ -25,6 +25,7 @@ import java.util.stream.Collectors;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdds.client.ContainerBlockID;
 import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateBlockResponse;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockRequestProto;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto;
 import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.DeleteScmKeyBlocksRequestProto;

@@ -75,11 +76,15 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
    * Asks SCM where a block should be allocated. SCM responds with the
    * set of datanodes that should be used creating this block.
    * @param size - size of the block.
+   * @param num - number of blocks.
    * @param type - replication type of the blocks.
    * @param factor - replication factor of the blocks.
+   * @param excludeList - exclude list while allocating blocks.
    * @return allocated block accessing info (key, pipeline).
    * @throws IOException
    */
   @Override
-  public AllocatedBlock allocateBlock(long size,
+  public List<AllocatedBlock> allocateBlock(long size, int num,
       HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
       String owner, ExcludeList excludeList) throws IOException {
     Preconditions.checkArgument(size > 0, "block size must be greater than 0");

@@ -87,6 +92,7 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
     AllocateScmBlockRequestProto request =
         AllocateScmBlockRequestProto.newBuilder()
             .setSize(size)
+            .setNumBlocks(num)
             .setType(type)
             .setFactor(factor)
             .setOwner(owner)

@@ -104,11 +110,17 @@ public final class ScmBlockLocationProtocolClientSideTranslatorPB
       throw new IOException(response.hasErrorMessage() ?
           response.getErrorMessage() : "Allocate block failed.");
     }
-    AllocatedBlock.Builder builder = new AllocatedBlock.Builder()
-        .setContainerBlockID(
-            ContainerBlockID.getFromProtobuf(response.getContainerBlockID()))
-        .setPipeline(Pipeline.getFromProtobuf(response.getPipeline()));
-    return builder.build();
+
+    List<AllocatedBlock> blocks = new ArrayList<>(response.getBlocksCount());
+    for (AllocateBlockResponse resp : response.getBlocksList()) {
+      AllocatedBlock.Builder builder = new AllocatedBlock.Builder()
+          .setContainerBlockID(
+              ContainerBlockID.getFromProtobuf(resp.getContainerBlockID()))
+          .setPipeline(Pipeline.getFromProtobuf(resp.getPipeline()));
+      blocks.add(builder.build());
+    }
+
+    return blocks;
   }
 
   /**
@@ -198,10 +198,10 @@ public final class OzoneConfigKeys {
   public static final String OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT
       = "300s"; // 300s for default
 
-  public static final String OZONE_KEY_PREALLOCATION_MAXSIZE =
-      "ozone.key.preallocation.maxsize";
-  public static final long OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT
-      = 128 * OzoneConsts.MB;
+  public static final String OZONE_KEY_PREALLOCATION_BLOCKS_MAX =
+      "ozone.key.preallocation.max.blocks";
+  public static final int OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT
+      = 64;
 
   public static final String OZONE_BLOCK_DELETING_LIMIT_PER_CONTAINER =
       "ozone.block.deleting.limit.per.task";
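The sizing knob therefore changes from a byte limit (ozone.key.preallocation.maxsize, 128 MB) to a block-count limit (ozone.key.preallocation.max.blocks, 64). A small, hedged sketch of reading the new key; the OzoneConfiguration instance and the main wrapper are only for illustration:

import org.apache.hadoop.hdds.conf.OzoneConfiguration;

import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX;
import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT;

public final class PreallocationConfigExample {
  public static void main(String[] args) {
    OzoneConfiguration conf = new OzoneConfiguration();
    // Cluster admins can lower or raise this in ozone-site.xml; unset, it
    // falls back to the 64-block default added by this change.
    int preallocateBlocksMax = conf.getInt(
        OZONE_KEY_PREALLOCATION_BLOCKS_MAX,
        OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT);
    System.out.println("Blocks preallocated per openKey (at most): "
        + preallocateBlocksMax);
  }
}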
@@ -22,6 +22,8 @@ import com.google.protobuf.ServiceException;
 import io.opentracing.Scope;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos
+    .AllocateBlockResponse;
 import org.apache.hadoop.hdds.scm.ScmInfo;
 import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
 import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;

@@ -76,22 +78,30 @@ public final class ScmBlockLocationProtocolServerSideTranslatorPB
     try (Scope scope = TracingUtil
         .importAndCreateScope("ScmBlockLocationProtocol.allocateBlock",
             request.getTraceID())) {
-      AllocatedBlock allocatedBlock =
-          impl.allocateBlock(request.getSize(), request.getType(),
+      List<AllocatedBlock> allocatedBlocks =
+          impl.allocateBlock(request.getSize(),
+              request.getNumBlocks(), request.getType(),
               request.getFactor(), request.getOwner(),
               ExcludeList.getFromProtoBuf(request.getExcludeList()));
-      if (allocatedBlock != null) {
-        return
-            AllocateScmBlockResponseProto.newBuilder()
-                .setContainerBlockID(allocatedBlock.getBlockID().getProtobuf())
-                .setPipeline(allocatedBlock.getPipeline().getProtobufMessage())
-                .setErrorCode(AllocateScmBlockResponseProto.Error.success)
-                .build();
-      } else {
-        return AllocateScmBlockResponseProto.newBuilder()
+
+      AllocateScmBlockResponseProto.Builder builder =
+          AllocateScmBlockResponseProto.newBuilder();
+
+      if (allocatedBlocks.size() < request.getNumBlocks()) {
+        return builder
             .setErrorCode(AllocateScmBlockResponseProto.Error.unknownFailure)
             .build();
       }
+
+      for (AllocatedBlock block : allocatedBlocks) {
+        builder.addBlocks(AllocateBlockResponse.newBuilder()
+            .setContainerBlockID(block.getBlockID().getProtobuf())
+            .setPipeline(block.getPipeline().getProtobufMessage()));
+      }
+
+      return builder
+          .setErrorCode(AllocateScmBlockResponseProto.Error.success)
+          .build();
     } catch (IOException e) {
       throw new ServiceException(e);
     }
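The server-side translator now answers with a repeated blocks field and treats a short batch (fewer blocks than requested) as unknownFailure; the matching .proto changes follow below. Note that renumbering the existing required request fields is not wire compatible with pre-patch peers, which is presumably acceptable only because SCM and its in-cluster callers are upgraded together. A hedged sketch of the two response shapes, using the generated builders (the block entries are left empty here purely to keep the snippet self-contained; a real response carries a container block ID and pipeline per block):

import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateBlockResponse;
import org.apache.hadoop.hdds.protocol.proto.ScmBlockLocationProtocolProtos.AllocateScmBlockResponseProto;

public final class ResponseShapeExample {
  public static void main(String[] args) {
    // Success: one AllocateBlockResponse entry per allocated block.
    AllocateScmBlockResponseProto success = AllocateScmBlockResponseProto
        .newBuilder()
        .addBlocks(AllocateBlockResponse.newBuilder())
        .addBlocks(AllocateBlockResponse.newBuilder())
        .setErrorCode(AllocateScmBlockResponseProto.Error.success)
        .build();

    // Partial batch: no blocks, only an error code (the client translator
    // turns this into an IOException).
    AllocateScmBlockResponseProto failure = AllocateScmBlockResponseProto
        .newBuilder()
        .setErrorCode(AllocateScmBlockResponseProto.Error.unknownFailure)
        .build();

    System.out.println(success.getBlocksCount());  // 2
    System.out.println(failure.getBlocksCount());  // 0
  }
}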
@@ -38,12 +38,12 @@ import "hdds.proto";
 */
 message AllocateScmBlockRequestProto {
   required uint64 size = 1;
-  required ReplicationType type = 2;
-  required hadoop.hdds.ReplicationFactor factor = 3;
-  required string owner = 4;
-  optional string traceID = 5;
-  optional ExcludeListProto excludeList = 6;
+  required uint32 numBlocks = 2;
+  required ReplicationType type = 3;
+  required hadoop.hdds.ReplicationFactor factor = 4;
+  required string owner = 5;
+  optional string traceID = 6;
+  optional ExcludeListProto excludeList = 7;
 }
 
 /**

@@ -96,6 +96,11 @@ message DeleteScmBlockResult {
   required BlockID blockID = 2;
 }
 
+message AllocateBlockResponse {
+  optional ContainerBlockID containerBlockID = 1;
+  optional hadoop.hdds.Pipeline pipeline = 2;
+}
+
 /**
  * Reply from SCM indicating that the container.
  */

@@ -107,9 +112,8 @@ message AllocateScmBlockResponseProto {
     unknownFailure = 4;
   }
   required Error errorCode = 1;
-  optional ContainerBlockID containerBlockID = 2;
-  optional hadoop.hdds.Pipeline pipeline = 3;
-  optional string errorMessage = 4;
+  optional string errorMessage = 2;
+  repeated AllocateBlockResponse blocks = 3;
 }
 
 /**
@@ -1089,13 +1089,14 @@
   </property>
 
   <property>
-    <name>ozone.key.preallocation.maxsize</name>
-    <value>134217728</value>
+    <name>ozone.key.preallocation.max.blocks</name>
+    <value>64</value>
     <tag>OZONE, OM, PERFORMANCE</tag>
     <description>
-      When a new key write request is sent to OM, if a size is requested, at most
-      128MB of size is allocated at request time. If client needs more space for the
-      write, separate block allocation requests will be made.
+      While allocating blocks from OM, this configuration limits the maximum
+      number of blocks being allocated. This configuration ensures that the
+      allocated block response do not exceed rpc payload limit. If client needs
+      more space for the write, separate block allocation requests will be made.
     </description>
   </property>
 
@@ -156,7 +156,7 @@ public class SCMBlockProtocolServer implements
   }
 
   @Override
-  public AllocatedBlock allocateBlock(long size,
+  public List<AllocatedBlock> allocateBlock(long size, int num,
       HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
       String owner, ExcludeList excludeList) throws IOException {
     Map<String, String> auditMap = Maps.newHashMap();

@@ -164,10 +164,17 @@ public class SCMBlockProtocolServer implements
     auditMap.put("type", type.name());
     auditMap.put("factor", factor.name());
    auditMap.put("owner", owner);
+    List<AllocatedBlock> blocks = new ArrayList<>(num);
     boolean auditSuccess = true;
     try {
-      return scm.getScmBlockManager()
-          .allocateBlock(size, type, factor, owner, excludeList);
+      for (int i = 0; i < num; i++) {
+        AllocatedBlock block = scm.getScmBlockManager()
+            .allocateBlock(size, type, factor, owner, excludeList);
+        if (block != null) {
+          blocks.add(block);
+        }
+      }
+      return blocks;
     } catch (Exception ex) {
       auditSuccess = false;
       AUDIT.logWriteFailure(
@@ -85,8 +85,8 @@ import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVI
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE;
-import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX;
+import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE;
 import static org.apache.hadoop.ozone.OzoneConfigKeys.OZONE_SCM_BLOCK_SIZE_DEFAULT;
 import static org.apache.hadoop.ozone.OzoneConsts.OM_MULTIPART_MIN_SIZE;

@@ -110,7 +110,7 @@ public class KeyManagerImpl implements KeyManager {
   private final long scmBlockSize;
   private final boolean useRatis;
 
-  private final long preallocateMax;
+  private final int preallocateBlocksMax;
   private final String omId;
   private final OzoneBlockTokenSecretManager secretManager;
   private final boolean grpcBlockTokenEnabled;

@@ -136,9 +136,9 @@ public class KeyManagerImpl implements KeyManager {
         StorageUnit.BYTES);
     this.useRatis = conf.getBoolean(DFS_CONTAINER_RATIS_ENABLED_KEY,
         DFS_CONTAINER_RATIS_ENABLED_DEFAULT);
-    this.preallocateMax = conf.getLong(
-        OZONE_KEY_PREALLOCATION_MAXSIZE,
-        OZONE_KEY_PREALLOCATION_MAXSIZE_DEFAULT);
+    this.preallocateBlocksMax = conf.getInt(
+        OZONE_KEY_PREALLOCATION_BLOCKS_MAX,
+        OZONE_KEY_PREALLOCATION_BLOCKS_MAX_DEFAULT);
     this.omId = omId;
     start(conf);
     this.secretManager = secretManager;

@@ -251,36 +251,45 @@ public class KeyManagerImpl implements KeyManager {
     return locationInfos.get(0);
   }
 
   /**
    * This methods avoids multiple rpc calls to SCM by allocating multiple blocks
    * in one rpc call.
    * @param keyInfo - key info for key to be allocated.
-   * @param requestedSize requested length for allocation.
    * @param excludeList exclude list while allocating blocks.
+   * @param requestedSize requested size to be allocated.
    * @return
    * @throws IOException
    */
   private List<OmKeyLocationInfo> allocateBlock(OmKeyInfo keyInfo,
       ExcludeList excludeList, long requestedSize) throws IOException {
-    int numBlocks = (int) ((requestedSize - 1) / scmBlockSize + 1);
+    int numBlocks = Math.min((int) ((requestedSize - 1) / scmBlockSize + 1),
+        preallocateBlocksMax);
     List<OmKeyLocationInfo> locationInfos = new ArrayList<>(numBlocks);
-    while (requestedSize > 0) {
-      long allocateSize = Math.min(requestedSize, scmBlockSize);
-      AllocatedBlock allocatedBlock;
-      try {
-        allocatedBlock = scmBlockClient
-            .allocateBlock(allocateSize, keyInfo.getType(), keyInfo.getFactor(),
-                omId, excludeList);
-      } catch (SCMException ex) {
-        if (ex.getResult()
-            .equals(SCMException.ResultCodes.CHILL_MODE_EXCEPTION)) {
-          throw new OMException(ex.getMessage(), ResultCodes.SCM_IN_CHILL_MODE);
-        }
-        throw ex;
+    String remoteUser = getRemoteUser().getShortUserName();
+    List<AllocatedBlock> allocatedBlocks;
+    try {
+      allocatedBlocks = scmBlockClient
+          .allocateBlock(scmBlockSize, numBlocks, keyInfo.getType(),
+              keyInfo.getFactor(), omId, excludeList);
+    } catch (SCMException ex) {
+      if (ex.getResult()
+          .equals(SCMException.ResultCodes.CHILL_MODE_EXCEPTION)) {
+        throw new OMException(ex.getMessage(), ResultCodes.SCM_IN_CHILL_MODE);
+      }
+      throw ex;
+    }
+    for (AllocatedBlock allocatedBlock : allocatedBlocks) {
       OmKeyLocationInfo.Builder builder = new OmKeyLocationInfo.Builder()
           .setBlockID(new BlockID(allocatedBlock.getBlockID()))
           .setLength(scmBlockSize)
           .setOffset(0);
       if (grpcBlockTokenEnabled) {
-        String remoteUser = getRemoteUser().getShortUserName();
         builder.setToken(secretManager
             .generateToken(remoteUser, allocatedBlock.getBlockID().toString(),
                 getAclForUser(remoteUser), scmBlockSize));
       }
       locationInfos.add(builder.build());
-      requestedSize -= allocateSize;
     }
     return locationInfos;
   }

@@ -339,7 +348,6 @@ public class KeyManagerImpl implements KeyManager {
     ReplicationFactor factor = args.getFactor();
     ReplicationType type = args.getType();
     long currentTime = Time.monotonicNowNanos();
-    long requestedSize = Math.min(preallocateMax, args.getDataSize());
     OmKeyInfo keyInfo;
     String openKey;
     long openVersion;

@@ -457,9 +465,9 @@ public class KeyManagerImpl implements KeyManager {
       // the point, if client needs more blocks, client can always call
       // allocateBlock. But if requested size is not 0, OM will preallocate
       // some blocks and piggyback to client, to save RPC calls.
-      if (requestedSize > 0) {
+      if (args.getDataSize() > 0) {
         List<OmKeyLocationInfo> locationInfos =
-            allocateBlock(keyInfo, new ExcludeList(), requestedSize);
+            allocateBlock(keyInfo, new ExcludeList(), args.getDataSize());
         keyInfo.appendNewBlocks(locationInfos);
       }
       metadataManager.getOpenKeyTable().put(openKey, keyInfo);
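The OM-side arithmetic is the heart of the change: instead of looping over SCM once per block, KeyManagerImpl#allocateBlock computes how many blocks the requested size needs, caps that at ozone.key.preallocation.max.blocks, and asks SCM for them all in one RPC. A worked example with illustrative numbers (256 MB is assumed here as the SCM block size; the class and figures below are not part of the patch):

public final class PreallocationMathExample {
  public static void main(String[] args) {
    long scmBlockSize = 256L * 1024 * 1024;   // ozone.scm.block.size (assumed 256 MB)
    int preallocateBlocksMax = 64;            // ozone.key.preallocation.max.blocks

    // A 1 GB key: ceil(1 GB / 256 MB) = 4 blocks, well under the cap.
    long requestedSize = 1024L * 1024 * 1024;
    int numBlocks = Math.min(
        (int) ((requestedSize - 1) / scmBlockSize + 1), preallocateBlocksMax);
    System.out.println(numBlocks);  // 4

    // A 100 GB key: ceil = 400 blocks, capped at 64; the client fetches the
    // rest through later allocateBlock calls as it writes.
    requestedSize = 100L * 1024 * 1024 * 1024;
    numBlocks = Math.min(
        (int) ((requestedSize - 1) / scmBlockSize + 1), preallocateBlocksMax);
    System.out.println(numBlocks);  // 64
  }
}

Note that openKey now passes args.getDataSize() straight through; the old byte-level preallocateMax clamp (128 MB, effectively a single block at the assumed 256 MB block size) is gone, with the block-count cap above replacing it.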
@@ -40,6 +40,7 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.UUID;
 

@@ -114,7 +115,7 @@ public class ScmBlockLocationTestIngClient implements ScmBlockLocationProtocol {
    * @throws IOException
    */
   @Override
-  public AllocatedBlock allocateBlock(long size,
+  public List<AllocatedBlock> allocateBlock(long size, int num,
       HddsProtos.ReplicationType type, HddsProtos.ReplicationFactor factor,
       String owner, ExcludeList excludeList) throws IOException {
     DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();

@@ -125,7 +126,7 @@ public class ScmBlockLocationTestIngClient implements ScmBlockLocationProtocol {
         new AllocatedBlock.Builder()
             .setContainerBlockID(new ContainerBlockID(containerID, localID))
             .setPipeline(pipeline);
-    return abb.build();
+    return Collections.singletonList(abb.build());
   }
 
   private Pipeline createPipeline(DatanodeDetails datanode) {
@@ -86,13 +86,14 @@ public class TestKeyManagerImpl {
     scmBlockSize = (long) conf
         .getStorageSize(OZONE_SCM_BLOCK_SIZE, OZONE_SCM_BLOCK_SIZE_DEFAULT,
             StorageUnit.BYTES);
-    conf.setLong(OZONE_KEY_PREALLOCATION_MAXSIZE, scmBlockSize * 10);
+    conf.setLong(OZONE_KEY_PREALLOCATION_BLOCKS_MAX, 10);
 
     keyManager =
         new KeyManagerImpl(scm.getBlockProtocolServer(), metadataManager, conf,
             "om1", null);
     Mockito.when(mockScmBlockLocationProtocol
-        .allocateBlock(Mockito.anyLong(), Mockito.any(ReplicationType.class),
+        .allocateBlock(Mockito.anyLong(), Mockito.anyInt(),
+            Mockito.any(ReplicationType.class),
             Mockito.any(ReplicationFactor.class), Mockito.anyString(),
             Mockito.any(ExcludeList.class))).thenThrow(
         new SCMException("ChillModePrecheck failed for allocateBlock",
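For tests that want a success path rather than the chill-mode failure stubbed above, the mock just has to match the extra int argument and return a list. A hedged Mockito sketch (the helper name, the bare ContainerBlockID(1, 1), and the import path of ScmBlockLocationProtocol are illustrative; a realistic stub would also set a pipeline on the block):

import java.io.IOException;
import java.util.Collections;

import org.apache.hadoop.hdds.client.ContainerBlockID;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.mockito.Mockito;

public final class MockBatchAllocationExample {

  static void stubOneBlock(ScmBlockLocationProtocol mock) throws IOException {
    AllocatedBlock block = new AllocatedBlock.Builder()
        .setContainerBlockID(new ContainerBlockID(1L, 1L))
        .build();
    // Any (size, numBlocks, type, factor, owner, excludeList) combination
    // returns a single preallocated block.
    Mockito.when(mock.allocateBlock(Mockito.anyLong(), Mockito.anyInt(),
            Mockito.any(ReplicationType.class),
            Mockito.any(ReplicationFactor.class), Mockito.anyString(),
            Mockito.any(ExcludeList.class)))
        .thenReturn(Collections.singletonList(block));
  }
}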