HDFS-12282. Ozone: DeleteKey-4: Block delete via HB between SCM and DN. Contributed by Weiwei Yang.

Weiwei Yang 2017-08-28 10:04:46 +08:00 committed by Owen O'Malley
parent 2f4dfbc8fb
commit eef437d5e2
22 changed files with 1216 additions and 26 deletions

View File

@ -0,0 +1,103 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common.helpers;
import com.google.common.collect.Maps;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* A helper class that summarizes info about container blocks pending deletion.
*/
public final class DeletedContainerBlocksSummary {
private final List<DeletedBlocksTransaction> blocks;
// key : txID
// value : times of this tx has been processed
private final Map<Long, Integer> txSummary;
// key : container name
// value : the number of blocks to be deleted in this container;
// if the message contains multiple entries for the same container,
// the block counts are merged
private final Map<String, Integer> blockSummary;
// total number of blocks in this message
private int numOfBlocks;
private DeletedContainerBlocksSummary(List<DeletedBlocksTransaction> blocks) {
this.blocks = blocks;
txSummary = Maps.newHashMap();
blockSummary = Maps.newHashMap();
blocks.forEach(entry -> {
txSummary.put(entry.getTxID(), entry.getCount());
if (blockSummary.containsKey(entry.getContainerName())) {
blockSummary.put(entry.getContainerName(),
blockSummary.get(entry.getContainerName())
+ entry.getBlockIDCount());
} else {
blockSummary.put(entry.getContainerName(), entry.getBlockIDCount());
}
numOfBlocks += entry.getBlockIDCount();
});
}
public static DeletedContainerBlocksSummary getFrom(
List<DeletedBlocksTransaction> blocks) {
return new DeletedContainerBlocksSummary(blocks);
}
public int getNumOfBlocks() {
return numOfBlocks;
}
public int getNumOfContainers() {
return blockSummary.size();
}
public String getTXIDs() {
return String.join(",", txSummary.keySet()
.stream().map(String::valueOf).collect(Collectors.toList()));
}
public String getTxIDSummary() {
List<String> txSummaryEntry = txSummary.entrySet().stream()
.map(entry -> entry.getKey() + "(" + entry.getValue() + ")")
.collect(Collectors.toList());
return "[" + String.join(",", txSummaryEntry) + "]";
}
@Override public String toString() {
StringBuilder sb = new StringBuilder();
for (DeletedBlocksTransaction blks : blocks) {
sb.append(" ")
.append("TXID=")
.append(blks.getTxID())
.append(", ")
.append("TimesProceed=")
.append(blks.getCount())
.append(", ")
.append(blks.getContainerName())
.append(" : [")
.append(String.join(",", blks.getBlockIDList())).append("]")
.append("\n");
}
return sb.toString();
}
}
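For illustration, a minimal usage sketch of this class (a hypothetical standalone example; the protobuf builder methods follow from the getters used above, java.util.Collections is assumed imported, and the container/block names are made up):

DeletedBlocksTransaction tx = DeletedBlocksTransaction.newBuilder()
    .setTxID(1L)
    .setContainerName("container-01")    // hypothetical container name
    .addBlockID("blk-001")
    .addBlockID("blk-002")
    .setCount(0)                         // not retried yet
    .build();
DeletedContainerBlocksSummary summary =
    DeletedContainerBlocksSummary.getFrom(Collections.singletonList(tx));
// summary.getNumOfBlocks()     == 2
// summary.getNumOfContainers() == 1
// summary.getTxIDSummary()     -> "[1(0)]"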

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.ozone.client.OzoneClientUtils;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CommandDispatcher;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ContainerReportHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
@ -77,15 +78,16 @@ public DatanodeStateMachine(DatanodeID datanodeID,
this.datanodeID = datanodeID;
nextHB = new AtomicLong(Time.monotonicNow());
// When we add new handlers, registering them here should do the
// trick.
commandDispatcher = CommandDispatcher.newBuilder()
.addHandler(new ContainerReportHandler())
.addHandler(new DeleteBlocksCommandHandler(
container.getContainerManager(), conf))
.setConnectionManager(connectionManager)
.setContainer(container)
.setContext(context)
.build();
}
public void setDatanodeID(DatanodeID datanodeID) {

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
@ -116,6 +117,11 @@ public BackgroundTaskQueue getTasks() {
LOG.warn("Failed to initiate block deleting tasks, "
+ "caused by unable to get containers info. "
+ "Retry in next interval. ", e);
} catch (Exception e) {
// In case the listContainer call throws an uncaught RuntimeException.
if (LOG.isDebugEnabled()) {
LOG.debug("Unexpected error occurs during deleting blocks.", e);
}
}
return queue;
}
@ -159,6 +165,7 @@ private class BlockDeletingTask
@Override
public BackgroundTaskResult call() throws Exception {
ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
long startTime = Time.monotonicNow();
// Scan container's db and get list of under deletion blocks
MetadataStore meta = KeyUtils.getDB(containerData, conf);
@ -175,17 +182,24 @@ public BackgroundTaskResult call() throws Exception {
List<String> succeedBlocks = new LinkedList<>();
LOG.debug("Container : {}, To-Delete blocks : {}",
containerData.getContainerName(), toDeleteBlocks.size());
File dataDir = ContainerUtils.getDataDirectory(containerData).toFile();
if (!dataDir.exists() || !dataDir.isDirectory()) {
LOG.error("Invalid container data dir {} : "
+ "not exist or not a directory", dataDir.getAbsolutePath());
return crr;
}
toDeleteBlocks.forEach(entry -> {
String blockName = DFSUtil.bytes2String(entry.getKey());
LOG.debug("Deleting block {}", blockName);
try {
ContainerProtos.KeyData data =
ContainerProtos.KeyData.parseFrom(entry.getValue());
for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) {
File chunkFile = dataDir.toPath()
.resolve(chunkInfo.getChunkName()).toFile();
if (FileUtils.deleteQuietly(chunkFile)) {
LOG.debug("block {} chunk {} deleted", blockName,
LOG.info("block {} chunk {} deleted", blockName,
chunkFile.getAbsolutePath());
}
}
@ -201,11 +215,11 @@ public BackgroundTaskResult call() throws Exception {
batch.delete(DFSUtil.string2Bytes(entry)));
meta.writeBatch(batch);
LOG.info("The elapsed time of task@{} for"
+ " deleting blocks: {}ms.",
Integer.toHexString(this.hashCode()),
Time.monotonicNow() - startTime);
ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
if (!succeedBlocks.isEmpty()) {
LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms",
containerData.getContainerName(), succeedBlocks.size(),
Time.monotonicNow() - startTime);
}
crr.addAll(succeedBlocks);
return crr;
}
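To make the chunk-path change above explicit: before this patch the chunk name was passed to new File(...) directly, which only resolves correctly when the stored name is an absolute path; now it is resolved under the container's data directory (Path#resolve still returns absolute names unchanged). A hypothetical before/after sketch, with a made-up chunk name:

// Before: a relative chunk name resolves against the process working dir.
File chunkFile = new File("chunk_0001");
// After: the chunk name resolves under the container data directory.
File resolved = dataDir.toPath().resolve("chunk_0001").toFile();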

View File

@ -0,0 +1,198 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.container.common.helpers.DeletedContainerBlocksSummary;
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.utils.BatchOperation;
import org.apache.hadoop.utils.MetadataStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.List;
/**
* Handle block deletion commands.
*/
public class DeleteBlocksCommandHandler implements CommandHandler {
private static final Logger LOG =
LoggerFactory.getLogger(DeleteBlocksCommandHandler.class);
private ContainerManager containerManager;
private Configuration conf;
private int invocationCount;
private long totalTime;
public DeleteBlocksCommandHandler(ContainerManager containerManager,
Configuration conf) {
this.containerManager = containerManager;
this.conf = conf;
}
@Override
public void handle(SCMCommand command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
if (command.getType() != Type.deleteBlocksCommand) {
LOG.warn("Skipping handling command, expected command "
+ "type {} but found {}",
Type.deleteBlocksCommand, command.getType());
return;
}
LOG.debug("Processing block deletion command.");
invocationCount++;
long startTime = Time.monotonicNow();
// Move blocks to deleting state.
// This is a metadata update; the actual deletion happens later in a
// background recycling thread.
DeleteBlocksCommand cmd = (DeleteBlocksCommand) command;
List<DeletedBlocksTransaction> containerBlocks = cmd.blocksTobeDeleted();
DeletedContainerBlocksSummary summary =
DeletedContainerBlocksSummary.getFrom(containerBlocks);
LOG.info("Start to delete container blocks, TXIDs={}, "
+ "numOfContainers={}, numOfBlocks={}",
summary.getTxIDSummary(),
summary.getNumOfContainers(),
summary.getNumOfBlocks());
ContainerBlocksDeletionACKProto.Builder resultBuilder =
ContainerBlocksDeletionACKProto.newBuilder();
containerBlocks.forEach(entry -> {
DeleteBlockTransactionResult.Builder txResultBuilder =
DeleteBlockTransactionResult.newBuilder();
txResultBuilder.setTxID(entry.getTxID());
try {
deleteContainerBlocks(entry, conf);
txResultBuilder.setSuccess(true);
} catch (IOException e) {
LOG.warn("Failed to delete blocks for container={}, TXID={}",
entry.getContainerName(), entry.getTxID(), e);
txResultBuilder.setSuccess(false);
}
resultBuilder.addResults(txResultBuilder.build());
});
ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build();
// Send ACK back to SCM as soon as the metadata is updated.
// TODO: or should we wait until the blocks are actually deleted?
if (!containerBlocks.isEmpty()) {
for (EndpointStateMachine endPoint : connectionManager.getValues()) {
try {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending following block deletion ACK to SCM");
for (DeleteBlockTransactionResult result :
blockDeletionACK.getResultsList()) {
LOG.debug(result.getTxID() + " : " + result.getSuccess());
}
}
endPoint.getEndPoint()
.sendContainerBlocksDeletionACK(blockDeletionACK);
} catch (IOException e) {
LOG.error("Unable to send block deletion ACK to SCM {}",
endPoint.getAddress().toString(), e);
}
}
}
long endTime = Time.monotonicNow();
totalTime += endTime - startTime;
}
/**
* Move a set of blocks in a container to the deleting state.
* This is a metadata update; the actual deletion happens asynchronously.
*
* @param delTX a block deletion transaction.
* @param config configuration.
* @throws IOException if I/O error occurs.
*/
private void deleteContainerBlocks(DeletedBlocksTransaction delTX,
Configuration config) throws IOException {
String containerId = delTX.getContainerName();
ContainerData containerInfo = containerManager.readContainer(containerId);
if (LOG.isDebugEnabled()) {
LOG.debug("Processing Container : {}, DB path : {}", containerId,
containerInfo.getDBPath());
}
MetadataStore containerDB = KeyUtils.getDB(containerInfo, config);
for (String blk : delTX.getBlockIDList()) {
BatchOperation batch = new BatchOperation();
byte[] blkBytes = DFSUtil.string2Bytes(blk);
byte[] blkInfo = containerDB.get(blkBytes);
if (blkInfo != null) {
// Found the block in container db,
// use an atomic update to change its state to deleting.
batch.put(DFSUtil.string2Bytes(OzoneConsts.DELETING_KEY_PREFIX + blk),
blkInfo);
batch.delete(blkBytes);
try {
containerDB.writeBatch(batch);
LOG.info("Transited Block {} to DELETING state in container {}",
blk, containerId);
} catch (IOException e) {
// If the batch update fails, we fail this TX without sending the
// ACK to SCM; SCM will then resend the TX, up to a configured
// number of retries.
throw new IOException(
"Failed to delete blocks for TXID = " + delTX.getTxID(), e);
}
} else {
LOG.info("Block {} not found or already under deletion in"
+ " container {}, skip deleting it.", blk, containerId);
}
}
}
@Override
public Type getCommandType() {
return Type.deleteBlocksCommand;
}
@Override
public int getInvocationCount() {
return this.invocationCount;
}
@Override
public long getAverageRunTime() {
if (invocationCount > 0) {
return totalTime / invocationCount;
}
return 0;
}
}
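The per-block metadata transition performed by deleteContainerBlocks can be sketched in isolation (a hypothetical example against the MetadataStore and BatchOperation APIs used above; db, containerInfo, config, and the block name are assumptions):

MetadataStore db = KeyUtils.getDB(containerInfo, config);
String blk = "blk-001";                       // hypothetical block name
byte[] key = DFSUtil.string2Bytes(blk);
byte[] info = db.get(key);
if (info != null) {
  BatchOperation batch = new BatchOperation();
  // Re-key the block metadata under the deleting prefix and drop the
  // original entry; writeBatch commits both operations atomically.
  batch.put(DFSUtil.string2Bytes(OzoneConsts.DELETING_KEY_PREFIX + blk), info);
  batch.delete(key);
  db.writeBatch(batch);
}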

View File

@ -21,11 +21,14 @@
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.container.common.helpers
.DeletedContainerBlocksSummary;
import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine;
import org.apache.hadoop.ozone.container.common.statemachine
.EndpointStateMachine.EndPointStates;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
@ -144,6 +147,18 @@ private void processResponse(SCMHeartbeatResponseProto response) {
}
}
break;
case deleteBlocksCommand:
DeleteBlocksCommand db = DeleteBlocksCommand
.getFromProtobuf(commandResponseProto.getDeleteBlocksProto());
if (!db.blocksTobeDeleted().isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug(DeletedContainerBlocksSummary
.getFrom(db.blocksTobeDeleted())
.toString());
}
this.context.addCommand(db);
}
break;
default:
throw new IllegalArgumentException("Unknown response : "
+ commandResponseProto.getCmdType().name());

View File

@ -25,7 +25,8 @@
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
import java.io.IOException;
/**
@ -70,4 +71,13 @@ SCMRegisteredCmdResponseProto register(DatanodeID datanodeID,
*/
SCMHeartbeatResponseProto sendContainerReport(ContainerReportsProto reports)
throws IOException;
/**
* Used by datanode to send block deletion ACK to SCM.
* @param request block deletion transactions.
* @return block deletion transaction response.
* @throws IOException
*/
ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
ContainerBlocksDeletionACKProto request) throws IOException;
}
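For reference, a hypothetical datanode-side sketch of building and sending this ACK for a single transaction (the txID value and the rpcEndpoint variable, an assumed handle to this protocol, are made up):

ContainerBlocksDeletionACKProto ack = ContainerBlocksDeletionACKProto.newBuilder()
    .addResults(DeleteBlockTransactionResult.newBuilder()
        .setTxID(42L)       // the transaction being acknowledged
        .setSuccess(true)   // the metadata update succeeded on this datanode
        .build())
    .build();
ContainerBlocksDeletionACKResponseProto resp =
    rpcEndpoint.sendContainerBlocksDeletionACK(ack);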

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.ozone.protocol.commands;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDeleteBlocksCmdResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
import java.util.List;
/**
* An SCM command that asks a datanode to delete a number of blocks.
*/
public class DeleteBlocksCommand extends
SCMCommand<SCMDeleteBlocksCmdResponseProto> {
private List<DeletedBlocksTransaction> blocksTobeDeleted;
public DeleteBlocksCommand(List<DeletedBlocksTransaction> blocks) {
this.blocksTobeDeleted = blocks;
}
public List<DeletedBlocksTransaction> blocksTobeDeleted() {
return this.blocksTobeDeleted;
}
@Override
public Type getType() {
return Type.deleteBlocksCommand;
}
@Override
public byte[] getProtoBufMessage() {
return getProto().toByteArray();
}
public static DeleteBlocksCommand getFromProtobuf(
SCMDeleteBlocksCmdResponseProto deleteBlocksProto) {
return new DeleteBlocksCommand(deleteBlocksProto
.getDeletedBlocksTransactionsList());
}
public SCMDeleteBlocksCmdResponseProto getProto() {
return SCMDeleteBlocksCmdResponseProto.newBuilder()
.addAllDeletedBlocksTransactions(blocksTobeDeleted).build();
}
}
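A quick round-trip sketch, mirroring what the heartbeat path does with this command (txList is an assumed List&lt;DeletedBlocksTransaction&gt;):

DeleteBlocksCommand cmd = new DeleteBlocksCommand(txList);      // SCM side
SCMDeleteBlocksCmdResponseProto proto = cmd.getProto();         // sent via HB
DeleteBlocksCommand restored =
    DeleteBlocksCommand.getFromProtobuf(proto);                 // datanode side
assert restored.blocksTobeDeleted().equals(txList);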

View File

@ -38,6 +38,8 @@
.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
import java.io.Closeable;
import java.io.IOException;
@ -177,4 +179,16 @@ public SCMHeartbeatResponseProto sendContainerReport(
return resp;
}
@Override
public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
ContainerBlocksDeletionACKProto deletedBlocks) throws IOException {
final ContainerBlocksDeletionACKResponseProto resp;
try {
resp = rpcProxy.sendContainerBlocksDeletionACK(NULL_RPC_CONTROLLER,
deletedBlocks);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
return resp;
}
}

View File

@ -24,6 +24,8 @@
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import java.io.IOException;
@ -97,4 +99,15 @@ public StorageContainerDatanodeProtocolServerSideTranslatorPB(
throw new ServiceException(e);
}
}
@Override
public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
RpcController controller, ContainerBlocksDeletionACKProto request)
throws ServiceException {
try {
return impl.sendContainerBlocksDeletionACK(request);
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.ozone.OzoneConfigKeys;
import org.apache.hadoop.ozone.OzoneConfiguration;
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
@ -52,6 +53,9 @@
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
import org.apache.hadoop.ozone.protocolPB.ScmBlockLocationProtocolServerSideTranslatorPB;
@ -91,6 +95,8 @@
import java.util.Set;
import java.util.TreeSet;
import java.util.UUID;
import java.util.Collections;
import java.util.stream.Collectors;
import static org.apache.hadoop.ozone.protocol.proto
.ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result;
@ -322,8 +328,8 @@ public static void main(String[] argv) throws IOException {
* @throws IOException
*/
@VisibleForTesting
public SCMCommandResponseProto getCommandResponse(SCMCommand cmd)
throws IOException {
Type type = cmd.getType();
SCMCommandResponseProto.Builder builder =
SCMCommandResponseProto.newBuilder();
@ -346,6 +352,17 @@ public static SCMCommandResponseProto getCommandResponse(SCMCommand cmd)
.setReregisterProto(SCMReregisterCmdResponseProto
.getDefaultInstance())
.build();
case deleteBlocksCommand:
// Once SCM sends out the deletion message, increment the count.
// This is done here instead of when SCM receives the ACK, because
// the DN might not be able to respond with an ACK for some time.
// If it times out, SCM needs to re-send the message.
List<Long> txs = ((DeleteBlocksCommand) cmd).blocksTobeDeleted()
.stream().map(tx -> tx.getTxID()).collect(Collectors.toList());
this.getScmBlockManager().getDeletedBlockLog().incrementCount(txs);
return builder.setCmdType(Type.deleteBlocksCommand)
.setDeleteBlocksProto(((DeleteBlocksCommand) cmd).getProto())
.build();
default:
throw new IllegalArgumentException("Not implemented");
}
@ -591,6 +608,7 @@ public void start() throws IOException {
datanodeRpcAddress));
datanodeRpcServer.start();
httpServer.start();
scmBlockManager.start();
setStartTime();
@ -628,6 +646,13 @@ public void stop() {
LOG.error("Storage Container Manager HTTP server stop failed.", ex);
}
try {
LOG.info("Stopping Block Manager Service.");
scmBlockManager.stop();
} catch (Exception ex) {
LOG.error("SCM block manager service stop failed.", ex);
}
unregisterMXBean();
IOUtils.cleanupWithLogger(LOG, scmContainerManager);
IOUtils.cleanupWithLogger(LOG, scmBlockManager);
@ -714,6 +739,38 @@ public SCMHeartbeatResponseProto sendHeartbeat(DatanodeID datanodeID,
.build();
}
/**
* Handles the block deletion ACKs sent by datanodes. Once ACKs are
* received, SCM considers the blocks deleted and updates the metadata in
* the SCM DB.
*
* @param acks block deletion ACKs from a datanode.
* @return acknowledgement response.
* @throws IOException
*/
@Override
public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
ContainerBlocksDeletionACKProto acks) throws IOException {
if (acks.getResultsCount() > 0) {
List<DeleteBlockTransactionResult> resultList = acks.getResultsList();
for (DeleteBlockTransactionResult result : resultList) {
if (LOG.isDebugEnabled()) {
LOG.debug("Got block deletion ACK from datanode, TXIDs={}, "
+ "success={}", result.getTxID(), result.getSuccess());
}
if (result.getSuccess()) {
LOG.info("Purging TXID={} from block deletion log", result.getTxID());
this.getScmBlockManager().getDeletedBlockLog()
.commitTransactions(Collections.singletonList(result.getTxID()));
} else {
LOG.warn("Got failed ACK for TXID={}, prepare to resend the "
+ "TX in next interval", result.getTxID());
}
}
}
return ContainerBlocksDeletionACKResponseProto.newBuilder()
.getDefaultInstanceForType();
}
/**
* Returns the Number of Datanodes that are communicating with SCM.
*
@ -742,6 +799,11 @@ public NodeManager getScmNodeManager() {
return scmNodeManager;
}
@VisibleForTesting
public BlockManager getScmBlockManager() {
return scmBlockManager;
}
/**
* Get block locations.
* @param keys batch of block keys to retrieve.

View File

@ -51,4 +51,21 @@ public interface BlockManager extends Closeable {
* @throws IOException
*/
void deleteBlock(String key) throws IOException;
/**
* @return the block deletion transaction log maintained by SCM.
*/
DeletedBlockLog getDeletedBlockLog();
/**
* Start block manager background services.
* @throws IOException
*/
void start() throws IOException;
/**
* Shutdown block manager background services.
* @throws IOException
*/
void stop() throws IOException;
}
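A minimal lifecycle sketch against this interface (blockManager is an assumed BlockManager instance; java.util.Collections is assumed imported and the container/block names are made up):

blockManager.start();                        // launches background services
DeletedBlockLog log = blockManager.getDeletedBlockLog();
log.addTransaction("container-01",
    Collections.singletonList("blk-001"));   // queue one block for deletion
blockManager.stop();                         // shuts the services down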

View File

@ -46,6 +46,7 @@
import java.util.ArrayList;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.List;
@ -70,6 +71,14 @@
FAILED_TO_LOAD_OPEN_CONTAINER;
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
INVALID_BLOCK_SIZE;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
import static org.apache.hadoop.ozone.OzoneConfigKeys
.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
/**
* Block Manager manages the block access for SCM.
@ -89,6 +98,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
// Track all containers owned by block service.
private final MetadataStore containerStore;
private final DeletedBlockLog deletedBlockLog;
private final SCMBlockDeletingService blockDeletingService;
private Map<OzoneProtos.LifeCycleState,
Map<String, BlockContainerInfo>> containers;
@ -143,7 +153,34 @@ public BlockManagerImpl(final Configuration conf,
this.lock = new ReentrantLock();
mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
// SCM block deleting transaction log and deleting service.
deletedBlockLog = new DeletedBlockLogImpl(conf);
int svcInterval = conf.getInt(
OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS,
OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT);
long serviceTimeout = conf.getTimeDuration(
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
blockDeletingService = new SCMBlockDeletingService(deletedBlockLog,
containerManager, nodeManager, svcInterval, serviceTimeout);
}
/**
* Start block manager services.
* @throws IOException
*/
public void start() throws IOException {
this.blockDeletingService.start();
}
/**
* Shutdown block manager services.
* @throws IOException
*/
public void stop() throws IOException {
this.blockDeletingService.shutdown();
this.close();
}
// TODO: close full (or almost full) containers with a separate thread.
@ -475,6 +512,11 @@ public void deleteBlock(final String key) throws IOException {
}
}
@Override
public DeletedBlockLog getDeletedBlockLog() {
return this.deletedBlockLog;
}
@VisibleForTesting
public String getDeletedKeyName(String key) {
return StringUtils.format(".Deleted/%s", key);
@ -495,6 +537,7 @@ public void close() throws IOException {
if (deletedBlockLog != null) {
deletedBlockLog.close();
}
blockDeletingService.shutdown();
MBeans.unregister(mxBean);
}

View File

@ -17,7 +17,6 @@
*/
package org.apache.hadoop.ozone.scm.block;
import org.apache.hadoop.ozone.protocol.proto
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
@ -45,6 +44,17 @@ public interface DeletedBlockLog extends Closeable {
List<DeletedBlocksTransaction> getTransactions(int count)
throws IOException;
/**
* Returns all failed transactions in the log. A transaction is
* considered failed once it has been sent more than MAX_RETRY times,
* at which point its count is reset to -1.
*
* @return a list of failed deleted block transactions.
* @throws IOException
*/
List<DeletedBlocksTransaction> getFailedTransactions()
throws IOException;
/**
* Increments count for given list of transactions by 1.
* The log maintains a valid range of counts for each transaction
@ -75,4 +85,13 @@ void incrementCount(List<Long> txIDs)
*/
void addTransaction(String containerName, List<String> blocks)
throws IOException;
/**
* Returns the total number of valid transactions. A transaction is
* considered to be valid as long as its count is in range [0, MAX_RETRY].
*
* @return the number of valid transactions.
* @throws IOException
*/
int getNumOfValidTransactions() throws IOException;
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ozone.scm.block;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Lists;
import com.google.common.primitives.Longs;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSUtil;
@ -36,6 +37,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@ -147,6 +149,28 @@ public List<DeletedBlocksTransaction> getTransactions(
return result;
}
@Override
public List<DeletedBlocksTransaction> getFailedTransactions()
throws IOException {
lock.lock();
try {
final List<DeletedBlocksTransaction> failedTXs = Lists.newArrayList();
deletedStore.iterate(null, (key, value) -> {
if (!Arrays.equals(LATEST_TXID, key)) {
DeletedBlocksTransaction delTX =
DeletedBlocksTransaction.parseFrom(value);
if (delTX.getCount() == -1) {
failedTXs.add(delTX);
}
}
return true;
});
return failedTXs;
} finally {
lock.unlock();
}
}
/**
* {@inheritDoc}
*
@ -163,13 +187,14 @@ public void incrementCount(List<Long> txIDs) throws IOException {
DeletedBlocksTransaction block = DeletedBlocksTransaction
.parseFrom(deletedStore.get(Longs.toByteArray(txID)));
DeletedBlocksTransaction.Builder builder = block.toBuilder();
int currentCount = block.getCount();
if (currentCount > -1) {
builder.setCount(++currentCount);
}
// If the retry count exceeds the maxRetry value, set it to -1 to
// stop retrying; admins can analyze those blocks and purge them
// manually via SCMCli.
if (currentCount > maxRetry) {
builder.setCount(-1);
}
deletedStore.put(Longs.toByteArray(txID),
@ -237,6 +262,28 @@ public void addTransaction(String containerName, List<String> blocks)
}
}
@Override
public int getNumOfValidTransactions() throws IOException {
lock.lock();
try {
final AtomicInteger num = new AtomicInteger(0);
deletedStore.iterate(null, (key, value) -> {
// Exclude latest txid record
if (!Arrays.equals(LATEST_TXID, key)) {
DeletedBlocksTransaction delTX =
DeletedBlocksTransaction.parseFrom(value);
if (delTX.getCount() > -1) {
num.incrementAndGet();
}
}
return true;
});
return num.get();
} finally {
lock.unlock();
}
}
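To make the retry bookkeeping concrete, a hypothetical walk-through with maxRetry = 3 (txID is an assumed existing transaction id; java.util.Collections is assumed imported):

// count: 0 -> 1 -> 2 -> 3 -> would become 4 > maxRetry -> pinned at -1;
// once at -1, further calls leave the count untouched.
for (int i = 0; i < 5; i++) {
  deletedBlockLog.incrementCount(Collections.singletonList(txID));
}
// The TX now appears in getFailedTransactions() and is excluded from
// getNumOfValidTransactions().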
@Override
public void close() throws IOException {
if (deletedStore != null) {

View File

@ -0,0 +1,217 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone.scm.block;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.ozone.scm.container.Mapping;
import org.apache.hadoop.ozone.scm.node.NodeManager;
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.utils.BackgroundService;
import org.apache.hadoop.utils.BackgroundTask;
import org.apache.hadoop.utils.BackgroundTaskQueue;
import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
/**
* A background service running in SCM to delete blocks. This service scans
* the block deletion log at a configured interval and caches block deletion
* commands in {@link org.apache.hadoop.ozone.scm.node.CommandQueue}; the
* SCM HB thread asynchronously polls the cached commands and sends them to
* datanodes for actual processing.
*/
public class SCMBlockDeletingService extends BackgroundService {
private static final Logger LOG =
LoggerFactory.getLogger(SCMBlockDeletingService.class);
// ThreadPoolSize=2, 1 for scheduler and the other for the scanner.
private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 2;
private final DeletedBlockLog deletedBlockLog;
private final Mapping mappingService;
private final NodeManager nodeManager;
// The default container size is 5GB and the block size is 256MB, so a
// full container holds at most 20 blocks; hence each TX contains at
// most 20 blocks. When SCM sends block deletion TXs to a datanode, we
// allow at most 50 TXs (one per container) per command, which limits
// the number of to-be-deleted blocks to fewer than 1000 per command.
// TODO - a better throttle algorithm.
// Note this is not an accurate limit of blocks. When we scan the log,
// in the worst case we may get 50 TXs for 50 different datanodes,
// which would make each deletion message sent by SCM extremely small.
// As a result, deletion would be slow. An improvement is to scan the
// log multiple times until we get enough TXs for each datanode, or
// until the entire log has been scanned.
private static final int BLOCK_DELETE_TX_PER_REQUEST_LIMIT = 50;
public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
Mapping mapper, NodeManager nodeManager,
int interval, long serviceTimeout) {
super("SCMBlockDeletingService", interval, TimeUnit.MILLISECONDS,
BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
this.deletedBlockLog = deletedBlockLog;
this.mappingService = mapper;
this.nodeManager = nodeManager;
}
@Override
public BackgroundTaskQueue getTasks() {
BackgroundTaskQueue queue = new BackgroundTaskQueue();
queue.add(new DeletedBlockTransactionScanner());
return queue;
}
private class DeletedBlockTransactionScanner
implements BackgroundTask<EmptyTaskResult> {
@Override
public int getPriority() {
return 1;
}
@Override
public EmptyTaskResult call() throws Exception {
// Scan the SCM DB at the HB interval and collect a throttled list
// of blocks to delete.
LOG.info("Running DeletedBlockTransactionScanner");
DatanodeDeletedBlockTransactions transactions =
getToDeleteContainerBlocks();
if (transactions != null && !transactions.isEmpty()) {
transactions.getDatanodes().forEach(datanodeID -> {
List<DeletedBlocksTransaction> dnTXs =
transactions.getDatanodeTransactions(datanodeID);
// TODO: commandQueue needs a cap.
// We should stop caching new commands once the number of unprocessed
// commands exceeds a limit, e.g. 50; otherwise, if a datanode goes
// offline for some time, the queue could be flooded with cached
// commands.
nodeManager.addDatanodeCommand(datanodeID,
new DeleteBlocksCommand(dnTXs));
LOG.info("Added delete block command for datanode {} in the queue,"
+ " number delete block transactions: {}, TxID list: {}",
datanodeID, dnTXs.size(),
String.join(",", transactions.getTransactionIDList(datanodeID)));
});
}
return EmptyTaskResult.newResult();
}
// Scan deleteBlocks.db to get a batch of to-delete block TXs,
// throttled by BLOCK_DELETE_TX_PER_REQUEST_LIMIT.
private DatanodeDeletedBlockTransactions getToDeleteContainerBlocks() {
DatanodeDeletedBlockTransactions dnTXs =
new DatanodeDeletedBlockTransactions();
List<DeletedBlocksTransaction> txs = null;
try {
// Get a limited number of TXs to send via HB at a time.
txs = deletedBlockLog
.getTransactions(BLOCK_DELETE_TX_PER_REQUEST_LIMIT);
LOG.info("Scanned deleted blocks log and got {} delTX to process",
txs.size());
} catch (IOException e) {
// We can tolerate failures for some time, but if scanning keeps
// failing, at some point we may need to raise an exception and
// possibly fail the SCM. At present it simply retries the scan on
// the next interval.
LOG.error("Failed to get block deletion transactions from delTX log",
e);
}
if (txs != null) {
for (DeletedBlocksTransaction tx : txs) {
try {
ContainerInfo info = mappingService
.getContainer(tx.getContainerName());
// Find the datanodes this TX is supposed to be sent to.
info.getPipeline().getMachines()
.forEach(entry -> dnTXs.addTransaction(entry, tx));
} catch (IOException e) {
LOG.warn("Container {} not found, continue to process next",
tx.getContainerName(), e);
}
}
}
return dnTXs;
}
}
/**
* A wrapper class that maps each datanode to the deleted block
* transactions that will be sent to it.
*/
private static class DatanodeDeletedBlockTransactions {
// A list of TXs mapped to a certain datanode ID.
private final Map<DatanodeID, List<DeletedBlocksTransaction>> transactions;
DatanodeDeletedBlockTransactions() {
this.transactions = Maps.newHashMap();
}
void addTransaction(DatanodeID dnID, DeletedBlocksTransaction tx) {
if (transactions.containsKey(dnID)) {
transactions.get(dnID).add(tx);
} else {
List<DeletedBlocksTransaction> first = Lists.newArrayList();
first.add(tx);
transactions.put(dnID, first);
}
LOG.info("Transaction added: {} <- TX({})", dnID, tx.getTxID());
}
Set<DatanodeID> getDatanodes() {
return transactions.keySet();
}
boolean isEmpty() {
return transactions.isEmpty();
}
boolean hasTransactions(DatanodeID dnID) {
return transactions.containsKey(dnID) &&
!transactions.get(dnID).isEmpty();
}
List<DeletedBlocksTransaction> getDatanodeTransactions(DatanodeID dnID) {
return transactions.get(dnID);
}
List<String> getTransactionIDList(DatanodeID dnID) {
if (hasTransactions(dnID)) {
return transactions.get(dnID).stream()
.map(DeletedBlocksTransaction::getTxID)
.map(String::valueOf)
.collect(Collectors.toList());
} else {
return Collections.emptyList();
}
}
}
}
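Conceptually, the fan-out performed by getToDeleteContainerBlocks looks like this (a sketch only; pipeline and tx are assumptions, and the wrapper class is private to this service):

DatanodeDeletedBlockTransactions dnTXs = new DatanodeDeletedBlockTransactions();
for (DatanodeID dn : pipeline.getMachines()) {  // every replica of the container
  dnTXs.addTransaction(dn, tx);                 // the same TX is queued per node
}
// Each datanode later receives its own DeleteBlocksCommand carrying this TX.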

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat;
@ -134,4 +135,12 @@ public interface NodeManager extends StorageContainerNodeProtocol,
* @return Healthy/Stale/Dead.
*/
NodeState getNodeState(DatanodeID id);
/**
* Adds a {@link SCMCommand} to the command queue; queued commands are
* handled asynchronously by the HB thread.
* @param id datanode ID.
* @param command the command to queue.
*/
default void addDatanodeCommand(DatanodeID id, SCMCommand command) {}
}

View File

@ -836,4 +836,9 @@ public Map<String, Integer> getNodeCount() {
}
return nodeCountMap;
}
@Override
public void addDatanodeCommand(DatanodeID id, SCMCommand command) {
this.commandQueue.addCommand(id, command);
}
}

View File

@ -23,7 +23,22 @@
public interface BackgroundTaskResult {
/**
* Returns the size of entries included in this result.
*/
int getSize();
/**
* An empty task result implementation.
*/
class EmptyTaskResult implements BackgroundTaskResult {
public static EmptyTaskResult newResult() {
return new EmptyTaskResult();
}
@Override
public int getSize() {
return 0;
}
}
}

View File

@ -122,7 +122,6 @@ message ContainerReportsProto {
required reportType type = 3;
}
/**
* This message is sent along with the heartbeat to report datanode
* storage utilization to SCM.
@ -210,6 +209,7 @@ enum Type {
registeredCommand = 3;
sendContainerReport = 4;
reregisterCommand = 5;
deleteBlocksCommand = 6;
}
/*
@ -221,6 +221,7 @@ message SCMCommandResponseProto {
optional SCMVersionResponseProto versionProto = 4;
optional SendContainerReportProto sendReport = 5;
optional SCMReregisterCmdResponseProto reregisterProto = 6;
optional SCMDeleteBlocksCmdResponseProto deleteBlocksProto = 7;
}
@ -231,6 +232,24 @@ message SCMHeartbeatResponseProto {
repeated SCMCommandResponseProto commands = 1;
}
// HB response from SCM, contains a list of block deletion transactions.
message SCMDeleteBlocksCmdResponseProto {
repeated DeletedBlocksTransaction deletedBlocksTransactions = 1;
}
// ACK response returned by SCM to the datanode, currently empty.
message ContainerBlocksDeletionACKResponseProto {
}
// ACK message the datanode sends to SCM, containing the results of
// block deletion transactions.
message ContainerBlocksDeletionACKProto {
message DeleteBlockTransactionResult {
required int64 txID = 1;
required bool success = 2;
}
repeated DeleteBlockTransactionResult results = 1;
}
/**
* Protocol used from a datanode to StorageContainerManager.
@ -318,6 +337,10 @@ service StorageContainerDatanodeProtocolService {
sendContainerReport sends the container report to SCM. This will
return a null command as response.
*/
rpc sendContainerReport(ContainerReportsProto) returns (SCMHeartbeatResponseProto);
/**
* Sends the block deletion ACK to SCM.
*/
rpc sendContainerBlocksDeletionACK (ContainerBlocksDeletionACKProto) returns (ContainerBlocksDeletionACKResponseProto);
}

View File

@ -17,22 +17,35 @@
*/
package org.apache.hadoop.ozone;
import static org.junit.Assert.fail;
import java.io.IOException;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
import org.apache.hadoop.ozone.scm.StorageContainerManager;
import org.apache.hadoop.ozone.scm.block.DeletedBlockLog;
import org.apache.hadoop.scm.XceiverClientManager;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.junit.Rule;
import org.junit.Assert;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import java.util.List;
import java.util.HashSet;
import java.util.Set;
import java.util.Map;
import java.util.Collections;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
import org.apache.hadoop.scm.ScmConfigKeys;
import org.apache.hadoop.io.IOUtils;
import org.junit.rules.Timeout;
import org.mockito.Mockito;
import org.apache.hadoop.test.GenericTestUtils;
/**
* Test class that exercises the StorageContainerManager.
@ -149,4 +162,103 @@ private void verifyPermissionDeniedException(Exception e, String userName) {
Assert.assertTrue(e instanceof IOException);
Assert.assertEquals(expectedErrorMessage, e.getMessage());
}
@Test
public void testBlockDeletionTransactions() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 5);
conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 3000);
conf.setInt(ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 5);
conf.setInt(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, 1000);
MiniOzoneCluster cluster =
new MiniOzoneCluster.Builder(conf).numDataNodes(1)
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
DeletedBlockLog delLog = cluster.getStorageContainerManager()
.getScmBlockManager().getDeletedBlockLog();
Assert.assertEquals(0, delLog.getNumOfValidTransactions());
// Create 20 keys with random names.
TestStorageContainerManagerHelper helper =
new TestStorageContainerManagerHelper(cluster, conf);
Map<String, KsmKeyInfo> keyLocations = helper.createKeys(20, 4096);
// These keys will be written into a number of containers; collect
// the set of container names and verify the container blocks on the
// datanodes.
Set<String> containerNames = new HashSet<>();
for (Map.Entry<String, KsmKeyInfo> entry : keyLocations.entrySet()) {
entry.getValue().getKeyLocationList()
.forEach(loc -> containerNames.add(loc.getContainerName()));
}
// The total number of blocks in these containers should equal the
// total number of blocks created.
int totalCreatedBlocks = 0;
for (KsmKeyInfo info : keyLocations.values()) {
totalCreatedBlocks += info.getKeyLocationList().size();
}
Assert.assertTrue(totalCreatedBlocks > 0);
Assert.assertEquals(totalCreatedBlocks,
helper.getAllBlocks(containerNames).size());
// Create a deletion TX for each key.
Map<String, List<String>> containerBlocks = Maps.newHashMap();
for (KsmKeyInfo info : keyLocations.values()) {
List<KsmKeyLocationInfo> list = info.getKeyLocationList();
list.forEach(location -> {
if (containerBlocks.containsKey(location.getContainerName())) {
containerBlocks.get(location.getContainerName())
.add(location.getBlockID());
} else {
List<String> blks = Lists.newArrayList();
blks.add(location.getBlockID());
containerBlocks.put(location.getContainerName(), blks);
}
});
}
for (Map.Entry<String, List<String>> tx : containerBlocks.entrySet()) {
delLog.addTransaction(tx.getKey(), tx.getValue());
}
// Verify a few TX gets created in the TX log.
Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
// Once TXs are written into the log, SCM starts to fetch TX entries
// from the log and schedules block deletions at the HB interval.
// After some time all TXs should be processed, and by then all known
// containers should contain no blocks again.
GenericTestUtils.waitFor(() -> {
try {
return delLog.getNumOfValidTransactions() == 0;
} catch (IOException e) {
return false;
}
}, 1000, 10000);
Assert.assertTrue(helper.getAllBlocks(containerNames).isEmpty());
// Next, add some TXs with known container names but unknown block
// IDs.
for (String containerName : containerBlocks.keySet()) {
// Add 2 TXs per container.
delLog.addTransaction(containerName,
Collections.singletonList(RandomStringUtils.randomAlphabetic(5)));
delLog.addTransaction(containerName,
Collections.singletonList(RandomStringUtils.randomAlphabetic(5)));
}
// Verify a few TX gets created in the TX log.
Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
// These blocks cannot be found in the containers, so deleting them
// is skipped; eventually these TXs will succeed.
GenericTestUtils.waitFor(() -> {
try {
return delLog.getFailedTransactions().size() == 0;
} catch (IOException e) {
return false;
}
}, 1000, 10000);
}
}

View File

@ -0,0 +1,180 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.ozone;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.lang.RandomStringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
import org.apache.hadoop.ozone.web.handlers.UserArgs;
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
import org.apache.hadoop.utils.MetadataStore;
import java.io.IOException;
import java.io.OutputStream;
import java.util.List;
import java.util.Map;
import java.util.LinkedList;
import java.util.Set;
/**
* A helper class used by {@link TestStorageContainerManager} to generate
* keys and to help verify container and block locations.
*/
public class TestStorageContainerManagerHelper {
private final MiniOzoneCluster cluster;
private final Configuration conf;
private final StorageHandler storageHandler;
public TestStorageContainerManagerHelper(MiniOzoneCluster cluster,
Configuration conf) throws IOException {
this.cluster = cluster;
this.conf = conf;
storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
}
public Map<String, KsmKeyInfo> createKeys(int numOfKeys, int keySize)
throws Exception {
Map<String, KsmKeyInfo> keyLocationMap = Maps.newHashMap();
String volume = "volume" + RandomStringUtils.randomNumeric(5);
String bucket = "bucket" + RandomStringUtils.randomNumeric(5);
String userName = "user" + RandomStringUtils.randomNumeric(5);
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
UserArgs userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
null, null, null, null);
VolumeArgs createVolumeArgs = new VolumeArgs(volume, userArgs);
createVolumeArgs.setUserName(userName);
createVolumeArgs.setAdminName(adminName);
storageHandler.createVolume(createVolumeArgs);
BucketArgs bucketArgs = new BucketArgs(bucket, createVolumeArgs);
bucketArgs.setAddAcls(new LinkedList<>());
bucketArgs.setRemoveAcls(new LinkedList<>());
bucketArgs.setStorageType(StorageType.DISK);
storageHandler.createBucket(bucketArgs);
// Write numOfKeys keys into the bucket.
Set<String> keyNames = Sets.newHashSet();
KeyArgs keyArgs;
for (int i = 0; i < numOfKeys; i++) {
String keyName = RandomStringUtils.randomAlphabetic(5) + i;
keyNames.add(keyName);
keyArgs = new KeyArgs(keyName, bucketArgs);
keyArgs.setSize(keySize);
// No need to write real data for these tests.
OutputStream stream = storageHandler.newKeyWriter(keyArgs);
stream.write(DFSUtil.string2Bytes(
RandomStringUtils.randomAlphabetic(5)));
stream.close();
}
for (String key : keyNames) {
KsmKeyArgs arg = new KsmKeyArgs.Builder()
.setVolumeName(volume)
.setBucketName(bucket)
.setKeyName(key)
.build();
KsmKeyInfo location = cluster.getKeySpaceManager()
.lookupKey(arg);
keyLocationMap.put(key, location);
}
return keyLocationMap;
}
public List<String> getPendingDeletionBlocks(String containerName)
throws IOException {
List<String> pendingDeletionBlocks = Lists.newArrayList();
MetadataStore meta = getContainerMetadata(containerName);
KeyPrefixFilter filter =
new KeyPrefixFilter(OzoneConsts.DELETING_KEY_PREFIX);
List<Map.Entry<byte[], byte[]>> kvs = meta
.getRangeKVs(null, Integer.MAX_VALUE, filter);
kvs.forEach(entry -> {
String key = DFSUtil.bytes2String(entry.getKey());
pendingDeletionBlocks
.add(key.replace(OzoneConsts.DELETING_KEY_PREFIX, ""));
});
return pendingDeletionBlocks;
}
public List<String> getAllBlocks(Set<String> containerNames)
throws IOException {
List<String> allBlocks = Lists.newArrayList();
for (String containerName : containerNames) {
allBlocks.addAll(getAllBlocks(containerName));
}
return allBlocks;
}
public List<String> getAllBlocks(String containerName) throws IOException {
List<String> allBlocks = Lists.newArrayList();
MetadataStore meta = getContainerMetadata(containerName);
MetadataKeyFilter filter =
(preKey, currentKey, nextKey) -> !DFSUtil.bytes2String(currentKey)
.startsWith(OzoneConsts.DELETING_KEY_PREFIX);
List<Map.Entry<byte[], byte[]>> kvs =
meta.getRangeKVs(null, Integer.MAX_VALUE, filter);
kvs.forEach(entry -> {
String key = DFSUtil.bytes2String(entry.getKey());
allBlocks.add(key.replace(OzoneConsts.DELETING_KEY_PREFIX, ""));
});
return allBlocks;
}
private MetadataStore getContainerMetadata(String containerName)
throws IOException {
Pipeline pipeline = cluster.getStorageContainerManager()
.getContainer(containerName);
DatanodeID leadDN = pipeline.getLeader();
OzoneContainer containerServer =
getContainerServerByDatanodeID(leadDN.getDatanodeUuid());
ContainerData containerData = containerServer.getContainerManager()
.readContainer(containerName);
return KeyUtils.getDB(containerData, conf);
}
private OzoneContainer getContainerServerByDatanodeID(String dnUUID)
throws IOException {
for (DataNode dn : cluster.getDataNodes()) {
if (dn.getDatanodeId().getDatanodeUuid().equals(dnUUID)) {
return dn.getOzoneContainerManager();
}
}
throw new IOException("Unable to get the ozone container "
+ "for given datanode ID " + dnUUID);
}
}

View File

@ -25,6 +25,8 @@
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
import org.apache.hadoop.ozone.scm.VersionInfo;
import java.io.IOException;
@ -188,6 +190,13 @@ private void sleepIfNeeded() {
.build();
}
@Override
public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
ContainerBlocksDeletionACKProto request) throws IOException {
return ContainerBlocksDeletionACKResponseProto
.newBuilder().getDefaultInstanceForType();
}
public ReportState getReportState() {
return this.reportState;
}