HDFS-10240. Race between close/recoverLease leads to missing block. Contributed by Jinglun, zhouyingchao and Wei-Chiu Chuang.
This commit is contained in:
parent
d42806160e
commit
1290e3c647
|
@ -262,6 +262,10 @@ public abstract class BlockInfo extends Block
|
||||||
return getBlockUCState().equals(BlockUCState.COMPLETE);
|
return getBlockUCState().equals(BlockUCState.COMPLETE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean isUnderRecovery() {
|
||||||
|
return getBlockUCState().equals(BlockUCState.UNDER_RECOVERY);
|
||||||
|
}
|
||||||
|
|
||||||
public final boolean isCompleteOrCommitted() {
|
public final boolean isCompleteOrCommitted() {
|
||||||
final BlockUCState state = getBlockUCState();
|
final BlockUCState state = getBlockUCState();
|
||||||
return state.equals(BlockUCState.COMPLETE) ||
|
return state.equals(BlockUCState.COMPLETE) ||
|
||||||
|
|
|
@ -985,6 +985,10 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
return false; // no blocks in file yet
|
return false; // no blocks in file yet
|
||||||
if(lastBlock.isComplete())
|
if(lastBlock.isComplete())
|
||||||
return false; // already completed (e.g. by syncBlock)
|
return false; // already completed (e.g. by syncBlock)
|
||||||
|
if(lastBlock.isUnderRecovery()) {
|
||||||
|
throw new IOException("Commit or complete block " + commitBlock +
|
||||||
|
", whereas it is under recovery.");
|
||||||
|
}
|
||||||
|
|
||||||
final boolean committed = commitBlock(lastBlock, commitBlock);
|
final boolean committed = commitBlock(lastBlock, commitBlock);
|
||||||
if (committed && lastBlock.isStriped()) {
|
if (committed && lastBlock.isStriped()) {
|
||||||
|
|
|
@ -684,7 +684,8 @@ class BPServiceActor implements Runnable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (ibrManager.sendImmediately() || sendHeartbeat) {
|
if (!dn.areIBRDisabledForTests() &&
|
||||||
|
(ibrManager.sendImmediately()|| sendHeartbeat)) {
|
||||||
ibrManager.sendIBRs(bpNamenode, bpRegistration,
|
ibrManager.sendIBRs(bpNamenode, bpRegistration,
|
||||||
bpos.getBlockPoolId());
|
bpos.getBlockPoolId());
|
||||||
}
|
}
|
||||||
|
|
|
@ -331,6 +331,7 @@ public class DataNode extends ReconfigurableBase
|
||||||
ThreadGroup threadGroup = null;
|
ThreadGroup threadGroup = null;
|
||||||
private DNConf dnConf;
|
private DNConf dnConf;
|
||||||
private volatile boolean heartbeatsDisabledForTests = false;
|
private volatile boolean heartbeatsDisabledForTests = false;
|
||||||
|
private volatile boolean ibrDisabledForTests = false;
|
||||||
private volatile boolean cacheReportsDisabledForTests = false;
|
private volatile boolean cacheReportsDisabledForTests = false;
|
||||||
private DataStorage storage = null;
|
private DataStorage storage = null;
|
||||||
|
|
||||||
|
@ -1334,6 +1335,15 @@ public class DataNode extends ReconfigurableBase
|
||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
void setIBRDisabledForTest(boolean disabled) {
|
||||||
|
this.ibrDisabledForTests = disabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
boolean areIBRDisabledForTests() {
|
||||||
|
return this.ibrDisabledForTests;
|
||||||
|
}
|
||||||
|
|
||||||
void setCacheReportsDisabledForTest(boolean disabled) {
|
void setCacheReportsDisabledForTest(boolean disabled) {
|
||||||
this.cacheReportsDisabledForTests = disabled;
|
this.cacheReportsDisabledForTests = disabled;
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,6 +25,7 @@ import static org.mockito.Mockito.doNothing;
|
||||||
import static org.mockito.Mockito.spy;
|
import static org.mockito.Mockito.spy;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
@ -163,6 +164,70 @@ public class TestLeaseRecovery2 {
|
||||||
verifyFile(dfs, filepath1, actual, size);
|
verifyFile(dfs, filepath1, actual, size);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCloseWhileRecoverLease() throws Exception {
|
||||||
|
// test recoverLease
|
||||||
|
// set the soft limit to be 1 hour but recoverLease should
|
||||||
|
// close the file immediately
|
||||||
|
cluster.setLeasePeriod(LONG_LEASE_PERIOD, LONG_LEASE_PERIOD);
|
||||||
|
int size = AppendTestUtil.nextInt(FILE_SIZE);
|
||||||
|
String filestr = "/testCloseWhileRecoverLease";
|
||||||
|
|
||||||
|
AppendTestUtil.LOG.info("filestr=" + filestr);
|
||||||
|
Path filepath = new Path(filestr);
|
||||||
|
FSDataOutputStream stm = dfs.create(filepath, true, BUF_SIZE,
|
||||||
|
REPLICATION_NUM, BLOCK_SIZE);
|
||||||
|
assertTrue(dfs.dfs.exists(filestr));
|
||||||
|
|
||||||
|
// hflush file
|
||||||
|
AppendTestUtil.LOG.info("hflush");
|
||||||
|
stm.hflush();
|
||||||
|
|
||||||
|
// Pause DN block report.
|
||||||
|
// Let client recover lease, and then close the file, and then let DN
|
||||||
|
// report blocks.
|
||||||
|
ArrayList<DataNode> dataNodes = cluster.getDataNodes();
|
||||||
|
for (DataNode dn: dataNodes) {
|
||||||
|
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("pause IBR");
|
||||||
|
for (DataNode dn: dataNodes) {
|
||||||
|
DataNodeTestUtils.pauseIBR(dn);
|
||||||
|
}
|
||||||
|
|
||||||
|
AppendTestUtil.LOG.info("size=" + size);
|
||||||
|
stm.write(buffer, 0, size);
|
||||||
|
|
||||||
|
// hflush file
|
||||||
|
AppendTestUtil.LOG.info("hflush");
|
||||||
|
stm.hflush();
|
||||||
|
|
||||||
|
LOG.info("recover lease");
|
||||||
|
dfs.recoverLease(filepath);
|
||||||
|
try {
|
||||||
|
stm.close();
|
||||||
|
fail("close() should fail because the file is under recovery.");
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
GenericTestUtils.assertExceptionContains(
|
||||||
|
"whereas it is under recovery", ioe);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (DataNode dn: dataNodes) {
|
||||||
|
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("trigger heartbeats");
|
||||||
|
// resume DN block report
|
||||||
|
for (DataNode dn: dataNodes) {
|
||||||
|
DataNodeTestUtils.triggerHeartbeat(dn);
|
||||||
|
}
|
||||||
|
|
||||||
|
stm.close();
|
||||||
|
assertEquals(cluster.getNamesystem().getBlockManager().
|
||||||
|
getMissingBlocksCount(), 0);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testLeaseRecoverByAnotherUser() throws Exception {
|
public void testLeaseRecoverByAnotherUser() throws Exception {
|
||||||
byte [] actual = new byte[FILE_SIZE];
|
byte [] actual = new byte[FILE_SIZE];
|
||||||
|
|
|
@ -98,6 +98,9 @@ public class DataNodeTestUtils {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static void pauseIBR(DataNode dn) {
|
||||||
|
dn.setIBRDisabledForTest(true);
|
||||||
|
}
|
||||||
public static InterDatanodeProtocol createInterDatanodeProtocolProxy(
|
public static InterDatanodeProtocol createInterDatanodeProtocolProxy(
|
||||||
DataNode dn, DatanodeID datanodeid, final Configuration conf,
|
DataNode dn, DatanodeID datanodeid, final Configuration conf,
|
||||||
boolean connectToDnViaHostname) throws IOException {
|
boolean connectToDnViaHostname) throws IOException {
|
||||||
|
|
Loading…
Reference in New Issue