HDFS-13033: [SPS]: Implement a mechanism to do file block movements for external SPS. Contributed by Rakesh R.

Uma Maheswara Rao G 2018-01-23 16:19:46 -08:00 committed by Uma Maheswara Rao Gangumalla
parent 3159b39cf8
commit b0cb8d9bb4
18 changed files with 817 additions and 289 deletions

View File

@ -269,6 +269,14 @@ public class NameNodeConnector implements Closeable {
}
}
/**
* Returns fallbackToSimpleAuth. This will be set to true or false during
* calls to indicate whether a secure client has fallen back to simple auth.
*/
public AtomicBoolean getFallbackToSimpleAuth() {
return fallbackToSimpleAuth;
}
@Override
public void close() {
keyManager.close();

View File

@ -0,0 +1,186 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common.sps;
import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Dispatches block replica moves between datanodes to satisfy the storage
* policy.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BlockDispatcher {
private static final Logger LOG = LoggerFactory
.getLogger(BlockDispatcher.class);
private final boolean connectToDnViaHostname;
private final int socketTimeout;
private final int ioFileBufferSize;
/**
* Constructs the block dispatcher.
*
* @param sockTimeout
* socket timeout
* @param ioFileBuffSize
* file I/O buffer size
* @param connectToDatanodeViaHostname
* true to connect via hostname, false otherwise
*/
public BlockDispatcher(int sockTimeout, int ioFileBuffSize,
boolean connectToDatanodeViaHostname) {
this.socketTimeout = sockTimeout;
this.ioFileBufferSize = ioFileBuffSize;
this.connectToDnViaHostname = connectToDatanodeViaHostname;
}
/**
* Moves the given block replica to the given target node and waits for the
* response.
*
* @param blkMovingInfo
* block movement details
* @param saslClient
* SASL for DataTransferProtocol on behalf of a client
* @param eb
* extended block info
* @param sock
* target node's socket
* @param km
* for creation of an encryption key
* @param accessToken
* block access token for the connection
* @return status of the block movement
*/
public BlockMovementStatus moveBlock(BlockMovingInfo blkMovingInfo,
SaslDataTransferClient saslClient, ExtendedBlock eb, Socket sock,
DataEncryptionKeyFactory km, Token<BlockTokenIdentifier> accessToken) {
LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy "
+ "storageType, sourceStoragetype:{} and destinStoragetype:{}",
blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
blkMovingInfo.getTarget(), blkMovingInfo.getSourceStorageType(),
blkMovingInfo.getTargetStorageType());
DataOutputStream out = null;
DataInputStream in = null;
try {
NetUtils.connect(sock,
NetUtils.createSocketAddr(
blkMovingInfo.getTarget().getXferAddr(connectToDnViaHostname)),
socketTimeout);
// Set read timeout so that it doesn't hang forever against
// unresponsive nodes. Datanode normally sends IN_PROGRESS response
// twice within the client read timeout period (every 30 seconds by
// default). Here, we make it give up after "socketTimeout * 5" period
// of no response.
sock.setSoTimeout(socketTimeout * 5);
sock.setKeepAlive(true);
OutputStream unbufOut = sock.getOutputStream();
InputStream unbufIn = sock.getInputStream();
LOG.debug("Connecting to datanode {}", blkMovingInfo.getTarget());
IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
unbufIn, km, accessToken, blkMovingInfo.getTarget());
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
out = new DataOutputStream(
new BufferedOutputStream(unbufOut, ioFileBufferSize));
in = new DataInputStream(
new BufferedInputStream(unbufIn, ioFileBufferSize));
sendRequest(out, eb, accessToken, blkMovingInfo.getSource(),
blkMovingInfo.getTargetStorageType());
receiveResponse(in);
LOG.info(
"Successfully moved block:{} from src:{} to destin:{} for"
+ " satisfying storageType:{}",
blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType());
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
} catch (BlockPinningException e) {
// A pinned block can't be moved to a different node, so there is no need
// to retry; just mark it as SUCCESS.
LOG.debug("Pinned block can't be moved, so skipping block:{}",
blkMovingInfo.getBlock(), e);
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
} catch (IOException e) {
// TODO: handle failure retries
LOG.warn(
"Failed to move block:{} from src:{} to destin:{} to satisfy "
+ "storageType:{}",
blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e);
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
} finally {
IOUtils.closeStream(out);
IOUtils.closeStream(in);
IOUtils.closeSocket(sock);
}
}
/** Send a reportedBlock replace request to the output stream. */
private static void sendRequest(DataOutputStream out, ExtendedBlock eb,
Token<BlockTokenIdentifier> accessToken, DatanodeInfo source,
StorageType targetStorageType) throws IOException {
new Sender(out).replaceBlock(eb, targetStorageType, accessToken,
source.getDatanodeUuid(), source, null);
}
/** Receive a reportedBlock copy response from the input stream. */
private static void receiveResponse(DataInputStream in) throws IOException {
BlockOpResponseProto response = BlockOpResponseProto
.parseFrom(vintPrefixed(in));
while (response.getStatus() == Status.IN_PROGRESS) {
// read intermediate responses
response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
}
String logInfo = "reportedBlock move is failed";
DataTransferProtoUtil.checkBlockOpStatus(response, logInfo);
}
}
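
For orientation, here is a caller-side sketch of how BlockDispatcher is meant to be driven. This is not part of the commit; the saslClient, keyFactory, accessToken and blkMovingInfo values are assumed to come from the surrounding datanode or balancer context, as in the real call sites later in this commit.

// Sketch only: pushing a single replica move through BlockDispatcher.
BlockDispatcher dispatcher = new BlockDispatcher(
    HdfsConstants.READ_TIMEOUT, // socket timeout
    ioFileBufferSize,           // io.file.buffer.size
    false);                     // connect to datanodes by IP, not hostname
ExtendedBlock eb = new ExtendedBlock(blockPoolId, blkMovingInfo.getBlock());
BlockMovementStatus status = dispatcher.moveBlock(blkMovingInfo, saslClient,
    eb, new Socket(), keyFactory, accessToken);
// moveBlock() does not throw on movement failure; the caller inspects the
// returned status and applies its own retry policy.
if (status == BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE) {
  // e.g. re-queue blkMovingInfo for another attempt
}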

View File

@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common.sps;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
/**
* This class represents the status of a block movement task, indicating
* whether the task was successful or failed due to errors.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BlockMovementAttemptFinished {
private final Block block;
private final DatanodeInfo src;
private final DatanodeInfo target;
private final BlockMovementStatus status;
/**
* Construct movement attempt finished info.
*
* @param block
* block
* @param src
* src datanode
* @param target
* target datanode
* @param status
* movement status
*/
public BlockMovementAttemptFinished(Block block, DatanodeInfo src,
DatanodeInfo target, BlockMovementStatus status) {
this.block = block;
this.src = src;
this.target = target;
this.status = status;
}
/**
* @return details of the block that was attempted to be moved from the src
* to the target node.
*/
public Block getBlock() {
return block;
}
/**
* @return block movement status code.
*/
public BlockMovementStatus getStatus() {
return status;
}
@Override
public String toString() {
return new StringBuilder().append("Block movement attempt finished(\n ")
.append(" block : ").append(block).append(" src node: ").append(src)
.append(" target node: ").append(target).append(" movement status: ")
.append(status).append(")").toString();
}
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common.sps;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Block movement status code.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public enum BlockMovementStatus {
/** Success. */
DN_BLK_STORAGE_MOVEMENT_SUCCESS(0),
/**
* Failure due to generation timestamp mismatches, network errors, or no
* available space.
*/
DN_BLK_STORAGE_MOVEMENT_FAILURE(-1);
// TODO: need to support different type of failures. Failure due to network
// errors, block pinned, no space available etc.
private final int code;
BlockMovementStatus(int code) {
this.code = code;
}
/**
* @return the status code.
*/
int getStatusCode() {
return code;
}
}

View File

@ -15,7 +15,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode;
package org.apache.hadoop.hdfs.server.common.sps;
import java.util.ArrayList;
import java.util.HashMap;
@ -28,8 +28,6 @@ import java.util.concurrent.Future;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlockMovementAttemptFinished;
import org.apache.hadoop.hdfs.server.datanode.StoragePolicySatisfyWorker.BlocksMovementsStatusHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -37,7 +35,7 @@ import org.slf4j.LoggerFactory;
* This class is used to track the completion of block movement future tasks.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
@InterfaceStability.Evolving
public class BlockStorageMovementTracker implements Runnable {
private static final Logger LOG = LoggerFactory
.getLogger(BlockStorageMovementTracker.class);
@ -148,7 +146,7 @@ public class BlockStorageMovementTracker implements Runnable {
* @param futureTask
* future task used for moving the respective block
*/
void addBlock(Block block,
public void addBlock(Block block,
Future<BlockMovementAttemptFinished> futureTask) {
synchronized (moverTaskFutures) {
List<Future<BlockMovementAttemptFinished>> futures =
@ -167,7 +165,7 @@ public class BlockStorageMovementTracker implements Runnable {
/**
* Clear the pending movement and movement result queues.
*/
void removeAll() {
public void removeAll() {
synchronized (moverTaskFutures) {
moverTaskFutures.clear();
}

View File

@ -0,0 +1,95 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.common.sps;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.Block;
/**
* Block movements status handler, which is used to collect the details of
* completed block movements. Later, these attempted finished (successful or
* failed) blocks can be accessed to notify the respective listeners, if any.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class BlocksMovementsStatusHandler {
private final List<Block> blockIdVsMovementStatus = new ArrayList<>();
/**
* Collect all the storage movement attempt finished blocks. Later this will
* be sent to the namenode via heartbeat.
*
* @param moveAttemptFinishedBlks
* set of storage movement attempt finished blocks
*/
public void handle(
List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) {
List<Block> blocks = new ArrayList<>();
for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) {
blocks.add(item.getBlock());
}
// Adding to the tracking report list. Later this can be accessed to know
// the attempted block movements.
synchronized (blockIdVsMovementStatus) {
blockIdVsMovementStatus.addAll(blocks);
}
}
/**
* @return unmodifiable list of storage movement attempt finished blocks.
*/
public List<Block> getMoveAttemptFinishedBlocks() {
List<Block> moveAttemptFinishedBlks = new ArrayList<>();
// 1. Adding all the completed block ids.
synchronized (blockIdVsMovementStatus) {
if (blockIdVsMovementStatus.size() > 0) {
moveAttemptFinishedBlks = Collections
.unmodifiableList(blockIdVsMovementStatus);
}
}
return moveAttemptFinishedBlks;
}
/**
* Remove the storage movement attempt finished blocks from the tracking list.
*
* @param moveAttemptFinishedBlks
* set of storage movement attempt finished blocks
*/
public void remove(List<Block> moveAttemptFinishedBlks) {
if (moveAttemptFinishedBlks != null) {
// Guard with the same lock used by handle() to avoid racing with writers.
synchronized (blockIdVsMovementStatus) {
blockIdVsMovementStatus.removeAll(moveAttemptFinishedBlks);
}
}
}
/**
* Clear the blockID vs movement status tracking list.
*/
public void removeAll() {
synchronized (blockIdVsMovementStatus) {
blockIdVsMovementStatus.clear();
}
}
}
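
A small, self-contained sketch of the intended report/drain cycle around this handler. The class and block id below are illustrative only; copying into a fresh ArrayList before remove() is deliberate, since getMoveAttemptFinishedBlocks() returns an unmodifiable view over the live internal list.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished;
import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus;
import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler;

public class StatusHandlerCycleExample {
  public static void main(String[] args) {
    BlocksMovementsStatusHandler handler = new BlocksMovementsStatusHandler();
    // Producer side: a mover task reports one finished attempt (source and
    // target datanodes elided as null purely for illustration).
    handler.handle(Arrays.asList(new BlockMovementAttemptFinished(
        new Block(1L), null, null,
        BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS)));
    // Consumer side: snapshot, report, then remove. Copying first matters
    // because getMoveAttemptFinishedBlocks() returns a view of the live list.
    List<Block> finished =
        new ArrayList<>(handler.getMoveAttemptFinishedBlocks());
    // ... send `finished` to the namenode or an external listener ...
    handler.remove(finished);
  }
}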

View File

@ -0,0 +1,27 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* This package provides commonly used classes for the block movement.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
package org.apache.hadoop.hdfs.server.common.sps;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

View File

@ -17,21 +17,9 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.protocolPB.PBHelperClient.vintPrefixed;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
@ -47,20 +35,15 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockPinningException;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtoUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished;
import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus;
import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker;
import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler;
import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
@ -81,7 +64,6 @@ public class StoragePolicySatisfyWorker {
.getLogger(StoragePolicySatisfyWorker.class);
private final DataNode datanode;
private final int ioFileBufferSize;
private final int moverThreads;
private final ExecutorService moveExecutor;
@ -89,10 +71,10 @@ public class StoragePolicySatisfyWorker {
private final BlocksMovementsStatusHandler handler;
private final BlockStorageMovementTracker movementTracker;
private Daemon movementTrackerThread;
private final BlockDispatcher blkDispatcher;
public StoragePolicySatisfyWorker(Configuration conf, DataNode datanode) {
this.datanode = datanode;
this.ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
@ -103,7 +85,10 @@ public class StoragePolicySatisfyWorker {
handler);
movementTrackerThread = new Daemon(movementTracker);
movementTrackerThread.setName("BlockStorageMovementTracker");
DNConf dnConf = datanode.getDnConf();
int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
blkDispatcher = new BlockDispatcher(dnConf.getSocketTimeout(),
ioFileBufferSize, dnConf.getConnectToDnViaHostname());
// TODO: Needs to manage the number of concurrent moves per DataNode.
}
@ -183,8 +168,7 @@ public class StoragePolicySatisfyWorker {
assert sourceStorageType != targetStorageType
: "Source and Target storage type shouldn't be same!";
BlockMovingTask blockMovingTask = new BlockMovingTask(blockPoolID,
blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
blkMovingInfo.getTarget(), sourceStorageType, targetStorageType);
blkMovingInfo);
Future<BlockMovementAttemptFinished> moveCallable = moverCompletionService
.submit(blockMovingTask);
movementTracker.addBlock(blkMovingInfo.getBlock(),
@ -199,244 +183,45 @@ public class StoragePolicySatisfyWorker {
private class BlockMovingTask implements
Callable<BlockMovementAttemptFinished> {
private final String blockPoolID;
private final Block block;
private final DatanodeInfo source;
private final DatanodeInfo target;
private final StorageType srcStorageType;
private final StorageType targetStorageType;
private final BlockMovingInfo blkMovingInfo;
BlockMovingTask(String blockPoolID, Block block,
DatanodeInfo source, DatanodeInfo target,
StorageType srcStorageType, StorageType targetStorageType) {
BlockMovingTask(String blockPoolID, BlockMovingInfo blkMovInfo) {
this.blockPoolID = blockPoolID;
this.block = block;
this.source = source;
this.target = target;
this.srcStorageType = srcStorageType;
this.targetStorageType = targetStorageType;
this.blkMovingInfo = blkMovInfo;
}
@Override
public BlockMovementAttemptFinished call() {
BlockMovementStatus status = moveBlock();
return new BlockMovementAttemptFinished(block, source, target, status);
return new BlockMovementAttemptFinished(blkMovingInfo.getBlock(),
blkMovingInfo.getSource(), blkMovingInfo.getTarget(), status);
}
private BlockMovementStatus moveBlock() {
LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy "
+ "storageType, sourceStoragetype:{} and destinStoragetype:{}",
block, source, target, srcStorageType, targetStorageType);
Socket sock = null;
DataOutputStream out = null;
DataInputStream in = null;
datanode.incrementXmitsInProgress();
ExtendedBlock eb = new ExtendedBlock(blockPoolID,
blkMovingInfo.getBlock());
try {
datanode.incrementXmitsInProgress();
ExtendedBlock extendedBlock = new ExtendedBlock(blockPoolID, block);
DNConf dnConf = datanode.getDnConf();
String dnAddr = datanode.getDatanodeId()
.getXferAddr(dnConf.getConnectToDnViaHostname());
sock = datanode.newSocket();
NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr),
dnConf.getSocketTimeout());
sock.setSoTimeout(2 * dnConf.getSocketTimeout());
LOG.debug("Connecting to datanode {}", dnAddr);
OutputStream unbufOut = sock.getOutputStream();
InputStream unbufIn = sock.getInputStream();
Token<BlockTokenIdentifier> accessToken = datanode.getBlockAccessToken(
extendedBlock, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
new StorageType[]{targetStorageType}, new String[0]);
eb, EnumSet.of(BlockTokenIdentifier.AccessMode.WRITE),
new StorageType[]{blkMovingInfo.getTargetStorageType()},
new String[0]);
DataEncryptionKeyFactory keyFactory = datanode
.getDataEncryptionKeyFactoryForBlock(extendedBlock);
IOStreamPair saslStreams = datanode.getSaslClient().socketSend(sock,
unbufOut, unbufIn, keyFactory, accessToken, target);
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
out = new DataOutputStream(
new BufferedOutputStream(unbufOut, ioFileBufferSize));
in = new DataInputStream(
new BufferedInputStream(unbufIn, ioFileBufferSize));
sendRequest(out, extendedBlock, accessToken, source, targetStorageType);
receiveResponse(in);
.getDataEncryptionKeyFactoryForBlock(eb);
LOG.info(
"Successfully moved block:{} from src:{} to destin:{} for"
+ " satisfying storageType:{}",
block, source, target, targetStorageType);
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
} catch (BlockPinningException e) {
// Pinned block won't be able to move to a different node. So, its not
// required to do retries, just marked as SUCCESS.
LOG.debug("Pinned block can't be moved, so skipping block:{}", block,
e);
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
return blkDispatcher.moveBlock(blkMovingInfo,
datanode.getSaslClient(), eb, datanode.newSocket(),
keyFactory, accessToken);
} catch (IOException e) {
// TODO: handle failure retries
LOG.warn(
"Failed to move block:{} from src:{} to destin:{} to satisfy "
+ "storageType:{}",
block, source, target, targetStorageType, e);
blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e);
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
} finally {
datanode.decrementXmitsInProgress();
IOUtils.closeStream(out);
IOUtils.closeStream(in);
IOUtils.closeSocket(sock);
}
}
/** Send a reportedBlock replace request to the output stream. */
private void sendRequest(DataOutputStream out, ExtendedBlock eb,
Token<BlockTokenIdentifier> accessToken, DatanodeInfo srcDn,
StorageType destinStorageType) throws IOException {
new Sender(out).replaceBlock(eb, destinStorageType, accessToken,
srcDn.getDatanodeUuid(), srcDn, null);
}
/** Receive a reportedBlock copy response from the input stream. */
private void receiveResponse(DataInputStream in) throws IOException {
BlockOpResponseProto response = BlockOpResponseProto
.parseFrom(vintPrefixed(in));
while (response.getStatus() == Status.IN_PROGRESS) {
// read intermediate responses
response = BlockOpResponseProto.parseFrom(vintPrefixed(in));
}
String logInfo = "reportedBlock move is failed";
DataTransferProtoUtil.checkBlockOpStatus(response, logInfo, true);
}
}
/**
* Block movement status code.
*/
public enum BlockMovementStatus {
/** Success. */
DN_BLK_STORAGE_MOVEMENT_SUCCESS(0),
/**
* Failure due to generation time stamp mismatches or network errors
* or no available space.
*/
DN_BLK_STORAGE_MOVEMENT_FAILURE(-1);
// TODO: need to support different type of failures. Failure due to network
// errors, block pinned, no space available etc.
private final int code;
BlockMovementStatus(int code) {
this.code = code;
}
/**
* @return the status code.
*/
int getStatusCode() {
return code;
}
}
/**
* This class represents status from a block movement task. This will have the
* information of the task which was successful or failed due to errors.
*/
static class BlockMovementAttemptFinished {
private final Block block;
private final DatanodeInfo src;
private final DatanodeInfo target;
private final BlockMovementStatus status;
BlockMovementAttemptFinished(Block block, DatanodeInfo src,
DatanodeInfo target, BlockMovementStatus status) {
this.block = block;
this.src = src;
this.target = target;
this.status = status;
}
Block getBlock() {
return block;
}
BlockMovementStatus getStatus() {
return status;
}
@Override
public String toString() {
return new StringBuilder().append("Block movement attempt finished(\n ")
.append(" block : ")
.append(block).append(" src node: ").append(src)
.append(" target node: ").append(target)
.append(" movement status: ").append(status).append(")").toString();
}
}
/**
* Blocks movements status handler, which is used to collect details of the
* completed block movements and it will send these attempted finished(with
* success or failure) blocks to the namenode via heartbeat.
*/
public static class BlocksMovementsStatusHandler {
private final List<Block> blockIdVsMovementStatus =
new ArrayList<>();
/**
* Collect all the storage movement attempt finished blocks. Later this will
* be send to namenode via heart beat.
*
* @param moveAttemptFinishedBlks
* set of storage movement attempt finished blocks
*/
void handle(List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) {
List<Block> blocks = new ArrayList<>();
for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) {
blocks.add(item.getBlock());
}
// Adding to the tracking report list. Later this will be send to
// namenode via datanode heartbeat.
synchronized (blockIdVsMovementStatus) {
blockIdVsMovementStatus.addAll(blocks);
}
}
/**
* @return unmodifiable list of storage movement attempt finished blocks.
*/
List<Block> getMoveAttemptFinishedBlocks() {
List<Block> moveAttemptFinishedBlks = new ArrayList<>();
// 1. Adding all the completed block ids.
synchronized (blockIdVsMovementStatus) {
if (blockIdVsMovementStatus.size() > 0) {
moveAttemptFinishedBlks = Collections
.unmodifiableList(blockIdVsMovementStatus);
}
}
return moveAttemptFinishedBlks;
}
/**
* Remove the storage movement attempt finished blocks from the tracking
* list.
*
* @param moveAttemptFinishedBlks
* set of storage movement attempt finished blocks
*/
void remove(List<Block> moveAttemptFinishedBlks) {
if (moveAttemptFinishedBlks != null) {
blockIdVsMovementStatus.removeAll(moveAttemptFinishedBlks);
}
}
/**
* Clear the blockID vs movement status tracking map.
*/
void removeAll() {
synchronized (blockIdVsMovementStatus) {
blockIdVsMovementStatus.clear();
}
}
}

View File

@ -1299,7 +1299,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
blockManager.getSPSService()),
new IntraSPSNameNodeFileIdCollector(getFSDirectory(),
blockManager.getSPSService()),
new IntraSPSNameNodeBlockMoveTaskHandler(blockManager, this));
new IntraSPSNameNodeBlockMoveTaskHandler(blockManager, this), null);
blockManager.startSPS();
} finally {
startingActiveService = false;
@ -3996,7 +3996,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
+ " movement attempt finished block info sent by DN");
}
} else {
sps.handleStorageMovementAttemptFinishedBlks(blksMovementsFinished);
sps.notifyStorageMovementAttemptFinishedBlks(blksMovementsFinished);
}
}

View File

@ -38,7 +38,6 @@ public interface BlockMoveTaskHandler {
* contain the required info to move the block, that is, the source
* location, destination location and storage types.
*/
void submitMoveTask(BlockMovingInfo blkMovingInfo,
BlockMovementListener blockMoveCompletionListener) throws IOException;
void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException;
}
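
Since the per-call listener parameter is gone, a handler implementation now only schedules the move; attempt results reach the SPS via the BlockMovementListener registered at SPSService#init. A hypothetical minimal implementation for illustration (the class name is not in the commit):

import java.io.IOException;

import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;

// Hypothetical minimal implementation of the slimmed-down interface. Real
// handlers (IntraSPSNameNodeBlockMoveTaskHandler, the external one below)
// schedule the move asynchronously instead of completing it inline.
public class LoggingBlockMoveTaskHandler implements BlockMoveTaskHandler {
  @Override
  public void submitMoveTask(BlockMovingInfo blkMovingInfo)
      throws IOException {
    System.out.println("Would schedule move: " + blkMovingInfo);
  }
}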

View File

@ -46,8 +46,7 @@ import com.google.common.annotations.VisibleForTesting;
* finished for a longer time period, then such items will be retried
* automatically after a timeout. The default timeout is 5 minutes.
*/
public class BlockStorageMovementAttemptedItems
implements BlockMovementListener {
public class BlockStorageMovementAttemptedItems {
private static final Logger LOG =
LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
@ -59,6 +58,7 @@ public class BlockStorageMovementAttemptedItems
private final List<Block> movementFinishedBlocks;
private volatile boolean monitorRunning = true;
private Daemon timerThread = null;
private final BlockMovementListener blkMovementListener;
//
// It might take anywhere between 5 to 10 minutes before
// a request is timed out.
@ -74,7 +74,8 @@ public class BlockStorageMovementAttemptedItems
private final SPSService service;
public BlockStorageMovementAttemptedItems(SPSService service,
BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
BlockStorageMovementNeeded unsatisfiedStorageMovementFiles,
BlockMovementListener blockMovementListener) {
this.service = service;
long recheckTimeout = this.service.getConf().getLong(
DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY,
@ -89,6 +90,7 @@ public class BlockStorageMovementAttemptedItems
this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
storageMovementAttemptedItems = new ArrayList<>();
movementFinishedBlocks = new ArrayList<>();
this.blkMovementListener = blockMovementListener;
}
/**
@ -118,6 +120,10 @@ public class BlockStorageMovementAttemptedItems
synchronized (movementFinishedBlocks) {
movementFinishedBlocks.addAll(Arrays.asList(moveAttemptFinishedBlks));
}
// Notify the external listener, if one is plugged in.
if (blkMovementListener != null) {
blkMovementListener.notifyMovementTriedBlocks(moveAttemptFinishedBlks);
}
}
/**

View File

@ -44,8 +44,7 @@ public class IntraSPSNameNodeBlockMoveTaskHandler
}
@Override
public void submitMoveTask(BlockMovingInfo blkMovingInfo,
BlockMovementListener blockMoveCompletionListener) throws IOException {
public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException {
namesystem.readLock();
try {
DatanodeDescriptor dn = blockManager.getDatanodeManager()

View File

@ -22,6 +22,7 @@ import java.util.List;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
/**
* An interface for SPSService, which exposes life cycle and processing APIs.
@ -41,9 +42,11 @@ public interface SPSService {
* id
* @param handler
* - a helper service for moving the blocks
* @param blkMovementListener
* - listener to get notified on block movement attempt completion
*/
void init(Context ctxt, FileIdCollector fileIDCollector,
BlockMoveTaskHandler handler);
BlockMoveTaskHandler handler, BlockMovementListener blkMovementListener);
/**
* Starts the SPS service. Make sure to initialize the helper services before
@ -112,4 +115,13 @@ public interface SPSService {
* - directory inode id.
*/
void markScanCompletedForPath(Long inodeId);
/**
* Notify the details of storage movement attempt finished blocks.
*
* @param moveAttemptFinishedBlks
* - array containing all the blocks whose movement was attempted
*/
void notifyStorageMovementAttemptFinishedBlks(
BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks);
}
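
A minimal listener sketch for illustration (the test at the end of this commit registers a similar one); the class name here is hypothetical:

import org.apache.hadoop.hdfs.protocol.Block;

// Hypothetical listener counting attempted blocks; an external SPS registers
// such a listener through init(...) to learn which block movements were tried.
public class CountingBlockMovementListener implements BlockMovementListener {
  private int attempted; // running total of blocks with attempted moves

  @Override
  public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
    attempted += moveAttemptFinishedBlks.length;
  }
}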

View File

@ -132,13 +132,14 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
}
public void init(final Context context, final FileIdCollector fileIDCollector,
final BlockMoveTaskHandler blockMovementTaskHandler) {
final BlockMoveTaskHandler blockMovementTaskHandler,
final BlockMovementListener blockMovementListener) {
this.ctxt = context;
this.storageMovementNeeded =
new BlockStorageMovementNeeded(context, fileIDCollector);
this.storageMovementsMonitor =
new BlockStorageMovementAttemptedItems(this,
storageMovementNeeded);
storageMovementNeeded, blockMovementListener);
this.blockMoveTaskHandler = blockMovementTaskHandler;
this.spsWorkMultiplier = DFSUtil.getSPSWorkMultiplier(getConf());
this.blockMovementMaxRetry = getConf().getInt(
@ -291,6 +292,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
+ " back to retry queue as some of the blocks"
+ " are low redundant.");
}
itemInfo.increRetryCount();
this.storageMovementNeeded.add(itemInfo);
break;
case BLOCKS_FAILED_TO_MOVE:
@ -410,15 +412,18 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
liveDns, ecPolicy);
if (blocksPaired) {
status = BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED;
} else {
// none of the blocks found its eligible targets for satisfying the
// storage policy.
} else if (status != BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) {
// Check if the previous block was successfully paired. Here the status
// will be set to NO_BLOCKS_TARGETS_PAIRED only when none of the blocks
// of a file has found eligible targets to satisfy the storage policy.
status = BlocksMovingAnalysis.Status.NO_BLOCKS_TARGETS_PAIRED;
}
} else {
if (hasLowRedundancyBlocks) {
status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
}
} else if (hasLowRedundancyBlocks
&& status != BlocksMovingAnalysis.Status.BLOCKS_TARGETS_PAIRED) {
// Check if the previous block was successfully paired.
status = BlocksMovingAnalysis.Status.FEW_LOW_REDUNDANCY_BLOCKS;
}
}
@ -426,8 +431,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
for (BlockMovingInfo blkMovingInfo : blockMovingInfos) {
// Check for at least one block storage movement has been chosen
try {
blockMoveTaskHandler.submitMoveTask(blkMovingInfo,
storageMovementsMonitor);
blockMoveTaskHandler.submitMoveTask(blkMovingInfo);
LOG.debug("BlockMovingInfo: {}", blkMovingInfo);
assignedBlockIds.add(blkMovingInfo.getBlock());
blockCount++;
@ -823,7 +827,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
* @param moveAttemptFinishedBlks
* set of storage movement attempt finished blocks.
*/
public void handleStorageMovementAttemptFinishedBlks(
public void notifyStorageMovementAttemptFinishedBlks(
BlocksStorageMoveAttemptFinished moveAttemptFinishedBlks) {
if (moveAttemptFinishedBlks.getBlocks().length <= 0) {
return;
@ -833,7 +837,7 @@ public class StoragePolicySatisfier implements SPSService, Runnable {
}
@VisibleForTesting
BlockMovementListener getAttemptedItemsMonitor() {
BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() {
return storageMovementsMonitor;
}

View File

@ -0,0 +1,233 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.sps;
import java.io.IOException;
import java.net.Socket;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.datatransfer.TrustedChannelResolver;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataTransferSaslUtil;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.SaslDataTransferClient;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.balancer.KeyManager;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.common.sps.BlockMovementAttemptFinished;
import org.apache.hadoop.hdfs.server.common.sps.BlockMovementStatus;
import org.apache.hadoop.hdfs.server.common.sps.BlockStorageMovementTracker;
import org.apache.hadoop.hdfs.server.common.sps.BlocksMovementsStatusHandler;
import org.apache.hadoop.hdfs.server.common.sps.BlockDispatcher;
import org.apache.hadoop.hdfs.server.namenode.sps.BlockMoveTaskHandler;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.protocol.BlockStorageMovementCommand.BlockMovingInfo;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMoveAttemptFinished;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This class handles the external SPS block movements. It moves the given
* block to a target datanode by directly establishing a socket connection
* to it and invoking
* {@link Sender#replaceBlock(ExtendedBlock, StorageType, Token, String,
* DatanodeInfo, String)}.
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
private static final Logger LOG = LoggerFactory
.getLogger(ExternalSPSBlockMoveTaskHandler.class);
private final ExecutorService moveExecutor;
private final CompletionService<BlockMovementAttemptFinished> mCompletionServ;
private final NameNodeConnector nnc;
private final SaslDataTransferClient saslClient;
private final BlockStorageMovementTracker blkMovementTracker;
private Daemon movementTrackerThread;
private final SPSService service;
private final BlockDispatcher blkDispatcher;
public ExternalSPSBlockMoveTaskHandler(Configuration conf,
NameNodeConnector nnc, SPSService spsService) {
int moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
moveExecutor = initializeBlockMoverThreadPool(moverThreads);
mCompletionServ = new ExecutorCompletionService<>(moveExecutor);
this.nnc = nnc;
this.saslClient = new SaslDataTransferClient(conf,
DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf),
nnc.getFallbackToSimpleAuth());
this.blkMovementTracker = new BlockStorageMovementTracker(
mCompletionServ, new ExternalBlocksMovementsStatusHandler());
this.service = spsService;
boolean connectToDnViaHostname = conf.getBoolean(
HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME,
HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
int ioFileBufferSize = DFSUtilClient.getIoFileBufferSize(conf);
blkDispatcher = new BlockDispatcher(HdfsConstants.READ_TIMEOUT,
ioFileBufferSize, connectToDnViaHostname);
}
/**
* Initializes block movement tracker daemon and starts the thread.
*/
void init() {
movementTrackerThread = new Daemon(this.blkMovementTracker);
movementTrackerThread.setName("BlockStorageMovementTracker");
movementTrackerThread.start();
}
private ThreadPoolExecutor initializeBlockMoverThreadPool(int num) {
LOG.debug("Block mover to satisfy storage policy; pool threads={}", num);
ThreadPoolExecutor moverThreadPool = new ThreadPoolExecutor(1, num, 60,
TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
new Daemon.DaemonFactory() {
private final AtomicInteger threadIndex = new AtomicInteger(0);
@Override
public Thread newThread(Runnable r) {
Thread t = super.newThread(r);
t.setName("BlockMoverTask-" + threadIndex.getAndIncrement());
return t;
}
}, new ThreadPoolExecutor.CallerRunsPolicy() {
@Override
public void rejectedExecution(Runnable runnable,
ThreadPoolExecutor e) {
LOG.info("Execution for block movement to satisfy storage policy"
+ " got rejected, Executing in current thread");
// will run in the current thread.
super.rejectedExecution(runnable, e);
}
});
moverThreadPool.allowCoreThreadTimeOut(true);
return moverThreadPool;
}
@Override
public void submitMoveTask(BlockMovingInfo blkMovingInfo) throws IOException {
// TODO: Need to increment scheduled block size on the target node. This
// count will be used to calculate the remaining space of the target
// datanode during block movement assignment logic. In the internal
// movement, remaining space is bookkept at the DatanodeDescriptor; please
// refer to the IntraSPSNameNodeBlockMoveTaskHandler#submitMoveTask
// implementation, which updates it via the function call -
// dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType());
LOG.debug("Received BlockMovingTask {}", blkMovingInfo);
BlockMovingTask blockMovingTask = new BlockMovingTask(blkMovingInfo);
Future<BlockMovementAttemptFinished> moveCallable = mCompletionServ
.submit(blockMovingTask);
blkMovementTracker.addBlock(blkMovingInfo.getBlock(), moveCallable);
}
private class ExternalBlocksMovementsStatusHandler
extends BlocksMovementsStatusHandler {
@Override
public void handle(
List<BlockMovementAttemptFinished> moveAttemptFinishedBlks) {
List<Block> blocks = new ArrayList<>();
for (BlockMovementAttemptFinished item : moveAttemptFinishedBlks) {
blocks.add(item.getBlock());
}
BlocksStorageMoveAttemptFinished blkAttempted =
new BlocksStorageMoveAttemptFinished(
blocks.toArray(new Block[blocks.size()]));
service.notifyStorageMovementAttemptFinishedBlks(blkAttempted);
}
}
/**
* This class encapsulates the process of moving the block replica to the
* given target.
*/
private class BlockMovingTask
implements Callable<BlockMovementAttemptFinished> {
private final BlockMovingInfo blkMovingInfo;
BlockMovingTask(BlockMovingInfo blkMovingInfo) {
this.blkMovingInfo = blkMovingInfo;
}
@Override
public BlockMovementAttemptFinished call() {
BlockMovementStatus blkMovementStatus = moveBlock();
return new BlockMovementAttemptFinished(blkMovingInfo.getBlock(),
blkMovingInfo.getSource(), blkMovingInfo.getTarget(),
blkMovementStatus);
}
private BlockMovementStatus moveBlock() {
ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(),
blkMovingInfo.getBlock());
final KeyManager km = nnc.getKeyManager();
Token<BlockTokenIdentifier> accessToken;
try {
accessToken = km.getAccessToken(eb,
new StorageType[]{blkMovingInfo.getTargetStorageType()},
new String[0]);
} catch (IOException e) {
// TODO: handle failure retries
LOG.warn(
"Failed to move block:{} from src:{} to destin:{} to satisfy "
+ "storageType:{}",
blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e);
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
}
return blkDispatcher.moveBlock(blkMovingInfo, saslClient, eb,
new Socket(), km, accessToken);
}
}
/**
* Cleanup the resources.
*/
void cleanUp() {
blkMovementTracker.stopTracking();
if (movementTrackerThread != null) {
movementTrackerThread.interrupt();
}
}
}
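
The intended wiring for this handler, condensed from the test changes at the end of this commit (getNameNodeConnector is the test's helper; conf, context, fileIdCollector and blkMoveListener are assumed to exist in the caller's scope):

// Condensed wiring sketch, mirroring TestExternalStoragePolicySatisfier.
NameNodeConnector nnc = getNameNodeConnector(conf);
ExternalSPSBlockMoveTaskHandler externalHandler =
    new ExternalSPSBlockMoveTaskHandler(conf, nnc, spsService);
externalHandler.init(); // start the BlockStorageMovementTracker daemon
spsService.init(context, fileIdCollector, externalHandler, blkMoveListener);
spsService.start(true);
// ...
externalHandler.cleanUp(); // stop tracking on shutdown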

View File

@ -56,7 +56,7 @@ public class TestBlockStorageMovementAttemptedItems {
unsatisfiedStorageMovementFiles =
new BlockStorageMovementNeeded(ctxt, null);
bsmAttemptedItems = new BlockStorageMovementAttemptedItems(sps,
unsatisfiedStorageMovementFiles);
unsatisfiedStorageMovementFiles, null);
}
@After

View File

@ -92,7 +92,7 @@ public class TestStoragePolicySatisfier {
private static final String ONE_SSD = "ONE_SSD";
private static final String COLD = "COLD";
private static final Logger LOG =
protected static final Logger LOG =
LoggerFactory.getLogger(TestStoragePolicySatisfier.class);
private Configuration config = null;
private StorageType[][] allDiskTypes =
@ -1337,7 +1337,7 @@ public class TestStoragePolicySatisfier {
};
FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt);
sps.init(ctxt, fileIDCollector, null);
sps.init(ctxt, fileIDCollector, null, null);
sps.getStorageMovementQueue().activate();
INode rootINode = fsDir.getINode("/root");
@ -1404,7 +1404,7 @@ public class TestStoragePolicySatisfier {
}
};
FileIdCollector fileIDCollector = createFileIdCollector(sps, ctxt);
sps.init(ctxt, fileIDCollector, null);
sps.init(ctxt, fileIDCollector, null, null);
sps.getStorageMovementQueue().activate();
INode rootINode = fsDir.getINode("/root");

View File

@ -18,20 +18,33 @@
package org.apache.hadoop.hdfs.server.sps;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.namenode.sps.BlockMovementListener;
import org.apache.hadoop.hdfs.server.namenode.sps.Context;
import org.apache.hadoop.hdfs.server.namenode.sps.FileIdCollector;
import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeBlockMoveTaskHandler;
import org.apache.hadoop.hdfs.server.namenode.sps.IntraSPSNameNodeContext;
import org.apache.hadoop.hdfs.server.namenode.sps.SPSService;
import org.apache.hadoop.hdfs.server.namenode.sps.StoragePolicySatisfier;
import org.apache.hadoop.hdfs.server.namenode.sps.TestStoragePolicySatisfier;
import org.junit.Assert;
import org.junit.Ignore;
import com.google.common.collect.Maps;
/**
* Tests the external SPS service plugins.
*/
@ -69,23 +82,24 @@ public class TestExternalStoragePolicySatisfier
cluster.waitActive();
if (conf.getBoolean(DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ENABLED_KEY,
false)) {
SPSService spsService = cluster.getNameNode().getNamesystem()
.getBlockManager().getSPSService();
BlockManager blkMgr = cluster.getNameNode().getNamesystem()
.getBlockManager();
SPSService spsService = blkMgr.getSPSService();
spsService.stopGracefully();
IntraSPSNameNodeContext context = new IntraSPSNameNodeContext(
cluster.getNameNode().getNamesystem(),
cluster.getNameNode().getNamesystem().getBlockManager(), cluster
.getNameNode().getNamesystem().getBlockManager().getSPSService());
blkMgr, blkMgr.getSPSService());
ExternalBlockMovementListener blkMoveListener =
new ExternalBlockMovementListener();
ExternalSPSBlockMoveTaskHandler externalHandler =
new ExternalSPSBlockMoveTaskHandler(conf, getNameNodeConnector(conf),
blkMgr.getSPSService());
externalHandler.init();
spsService.init(context,
new ExternalSPSFileIDCollector(context,
cluster.getNameNode().getNamesystem().getBlockManager()
.getSPSService(),
5),
new IntraSPSNameNodeBlockMoveTaskHandler(
cluster.getNameNode().getNamesystem().getBlockManager(),
cluster.getNameNode().getNamesystem()));
new ExternalSPSFileIDCollector(context, blkMgr.getSPSService(), 5),
externalHandler,
blkMoveListener);
spsService.start(true);
}
return cluster;
@ -97,6 +111,35 @@ public class TestExternalStoragePolicySatisfier
return new ExternalSPSFileIDCollector(ctxt, sps, 5);
}
private class ExternalBlockMovementListener implements BlockMovementListener {
private List<Block> actualBlockMovements = new ArrayList<>();
@Override
public void notifyMovementTriedBlocks(Block[] moveAttemptFinishedBlks) {
for (Block block : moveAttemptFinishedBlks) {
actualBlockMovements.add(block);
}
LOG.info("Movement attempted blocks", actualBlockMovements);
}
}
private NameNodeConnector getNameNodeConnector(Configuration conf)
throws IOException {
final Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Assert.assertEquals(1, namenodes.size());
Map<URI, List<Path>> nnMap = Maps.newHashMap();
for (URI nn : namenodes) {
nnMap.put(nn, null);
}
final Path externalSPSPathId = new Path("/system/externalSPS.id");
final List<NameNodeConnector> nncs = NameNodeConnector
.newNameNodeConnectors(nnMap,
StoragePolicySatisfier.class.getSimpleName(), externalSPSPathId,
conf, NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
return nncs.get(0);
}
/**
* This test need not run, as the external scan is not batch-based scanning
* right now.