HDFS-11029. [SPS]:Provide retry mechanism for the blocks which were failed while moving its storage at DNs. Contributed by Uma Maheswara Rao G

This commit is contained in:
Rakesh Radhakrishnan 2016-11-10 10:09:45 +05:30 committed by Uma Maheswara Rao Gangumalla
parent 0f2d1ddc2c
commit 047526b4c2
4 changed files with 343 additions and 11 deletions

View File

@ -0,0 +1,221 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* A monitor class for checking whether block storage movements finished or not.
* If block storage movement results from datanode indicates about the movement
* success, then it will just remove the entries from tracking. If it reports
* failure, then it will add back to needed block storage movements list. If no
* DN reports about movement for longer time, then such items will be retries
* automatically after timeout. The default timeout would be 30mins.
*/
public class BlockStorageMovementAttemptedItems {
public static final Logger LOG =
LoggerFactory.getLogger(BlockStorageMovementAttemptedItems.class);
// A map holds the items which are already taken for blocks movements
// processing and sent to DNs.
private final Map<Long, Long> storageMovementAttemptedItems;
private final List<BlocksStorageMovementResult> storageMovementAttemptedResults;
private volatile boolean spsRunning = true;
private Daemon timerThread = null;
//
// It might take anywhere between 30 to 60 minutes before
// a request is timed out.
//
private long selfRetryTimeout = 30 * 60 * 1000;
//
// It might take anywhere between 5 to 10 minutes before
// a request is timed out.
//
private long checkTimeout = 5 * 60 * 1000; // minimum value
private BlockStorageMovementNeeded blockStorageMovementNeeded;
public BlockStorageMovementAttemptedItems(long timeoutPeriod,
long selfRetryTimeout,
BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) {
if (timeoutPeriod > 0) {
this.checkTimeout = Math.min(checkTimeout, timeoutPeriod);
}
this.selfRetryTimeout = selfRetryTimeout;
this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles;
storageMovementAttemptedItems = new HashMap<>();
storageMovementAttemptedResults = new ArrayList<>();
}
/**
* Add item to block storage movement attempted items map which holds the
* tracking/blockCollection id versus time stamp.
*
* @param blockCollectionID
* - tracking id / block collection id
*/
public void add(Long blockCollectionID) {
synchronized (storageMovementAttemptedItems) {
storageMovementAttemptedItems.put(blockCollectionID, monotonicNow());
}
}
/**
* Add the trackIDBlocksStorageMovementResults to
* storageMovementAttemptedResults.
*
* @param blksMovementResults
*/
public void addResults(BlocksStorageMovementResult[] blksMovementResults) {
if (blksMovementResults.length == 0) {
return;
}
synchronized (storageMovementAttemptedResults) {
storageMovementAttemptedResults
.addAll(Arrays.asList(blksMovementResults));
}
}
/**
* Starts the monitor thread.
*/
void start() {
timerThread = new Daemon(new BlocksStorageMovementAttemptResultMonitor());
timerThread.setName("BlocksStorageMovementAttemptResultMonitor");
timerThread.start();
}
/**
* Stops the monitor thread.
*/
public void stop() {
spsRunning = false;
}
/**
* A monitor class for checking block storage movement result and long waiting
* items periodically.
*/
private class BlocksStorageMovementAttemptResultMonitor implements Runnable {
@Override
public void run() {
while (spsRunning) {
try {
blockStorageMovementResultCheck();
blocksStorageMovementUnReportedItemsCheck();
Thread.sleep(checkTimeout);
} catch (InterruptedException ie) {
LOG.debug("BlocksStorageMovementAttemptResultMonitor thread "
+ "is interrupted.", ie);
}
}
}
private void blocksStorageMovementUnReportedItemsCheck() {
synchronized (storageMovementAttemptedItems) {
Iterator<Entry<Long, Long>> iter =
storageMovementAttemptedItems.entrySet().iterator();
long now = monotonicNow();
while (iter.hasNext()) {
Entry<Long, Long> entry = iter.next();
if (now > entry.getValue() + selfRetryTimeout) {
Long blockCollectionID = entry.getKey();
synchronized (storageMovementAttemptedResults) {
boolean exist = isExistInResult(blockCollectionID);
if (!exist) {
blockStorageMovementNeeded.add(blockCollectionID);
} else {
LOG.info("Blocks storage movement results for the"
+ " tracking id : " + blockCollectionID
+ " is reported from one of the co-ordinating datanode."
+ " So, the result will be processed soon.");
}
iter.remove();
}
}
}
}
}
private boolean isExistInResult(Long blockCollectionID) {
Iterator<BlocksStorageMovementResult> iter =
storageMovementAttemptedResults.iterator();
while (iter.hasNext()) {
BlocksStorageMovementResult storageMovementAttemptedResult =
iter.next();
if (storageMovementAttemptedResult.getTrackId() == blockCollectionID) {
return true;
}
}
return false;
}
private void blockStorageMovementResultCheck() {
synchronized (storageMovementAttemptedResults) {
Iterator<BlocksStorageMovementResult> iter =
storageMovementAttemptedResults.iterator();
while (iter.hasNext()) {
BlocksStorageMovementResult storageMovementAttemptedResult =
iter.next();
if (storageMovementAttemptedResult
.getStatus() == BlocksStorageMovementResult.Status.FAILURE) {
blockStorageMovementNeeded
.add(storageMovementAttemptedResult.getTrackId());
LOG.warn("Blocks storage movement results for the tracking id : "
+ storageMovementAttemptedResult.getTrackId()
+ " is reported from co-ordinating datanode, but result"
+ " status is FAILURE. So, added for retry");
} else {
synchronized (storageMovementAttemptedItems) {
storageMovementAttemptedItems
.remove(storageMovementAttemptedResult.getTrackId());
}
LOG.info("Blocks storage movement results for the tracking id : "
+ storageMovementAttemptedResult.getTrackId()
+ " is reported from co-ordinating datanode. "
+ "The result status is SUCCESS.");
}
iter.remove(); // remove from results as processed above
}
}
}
}
@VisibleForTesting
public int resultsCount() {
return storageMovementAttemptedResults.size();
}
}

View File

@ -69,6 +69,7 @@ public class StoragePolicySatisfier implements Runnable {
private final Namesystem namesystem;
private final BlockManager blockManager;
private final BlockStorageMovementNeeded storageMovementNeeded;
private final BlockStorageMovementAttemptedItems storageMovementsMonitor;
public StoragePolicySatisfier(final Namesystem namesystem,
final BlockStorageMovementNeeded storageMovementNeeded,
@ -76,15 +77,22 @@ public class StoragePolicySatisfier implements Runnable {
this.namesystem = namesystem;
this.storageMovementNeeded = storageMovementNeeded;
this.blockManager = blkManager;
// TODO: below selfRetryTimeout and checkTimeout can be configurable later
// Now, the default values of selfRetryTimeout and checkTimeout are 30mins
// and 5mins respectively
this.storageMovementsMonitor = new BlockStorageMovementAttemptedItems(
5 * 60 * 1000, 30 * 60 * 1000, storageMovementNeeded);
}
/**
* Start storage policy satisfier demon thread.
* Start storage policy satisfier demon thread. Also start block storage
* movements monitor for retry the attempts if needed.
*/
public void start() {
storagePolicySatisfierThread = new Daemon(this);
storagePolicySatisfierThread.setName("StoragePolicySatisfier");
storagePolicySatisfierThread.start();
this.storageMovementsMonitor.start();
}
/**
@ -99,6 +107,7 @@ public class StoragePolicySatisfier implements Runnable {
storagePolicySatisfierThread.join(3000);
} catch (InterruptedException ie) {
}
this.storageMovementsMonitor.stop();
}
@Override
@ -108,6 +117,7 @@ public class StoragePolicySatisfier implements Runnable {
Long blockCollectionID = storageMovementNeeded.get();
if (blockCollectionID != null) {
computeAndAssignStorageMismatchedBlocksToDNs(blockCollectionID);
this.storageMovementsMonitor.add(blockCollectionID);
}
// TODO: We can think to make this as configurable later, how frequently
// we want to check block movements.
@ -398,11 +408,6 @@ public class StoragePolicySatisfier implements Runnable {
}
}
// TODO: Temporarily keeping the results for assertion. This has to be
// revisited as part of HDFS-11029.
@VisibleForTesting
List<BlocksStorageMovementResult> results = new ArrayList<>();
/**
* Receives the movement results of collection of blocks associated to a
* trackId.
@ -415,6 +420,11 @@ public class StoragePolicySatisfier implements Runnable {
if (blksMovementResults.length <= 0) {
return;
}
results.addAll(Arrays.asList(blksMovementResults));
storageMovementsMonitor.addResults(blksMovementResults);
}
@VisibleForTesting
BlockStorageMovementAttemptedItems getAttemptedItemsMonitor() {
return storageMovementsMonitor;
}
}

View File

@ -0,0 +1,101 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.util.Time.monotonicNow;
import static org.junit.Assert.*;
import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Tests that block storage movement attempt failures are reported from DN and
* processed them correctly or not.
*/
public class TestBlockStorageMovementAttemptedItems {
private BlockStorageMovementAttemptedItems bsmAttemptedItems = null;
private BlockStorageMovementNeeded unsatisfiedStorageMovementFiles = null;
@Before
public void setup() {
unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded();
bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100, 500,
unsatisfiedStorageMovementFiles);
bsmAttemptedItems.start();
}
@After
public void teardown() {
if (bsmAttemptedItems != null) {
bsmAttemptedItems.stop();
}
}
private boolean checkItemMovedForRetry(Long item, long retryTimeout)
throws InterruptedException {
long stopTime = monotonicNow() + (retryTimeout * 2);
boolean isItemFound = false;
while (monotonicNow() < (stopTime)) {
Long ele = null;
while ((ele = unsatisfiedStorageMovementFiles.get()) != null) {
if (item.longValue() == ele.longValue()) {
isItemFound = true;
break;
}
}
if (!isItemFound) {
Thread.sleep(100);
} else {
break;
}
}
return isItemFound;
}
@Test(timeout = 30000)
public void testAddResultWithFailureResult() throws Exception {
Long item = new Long(1234);
bsmAttemptedItems.add(item);
bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.FAILURE)});
assertTrue(checkItemMovedForRetry(item, 200));
}
@Test(timeout = 30000)
public void testAddResultWithSucessResult() throws Exception {
Long item = new Long(1234);
bsmAttemptedItems.add(item);
bsmAttemptedItems.addResults(
new BlocksStorageMovementResult[]{new BlocksStorageMovementResult(
item.longValue(), BlocksStorageMovementResult.Status.SUCCESS)});
assertFalse(checkItemMovedForRetry(item, 200));
}
@Test(timeout = 30000)
public void testNoResultAdded() throws Exception {
Long item = new Long(1234);
bsmAttemptedItems.add(item);
// After selfretry timeout, it should be added back for retry
assertTrue(checkItemMovedForRetry(item, 600));
}
}

View File

@ -174,8 +174,6 @@ public class TestStoragePolicySatisfier {
waitExpectedStorageType(file, StorageType.SSD, distributedFS, 1, 30000);
waitExpectedStorageType(file, StorageType.DISK, distributedFS, 2, 30000);
// TODO: Temporarily using the results from StoragePolicySatisfier class.
// This has to be revisited as part of HDFS-11029.
waitForBlocksMovementResult(1, 30000);
} finally {
hdfsCluster.shutdown();
@ -190,8 +188,10 @@ public class TestStoragePolicySatisfier {
@Override
public Boolean get() {
LOG.info("expectedResultsCount={} actualResultsCount={}",
expectedResultsCount, sps.results.size());
return expectedResultsCount == sps.results.size();
expectedResultsCount,
sps.getAttemptedItemsMonitor().resultsCount());
return expectedResultsCount == sps.getAttemptedItemsMonitor()
.resultsCount();
}
}, 100, timeout);
}