HDFS-2791. If block report races with closing of file, replica is incorrectly marked corrupt. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1236944 13f79535-47bb-0310-9956-ffa450edef68
@@ -190,6 +190,9 @@ Release 0.23.1 - UNRELEASED

     HDFS-2840. TestHostnameFilter should work with localhost or localhost.localdomain (tucu)
 
+    HDFS-2791. If block report races with closing of file, replica is
+    incorrectly marked corrupt. (todd)
+
 Release 0.23.0 - 2011-11-01
 
   INCOMPATIBLE CHANGES
@@ -1569,7 +1569,24 @@ public class BlockManager {
     }
     case RBW:
     case RWR:
-      return storedBlock.isComplete();
+      if (!storedBlock.isComplete()) {
+        return false;
+      } else if (storedBlock.getGenerationStamp() != iblk.getGenerationStamp()) {
+        return true;
+      } else { // COMPLETE block, same genstamp
+        if (reportedState == ReplicaState.RBW) {
+          // If it's a RBW report for a COMPLETE block, it may just be that
+          // the block report got a little bit delayed after the pipeline
+          // closed. So, ignore this report, assuming we will get a
+          // FINALIZED replica later. See HDFS-2791
+          LOG.info("Received an RBW replica for block " + storedBlock +
+              " on " + dn.getName() + ": ignoring it, since the block is " +
+              "complete with the same generation stamp.");
+          return false;
+        } else {
+          return true;
+        }
+      }
     case RUR: // should not be reported
     case TEMPORARY: // should not be reported
     default:
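For readability, the branch above can be restated as a small decision function. The sketch below is illustrative only, not HDFS source: plain values stand in for BlockManager's stored block and reported replica, so the four outcomes are easy to scan.

    // Illustrative sketch of the corruption decision for RBW/RWR reports.
    enum ReplicaState { FINALIZED, RBW, RWR, RUR, TEMPORARY }

    class Hdfs2791Sketch {
      static boolean isCorrupt(boolean storedComplete,
                               long storedGenStamp,
                               long reportedGenStamp,
                               ReplicaState reportedState) {
        if (!storedComplete) {
          return false; // block still under construction: RBW/RWR expected
        } else if (storedGenStamp != reportedGenStamp) {
          return true;  // stale generation stamp: genuinely corrupt
        } else if (reportedState == ReplicaState.RBW) {
          return false; // HDFS-2791 race: a FINALIZED report will follow
        } else {
          return true;  // RWR for a COMPLETE block with the same genstamp
        }
      }

      public static void main(String[] args) {
        // The race this commit fixes: an RBW report arriving after close().
        System.out.println(isCorrupt(true, 1L, 1L, ReplicaState.RBW)); // false
        // The case that must still be flagged: a stale generation stamp.
        System.out.println(isCorrupt(true, 1L, 2L, ReplicaState.RBW)); // true
      }
    }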
@@ -763,4 +763,13 @@ class BPOfferService implements Runnable {
       return;
     }
   }
+
+  @VisibleForTesting
+  DatanodeProtocol getBpNamenode() {
+    return bpNamenode;
+  }
+
+  @VisibleForTesting
+  void setBpNamenode(DatanodeProtocol bpNamenode) {
+    this.bpNamenode = bpNamenode;
+  }
 }
@@ -101,7 +101,7 @@ public class AppendTestUtil {
     return DFSTestUtil.getFileSystemAs(ugi, conf);
   }
 
-  static void write(OutputStream out, int offset, int length) throws IOException {
+  public static void write(OutputStream out, int offset, int length) throws IOException {
     final byte[] bytes = new byte[length];
     for(int i = 0; i < length; i++) {
       bytes[i] = (byte)(offset + i);
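A note on this one-word change: write() is widened from package-private to public to match the new caller added below. TestBlockReport lives in the datanode test package, imports org.apache.hadoop.hdfs.AppendTestUtil from outside that class's package, and invokes AppendTestUtil.write(out, 0, 10) in the regression test.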
@@ -17,6 +17,15 @@
  */
 package org.apache.hadoop.hdfs.server.datanode;
 
+import java.io.IOException;
+
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.mockito.Mockito;
+
+import com.google.common.base.Preconditions;
+
 /**
  * WARNING!! This is TEST ONLY class: it never has to be used
  * for ANY development purposes.
@@ -42,4 +51,34 @@ public class DataNodeAdapter {
       boolean heartbeatsDisabledForTests) {
     dn.setHeartbeatsDisabledForTests(heartbeatsDisabledForTests);
   }
+
+  /**
+   * Insert a Mockito spy object between the given DataNode and
+   * the given NameNode. This can be used to delay or wait for
+   * RPC calls on the datanode->NN path.
+   */
+  public static DatanodeProtocol spyOnBposToNN(
+      DataNode dn, NameNode nn) {
+    String bpid = nn.getNamesystem().getBlockPoolId();
+
+    BPOfferService bpos = null;
+    for (BPOfferService thisBpos : dn.getAllBpOs()) {
+      if (thisBpos.getBlockPoolId().equals(bpid)) {
+        bpos = thisBpos;
+        break;
+      }
+    }
+    Preconditions.checkArgument(bpos != null,
+        "No such bpid: %s", bpid);
+
+    // When protobufs are merged, the following can be converted
+    // to a simple spy. Because you can't spy on proxy objects,
+    // we have to use the DelegateAnswer trick.
+    DatanodeProtocol origNN = bpos.getBpNamenode();
+    DatanodeProtocol spy = Mockito.mock(DatanodeProtocol.class,
+        new GenericTestUtils.DelegateAnswer(origNN));
+
+    bpos.setBpNamenode(spy);
+    return spy;
+  }
 }
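The comment above refers to the "DelegateAnswer trick": Mockito cannot spy() on the dynamic proxy object that backs the DN-to-NN RPC client, so the patch instead mocks the DatanodeProtocol interface and forwards every call to the real proxy. Below is a standalone sketch of that forwarding Answer, assuming only plain Mockito; Hadoop's GenericTestUtils.DelegateAnswer is the real implementation, and this is just the idea.

    // Standalone sketch of a delegate-style Answer -- not the Hadoop class.
    import java.lang.reflect.Method;

    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;

    class DelegatingAnswer implements Answer<Object> {
      private final Object delegate;

      DelegatingAnswer(Object delegate) {
        this.delegate = delegate;
      }

      @Override
      public Object answer(InvocationOnMock invocation) throws Throwable {
        // Re-dispatch the intercepted call against the real object, so the
        // mock behaves like the delegate except where a test stubs methods.
        Method m = invocation.getMethod();
        return m.invoke(delegate, invocation.getArguments());
      }
    }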
@@ -21,7 +21,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.AppendTestUtil;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -35,14 +37,19 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
 import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
+import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
+import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
 import org.apache.log4j.Level;
 import org.junit.After;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import org.junit.Before;
 import org.junit.Test;
+import org.mockito.Mockito;
+import org.mockito.invocation.InvocationOnMock;
 
 import java.io.File;
 import java.io.FilenameFilter;
@@ -50,6 +57,7 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Random;
+import java.util.concurrent.CountDownLatch;
 
 /**
  * This test simulates a variety of situations when blocks are being
@@ -492,6 +500,84 @@ public class TestBlockReport {
     }
   }
+
+  /**
+   * Test for the case where one of the DNs in the pipeline is in the
+   * process of doing a block report exactly when the block is closed.
+   * In this case, the block report becomes delayed until after the
+   * block is marked completed on the NN, and hence it reports an RBW
+   * replica for a COMPLETE block. Such a report should not be marked
+   * corrupt.
+   * This is a regression test for HDFS-2791.
+   */
+  @Test
+  public void testOneReplicaRbwReportArrivesAfterBlockCompleted() throws Exception {
+    final CountDownLatch brFinished = new CountDownLatch(1);
+    DelayAnswer delayer = new GenericTestUtils.DelayAnswer(LOG) {
+      @Override
+      protected Object passThrough(InvocationOnMock invocation)
+          throws Throwable {
+        try {
+          return super.passThrough(invocation);
+        } finally {
+          // inform the test that our block report went through.
+          brFinished.countDown();
+        }
+      }
+    };
+
+    final String METHOD_NAME = GenericTestUtils.getMethodName();
+    Path filePath = new Path("/" + METHOD_NAME + ".dat");
+
+    // Start a second DN for this test -- we're checking
+    // what happens when one of the DNs is slowed for some reason.
+    REPL_FACTOR = 2;
+    startDNandWait(null, false);
+
+    NameNode nn = cluster.getNameNode();
+
+    FSDataOutputStream out = fs.create(filePath, REPL_FACTOR);
+    try {
+      AppendTestUtil.write(out, 0, 10);
+      out.hflush();
+
+      // Set up a spy so that we can delay the block report coming
+      // from this node.
+      DataNode dn = cluster.getDataNodes().get(0);
+      DatanodeProtocol spy =
+        DataNodeAdapter.spyOnBposToNN(dn, nn);
+
+      Mockito.doAnswer(delayer)
+        .when(spy).blockReport(
+          Mockito.<DatanodeRegistration>anyObject(),
+          Mockito.anyString(),
+          Mockito.<long[]>anyObject());
+
+      // Force a block report to be generated. The block report will have
+      // an RBW replica in it. Wait for the RPC to be sent, but block
+      // it before it gets to the NN.
+      dn.scheduleAllBlockReport(0);
+      delayer.waitForCall();
+
+    } finally {
+      IOUtils.closeStream(out);
+    }
+
+    // Now that the stream is closed, the NN will have the block in COMPLETE
+    // state.
+    delayer.proceed();
+    brFinished.await();
+
+    // Verify that no replicas are marked corrupt, and that the
+    // file is still readable.
+    BlockManagerTestUtil.updateState(nn.getNamesystem().getBlockManager());
+    assertEquals(0, nn.getNamesystem().getCorruptReplicaBlocks());
+    DFSTestUtil.readFile(fs, filePath);
+
+    // Ensure that the file is readable even from the DN that we futzed with.
+    cluster.stopDataNode(1);
+    DFSTestUtil.readFile(fs, filePath);
+  }
+
   private void waitForTempReplica(Block bl, int DN_N1) throws IOException {
     final boolean tooLongWait = false;
     final int TIMEOUT = 40000;
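The test hinges on DelayAnswer's waitForCall()/proceed() pair, which holds the mocked blockReport RPC in flight while the file is closed. A rough sketch of that gating mechanism using two latches follows; the real GenericTestUtils.DelayAnswer carries more machinery (logging, pass-through to the delegate), so this only shows the core idea.

    // Two-latch gate in the style of GenericTestUtils.DelayAnswer -- an
    // illustration of the mechanism, not the Hadoop implementation.
    import java.util.concurrent.CountDownLatch;

    import org.mockito.invocation.InvocationOnMock;
    import org.mockito.stubbing.Answer;

    class GateAnswer implements Answer<Object> {
      private final CountDownLatch fired = new CountDownLatch(1);   // call arrived
      private final CountDownLatch allowed = new CountDownLatch(1); // test says go

      /** Test thread: block until the stubbed RPC has actually been invoked. */
      void waitForCall() throws InterruptedException {
        fired.await();
      }

      /** Test thread: release the RPC that is parked in answer(). */
      void proceed() {
        allowed.countDown();
      }

      @Override
      public Object answer(InvocationOnMock invocation) throws Throwable {
        fired.countDown();  // signal the test that the call is in flight
        allowed.await();    // hold the block report until proceed() is called
        return null;        // a real DelayAnswer would pass through to the NN
      }
    }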