HDFS-12282. Ozone: DeleteKey-4: Block delete via HB between SCM and DN. Contributed by Weiwei Yang.
This commit is contained in:
parent
1586f20fc5
commit
76a156ebce
|
@ -0,0 +1,103 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.container.common.helpers;
|
||||
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* A helper class to wrap the info about under deletion container blocks.
|
||||
*/
|
||||
public final class DeletedContainerBlocksSummary {
|
||||
|
||||
private final List<DeletedBlocksTransaction> blocks;
|
||||
// key : txID
|
||||
// value : times of this tx has been processed
|
||||
private final Map<Long, Integer> txSummary;
|
||||
// key : container name
|
||||
// value : the number of blocks need to be deleted in this container
|
||||
// if the message contains multiple entries for same block,
|
||||
// blocks will be merged
|
||||
private final Map<String, Integer> blockSummary;
|
||||
// total number of blocks in this message
|
||||
private int numOfBlocks;
|
||||
|
||||
private DeletedContainerBlocksSummary(List<DeletedBlocksTransaction> blocks) {
|
||||
this.blocks = blocks;
|
||||
txSummary = Maps.newHashMap();
|
||||
blockSummary = Maps.newHashMap();
|
||||
blocks.forEach(entry -> {
|
||||
txSummary.put(entry.getTxID(), entry.getCount());
|
||||
if (blockSummary.containsKey(entry.getContainerName())) {
|
||||
blockSummary.put(entry.getContainerName(),
|
||||
blockSummary.get(entry.getContainerName())
|
||||
+ entry.getBlockIDCount());
|
||||
} else {
|
||||
blockSummary.put(entry.getContainerName(), entry.getBlockIDCount());
|
||||
}
|
||||
numOfBlocks += entry.getBlockIDCount();
|
||||
});
|
||||
}
|
||||
|
||||
public static DeletedContainerBlocksSummary getFrom(
|
||||
List<DeletedBlocksTransaction> blocks) {
|
||||
return new DeletedContainerBlocksSummary(blocks);
|
||||
}
|
||||
|
||||
public int getNumOfBlocks() {
|
||||
return numOfBlocks;
|
||||
}
|
||||
|
||||
public int getNumOfContainers() {
|
||||
return blockSummary.size();
|
||||
}
|
||||
|
||||
public String getTXIDs() {
|
||||
return String.join(",", txSummary.keySet()
|
||||
.stream().map(String::valueOf).collect(Collectors.toList()));
|
||||
}
|
||||
|
||||
public String getTxIDSummary() {
|
||||
List<String> txSummaryEntry = txSummary.entrySet().stream()
|
||||
.map(entry -> entry.getKey() + "(" + entry.getValue() + ")")
|
||||
.collect(Collectors.toList());
|
||||
return "[" + String.join(",", txSummaryEntry) + "]";
|
||||
}
|
||||
|
||||
@Override public String toString() {
|
||||
StringBuffer sb = new StringBuffer();
|
||||
for (DeletedBlocksTransaction blks : blocks) {
|
||||
sb.append(" ")
|
||||
.append("TXID=")
|
||||
.append(blks.getTxID())
|
||||
.append(", ")
|
||||
.append("TimesProceed=")
|
||||
.append(blks.getCount())
|
||||
.append(", ")
|
||||
.append(blks.getContainerName())
|
||||
.append(" : [")
|
||||
.append(String.join(",", blks.getBlockIDList())).append("]")
|
||||
.append("\n");
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
|
@ -24,6 +24,7 @@ import org.apache.hadoop.ozone.OzoneConfiguration;
|
|||
import org.apache.hadoop.ozone.client.OzoneClientUtils;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.CommandDispatcher;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.ContainerReportHandler;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler.DeleteBlocksCommandHandler;
|
||||
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
||||
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
@ -77,15 +78,16 @@ public class DatanodeStateMachine implements Closeable {
|
|||
this.datanodeID = datanodeID;
|
||||
nextHB = new AtomicLong(Time.monotonicNow());
|
||||
|
||||
|
||||
// When we add new handlers just adding a new handler here should do the
|
||||
// trick.
|
||||
commandDispatcher = CommandDispatcher.newBuilder()
|
||||
.addHandler(new ContainerReportHandler())
|
||||
.setConnectionManager(connectionManager)
|
||||
.setContainer(container)
|
||||
.setContext(context)
|
||||
.build();
|
||||
.addHandler(new ContainerReportHandler())
|
||||
.addHandler(new DeleteBlocksCommandHandler(
|
||||
container.getContainerManager(), conf))
|
||||
.setConnectionManager(connectionManager)
|
||||
.setContainer(container)
|
||||
.setContext(context)
|
||||
.build();
|
||||
}
|
||||
|
||||
public void setDatanodeID(DatanodeID datanodeID) {
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.hadoop.hdfs.DFSUtil;
|
|||
import org.apache.hadoop.hdfs.ozone.protocol.proto.ContainerProtos;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
||||
import org.apache.hadoop.scm.container.common.helpers.StorageContainerException;
|
||||
|
@ -116,6 +117,11 @@ public class BlockDeletingService extends BackgroundService{
|
|||
LOG.warn("Failed to initiate block deleting tasks, "
|
||||
+ "caused by unable to get containers info. "
|
||||
+ "Retry in next interval. ", e);
|
||||
} catch (Exception e) {
|
||||
// In case listContainer call throws any uncaught RuntimeException.
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Unexpected error occurs during deleting blocks.", e);
|
||||
}
|
||||
}
|
||||
return queue;
|
||||
}
|
||||
|
@ -159,6 +165,7 @@ public class BlockDeletingService extends BackgroundService{
|
|||
|
||||
@Override
|
||||
public BackgroundTaskResult call() throws Exception {
|
||||
ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
|
||||
long startTime = Time.monotonicNow();
|
||||
// Scan container's db and get list of under deletion blocks
|
||||
MetadataStore meta = KeyUtils.getDB(containerData, conf);
|
||||
|
@ -175,17 +182,24 @@ public class BlockDeletingService extends BackgroundService{
|
|||
List<String> succeedBlocks = new LinkedList<>();
|
||||
LOG.debug("Container : {}, To-Delete blocks : {}",
|
||||
containerData.getContainerName(), toDeleteBlocks.size());
|
||||
File dataDir = ContainerUtils.getDataDirectory(containerData).toFile();
|
||||
if (!dataDir.exists() || !dataDir.isDirectory()) {
|
||||
LOG.error("Invalid container data dir {} : "
|
||||
+ "not exist or not a directory", dataDir.getAbsolutePath());
|
||||
return crr;
|
||||
}
|
||||
|
||||
toDeleteBlocks.forEach(entry -> {
|
||||
String blockName = DFSUtil.bytes2String(entry.getKey());
|
||||
LOG.debug("Deleting block {}", blockName);
|
||||
try {
|
||||
ContainerProtos.KeyData data =
|
||||
ContainerProtos.KeyData.parseFrom(entry.getValue());
|
||||
|
||||
for (ContainerProtos.ChunkInfo chunkInfo : data.getChunksList()) {
|
||||
File chunkFile = new File(chunkInfo.getChunkName());
|
||||
File chunkFile = dataDir.toPath()
|
||||
.resolve(chunkInfo.getChunkName()).toFile();
|
||||
if (FileUtils.deleteQuietly(chunkFile)) {
|
||||
LOG.debug("block {} chunk {} deleted", blockName,
|
||||
LOG.info("block {} chunk {} deleted", blockName,
|
||||
chunkFile.getAbsolutePath());
|
||||
}
|
||||
}
|
||||
|
@ -201,11 +215,11 @@ public class BlockDeletingService extends BackgroundService{
|
|||
batch.delete(DFSUtil.string2Bytes(entry)));
|
||||
meta.writeBatch(batch);
|
||||
|
||||
LOG.info("The elapsed time of task@{} for"
|
||||
+ " deleting blocks: {}ms.",
|
||||
Integer.toHexString(this.hashCode()),
|
||||
Time.monotonicNow() - startTime);
|
||||
ContainerBackgroundTaskResult crr = new ContainerBackgroundTaskResult();
|
||||
if (!succeedBlocks.isEmpty()) {
|
||||
LOG.info("Container: {}, deleted blocks: {}, task elapsed time: {}ms",
|
||||
containerData.getContainerName(), succeedBlocks.size(),
|
||||
Time.monotonicNow() - startTime);
|
||||
}
|
||||
crr.addAll(succeedBlocks);
|
||||
return crr;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,198 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
|
||||
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.ozone.OzoneConsts;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.DeletedContainerBlocksSummary;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
|
||||
import org.apache.hadoop.ozone.container.common.interfaces.ContainerManager;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine.SCMConnectionManager;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
|
||||
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
||||
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
|
||||
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.apache.hadoop.utils.BatchOperation;
|
||||
import org.apache.hadoop.utils.MetadataStore;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* Handle block deletion commands.
|
||||
*/
|
||||
public class DeleteBlocksCommandHandler implements CommandHandler {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(DeleteBlocksCommandHandler.class);
|
||||
|
||||
private ContainerManager containerManager;
|
||||
private Configuration conf;
|
||||
private int invocationCount;
|
||||
private long totalTime;
|
||||
|
||||
public DeleteBlocksCommandHandler(ContainerManager containerManager,
|
||||
Configuration conf) {
|
||||
this.containerManager = containerManager;
|
||||
this.conf = conf;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handle(SCMCommand command, OzoneContainer container,
|
||||
StateContext context, SCMConnectionManager connectionManager) {
|
||||
if (command.getType() != Type.deleteBlocksCommand) {
|
||||
LOG.warn("Skipping handling command, expected command "
|
||||
+ "type {} but found {}",
|
||||
Type.deleteBlocksCommand, command.getType());
|
||||
return;
|
||||
}
|
||||
LOG.debug("Processing block deletion command.");
|
||||
invocationCount++;
|
||||
long startTime = Time.monotonicNow();
|
||||
|
||||
// move blocks to deleting state.
|
||||
// this is a metadata update, the actual deletion happens in another
|
||||
// recycling thread.
|
||||
DeleteBlocksCommand cmd = (DeleteBlocksCommand) command;
|
||||
List<DeletedBlocksTransaction> containerBlocks = cmd.blocksTobeDeleted();
|
||||
|
||||
|
||||
DeletedContainerBlocksSummary summary =
|
||||
DeletedContainerBlocksSummary.getFrom(containerBlocks);
|
||||
LOG.info("Start to delete container blocks, TXIDs={}, "
|
||||
+ "numOfContainers={}, numOfBlocks={}",
|
||||
summary.getTxIDSummary(),
|
||||
summary.getNumOfContainers(),
|
||||
summary.getNumOfBlocks());
|
||||
|
||||
ContainerBlocksDeletionACKProto.Builder resultBuilder =
|
||||
ContainerBlocksDeletionACKProto.newBuilder();
|
||||
containerBlocks.forEach(entry -> {
|
||||
DeleteBlockTransactionResult.Builder txResultBuilder =
|
||||
DeleteBlockTransactionResult.newBuilder();
|
||||
txResultBuilder.setTxID(entry.getTxID());
|
||||
try {
|
||||
deleteContainerBlocks(entry, conf);
|
||||
txResultBuilder.setSuccess(true);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to delete blocks for container={}, TXID={}",
|
||||
entry.getContainerName(), entry.getTxID(), e);
|
||||
txResultBuilder.setSuccess(false);
|
||||
}
|
||||
resultBuilder.addResults(txResultBuilder.build());
|
||||
});
|
||||
ContainerBlocksDeletionACKProto blockDeletionACK = resultBuilder.build();
|
||||
|
||||
// Send ACK back to SCM as long as meta updated
|
||||
// TODO Or we should wait until the blocks are actually deleted?
|
||||
if (!containerBlocks.isEmpty()) {
|
||||
for (EndpointStateMachine endPoint : connectionManager.getValues()) {
|
||||
try {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Sending following block deletion ACK to SCM");
|
||||
for (DeleteBlockTransactionResult result :
|
||||
blockDeletionACK.getResultsList()) {
|
||||
LOG.debug(result.getTxID() + " : " + result.getSuccess());
|
||||
}
|
||||
}
|
||||
endPoint.getEndPoint()
|
||||
.sendContainerBlocksDeletionACK(blockDeletionACK);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to send block deletion ACK to SCM {}",
|
||||
endPoint.getAddress().toString(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
long endTime = Time.monotonicNow();
|
||||
totalTime += endTime - startTime;
|
||||
}
|
||||
|
||||
/**
|
||||
* Move a bunch of blocks from a container to deleting state.
|
||||
* This is a meta update, the actual deletes happen in async mode.
|
||||
*
|
||||
* @param delTX a block deletion transaction.
|
||||
* @param config configuration.
|
||||
* @throws IOException if I/O error occurs.
|
||||
*/
|
||||
private void deleteContainerBlocks(DeletedBlocksTransaction delTX,
|
||||
Configuration config) throws IOException {
|
||||
String containerId = delTX.getContainerName();
|
||||
ContainerData containerInfo = containerManager.readContainer(containerId);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Processing Container : {}, DB path : {}", containerId,
|
||||
containerInfo.getDBPath());
|
||||
}
|
||||
MetadataStore containerDB = KeyUtils.getDB(containerInfo, config);
|
||||
for (String blk : delTX.getBlockIDList()) {
|
||||
BatchOperation batch = new BatchOperation();
|
||||
byte[] blkBytes = DFSUtil.string2Bytes(blk);
|
||||
byte[] blkInfo = containerDB.get(blkBytes);
|
||||
if (blkInfo != null) {
|
||||
// Found the block in container db,
|
||||
// use an atomic update to change its state to deleting.
|
||||
batch.put(DFSUtil.string2Bytes(OzoneConsts.DELETING_KEY_PREFIX + blk),
|
||||
blkInfo);
|
||||
batch.delete(blkBytes);
|
||||
try {
|
||||
containerDB.writeBatch(batch);
|
||||
LOG.info("Transited Block {} to DELETING state in container {}",
|
||||
blk, containerId);
|
||||
} catch (IOException e) {
|
||||
// if some blocks failed to delete, we fail this TX,
|
||||
// without sending this ACK to SCM, SCM will resend the TX
|
||||
// with a certain number of retries.
|
||||
throw new IOException(
|
||||
"Failed to delete blocks for TXID = " + delTX.getTxID(), e);
|
||||
}
|
||||
} else {
|
||||
LOG.info("Block {} not found or already under deletion in"
|
||||
+ " container {}, skip deleting it.", blk, containerId);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Type getCommandType() {
|
||||
return Type.deleteBlocksCommand;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getInvocationCount() {
|
||||
return this.invocationCount;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getAverageRunTime() {
|
||||
if (invocationCount > 0) {
|
||||
return totalTime / invocationCount;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
}
|
|
@ -21,11 +21,14 @@ package org.apache.hadoop.ozone.container.common.states.endpoint;
|
|||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.ozone.container.common.helpers
|
||||
.DeletedContainerBlocksSummary;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine
|
||||
.EndpointStateMachine;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine
|
||||
.EndpointStateMachine.EndPointStates;
|
||||
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
|
||||
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.ContainerNodeIDProto;
|
||||
import org.apache.hadoop.ozone.protocol.commands.SendContainerCommand;
|
||||
|
@ -144,6 +147,18 @@ public class HeartbeatEndpointTask
|
|||
}
|
||||
}
|
||||
break;
|
||||
case deleteBlocksCommand:
|
||||
DeleteBlocksCommand db = DeleteBlocksCommand
|
||||
.getFromProtobuf(commandResponseProto.getDeleteBlocksProto());
|
||||
if (!db.blocksTobeDeleted().isEmpty()) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(DeletedContainerBlocksSummary
|
||||
.getFrom(db.blocksTobeDeleted())
|
||||
.toString());
|
||||
}
|
||||
this.context.addCommand(db);
|
||||
}
|
||||
break;
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknown response : "
|
||||
+ commandResponseProto.getCmdType().name());
|
||||
|
|
|
@ -25,7 +25,8 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr
|
|||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
|
||||
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
|
||||
import java.io.IOException;
|
||||
|
||||
/**
|
||||
|
@ -70,4 +71,13 @@ public interface StorageContainerDatanodeProtocol {
|
|||
*/
|
||||
SCMHeartbeatResponseProto sendContainerReport(ContainerReportsProto reports)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Used by datanode to send block deletion ACK to SCM.
|
||||
* @param request block deletion transactions.
|
||||
* @return block deletion transaction response.
|
||||
* @throws IOException
|
||||
*/
|
||||
ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
|
||||
ContainerBlocksDeletionACKProto request) throws IOException;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,63 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.protocol.commands;
|
||||
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMDeleteBlocksCmdResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* A SCM command asks a datanode to delete a number of blocks.
|
||||
*/
|
||||
public class DeleteBlocksCommand extends
|
||||
SCMCommand<SCMDeleteBlocksCmdResponseProto> {
|
||||
|
||||
private List<DeletedBlocksTransaction> blocksTobeDeleted;
|
||||
|
||||
|
||||
public DeleteBlocksCommand(List<DeletedBlocksTransaction> blocks) {
|
||||
this.blocksTobeDeleted = blocks;
|
||||
}
|
||||
|
||||
public List<DeletedBlocksTransaction> blocksTobeDeleted() {
|
||||
return this.blocksTobeDeleted;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Type getType() {
|
||||
return Type.deleteBlocksCommand;
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte[] getProtoBufMessage() {
|
||||
return getProto().toByteArray();
|
||||
}
|
||||
|
||||
public static DeleteBlocksCommand getFromProtobuf(
|
||||
SCMDeleteBlocksCmdResponseProto deleteBlocksProto) {
|
||||
return new DeleteBlocksCommand(deleteBlocksProto
|
||||
.getDeletedBlocksTransactionsList());
|
||||
}
|
||||
|
||||
public SCMDeleteBlocksCmdResponseProto getProto() {
|
||||
return SCMDeleteBlocksCmdResponseProto.newBuilder()
|
||||
.addAllDeletedBlocksTransactions(blocksTobeDeleted).build();
|
||||
}
|
||||
}
|
|
@ -38,6 +38,8 @@ import org.apache.hadoop.ozone.protocol.proto
|
|||
.StorageContainerDatanodeProtocolProtos.SCMRegisteredCmdResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionRequestProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
|
@ -177,4 +179,16 @@ public class StorageContainerDatanodeProtocolClientSideTranslatorPB
|
|||
return resp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
|
||||
ContainerBlocksDeletionACKProto deletedBlocks) throws IOException {
|
||||
final ContainerBlocksDeletionACKResponseProto resp;
|
||||
try {
|
||||
resp = rpcProxy.sendContainerBlocksDeletionACK(NULL_RPC_CONTROLLER,
|
||||
deletedBlocks);
|
||||
} catch (ServiceException e) {
|
||||
throw ProtobufHelper.getRemoteException(e);
|
||||
}
|
||||
return resp;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,6 +24,8 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr
|
|||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
|
@ -97,4 +99,15 @@ public class StorageContainerDatanodeProtocolServerSideTranslatorPB
|
|||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
|
||||
RpcController controller, ContainerBlocksDeletionACKProto request)
|
||||
throws ServiceException {
|
||||
try {
|
||||
return impl.sendContainerBlocksDeletionACK(request);
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -34,6 +34,7 @@ import org.apache.hadoop.ozone.client.OzoneClientUtils;
|
|||
import org.apache.hadoop.ozone.OzoneConfigKeys;
|
||||
import org.apache.hadoop.ozone.OzoneConfiguration;
|
||||
import org.apache.hadoop.ozone.protocol.StorageContainerDatanodeProtocol;
|
||||
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
|
||||
import org.apache.hadoop.ozone.protocol.commands.RegisteredCommand;
|
||||
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
||||
|
@ -52,6 +53,9 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr
|
|||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMVersionResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SendContainerReportProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.Type;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto.DeleteBlockTransactionResult;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerLocationProtocolProtos.NotifyObjectCreationStageRequestProto;
|
||||
import org.apache.hadoop.ozone.protocolPB.ScmBlockLocationProtocolServerSideTranslatorPB;
|
||||
|
@ -91,6 +95,8 @@ import java.util.Map;
|
|||
import java.util.Set;
|
||||
import java.util.TreeSet;
|
||||
import java.util.UUID;
|
||||
import java.util.Collections;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hadoop.ozone.protocol.proto
|
||||
.ScmBlockLocationProtocolProtos.DeleteScmBlockResult.Result;
|
||||
|
@ -322,8 +328,8 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
* @throws InvalidProtocolBufferException
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public static SCMCommandResponseProto getCommandResponse(SCMCommand cmd)
|
||||
throws InvalidProtocolBufferException {
|
||||
public SCMCommandResponseProto getCommandResponse(SCMCommand cmd)
|
||||
throws IOException {
|
||||
Type type = cmd.getType();
|
||||
SCMCommandResponseProto.Builder builder =
|
||||
SCMCommandResponseProto.newBuilder();
|
||||
|
@ -346,6 +352,17 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
.setReregisterProto(SCMReregisterCmdResponseProto
|
||||
.getDefaultInstance())
|
||||
.build();
|
||||
case deleteBlocksCommand:
|
||||
// Once SCM sends out the deletion message, increment the count.
|
||||
// this is done here instead of when SCM receives the ACK, because
|
||||
// DN might not be able to response the ACK for sometime. In case
|
||||
// it times out, SCM needs to re-send the message some more times.
|
||||
List<Long> txs = ((DeleteBlocksCommand) cmd).blocksTobeDeleted()
|
||||
.stream().map(tx -> tx.getTxID()).collect(Collectors.toList());
|
||||
this.getScmBlockManager().getDeletedBlockLog().incrementCount(txs);
|
||||
return builder.setCmdType(Type.deleteBlocksCommand)
|
||||
.setDeleteBlocksProto(((DeleteBlocksCommand) cmd).getProto())
|
||||
.build();
|
||||
default:
|
||||
throw new IllegalArgumentException("Not implemented");
|
||||
}
|
||||
|
@ -591,6 +608,7 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
datanodeRpcAddress));
|
||||
datanodeRpcServer.start();
|
||||
httpServer.start();
|
||||
scmBlockManager.start();
|
||||
|
||||
setStartTime();
|
||||
|
||||
|
@ -628,6 +646,13 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
LOG.error("Storage Container Manager HTTP server stop failed.", ex);
|
||||
}
|
||||
|
||||
try {
|
||||
LOG.info("Stopping Block Manager Service.");
|
||||
scmBlockManager.stop();
|
||||
} catch (Exception ex) {
|
||||
LOG.error("SCM block manager service stop failed.", ex);
|
||||
}
|
||||
|
||||
unregisterMXBean();
|
||||
IOUtils.cleanupWithLogger(LOG, scmContainerManager);
|
||||
IOUtils.cleanupWithLogger(LOG, scmBlockManager);
|
||||
|
@ -714,6 +739,38 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
.build();
|
||||
}
|
||||
|
||||
/**
|
||||
* Handles the block deletion ACKs sent by datanodes. Once ACKs recieved,
|
||||
* SCM considers the blocks are deleted and update the metadata in SCM DB.
|
||||
*
|
||||
* @param acks
|
||||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
@Override
|
||||
public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
|
||||
ContainerBlocksDeletionACKProto acks) throws IOException {
|
||||
if (acks.getResultsCount() > 0) {
|
||||
List<DeleteBlockTransactionResult> resultList = acks.getResultsList();
|
||||
for (DeleteBlockTransactionResult result : resultList) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Got block deletion ACK from datanode, TXIDs={}, "
|
||||
+ "success={}", result.getTxID(), result.getSuccess());
|
||||
}
|
||||
if (result.getSuccess()) {
|
||||
LOG.info("Purging TXID={} from block deletion log", result.getTxID());
|
||||
this.getScmBlockManager().getDeletedBlockLog()
|
||||
.commitTransactions(Collections.singletonList(result.getTxID()));
|
||||
} else {
|
||||
LOG.warn("Got failed ACK for TXID={}, prepare to resend the "
|
||||
+ "TX in next interval", result.getTxID());
|
||||
}
|
||||
}
|
||||
}
|
||||
return ContainerBlocksDeletionACKResponseProto.newBuilder()
|
||||
.getDefaultInstanceForType();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the Number of Datanodes that are communicating with SCM.
|
||||
*
|
||||
|
@ -742,6 +799,11 @@ public class StorageContainerManager extends ServiceRuntimeInfoImpl
|
|||
return scmNodeManager;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public BlockManager getScmBlockManager() {
|
||||
return scmBlockManager;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get block locations.
|
||||
* @param keys batch of block keys to retrieve.
|
||||
|
|
|
@ -51,4 +51,21 @@ public interface BlockManager extends Closeable {
|
|||
* @throws IOException
|
||||
*/
|
||||
void deleteBlock(String key) throws IOException;
|
||||
|
||||
/**
|
||||
* @return the block deletion transaction log maintained by SCM.
|
||||
*/
|
||||
DeletedBlockLog getDeletedBlockLog();
|
||||
|
||||
/**
|
||||
* Start block manager background services.
|
||||
* @throws IOException
|
||||
*/
|
||||
void start() throws IOException;
|
||||
|
||||
/**
|
||||
* Shutdown block manager background services.
|
||||
* @throws IOException
|
||||
*/
|
||||
void stop() throws IOException;
|
||||
}
|
||||
|
|
|
@ -46,6 +46,7 @@ import java.io.IOException;
|
|||
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.List;
|
||||
|
@ -70,6 +71,14 @@ import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
|
|||
FAILED_TO_LOAD_OPEN_CONTAINER;
|
||||
import static org.apache.hadoop.ozone.scm.exceptions.SCMException.ResultCodes.
|
||||
INVALID_BLOCK_SIZE;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
||||
.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
||||
.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
||||
.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT;
|
||||
import static org.apache.hadoop.ozone.OzoneConfigKeys
|
||||
.OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT;
|
||||
|
||||
/**
|
||||
* Block Manager manages the block access for SCM.
|
||||
|
@ -89,6 +98,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|||
// Track all containers owned by block service.
|
||||
private final MetadataStore containerStore;
|
||||
private final DeletedBlockLog deletedBlockLog;
|
||||
private final SCMBlockDeletingService blockDeletingService;
|
||||
|
||||
private Map<OzoneProtos.LifeCycleState,
|
||||
Map<String, BlockContainerInfo>> containers;
|
||||
|
@ -143,7 +153,34 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|||
this.lock = new ReentrantLock();
|
||||
|
||||
mxBean = MBeans.register("BlockManager", "BlockManagerImpl", this);
|
||||
|
||||
// SCM block deleting transaction log and deleting service.
|
||||
deletedBlockLog = new DeletedBlockLogImpl(conf);
|
||||
int svcInterval = conf.getInt(
|
||||
OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS,
|
||||
OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS_DEFAULT);
|
||||
long serviceTimeout = conf.getTimeDuration(
|
||||
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT,
|
||||
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
|
||||
blockDeletingService = new SCMBlockDeletingService(deletedBlockLog,
|
||||
containerManager, nodeManager, svcInterval, serviceTimeout);
|
||||
}
|
||||
|
||||
/**
|
||||
* Start block manager services.
|
||||
* @throws IOException
|
||||
*/
|
||||
public void start() throws IOException {
|
||||
this.blockDeletingService.start();
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdown block manager services.
|
||||
* @throws IOException
|
||||
*/
|
||||
public void stop() throws IOException {
|
||||
this.blockDeletingService.shutdown();
|
||||
this.close();
|
||||
}
|
||||
|
||||
// TODO: close full (or almost full) containers with a separate thread.
|
||||
|
@ -475,6 +512,11 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public DeletedBlockLog getDeletedBlockLog() {
|
||||
return this.deletedBlockLog;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public String getDeletedKeyName(String key) {
|
||||
return StringUtils.format(".Deleted/%s", key);
|
||||
|
@ -495,6 +537,7 @@ public class BlockManagerImpl implements BlockManager, BlockmanagerMXBean {
|
|||
if (deletedBlockLog != null) {
|
||||
deletedBlockLog.close();
|
||||
}
|
||||
blockDeletingService.shutdown();
|
||||
MBeans.unregister(mxBean);
|
||||
}
|
||||
|
||||
|
|
|
@ -17,7 +17,6 @@
|
|||
*/
|
||||
package org.apache.hadoop.ozone.scm.block;
|
||||
|
||||
|
||||
import org.apache.hadoop.ozone.protocol.proto
|
||||
.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
|
||||
|
||||
|
@ -45,6 +44,17 @@ public interface DeletedBlockLog extends Closeable {
|
|||
List<DeletedBlocksTransaction> getTransactions(int count)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Return all failed transactions in the log. A transaction is considered
|
||||
* to be failed if it has been sent more than MAX_RETRY limit and its
|
||||
* count is reset to -1.
|
||||
*
|
||||
* @return a list of failed deleted block transactions.
|
||||
* @throws IOException
|
||||
*/
|
||||
List<DeletedBlocksTransaction> getFailedTransactions()
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Increments count for given list of transactions by 1.
|
||||
* The log maintains a valid range of counts for each transaction
|
||||
|
@ -75,4 +85,13 @@ public interface DeletedBlockLog extends Closeable {
|
|||
*/
|
||||
void addTransaction(String containerName, List<String> blocks)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Returns the total number of valid transactions. A transaction is
|
||||
* considered to be valid as long as its count is in range [0, MAX_RETRY].
|
||||
*
|
||||
* @return number of a valid transactions.
|
||||
* @throws IOException
|
||||
*/
|
||||
int getNumOfValidTransactions() throws IOException;
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@
|
|||
package org.apache.hadoop.ozone.scm.block;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.primitives.Longs;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
|
@ -36,6 +37,7 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
|
@ -147,6 +149,28 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
|
|||
return result;
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<DeletedBlocksTransaction> getFailedTransactions()
|
||||
throws IOException {
|
||||
lock.lock();
|
||||
try {
|
||||
final List<DeletedBlocksTransaction> failedTXs = Lists.newArrayList();
|
||||
deletedStore.iterate(null, (key, value) -> {
|
||||
if (!Arrays.equals(LATEST_TXID, key)) {
|
||||
DeletedBlocksTransaction delTX =
|
||||
DeletedBlocksTransaction.parseFrom(value);
|
||||
if (delTX.getCount() == -1) {
|
||||
failedTXs.add(delTX);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
});
|
||||
return failedTXs;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* {@inheritDoc}
|
||||
*
|
||||
|
@ -163,13 +187,14 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
|
|||
DeletedBlocksTransaction block = DeletedBlocksTransaction
|
||||
.parseFrom(deletedStore.get(Longs.toByteArray(txID)));
|
||||
DeletedBlocksTransaction.Builder builder = block.toBuilder();
|
||||
if (block.getCount() > -1) {
|
||||
builder.setCount(block.getCount() + 1);
|
||||
int currentCount = block.getCount();
|
||||
if (currentCount > -1) {
|
||||
builder.setCount(++currentCount);
|
||||
}
|
||||
// if the retry time exceeds the maxRetry value
|
||||
// then set the retry value to -1, stop retrying, admins can
|
||||
// analyze those blocks and purge them manually by SCMCli.
|
||||
if (block.getCount() > maxRetry) {
|
||||
if (currentCount > maxRetry) {
|
||||
builder.setCount(-1);
|
||||
}
|
||||
deletedStore.put(Longs.toByteArray(txID),
|
||||
|
@ -237,6 +262,28 @@ public class DeletedBlockLogImpl implements DeletedBlockLog {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumOfValidTransactions() throws IOException {
|
||||
lock.lock();
|
||||
try {
|
||||
final AtomicInteger num = new AtomicInteger(0);
|
||||
deletedStore.iterate(null, (key, value) -> {
|
||||
// Exclude latest txid record
|
||||
if (!Arrays.equals(LATEST_TXID, key)) {
|
||||
DeletedBlocksTransaction delTX =
|
||||
DeletedBlocksTransaction.parseFrom(value);
|
||||
if (delTX.getCount() > -1) {
|
||||
num.incrementAndGet();
|
||||
}
|
||||
}
|
||||
return true;
|
||||
});
|
||||
return num.get();
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (deletedStore != null) {
|
||||
|
|
|
@ -0,0 +1,217 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone.scm.block;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
|
||||
import org.apache.hadoop.ozone.scm.container.Mapping;
|
||||
import org.apache.hadoop.ozone.scm.node.NodeManager;
|
||||
import org.apache.hadoop.scm.container.common.helpers.ContainerInfo;
|
||||
import org.apache.hadoop.utils.BackgroundService;
|
||||
import org.apache.hadoop.utils.BackgroundTask;
|
||||
import org.apache.hadoop.utils.BackgroundTaskQueue;
|
||||
import org.apache.hadoop.utils.BackgroundTaskResult.EmptyTaskResult;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* A background service running in SCM to delete blocks. This service scans
|
||||
* block deletion log in certain interval and caches block deletion commands
|
||||
* in {@link org.apache.hadoop.ozone.scm.node.CommandQueue}, asynchronously
|
||||
* SCM HB thread polls cached commands and sends them to datanode for physical
|
||||
* processing.
|
||||
*/
|
||||
public class SCMBlockDeletingService extends BackgroundService {
|
||||
|
||||
private static final Logger LOG =
|
||||
LoggerFactory.getLogger(SCMBlockDeletingService.class);
|
||||
|
||||
// ThreadPoolSize=2, 1 for scheduler and the other for the scanner.
|
||||
private final static int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 2;
|
||||
private final DeletedBlockLog deletedBlockLog;
|
||||
private final Mapping mappingService;
|
||||
private final NodeManager nodeManager;
|
||||
|
||||
// Default container size is 5G and block size is 256MB, a full container
|
||||
// at most contains 20 blocks. At most each TX contains 20 blocks.
|
||||
// When SCM sends block deletion TXs to datanode, each command we allow
|
||||
// at most 50 containers so that will limit number of to be deleted blocks
|
||||
// less than 1000.
|
||||
// TODO - a better throttle algorithm
|
||||
// Note, this is not an accurate limit of blocks. When we scan
|
||||
// the log, worst case we may get 50 TX for 50 different datanodes,
|
||||
// that will cause the deletion message sent by SCM extremely small.
|
||||
// As a result, the deletion will be slow. An improvement is to scan
|
||||
// log multiple times until we get enough TXs for each datanode, or
|
||||
// the entire log is scanned.
|
||||
private static final int BLOCK_DELETE_TX_PER_REQUEST_LIMIT = 50;
|
||||
|
||||
public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
|
||||
Mapping mapper, NodeManager nodeManager,
|
||||
int interval, long serviceTimeout) {
|
||||
super("SCMBlockDeletingService", interval, TimeUnit.MILLISECONDS,
|
||||
BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
|
||||
this.deletedBlockLog = deletedBlockLog;
|
||||
this.mappingService = mapper;
|
||||
this.nodeManager = nodeManager;
|
||||
}
|
||||
|
||||
@Override
|
||||
public BackgroundTaskQueue getTasks() {
|
||||
BackgroundTaskQueue queue = new BackgroundTaskQueue();
|
||||
queue.add(new DeletedBlockTransactionScanner());
|
||||
return queue;
|
||||
}
|
||||
|
||||
private class DeletedBlockTransactionScanner
|
||||
implements BackgroundTask<EmptyTaskResult> {
|
||||
|
||||
@Override
|
||||
public int getPriority() {
|
||||
return 1;
|
||||
}
|
||||
|
||||
@Override
|
||||
public EmptyTaskResult call() throws Exception {
|
||||
// Scan SCM DB in HB interval and collect a throttled list of
|
||||
// to delete blocks.
|
||||
LOG.info("Running DeletedBlockTransactionScanner");
|
||||
DatanodeDeletedBlockTransactions transactions =
|
||||
getToDeleteContainerBlocks();
|
||||
if (transactions != null && !transactions.isEmpty()) {
|
||||
transactions.getDatanodes().forEach(datanodeID -> {
|
||||
List<DeletedBlocksTransaction> dnTXs =
|
||||
transactions.getDatanodeTransactions(datanodeID);
|
||||
// TODO commandQueue needs a cap.
|
||||
// We should stop caching new commands if num of un-processed
|
||||
// command is bigger than a limit, e.g 50. In case datanode goes
|
||||
// offline for sometime, the cached commands be flooded.
|
||||
nodeManager.addDatanodeCommand(datanodeID,
|
||||
new DeleteBlocksCommand(dnTXs));
|
||||
LOG.info("Added delete block command for datanode {} in the queue,"
|
||||
+ " number delete block transactions: {}, TxID list: {}",
|
||||
datanodeID, dnTXs.size(),
|
||||
String.join(",", transactions.getTransactionIDList(datanodeID)));
|
||||
|
||||
});
|
||||
}
|
||||
return EmptyTaskResult.newResult();
|
||||
}
|
||||
|
||||
// Scan deleteBlocks.db to get a number of to-delete blocks.
|
||||
// this is going to be properly throttled.
|
||||
private DatanodeDeletedBlockTransactions getToDeleteContainerBlocks() {
|
||||
DatanodeDeletedBlockTransactions dnTXs =
|
||||
new DatanodeDeletedBlockTransactions();
|
||||
List<DeletedBlocksTransaction> txs = null;
|
||||
try {
|
||||
// Get a limited number of TXs to send via HB at a time.
|
||||
txs = deletedBlockLog
|
||||
.getTransactions(BLOCK_DELETE_TX_PER_REQUEST_LIMIT);
|
||||
LOG.info("Scanned deleted blocks log and got {} delTX to process",
|
||||
txs.size());
|
||||
} catch (IOException e) {
|
||||
// We may tolerant a number of failures for sometime
|
||||
// but if it continues to fail, at some point we need to raise
|
||||
// an exception and probably fail the SCM ? At present, it simply
|
||||
// continues to retry the scanning.
|
||||
LOG.error("Failed to get block deletion transactions from delTX log",
|
||||
e);
|
||||
}
|
||||
|
||||
if (txs != null) {
|
||||
for (DeletedBlocksTransaction tx : txs) {
|
||||
try {
|
||||
ContainerInfo info = mappingService
|
||||
.getContainer(tx.getContainerName());
|
||||
// Find out the datanode where this TX is supposed to send to.
|
||||
info.getPipeline().getMachines()
|
||||
.forEach(entry -> dnTXs.addTransaction(entry, tx));
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Container {} not found, continue to process next",
|
||||
tx.getContainerName(), e);
|
||||
}
|
||||
}
|
||||
}
|
||||
return dnTXs;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A wrapper class to hold info about datanode and all deleted block
|
||||
* transactions that will be sent to this datanode.
|
||||
*/
|
||||
private static class DatanodeDeletedBlockTransactions {
|
||||
|
||||
// A list of TXs mapped to a certain datanode ID.
|
||||
private final Map<DatanodeID, List<DeletedBlocksTransaction>> transactions;
|
||||
|
||||
DatanodeDeletedBlockTransactions() {
|
||||
this.transactions = Maps.newHashMap();
|
||||
}
|
||||
|
||||
void addTransaction(DatanodeID dnID, DeletedBlocksTransaction tx) {
|
||||
if (transactions.containsKey(dnID)) {
|
||||
transactions.get(dnID).add(tx);
|
||||
} else {
|
||||
List<DeletedBlocksTransaction> first = Lists.newArrayList();
|
||||
first.add(tx);
|
||||
transactions.put(dnID, first);
|
||||
}
|
||||
LOG.info("Transaction added: {} <- TX({})", dnID, tx.getTxID());
|
||||
}
|
||||
|
||||
Set<DatanodeID> getDatanodes() {
|
||||
return transactions.keySet();
|
||||
}
|
||||
|
||||
boolean isEmpty() {
|
||||
return transactions.isEmpty();
|
||||
}
|
||||
|
||||
boolean hasTransactions(DatanodeID dnID) {
|
||||
return transactions.containsKey(dnID) &&
|
||||
!transactions.get(dnID).isEmpty();
|
||||
}
|
||||
|
||||
List<DeletedBlocksTransaction> getDatanodeTransactions(DatanodeID dnID) {
|
||||
return transactions.get(dnID);
|
||||
}
|
||||
|
||||
List<String> getTransactionIDList(DatanodeID dnID) {
|
||||
if (hasTransactions(dnID)) {
|
||||
return transactions.get(dnID).stream()
|
||||
.map(DeletedBlocksTransaction::getTxID)
|
||||
.map(String::valueOf)
|
||||
.collect(Collectors.toList());
|
||||
} else {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.UnregisteredNodeException;
|
||||
import org.apache.hadoop.ozone.protocol.StorageContainerNodeProtocol;
|
||||
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos.NodeState;
|
||||
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeMetric;
|
||||
import org.apache.hadoop.ozone.scm.container.placement.metrics.SCMNodeStat;
|
||||
|
@ -134,4 +135,12 @@ public interface NodeManager extends StorageContainerNodeProtocol,
|
|||
* @return Healthy/Stale/Dead.
|
||||
*/
|
||||
NodeState getNodeState(DatanodeID id);
|
||||
|
||||
/**
|
||||
* Add a {@link SCMCommand} to the command queue, which are
|
||||
* handled by HB thread asynchronously.
|
||||
* @param id
|
||||
* @param command
|
||||
*/
|
||||
default void addDatanodeCommand(DatanodeID id, SCMCommand command) {}
|
||||
}
|
||||
|
|
|
@ -836,4 +836,9 @@ public class SCMNodeManager
|
|||
}
|
||||
return nodeCountMap;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void addDatanodeCommand(DatanodeID id, SCMCommand command) {
|
||||
this.commandQueue.addCommand(id, command);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,7 +23,22 @@ package org.apache.hadoop.utils;
|
|||
public interface BackgroundTaskResult {
|
||||
|
||||
/**
|
||||
* Returns the size of entries included in this result.
|
||||
* Returns the size of entries included in this result.
|
||||
*/
|
||||
int getSize();
|
||||
|
||||
/**
|
||||
* An empty task result implementation.
|
||||
*/
|
||||
class EmptyTaskResult implements BackgroundTaskResult {
|
||||
|
||||
public static EmptyTaskResult newResult() {
|
||||
return new EmptyTaskResult();
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getSize() {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -122,7 +122,6 @@ message ContainerReportsProto {
|
|||
required reportType type = 3;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* This message is send along with the heart beat to report datanode
|
||||
* storage utilization by SCM.
|
||||
|
@ -210,6 +209,7 @@ enum Type {
|
|||
registeredCommand = 3;
|
||||
sendContainerReport = 4;
|
||||
reregisterCommand = 5;
|
||||
deleteBlocksCommand = 6;
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -221,6 +221,7 @@ message SCMCommandResponseProto {
|
|||
optional SCMVersionResponseProto versionProto = 4;
|
||||
optional SendContainerReportProto sendReport = 5;
|
||||
optional SCMReregisterCmdResponseProto reregisterProto = 6;
|
||||
optional SCMDeleteBlocksCmdResponseProto deleteBlocksProto = 7;
|
||||
}
|
||||
|
||||
|
||||
|
@ -231,6 +232,24 @@ message SCMHeartbeatResponseProto {
|
|||
repeated SCMCommandResponseProto commands = 1;
|
||||
}
|
||||
|
||||
// HB response from SCM, contains a list of block deletion transactions.
|
||||
message SCMDeleteBlocksCmdResponseProto {
|
||||
repeated DeletedBlocksTransaction deletedBlocksTransactions = 1;
|
||||
}
|
||||
|
||||
// SendACK response returned by datanode to SCM, currently empty.
|
||||
message ContainerBlocksDeletionACKResponseProto {
|
||||
}
|
||||
|
||||
// ACK message datanode sent to SCM, contains the result of
|
||||
// block deletion transactions.
|
||||
message ContainerBlocksDeletionACKProto {
|
||||
message DeleteBlockTransactionResult {
|
||||
required int64 txID = 1;
|
||||
required bool success = 2;
|
||||
}
|
||||
repeated DeleteBlockTransactionResult results = 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Protocol used from a datanode to StorageContainerManager.
|
||||
|
@ -318,6 +337,10 @@ service StorageContainerDatanodeProtocolService {
|
|||
send container reports sends the container report to SCM. This will
|
||||
return a null command as response.
|
||||
*/
|
||||
rpc sendContainerReport(ContainerReportsProto) returns (SCMHeartbeatResponseProto);
|
||||
rpc sendContainerReport(ContainerReportsProto) returns (SCMHeartbeatResponseProto);
|
||||
|
||||
/**
|
||||
* Sends the block deletion ACK to SCM.
|
||||
*/
|
||||
rpc sendContainerBlocksDeletionACK (ContainerBlocksDeletionACKProto) returns (ContainerBlocksDeletionACKResponseProto);
|
||||
}
|
||||
|
|
|
@ -17,22 +17,35 @@
|
|||
*/
|
||||
package org.apache.hadoop.ozone;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import static org.junit.Assert.fail;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.lang.RandomStringUtils;
|
||||
import org.apache.hadoop.ozone.protocol.proto.OzoneProtos;
|
||||
import org.apache.hadoop.ozone.scm.StorageContainerManager;
|
||||
import org.apache.hadoop.ozone.scm.block.DeletedBlockLog;
|
||||
import org.apache.hadoop.scm.XceiverClientManager;
|
||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.ExpectedException;
|
||||
import java.util.List;
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
import java.util.Map;
|
||||
import java.util.Collections;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyLocationInfo;
|
||||
import org.apache.hadoop.scm.ScmConfigKeys;
|
||||
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.junit.rules.Timeout;
|
||||
import org.mockito.Mockito;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
|
||||
/**
|
||||
* Test class that exercises the StorageContainerManager.
|
||||
|
@ -149,4 +162,103 @@ public class TestStorageContainerManager {
|
|||
Assert.assertTrue(e instanceof IOException);
|
||||
Assert.assertEquals(expectedErrorMessage, e.getMessage());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testBlockDeletionTransactions() throws Exception {
|
||||
OzoneConfiguration conf = new OzoneConfiguration();
|
||||
conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_INTERVAL_SECONDS, 5);
|
||||
conf.setInt(ScmConfigKeys.OZONE_SCM_HEARTBEAT_PROCESS_INTERVAL_MS, 3000);
|
||||
conf.setInt(ScmConfigKeys.OZONE_SCM_BLOCK_DELETION_MAX_RETRY, 5);
|
||||
conf.setInt(OzoneConfigKeys.OZONE_BLOCK_DELETING_SERVICE_INTERVAL_MS, 1000);
|
||||
MiniOzoneCluster cluster =
|
||||
new MiniOzoneCluster.Builder(conf).numDataNodes(1)
|
||||
.setHandlerType(OzoneConsts.OZONE_HANDLER_DISTRIBUTED).build();
|
||||
|
||||
DeletedBlockLog delLog = cluster.getStorageContainerManager()
|
||||
.getScmBlockManager().getDeletedBlockLog();
|
||||
Assert.assertEquals(0, delLog.getNumOfValidTransactions());
|
||||
|
||||
// Create 20 random names keys.
|
||||
TestStorageContainerManagerHelper helper =
|
||||
new TestStorageContainerManagerHelper(cluster, conf);
|
||||
Map<String, KsmKeyInfo> keyLocations = helper.createKeys(20, 4096);
|
||||
|
||||
// These keys will be written into a bunch of containers,
|
||||
// gets a set of container names, verify container containerBlocks
|
||||
// on datanodes.
|
||||
Set<String> containerNames = new HashSet<>();
|
||||
for (Map.Entry<String, KsmKeyInfo> entry : keyLocations.entrySet()) {
|
||||
entry.getValue().getKeyLocationList()
|
||||
.forEach(loc -> containerNames.add(loc.getContainerName()));
|
||||
}
|
||||
|
||||
// Total number of containerBlocks of these containers should be equal to
|
||||
// total number of containerBlocks via creation call.
|
||||
int totalCreatedBlocks = 0;
|
||||
for (KsmKeyInfo info : keyLocations.values()) {
|
||||
totalCreatedBlocks += info.getKeyLocationList().size();
|
||||
}
|
||||
Assert.assertTrue(totalCreatedBlocks > 0);
|
||||
Assert.assertEquals(totalCreatedBlocks,
|
||||
helper.getAllBlocks(containerNames).size());
|
||||
|
||||
// Create a deletion TX for each key.
|
||||
Map<String, List<String>> containerBlocks = Maps.newHashMap();
|
||||
for (KsmKeyInfo info : keyLocations.values()) {
|
||||
List<KsmKeyLocationInfo> list = info.getKeyLocationList();
|
||||
list.forEach(location -> {
|
||||
if (containerBlocks.containsKey(location.getContainerName())) {
|
||||
containerBlocks.get(location.getContainerName())
|
||||
.add(location.getBlockID());
|
||||
} else {
|
||||
List<String> blks = Lists.newArrayList();
|
||||
blks.add(location.getBlockID());
|
||||
containerBlocks.put(location.getContainerName(), blks);
|
||||
}
|
||||
});
|
||||
}
|
||||
for (Map.Entry<String, List<String>> tx : containerBlocks.entrySet()) {
|
||||
delLog.addTransaction(tx.getKey(), tx.getValue());
|
||||
}
|
||||
|
||||
// Verify a few TX gets created in the TX log.
|
||||
Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
|
||||
|
||||
// Once TXs are written into the log, SCM starts to fetch TX
|
||||
// entries from the log and schedule block deletions in HB interval,
|
||||
// after sometime, all the TX should be proceed and by then
|
||||
// the number of containerBlocks of all known containers will be
|
||||
// empty again.
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
try {
|
||||
return delLog.getNumOfValidTransactions() == 0;
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
}, 1000, 10000);
|
||||
Assert.assertTrue(helper.getAllBlocks(containerNames).isEmpty());
|
||||
|
||||
// Continue the work, add some TXs that with known container names,
|
||||
// but unknown block IDs.
|
||||
for (String containerName : containerBlocks.keySet()) {
|
||||
// Add 2 TXs per container.
|
||||
delLog.addTransaction(containerName,
|
||||
Collections.singletonList(RandomStringUtils.randomAlphabetic(5)));
|
||||
delLog.addTransaction(containerName,
|
||||
Collections.singletonList(RandomStringUtils.randomAlphabetic(5)));
|
||||
}
|
||||
|
||||
// Verify a few TX gets created in the TX log.
|
||||
Assert.assertTrue(delLog.getNumOfValidTransactions() > 0);
|
||||
|
||||
// These blocks cannot be found in the container, skip deleting them
|
||||
// eventually these TX will success.
|
||||
GenericTestUtils.waitFor(() -> {
|
||||
try {
|
||||
return delLog.getFailedTransactions().size() == 0;
|
||||
} catch (IOException e) {
|
||||
return false;
|
||||
}
|
||||
}, 1000, 10000);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,180 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with this
|
||||
* work for additional information regarding copyright ownership. The ASF
|
||||
* licenses this file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
* License for the specific language governing permissions and limitations under
|
||||
* the License.
|
||||
*/
|
||||
package org.apache.hadoop.ozone;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.commons.lang.RandomStringUtils;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.StorageType;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ObjectStoreHandler;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.ContainerData;
|
||||
import org.apache.hadoop.ozone.container.common.helpers.KeyUtils;
|
||||
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyArgs;
|
||||
import org.apache.hadoop.ozone.ksm.helpers.KsmKeyInfo;
|
||||
import org.apache.hadoop.ozone.web.handlers.BucketArgs;
|
||||
import org.apache.hadoop.ozone.web.handlers.KeyArgs;
|
||||
import org.apache.hadoop.ozone.web.handlers.UserArgs;
|
||||
import org.apache.hadoop.ozone.web.handlers.VolumeArgs;
|
||||
import org.apache.hadoop.ozone.web.interfaces.StorageHandler;
|
||||
import org.apache.hadoop.ozone.web.utils.OzoneUtils;
|
||||
import org.apache.hadoop.scm.container.common.helpers.Pipeline;
|
||||
import org.apache.hadoop.utils.MetadataKeyFilters.KeyPrefixFilter;
|
||||
import org.apache.hadoop.utils.MetadataKeyFilters.MetadataKeyFilter;
|
||||
import org.apache.hadoop.utils.MetadataStore;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.OutputStream;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Set;
|
||||
|
||||
/**
|
||||
* A helper class used by {@link TestStorageContainerManager} to generate
|
||||
* some keys and helps to verify containers and blocks locations.
|
||||
*/
|
||||
public class TestStorageContainerManagerHelper {
|
||||
|
||||
private final MiniOzoneCluster cluster;
|
||||
private final Configuration conf;
|
||||
private final StorageHandler storageHandler;
|
||||
|
||||
public TestStorageContainerManagerHelper(MiniOzoneCluster cluster,
|
||||
Configuration conf) throws IOException {
|
||||
this.cluster = cluster;
|
||||
this.conf = conf;
|
||||
storageHandler = new ObjectStoreHandler(conf).getStorageHandler();
|
||||
}
|
||||
|
||||
public Map<String, KsmKeyInfo> createKeys(int numOfKeys, int keySize)
|
||||
throws Exception {
|
||||
Map<String, KsmKeyInfo> keyLocationMap = Maps.newHashMap();
|
||||
String volume = "volume" + RandomStringUtils.randomNumeric(5);
|
||||
String bucket = "bucket" + RandomStringUtils.randomNumeric(5);
|
||||
String userName = "user" + RandomStringUtils.randomNumeric(5);
|
||||
String adminName = "admin" + RandomStringUtils.randomNumeric(5);
|
||||
UserArgs userArgs = new UserArgs(null, OzoneUtils.getRequestID(),
|
||||
null, null, null, null);
|
||||
|
||||
VolumeArgs createVolumeArgs = new VolumeArgs(volume, userArgs);
|
||||
createVolumeArgs.setUserName(userName);
|
||||
createVolumeArgs.setAdminName(adminName);
|
||||
storageHandler.createVolume(createVolumeArgs);
|
||||
|
||||
BucketArgs bucketArgs = new BucketArgs(bucket, createVolumeArgs);
|
||||
bucketArgs.setAddAcls(new LinkedList<>());
|
||||
bucketArgs.setRemoveAcls(new LinkedList<>());
|
||||
bucketArgs.setStorageType(StorageType.DISK);
|
||||
storageHandler.createBucket(bucketArgs);
|
||||
|
||||
// Write 20 keys in bucket.
|
||||
Set<String> keyNames = Sets.newHashSet();
|
||||
KeyArgs keyArgs;
|
||||
for (int i = 0; i < numOfKeys; i++) {
|
||||
String keyName = RandomStringUtils.randomAlphabetic(5) + i;
|
||||
keyNames.add(keyName);
|
||||
keyArgs = new KeyArgs(keyName, bucketArgs);
|
||||
keyArgs.setSize(keySize);
|
||||
// Just for testing list keys call, so no need to write real data.
|
||||
OutputStream stream = storageHandler.newKeyWriter(keyArgs);
|
||||
stream.write(DFSUtil.string2Bytes(
|
||||
RandomStringUtils.randomAlphabetic(5)));
|
||||
stream.close();
|
||||
}
|
||||
|
||||
for (String key : keyNames) {
|
||||
KsmKeyArgs arg = new KsmKeyArgs.Builder()
|
||||
.setVolumeName(volume)
|
||||
.setBucketName(bucket)
|
||||
.setKeyName(key)
|
||||
.build();
|
||||
KsmKeyInfo location = cluster.getKeySpaceManager()
|
||||
.lookupKey(arg);
|
||||
keyLocationMap.put(key, location);
|
||||
}
|
||||
return keyLocationMap;
|
||||
}
|
||||
|
||||
public List<String> getPendingDeletionBlocks(String containerName)
|
||||
throws IOException {
|
||||
List<String> pendingDeletionBlocks = Lists.newArrayList();
|
||||
MetadataStore meta = getContainerMetadata(containerName);
|
||||
KeyPrefixFilter filter =
|
||||
new KeyPrefixFilter(OzoneConsts.DELETING_KEY_PREFIX);
|
||||
List<Map.Entry<byte[], byte[]>> kvs = meta
|
||||
.getRangeKVs(null, Integer.MAX_VALUE, filter);
|
||||
kvs.forEach(entry -> {
|
||||
String key = DFSUtil.bytes2String(entry.getKey());
|
||||
pendingDeletionBlocks
|
||||
.add(key.replace(OzoneConsts.DELETING_KEY_PREFIX, ""));
|
||||
});
|
||||
return pendingDeletionBlocks;
|
||||
}
|
||||
|
||||
public List<String> getAllBlocks(Set<String> containerNames)
|
||||
throws IOException {
|
||||
List<String> allBlocks = Lists.newArrayList();
|
||||
for (String containerName : containerNames) {
|
||||
allBlocks.addAll(getAllBlocks(containerName));
|
||||
}
|
||||
return allBlocks;
|
||||
}
|
||||
|
||||
public List<String> getAllBlocks(String containerName) throws IOException {
|
||||
List<String> allBlocks = Lists.newArrayList();
|
||||
MetadataStore meta = getContainerMetadata(containerName);
|
||||
MetadataKeyFilter filter =
|
||||
(preKey, currentKey, nextKey) -> !DFSUtil.bytes2String(currentKey)
|
||||
.startsWith(OzoneConsts.DELETING_KEY_PREFIX);
|
||||
List<Map.Entry<byte[], byte[]>> kvs =
|
||||
meta.getRangeKVs(null, Integer.MAX_VALUE, filter);
|
||||
kvs.forEach(entry -> {
|
||||
String key = DFSUtil.bytes2String(entry.getKey());
|
||||
allBlocks.add(key.replace(OzoneConsts.DELETING_KEY_PREFIX, ""));
|
||||
});
|
||||
return allBlocks;
|
||||
}
|
||||
|
||||
private MetadataStore getContainerMetadata(String containerName)
|
||||
throws IOException {
|
||||
Pipeline pipeline = cluster.getStorageContainerManager()
|
||||
.getContainer(containerName);
|
||||
DatanodeID leadDN = pipeline.getLeader();
|
||||
OzoneContainer containerServer =
|
||||
getContainerServerByDatanodeID(leadDN.getDatanodeUuid());
|
||||
ContainerData containerData = containerServer.getContainerManager()
|
||||
.readContainer(containerName);
|
||||
return KeyUtils.getDB(containerData, conf);
|
||||
}
|
||||
|
||||
private OzoneContainer getContainerServerByDatanodeID(String dnUUID)
|
||||
throws IOException {
|
||||
for (DataNode dn : cluster.getDataNodes()) {
|
||||
if (dn.getDatanodeId().getDatanodeUuid().equals(dnUUID)) {
|
||||
return dn.getOzoneContainerManager();
|
||||
}
|
||||
}
|
||||
throw new IOException("Unable to get the ozone container "
|
||||
+ "for given datanode ID " + dnUUID);
|
||||
}
|
||||
}
|
|
@ -25,6 +25,8 @@ import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolPr
|
|||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandResponseProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ReportState;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMNodeReport;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKProto;
|
||||
import org.apache.hadoop.ozone.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerBlocksDeletionACKResponseProto;
|
||||
import org.apache.hadoop.ozone.scm.VersionInfo;
|
||||
|
||||
import java.io.IOException;
|
||||
|
@ -188,6 +190,13 @@ public class ScmTestMock implements StorageContainerDatanodeProtocol {
|
|||
.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public ContainerBlocksDeletionACKResponseProto sendContainerBlocksDeletionACK(
|
||||
ContainerBlocksDeletionACKProto request) throws IOException {
|
||||
return ContainerBlocksDeletionACKResponseProto
|
||||
.newBuilder().getDefaultInstanceForType();
|
||||
}
|
||||
|
||||
public ReportState getReportState() {
|
||||
return this.reportState;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue