HDFS-16268. Balancer stuck when moving striped blocks due to NPE (#3546)
(cherry picked from commit 7279fe8661
)
This commit is contained in:
parent
292527b325
commit
b4fc326acf
|
@ -238,7 +238,7 @@ public class Dispatcher {
|
||||||
private DDatanode proxySource;
|
private DDatanode proxySource;
|
||||||
private StorageGroup target;
|
private StorageGroup target;
|
||||||
|
|
||||||
private PendingMove(Source source, StorageGroup target) {
|
PendingMove(Source source, StorageGroup target) {
|
||||||
this.source = source;
|
this.source = source;
|
||||||
this.target = target;
|
this.target = target;
|
||||||
}
|
}
|
||||||
|
@ -279,12 +279,18 @@ public class Dispatcher {
|
||||||
/**
|
/**
|
||||||
* @return true if the given block is good for the tentative move.
|
* @return true if the given block is good for the tentative move.
|
||||||
*/
|
*/
|
||||||
private boolean markMovedIfGoodBlock(DBlock block, StorageType targetStorageType) {
|
boolean markMovedIfGoodBlock(DBlock block, StorageType targetStorageType) {
|
||||||
synchronized (block) {
|
synchronized (block) {
|
||||||
synchronized (movedBlocks) {
|
synchronized (movedBlocks) {
|
||||||
if (isGoodBlockCandidate(source, target, targetStorageType, block)) {
|
if (isGoodBlockCandidate(source, target, targetStorageType, block)) {
|
||||||
if (block instanceof DBlockStriped) {
|
if (block instanceof DBlockStriped) {
|
||||||
reportedBlock = ((DBlockStriped) block).getInternalBlock(source);
|
reportedBlock = ((DBlockStriped) block).getInternalBlock(source);
|
||||||
|
if (reportedBlock == null) {
|
||||||
|
LOG.info(
|
||||||
|
"No striped internal block on source {}, block {}. Skipping.",
|
||||||
|
source, block);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
reportedBlock = block;
|
reportedBlock = block;
|
||||||
}
|
}
|
||||||
|
@ -496,7 +502,7 @@ public class Dispatcher {
|
||||||
this.cellSize = cellSize;
|
this.cellSize = cellSize;
|
||||||
}
|
}
|
||||||
|
|
||||||
public DBlock getInternalBlock(StorageGroup storage) {
|
DBlock getInternalBlock(StorageGroup storage) {
|
||||||
int idxInLocs = locations.indexOf(storage);
|
int idxInLocs = locations.indexOf(storage);
|
||||||
if (idxInLocs == -1) {
|
if (idxInLocs == -1) {
|
||||||
return null;
|
return null;
|
||||||
|
@ -515,7 +521,11 @@ public class Dispatcher {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public long getNumBytes(StorageGroup storage) {
|
public long getNumBytes(StorageGroup storage) {
|
||||||
return getInternalBlock(storage).getNumBytes();
|
DBlock block = getInternalBlock(storage);
|
||||||
|
if (block == null) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return block.getNumBytes();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1303,7 +1313,7 @@ public class Dispatcher {
|
||||||
* 2. the block does not have a replica/internalBlock on the target;
|
* 2. the block does not have a replica/internalBlock on the target;
|
||||||
* 3. doing the move does not reduce the number of racks that the block has
|
* 3. doing the move does not reduce the number of racks that the block has
|
||||||
*/
|
*/
|
||||||
private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
|
boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
|
||||||
StorageType targetStorageType, DBlock block) {
|
StorageType targetStorageType, DBlock block) {
|
||||||
if (source.equals(target)) {
|
if (source.equals(target)) {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -41,11 +41,16 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_WEB_AUTHENTICATION_KERBER
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
import org.junit.AfterClass;
|
import org.junit.AfterClass;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Mockito.any;
|
import static org.mockito.Mockito.any;
|
||||||
import static org.mockito.Mockito.anyLong;
|
import static org.mockito.Mockito.anyLong;
|
||||||
import static org.mockito.Mockito.doAnswer;
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -1614,11 +1619,39 @@ public class TestBalancer {
|
||||||
// verify locations of striped blocks
|
// verify locations of striped blocks
|
||||||
locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
|
locatedBlocks = client.getBlockLocations(fileName, 0, fileLen);
|
||||||
StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
|
StripedFileTestUtil.verifyLocatedStripedBlocks(locatedBlocks, groupSize);
|
||||||
|
|
||||||
|
// Test handling NPE with striped blocks
|
||||||
|
testNullStripedBlocks(conf);
|
||||||
|
|
||||||
} finally {
|
} finally {
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void testNullStripedBlocks(Configuration conf) throws IOException {
|
||||||
|
NameNodeConnector nnc = NameNodeConnector.newNameNodeConnectors(
|
||||||
|
DFSUtil.getInternalNsRpcUris(conf),
|
||||||
|
Balancer.class.getSimpleName(), Balancer.BALANCER_ID_PATH, conf,
|
||||||
|
BalancerParameters.DEFAULT.getMaxIdleIteration()).get(0);
|
||||||
|
Dispatcher dispatcher = new Dispatcher(nnc, Collections.emptySet(),
|
||||||
|
Collections.<String> emptySet(), 1, 1, 0,
|
||||||
|
1, 1, conf);
|
||||||
|
Dispatcher spyDispatcher = spy(dispatcher);
|
||||||
|
Dispatcher.PendingMove move = spyDispatcher.new PendingMove(
|
||||||
|
mock(Dispatcher.Source.class),
|
||||||
|
mock(Dispatcher.DDatanode.StorageGroup.class));
|
||||||
|
Dispatcher.DBlockStriped block = mock(Dispatcher.DBlockStriped.class);
|
||||||
|
|
||||||
|
doReturn(null).when(block).getInternalBlock(any());
|
||||||
|
doReturn(true)
|
||||||
|
.when(spyDispatcher)
|
||||||
|
.isGoodBlockCandidate(any(), any(), any(), any());
|
||||||
|
|
||||||
|
when(move.markMovedIfGoodBlock(block, DEFAULT)).thenCallRealMethod();
|
||||||
|
|
||||||
|
assertFalse(move.markMovedIfGoodBlock(block, DEFAULT));
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test Balancer runs fine when logging in with a keytab in kerberized env.
|
* Test Balancer runs fine when logging in with a keytab in kerberized env.
|
||||||
* Reusing testUnknownDatanode here for basic functionality testing.
|
* Reusing testUnknownDatanode here for basic functionality testing.
|
||||||
|
|
Loading…
Reference in New Issue