HDFS-16460. [SPS]: Handle failure retries for moving tasks (#4001)
This commit is contained in:
parent
807a428b55
commit
5412fbf6d4
|
@ -827,6 +827,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
||||||
"dfs.storage.policy.satisfier.retry.max.attempts";
|
"dfs.storage.policy.satisfier.retry.max.attempts";
|
||||||
public static final int DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT =
|
public static final int DFS_STORAGE_POLICY_SATISFIER_MAX_RETRY_ATTEMPTS_DEFAULT =
|
||||||
3;
|
3;
|
||||||
|
public static final String DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_KEY =
|
||||||
|
"dfs.storage.policy.satisfier.move.task.retry.max.attempts";
|
||||||
|
public static final int DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_DEFAULT =
|
||||||
|
3;
|
||||||
public static final String DFS_STORAGE_DEFAULT_POLICY =
|
public static final String DFS_STORAGE_DEFAULT_POLICY =
|
||||||
"dfs.storage.default.policy";
|
"dfs.storage.default.policy";
|
||||||
public static final HdfsConstants.StoragePolicy
|
public static final HdfsConstants.StoragePolicy
|
||||||
|
|
|
@ -101,7 +101,7 @@ public class BlockDispatcher {
|
||||||
*/
|
*/
|
||||||
public BlockMovementStatus moveBlock(BlockMovingInfo blkMovingInfo,
|
public BlockMovementStatus moveBlock(BlockMovingInfo blkMovingInfo,
|
||||||
SaslDataTransferClient saslClient, ExtendedBlock eb, Socket sock,
|
SaslDataTransferClient saslClient, ExtendedBlock eb, Socket sock,
|
||||||
DataEncryptionKeyFactory km, Token<BlockTokenIdentifier> accessToken) {
|
DataEncryptionKeyFactory km, Token<BlockTokenIdentifier> accessToken) throws IOException {
|
||||||
LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy "
|
LOG.info("Start moving block:{} from src:{} to destin:{} to satisfy "
|
||||||
+ "storageType, sourceStoragetype:{} and destinStoragetype:{}",
|
+ "storageType, sourceStoragetype:{} and destinStoragetype:{}",
|
||||||
blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
|
blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
|
||||||
|
@ -149,14 +149,6 @@ public class BlockDispatcher {
|
||||||
LOG.debug("Pinned block can't be moved, so skipping block:{}",
|
LOG.debug("Pinned block can't be moved, so skipping block:{}",
|
||||||
blkMovingInfo.getBlock(), e);
|
blkMovingInfo.getBlock(), e);
|
||||||
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
|
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_SUCCESS;
|
||||||
} catch (IOException e) {
|
|
||||||
// TODO: handle failure retries
|
|
||||||
LOG.warn(
|
|
||||||
"Failed to move block:{} from src:{} to destin:{} to satisfy "
|
|
||||||
+ "storageType:{}",
|
|
||||||
blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
|
|
||||||
blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e);
|
|
||||||
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
|
|
||||||
} finally {
|
} finally {
|
||||||
IOUtils.closeStream(out);
|
IOUtils.closeStream(out);
|
||||||
IOUtils.closeStream(in);
|
IOUtils.closeStream(in);
|
||||||
|
|
|
@ -80,11 +80,15 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
|
||||||
private Daemon movementTrackerThread;
|
private Daemon movementTrackerThread;
|
||||||
private final SPSService service;
|
private final SPSService service;
|
||||||
private final BlockDispatcher blkDispatcher;
|
private final BlockDispatcher blkDispatcher;
|
||||||
|
private final int maxRetry;
|
||||||
|
|
||||||
public ExternalSPSBlockMoveTaskHandler(Configuration conf,
|
public ExternalSPSBlockMoveTaskHandler(Configuration conf,
|
||||||
NameNodeConnector nnc, SPSService spsService) {
|
NameNodeConnector nnc, SPSService spsService) {
|
||||||
int moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
|
int moverThreads = conf.getInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY,
|
||||||
DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
|
DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT);
|
||||||
|
maxRetry = conf.getInt(
|
||||||
|
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_KEY,
|
||||||
|
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_DEFAULT);
|
||||||
moveExecutor = initializeBlockMoverThreadPool(moverThreads);
|
moveExecutor = initializeBlockMoverThreadPool(moverThreads);
|
||||||
mCompletionServ = new ExecutorCompletionService<>(moveExecutor);
|
mCompletionServ = new ExecutorCompletionService<>(moveExecutor);
|
||||||
this.nnc = nnc;
|
this.nnc = nnc;
|
||||||
|
@ -151,7 +155,7 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
|
||||||
// during block movement assignment logic. In the internal movement,
|
// during block movement assignment logic. In the internal movement,
|
||||||
// remaining space is bookkeeping at the DatanodeDescriptor, please refer
|
// remaining space is bookkeeping at the DatanodeDescriptor, please refer
|
||||||
// IntraSPSNameNodeBlockMoveTaskHandler#submitMoveTask implementation and
|
// IntraSPSNameNodeBlockMoveTaskHandler#submitMoveTask implementation and
|
||||||
// updating via the funcation call -
|
// updating via the function call -
|
||||||
// dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType());
|
// dn.incrementBlocksScheduled(blkMovingInfo.getTargetStorageType());
|
||||||
LOG.debug("Received BlockMovingTask {}", blkMovingInfo);
|
LOG.debug("Received BlockMovingTask {}", blkMovingInfo);
|
||||||
BlockMovingTask blockMovingTask = new BlockMovingTask(blkMovingInfo);
|
BlockMovingTask blockMovingTask = new BlockMovingTask(blkMovingInfo);
|
||||||
|
@ -195,21 +199,25 @@ public class ExternalSPSBlockMoveTaskHandler implements BlockMoveTaskHandler {
|
||||||
|
|
||||||
final KeyManager km = nnc.getKeyManager();
|
final KeyManager km = nnc.getKeyManager();
|
||||||
Token<BlockTokenIdentifier> accessToken;
|
Token<BlockTokenIdentifier> accessToken;
|
||||||
|
int retry = 0;
|
||||||
|
while (retry <= maxRetry) {
|
||||||
try {
|
try {
|
||||||
|
ExternalSPSFaultInjector.getInstance().mockAnException(retry);
|
||||||
accessToken = km.getAccessToken(eb,
|
accessToken = km.getAccessToken(eb,
|
||||||
new StorageType[]{blkMovingInfo.getTargetStorageType()},
|
new StorageType[]{blkMovingInfo.getTargetStorageType()},
|
||||||
new String[0]);
|
new String[0]);
|
||||||
} catch (IOException e) {
|
|
||||||
// TODO: handle failure retries
|
|
||||||
LOG.warn(
|
|
||||||
"Failed to move block:{} from src:{} to destin:{} to satisfy "
|
|
||||||
+ "storageType:{}",
|
|
||||||
blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
|
|
||||||
blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), e);
|
|
||||||
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
|
|
||||||
}
|
|
||||||
return blkDispatcher.moveBlock(blkMovingInfo, saslClient, eb,
|
return blkDispatcher.moveBlock(blkMovingInfo, saslClient, eb,
|
||||||
new Socket(), km, accessToken);
|
new Socket(), km, accessToken);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn(
|
||||||
|
"Failed to move block:{} from src:{} to dest:{} to satisfy "
|
||||||
|
+ "storageType:{}, retry: {}",
|
||||||
|
blkMovingInfo.getBlock(), blkMovingInfo.getSource(),
|
||||||
|
blkMovingInfo.getTarget(), blkMovingInfo.getTargetStorageType(), retry, e);
|
||||||
|
retry++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return BlockMovementStatus.DN_BLK_STORAGE_MOVEMENT_FAILURE;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,46 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hdfs.server.sps;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.VisibleForTesting;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Used to inject certain faults for testing.
|
||||||
|
*/
|
||||||
|
public class ExternalSPSFaultInjector {
|
||||||
|
@VisibleForTesting
|
||||||
|
private static ExternalSPSFaultInjector instance =
|
||||||
|
new ExternalSPSFaultInjector();
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public static ExternalSPSFaultInjector getInstance() {
|
||||||
|
return instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public static void setInstance(ExternalSPSFaultInjector instance) {
|
||||||
|
ExternalSPSFaultInjector.instance = instance;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void mockAnException(int retry) throws IOException {
|
||||||
|
}
|
||||||
|
}
|
|
@ -5507,6 +5507,14 @@
|
||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.storage.policy.satisfier.move.task.retry.max.attempts</name>
|
||||||
|
<value>3</value>
|
||||||
|
<description>
|
||||||
|
Max retries for moving task to satisfy the block storage policy.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.storage.policy.satisfier.datanode.cache.refresh.interval.ms</name>
|
<name>dfs.storage.policy.satisfier.datanode.cache.refresh.interval.ms</name>
|
||||||
<value>300000</value>
|
<value>300000</value>
|
||||||
|
|
|
@ -33,6 +33,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_KERBEROS_PRINCIPAL_KE
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_KEYTAB_FILE_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_KEYTAB_FILE_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SPS_MAX_OUTSTANDING_PATHS_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_ENABLED_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_DEFAULT;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBEROS_PRINCIPAL_KEY;
|
||||||
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_DATA_TRANSFER_PROTECTION_KEY;
|
||||||
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
|
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY;
|
||||||
|
@ -130,6 +131,14 @@ public class TestExternalStoragePolicySatisfier {
|
||||||
private static final int DEFAULT_BLOCK_SIZE = 1024;
|
private static final int DEFAULT_BLOCK_SIZE = 1024;
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(TestExternalStoragePolicySatisfier.class);
|
LoggerFactory.getLogger(TestExternalStoragePolicySatisfier.class);
|
||||||
|
private final ExternalSPSFaultInjector injector = new ExternalSPSFaultInjector() {
|
||||||
|
@Override
|
||||||
|
public void mockAnException(int retry) throws IOException {
|
||||||
|
if (retry < DFS_STORAGE_POLICY_SATISFIER_MOVE_TASK_MAX_RETRY_ATTEMPTS_DEFAULT) {
|
||||||
|
throw new IOException("IO exception");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setUp() {
|
public void setUp() {
|
||||||
|
@ -480,6 +489,20 @@ public class TestExternalStoragePolicySatisfier {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 300000)
|
||||||
|
public void testWhenStoragePolicySetToCOLDWithException()
|
||||||
|
throws Exception {
|
||||||
|
|
||||||
|
try {
|
||||||
|
createCluster();
|
||||||
|
// Mock an IOException 3 times, and moving tasks should succeed finally.
|
||||||
|
ExternalSPSFaultInjector.setInstance(injector);
|
||||||
|
doTestWhenStoragePolicySetToCOLD();
|
||||||
|
} finally {
|
||||||
|
shutdownCluster();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void doTestWhenStoragePolicySetToCOLD() throws Exception {
|
private void doTestWhenStoragePolicySetToCOLD() throws Exception {
|
||||||
// Change policy to COLD
|
// Change policy to COLD
|
||||||
dfs.setStoragePolicy(new Path(FILE), COLD);
|
dfs.setStoragePolicy(new Path(FILE), COLD);
|
||||||
|
|
Loading…
Reference in New Issue