HDFS-11811. Ozone: SCM: Support Delete Block. Contributed by Xiaoyu Yao.

Xiaoyu Yao 2017-05-11 16:28:56 -07:00
parent 7bf301e20e
commit 7e8b3e2540
10 changed files with 283 additions and 39 deletions

View File: DeleteBlockResult.java (new file)

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.scm.container.common.helpers;

import static org.apache.hadoop.ozone.protocol.proto
    .StorageContainerLocationProtocolProtos.DeleteScmBlockResult;

/**
 * Class wraps storage container manager block deletion results.
 */
public class DeleteBlockResult {
  private String key;
  private DeleteScmBlockResult.Result result;

  public DeleteBlockResult(final String key,
      final DeleteScmBlockResult.Result result) {
    this.key = key;
    this.result = result;
  }

  /**
   * Get the block key this result refers to.
   * @return key name.
   */
  public String getKey() {
    return key;
  }

  /**
   * Get key deletion result.
   * @return key deletion result.
   */
  public DeleteScmBlockResult.Result getResult() {
    return result;
  }
}
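For context, a caller consuming one of these results might branch on the per-key status roughly as follows (an illustrative sketch, not code from this commit; it assumes the same DeleteScmBlockResult import as above):

DeleteBlockResult result = new DeleteBlockResult(
    "blockKey1", DeleteScmBlockResult.Result.success);
if (result.getResult() != DeleteScmBlockResult.Result.success) {
  // Surface the per-key failure instead of aborting the whole batch.
  System.err.println("Delete of " + result.getKey()
      + " failed: " + result.getResult());
}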

View File: ScmBlockLocationProtocol.java

@ -19,9 +19,11 @@
package org.apache.hadoop.scm.protocol;

import java.io.IOException;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;

/**
 * ScmBlockLocationProtocol is used by an HDFS node to find the set of nodes
@ -50,4 +52,14 @@ public interface ScmBlockLocationProtocol {
   */
  AllocatedBlock allocateBlock(long size) throws IOException;

  /**
   * Delete the set of keys specified.
   *
   * @param keys batch of block keys to delete.
   * @return list of deletion results, one per key; per-key failures are
   *     reported in the results instead of being thrown.
   */
  List<DeleteBlockResult> deleteBlocks(Set<String> keys);
}
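Note the shape of this API: per-key failures come back in the DeleteBlockResult list rather than as a thrown exception, so callers can act on partial success. A usage sketch (hedged; blockClient stands in for any handle to this protocol, and the usual java.util imports are assumed):

Set<String> keys = new LinkedHashSet<>(Arrays.asList("key1", "key2"));
List<DeleteBlockResult> results = blockClient.deleteBlocks(keys);
for (DeleteBlockResult r : results) {
  if (r.getResult() != DeleteScmBlockResult.Result.success) {
    System.err.println("Could not delete " + r.getKey()
        + ": " + r.getResult());
  }
}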

View File: StorageContainerLocationProtocol.proto

@ -131,6 +131,31 @@ message AllocateScmBlockRequestProto {
required uint64 size = 1;
}

/**
 * keys - batch of block keys to be deleted
 */
message DeleteScmBlocksRequestProto {
  repeated string keys = 1;
}

/**
 * results - deletion status of each block key
 */
message DeleteScmBlocksResponseProto {
  repeated DeleteScmBlockResult results = 1;
}

message DeleteScmBlockResult {
  enum Result {
    success = 1;
    chillMode = 2;
    errorNotFound = 3;
    unknownFailure = 4;
  }
  required Result result = 1;
  required string key = 2;
}
/**
 * Reply from SCM indicating that the container.
 */
@ -182,11 +207,17 @@ service StorageContainerLocationProtocolService {
   * passing multiple keys.
   */
  rpc getScmBlockLocations(GetScmBlockLocationsRequestProto)
      returns (GetScmBlockLocationsResponseProto);

  /**
   * Creates a block entry in SCM.
   */
  rpc allocateScmBlock(AllocateScmBlockRequestProto)
      returns (AllocateScmBlockResponseProto);

  /**
   * Deletes one or multiple block keys from SCM.
   */
  rpc deleteScmBlocks(DeleteScmBlocksRequestProto)
      returns (DeleteScmBlocksResponseProto);
}
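The generated classes follow standard protobuf builder conventions, so the client-side translation that mirrors the server-side translator in the next file would look roughly like this (a sketch only; rpcProxy and NULL_RPC_CONTROLLER follow the usual Hadoop protocolPB pattern and are not part of this commit):

public List<DeleteBlockResult> deleteBlocks(Set<String> keys)
    throws ServiceException {
  // Pack the batch of keys into the repeated "keys" field.
  DeleteScmBlocksRequestProto request = DeleteScmBlocksRequestProto
      .newBuilder().addAllKeys(keys).build();
  DeleteScmBlocksResponseProto response =
      rpcProxy.deleteScmBlocks(NULL_RPC_CONTROLLER, request);
  // Unpack one DeleteBlockResult per key from the repeated "results" field.
  List<DeleteBlockResult> results =
      new ArrayList<>(response.getResultsCount());
  for (DeleteScmBlockResult r : response.getResultsList()) {
    results.add(new DeleteBlockResult(r.getKey(), r.getResult()));
  }
  return results;
}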

View File: StorageContainerLocationProtocolServerSideTranslatorPB.java

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.protocolPB;

import java.io.IOException;
import java.util.List;
import java.util.Set;

import com.google.common.collect.Sets;
@ -28,11 +29,10 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
import org.apache.hadoop.scm.protocol.LocatedContainer;
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.ozone.protocol.proto
    .StorageContainerLocationProtocolProtos
    .GetStorageContainerLocationsRequestProto;
@ -44,12 +44,20 @@ import org.apache.hadoop.ozone.protocol.proto
    .ScmLocatedBlockProto;
import org.apache.hadoop.ozone.protocol.proto
    .StorageContainerLocationProtocolProtos.LocatedContainerProto;
import static org.apache.hadoop.ozone.protocol.proto
    .StorageContainerLocationProtocolProtos.ContainerRequestProto;
import org.apache.hadoop.ozone.protocol.proto
    .StorageContainerLocationProtocolProtos.ContainerResponseProto;
import org.apache.hadoop.ozone.protocol.proto
    .StorageContainerLocationProtocolProtos.AllocateScmBlockRequestProto;
import org.apache.hadoop.ozone.protocol.proto
    .StorageContainerLocationProtocolProtos.AllocateScmBlockResponseProto;
import org.apache.hadoop.ozone.protocol.proto
    .StorageContainerLocationProtocolProtos.DeleteScmBlocksRequestProto;
import org.apache.hadoop.ozone.protocol.proto
    .StorageContainerLocationProtocolProtos.DeleteScmBlocksResponseProto;
import static org.apache.hadoop.ozone.protocol.proto
    .StorageContainerLocationProtocolProtos.DeleteScmBlockResult;
import org.apache.hadoop.ozone.protocol.proto
    .StorageContainerLocationProtocolProtos.GetScmBlockLocationsRequestProto;
import org.apache.hadoop.ozone.protocol.proto
@ -124,8 +132,7 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
  @Override
  public ContainerResponseProto allocateContainer(RpcController unused,
      ContainerRequestProto request) throws ServiceException {
    try {
      Pipeline pipeline = impl.allocateContainer(request.getContainerName());
      return ContainerResponseProto.newBuilder()
@ -199,16 +206,16 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
      AllocatedBlock allocatedBlock =
          blockImpl.allocateBlock(request.getSize());
      if (allocatedBlock != null) {
        return
            AllocateScmBlockResponseProto.newBuilder()
            .setKey(allocatedBlock.getKey())
            .setPipeline(allocatedBlock.getPipeline().getProtobufMessage())
            .setCreateContainer(allocatedBlock.getCreateContainer())
            .setErrorCode(AllocateScmBlockResponseProto.Error.success)
            .build();
      } else {
        return
            AllocateScmBlockResponseProto.newBuilder()
            .setErrorCode(AllocateScmBlockResponseProto.Error.unknownFailure)
            .build();
      }
@ -216,4 +223,24 @@ public final class StorageContainerLocationProtocolServerSideTranslatorPB
      throw new ServiceException(e);
    }
  }
  @Override
  public DeleteScmBlocksResponseProto deleteScmBlocks(
      RpcController controller, DeleteScmBlocksRequestProto req)
      throws ServiceException {
    Set<String> keys = Sets.newLinkedHashSetWithExpectedSize(
        req.getKeysCount());
    for (String key : req.getKeysList()) {
      keys.add(key);
    }
    final List<DeleteBlockResult> results = blockImpl.deleteBlocks(keys);
    DeleteScmBlocksResponseProto.Builder resp =
        DeleteScmBlocksResponseProto.newBuilder();
    for (DeleteBlockResult result : results) {
      DeleteScmBlockResult.Builder deleteResult = DeleteScmBlockResult
          .newBuilder().setKey(result.getKey()).setResult(result.getResult());
      resp.addResults(deleteResult.build());
    }
    return resp.build();
  }
}

View File: StorageContainerManager.java

@ -32,8 +32,10 @@ import org.apache.hadoop.ozone.OzoneClientUtils;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.scm.block.BlockManager;
import org.apache.hadoop.ozone.scm.block.BlockManagerImpl;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.scm.client.ScmClient;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.DeleteBlockResult;
import org.apache.hadoop.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.scm.protocol.LocatedContainer;
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
@ -92,6 +94,9 @@ import java.util.Map;
import java.util.Set;
import java.util.UUID;

import static org.apache.hadoop.ozone.protocol.proto
    .StorageContainerLocationProtocolProtos.DeleteScmBlockResult.Result;
import static org.apache.hadoop.scm.ScmConfigKeys
    .OZONE_SCM_CLIENT_ADDRESS_KEY;
import static org.apache.hadoop.scm.ScmConfigKeys
@ -592,4 +597,37 @@ public class StorageContainerManager
  public AllocatedBlock allocateBlock(final long size) throws IOException {
    return scmBlockManager.allocateBlock(size);
  }
  /**
   * Delete blocks.
   * @param keys batch of block keys to delete.
   * @return deletion results, one per key.
   */
  public List<DeleteBlockResult> deleteBlocks(final Set<String> keys) {
    List<DeleteBlockResult> results = new LinkedList<>();
    for (String key : keys) {
      Result resultCode;
      try {
        scmBlockManager.deleteBlock(key);
        resultCode = Result.success;
      } catch (SCMException scmEx) {
        LOG.warn("Failed to delete block: {}", key, scmEx);
        switch (scmEx.getResult()) {
        case CHILL_MODE_EXCEPTION:
          resultCode = Result.chillMode;
          break;
        case FAILED_TO_FIND_BLOCK:
          resultCode = Result.errorNotFound;
          break;
        default:
          resultCode = Result.unknownFailure;
        }
      } catch (IOException ex) {
        LOG.warn("Failed to delete block: {}", key, ex);
        resultCode = Result.unknownFailure;
      }
      results.add(new DeleteBlockResult(key, resultCode));
    }
    return results;
  }
}

View File: BlockManagerImpl.java

@ -17,7 +17,7 @@
package org.apache.hadoop.ozone.scm.block;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.OzoneConfigKeys;
@ -28,9 +28,11 @@ import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.utils.LevelDBStore;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -49,18 +51,20 @@ import java.util.UUID;
import static org.apache.hadoop.ozone.OzoneConsts.BLOCK_DB;
import static org.apache.hadoop.ozone.OzoneConsts.OPEN_CONTAINERS_DB;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
    CHILL_MODE_EXCEPTION;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
    FAILED_TO_ALLOCATE_CONTAINER;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
    FAILED_TO_FIND_CONTAINER;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
    FAILED_TO_FIND_CONTAINER_WITH_SAPCE;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
    FAILED_TO_FIND_BLOCK;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
    FAILED_TO_LOAD_OPEN_CONTAINER;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
    INVALID_BLOCK_SIZE;

/**
 * Block Manager manages the block access for SCM.
@ -290,8 +294,8 @@ public class BlockManagerImpl implements BlockManager {
    try {
      byte[] containerBytes = blockStore.get(DFSUtil.string2Bytes(key));
      if (containerBytes == null) {
        throw new SCMException("Specified block key does not exist. key : " +
            key, FAILED_TO_FIND_BLOCK);
      }
      return containerManager.getContainer(
          DFSUtil.bytes2String(containerBytes));
@ -307,7 +311,41 @@
   */
  @Override
  public void deleteBlock(final String key) throws IOException {
    if (!nodeManager.isOutOfNodeChillMode()) {
      throw new SCMException("Unable to delete block while in chill mode",
          CHILL_MODE_EXCEPTION);
    }

    lock.lock();
    try {
      byte[] containerBytes = blockStore.get(DFSUtil.string2Bytes(key));
      if (containerBytes == null) {
        throw new SCMException("Specified block key does not exist. key : " +
            key, FAILED_TO_FIND_BLOCK);
      }
      try (WriteBatch wb = blockStore.createWriteBatch()) {
        containerManager.getContainer(
            DFSUtil.bytes2String(containerBytes));
        String deletedKeyName = getDeletedKeyName(key);
        // Add a tombstone for the deleted key
        wb.put(DFSUtil.string2Bytes(deletedKeyName), containerBytes);
        // Delete the block key
        wb.delete(DFSUtil.string2Bytes(key));
        blockStore.commitWriteBatch(wb);
        // TODO: Add async tombstone clean thread to send delete command to
        // datanodes in the pipeline to clean up the blocks from containers.
        // TODO: Container report handling of the deleted blocks:
        // Remove tombstone and update open container usage.
        // We will revisit this when the closed container replication is done.
      }
    } finally {
      lock.unlock();
    }
  }

  @VisibleForTesting
  public String getDeletedKeyName(String key) {
    return StringUtils.format(".Deleted/%s", key);
  }
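Because getDeletedKeyName puts every tombstone under the ".Deleted/" prefix, and leveldb orders keys byte-wise, the async cleanup thread the TODOs describe could find all pending deletions with a prefix scan. A rough sketch (the getIterator accessor on the block store is an assumption for illustration, not part of this commit):

byte[] prefix = DFSUtil.string2Bytes(".Deleted/");
try (DBIterator iter = blockStore.getIterator()) {  // hypothetical accessor
  iter.seek(prefix);
  while (iter.hasNext()) {
    Map.Entry<byte[], byte[]> entry = iter.next();
    String key = DFSUtil.bytes2String(entry.getKey());
    if (!key.startsWith(".Deleted/")) {
      break;  // walked past the contiguous tombstone range
    }
    // entry.getValue() still carries the container name for this block,
    // which is what a datanode-side delete command would need.
  }
}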
/**

View File: ContainerMapping.java

@ -26,6 +26,7 @@ import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.ContainerPlacementPolicy;
import org.apache.hadoop.ozone.scm.container.placement.algorithms.SCMContainerPlacementRandom;
import org.apache.hadoop.ozone.scm.exceptions.SCMException;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.scm.client.ScmClient;
@ -169,8 +170,8 @@ public class ContainerMapping implements Mapping {
      byte[] pipelineBytes =
          containerStore.get(containerName.getBytes(encoding));
      if (pipelineBytes == null) {
        throw new SCMException("Specified key does not exist. key : " +
            containerName, SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
      }
      pipeline = Pipeline.getFromProtoBuf(
          OzoneProtos.Pipeline.PARSER.parseFrom(pipelineBytes));
@ -208,7 +209,8 @@
    Preconditions.checkState(!containerName.isEmpty());
    Pipeline pipeline = null;
    if (!nodeManager.isOutOfNodeChillMode()) {
      throw new SCMException("Unable to create container while in chill mode",
          SCMException.ResultCodes.CHILL_MODE_EXCEPTION);
    }

    lock.lock();
@ -216,8 +218,8 @@
      byte[] pipelineBytes =
          containerStore.get(containerName.getBytes(encoding));
      if (pipelineBytes != null) {
        throw new SCMException("Specified container already exists. key : " +
            containerName, SCMException.ResultCodes.CONTAINER_EXISTS);
      }
      List<DatanodeID> datanodes = placementPolicy.chooseDatanodes(
          replicationFactor.getValue(), containerSize);
@ -253,8 +255,9 @@
      byte[] pipelineBytes =
          containerStore.get(dbKey);
      if (pipelineBytes == null) {
        throw new SCMException("Failed to delete container "
            + containerName + ", reason : container doesn't exist.",
            SCMException.ResultCodes.FAILED_TO_FIND_CONTAINER);
      }
      containerStore.delete(dbKey);
    } finally {

View File: SCMException.java

@ -95,6 +95,7 @@ public class SCMException extends IOException {
 * Error codes to make it easy to decode these exceptions.
 */
public enum ResultCodes {
  SUCCEESS,
  FAILED_TO_LOAD_NODEPOOL,
  FAILED_TO_FIND_NODE_IN_POOL,
  FAILED_TO_FIND_HEALTHY_NODES,
@ -105,7 +106,10 @@ public class SCMException extends IOException {
  CHILL_MODE_EXCEPTION,
  FAILED_TO_LOAD_OPEN_CONTAINER,
  FAILED_TO_ALLOCATE_CONTAINER,
  CONTAINER_EXISTS,
  FAILED_TO_FIND_CONTAINER,
  FAILED_TO_FIND_CONTAINER_WITH_SAPCE,
  BLOCK_EXISTS,
  FAILED_TO_FIND_BLOCK
  }
}

View File: LevelDBStore.java

@ -19,10 +19,7 @@
package org.apache.hadoop.utils;

import org.fusesource.leveldbjni.JniDBFactory;
import org.iq80.leveldb.*;

import java.io.Closeable;
import java.io.File;
@ -157,6 +154,30 @@
    JniDBFactory.factory.destroy(dbFile, dbOptions);
  }
  /**
   * Returns a write batch for writing multiple key-value pairs atomically.
   * @return a write batch that can be committed atomically.
   */
  public WriteBatch createWriteBatch() {
    return db.createWriteBatch();
  }

  /**
   * Commits multiple writes of key-value pairs atomically.
   * @param wb the write batch to commit.
   */
  public void commitWriteBatch(WriteBatch wb) {
    db.write(wb);
  }

  /**
   * Closes a write batch of multiple writes to key-value pairs.
   * @param wb the write batch to close.
   * @throws IOException if the write batch cannot be closed.
   */
  public void closeWriteBatch(WriteBatch wb) throws IOException {
    wb.close();
  }
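These three helpers expose leveldb's atomic WriteBatch: staged puts and deletes become visible all at once on commitWriteBatch, and the native handle must be released even if the commit fails. A usage sketch against a LevelDBStore named store (the key and value byte arrays are placeholders), mirroring what deleteBlock above does with try-with-resources:

WriteBatch wb = store.createWriteBatch();
try {
  wb.put(tombstoneKeyBytes, valueBytes);  // stage the tombstone write
  wb.delete(originalKeyBytes);            // stage the original key removal
  store.commitWriteBatch(wb);             // both apply atomically, or neither
} finally {
  store.closeWriteBatch(wb);              // always release the native handle
}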
  /**
   * Compacts the DB by removing deleted keys etc.

View File: TestBlockManager.java

@ -49,7 +49,7 @@ import static org.apache.hadoop.ozone.OzoneConsts.MB;
public class TestBlockManager {
  private static ContainerMapping mapping;
  private static MockNodeManager nodeManager;
  private static BlockManagerImpl blockManager;
  private static File testDir;
  private final static long DEFAULT_BLOCK_SIZE = 128 * MB;
@ -103,7 +103,25 @@
  }

  @Test
  public void testDeleteBlock() throws Exception {
    AllocatedBlock block = blockManager.allocateBlock(DEFAULT_BLOCK_SIZE);
    Assert.assertNotNull(block);
    blockManager.deleteBlock(block.getKey());

    // Tombstone of the deleted block can be retrieved if it has not been
    // cleaned yet.
    String deletedKeyName = blockManager.getDeletedKeyName(block.getKey());
    Pipeline pipeline = blockManager.getBlock(deletedKeyName);
    Assert.assertEquals(pipeline.getLeader().getDatanodeUuid(),
        block.getPipeline().getLeader().getDatanodeUuid());

    // Deleted block can not be retrieved. This check must come last:
    // nothing after the call that satisfies the ExpectedException rule
    // executes.
    thrown.expectMessage("Specified block key does not exist.");
    blockManager.getBlock(block.getKey());
  }

  @Test
  public void testAllocateOversizedBlock() throws IOException {
    long size = 6 * GB;
    thrown.expectMessage("Unsupported block size");
    AllocatedBlock block = blockManager.allocateBlock(size);
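A natural companion case, not included in this commit, would cover the failure path for a key that was never allocated; with the same ExpectedException rule it could look like:

@Test
public void testDeleteUnknownBlock() throws Exception {
  // Expect the FAILED_TO_FIND_BLOCK path in BlockManagerImpl.deleteBlock.
  thrown.expectMessage("Specified block key does not exist.");
  blockManager.deleteBlock("non-existent-block-key");
}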