HDFS-16601. DataTransfer should throw Exception to client
parent c37f01d95b
commit 6eb9db3ac1
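In outline, the change makes the asynchronous DataTransfer task remember the first exception it hits and has the pipeline-recovery path rethrow that exception, so the failure reaches the client instead of being only logged on the datanode. Below is a minimal, self-contained sketch of that pattern; the names (TransferTask, doTransfer) are illustrative stand-ins, not the actual DataNode code.

// A minimal sketch (not the actual DataNode code): a transfer worker records
// the first failure it hits, and the caller that waited for it rethrows the
// failure instead of only logging it.
import java.io.IOException;

public class TransferTask implements Runnable {
  // Thread.join() below gives the caller a happens-before edge, so a plain
  // field is sufficient for this sketch.
  private IOException lastException = null;

  @Override
  public void run() {
    try {
      doTransfer();
    } catch (IOException ioe) {
      // Remember the failure instead of swallowing it.
      lastException = ioe;
    } catch (Throwable t) {
      lastException = new IOException("Failed to transfer block", t);
    }
  }

  public IOException getLastException() {
    return lastException;
  }

  // Placeholder for the real block transfer; always fails in this sketch.
  private void doTransfer() throws IOException {
    throw new IOException("simulated transfer failure");
  }

  public static void main(String[] args) throws Exception {
    TransferTask task = new TransferTask();
    Thread worker = new Thread(task);
    worker.start();
    worker.join();
    // Surface the worker's failure to the caller (in HDFS terms: the client).
    if (task.getLastException() != null) {
      throw task.getLastException();
    }
  }
}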
DataNode.java

@@ -2873,6 +2873,8 @@ public class DataNode extends ReconfigurableBase
     /** Throttle to block replication when data transfers or writes. */
     private DataTransferThrottler throttler;
 
+    private IOException lastException = null;
+
     /**
      * Connect to the first item in the target list. Pass along the
      * entire target list, the block, and the data.
@@ -2917,6 +2919,7 @@ public class DataNode extends ReconfigurableBase
       final boolean isClient = clientname.length() > 0;
 
       try {
+        DataNodeFaultInjector.get().failTransfer(id);
         final String dnAddr = targets[0].getXferAddr(connectToDnViaHostname);
         InetSocketAddress curTarget = NetUtils.createSocketAddr(dnAddr);
         LOG.debug("Connecting to datanode {}", dnAddr);
@@ -2989,8 +2992,10 @@ public class DataNode extends ReconfigurableBase
         handleBadBlock(b, ie, false);
         LOG.warn("{}:Failed to transfer {} to {} got",
             bpReg, b, targets[0], ie);
+        lastException = ie;
       } catch (Throwable t) {
         LOG.error("Failed to transfer block {}", b, t);
+        lastException = new IOException("Failed to transfer block " + b, t);
       } finally {
         decrementXmitsInProgress();
         IOUtils.closeStream(blockSender);
@@ -3000,6 +3005,10 @@ public class DataNode extends ReconfigurableBase
       }
     }
 
+    public IOException getLastException() {
+      return this.lastException;
+    }
+
     @Override
     public String toString() {
       return "DataTransfer " + b + " to " + Arrays.asList(targets);
@@ -3507,6 +3516,9 @@ public class DataNode extends ReconfigurableBase
       throw new IOException("Pipeline recovery for " + b + " is interrupted.",
           e);
     }
+    if (dataTransferTask.getLastException() != null) {
+      throw dataTransferTask.getLastException();
+    }
   }
 }
DataNodeFaultInjector.java

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
 import org.apache.hadoop.classification.VisibleForTesting;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -95,6 +96,8 @@ public class DataNodeFaultInjector {
 
   public void noRegistration() throws IOException { }
 
+  public void failTransfer(DatanodeID sourceDNId) throws IOException { }
+
   public void failMirrorConnection() throws IOException { }
 
   public void failPipeline(ReplicaInPipeline replicaInfo,
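The new failTransfer hook follows the usual DataNodeFaultInjector pattern: production code calls a hook on a singleton that is a no-op by default, and a test swaps in a subclass that throws to force the error path. The test added below does exactly that; the following stand-alone sketch shows the seam in isolation, with hypothetical names (FaultInjectorDemo, transferBlock) and no Hadoop dependencies.

// Hypothetical stand-alone sketch of the fault-injection seam (not Hadoop code).
import java.io.IOException;

public class FaultInjectorDemo {
  static class FaultInjector {
    private static FaultInjector instance = new FaultInjector();
    public static FaultInjector get() { return instance; }
    public static void set(FaultInjector injector) { instance = injector; }
    // No-op by default; tests override it to inject a failure.
    public void failTransfer() throws IOException { }
  }

  // Production-style code: calls the hook at the point where a failure
  // should be injectable.
  static void transferBlock() throws IOException {
    FaultInjector.get().failTransfer();
    System.out.println("block transferred");
  }

  public static void main(String[] args) {
    try {
      // Test-style code: replace the singleton with a throwing subclass.
      FaultInjector.set(new FaultInjector() {
        @Override
        public void failTransfer() throws IOException {
          throw new IOException("injected transfer failure");
        }
      });
      transferBlock();
    } catch (IOException expected) {
      System.out.println("caught: " + expected.getMessage());
    }
  }
}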
TestClientProtocolForPipelineRecovery.java

@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
 import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.BlockWrite;
+import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
@@ -761,6 +762,131 @@ public class TestClientProtocolForPipelineRecovery {
     }
   }
 
+  @Test
+  public void testPipelineRecoveryWithFailedTransferBlock() throws Exception {
+    final int chunkSize = 512;
+    final int oneWriteSize = 5000;
+    final int totalSize = 1024 * 1024;
+    final int errorInjectionPos = 512;
+    Configuration conf = new HdfsConfiguration();
+    // Need 5 datanodes to verify the replaceDatanode during pipeline recovery
+    final MiniDFSCluster cluster =
+        new MiniDFSCluster.Builder(conf).numDataNodes(5).build();
+    DataNodeFaultInjector old = DataNodeFaultInjector.get();
+
+    try {
+      DistributedFileSystem fs = cluster.getFileSystem();
+      Path fileName = new Path("/f");
+      FSDataOutputStream o = fs.create(fileName);
+      int count = 0;
+      // Flush to get the pipeline created.
+      o.writeBytes("hello");
+      o.hflush();
+      DFSOutputStream dfsO = (DFSOutputStream) o.getWrappedStream();
+      final DatanodeInfo[] pipeline = dfsO.getStreamer().getNodes();
+      final String firstDn = pipeline[0].getXferAddr(false);
+      final String secondDn = pipeline[1].getXferAddr(false);
+      final AtomicBoolean pipelineFailed = new AtomicBoolean(false);
+      final AtomicBoolean transferFailed = new AtomicBoolean(false);
+
+      DataNodeFaultInjector.set(new DataNodeFaultInjector() {
+        @Override
+        public void failPipeline(ReplicaInPipeline replicaInfo,
+            String mirror) throws IOException {
+          if (!secondDn.equals(mirror)) {
+            // Only fail for first DN
+            return;
+          }
+          if (!pipelineFailed.get() &&
+              (replicaInfo.getBytesAcked() > errorInjectionPos) &&
+              (replicaInfo.getBytesAcked() % chunkSize != 0)) {
+            int count = 0;
+            while (count < 10) {
+              // Fail the pipeline (throw exception) when:
+              // 1. bytesAcked is not at a chunk boundary (checked in the if
+              //    statement above)
+              // 2. bytesOnDisk is bigger than bytesAcked and at least
+              //    reaches (or goes beyond) the end of the chunk that
+              //    bytesAcked is in (checked in the if statement below).
+              // At this condition, the transferBlock that happens during
+              // pipeline recovery would transfer extra bytes to make up to the
+              // end of the chunk. And this is when the block corruption
+              // described in HDFS-4660 would occur.
+              if ((replicaInfo.getBytesOnDisk() / chunkSize) -
+                  (replicaInfo.getBytesAcked() / chunkSize) >= 1) {
+                pipelineFailed.set(true);
+                throw new IOException(
+                    "Failing Pipeline " + replicaInfo.getBytesAcked() + " : "
+                        + replicaInfo.getBytesOnDisk());
+              }
+              try {
+                Thread.sleep(200);
+              } catch (InterruptedException ignored) {
+              }
+              count++;
+            }
+          }
+        }
+
+        @Override
+        public void failTransfer(DatanodeID sourceDNId) throws IOException {
+          if (sourceDNId.getXferAddr().equals(firstDn)) {
+            transferFailed.set(true);
+            throw new IOException(
+                "Failing Transfer from " + sourceDNId.getXferAddr());
+          }
+        }
+      });
+
+      Random r = new Random();
+      byte[] b = new byte[oneWriteSize];
+      while (count < totalSize) {
+        r.nextBytes(b);
+        o.write(b);
+        count += oneWriteSize;
+        o.hflush();
+      }
+
+      assertTrue("Expected a failure in the pipeline", pipelineFailed.get());
+      assertTrue("Expected a failure in the transfer block",
+          transferFailed.get());
+      DatanodeInfo[] newNodes = dfsO.getStreamer().getNodes();
+      o.close();
+      // Trigger block report to NN
+      for (DataNode d : cluster.getDataNodes()) {
+        DataNodeTestUtils.triggerBlockReport(d);
+      }
+      // Read from the replaced datanode to verify the corruption. So shutdown
+      // all other nodes in the pipeline.
+      List<DatanodeInfo> pipelineList = Arrays.asList(pipeline);
+      DatanodeInfo newNode = null;
+      for (DatanodeInfo node : newNodes) {
+        if (!pipelineList.contains(node)) {
+          newNode = node;
+          break;
+        }
+      }
+      assert newNode != null;
+      LOG.info("Number of nodes in pipeline: {} newNode {}",
+          newNodes.length, newNode.getName());
+      // shutdown old 2 nodes
+      for (DatanodeInfo node : newNodes) {
+        if (node.getName().equals(newNode.getName())) {
+          continue;
+        }
+        LOG.info("shutdown {}", node.getName());
+        cluster.stopDataNode(node.getName());
+      }
+
+      // Read should be successful from only the newNode. There should not be
+      // any corruption reported.
+      DFSTestUtil.readFile(fs, fileName);
+    } finally {
+      DataNodeFaultInjector.set(old);
+      cluster.shutdown();
+    }
+  }
+
   @Test
   public void testUpdatePipeLineAfterDNReg()throws Exception {
     Configuration conf = new HdfsConfiguration();