HDFS-11811. Ozone: SCM: Support Delete Block. Contributed by Xiaoyu Yao.

This commit is contained in:
Xiaoyu Yao 2017-05-11 16:28:56 -07:00 committed by Owen O'Malley
parent 9bc494b909
commit b592df6413
10 changed files with 283 additions and 39 deletions

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.scm.container.common.helpers;
import static org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.DeleteScmBlockResult;
/**
* Class wraps storage container manager block deletion results.
*/
public class DeleteBlockResult {
private String key;
private DeleteScmBlockResult.Result result;
public DeleteBlockResult(final String key,
final DeleteScmBlockResult.Result result) {
this.key = key;
this.result = result;
}
/**
* Get key deleted.
* @return key name.
*/
public String getKey() {
return key;
}
/**
* Get key deletion result.
* @return key deletion result.
*/
public DeleteScmBlockResult.Result getResult() {
return result;
}
}

View File

@ -19,9 +19,11 @@
package org.apache.hadoop.scm.protocol;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
/**
* ScmBlockLocationProtocol is used by an HDFS node to find the set of nodes
@ -50,4 +52,14 @@ Set<AllocatedBlock> getBlockLocations(Set<String> keys)
*/
AllocatedBlock allocateBlock(long size) throws IOException;
/**
* Delete the set of keys specified.
*
* @param keys batch of block keys to delete.
* @return list of block deletion results.
* @throws IOException if there is any failure.
*
*/
List<DeleteBlockResult> deleteBlocks(Set<String> keys);
}

View File

