From 6465b0b55f263c06f4d37800db951adba314a9fd Mon Sep 17 00:00:00 2001 From: Colin McCabe Date: Mon, 3 Mar 2014 04:01:26 +0000 Subject: [PATCH] add missing file for HDFS-5950 git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1573434 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/datanode/ShortCircuitRegistry.java | 332 ++++++++++++++++++ 1 file changed, 332 insertions(+) create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java new file mode 100644 index 00000000000..ad3bf52508f --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java @@ -0,0 +1,332 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.datanode; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATH; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATH_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT; + +import java.io.Closeable; +import java.io.FileInputStream; +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Set; + +import org.apache.commons.io.IOUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.InvalidRequestException; +import org.apache.hadoop.hdfs.ExtendedBlockId; +import org.apache.hadoop.hdfs.ShortCircuitShm; +import org.apache.hadoop.hdfs.ShortCircuitShm.ShmId; +import org.apache.hadoop.hdfs.ShortCircuitShm.Slot; +import org.apache.hadoop.hdfs.ShortCircuitShm.SlotId; +import org.apache.hadoop.io.nativeio.NativeIO; +import org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory; +import org.apache.hadoop.net.unix.DomainSocket; +import org.apache.hadoop.net.unix.DomainSocketWatcher; + +import com.google.common.base.Preconditions; +import com.google.common.collect.HashMultimap; + +/* + * Manages client short-circuit memory segments on the DataNode. + * + * DFSClients request shared memory segments from the DataNode. The + * ShortCircuitRegistry generates and manages these segments. Each segment + * has a randomly generated 128-bit ID which uniquely identifies it. The + * segments each contain several "slots." + * + * Before performing a short-circuit read, DFSClients must request a pair of + * file descriptors from the DataNode via the REQUEST_SHORT_CIRCUIT_FDS + * operation. As part of this operation, DFSClients pass the ID of the shared + * memory segment they would like to use to communicate information about this + * replica, as well as the slot number within that segment they would like to + * use. Slot allocation is always done by the client. + * + * Slots are used to track the state of the block on the both the client and + * datanode. When this DataNode mlocks a block, the corresponding slots for the + * replicas are marked as "anchorable". Anchorable blocks can be safely read + * without verifying the checksum. This means that BlockReaderLocal objects + * using these replicas can skip checksumming. It also means that we can do + * zero-copy reads on these replicas (the ZCR interface has no way of + * verifying checksums.) + * + * When a DN needs to munlock a block, it needs to first wait for the block to + * be unanchored by clients doing a no-checksum read or a zero-copy read. The + * DN also marks the block's slots as "unanchorable" to prevent additional + * clients from initiating these operations in the future. + * + * The counterpart fo this class on the client is {@link DfsClientShmManager}. + */ +public class ShortCircuitRegistry { + public static final Log LOG = LogFactory.getLog(ShortCircuitRegistry.class); + + private static final int SHM_LENGTH = 8192; + + private static class RegisteredShm extends ShortCircuitShm + implements DomainSocketWatcher.Handler { + private final ShortCircuitRegistry registry; + + RegisteredShm(ShmId shmId, FileInputStream stream, + ShortCircuitRegistry registry) throws IOException { + super(shmId, stream); + this.registry = registry; + } + + @Override + public boolean handle(DomainSocket sock) { + synchronized (registry) { + synchronized (this) { + registry.removeShm(this); + } + } + return true; + } + } + + public synchronized void removeShm(ShortCircuitShm shm) { + if (LOG.isTraceEnabled()) { + LOG.debug("removing shm " + shm); + } + // Stop tracking the shmId. + RegisteredShm removedShm = segments.remove(shm.getShmId()); + Preconditions.checkState(removedShm == shm, + "failed to remove " + shm.getShmId()); + // Stop tracking the slots. + for (Iterator iter = shm.slotIterator(); iter.hasNext(); ) { + Slot slot = iter.next(); + boolean removed = slots.remove(slot.getBlockId(), slot); + Preconditions.checkState(removed); + slot.makeInvalid(); + } + // De-allocate the memory map and close the shared file. + shm.free(); + } + + /** + * Whether or not the registry is enabled. + */ + private boolean enabled; + + /** + * The factory which creates shared file descriptors. + */ + private final SharedFileDescriptorFactory shmFactory; + + /** + * A watcher which sends out callbacks when the UNIX domain socket + * associated with a shared memory segment closes. + */ + private final DomainSocketWatcher watcher; + + private final HashMap segments = + new HashMap(0); + + private final HashMultimap slots = + HashMultimap.create(0, 1); + + public ShortCircuitRegistry(Configuration conf) throws IOException { + boolean enabled = false; + SharedFileDescriptorFactory shmFactory = null; + DomainSocketWatcher watcher = null; + try { + if (!NativeIO.isAvailable()) { + LOG.debug("Disabling ShortCircuitRegistry because NativeIO is " + + "not available."); + return; + } + String shmPath = conf.get(DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATH, + DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATH_DEFAULT); + if (shmPath.isEmpty()) { + LOG.info("Disabling ShortCircuitRegistry because shmPath was not set."); + return; + } + int interruptCheck = conf.getInt( + DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS, + DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT); + if (interruptCheck <= 0) { + LOG.info("Disabling ShortCircuitRegistry because interruptCheckMs " + + "was set to " + interruptCheck); + return; + } + shmFactory = + new SharedFileDescriptorFactory("HadoopShortCircuitShm_", shmPath); + watcher = new DomainSocketWatcher(interruptCheck); + enabled = true; + if (LOG.isDebugEnabled()) { + LOG.debug("created new ShortCircuitRegistry with interruptCheck=" + + interruptCheck + ", shmPath=" + shmPath); + } + } finally { + this.enabled = enabled; + this.shmFactory = shmFactory; + this.watcher = watcher; + } + } + + /** + * Process a block mlock event from the FsDatasetCache. + * + * @param blockId The block that was mlocked. + */ + public synchronized void processBlockMlockEvent(ExtendedBlockId blockId) { + if (!enabled) return; + Set affectedSlots = slots.get(blockId); + for (Slot slot : affectedSlots) { + slot.makeAnchorable(); + } + } + + /** + * Mark any slots associated with this blockId as unanchorable. + * + * @param blockId The block ID. + * @return True if we should allow the munlock request. + */ + public synchronized boolean processBlockMunlockRequest( + ExtendedBlockId blockId) { + if (!enabled) return true; + boolean allowMunlock = true; + Set affectedSlots = slots.get(blockId); + for (Slot slot : affectedSlots) { + slot.makeUnanchorable(); + if (slot.isAnchored()) { + allowMunlock = false; + } + } + return allowMunlock; + } + + public static class NewShmInfo implements Closeable { + public final ShmId shmId; + public final FileInputStream stream; + + NewShmInfo(ShmId shmId, FileInputStream stream) { + this.shmId = shmId; + this.stream = stream; + } + + @Override + public void close() throws IOException { + stream.close(); + } + } + + /** + * Handle a DFSClient request to create a new memory segment. + * + * @param clientName Client name as reported by the client. + * @param sock The DomainSocket to associate with this memory + * segment. When this socket is closed, or the + * other side writes anything to the socket, the + * segment will be closed. This can happen at any + * time, including right after this function returns. + * @return A NewShmInfo object. The caller must close the + * NewShmInfo object once they are done with it. + * @throws IOException If the new memory segment could not be created. + */ + public NewShmInfo createNewMemorySegment(String clientName, + DomainSocket sock) throws IOException { + NewShmInfo info = null; + RegisteredShm shm = null; + ShmId shmId = null; + synchronized (this) { + if (!enabled) { + if (LOG.isTraceEnabled()) { + LOG.trace("createNewMemorySegment: ShortCircuitRegistry is " + + "not enabled."); + } + throw new UnsupportedOperationException(); + } + FileInputStream fis = null; + try { + do { + shmId = ShmId.createRandom(); + } while (segments.containsKey(shmId)); + fis = shmFactory.createDescriptor(clientName, SHM_LENGTH); + shm = new RegisteredShm(shmId, fis, this); + } finally { + if (shm == null) { + IOUtils.closeQuietly(fis); + } + } + info = new NewShmInfo(shmId, fis); + segments.put(shmId, shm); + } + // Drop the registry lock to prevent deadlock. + // After this point, RegisteredShm#handle may be called at any time. + watcher.add(sock, shm); + if (LOG.isTraceEnabled()) { + LOG.trace("createNewMemorySegment: created " + info.shmId); + } + return info; + } + + public synchronized void registerSlot(ExtendedBlockId blockId, SlotId slotId) + throws InvalidRequestException { + if (!enabled) { + if (LOG.isTraceEnabled()) { + LOG.trace("registerSlot: ShortCircuitRegistry is " + + "not enabled."); + } + throw new UnsupportedOperationException(); + } + ShmId shmId = slotId.getShmId(); + RegisteredShm shm = segments.get(shmId); + if (shm == null) { + throw new InvalidRequestException("there is no shared memory segment " + + "registered with shmId " + shmId); + } + Slot slot = shm.registerSlot(slotId.getSlotIdx(), blockId); + boolean added = slots.put(blockId, slot); + Preconditions.checkState(added); + } + + public synchronized void unregisterSlot(SlotId slotId) + throws InvalidRequestException { + if (!enabled) { + if (LOG.isTraceEnabled()) { + LOG.trace("unregisterSlot: ShortCircuitRegistry is " + + "not enabled."); + } + throw new UnsupportedOperationException(); + } + ShmId shmId = slotId.getShmId(); + RegisteredShm shm = segments.get(shmId); + if (shm == null) { + throw new InvalidRequestException("there is no shared memory segment " + + "registered with shmId " + shmId); + } + Slot slot = shm.getSlot(slotId.getSlotIdx()); + slot.makeInvalid(); + shm.unregisterSlot(slotId.getSlotIdx()); + slots.remove(slot.getBlockId(), slot); + } + + public void shutdown() { + synchronized (this) { + if (!enabled) return; + enabled = false; + } + IOUtils.closeQuietly(watcher); + } +}