diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 2b6f1c6750f..d3748106f8c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -321,6 +321,9 @@ Release 2.6.0 - UNRELEASED DFSOutputStream#close gives up its attempt to contact the namenode (mitdesai21 via cmccabe) + HDFS-6750. The DataNode should use its shared memory segment to mark + short-circuit replicas that have been unlinked as stale (cmccabe) + OPTIMIZATIONS HDFS-6690. Deduplicate xattr names in memory. (wang) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java index 9dba6a2085d..a252a17855a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java @@ -74,7 +74,7 @@ import com.google.common.collect.HashMultimap; * DN also marks the block's slots as "unanchorable" to prevent additional * clients from initiating these operations in the future. * - * The counterpart fo this class on the client is {@link DfsClientShmManager}. + * The counterpart of this class on the client is {@link DfsClientShmManager}. */ public class ShortCircuitRegistry { public static final Log LOG = LogFactory.getLog(ShortCircuitRegistry.class); @@ -217,7 +217,32 @@ public class ShortCircuitRegistry { } return allowMunlock; } - + + /** + * Invalidate any slot associated with a blockId that we are invalidating + * (deleting) from this DataNode. When a slot is invalid, the DFSClient will + * not use the corresponding replica for new read or mmap operations (although + * existing, ongoing read or mmap operations will complete.) + * + * @param blockId The block ID. 
+ */ + public synchronized void processBlockInvalidation(ExtendedBlockId blockId) { + if (!enabled) return; + final Set<Slot> affectedSlots = slots.get(blockId); + if (!affectedSlots.isEmpty()) { + final StringBuilder bld = new StringBuilder(); + String prefix = ""; + bld.append("Block ").append(blockId).append(" has been invalidated. "). + append("Marking short-circuit slots as invalid: "); + for (Slot slot : affectedSlots) { + slot.makeInvalid(); + bld.append(prefix).append(slot.toString()); + prefix = ", "; + } + LOG.info(bld.toString()); + } + } + public static class NewShmInfo implements Closeable { + public final ShmId shmId; + public final FileInputStream stream; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java index b068c664fe3..e8a06aec8ac 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java @@ -44,6 +44,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.ExtendedBlockId; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; @@ -1232,8 +1233,15 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> { } volumeMap.remove(bpid, invalidBlks[i]); } + + // If a DFSClient has the replica in its cache of short-circuit file + // descriptors (and the client is using ShortCircuitShm), invalidate it. 
+ datanode.getShortCircuitRegistry().processBlockInvalidation( + new ExtendedBlockId(invalidBlks[i].getBlockId(), bpid)); + // If the block is cached, start uncaching it. cacheManager.uncacheBlock(bpid, invalidBlks[i].getBlockId()); + // Delete the block asynchronously to make sure we can do it fast enough. // It's ok to unlink the block file before the uncache operation // finishes. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java index 1c9a2e5a742..81cc68da072 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShm.java @@ -32,11 +32,16 @@ import com.google.common.base.Preconditions; * DfsClientShm is a subclass of ShortCircuitShm which is used by the * DfsClient. * When the UNIX domain socket associated with this shared memory segment - * closes unexpectedly, we mark the slots inside this segment as stale. - * ShortCircuitReplica objects that contain stale slots are themselves stale, + * closes unexpectedly, we mark the slots inside this segment as disconnected. + * ShortCircuitReplica objects that contain disconnected slots are stale, * and will not be used to service new reads or mmap operations. * However, in-progress read or mmap operations will continue to proceed. * Once the last slot is deallocated, the segment can be safely munmapped. + * + * Slots may also become stale because the associated replica has been deleted + * on the DataNode. In this case, the DataNode will clear the 'valid' bit. + * The client will then see these slots as stale (see + * #{ShortCircuitReplica#isStale}). 
*/ public class DfsClientShm extends ShortCircuitShm implements DomainSocketWatcher.Handler { @@ -58,7 +63,7 @@ public class DfsClientShm extends ShortCircuitShm * * {@link DfsClientShm#handle} sets this to true. */ - private boolean stale = false; + private boolean disconnected = false; DfsClientShm(ShmId shmId, FileInputStream stream, EndpointShmManager manager, DomainPeer peer) throws IOException { @@ -76,14 +81,14 @@ public class DfsClientShm extends ShortCircuitShm } /** - * Determine if the shared memory segment is stale. + * Determine if the shared memory segment is disconnected from the DataNode. * * This must be called with the DfsClientShmManager lock held. * * @return True if the shared memory segment is stale. */ - public synchronized boolean isStale() { - return stale; + public synchronized boolean isDisconnected() { + return disconnected; } /** @@ -97,8 +102,8 @@ public class DfsClientShm extends ShortCircuitShm public boolean handle(DomainSocket sock) { manager.unregisterShm(getShmId()); synchronized (this) { - Preconditions.checkState(!stale); - stale = true; + Preconditions.checkState(!disconnected); + disconnected = true; boolean hadSlots = false; for (Iterator<Slot> iter = slotIterator(); iter.hasNext(); ) { Slot slot = iter.next(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java index ca9e8e6e0a5..6dbaf84d269 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/DfsClientShmManager.java @@ -271,12 +271,12 @@ public class DfsClientShmManager implements Closeable { loading = false; finishedLoading.signalAll(); } - if (shm.isStale()) { + if (shm.isDisconnected()) { // If the peer closed immediately after the shared memory 
segment // was created, the DomainSocketWatcher callback might already have - // fired and marked the shm as stale. In this case, we obviously - // don't want to add the SharedMemorySegment to our list of valid - // not-full segments. + // fired and marked the shm as disconnected. In this case, we + // obviously don't want to add the SharedMemorySegment to our list + // of valid not-full segments. if (LOG.isDebugEnabled()) { LOG.debug(this + ": the UNIX domain socket associated with " + "this short-circuit memory closed before we could make " + @@ -299,7 +299,7 @@ public class DfsClientShmManager implements Closeable { void freeSlot(Slot slot) { DfsClientShm shm = (DfsClientShm)slot.getShm(); shm.unregisterSlot(slot.getSlotIdx()); - if (shm.isStale()) { + if (shm.isDisconnected()) { // Stale shared memory segments should not be tracked here. Preconditions.checkState(!full.containsKey(shm.getShmId())); Preconditions.checkState(!notFull.containsKey(shm.getShmId())); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java index d860c8b174c..7b89d0a978d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/shortcircuit/ShortCircuitShm.java @@ -306,6 +306,13 @@ public class ShortCircuitShm { (slotAddress - baseAddress) / BYTES_PER_SLOT); } + /** + * Clear the slot. 
+ */ + void clear() { + unsafe.putLongVolatile(null, this.slotAddress, 0); + } + private boolean isSet(long flag) { long prev = unsafe.getLongVolatile(null, this.slotAddress); return (prev & flag) != 0; @@ -535,6 +542,7 @@ public class ShortCircuitShm { } allocatedSlots.set(idx, true); Slot slot = new Slot(calculateSlotAddress(idx), blockId); + slot.clear(); slot.makeValid(); slots[idx] = slot; if (LOG.isTraceEnabled()) { @@ -583,7 +591,7 @@ public class ShortCircuitShm { Slot slot = new Slot(calculateSlotAddress(slotIdx), blockId); if (!slot.isValid()) { throw new InvalidRequestException(this + ": slot " + slotIdx + - " has not been allocated."); + " is not marked as valid."); } slots[slotIdx] = slot; allocatedSlots.set(slotIdx, true); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java index a2d2bf830f3..ca30e029942 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/shortcircuit/TestShortCircuitCache.java @@ -23,6 +23,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_SKIP_CHECKSUM_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DOMAIN_SOCKET_PATH_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY; import static org.hamcrest.CoreMatchers.equalTo; import java.io.DataOutputStream; @@ -30,7 +31,9 @@ import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; +import 
java.util.Iterator; import java.util.Map; import org.apache.commons.lang.mutable.MutableBoolean; @@ -462,6 +465,7 @@ public class TestShortCircuitCache { } }, 10, 60000); cluster.shutdown(); + sockDir.close(); } @Test(timeout=60000) @@ -516,4 +520,98 @@ public class TestShortCircuitCache { }); cluster.shutdown(); } + + /** + * Test unlinking a file whose blocks we are caching in the DFSClient. + * The DataNode will notify the DFSClient that the replica is stale via the + * ShortCircuitShm. + */ + @Test(timeout=60000) + public void testUnlinkingReplicasInFileDescriptorCache() throws Exception { + BlockReaderTestUtil.enableShortCircuitShmTracing(); + TemporarySocketDirectory sockDir = new TemporarySocketDirectory(); + Configuration conf = createShortCircuitConf( + "testUnlinkingReplicasInFileDescriptorCache", sockDir); + // We don't want the CacheCleaner to time out short-circuit shared memory + // segments during the test, so set the timeout really high. + conf.setLong(DFS_CLIENT_READ_SHORTCIRCUIT_STREAMS_CACHE_EXPIRY_MS_KEY, + 1000000000L); + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(1).build(); + cluster.waitActive(); + DistributedFileSystem fs = cluster.getFileSystem(); + final ShortCircuitCache cache = + fs.getClient().getClientContext().getShortCircuitCache(); + cache.getDfsClientShmManager().visit(new Visitor() { + @Override + public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info) + throws IOException { + // The ClientShmManager starts off empty. + Assert.assertEquals(0, info.size()); + } + }); + final Path TEST_PATH = new Path("/test_file"); + final int TEST_FILE_LEN = 8193; + final int SEED = 0xFADE0; + DFSTestUtil.createFile(fs, TEST_PATH, TEST_FILE_LEN, + (short)1, SEED); + byte contents[] = DFSTestUtil.readFileBuffer(fs, TEST_PATH); + byte expected[] = DFSTestUtil. 
+ calculateFileContentsFromSeed(SEED, TEST_FILE_LEN); + Assert.assertTrue(Arrays.equals(contents, expected)); + // Loading this file brought the ShortCircuitReplica into our local + // replica cache. + final DatanodeInfo datanode = + new DatanodeInfo(cluster.getDataNodes().get(0).getDatanodeId()); + cache.getDfsClientShmManager().visit(new Visitor() { + @Override + public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info) + throws IOException { + Assert.assertTrue(info.get(datanode).full.isEmpty()); + Assert.assertFalse(info.get(datanode).disabled); + Assert.assertEquals(1, info.get(datanode).notFull.values().size()); + DfsClientShm shm = + info.get(datanode).notFull.values().iterator().next(); + Assert.assertFalse(shm.isDisconnected()); + } + }); + // Remove the file whose blocks we just read. + fs.delete(TEST_PATH, false); + + // Wait for the replica to be purged from the DFSClient's cache. + GenericTestUtils.waitFor(new Supplier<Boolean>() { + MutableBoolean done = new MutableBoolean(true); + @Override + public Boolean get() { + try { + done.setValue(true); + cache.getDfsClientShmManager().visit(new Visitor() { + @Override + public void visit(HashMap<DatanodeInfo, PerDatanodeVisitorInfo> info) throws IOException { + Assert.assertTrue(info.get(datanode).full.isEmpty()); + Assert.assertFalse(info.get(datanode).disabled); + Assert.assertEquals(1, + info.get(datanode).notFull.values().size()); + DfsClientShm shm = info.get(datanode).notFull.values(). + iterator().next(); + // Check that all slots have been invalidated. + for (Iterator<Slot> iter = shm.slotIterator(); + iter.hasNext(); ) { + Slot slot = iter.next(); + if (slot.isValid()) { + done.setValue(false); + } + } + } + }); + } catch (IOException e) { + LOG.error("error running visitor", e); + } + return done.booleanValue(); + } + }, 10, 60000); + cluster.shutdown(); + sockDir.close(); + } }