HDFS-6057. DomainSocketWatcher.watcherThread should be marked as a daemon thread (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1574787 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Colin McCabe 2014-03-06 07:57:19 +00:00
parent ad61eec072
commit 95e0f616b9
5 changed files with 92 additions and 16 deletions

View File

@ -235,6 +235,7 @@ public final class DomainSocketWatcher implements Closeable {
Preconditions.checkArgument(interruptCheckPeriodMs > 0);
this.interruptCheckPeriodMs = interruptCheckPeriodMs;
notificationSockets = DomainSocket.socketpair();
watcherThread.setDaemon(true);
watcherThread.start();
}
@ -263,6 +264,16 @@ public final class DomainSocketWatcher implements Closeable {
Uninterruptibles.joinUninterruptibly(watcherThread);
}
@VisibleForTesting
public boolean isClosed() {
lock.lock();
try {
return closed;
} finally {
lock.unlock();
}
}
/**
* Add a socket.
*
@ -274,7 +285,11 @@ public final class DomainSocketWatcher implements Closeable {
public void add(DomainSocket sock, Handler handler) {
lock.lock();
try {
checkNotClosed();
if (closed) {
handler.handle(sock);
IOUtils.cleanup(LOG, sock);
return;
}
Entry entry = new Entry(sock, handler);
try {
sock.refCount.reference();
@ -295,7 +310,6 @@ public final class DomainSocketWatcher implements Closeable {
if (!toAdd.contains(entry)) {
break;
}
checkNotClosed();
}
} finally {
lock.unlock();
@ -310,7 +324,7 @@ public final class DomainSocketWatcher implements Closeable {
public void remove(DomainSocket sock) {
lock.lock();
try {
checkNotClosed();
if (closed) return;
toRemove.put(sock.fd, sock);
kick();
while (true) {
@ -322,7 +336,6 @@ public final class DomainSocketWatcher implements Closeable {
if (!toRemove.containsKey(sock.fd)) {
break;
}
checkNotClosed();
}
} finally {
lock.unlock();
@ -342,17 +355,6 @@ public final class DomainSocketWatcher implements Closeable {
}
}
/**
* Check that the DomainSocketWatcher is not closed.
* Must be called while holding the lock.
*/
private void checkNotClosed() {
Preconditions.checkState(lock.isHeldByCurrentThread());
if (closed) {
throw new RuntimeException("DomainSocketWatcher is closed.");
}
}
private void sendCallback(String caller, TreeMap<Integer, Entry> entries,
FdSet fdSet, int fd) {
if (LOG.isTraceEnabled()) {

View File

@ -689,6 +689,9 @@ Release 2.4.0 - UNRELEASED
HDFS-5898. Allow NFS gateway to login/relogin from its kerberos keytab.
(Abin Shahab via atm)
HDFS-6057. DomainSocketWatcher.watcherThread should be marked as daemon
thread (cmccabe)
BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)

View File

@ -21,6 +21,7 @@ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.io.BufferedOutputStream;
import java.io.Closeable;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.FileInputStream;
@ -59,7 +60,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
* See {@link ShortCircuitRegistry} for more information on the communication protocol.
*/
@InterfaceAudience.Private
public class DfsClientShmManager {
public class DfsClientShmManager implements Closeable {
private static final Log LOG = LogFactory.getLog(DfsClientShmManager.class);
/**
@ -225,6 +226,12 @@ public class DfsClientShmManager {
Slot allocSlot(DomainPeer peer, MutableBoolean usedPeer,
String clientName, ExtendedBlockId blockId) throws IOException {
while (true) {
if (closed) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": the DfsClientShmManager has been closed.");
}
return null;
}
if (disabled) {
if (LOG.isTraceEnabled()) {
LOG.trace(this + ": shared memory segment access is disabled.");
@ -374,6 +381,8 @@ public class DfsClientShmManager {
}
}
private boolean closed = false;
private final ReentrantLock lock = new ReentrantLock();
/**
@ -409,6 +418,10 @@ public class DfsClientShmManager {
String clientName) throws IOException {
lock.lock();
try {
if (closed) {
LOG.trace(this + ": the DfsClientShmManager isclosed.");
return null;
}
EndpointShmManager shmManager = datanodes.get(datanode);
if (shmManager == null) {
shmManager = new EndpointShmManager(datanode);
@ -466,9 +479,32 @@ public class DfsClientShmManager {
}
}
/**
* Close the DfsClientShmManager.
*/
@Override
public void close() throws IOException {
lock.lock();
try {
if (closed) return;
closed = true;
} finally {
lock.unlock();
}
// When closed, the domainSocketWatcher will issue callbacks that mark
// all the outstanding DfsClientShm segments as stale.
IOUtils.cleanup(LOG, domainSocketWatcher);
}
@Override
public String toString() {
return String.format("ShortCircuitShmManager(%08x)",
System.identityHashCode(this));
}
@VisibleForTesting
public DomainSocketWatcher getDomainSocketWatcher() {
return domainSocketWatcher;
}
}

View File

@ -887,6 +887,7 @@ public class ShortCircuitCache implements Closeable {
/**
* Close the cache and free all associated resources.
*/
@Override
public void close() {
try {
lock.lock();
@ -911,6 +912,7 @@ public class ShortCircuitCache implements Closeable {
} finally {
lock.unlock();
}
IOUtils.cleanup(LOG, shmManager);
}
@VisibleForTesting // ONLY for testing

View File

@ -376,4 +376,37 @@ public class TestBlockReaderFactory {
Assert.assertEquals(null, cache.getDfsClientShmManager());
cluster.shutdown();
}
/**
* Test shutting down the ShortCircuitCache while there are things in it.
*/
@Test
public void testShortCircuitCacheShutdown() throws Exception {
TemporarySocketDirectory sockDir = new TemporarySocketDirectory();
Configuration conf = createShortCircuitConf(
"testShortCircuitCacheShutdown", sockDir);
conf.set(DFS_CLIENT_CONTEXT, "testShortCircuitCacheShutdown");
Configuration serverConf = new Configuration(conf);
DFSInputStream.tcpReadsDisabledForTesting = true;
final MiniDFSCluster cluster =
new MiniDFSCluster.Builder(serverConf).numDataNodes(1).build();
cluster.waitActive();
final DistributedFileSystem fs =
(DistributedFileSystem)FileSystem.get(cluster.getURI(0), conf);
final String TEST_FILE = "/test_file";
final int TEST_FILE_LEN = 4000;
final int SEED = 0xFADEC;
DFSTestUtil.createFile(fs, new Path(TEST_FILE), TEST_FILE_LEN,
(short)1, SEED);
byte contents[] = DFSTestUtil.readFileBuffer(fs, new Path(TEST_FILE));
byte expected[] = DFSTestUtil.
calculateFileContentsFromSeed(SEED, TEST_FILE_LEN);
Assert.assertTrue(Arrays.equals(contents, expected));
final ShortCircuitCache cache =
fs.dfs.getClientContext().getShortCircuitCache();
cache.close();
Assert.assertTrue(cache.getDfsClientShmManager().
getDomainSocketWatcher().isClosed());
cluster.shutdown();
}
}