HDFS-10794. [SPS]: Provide storage policy satisfy worker at DN for co-ordinating the block storage movement work. Contributed by Rakesh R

This commit is contained in:
Kai Zheng 2016-09-14 17:02:11 +08:00 committed by Uma Maheswara Rao Gangumalla
parent a2a8c48699
commit 5692887395
3 changed files with 519 additions and 0 deletions

View File

@ -0,0 +1,258 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
* StoragePolicySatisfyWorker handles the storage policy satisfier commands.
* These commands would be issued from NameNode as part of Datanode's heart beat
* response. BPOfferService delegates the work to this class for handling
* BlockStorageMovement commands.
public class StoragePolicySatisfyWorker {
private static final Logger LOG = LoggerFactory
private final DataNode datanode;
private final int ioFileBufferSize;
private final int moverThreads;
private final ExecutorService moveExecutor;
private final CompletionService<Void> moverExecutorCompletionService;
private final List<Future<Void>> moverTaskFutures;
public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
this.datanode = datanode;
this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
moveExecutor = initializeBlockMoverThreadPool(moverThreads);
moverExecutorCompletionService = new ExecutorCompletionService<>(
moverTaskFutures = new ArrayList<>();
// TODO: Needs to manage the number of concurrent moves per DataNode.
private ThreadPoolExecutor initializeBlockMoverThreadPool(int num) {
LOG.debug("Block mover to satisfy storage policy; pool threads={}", num);
ThreadPoolExecutor moverThreadPool = new ThreadPoolExecutor(1, num, 60,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new Daemon.DaemonFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
public Thread newThread(Runnable r) {
Thread t = super.newThread(r);
t.setName("BlockMoverTask-" + threadIndex.getAndIncrement());
return t;
}, new ThreadPoolExecutor.CallerRunsPolicy() {
public void rejectedExecution(Runnable runnable,
ThreadPoolExecutor e) {
LOG.info("Execution for block movement to satisfy storage policy"
+ " got rejected, Executing in current thread");
// will run in the current thread.
super.rejectedExecution(runnable, e);
return moverThreadPool;
public void processBlockMovingTasks(long trackID,
List<BlockMovingInfo> blockMovingInfos) {
Future<Void> moveCallable = null;
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
assert blkMovingInfo
.getSources().length == blkMovingInfo.getTargets().length;
for (int i = 0; i < blkMovingInfo.getSources().length; i++) {
BlockMovingTask blockMovingTask =
new BlockMovingTask(blkMovingInfo.getBlock(),
moveCallable = moverExecutorCompletionService
// TODO: Presently this function act as a blocking call, this has to be
// refined by moving the tracking logic to another tracker thread.
for (int i = 0; i < moverTaskFutures.size(); i++) {
try {
moveCallable = moverExecutorCompletionService.take();
} catch (InterruptedException | ExecutionException e) {
// TODO: Failure retries and report back the error to NameNode.
LOG.error("Exception while moving block replica to target storage type",
* This class encapsulates the process of moving the block replica to the
* given target.
private class BlockMovingTask implements Callable<Void> {
private final ExtendedBlock block;
private final DatanodeInfo source;
private final DatanodeInfo target;
private final StorageType targetStorageType;
BlockMovingTask(ExtendedBlock block, DatanodeInfo source,
DatanodeInfo target, StorageType targetStorageType) {
this.block = block;
this.source = source;
this.target = target;
this.targetStorageType = targetStorageType;
public Void call() {
return null;
private void moveBlock() {
LOG.info("Start moving block {}", block);
LOG.debug("Start moving block:{} from src:{} to destin:{} to satisfy "
+ "storageType:{}", block, source, target, targetStorageType);
Socket sock = null;
DataOutputStream out = null;
DataInputStream in = null;
try {
DNConf dnConf = datanode.getDnConf();
String dnAddr = target.getXferAddr(dnConf.getConnectToDnViaHostname());
sock = datanode.newSocket();
NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr),
sock.setSoTimeout(2 * dnConf.getSocketTimeout());
LOG.debug("Connecting to datanode {}", dnAddr);
OutputStream unbufOut = sock.getOutputStream();
InputStream unbufIn = sock.getInputStream();
Token<BlockTokenIdentifier> accessToken = datanode.getBlockAccessToken(
block, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE));
DataEncryptionKeyFactory keyFactory = datanode
IOStreamPair saslStreams = datanode.getSaslClient().socketSend(sock,
unbufOut, unbufIn, keyFactory, accessToken, target);
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
out = new DataOutputStream(
new BufferedOutputStream(unbufOut, ioFileBufferSize));
in = new DataInputStream(
new BufferedInputStream(unbufIn, ioFileBufferSize));
sendRequest(out, block, accessToken, source, targetStorageType);
"Successfully moved block:{} from src:{} to destin:{} for"
+ " satisfying storageType:{}",
block, source, target, targetStorageType);
} catch (IOException e) {
// TODO: handle failure retries
"Failed to move block:{} from src:{} to destin:{} to satisfy "
+ "storageType:{}",
block, source, target, targetStorageType, e);
} finally {
/** Send a reportedBlock replace request to the output stream. */
private void sendRequest(DataOutputStream out, ExtendedBlock eb,
Token<BlockTokenIdentifier> accessToken, DatanodeInfo srcDn,
StorageType destinStorageType) throws IOException {
new Sender(out).replaceBlock(eb, destinStorageType, accessToken,
srcDn.getDatanodeUuid(), srcDn);
/** Receive a reportedBlock copy response from the input stream. */
private void receiveResponse(DataInputStream in) throws IOException {
BlockOpResponseProto response = BlockOpResponseProto
while (response.getStatus() == Status.IN_PROGRESS) {
// read intermediate responses
response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
String logInfo = "reportedBlock move is failed";
DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);

View File

@ -0,0 +1,101 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hdfs.server.protocol;
import java.util.Arrays;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
* A BlockStorageMovementCommand is an instruction to a DataNode to move the
* given set of blocks to specified target DataNodes to fulfill the block
* storage policy.
* Upon receiving this command, this DataNode coordinates all the block movement
* by passing the details to
* {@link org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker}
* service. After the block movement this DataNode sends response back to the
* NameNode about the movement status.
public class BlockStorageMovementCommand extends DatanodeCommand {
// TODO: constructor needs to be refined based on the block movement data
// structure.
BlockStorageMovementCommand(int action) {
* Stores block to storage info that can be used for block movement.
public static class BlockMovingInfo {
private ExtendedBlock blk;
private DatanodeInfo[] sourceNodes;
private StorageType[] sourceStorageTypes;
private DatanodeInfo[] targetNodes;
private StorageType[] targetStorageTypes;
public BlockMovingInfo(ExtendedBlock block,
DatanodeInfo[] sourceDnInfos, DatanodeInfo[] targetDnInfos,
StorageType[] srcStorageTypes, StorageType[] targetStorageTypes) {
this.blk = block;
this.sourceNodes = sourceDnInfos;
this.targetNodes = targetDnInfos;
this.sourceStorageTypes = srcStorageTypes;
this.targetStorageTypes = targetStorageTypes;
public void addBlock(ExtendedBlock block) {
this.blk = block;
public ExtendedBlock getBlock() {
return this.blk;
public DatanodeInfo[] getSources() {
return sourceNodes;
public DatanodeInfo[] getTargets() {
return targetNodes;
public StorageType[] getTargetStorageTypes() {
return targetStorageTypes;
public StorageType[] getSourceStorageTypes() {
return sourceStorageTypes;
public String toString() {
return new StringBuilder().append("BlockMovingInfo(\n ")
.append("Moving block: ").append(blk).append(" From: ")
.append(Arrays.asList(sourceNodes)).append(" To: [")
.append(" sourceStorageTypes: ")
.append(" targetStorageTypes: ")

View File

@ -0,0 +1,160 @@
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.namenode.INode;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Supplier;
* This class tests the behavior of moving block replica to the given storage
* type to fulfill the storage policy requirement.
public class TestStoragePolicySatisfyWorker {
private static final Logger LOG = LoggerFactory
private static final int DEFAULT_BLOCK_SIZE = 100;
private static void initConf(Configuration conf) {
* Tests to verify that the block replica is moving to ARCHIVE storage type to
* fulfill the storage policy requirement.
@Test(timeout = 120000)
public void testMoveSingleBlockToAnotherDatanode() throws Exception {
final Configuration conf = new HdfsConfiguration();
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE},
{StorageType.DISK, StorageType.ARCHIVE}})
try {
final DistributedFileSystem dfs = cluster.getFileSystem();
final String file = "/testMoveSingleBlockToAnotherDatanode";
// write to DISK
final FSDataOutputStream out = dfs.create(new Path(file), (short) 2);
// verify before movement
LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
StorageType[] storageTypes = lb.getStorageTypes();
for (StorageType storageType : storageTypes) {
Assert.assertTrue(StorageType.DISK == storageType);
// move to ARCHIVE
dfs.setStoragePolicy(new Path(file), "COLD");
lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
DataNode src = cluster.getDataNodes().get(3);
DatanodeInfo targetDnInfo = DFSTestUtil
// TODO: Need to revisit this when NN is implemented to be able to send
// block moving commands.
StoragePolicySatisfyWorker worker = new StoragePolicySatisfyWorker(conf,
List<BlockMovingInfo> blockMovingInfos = new ArrayList<>();
BlockMovingInfo blockMovingInfo = prepareBlockMovingInfo(
lb.getBlock(), lb.getLocations()[0], targetDnInfo,
lb.getStorageTypes()[0], StorageType.ARCHIVE);
INode inode = cluster.getNamesystem().getFSDirectory().getINode(file);
// Wait till NameNode notified about the block location details
waitForLocatedBlockWithArchiveStorageType(dfs, file, 1, 30000);
} finally {
private void waitForLocatedBlockWithArchiveStorageType(
final DistributedFileSystem dfs, final String file,
int expectedArchiveCount, int timeout) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
public Boolean get() {
LocatedBlock lb = null;
try {
lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
} catch (IOException e) {
LOG.error("Exception while getting located blocks", e);
return false;
int archiveCount = 0;
for (StorageType storageType : lb.getStorageTypes()) {
if (StorageType.ARCHIVE == storageType) {
LOG.info("Archive replica count, expected={} and actual={}",
expectedArchiveCount, archiveCount);
return expectedArchiveCount == archiveCount;
}, 100, timeout);
BlockMovingInfo prepareBlockMovingInfo(ExtendedBlock block,
DatanodeInfo src, DatanodeInfo destin, StorageType storageType,
StorageType targetStorageType) {
return new BlockMovingInfo(block, new DatanodeInfo[] {src},
new DatanodeInfo[] {destin}, new StorageType[] {storageType},
new StorageType[] {targetStorageType});