@ -131,6 +131,31 @@ message AllocateScmBlockRequestProto {
required uint64 size = 1;
}
/**
* keys - batch of block keys to deleted
*/
message DeleteScmBlocksRequestProto {
repeated string keys = 1;
}
/**
* deletedKeys - keys that are deleted successfully
*/
message DeleteScmBlocksResponseProto {
repeated DeleteScmBlockResult results = 1;
}
message DeleteScmBlockResult {
enum Result {
success = 1;
chillMode = 2;
errorNotFound = 3;
unknownFailure = 4;
}
required Result result = 1;
required string key = 2;
}
/**
* Reply from SCM indicating that the container.
*/
@ -185,8 +210,14 @@ service StorageContainerLocationProtocolService {
returns (GetScmBlockLocationsResponseProto);
/**
Creates a block entry in SCM.
* Creates a block entry in SCM.
*/
rpc allocateScmBlock(AllocateScmBlockRequestProto)
returns (AllocateScmBlockResponseProto);
/**
* Deletes one or multiple block keys from SCM.
*/
rpc deleteScmBlocks(DeleteScmBlocksRequestProto)
returns (DeleteScmBlocksResponseProto);
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.protocolPB;
import java.io.IOException;
import java.util.List;
import java.util.Set;
import com.google.common.collect.Sets;
@ -28,11 +29,10 @@
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
import org.apache.hadoop.scm.protocol.LocatedContainer;
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos
.GetStorageContainerLocationsRequestProto;
@ -44,12 +44,20 @@
.ScmLocatedBlockProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.LocatedContainerProto;
import static org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.ContainerRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.ContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.AllocateScmBlockRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.AllocateScmBlockResponseProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.DeleteScmBlocksRequestProto;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.DeleteScmBlocksResponseProto;
import static org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.DeleteScmBlockResult;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.GetScmBlockLocationsRequestProto;
import org.apache.hadoop.ozone.protocol.proto
@ -124,8 +132,7 @@ public GetStorageContainerLocationsResponseProto getStorageContainerLocations(
@Override
public ContainerResponseProto allocateContainer(RpcController unused,
StorageContainerLocationProtocolProtos.ContainerRequestProto request)
throws ServiceException {
ContainerRequestProto request) throws ServiceException {
try {
Pipeline pipeline = impl.allocateContainer(request.getContainerName());
return ContainerResponseProto.newBuilder()
@ -199,16 +206,16 @@ public AllocateScmBlockResponseProto allocateScmBlock(
AllocatedBlock allocatedBlock =
blockImpl.allocateBlock(request.getSize());
if (allocatedBlock != null) {
return StorageContainerLocationProtocolProtos
.AllocateScmBlockResponseProto.newBuilder()
return
AllocateScmBlockResponseProto.newBuilder()
.setKey(allocatedBlock.getKey())
.setPipeline(allocatedBlock.getPipeline().getProtobufMessage())
.setCreateContainer(allocatedBlock.getCreateContainer())
.setErrorCode(AllocateScmBlockResponseProto.Error.success)
.build();
} else {
return StorageContainerLocationProtocolProtos
.AllocateScmBlockResponseProto.newBuilder()
return
AllocateScmBlockResponseProto.newBuilder()
.setErrorCode(AllocateScmBlockResponseProto.Error.unknownFailure)
.build();
}
@ -216,4 +223,24 @@ public AllocateScmBlockResponseProto allocateScmBlock(
throw new ServiceException(e);
}
}
@Override
public DeleteScmBlocksResponseProto deleteScmBlocks(
RpcController controller, DeleteScmBlocksRequestProto req)
throws ServiceException {
Set<String> keys = Sets.newLinkedHashSetWithExpectedSize(
req.getKeysCount());
for (String key : req.getKeysList()) {
keys.add(key);
}
final List<DeleteBlockResult> results = blockImpl.deleteBlocks(keys);
DeleteScmBlocksResponseProto.Builder resp =
DeleteScmBlocksResponseProto.newBuilder();
for (DeleteBlockResult result: results) {
DeleteScmBlockResult.Builder deleteResult = DeleteScmBlockResult
.newBuilder().setKey(result.getKey()).setResult(result.getResult());
resp.addResults(deleteResult.build());
}
return resp.build();
}
}

View File

@ -32,8 +32,10 @@
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.scm.block.BlockManager;
import org.apache.hadoop.ozone.scm.block.BlockManagerImpl;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.scm.protocol.LocatedContainer;
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
@ -92,6 +94,9 @@
import java.util.Set;
import java.util.UUID;
import static org.apache.hadoop.ozone.protocol.proto
.StorageContainerLocationProtocolProtos.DeleteScmBlockResult.Result;
import static org.apache.hadoop.scm.ScmConfigKeys
.OZONE_SCM_CLIENT_ADDRESS_KEY;
import static org.apache.hadoop.scm.ScmConfigKeys
@ -592,4 +597,37 @@ public Set<AllocatedBlock> getBlockLocations(final Set<String> keys)
public AllocatedBlock allocateBlock(final long size) throws IOException {
return scmBlockManager.allocateBlock(size);
}
/**
* Delete blocks.
* @param keys batch of block keys to delete.
* @return deletion results.
*/
public List<DeleteBlockResult> deleteBlocks(final Set<String> keys) {
List<DeleteBlockResult> results = new LinkedList<>();
for (String key: keys) {
Result resultCode;
try {
scmBlockManager.deleteBlock(key);
resultCode = Result.success;
} catch (SCMException scmEx) {
LOG.warn("Fail to delete block: {}", key, scmEx);
switch (scmEx.getResult()) {
case CHILL_MODE_EXCEPTION:
resultCode = Result.chillMode;
break;
case FAILED_TO_FIND_BLOCK:
resultCode = Result.errorNotFound;
break;
default:
resultCode = Result.unknownFailure;
}
} catch (IOException ex) {
LOG.warn("Fail to delete block: {}", key, ex);
resultCode = Result.unknownFailure;
}
results.add(new DeleteBlockResult(key, resultCode));
}
return results;
}
}

View File

@ -17,7 +17,7 @@
package org.apache.hadoop.ozone.scm.block;
import org.apache.commons.lang.NotImplementedException;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.OzoneConfigKeys;
@ -28,9 +28,11 @@
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.utils.LevelDBStore;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -49,18 +51,20 @@
import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB;
import static org.apache.hadoop.ozone.OzoneConsts.OPEN_CONTAINERS_DB;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException
.ResultCodes.CHILL_MODE_EXCEPTION;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException
.ResultCodes.FAILED_TO_ALLOCATE_CONTAINER;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException
.ResultCodes.FAILED_TO_FIND_CONTAINER;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException
.ResultCodes.FAILED_TO_FIND_CONTAINER_WITH_SAPCE;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException
.ResultCodes.FAILED_TO_LOAD_OPEN_CONTAINER;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException
.ResultCodes.INVALID_BLOCK_SIZE;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
CHILL_MODE_EXCEPTION;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
FAILED_TO_ALLOCATE_CONTAINER;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
FAILED_TO_FIND_CONTAINER;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
FAILED_TO_FIND_CONTAINER_WITH_SAPCE;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
FAILED_TO_FIND_BLOCK;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
FAILED_TO_LOAD_OPEN_CONTAINER;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
INVALID_BLOCK_SIZE;
/**
* Block Manager manages the block access for SCM.
@ -290,8 +294,8 @@ public Pipeline getBlock(final String key) throws IOException {
try {
byte[] containerBytes = blockStore.get(DFSUtil.string2Bytes(key));
if (containerBytes == null) {
throw new IOException("Specified block key does not exist. key : " +
key);
throw new SCMException("Specified block key does not exist. key : " +
key, FAILED_TO_FIND_BLOCK);
}
return containerManager.getContainer(
DFSUtil.bytes2String(containerBytes));
@ -307,7 +311,41 @@ public Pipeline getBlock(final String key) throws IOException {
*/
@Override
public void deleteBlock(final String key) throws IOException {
throw new NotImplementedException("deleteBlock is not supported");
if (!nodeManager.isOutOfNodeChillMode()) {
throw new SCMException("Unable to delete block while in chill mode",
CHILL_MODE_EXCEPTION);
}
lock.lock();
try {
byte[] containerBytes = blockStore.get(DFSUtil.string2Bytes(key));
if (containerBytes == null) {
throw new SCMException("Specified block key does not exist. key : " +
key, FAILED_TO_FIND_BLOCK);
}
try (WriteBatch wb = blockStore.createWriteBatch()) {
containerManager.getContainer(
DFSUtil.bytes2String(containerBytes));
String deletedKeyName = getDeletedKeyName(key);
// Add a tombstone for the deleted key
wb.put(DFSUtil.string2Bytes(deletedKeyName), containerBytes);
// Delete the block key
wb.delete(DFSUtil.string2Bytes(key));
blockStore.commitWriteBatch(wb);
// TODO: Add async tombstone clean thread to send delete command to
// datanodes in the pipeline to clean up the blocks from containers.
// TODO: Container report handling of the deleted blocks:
// Remove tombstone and update open container usage.
// We will revisit this when the closed container replication is done.
}
} finally {
lock.unlock();
}
}
@VisibleForTesting
public String getDeletedKeyName(String key) {
return StringUtils.format(".Deleted/%s", key);
}
/**

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementRandom;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.client.ScmClient;
@ -169,8 +170,8 @@ public Pipeline getContainer(final String containerName) throws IOException {
byte[] pipelineBytes =
containerStore.get(containerName.getBytes(encoding));
if (pipelineBytes == null) {
throw new IOException("Specified key does not exist. key : " +
containerName);
throw new SCMException("Specified key does not exist. key : " +
containerName, SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
}
pipeline = Pipeline.getFromProtoBuf(
OzoneProtos.Pipeline.PARSER.parseFrom(pipelineBytes));
@ -208,7 +209,8 @@ public Pipeline allocateContainer(final String containerName,
Preconditions.checkState(!containerName.isEmpty());
Pipeline pipeline = null;
if (!nodeManager.isOutOfNodeChillMode()) {
throw new IOException("Unable to create container while in chill mode");
throw new SCMException("Unable to create container while in chill mode",
SCMException.ResultCodes.CHILL_MODE_EXCEPTION);
}
lock.lock();
@ -216,8 +218,8 @@ public Pipeline allocateContainer(final String containerName,
byte[] pipelineBytes =
containerStore.get(containerName.getBytes(encoding));
if (pipelineBytes != null) {
throw new IOException("Specified container already exists. key : " +
containerName);
throw new SCMException("Specified container already exists. key : " +
containerName, SCMException.ResultCodes.CONTAINER_EXISTS);
}
List<DatanodeID> datanodes = placementPolicy.chooseDatanodes(
replicationFactor.getValue(), containerSize);
@ -253,8 +255,9 @@ public void deleteContainer(String containerName) throws IOException {
byte[] pipelineBytes =
containerStore.get(dbKey);
if(pipelineBytes == null) {
throw new IOException("Failed to delete container "
+ containerName + ", reason : container doesn't exist.");
throw new SCMException("Failed to delete container "
+ containerName + ", reason : container doesn't exist.",
SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
}
containerStore.delete(dbKey);
} finally {

View File

@ -95,6 +95,7 @@ public ResultCodes getResult() {
* Error codes to make it easy to decode these exceptions.
*/
public enum ResultCodes {
SUCCEESS,
FAILED_TO_LOAD_NODEPOOL,
FAILED_TO_FIND_NODE_IN_POOL,
FAILED_TO_FIND_HEALTHY_NODES,
@ -105,7 +106,10 @@ public enum ResultCodes {
CHILL_MODE_EXCEPTION,
FAILED_TO_LOAD_OPEN_CONTAINER,
FAILED_TO_ALLOCATE_CONTAINER,
CONTAINER_EXISTS,
FAILED_TO_FIND_CONTAINER,
FAILED_TO_FIND_CONTAINER_WITH_SAPCE,
BLOCK_EXISTS,
FAILED_TO_FIND_BLOCK
}
}

View File

@ -19,10 +19,7 @@
package org.apache.hadoop.utils;
import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteOptions;
import org.iq80.leveldb.*;
import java.io.Closeable;
import java.io.File;
@ -157,6 +154,30 @@ public void destroy() throws IOException {
JniDBFactory.factory.destroy(dbFile, dbOptions);
}
/**
* Returns a write batch for write multiple key-value pairs atomically.
* @return write batch that can be commit atomically.
*/
public WriteBatch createWriteBatch() {
return db.createWriteBatch();
}
/**
* Commit multiple writes of key-value pairs atomically.
* @param wb
*/
public void commitWriteBatch(WriteBatch wb) {
db.write(wb);
}
/**
* Close a write batch of multiple writes to key-value pairs.
* @param wb - write batch.
* @throws IOException
*/
public void closeWriteBatch(WriteBatch wb) throws IOException {
wb.close();
}
/**
* Compacts the DB by removing deleted keys etc.

View File

@ -49,7 +49,7 @@
public class TestBlockManager {
private static ContainerMapping mapping;
private static MockNodeManager nodeManager;
private static BlockManager blockManager;
private static BlockManagerImpl blockManager;
private static File testDir;
private final static long DEFAULT_BLOCK_SIZE = 128 * MB;
@ -103,7 +103,25 @@ public void testGetAllocatedBlock() throws IOException {
}
@Test
public void testAllocateOversidedBlock() throws IOException {
public void testDeleteBlock() throws Exception {
AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE);
Assert.assertNotNull(block);
blockManager.deleteBlock(block.getKey());
// Deleted block can not be retrieved
thrown.expectMessage("Specified block key does not exist.");
blockManager.getBlock(block.getKey());
// Tombstone of the deleted block can be retrieved if it has not been
// cleaned yet.
String deletedKeyName = blockManager.getDeletedKeyName(block.getKey());
Pipeline pipeline = blockManager.getBlock(deletedKeyName);
Assert.assertEquals(pipeline.getLeader().getDatanodeUuid(),
block.getPipeline().getLeader().getDatanodeUuid());
}
@Test
public void testAllocateOversizedBlock() throws IOException {
long size = 6 * GB;
thrown.expectMessage("Unsupported block size");
AllocatedBlock block = blockManager.allocateBlock(size);