HDFS-11708. Positional read will fail if replicas moved to different DNs after stream is opened. Contributed by Vinayakumar B.

Vinayakumar B 2017-06-07 10:36:09 +05:30
parent 5fc4b8567b
commit 4a391c72d4
4 changed files with 227 additions and 54 deletions
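
The gist of the change: chooseDataNode() (and fetchBlockAt()) may refresh the LocatedBlock while picking a node, so the result object (DNAddrPair) now carries that possibly-refreshed block and positional reads pick it up before each attempt, instead of reusing the locations cached when the stream was opened. Below is a minimal, self-contained sketch of that pattern; Block, ChosenNode and RefreshedBlockSketch are simplified stand-ins for illustration, not the real Hadoop classes.

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RefreshedBlockSketch {

  // Stand-in for LocatedBlock: a block id plus its known locations.
  record Block(String id, List<String> locations) {}

  // Stand-in for DNAddrPair: the chosen datanode plus the refreshed block.
  record ChosenNode(String datanode, Block block) {}

  // Datanodes that actually hold a live replica in this toy example.
  static final Set<String> LIVE = Set.of("dn3");

  // Stand-in for asking the NameNode for the current locations.
  static Block refreshLocatedBlock(Block stale) {
    return new Block(stale.id(), List.of("dn1", "dn3"));
  }

  // Refreshes the block when no cached location is usable and returns the
  // refreshed copy together with the chosen node, mirroring DNAddrPair.block.
  static ChosenNode chooseDataNode(Block block, Set<String> dead) {
    for (String dn : block.locations()) {
      if (!dead.contains(dn)) {
        return new ChosenNode(dn, block);
      }
    }
    Block latest = refreshLocatedBlock(block);
    for (String dn : latest.locations()) {
      if (!dead.contains(dn)) {
        return new ChosenNode(dn, latest);
      }
    }
    throw new IllegalStateException("no live replica found");
  }

  static void pread(Block block) {
    Set<String> dead = new HashSet<>();
    while (true) {
      ChosenNode chosen = chooseDataNode(block, dead);
      block = chosen.block(); // pick up the refreshed block before reading
      if (LIVE.contains(chosen.datanode())) {
        System.out.println("read " + block.id() + " from " + chosen.datanode());
        return;
      }
      dead.add(chosen.datanode()); // mark dead, retry with fresh locations
    }
  }

  public static void main(String[] args) {
    // The stream was opened while the locations were [dn1, dn2]; the replica
    // has since moved to dn3, so only the refreshed block can be read.
    pread(new Block("blk_1", List.of("dn1", "dn2")));
  }
}

In the real patch this is what the new DNAddrPair.block field provides; the DFSInputStream.java hunks below thread it through chooseDataNode(), the pread retry loops and the hedged-read path.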

DFSInputStream.java

@ -642,6 +642,8 @@ public class DFSInputStream extends FSInputStream
chosenNode = retval.info;
InetSocketAddress targetAddr = retval.addr;
StorageType storageType = retval.storageType;
// Latest block, if refreshed by chooseDataNode()
targetBlock = retval.block;
try {
blockReader = getBlockReader(targetBlock, offsetIntoBlock,
@ -1090,7 +1092,7 @@ public class DFSInputStream extends FSInputStream
chosenNode.getXferAddr(dfsClient.getConf().isConnectToDnViaHostname());
DFSClient.LOG.debug("Connecting to datanode {}", dnAddr);
InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
return new DNAddrPair(chosenNode, targetAddr, storageType);
return new DNAddrPair(chosenNode, targetAddr, storageType, block);
}
private static String getBestNodeDNAddrPairErrorString(
@ -1122,12 +1124,13 @@ public class DFSInputStream extends FSInputStream
byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
block = refreshLocatedBlock(block);
while (true) {
DNAddrPair addressPair = chooseDataNode(block, null);
// Latest block, if refreshed internally
block = addressPair.block;
try {
actualGetFromOneDataNode(addressPair, block, start, end,
buf, offset, corruptedBlockMap);
actualGetFromOneDataNode(addressPair, start, end, buf, offset,
corruptedBlockMap);
return;
} catch (IOException e) {
// Ignore. Already processed inside the function.
@ -1149,8 +1152,8 @@ public class DFSInputStream extends FSInputStream
int offset = bb.position();
try (TraceScope ignored = dfsClient.getTracer().
newScope("hedgedRead" + hedgedReadId, parentSpanId)) {
actualGetFromOneDataNode(datanode, block, start, end, buf,
offset, corruptedBlockMap);
actualGetFromOneDataNode(datanode, start, end, buf, offset,
corruptedBlockMap);
return bb;
}
}
@ -1161,18 +1164,17 @@ public class DFSInputStream extends FSInputStream
* Used when reading contiguous blocks
*/
private void actualGetFromOneDataNode(final DNAddrPair datanode,
LocatedBlock block, final long start, final long end, byte[] buf,
int offset, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
final long start, final long end, byte[] buf, int offset,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
final int length = (int) (end - start + 1);
actualGetFromOneDataNode(datanode, block, start, end, buf,
new int[]{offset}, new int[]{length}, corruptedBlockMap);
actualGetFromOneDataNode(datanode, start, end, buf, new int[] { offset },
new int[] { length }, corruptedBlockMap);
}
/**
* Read data from one DataNode.
* @param datanode the datanode from which to read data
* @param block the located block containing the requested data
* @param startInBlk the startInBlk offset of the block
* @param endInBlk the endInBlk offset of the block
* @param buf the given byte array into which the data is read
@ -1184,9 +1186,8 @@ public class DFSInputStream extends FSInputStream
* block replica
*/
void actualGetFromOneDataNode(final DNAddrPair datanode,
LocatedBlock block, final long startInBlk, final long endInBlk,
byte[] buf, int[] offsets, int[] lengths,
Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
final long startInBlk, final long endInBlk, byte[] buf,
int[] offsets, int[] lengths, Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
throws IOException {
DFSClientFaultInjector.get().startFetchFromDatanode();
int refetchToken = 1; // only need to get a new access token once
@ -1194,11 +1195,8 @@ public class DFSInputStream extends FSInputStream
final int len = (int) (endInBlk - startInBlk + 1);
checkReadPortions(offsets, lengths, len);
LocatedBlock block = datanode.block;
while (true) {
// cached block locations may have been updated by chooseDataNode()
// or fetchBlockAt(). Always get the latest list of locations at the
// start of the loop.
block = refreshLocatedBlock(block);
BlockReader reader = null;
try {
DFSClientFaultInjector.get().fetchFromDatanodeException();
@ -1246,6 +1244,9 @@ public class DFSInputStream extends FSInputStream
addToDeadNodes(datanode.info);
throw new IOException(msg);
}
// Refresh the block for updated tokens in case of token failures or
// encryption key failures.
block = refreshLocatedBlock(block);
} finally {
if (reader != null) {
reader.close();
@ -1300,7 +1301,6 @@ public class DFSInputStream extends FSInputStream
ByteBuffer bb;
int len = (int) (end - start + 1);
int hedgedReadId = 0;
block = refreshLocatedBlock(block);
while (true) {
// see HDFS-6591, this metric is used to verify/catch unnecessary loops
hedgedReadOpsLoopNumForTesting++;
@ -1310,6 +1310,8 @@ public class DFSInputStream extends FSInputStream
// chooseDataNode is a commitment. If no node, we go to
// the NN to reget block locations. Only go here on first read.
chosenNode = chooseDataNode(block, ignored);
// Latest block, if refreshed internally
block = chosenNode.block;
bb = ByteBuffer.allocate(len);
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
chosenNode, block, start, end, bb,
@ -1344,6 +1346,8 @@ public class DFSInputStream extends FSInputStream
if (chosenNode == null) {
chosenNode = chooseDataNode(block, ignored);
}
// Latest block, if refreshed internally
block = chosenNode.block;
bb = ByteBuffer.allocate(len);
Callable<ByteBuffer> getFromDataNodeCallable = getFromOneDataNode(
chosenNode, block, start, end, bb,
@ -1693,12 +1697,14 @@ public class DFSInputStream extends FSInputStream
final DatanodeInfo info;
final InetSocketAddress addr;
final StorageType storageType;
final LocatedBlock block;
DNAddrPair(DatanodeInfo info, InetSocketAddress addr,
StorageType storageType) {
StorageType storageType, LocatedBlock block) {
this.info = info;
this.addr = addr;
this.storageType = storageType;
this.block = block;
}
}

DFSTestUtil.java

@ -50,6 +50,7 @@ import java.lang.reflect.Modifier;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
@ -110,7 +111,9 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
@ -1955,6 +1958,49 @@ public class DFSTestUtil {
return DFSTestUtil.getFileSystemAs(ugi, conf);
}
/*
* Copy a block from sourceProxy to destination. If the block becomes
* over-replicated, preferably remove it from source.
* Return true if a block is successfully copied; otherwise false.
*/
public static boolean replaceBlock(ExtendedBlock block, DatanodeInfo source,
DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
return replaceBlock(block, source, sourceProxy, destination,
StorageType.DEFAULT, Status.SUCCESS);
}
/*
* Replace block, allowing the target storage type and the expected response
* status to be specified.
*/
public static boolean replaceBlock(ExtendedBlock block, DatanodeInfo source,
DatanodeInfo sourceProxy, DatanodeInfo destination,
StorageType targetStorageType, Status opStatus) throws IOException,
SocketException {
Socket sock = new Socket();
try {
sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()),
HdfsConstants.READ_TIMEOUT);
sock.setKeepAlive(true);
// sendRequest
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
new Sender(out).replaceBlock(block, targetStorageType,
BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(),
sourceProxy);
out.flush();
// receiveResponse
DataInputStream reply = new DataInputStream(sock.getInputStream());
BlockOpResponseProto proto = BlockOpResponseProto.parseDelimitedFrom(
reply);
while (proto.getStatus() == Status.IN_PROGRESS) {
proto = BlockOpResponseProto.parseDelimitedFrom(reply);
}
return proto.getStatus() == opStatus;
} finally {
sock.close();
}
}
/**
* Test if the given {@link FileStatus} user, group owner and its permission
* are expected, throw {@link AssertionError} if any value is not expected.
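
For orientation, a condensed sketch of how a test can exercise the new DFSTestUtil.replaceBlock() helper end to end; it compresses the setup that the TestPread change below performs in full, and the class name, file path and file contents here are placeholders.

import java.util.Arrays;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

public class ReplaceBlockUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
    try (MiniDFSCluster cluster =
        new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
      DistributedFileSystem dfs = cluster.getFileSystem();
      Path p = new Path("/replaceBlockDemo");            // placeholder path
      DFSTestUtil.writeFile(dfs, p, "placeholder data"); // placeholder content

      LocatedBlock lb = DFSTestUtil.getAllBlocks(dfs.open(p)).get(0);
      ExtendedBlock block = lb.getBlock();
      DatanodeInfo source = lb.getLocations()[1];

      // Pick a live datanode that does not already hold the replica.
      DatanodeInfo destination = null;
      for (DatanodeInfo dn : cluster.getNameNodeRpc()
          .getDatanodeReport(DatanodeReportType.LIVE)) {
        if (!Arrays.asList(lb.getLocations()).contains(dn)) {
          destination = dn;
          break;
        }
      }

      // The source acts as its own proxy here, as in the TestPread change.
      boolean moved =
          DFSTestUtil.replaceBlock(block, source, source, destination);
      System.out.println("replaceBlock reported success: " + moved);
    }
  }
}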

TestPread.java

@ -24,6 +24,8 @@ import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
@ -31,6 +33,9 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ChecksumException;
import org.apache.hadoop.fs.FSDataInputStream;
@ -38,6 +43,9 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
@ -51,6 +59,8 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.base.Supplier;
/**
* This class tests the DFS positional read functionality in a single node
* mini-cluster.
@ -544,6 +554,143 @@ public class TestPread {
}
}
/**
* Scenario: 1. Write a file with RF=2 on DN1 and DN2.<br>
* 2. Open the stream; the locations in the LocatedBlock are [DN1, DN2].<br>
* 3. Move the block from DN2 to a new datanode, DN3.<br>
* 4. Wait until the block has been copied to DN3 and removed from DN2.<br>
* 5. Stop DN1 as well.<br>
* 6. The NameNode still lists the block locations as [DN1, DN3], but only
* DN3 now holds a live replica.<br>
* 7. Subsequent calls to getBlockLocations() always return DN3 as the last
* location.<br>
*/
@Test
public void testPreadFailureWithChangedBlockLocations() throws Exception {
doPreadTestWithChangedLocations();
}
/**
* Scenario: 1. Write a file with RF=2 on DN1 and DN2.<br>
* 2. Open the stream; the locations in the LocatedBlock are [DN1, DN2].<br>
* 3. Move the block from DN2 to a new datanode, DN3.<br>
* 4. Wait until the block has been copied to DN3 and removed from DN2.<br>
* 5. Stop DN1 as well.<br>
* 6. The NameNode still lists the block locations as [DN1, DN3], but only
* DN3 now holds a live replica.<br>
* 7. Subsequent calls to getBlockLocations() always return DN3 as the last
* location.<br>
*/
@Test
public void testPreadHedgedFailureWithChangedBlockLocations()
throws Exception {
isHedgedRead = true;
doPreadTestWithChangedLocations();
}
private void doPreadTestWithChangedLocations()
throws IOException, TimeoutException, InterruptedException {
GenericTestUtils.setLogLevel(DFSClient.LOG, Level.DEBUG);
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 2);
conf.setInt(DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, 1);
if (isHedgedRead) {
conf.setInt(HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, 2);
}
try (MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(3).build()) {
DistributedFileSystem dfs = cluster.getFileSystem();
final Path p = new Path("/test");
String data = "testingmissingblock";
DFSTestUtil.writeFile(dfs, p, data);
FSDataInputStream in = dfs.open(p);
List<LocatedBlock> blocks = DFSTestUtil.getAllBlocks(in);
LocatedBlock lb = blocks.get(0);
DFSTestUtil.waitForReplication(cluster, lb.getBlock(), 1, 2, 0);
blocks = DFSTestUtil.getAllBlocks(in);
DatanodeInfo[] locations = null;
for (LocatedBlock locatedBlock : blocks) {
locations = locatedBlock.getLocations();
DFSClient.LOG
.info(locatedBlock.getBlock() + " " + Arrays.toString(locations));
}
final DatanodeInfo validDownLocation = locations[0];
final DFSClient client = dfs.getClient();
final DFSClient dfsClient = Mockito.spy(client);
// From the second request onwards, keep the valid location as the last
// entry in the locations list.
final AtomicInteger count = new AtomicInteger(0);
Mockito.doAnswer(new Answer<LocatedBlocks>() {
@Override
public LocatedBlocks answer(InvocationOnMock invocation)
throws Throwable {
if (count.compareAndSet(0, 1)) {
return (LocatedBlocks) invocation.callRealMethod();
}
Object obj = invocation.callRealMethod();
LocatedBlocks locatedBlocks = (LocatedBlocks) obj;
LocatedBlock lb = locatedBlocks.get(0);
DatanodeInfo[] locations = lb.getLocations();
if (!(locations[0].getName().equals(validDownLocation.getName()))) {
// Keep the down location first so that the valid location is tried last
DatanodeInfo l = locations[0];
locations[0] = locations[locations.length - 1];
locations[locations.length - 1] = l;
}
return locatedBlocks;
}
}).when(dfsClient).getLocatedBlocks(p.toString(), 0);
// Find out the target node to move the block to.
DatanodeInfo[] nodes =
cluster.getNameNodeRpc().getDatanodeReport(DatanodeReportType.LIVE);
DatanodeInfo toMove = null;
List<DatanodeInfo> locationsList = Arrays.asList(locations);
for (DatanodeInfo node : nodes) {
if (locationsList.contains(node)) {
continue;
}
toMove = node;
break;
}
// STEP 2: Open stream
DFSInputStream din = dfsClient.open(p.toString());
// STEP 3: Move replica
final DatanodeInfo source = locations[1];
final DatanodeInfo destination = toMove;
DFSTestUtil.replaceBlock(lb.getBlock(), source, locations[1], toMove);
// Wait for replica to get deleted
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
try {
LocatedBlocks lbs = dfsClient.getLocatedBlocks(p.toString(), 0);
LocatedBlock lb = lbs.get(0);
List<DatanodeInfo> locations = Arrays.asList(lb.getLocations());
DFSClient.LOG
.info("Source :" + source + ", destination: " + destination);
DFSClient.LOG.info("Got updated locations :" + locations);
return locations.contains(destination)
&& !locations.contains(source);
} catch (IOException e) {
DFSClient.LOG.error("Problem in getting block locations", e);
}
return false;
}
}, 1000, 10000);
DFSTestUtil.waitForReplication(cluster, lb.getBlock(), 1, 2, 0);
// STEP 4: Stop first node in new locations
cluster.stopDataNode(validDownLocation.getName());
DFSClient.LOG.info("Starting read");
byte[] buf = new byte[1024];
int n = din.read(0, buf, 0, data.length());
assertEquals(data.length(), n);
assertEquals("Data should be read", data, new String(buf, 0, n));
DFSClient.LOG.info("Read completed");
}
}
public static void main(String[] args) throws Exception {
new TestPread().testPreadDFS();
}

TestBlockReplacement.java

@ -17,13 +17,13 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.util.ArrayList;
import java.util.Arrays;
@ -48,19 +48,14 @@ import org.apache.hadoop.hdfs.client.BlockReportOptions;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Time;
import org.junit.Test;
@ -310,8 +305,8 @@ public class TestBlockReplacement {
*/
private boolean replaceBlock( ExtendedBlock block, DatanodeInfo source,
DatanodeInfo sourceProxy, DatanodeInfo destination) throws IOException {
return replaceBlock(block, source, sourceProxy, destination,
StorageType.DEFAULT);
return DFSTestUtil.replaceBlock(block, source, sourceProxy, destination,
StorageType.DEFAULT, Status.SUCCESS);
}
/*
@ -323,29 +318,8 @@ public class TestBlockReplacement {
DatanodeInfo sourceProxy,
DatanodeInfo destination,
StorageType targetStorageType) throws IOException, SocketException {
Socket sock = new Socket();
try {
sock.connect(NetUtils.createSocketAddr(destination.getXferAddr()),
HdfsConstants.READ_TIMEOUT);
sock.setKeepAlive(true);
// sendRequest
DataOutputStream out = new DataOutputStream(sock.getOutputStream());
new Sender(out).replaceBlock(block, targetStorageType,
BlockTokenSecretManager.DUMMY_TOKEN, source.getDatanodeUuid(),
sourceProxy);
out.flush();
// receiveResponse
DataInputStream reply = new DataInputStream(sock.getInputStream());
BlockOpResponseProto proto =
BlockOpResponseProto.parseDelimitedFrom(reply);
while (proto.getStatus() == Status.IN_PROGRESS) {
proto = BlockOpResponseProto.parseDelimitedFrom(reply);
}
return proto.getStatus() == Status.SUCCESS;
} finally {
sock.close();
}
return DFSTestUtil.replaceBlock(block, source, sourceProxy, destination,
targetStorageType, Status.SUCCESS);
}
/**