HADOOP-11802. DomainSocketWatcher thread terminates sometimes after there is an I/O error during requestShortCircuitShm (cmccabe)

(cherry picked from commit a0e0a63209)
(cherry picked from commit 788b76761d)
This commit is contained in:
Colin Patrick Mccabe 2015-04-23 18:59:52 -07:00
parent 0eae3bc0f6
commit 496afb5e1a
8 changed files with 111 additions and 18 deletions

View File

@ -21,6 +21,9 @@ Release 2.7.1 - UNRELEASED
HADOOP-11730. Regression: s3n read failure recovery broken.
(Takenori Sato via stevel)
HADOOP-11802. DomainSocketWatcher thread terminates sometimes after there
is an I/O error during requestShortCircuitShm (cmccabe)
Release 2.7.0 - 2015-04-20
INCOMPATIBLE CHANGES

View File

@ -512,8 +512,8 @@ public final class DomainSocketWatcher implements Closeable {
}
} catch (InterruptedException e) {
LOG.info(toString() + " terminating on InterruptedException");
} catch (IOException e) {
LOG.error(toString() + " terminating on IOException", e);
} catch (Throwable e) {
LOG.error(toString() + " terminating on exception", e);
} finally {
lock.lock();
try {

View File

@ -111,7 +111,7 @@ JNIEnv *env, jobject obj, jint fd)
pollfd = &sd->pollfd[sd->used_size];
sd->used_size++;
pollfd->fd = fd;
pollfd->events = POLLIN;
pollfd->events = POLLIN | POLLHUP;
pollfd->revents = 0;
}
@ -162,7 +162,10 @@ JNIEnv *env, jobject obj)
GetLongField(env, obj, fd_set_data_fid);
used_size = sd->used_size;
for (i = 0; i < used_size; i++) {
if (sd->pollfd[i].revents & POLLIN) {
// We check for both POLLIN and POLLHUP, because on some OSes, when a socket
// is shutdown(), it sends POLLHUP rather than POLLIN.
if ((sd->pollfd[i].revents & POLLIN) ||
(sd->pollfd[i].revents & POLLHUP)) {
num_readable++;
} else {
sd->pollfd[i].revents = 0;
@ -177,7 +180,8 @@ JNIEnv *env, jobject obj)
}
j = 0;
for (i = 0; ((i < used_size) && (j < num_readable)); i++) {
if (sd->pollfd[i].revents & POLLIN) {
if ((sd->pollfd[i].revents & POLLIN) ||
(sd->pollfd[i].revents & POLLHUP)) {
carr[j] = sd->pollfd[i].fd;
j++;
sd->pollfd[i].revents = 0;

View File

@ -39,4 +39,6 @@ public class DataNodeFaultInjector {
public void getHdfsBlocksMetadata() {}
public void writeBlockAfterFlush() throws IOException {}
public void sendShortCircuitShmResponse() throws IOException {}
}

View File

@ -421,6 +421,7 @@ class DataXceiver extends Receiver implements Runnable {
private void sendShmSuccessResponse(DomainSocket sock, NewShmInfo shmInfo)
throws IOException {
DataNodeFaultInjector.get().sendShortCircuitShmResponse();
ShortCircuitShmResponseProto.newBuilder().setStatus(SUCCESS).
setId(PBHelper.convert(shmInfo.shmId)).build().
writeDelimitedTo(socketOut);
@ -479,10 +480,19 @@ class DataXceiver extends Receiver implements Runnable {
}
}
if ((!success) && (peer == null)) {
// If we failed to pass the shared memory segment to the client,
// close the UNIX domain socket now. This will trigger the
// DomainSocketWatcher callback, cleaning up the segment.
IOUtils.cleanup(null, sock);
// The socket is now managed by the DomainSocketWatcher. However,
// we failed to pass it to the client. We call shutdown() on the
// UNIX domain socket now. This will trigger the DomainSocketWatcher
// callback. The callback will close the domain socket.
// We don't want to close the socket here, since that might lead to
// bad behavior inside the poll() call. See HADOOP-11802 for details.
try {
LOG.warn("Failed to send success response back to the client. " +
"Shutting down socket for " + shmInfo.shmId + ".");
sock.shutdown();
} catch (IOException e) {
LOG.warn("Failed to shut down socket in error handler", e);
}
}
IOUtils.cleanup(null, shmInfo);
}

View File

@ -216,10 +216,11 @@ public class DfsClientShmManager implements Closeable {
* Must be called with the EndpointShmManager lock held.
*
* @param peer The peer to use to talk to the DataNode.
* @param clientName The client name.
* @param usedPeer (out param) Will be set to true if we used the peer.
* When a peer is used
*
* @param clientName The client name.
* @param blockId The block ID to use.
* @return null if the DataNode does not support shared memory
* segments, or experienced an error creating the
* shm. The shared memory segment itself on success.

View File

@ -21,6 +21,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -185,4 +186,9 @@ public class DomainSocketFactory {
public void disableDomainSocketPath(String path) {
pathMap.put(path, PathState.UNUSABLE);
}
@VisibleForTesting
public void clearPathMap() {
pathMap.invalidateAll();
}
}

View File

@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.net.DomainPeer;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.DataNodeFaultInjector;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry;
import org.apache.hadoop.hdfs.server.datanode.ShortCircuitRegistry.RegisteredShm;
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager.PerDatanodeVisitorInfo;
@ -73,6 +74,9 @@ import org.apache.hadoop.util.Time;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.base.Preconditions;
import com.google.common.base.Supplier;
@ -622,6 +626,18 @@ public class TestShortCircuitCache {
sockDir.close();
}
static private void checkNumberOfSegmentsAndSlots(final int expectedSegments,
final int expectedSlots, ShortCircuitRegistry registry) {
registry.visit(new ShortCircuitRegistry.Visitor() {
@Override
public void accept(HashMap<ShmId, RegisteredShm> segments,
HashMultimap<ExtendedBlockId, Slot> slots) {
Assert.assertEquals(expectedSegments, segments.size());
Assert.assertEquals(expectedSlots, slots.size());
}
});
}
public static class TestCleanupFailureInjector
extends BlockReaderFactory.FailureInjector {
@Override
@ -665,16 +681,67 @@ public class TestShortCircuitCache {
GenericTestUtils.assertExceptionContains("TCP reads were disabled for " +
"testing, but we failed to do a non-TCP read.", t);
}
ShortCircuitRegistry registry =
cluster.getDataNodes().get(0).getShortCircuitRegistry();
registry.visit(new ShortCircuitRegistry.Visitor() {
checkNumberOfSegmentsAndSlots(1, 1,
cluster.getDataNodes().get(0).getShortCircuitRegistry());
cluster.shutdown();
sockDir.close();
}
// Regression test for HADOOP-11802
@Test(timeout=60000)
public void testDataXceiverHandlesRequestShortCircuitShmFailure()
throws Exception {
BlockReaderTestUtil.enableShortCircuitShmTracing();
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
Configuration conf = createShortCircuitConf(
"testDataXceiverHandlesRequestShortCircuitShmFailure", sockDir);
conf.setLong(HdfsClientConfigKeys.Read.ShortCircuit.STREAMS_CACHE_EXPIRY_MS_KEY,
1000000000L);
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(1).build();
cluster.waitActive();
DistributedFileSystem fs = cluster.getFileSystem();
final Path TEST_PATH1 = new Path("/test_file1");
DFSTestUtil.createFile(fs, TEST_PATH1, 4096,
(short)1, 0xFADE1);
LOG.info("Setting failure injector and performing a read which " +
"should fail...");
DataNodeFaultInjector failureInjector = Mockito.mock(DataNodeFaultInjector.class);
Mockito.doAnswer(new Answer<Void>() {
@Override
public void accept(HashMap<ShmId, RegisteredShm> segments,
HashMultimap<ExtendedBlockId, Slot> slots) {
Assert.assertEquals(1, segments.size());
Assert.assertEquals(1, slots.size());
public Void answer(InvocationOnMock invocation) throws Throwable {
throw new IOException("injected error into sendShmResponse");
}
});
}).when(failureInjector).sendShortCircuitShmResponse();
DataNodeFaultInjector prevInjector = DataNodeFaultInjector.instance;
DataNodeFaultInjector.instance = failureInjector;
try {
// The first read will try to allocate a shared memory segment and slot.
// The shared memory segment allocation will fail because of the failure
// injector.
DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
Assert.fail("expected readFileBuffer to fail, but it succeeded.");
} catch (Throwable t) {
GenericTestUtils.assertExceptionContains("TCP reads were disabled for " +
"testing, but we failed to do a non-TCP read.", t);
}
checkNumberOfSegmentsAndSlots(0, 0,
cluster.getDataNodes().get(0).getShortCircuitRegistry());
LOG.info("Clearing failure injector and performing another read...");
DataNodeFaultInjector.instance = prevInjector;
fs.getClient().getClientContext().getDomainSocketFactory().clearPathMap();
// The second read should succeed.
DFSTestUtil.readFileBuffer(fs, TEST_PATH1);
// We should have added a new short-circuit shared memory segment and slot.
checkNumberOfSegmentsAndSlots(1, 1,
cluster.getDataNodes().get(0).getShortCircuitRegistry());
cluster.shutdown();
sockDir.close();
}