HDFS-5940. Minor cleanups to ShortCircuitReplica, FsDatasetCache, and DomainSocketWatcher (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1567838 13f79535-47bb-0310-9956-ffa450edef68
Colin McCabe 2014-02-13 03:14:00 +00:00
parent d05f300504
commit 6bd4581910
11 changed files with 164 additions and 186 deletions

DomainSocketWatcher.java

@@ -37,6 +37,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.util.NativeCodeLoader;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.util.concurrent.Uninterruptibles;
 
@@ -48,7 +49,7 @@ import com.google.common.util.concurrent.Uninterruptibles;
  * See {@link DomainSocket} for more information about UNIX domain sockets.
  */
 @InterfaceAudience.LimitedPrivate("HDFS")
-public final class DomainSocketWatcher extends Thread implements Closeable {
+public final class DomainSocketWatcher implements Closeable {
   static {
     if (SystemUtils.IS_OS_WINDOWS) {
       loadingFailureReason = "UNIX Domain sockets are not available on Windows.";
@@ -281,7 +282,7 @@ public final class DomainSocketWatcher extends Thread implements Closeable {
       try {
         processedCond.await();
       } catch (InterruptedException e) {
-        this.interrupt();
+        Thread.currentThread().interrupt();
       }
       if (!toAdd.contains(entry)) {
         break;
@@ -308,7 +309,7 @@ public final class DomainSocketWatcher extends Thread implements Closeable {
       try {
         processedCond.await();
       } catch (InterruptedException e) {
-        this.interrupt();
+        Thread.currentThread().interrupt();
       }
       if (!toRemove.containsKey(sock.fd)) {
         break;
@@ -381,7 +382,8 @@ public final class DomainSocketWatcher extends Thread implements Closeable {
     }
   }
 
-  private final Thread watcherThread = new Thread(new Runnable() {
+  @VisibleForTesting
+  final Thread watcherThread = new Thread(new Runnable() {
     @Override
     public void run() {
       LOG.info(this + ": starting with interruptCheckPeriodMs = " +
@@ -443,6 +445,7 @@ public final class DomainSocketWatcher extends Thread implements Closeable {
       } catch (IOException e) {
         LOG.error(toString() + " terminating on IOException", e);
       } finally {
+        kick(); // allow the handler for notificationSockets[0] to read a byte
        for (Entry entry : entries.values()) {
           sendCallback("close", entries, fdSet, entry.getDomainSocket().fd);
         }
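
Note: since DomainSocketWatcher no longer extends Thread, the old this.interrupt() call would not even compile; the replacement, Thread.currentThread().interrupt(), is also the standard idiom for not losing a swallowed InterruptedException. A minimal self-contained sketch of that idiom follows, using a hypothetical class (not the real watcher, whose loop state is more involved):

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class AwaitSketch {
  private final ReentrantLock lock = new ReentrantLock();
  private final Condition processedCond = lock.newCondition();
  private volatile boolean processed = false;

  public void awaitProcessed() {
    lock.lock();
    try {
      while (!processed) {
        try {
          processedCond.await();  // may throw InterruptedException
        } catch (InterruptedException e) {
          // Re-assert the interrupt status on the current thread so the
          // flag is not lost; callers and later blocking calls can still
          // observe it.
          Thread.currentThread().interrupt();
          return;
        }
      }
    } finally {
      lock.unlock();
    }
  }
}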

TestDomainSocketWatcher.java

@@ -73,9 +73,10 @@ public class TestDomainSocketWatcher {
    */
   @Test(timeout=60000)
   public void testInterruption() throws Exception {
-    DomainSocketWatcher watcher = new DomainSocketWatcher(10);
-    watcher.interrupt();
-    Uninterruptibles.joinUninterruptibly(watcher);
+    final DomainSocketWatcher watcher = new DomainSocketWatcher(10);
+    watcher.watcherThread.interrupt();
+    Uninterruptibles.joinUninterruptibly(watcher.watcherThread);
+    watcher.close();
   }
 
   @Test(timeout=300000)

CHANGES.txt

@@ -33,6 +33,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-5810. Unify mmap cache and short-circuit file descriptor cache
     (cmccabe)
 
+    HDFS-5940. Minor cleanups to ShortCircuitReplica, FsDatasetCache, and
+    DomainSocketWatcher (cmccabe)
+
   OPTIMIZATIONS
 
     HDFS-5790. LeaseManager.findPath is very slow when many leases need recovery

BlockReaderFactory.java

@@ -31,7 +31,6 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.client.ShortCircuitCache;
 import org.apache.hadoop.hdfs.client.ShortCircuitCache.ShortCircuitReplicaCreator;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
-import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplicaInfo;
 import org.apache.hadoop.hdfs.net.DomainPeer;
 import org.apache.hadoop.hdfs.net.Peer;
@@ -389,7 +388,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
       return null;
     }
     ShortCircuitCache cache = clientContext.getShortCircuitCache();
-    Key key = new Key(block.getBlockId(), block.getBlockPoolId());
+    ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
     ShortCircuitReplicaInfo info = cache.fetchOrCreate(key, this);
     InvalidToken exc = info.getInvalidTokenException();
     if (exc != null) {
@@ -492,7 +491,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
     sock.recvFileInputStreams(fis, buf, 0, buf.length);
     ShortCircuitReplica replica = null;
     try {
-      Key key = new Key(block.getBlockId(), block.getBlockPoolId());
+      ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
       replica = new ShortCircuitReplica(key, fis[0], fis[1],
           clientContext.getShortCircuitCache(), Time.monotonicNow());
     } catch (IOException e) {

ExtendedBlockId.java (new file)

@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import org.apache.commons.lang.builder.EqualsBuilder;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
+/**
+ * An immutable key which identifies a block.
+ */
+final public class ExtendedBlockId {
+  /**
+   * The block ID for this block.
+   */
+  private final long blockId;
+
+  /**
+   * The block pool ID for this block.
+   */
+  private final String bpId;
+
+  public ExtendedBlockId(long blockId, String bpId) {
+    this.blockId = blockId;
+    this.bpId = bpId;
+  }
+
+  public long getBlockId() {
+    return this.blockId;
+  }
+
+  public String getBlockPoolId() {
+    return this.bpId;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if ((o == null) || (o.getClass() != this.getClass())) {
+      return false;
+    }
+    ExtendedBlockId other = (ExtendedBlockId)o;
+    return new EqualsBuilder().
+        append(blockId, other.blockId).
+        append(bpId, other.bpId).
+        isEquals();
+  }
+
+  @Override
+  public int hashCode() {
+    return new HashCodeBuilder().
+        append(this.blockId).
+        append(this.bpId).
+        toHashCode();
+  }
+
+  @Override
+  public String toString() {
+    return new StringBuilder().append(blockId).
+        append("_").append(bpId).toString();
+  }
+}
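
Because equals() and hashCode() are value-based, two ExtendedBlockId instances built from the same (blockId, bpId) pair are interchangeable as map keys, which is what lets ShortCircuitCache and FsDatasetCache below share this one class. A hypothetical demo (not part of the patch):

import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.hdfs.ExtendedBlockId;

public class ExtendedBlockIdDemo {
  public static void main(String[] args) {
    Map<ExtendedBlockId, String> m = new HashMap<ExtendedBlockId, String>();
    m.put(new ExtendedBlockId(123L, "test_bp1"), "replica");
    // A freshly constructed key with the same values finds the entry.
    System.out.println(m.get(new ExtendedBlockId(123L, "test_bp1"))); // replica
    // toString() renders "<blockId>_<blockPoolId>".
    System.out.println(new ExtendedBlockId(123L, "test_bp1")); // 123_test_bp1
  }
}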

ShortCircuitCache.java

@@ -36,9 +36,9 @@ import java.util.concurrent.locks.ReentrantLock;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
-import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
@@ -183,8 +183,9 @@
    * ShortCircuitReplicaInfo objects may contain a replica, or an InvalidToken
    * exception.
    */
-  private final HashMap<Key, Waitable<ShortCircuitReplicaInfo>>
-      replicaInfoMap = new HashMap<Key, Waitable<ShortCircuitReplicaInfo>>();
+  private final HashMap<ExtendedBlockId, Waitable<ShortCircuitReplicaInfo>>
+      replicaInfoMap = new HashMap<ExtendedBlockId,
+          Waitable<ShortCircuitReplicaInfo>>();
 
   /**
    * The CacheCleaner. We don't create this and schedule it until it becomes
@@ -566,7 +567,7 @@
    * @return   Null if no replica could be found or created.
    *           The replica, otherwise.
    */
-  public ShortCircuitReplicaInfo fetchOrCreate(Key key,
+  public ShortCircuitReplicaInfo fetchOrCreate(ExtendedBlockId key,
       ShortCircuitReplicaCreator creator) {
     Waitable<ShortCircuitReplicaInfo> newWaitable = null;
     lock.lock();
@@ -612,7 +613,7 @@
    *
    * @throws RetriableException If the caller needs to retry.
    */
-  private ShortCircuitReplicaInfo fetch(Key key,
+  private ShortCircuitReplicaInfo fetch(ExtendedBlockId key,
       Waitable<ShortCircuitReplicaInfo> waitable) throws RetriableException {
     // Another thread is already in the process of loading this
     // ShortCircuitReplica. So we simply wait for it to complete.
@@ -656,7 +657,7 @@
     return info;
   }
 
-  private ShortCircuitReplicaInfo create(Key key,
+  private ShortCircuitReplicaInfo create(ExtendedBlockId key,
       ShortCircuitReplicaCreator creator,
       Waitable<ShortCircuitReplicaInfo> newWaitable) {
     // Handle loading a new replica.
@@ -805,8 +806,8 @@
   @VisibleForTesting // ONLY for testing
   public interface CacheVisitor {
     void visit(int numOutstandingMmaps,
-        Map<Key, ShortCircuitReplica> replicas,
-        Map<Key, InvalidToken> failedLoads,
+        Map<ExtendedBlockId, ShortCircuitReplica> replicas,
+        Map<ExtendedBlockId, InvalidToken> failedLoads,
         Map<Long, ShortCircuitReplica> evictable,
         Map<Long, ShortCircuitReplica> evictableMmapped);
   }
@@ -815,11 +816,11 @@
   public void accept(CacheVisitor visitor) {
     lock.lock();
     try {
-      Map<Key, ShortCircuitReplica> replicas =
-          new HashMap<Key, ShortCircuitReplica>();
-      Map<Key, InvalidToken> failedLoads =
-          new HashMap<Key, InvalidToken>();
-      for (Entry<Key, Waitable<ShortCircuitReplicaInfo>> entry :
+      Map<ExtendedBlockId, ShortCircuitReplica> replicas =
+          new HashMap<ExtendedBlockId, ShortCircuitReplica>();
+      Map<ExtendedBlockId, InvalidToken> failedLoads =
+          new HashMap<ExtendedBlockId, InvalidToken>();
+      for (Entry<ExtendedBlockId, Waitable<ShortCircuitReplicaInfo>> entry :
           replicaInfoMap.entrySet()) {
         Waitable<ShortCircuitReplicaInfo> waitable = entry.getValue();
         if (waitable.hasVal()) {
@@ -839,13 +840,13 @@
         append("with outstandingMmapCount=").append(outstandingMmapCount).
         append(", replicas=");
     String prefix = "";
-    for (Entry<Key, ShortCircuitReplica> entry : replicas.entrySet()) {
+    for (Entry<ExtendedBlockId, ShortCircuitReplica> entry : replicas.entrySet()) {
       builder.append(prefix).append(entry.getValue());
       prefix = ",";
     }
     prefix = "";
     builder.append(", failedLoads=");
-    for (Entry<Key, InvalidToken> entry : failedLoads.entrySet()) {
+    for (Entry<ExtendedBlockId, InvalidToken> entry : failedLoads.entrySet()) {
       builder.append(prefix).append(entry.getValue());
       prefix = ",";
     }
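
With the signature change, every call site now builds an ExtendedBlockId. A sketch of the new fetchOrCreate() contract, modeled on the test code later in this patch; the cache constructor arguments are copied from TestShortCircuitCache, and a real creator would open the block and meta files instead of returning null:

import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.client.ShortCircuitCache;
import org.apache.hadoop.hdfs.client.ShortCircuitCache.ShortCircuitReplicaCreator;
import org.apache.hadoop.hdfs.client.ShortCircuitReplicaInfo;

public class FetchOrCreateSketch {
  public static ShortCircuitReplicaInfo lookup(long blockId, String bpId) {
    ShortCircuitCache cache =
        new ShortCircuitCache(10, 10000000, 10, 10000000, 1, 10000);
    ExtendedBlockId key = new ExtendedBlockId(blockId, bpId);
    return cache.fetchOrCreate(key, new ShortCircuitReplicaCreator() {
      @Override
      public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
        return null; // a real creator would open the block files here
      }
    });
  }
}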

ShortCircuitReplica.java

@@ -25,10 +25,9 @@ import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
 import java.nio.channels.FileChannel.MapMode;
 
-import org.apache.commons.lang.builder.EqualsBuilder;
-import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
@@ -49,65 +48,10 @@ import com.google.common.base.Preconditions;
 public class ShortCircuitReplica {
   public static final Log LOG = LogFactory.getLog(ShortCircuitCache.class);
 
-  /**
-   * Immutable class which identifies a ShortCircuitReplica object.
-   */
-  public static final class Key {
-    public Key(long blockId, String bpId) {
-      this.blockId = blockId;
-      this.bpId = bpId;
-    }
-
-    public long getBlockId() {
-      return this.blockId;
-    }
-
-    public String getBlockPoolId() {
-      return this.bpId;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if ((o == null) || (o.getClass() != this.getClass())) {
-        return false;
-      }
-      Key other = (Key)o;
-      return new EqualsBuilder().
-          append(blockId, other.blockId).
-          append(bpId, other.bpId).
-          isEquals();
-    }
-
-    @Override
-    public int hashCode() {
-      return new HashCodeBuilder().
-          append(this.blockId).
-          append(this.bpId).
-          toHashCode();
-    }
-
-    @Override
-    public String toString() {
-      return new StringBuilder().append(blockId).
-          append("_").append(bpId).toString();
-    }
-
-    /**
-     * The block ID for this BlockDescriptors object.
-     */
-    private final long blockId;
-
-    /**
-     * The block pool ID for this BlockDescriptors object.
-     */
-    private final String bpId;
-  }
-
   /**
    * Identifies this ShortCircuitReplica object.
    */
-  final Key key;
+  final ExtendedBlockId key;
 
   /**
    * The block data input stream.
@@ -168,7 +112,7 @@
    */
   private Long evictableTimeNs = null;
 
-  public ShortCircuitReplica(Key key,
+  public ShortCircuitReplica(ExtendedBlockId key,
       FileInputStream dataStream, FileInputStream metaStream,
       ShortCircuitCache cache, long creationTimeMs) throws IOException {
     this.key = key;
@@ -262,7 +206,7 @@
     return metaHeader;
   }
 
-  public Key getKey() {
+  public ExtendedBlockId getKey() {
     return key;
   }

FsDatasetCache.java

@@ -37,12 +37,12 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.builder.HashCodeBuilder;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.fs.ChecksumException;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@@ -56,43 +56,6 @@ import org.apache.hadoop.io.nativeio.NativeIO;
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class FsDatasetCache {
-  /**
-   * Keys which identify MappableBlocks.
-   */
-  private static final class Key {
-    /**
-     * Block id.
-     */
-    final long id;
-
-    /**
-     * Block pool id.
-     */
-    final String bpid;
-
-    Key(long id, String bpid) {
-      this.id = id;
-      this.bpid = bpid;
-    }
-
-    @Override
-    public boolean equals(Object o) {
-      if (o == null) {
-        return false;
-      }
-      if (!(o.getClass() == getClass())) {
-        return false;
-      }
-      Key other = (Key)o;
-      return ((other.id == this.id) && (other.bpid.equals(this.bpid)));
-    }
-
-    @Override
-    public int hashCode() {
-      return new HashCodeBuilder().append(id).append(bpid).hashCode();
-    }
-  };
-
   /**
    * MappableBlocks that we know about.
    */
@@ -143,7 +106,8 @@
   /**
    * Stores MappableBlock objects and the states they're in.
   */
-  private final HashMap<Key, Value> mappableBlockMap = new HashMap<Key, Value>();
+  private final HashMap<ExtendedBlockId, Value> mappableBlockMap =
+      new HashMap<ExtendedBlockId, Value>();
 
   private final AtomicLong numBlocksCached = new AtomicLong(0);
 
@@ -260,12 +224,12 @@
    */
   synchronized List<Long> getCachedBlocks(String bpid) {
     List<Long> blocks = new ArrayList<Long>();
-    for (Iterator<Entry<Key, Value>> iter =
+    for (Iterator<Entry<ExtendedBlockId, Value>> iter =
         mappableBlockMap.entrySet().iterator(); iter.hasNext(); ) {
-      Entry<Key, Value> entry = iter.next();
-      if (entry.getKey().bpid.equals(bpid)) {
+      Entry<ExtendedBlockId, Value> entry = iter.next();
+      if (entry.getKey().getBlockPoolId().equals(bpid)) {
         if (entry.getValue().state.shouldAdvertise()) {
-          blocks.add(entry.getKey().id);
+          blocks.add(entry.getKey().getBlockId());
         }
       }
     }
@@ -278,7 +242,7 @@
   synchronized void cacheBlock(long blockId, String bpid,
       String blockFileName, long length, long genstamp,
       Executor volumeExecutor) {
-    Key key = new Key(blockId, bpid);
+    ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
     Value prevValue = mappableBlockMap.get(key);
     if (prevValue != null) {
       if (LOG.isDebugEnabled()) {
@@ -299,7 +263,7 @@
   }
 
   synchronized void uncacheBlock(String bpid, long blockId) {
-    Key key = new Key(blockId, bpid);
+    ExtendedBlockId key = new ExtendedBlockId(blockId, bpid);
     Value prevValue = mappableBlockMap.get(key);
 
     if (prevValue == null) {
@@ -344,12 +308,12 @@
    * Background worker that mmaps, mlocks, and checksums a block
    */
   private class CachingTask implements Runnable {
-    private final Key key;
+    private final ExtendedBlockId key;
     private final String blockFileName;
     private final long length;
     private final long genstamp;
 
-    CachingTask(Key key, String blockFileName, long length, long genstamp) {
+    CachingTask(ExtendedBlockId key, String blockFileName, long length, long genstamp) {
       this.key = key;
      this.blockFileName = blockFileName;
       this.length = length;
@@ -361,13 +325,13 @@
       boolean success = false;
       FileInputStream blockIn = null, metaIn = null;
       MappableBlock mappableBlock = null;
-      ExtendedBlock extBlk =
-          new ExtendedBlock(key.bpid, key.id, length, genstamp);
+      ExtendedBlock extBlk = new ExtendedBlock(key.getBlockPoolId(),
+          key.getBlockId(), length, genstamp);
       long newUsedBytes = usedBytesCount.reserve(length);
       if (newUsedBytes < 0) {
-        LOG.warn("Failed to cache block id " + key.id + ", pool " + key.bpid +
-            ": could not reserve " + length + " more bytes in the " +
-            "cache: " + DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
+        LOG.warn("Failed to cache " + key + ": could not reserve " + length +
+            " more bytes in the cache: " +
+            DFSConfigKeys.DFS_DATANODE_MAX_LOCKED_MEMORY_KEY +
             " of " + maxBytes + " exceeded.");
         numBlocksFailedToCache.incrementAndGet();
         return;
@@ -378,16 +342,15 @@
         metaIn = (FileInputStream)dataset.getMetaDataInputStream(extBlk)
             .getWrappedStream();
       } catch (ClassCastException e) {
-        LOG.warn("Failed to cache block with id " + key.id + ", pool " +
-            key.bpid + ": Underlying blocks are not backed by files.", e);
+        LOG.warn("Failed to cache " + key +
+            ": Underlying blocks are not backed by files.", e);
         return;
       } catch (FileNotFoundException e) {
-        LOG.info("Failed to cache block with id " + key.id + ", pool " +
-            key.bpid + ": failed to find backing files.");
+        LOG.info("Failed to cache " + key + ": failed to find backing " +
+            "files.");
         return;
       } catch (IOException e) {
-        LOG.warn("Failed to cache block with id " + key.id + ", pool " +
-            key.bpid + ": failed to open file", e);
+        LOG.warn("Failed to cache " + key + ": failed to open file", e);
         return;
       }
       try {
@@ -395,11 +358,10 @@
             load(length, blockIn, metaIn, blockFileName);
       } catch (ChecksumException e) {
         // Exception message is bogus since this wasn't caused by a file read
-        LOG.warn("Failed to cache block " + key.id + " in " + key.bpid + ": " +
-            "checksum verification failed.");
+        LOG.warn("Failed to cache " + key + ": checksum verification failed.");
         return;
       } catch (IOException e) {
-        LOG.warn("Failed to cache block " + key.id + " in " + key.bpid, e);
+        LOG.warn("Failed to cache " + key, e);
         return;
       }
       synchronized (FsDatasetCache.this) {
@@ -409,15 +371,14 @@
             value.state == State.CACHING_CANCELLED);
         if (value.state == State.CACHING_CANCELLED) {
           mappableBlockMap.remove(key);
-          LOG.warn("Caching of block " + key.id + " in " + key.bpid +
-              " was cancelled.");
+          LOG.warn("Caching of " + key + " was cancelled.");
           return;
         }
         mappableBlockMap.put(key, new Value(mappableBlock, State.CACHED));
       }
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Successfully cached block " + key.id + " in " + key.bpid +
-            ". We are now caching " + newUsedBytes + " bytes in total.");
+        LOG.debug("Successfully cached " + key + ". We are now caching " +
+            newUsedBytes + " bytes in total.");
       }
       numBlocksCached.addAndGet(1);
       success = true;
@@ -425,9 +386,8 @@
       if (!success) {
         newUsedBytes = usedBytesCount.release(length);
         if (LOG.isDebugEnabled()) {
-          LOG.debug("Caching of block " + key.id + " in " +
-              key.bpid + " was aborted. We are now caching only " +
-              newUsedBytes + " + bytes in total.");
+          LOG.debug("Caching of " + key + " was aborted. We are now " +
+              "caching only " + newUsedBytes + " + bytes in total.");
         }
         IOUtils.closeQuietly(blockIn);
         IOUtils.closeQuietly(metaIn);
@@ -445,9 +405,9 @@
   }
 
   private class UncachingTask implements Runnable {
-    private final Key key;
+    private final ExtendedBlockId key;
 
-    UncachingTask(Key key) {
+    UncachingTask(ExtendedBlockId key) {
       this.key = key;
     }
@@ -470,8 +430,8 @@
       usedBytesCount.release(value.mappableBlock.getLength());
       numBlocksCached.addAndGet(-1);
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Uncaching of block " + key.id + " in " + key.bpid +
-            " completed. usedBytes = " + newUsedBytes);
+        LOG.debug("Uncaching of " + key + " completed. " +
+            "usedBytes = " + newUsedBytes);
       }
     }
   }
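
The log-message consolidation above works because string concatenation calls ExtendedBlockId.toString(), which renders as "<blockId>_<blockPoolId>". For illustration only (hypothetical class, with println standing in for the commons-logging Log):

import org.apache.hadoop.hdfs.ExtendedBlockId;

public class LogMessageDemo {
  public static void main(String[] args) {
    ExtendedBlockId key = new ExtendedBlockId(1073741825L, "BP-1");
    // Old style: "Failed to cache block id 1073741825, pool BP-1: ..."
    // New style: "Failed to cache 1073741825_BP-1: ..."
    System.out.println("Failed to cache " + key + ": failed to open file");
  }
}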

TestEnhancedByteBufferAccess.java

@@ -34,7 +34,7 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hdfs.BlockReaderTestUtil;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
 import org.apache.hadoop.hdfs.ClientContext;
 import org.apache.hadoop.hdfs.DFSClient;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
@@ -46,7 +46,6 @@ import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.ShortCircuitCache;
 import org.apache.hadoop.hdfs.client.ShortCircuitCache.CacheVisitor;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
-import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.io.ByteBufferPool;
 import org.apache.hadoop.io.IOUtils;
@@ -275,8 +274,8 @@
       @Override
       public void visit(int numOutstandingMmaps,
-          Map<Key, ShortCircuitReplica> replicas,
-          Map<Key, InvalidToken> failedLoads,
+          Map<ExtendedBlockId, ShortCircuitReplica> replicas,
+          Map<ExtendedBlockId, InvalidToken> failedLoads,
           Map<Long, ShortCircuitReplica> evictable,
           Map<Long, ShortCircuitReplica> evictableMmapped) {
         if (expectedNumOutstandingMmaps >= 0) {
@@ -341,12 +340,12 @@
     cache.accept(new CacheVisitor() {
       @Override
       public void visit(int numOutstandingMmaps,
-          Map<Key, ShortCircuitReplica> replicas,
-          Map<Key, InvalidToken> failedLoads,
+          Map<ExtendedBlockId, ShortCircuitReplica> replicas,
+          Map<ExtendedBlockId, InvalidToken> failedLoads,
           Map<Long, ShortCircuitReplica> evictable,
           Map<Long, ShortCircuitReplica> evictableMmapped) {
         ShortCircuitReplica replica = replicas.get(
-            new Key(firstBlock.getBlockId(), firstBlock.getBlockPoolId()));
+            new ExtendedBlockId(firstBlock.getBlockId(), firstBlock.getBlockPoolId()));
         Assert.assertNotNull(replica);
         Assert.assertTrue(replica.hasMmap());
         // The replica should not yet be evictable, since we have it open.
@@ -378,8 +377,8 @@
         cache.accept(new CacheVisitor() {
           @Override
           public void visit(int numOutstandingMmaps,
-              Map<Key, ShortCircuitReplica> replicas,
-              Map<Key, InvalidToken> failedLoads,
+              Map<ExtendedBlockId, ShortCircuitReplica> replicas,
+              Map<ExtendedBlockId, InvalidToken> failedLoads,
               Map<Long, ShortCircuitReplica> evictable,
               Map<Long, ShortCircuitReplica> evictableMmapped) {
             finished.setValue(evictableMmapped.isEmpty());

TestBlockReaderLocal.java

@@ -32,14 +32,12 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
 import org.apache.hadoop.hdfs.client.ShortCircuitCache;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
-import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key;
 import org.apache.hadoop.hdfs.protocol.DatanodeID;
 import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
 import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.unix.DomainSocket;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
-import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.Time;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -170,7 +168,7 @@
       };
       dataIn = streams[0];
       metaIn = streams[1];
-      Key key = new Key(block.getBlockId(), block.getBlockPoolId());
+      ExtendedBlockId key = new ExtendedBlockId(block.getBlockId(), block.getBlockPoolId());
       ShortCircuitReplica replica = new ShortCircuitReplica(
           key, dataIn, metaIn, shortCircuitCache, Time.now());
       blockReaderLocal = new BlockReaderLocal.Builder(

TestShortCircuitCache.java

@@ -20,17 +20,13 @@ package org.apache.hadoop.hdfs;
 import org.apache.commons.lang.mutable.MutableBoolean;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hdfs.client.ClientMmap;
 import org.apache.hadoop.hdfs.client.ShortCircuitCache;
-import org.apache.hadoop.hdfs.client.ShortCircuitCache.CacheVisitor;
 import org.apache.hadoop.hdfs.client.ShortCircuitCache.ShortCircuitReplicaCreator;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplica;
-import org.apache.hadoop.hdfs.client.ShortCircuitReplica.Key;
 import org.apache.hadoop.hdfs.client.ShortCircuitReplicaInfo;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.unix.TemporarySocketDirectory;
-import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.Time;
@@ -44,7 +40,6 @@ import java.io.DataOutputStream;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.util.Map;
 
 public class TestShortCircuitCache {
   static final Log LOG = LogFactory.getLog(TestShortCircuitCache.class);
@@ -105,7 +100,7 @@
     @Override
     public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
       try {
-        Key key = new Key(blockId, "test_bp1");
+        ExtendedBlockId key = new ExtendedBlockId(blockId, "test_bp1");
         return new ShortCircuitReplicaInfo(
             new ShortCircuitReplica(key,
               pair.getFileInputStreams()[0], pair.getFileInputStreams()[1],
@@ -129,14 +124,14 @@
         new ShortCircuitCache(10, 10000000, 10, 10000000, 1, 10000);
     final TestFileDescriptorPair pair = new TestFileDescriptorPair();
     ShortCircuitReplicaInfo replicaInfo1 =
-      cache.fetchOrCreate(new Key(123, "test_bp1"),
+      cache.fetchOrCreate(new ExtendedBlockId(123, "test_bp1"),
         new SimpleReplicaCreator(123, cache, pair));
     Preconditions.checkNotNull(replicaInfo1.getReplica());
     Preconditions.checkState(replicaInfo1.getInvalidTokenException() == null);
     pair.compareWith(replicaInfo1.getReplica().getDataStream(),
                      replicaInfo1.getReplica().getMetaStream());
     ShortCircuitReplicaInfo replicaInfo2 =
-      cache.fetchOrCreate(new Key(123, "test_bp1"),
+      cache.fetchOrCreate(new ExtendedBlockId(123, "test_bp1"),
         new ShortCircuitReplicaCreator() {
       @Override
       public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
@@ -157,7 +152,7 @@
     // really long here)
     ShortCircuitReplicaInfo replicaInfo3 =
       cache.fetchOrCreate(
-          new Key(123, "test_bp1"), new ShortCircuitReplicaCreator() {
+          new ExtendedBlockId(123, "test_bp1"), new ShortCircuitReplicaCreator() {
         @Override
         public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
           Assert.fail("expected to use existing entry.");
@@ -179,7 +174,7 @@
     final TestFileDescriptorPair pair = new TestFileDescriptorPair();
     ShortCircuitReplicaInfo replicaInfo1 =
       cache.fetchOrCreate(
-        new Key(123, "test_bp1"), new SimpleReplicaCreator(123, cache, pair));
+        new ExtendedBlockId(123, "test_bp1"), new SimpleReplicaCreator(123, cache, pair));
     Preconditions.checkNotNull(replicaInfo1.getReplica());
     Preconditions.checkState(replicaInfo1.getInvalidTokenException() == null);
     pair.compareWith(replicaInfo1.getReplica().getDataStream(),
@@ -190,7 +185,7 @@
     Thread.sleep(10);
     ShortCircuitReplicaInfo replicaInfo2 =
       cache.fetchOrCreate(
-        new Key(123, "test_bp1"), new ShortCircuitReplicaCreator() {
+        new ExtendedBlockId(123, "test_bp1"), new ShortCircuitReplicaCreator() {
       @Override
       public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
         triedToCreate.setValue(true);
@@ -221,7 +216,7 @@
     };
     for (int i = 0; i < pairs.length; i++) {
       replicaInfos[i] = cache.fetchOrCreate(
-          new Key(i, "test_bp1"),
+          new ExtendedBlockId(i, "test_bp1"),
           new SimpleReplicaCreator(i, cache, pairs[i]));
       Preconditions.checkNotNull(replicaInfos[i].getReplica());
       Preconditions.checkState(replicaInfos[i].getInvalidTokenException() == null);
@@ -237,7 +232,7 @@
     for (int i = 1; i < pairs.length; i++) {
       final Integer iVal = new Integer(i);
       replicaInfos[i] = cache.fetchOrCreate(
-          new Key(i, "test_bp1"),
+          new ExtendedBlockId(i, "test_bp1"),
           new ShortCircuitReplicaCreator() {
         @Override
         public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
@@ -253,7 +248,7 @@
     // The first (oldest) replica should not be cached.
     final MutableBoolean calledCreate = new MutableBoolean(false);
     replicaInfos[0] = cache.fetchOrCreate(
-        new Key(0, "test_bp1"),
+        new ExtendedBlockId(0, "test_bp1"),
         new ShortCircuitReplicaCreator() {
       @Override
       public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
@@ -289,7 +284,7 @@
     final long HOUR_IN_MS = 60 * 60 * 1000;
     for (int i = 0; i < pairs.length; i++) {
       final Integer iVal = new Integer(i);
-      final Key key = new Key(i, "test_bp1");
+      final ExtendedBlockId key = new ExtendedBlockId(i, "test_bp1");
       replicaInfos[i] = cache.fetchOrCreate(key,
           new ShortCircuitReplicaCreator() {
         @Override
@@ -316,7 +311,7 @@
       @Override
       public Boolean get() {
         ShortCircuitReplicaInfo info = cache.fetchOrCreate(
-            new Key(0, "test_bp1"), new ShortCircuitReplicaCreator() {
+            new ExtendedBlockId(0, "test_bp1"), new ShortCircuitReplicaCreator() {
           @Override
           public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
             return null;
@@ -332,7 +327,7 @@
     // Make sure that second replica did not go stale.
     ShortCircuitReplicaInfo info = cache.fetchOrCreate(
-        new Key(1, "test_bp1"), new ShortCircuitReplicaCreator() {
+        new ExtendedBlockId(1, "test_bp1"), new ShortCircuitReplicaCreator() {
       @Override
       public ShortCircuitReplicaInfo createShortCircuitReplicaInfo() {
         Assert.fail("second replica went stale, despite 1 " +