HDFS-11708. Positional read will fail if replicas moved to different DNs after stream is opened. Contributed by Vinayakumar B.

Vinayakumar B 2017-06-07 10:11:23 +05:30
parent 1869e1771c
commit 70fc6746b3
5 changed files with 221 additions and 51 deletions
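
At a high level: chooseDataNode() can refresh a block's locations from the NameNode, but before this patch the refreshed LocatedBlock was discarded, so positional (and hedged) reads kept retrying the stale locations after a replica had moved to another datanode. The patch makes DNAddrPair carry the refreshed block back to its callers, so every retry uses the latest locations. A minimal sketch of the pattern with stub types (not the real Hadoop classes):

// Simplified illustration only; the real types are LocatedBlock and
// DFSInputStream.DNAddrPair in org.apache.hadoop.hdfs.
import java.net.InetSocketAddress;

class BlockLocations {
    final InetSocketAddress[] addrs;            // current replica addresses
    BlockLocations(InetSocketAddress... addrs) { this.addrs = addrs; }
}

class NodeChoice {                              // stand-in for DNAddrPair
    final InetSocketAddress addr;
    final BlockLocations block;                 // latest block, if refreshed
    NodeChoice(InetSocketAddress addr, BlockLocations block) {
        this.addr = addr;
        this.block = block;
    }
}

public class PreadSketch {
    // Stand-in for refreshLocatedBlock(): the real client re-fetches the
    // block's locations from the NameNode when they may be stale.
    static BlockLocations refresh(BlockLocations stale) {
        return stale;                           // placeholder
    }

    // Stand-in for chooseDataNode(): returns the chosen address together with
    // the (possibly refreshed) block, which is what this patch adds.
    static NodeChoice chooseDataNode(BlockLocations block) {
        BlockLocations latest = refresh(block);
        return new NodeChoice(latest.addrs[0], latest);
    }

    // Stand-in for fetchBlockByteRange(): on every attempt, adopt the block
    // returned by chooseDataNode() instead of reusing the stale one.
    static void fetchBlockByteRange(BlockLocations block) {
        while (true) {
            NodeChoice choice = chooseDataNode(block);
            block = choice.block;               // latest block, if refreshed internally
            try {
                // ... connect to choice.addr and read the byte range ...
                return;
            } catch (RuntimeException e) {
                // ... mark choice.addr dead and retry; 'block' is already fresh ...
            }
        }
    }

    public static void main(String[] args) {
        fetchBlockByteRange(new BlockLocations(
                InetSocketAddress.createUnresolved("dn1", 50010)));
    }
}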

DFSInputStream.java

@@ -559,6 +559,8 @@ public class DFSInputStream extends FSInputStream
chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
StorageType storageType = retval.storageType;
// Latest block, if refreshed by chooseDataNode()
targetBlock = retval.block;
try {
blockReader = getBlockReader(targetBlock, offsetIntoBlock,
@@ -915,7 +917,7 @@ public class DFSInputStream extends FSInputStream
chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
DFSClient.LOG.debug("Connecting to datanode {}", dnAddr);
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
return new DNAddrPair(chosenNode, targetAddr, storageType);
return new DNAddrPair(chosenNode, targetAddr, storageType, block);
}
/**
@@ -957,12 +959,13 @@ public class DFSInputStream extends FSInputStream
protected void fetchBlockByteRange(LocatedBlock block, long start, long end,
ByteBuffer buf, CorruptedBlocks corruptedBlocks)
throws IOException {
block = refreshLocatedBlock(block);
while (true) {
DNAddrPair addressPair = chooseDataNode(block, null);
// Latest block, if refreshed internally
block = addressPair.block;
try {
actualGetFromOneDataNode(addressPair, block, start, end,
buf, corruptedBlocks);
actualGetFromOneDataNode(addressPair, start, end, buf,
corruptedBlocks);
return;
} catch (IOException e) {
checkInterrupted(e); // check if the read has been interrupted
@@ -983,8 +986,7 @@ public class DFSInputStream extends FSInputStream
public ByteBuffer call() throws Exception {
try (TraceScope ignored = dfsClient.getTracer().
newScope("hedgedRead" + hedgedReadId, parentSpanId)) {
actualGetFromOneDataNode(datanode, block, start, end, bb,
corruptedBlocks);
actualGetFromOneDataNode(datanode, start, end, bb, corruptedBlocks);
return bb;
}
}
@@ -995,27 +997,21 @@ public class DFSInputStream extends FSInputStream
* Read data from one DataNode.
*
* @param datanode the datanode from which to read data
* @param block the located block containing the requested data
* @param startInBlk the startInBlk offset of the block
* @param endInBlk the endInBlk offset of the block
* @param buf the given byte buffer into which the data is read
* @param corruptedBlocks map recording list of datanodes with corrupted
* block replica
*/
void actualGetFromOneDataNode(final DNAddrPair datanode, LocatedBlock block,
final long startInBlk, final long endInBlk, ByteBuffer buf,
CorruptedBlocks corruptedBlocks)
void actualGetFromOneDataNode(final DNAddrPair datanode, final long startInBlk,
final long endInBlk, ByteBuffer buf, CorruptedBlocks corruptedBlocks)
throws IOException {
DFSClientFaultInjector.get().startFetchFromDatanode();
int refetchToken = 1; // only need to get a new access token once
int refetchEncryptionKey = 1; // only need to get a new encryption key once
final int len = (int) (endInBlk - startInBlk + 1);
LocatedBlock block = datanode.block;
while (true) {
// cached block locations may have been updated by chooseDataNode()
// or fetchBlockAt(). Always get the latest list of locations at the
// start of the loop.
block = refreshLocatedBlock(block);
BlockReader reader = null;
try {
DFSClientFaultInjector.get().fetchFromDatanodeException();
@@ -1078,6 +1074,9 @@ public class DFSInputStream extends FSInputStream
addToDeadNodes(datanode.info);
throw new IOException(msg);
}
// Refresh the block for updated tokens in case of token failures or
// encryption key failures.
block = refreshLocatedBlock(block);
} finally {
if (reader != null) {
reader.close();
@@ -1113,7 +1112,6 @@ public class DFSInputStream extends FSInputStream
ByteBuffer bb;
int len = (int) (end - start + 1);
int hedgedReadId = 0;
block = refreshLocatedBlock(block);
while (true) {
// see HDFS-6591, this metric is used to verify/catch unnecessary loops
hedgedReadOpsLoopNumForTesting++;
@@ -1123,6 +1121,8 @@ public class DFSInputStream extends FSInputStream
// chooseDataNode is a commitment. If no node, we go to
// the NN to reget block locations. Only go here on first read.
chosenNode = chooseDataNode(block, ignored);
// Latest block, if refreshed internally
block = chosenNode.block;
bb = ByteBuffer.allocate(len);
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
chosenNode, block, start, end, bb,
@@ -1160,6 +1160,8 @@ public class DFSInputStream extends FSInputStream
if (chosenNode == null) {
chosenNode = chooseDataNode(block, ignored);
}
// Latest block, if refreshed internally
block = chosenNode.block;
bb = ByteBuffer.allocate(len);
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
chosenNode, block, start, end, bb,
@@ -1530,12 +1532,14 @@ public class DFSInputStream extends FSInputStream
final DatanodeInfo info;
final InetSocketAddress addr;
final StorageType storageType;
final LocatedBlock block;
DNAddrPair(DatanodeInfo info, InetSocketAddress addr,
StorageType storageType) {
StorageType storageType, LocatedBlock block) {
this.info = info;
this.addr = addr;
this.storageType = storageType;
this.block = block;
}
}
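
For context, hedged reads (the second read path touched above) are opt-in on the client. A rough sketch of enabling them, assuming the standard HdfsClientConfigKeys.HedgedRead keys as found in current Hadoop releases:

// Sketch only: enables hedged positional reads on a client. Key names are
// taken from HdfsClientConfigKeys.HedgedRead; the defaults leave it disabled.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;

public class HedgedReadClientSketch {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // A thread-pool size > 0 turns hedged reads on.
        conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, 2);
        // How long (ms) to wait for the first datanode before hedging.
        conf.setLong(HdfsClientConfigKeys.HedgedRead.THRESHOLD_MILLIS_KEY, 500);
        try (FileSystem fs = FileSystem.get(conf);
             FSDataInputStream in = fs.open(new Path("/test"))) {
            byte[] buf = new byte[1024];
            // Positional read; a slow first replica triggers a hedged request.
            in.read(0, buf, 0, buf.length);
        }
    }
}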

DFSStripedInputStream.java

@@ -236,7 +236,7 @@ public class DFSStripedInputStream extends DFSInputStream {
BlockReader reader = null;
final ReaderRetryPolicy retry = new ReaderRetryPolicy();
DFSInputStream.DNAddrPair dnInfo =
new DFSInputStream.DNAddrPair(null, null, null);
new DFSInputStream.DNAddrPair(null, null, null, null);
while (true) {
try {

DFSTestUtil.java

@@ -50,6 +50,7 @@ import java.lang.reflect.Modifier;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
@@ -122,7 +123,9 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@@ -2055,6 +2058,49 @@ public class DFSTestUtil {
return lastBlock;
}
/*
* Copy a block from sourceProxy to destination. If the block becomes
* over-replicated, preferably remove it from source.
* Return true if a block is successfully copied; otherwise false.
*/
public static boolean replaceBlock(ExtendedBlock block, DatanodeInfo source,
DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
return replaceBlock(block, source, sourceProxy, destination,
StorageType.DEFAULT, Status.SUCCESS);
}
/*
* Replace block
*/
public static boolean replaceBlock(ExtendedBlock block, DatanodeInfo source,
DatanodeInfo sourceProxy, DatanodeInfo destination,
StorageType targetStorageType, Status opStatus) throws IOException,
SocketException {
Socket sock = new Socket();
try {
sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()),
HdfsConstants.READ_TIMEOUT);
sock.setKeepAlive(true);
// sendRequest
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
new Sender(out).replaceBlock(block, targetStorageType,
BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(),
sourceProxy, null);
out.flush();
// receiveResponse
DataInputStream reply = new DataInputStream(sock.getInputStream());
BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(
reply);
while (proto.getStatus() == Status.IN_PROGRESS) {
proto = BlockOpResponseProto.parseDelimitedFrom(reply);
}
return proto.getStatus() == opStatus;
} finally {
sock.close();
}
}
/**
* Because currently DFSStripedOutputStream does not support hflush/hsync,
* tests can use this method to flush all the buffered data to DataNodes.
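
As a rough usage sketch of the new DFSTestUtil.replaceBlock() helper (the method below is hypothetical; 'cluster', 'dfs' and 'path' would come from the surrounding MiniDFSCluster test, and the usual test imports are assumed):

// Hypothetical helper: move the first block's second replica onto 'target',
// a live datanode that does not yet hold it, then wait for the NameNode to
// settle back to the expected replica count.
static void moveFirstBlockReplica(MiniDFSCluster cluster,
    DistributedFileSystem dfs, Path path, DatanodeInfo target)
    throws Exception {
  ExtendedBlock blk = DFSTestUtil.getFirstBlock(dfs, path);
  DatanodeInfo[] locs = dfs.getClient()
      .getLocatedBlocks(path.toString(), 0).get(0).getLocations();
  DatanodeInfo source = locs[1];            // replica to move away from

  // Ask 'target' to pull the replica, proxying through 'source' itself; the
  // NameNode should later drop the now-surplus replica from 'source'.
  assertTrue(DFSTestUtil.replaceBlock(blk, source, source, target));

  // Wait until the NameNode again reports the expected replication
  // (the arguments mirror the test below).
  DFSTestUtil.waitForReplication(cluster, blk, 1, 2, 0);
}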

TestPread.java

@@ -23,6 +23,8 @@ import static org.junit.Assert.assertTrue;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@@ -30,6 +32,8 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
@@ -38,6 +42,9 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
@@ -51,6 +58,8 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.base.Supplier;
/**
* This class tests the DFS positional read functionality in a single node
* mini-cluster.
@@ -542,6 +551,143 @@ public class TestPread {
}
}
/**
* Scenario:<br>
* 1. Write a file with replication factor 2, stored on DN1 and DN2.<br>
* 2. Open the stream; the LocatedBlock lists the locations as [DN1, DN2].<br>
* 3. Move the block replica from DN2 to DN3.<br>
* 4. Wait until the replica is available on DN3 and removed from DN2.<br>
* 5. Stop DN1 as well.<br>
* 6. The block locations known to the NameNode are now [DN1, DN3].<br>
* 7. Make subsequent calls to getBlockLocations() always return DN3 as the
* last location.<br>
@Test
public void testPreadFailureWithChangedBlockLocations() throws Exception {
doPreadTestWithChangedLocations();
}
/**
* Same scenario as {@link #testPreadFailureWithChangedBlockLocations()}, but
* with hedged reads enabled.
@Test
public void testPreadHedgedFailureWithChangedBlockLocations()
throws Exception {
isHedgedRead = true;
doPreadTestWithChangedLocations();
}
private void doPreadTestWithChangedLocations()
throws IOException, TimeoutException, InterruptedException {
GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
if (isHedgedRead) {
conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, 2);
}
try (MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
DistributedFileSystem dfs = cluster.getFileSystem();
Path p = new Path("/test");
String data = "testingmissingblock";
DFSTestUtil.writeFile(dfs, p, data);
FSDataInputStream in = dfs.open(p);
List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(in);
LocatedBlock lb = blocks.get(0);
DFSTestUtil.waitForReplication(cluster, lb.getBlock(), 1, 2, 0);
blocks = DFSTestUtil.getAllBlocks(in);
DatanodeInfo[] locations = null;
for (LocatedBlock locatedBlock : blocks) {
locations = locatedBlock.getLocations();
DFSClient.LOG
.info(locatedBlock.getBlock() + " " + Arrays.toString(locations));
}
final DatanodeInfo validDownLocation = locations[0];
final DFSClient client = dfs.getClient();
DFSClient dfsClient = Mockito.spy(client);
// Keep the valid location last in the locations list from the second
// request onwards.
final AtomicInteger count = new AtomicInteger(0);
Mockito.doAnswer(new Answer<LocatedBlocks>() {
@Override
public LocatedBlocks answer(InvocationOnMock invocation)
throws Throwable {
if (count.compareAndSet(0, 1)) {
return (LocatedBlocks) invocation.callRealMethod();
}
Object obj = invocation.callRealMethod();
LocatedBlocks locatedBlocks = (LocatedBlocks) obj;
LocatedBlock lb = locatedBlocks.get(0);
DatanodeInfo[] locations = lb.getLocations();
if (!(locations[0].getName().equals(validDownLocation.getName()))) {
// Latest location which is currently down, should be first
DatanodeInfo l = locations[0];
locations[0] = locations[locations.length - 1];
locations[locations.length - 1] = l;
}
return locatedBlocks;
}
}).when(dfsClient).getLocatedBlocks(p.toString(), 0);
// Find out the target node to move the block to.
DatanodeInfo[] nodes =
cluster.getNameNodeRpc().getDatanodeReport(DatanodeReportType.LIVE);
DatanodeInfo toMove = null;
List<DatanodeInfo> locationsList = Arrays.asList(locations);
for (DatanodeInfo node : nodes) {
if (locationsList.contains(node)) {
continue;
}
toMove = node;
break;
}
// STEP 2: Open stream
DFSInputStream din = dfsClient.open(p.toString());
// STEP 3: Move replica
final DatanodeInfo source = locations[1];
final DatanodeInfo destination = toMove;
DFSTestUtil.replaceBlock(lb.getBlock(), source, locations[1], toMove);
// Wait for replica to get deleted
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
LocatedBlocks lbs = dfsClient.getLocatedBlocks(p.toString(), 0);
LocatedBlock lb = lbs.get(0);
List<DatanodeInfo> locations = Arrays.asList(lb.getLocations());
DFSClient.LOG
.info("Source :" + source + ", destination: " + destination);
DFSClient.LOG.info("Got updated locations :" + locations);
return locations.contains(destination)
&& !locations.contains(source);
} catch (IOException e) {
DFSClient.LOG.error("Problem in getting block locations", e);
}
return false;
}
}, 1000, 10000);
DFSTestUtil.waitForReplication(cluster, lb.getBlock(), 1, 2, 0);
// STEP 4: Stop first node in new locations
cluster.stopDataNode(validDownLocation.getName());
DFSClient.LOG.info("Starting read");
byte[] buf = new byte[1024];
int n = din.read(0, buf, 0, data.length());
assertEquals(data.length(), n);
assertEquals("Data should be read", data, new String(buf, 0, n));
DFSClient.LOG.info("Read completed");
}
}
public static void main(String[] args) throws Exception {
new TestPread().testPreadDFS();
}

TestBlockReplacement.java

@@ -17,13 +17,13 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Arrays;
@@ -48,19 +48,14 @@ import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Time;
import org.junit.Test;
@@ -371,7 +366,7 @@ public class TestBlockReplacement {
*/
private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source,
DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
return replaceBlock(block, source, sourceProxy, destination,
return DFSTestUtil.replaceBlock(block, source, sourceProxy, destination,
StorageType.DEFAULT, Status.SUCCESS);
}
@@ -385,29 +380,8 @@ public class TestBlockReplacement {
DatanodeInfo destination,
StorageType targetStorageType,
Status opStatus) throws IOException, SocketException {
Socket sock = new Socket();
try {
sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()),
HdfsConstants.READ_TIMEOUT);
sock.setKeepAlive(true);
// sendRequest
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
new Sender(out).replaceBlock(block, targetStorageType,
BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(),
sourceProxy, null);
out.flush();
// receiveResponse
DataInputStream reply = new DataInputStream(sock.getInputStream());
BlockOpResponseProto proto =
BlockOpResponseProto.parseDelimitedFrom(reply);
while (proto.getStatus() == Status.IN_PROGRESS) {
proto = BlockOpResponseProto.parseDelimitedFrom(reply);
}
return proto.getStatus() == opStatus;
} finally {
sock.close();
}
return DFSTestUtil.replaceBlock(block, source, sourceProxy, destination,
targetStorageType, opStatus);
}
/**