Merge trunk into HA branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1213867 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2011-12-13 19:02:37 +00:00
commit a0fe4f476a
63 changed files with 990 additions and 356 deletions

View File

@ -585,6 +585,35 @@ $JIRA_COMMENT_FOOTER"
return 0
}
###############################################################################
### Verify eclipse:eclipse works
checkEclipseGeneration () {
echo ""
echo ""
echo "======================================================================"
echo "======================================================================"
echo " Running mvn eclipse:eclipse."
echo "======================================================================"
echo "======================================================================"
echo ""
echo ""
echo "$MVN eclipse:eclipse -D${PROJECT_NAME}PatchProcess"
$MVN eclipse:eclipse -D${PROJECT_NAME}PatchProcess
if [[ $? != 0 ]] ; then
JIRA_COMMENT="$JIRA_COMMENT
-1 eclipse:eclipse. The patch failed to build with eclipse:eclipse."
return 1
fi
JIRA_COMMENT="$JIRA_COMMENT
+1 eclipse:eclipse. The patch built with eclipse:eclipse."
return 0
}
###############################################################################
### Run the tests
runTests () {
@ -790,6 +819,8 @@ checkJavadocWarnings
(( RESULT = RESULT + $? ))
checkJavacWarnings
(( RESULT = RESULT + $? ))
checkEclipseGeneration
(( RESULT = RESULT + $? ))
### Checkstyle not implemented yet
#checkStyle
#(( RESULT = RESULT + $? ))

View File

@ -134,6 +134,8 @@ Trunk (unreleased changes)
HADOOP-7902. skipping name rules setting (if already set) should be done
on UGI initialization only. (tucu)
HADOOP-7913 Fix bug in ProtoBufRpcEngine (sanjay)
OPTIMIZATIONS
HADOOP-7761. Improve the performance of raw comparisons. (todd)
@ -172,6 +174,9 @@ Release 0.23.1 - Unreleased
HADOOP-6886. LocalFileSystem Needs createNonRecursive API.
(Nicolas Spiegelberg and eli via eli)
HADOOP-7912. test-patch should run eclipse:eclipse to verify that it does
not break again. (Robert Joseph Evans via tomwhite)
OPTIMIZATIONS
BUG FIXES
@ -200,6 +205,9 @@ Release 0.23.1 - Unreleased
HADOOP-7878 Regression: HADOOP-7777 switch changes break HDFS tests when the
isSingleSwitch() predicate is used. (stevel)
HADOOP-7914. Remove the duplicated declaration of hadoop-hdfs test-jar in
hadoop-project/pom.xml. (szetszwo)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -325,7 +325,7 @@ public Server(Class<?> protocolClass, Object protocolImpl,
int numReaders, int queueSizePerHandler, boolean verbose,
SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
super(bindAddress, port, RpcRequestWritable.class, numHandlers,
super(bindAddress, port, null, numHandlers,
numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl
.getClass().getName()), secretManager);
this.verbose = verbose;

View File

@ -62,13 +62,13 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.RpcPayloadHeader.*;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.RPC.RpcInvoker;
import org.apache.hadoop.ipc.RPC.VersionMismatch;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcPayloadOperation;
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
import org.apache.hadoop.ipc.metrics.RpcMetrics;
@ -76,18 +76,18 @@
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SaslRpcServer;
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
import org.apache.hadoop.security.SaslRpcServer.SaslStatus;
import org.apache.hadoop.security.SaslRpcServer.SaslDigestCallbackHandler;
import org.apache.hadoop.security.SaslRpcServer.SaslGssCallbackHandler;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.SaslRpcServer.SaslStatus;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AuthorizationException;
import org.apache.hadoop.security.authorize.PolicyProvider;
import org.apache.hadoop.security.authorize.ProxyUsers;
import org.apache.hadoop.security.authorize.ServiceAuthorizationManager;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.StringUtils;
@ -1504,7 +1504,8 @@ private void processData(byte[] buf) throws IOException, InterruptedException {
rpcRequest.readFields(dis);
} catch (Throwable t) {
LOG.warn("Unable to read call parameters for client " +
getHostAddress(), t);
getHostAddress() + "on connection protocol " +
this.protocolName + " for rpcKind " + header.getkind(), t);
final Call readParamsFailedCall =
new Call(header.getCallId(), null, this);
ByteArrayOutputStream responseBuffer = new ByteArrayOutputStream();

View File

@ -32,6 +32,10 @@ Trunk (unreleased changes)
HDFS-2647. Used protobuf based RPC for InterDatanodeProtocol,
ClientDatanodeProtocol, JournalProtocol, NamenodeProtocol. (suresh)
HDFS-2663. Handle protobuf optional parameters correctly. (suresh)
HDFS-2666. Fix TestBackupNode failure. (suresh)
IMPROVEMENTS
HADOOP-7524 Change RPC to allow multiple protocols including multuple
@ -100,6 +104,8 @@ Trunk (unreleased changes)
HDFS-2651 ClientNameNodeProtocol Translators for Protocol Buffers (sanjay)
HDFS-2650. Replace @inheritDoc with @Override. (Hari Mankude via suresh).
OPTIMIZATIONS
HDFS-2477. Optimize computing the diff between a block report and the
namenode state. (Tomasz Nykiel via hairong)
@ -108,8 +114,7 @@ Trunk (unreleased changes)
thus reducing contention for write lock. (Tomasz Nykiel via hairong)
HDFS-2476. More CPU efficient data structure for under-replicated,
over-replicated, and invalidated blocks.
(Tomasz Nykiel via todd)
over-replicated, and invalidated blocks. (Tomasz Nykiel via todd)
BUG FIXES
HDFS-2299. TestOfflineEditsViewer is failing on trunk. (Uma Maheswara Rao G
@ -151,6 +156,9 @@ Trunk (unreleased changes)
HDFS-2606. webhdfs client filesystem impl must set the content-type
header for create/append. (tucu)
HDFS-1765. Block Replication should respect under-replication
block priority. (Uma Maheswara Rao G via eli)
Release 0.23.1 - UNRELEASED
INCOMPATIBLE CHANGES
@ -199,6 +207,8 @@ Release 0.23.1 - UNRELEASED
HDFS-2604. Add a log message to show if WebHDFS is enabled and a
configuration section in the forrest doc. (szetszwo)
HDFS-2654. Make BlockReaderLocal not extend RemoteBlockReader2. (eli)
OPTIMIZATIONS
HDFS-2130. Switch default checksum to CRC32C. (todd)
@ -232,6 +242,9 @@ Release 0.23.1 - UNRELEASED
HDFS-2596. TestDirectoryScanner doesn't test parallel scans. (eli)
HDFS-2653. DFSClient should cache whether addrs are non-local when
short-circuiting is enabled. (eli)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -21,6 +21,7 @@
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.HashMap;
@ -37,6 +38,7 @@
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum;
@ -57,8 +59,8 @@
* if security is enabled.</li>
* </ul>
*/
class BlockReaderLocal extends RemoteBlockReader2 {
public static final Log LOG = LogFactory.getLog(DFSClient.class);
class BlockReaderLocal implements BlockReader {
private static final Log LOG = LogFactory.getLog(DFSClient.class);
//Stores the cache and proxy for a local datanode.
private static class LocalDatanodeInfo {
@ -117,13 +119,24 @@ private void removeBlockLocalPathInfo(ExtendedBlock b) {
private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
private final FileInputStream dataIn; // reader for the data file
private FileInputStream checksumIn; // reader for the checksum file
private int offsetFromChunkBoundary;
ByteBuffer dataBuff = null;
ByteBuffer checksumBuff = null;
private byte[] skipBuf = null;
private ByteBuffer dataBuff = null;
private ByteBuffer checksumBuff = null;
private DataChecksum checksum;
private final boolean verifyChecksum;
private static DirectBufferPool bufferPool = new DirectBufferPool();
private int bytesPerChecksum;
private int checksumSize;
/** offset in block where reader wants to actually read */
private long startOffset;
private final String filename;
/**
* The only way this object can be instantiated.
@ -256,9 +269,14 @@ private BlockReaderLocal(Configuration conf, String hdfsfile,
long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
FileInputStream checksumIn) throws IOException {
super(hdfsfile, block.getBlockPoolId(), block.getBlockId(), dataIn
.getChannel(), checksum, verifyChecksum, startOffset, firstChunkOffset,
length, null);
this.filename = hdfsfile;
this.checksum = checksum;
this.verifyChecksum = verifyChecksum;
this.startOffset = Math.max(startOffset, 0);
bytesPerChecksum = this.checksum.getBytesPerChecksum();
checksumSize = this.checksum.getChecksumSize();
this.dataIn = dataIn;
this.checksumIn = checksumIn;
this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
@ -322,10 +340,8 @@ public synchronized int read(byte[] buf, int off, int len) throws IOException {
readIntoBuffer(checksumIn, checksumBuff);
checksumBuff.flip();
dataBuff.flip();
if (verifyChecksum) {
checksum.verifyChunkedSums(dataBuff, checksumBuff, filename,
this.startOffset);
}
} else {
dataRead = dataBuff.remaining();
}
@ -356,9 +372,24 @@ public synchronized long skip(long n) throws IOException {
}
if (!verifyChecksum) {
return dataIn.skip(n);
} else {
return super.skip(n);
}
// Skip by reading the data so we stay in sync with checksums.
// This could be implemented more efficiently in the future to
// skip to the beginning of the appropriate checksum chunk
// and then only read to the middle of that chunk.
if (skipBuf == null) {
skipBuf = new byte[bytesPerChecksum];
}
long nSkipped = 0;
while ( nSkipped < n ) {
int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
int ret = read(skipBuf, 0, toSkip);
if ( ret <= 0 ) {
return nSkipped;
}
nSkipped += ret;
}
return nSkipped;
}
@Override
@ -375,6 +406,27 @@ public synchronized void close() throws IOException {
bufferPool.returnBuffer(checksumBuff);
checksumBuff = null;
}
super.close();
startOffset = -1;
checksum = null;
}
@Override
public int readAll(byte[] buf, int offset, int len) throws IOException {
return BlockReaderUtil.readAll(this, buf, offset, len);
}
@Override
public void readFully(byte[] buf, int off, int len) throws IOException {
BlockReaderUtil.readFully(this, buf, off, len);
}
@Override
public Socket takeSocket() {
return null;
}
@Override
public boolean hasSentStatusCode() {
return false;
}
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
/**
* For sharing between the local and remote block reader implementations.
*/
class BlockReaderUtil {
/* See {@link BlockReader#readAll(byte[], int, int)} */
public static int readAll(BlockReader reader,
byte[] buf, int offset, int len) throws IOException {
int n = 0;
for (;;) {
int nread = reader.read(buf, offset + n, len - n);
if (nread <= 0)
return (n == 0) ? nread : n;
n += nread;
if (n >= len)
return n;
}
}
/* See {@link BlockReader#readFully(byte[], int, int)} */
public static void readFully(BlockReader reader,
byte[] buf, int off, int len) throws IOException {
int toRead = len;
while (toRead > 0) {
int ret = reader.read(buf, off, toRead);
if (ret < 0) {
throw new IOException("Premature EOF from inputStream");
}
toRead -= ret;
off += ret;
}
}
}

View File

@ -81,17 +81,13 @@ private void loadNext() throws IOException {
}
}
/**
* {@inheritDoc}
*/
@Override
public boolean hasNext() {
return nextPath != null;
}
/**
* {@inheritDoc}
*/
@Override
public Path next() throws IOException {
if (!hasNext()) {

View File

@ -33,10 +33,8 @@
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import javax.net.SocketFactory;
@ -575,12 +573,13 @@ static BlockReader getLocalBlockReader(Configuration conf,
}
}
private static Set<String> localIpAddresses = Collections
.synchronizedSet(new HashSet<String>());
private static Map<String, Boolean> localAddrMap = Collections
.synchronizedMap(new HashMap<String, Boolean>());
private static boolean isLocalAddress(InetSocketAddress targetAddr) {
InetAddress addr = targetAddr.getAddress();
if (localIpAddresses.contains(addr.getHostAddress())) {
Boolean cached = localAddrMap.get(addr.getHostAddress());
if (cached != null && cached) {
if (LOG.isTraceEnabled()) {
LOG.trace("Address " + targetAddr + " is local");
}
@ -601,9 +600,7 @@ private static boolean isLocalAddress(InetSocketAddress targetAddr) {
if (LOG.isTraceEnabled()) {
LOG.trace("Address " + targetAddr + " is local");
}
if (local == true) {
localIpAddresses.add(addr.getHostAddress());
}
localAddrMap.put(addr.getHostAddress(), local);
return local;
}
@ -1735,8 +1732,7 @@ public long getVisibleLength() throws IOException {
}
}
boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr)
throws IOException {
boolean shouldTryShortCircuitRead(InetSocketAddress targetAddr) {
if (shortCircuitLocalReads && isLocalAddress(targetAddr)) {
return true;
}
@ -1759,7 +1755,7 @@ void reportChecksumFailure(String file, LocatedBlock lblocks[]) {
}
}
/** {@inheritDoc} */
@Override
public String toString() {
return getClass().getSimpleName() + "[clientName=" + clientName
+ ", ugi=" + ugi + "]";

View File

@ -179,7 +179,7 @@ public void setWorkingDirectory(Path dir) {
workingDir = makeAbsolute(dir);
}
/** {@inheritDoc} */
@Override
public Path getHomeDirectory() {
return makeQualified(new Path("/user/" + dfs.ugi.getShortUserName()));
@ -306,7 +306,7 @@ public void concat(Path trg, Path [] psrcs) throws IOException {
dfs.concat(getPathName(trg), srcs);
}
/** {@inheritDoc} */
@SuppressWarnings("deprecation")
@Override
public boolean rename(Path src, Path dst) throws IOException {
@ -315,7 +315,6 @@ public boolean rename(Path src, Path dst) throws IOException {
}
/**
* {@inheritDoc}
* This rename operation is guaranteed to be atomic.
*/
@SuppressWarnings("deprecation")
@ -331,7 +330,6 @@ public boolean delete(Path f, boolean recursive) throws IOException {
return dfs.delete(getPathName(f), recursive);
}
/** {@inheritDoc} */
@Override
public ContentSummary getContentSummary(Path f) throws IOException {
statistics.incrementReadOps(1);
@ -512,7 +510,7 @@ protected boolean primitiveMkdir(Path f, FsPermission absolutePermission)
return dfs.primitiveMkdir(getPathName(f), absolutePermission);
}
/** {@inheritDoc} */
@Override
public void close() throws IOException {
try {
@ -552,7 +550,6 @@ public long getDfsUsed() {
}
}
/** {@inheritDoc} */
@Override
public FsStatus getStatus(Path p) throws IOException {
statistics.incrementReadOps(1);
@ -614,9 +611,6 @@ public long getCorruptBlocksCount() throws IOException {
return dfs.getCorruptBlocksCount();
}
/**
* {@inheritDoc}
*/
@Override
public RemoteIterator<Path> listCorruptFileBlocks(Path path)
throws IOException {
@ -694,7 +688,6 @@ public void metaSave(String pathname) throws IOException {
dfs.metaSave(pathname);
}
/** {@inheritDoc} */
@Override
public FsServerDefaults getServerDefaults() throws IOException {
return dfs.getServerDefaults();
@ -765,14 +758,12 @@ public FileStatus getFileStatus(Path f) throws IOException {
}
}
/** {@inheritDoc} */
@Override
public MD5MD5CRC32FileChecksum getFileChecksum(Path f) throws IOException {
statistics.incrementReadOps(1);
return dfs.getFileChecksum(getPathName(f));
}
/** {@inheritDoc }*/
@Override
public void setPermission(Path p, FsPermission permission
) throws IOException {
@ -780,7 +771,6 @@ public void setPermission(Path p, FsPermission permission
dfs.setPermission(getPathName(p), permission);
}
/** {@inheritDoc }*/
@Override
public void setOwner(Path p, String username, String groupname
) throws IOException {
@ -791,7 +781,6 @@ public void setOwner(Path p, String username, String groupname
dfs.setOwner(getPathName(p), username, groupname);
}
/** {@inheritDoc }*/
@Override
public void setTimes(Path p, long mtime, long atime
) throws IOException {

View File

@ -529,7 +529,7 @@ public FileStatus getFileStatus(Path f) throws IOException {
private class ChecksumParser extends DefaultHandler {
private FileChecksum filechecksum;
/** {@inheritDoc} */
@Override
public void startElement(String ns, String localname, String qname,
Attributes attrs) throws SAXException {
if (!MD5MD5CRC32FileChecksum.class.getName().equals(qname)) {
@ -563,7 +563,7 @@ private FileChecksum getFileChecksum(String f) throws IOException {
}
}
/** {@inheritDoc} */
@Override
public FileChecksum getFileChecksum(Path f) throws IOException {
final String s = makeQualified(f).toUri().getPath();
return new ChecksumParser().getFileChecksum(s);
@ -611,7 +611,7 @@ public boolean mkdirs(Path f, FsPermission permission) throws IOException {
private class ContentSummaryParser extends DefaultHandler {
private ContentSummary contentsummary;
/** {@inheritDoc} */
@Override
public void startElement(String ns, String localname, String qname,
Attributes attrs) throws SAXException {
if (!ContentSummary.class.getName().equals(qname)) {
@ -697,7 +697,7 @@ private static ContentSummary toContentSummary(Attributes attrs
}
}
/** {@inheritDoc} */
@Override
public ContentSummary getContentSummary(Path f) throws IOException {
final String s = makeQualified(f).toUri().getPath();
final ContentSummary cs = new ContentSummaryParser().getContentSummary(s);

View File

@ -85,7 +85,7 @@ public class RemoteBlockReader2 implements BlockReader {
Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
private ReadableByteChannel in;
protected DataChecksum checksum;
private DataChecksum checksum;
private PacketHeader curHeader;
private ByteBuffer curPacketBuf = null;
@ -96,25 +96,24 @@ public class RemoteBlockReader2 implements BlockReader {
private long lastSeqNo = -1;
/** offset in block where reader wants to actually read */
protected long startOffset;
protected final String filename;
private long startOffset;
private final String filename;
protected static DirectBufferPool bufferPool =
new DirectBufferPool();
private static DirectBufferPool bufferPool = new DirectBufferPool();
private ByteBuffer headerBuf = ByteBuffer.allocate(
PacketHeader.PKT_HEADER_LEN);
protected int bytesPerChecksum;
protected int checksumSize;
private int bytesPerChecksum;
private int checksumSize;
/**
* The total number of bytes we need to transfer from the DN.
* This is the amount that the user has requested plus some padding
* at the beginning so that the read can begin on a chunk boundary.
*/
protected long bytesNeededToFinish;
private long bytesNeededToFinish;
protected final boolean verifyChecksum;
private final boolean verifyChecksum;
private boolean sentStatusCode = false;
@ -389,29 +388,12 @@ public static String getFileName(final InetSocketAddress s,
@Override
public int readAll(byte[] buf, int offset, int len) throws IOException {
int n = 0;
for (;;) {
int nread = read(buf, offset + n, len - n);
if (nread <= 0)
return (n == 0) ? nread : n;
n += nread;
if (n >= len)
return n;
}
return BlockReaderUtil.readAll(this, buf, offset, len);
}
@Override
public void readFully(byte[] buf, int off, int len)
throws IOException {
int toRead = len;
while (toRead > 0) {
int ret = read(buf, off, toRead);
if (ret < 0) {
throw new IOException("Premature EOF from inputStream");
}
toRead -= ret;
off += ret;
}
public void readFully(byte[] buf, int off, int len) throws IOException {
BlockReaderUtil.readFully(this, buf, off, len);
}
/**

View File

@ -53,9 +53,7 @@ public String getCookie() {
return cookie;
}
/**
* {@inheritDoc}
*/
@Override
public void readFields(DataInput in) throws IOException {
int fileCount = in.readInt();
@ -66,9 +64,7 @@ public void readFields(DataInput in) throws IOException {
cookie = Text.readString(in);
}
/**
* {@inheritDoc}
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeInt(files.length);
@ -78,9 +74,8 @@ public void write(DataOutput out) throws IOException {
Text.writeString(out, cookie);
}
/**
* {@inheritDoc}
*/
@Override
public boolean equals(Object obj) {
if (this == obj) {
return true;
@ -93,9 +88,8 @@ public boolean equals(Object obj) {
Arrays.equals(files, other.files);
}
/**
* {@inheritDoc}
*/
@Override
public int hashCode() {
int result = cookie.hashCode();

View File

@ -185,14 +185,14 @@ public int compareTo(DatanodeID that) {
/////////////////////////////////////////////////
// Writable
/////////////////////////////////////////////////
/** {@inheritDoc} */
@Override
public void write(DataOutput out) throws IOException {
DeprecatedUTF8.writeString(out, name);
DeprecatedUTF8.writeString(out, storageID);
out.writeShort(infoPort);
}
/** {@inheritDoc} */
@Override
public void readFields(DataInput in) throws IOException {
name = DeprecatedUTF8.readString(in);
storageID = DeprecatedUTF8.readString(in);

View File

@ -382,7 +382,7 @@ protected void setAdminState(AdminStates newState) {
});
}
/** {@inheritDoc} */
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
@ -400,7 +400,7 @@ public void write(DataOutput out) throws IOException {
WritableUtils.writeEnum(out, getAdminState());
}
/** {@inheritDoc} */
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);

View File

@ -159,7 +159,7 @@ public static LocatedBlock read(DataInput in) throws IOException {
return lb;
}
/** {@inheritDoc} */
@Override
public String toString() {
return getClass().getSimpleName() + "{" + b
+ "; getBlockSize()=" + getBlockSize()

View File

@ -225,7 +225,6 @@ public void readFields(DataInput in) throws IOException {
}
}
/** {@inheritDoc} */
@Override
public String toString() {
final StringBuilder b = new StringBuilder(getClass().getSimpleName());

View File

@ -24,6 +24,9 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.Options.Rename;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AbandonBlockResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.AddBlockRequestProto;
@ -52,6 +55,7 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportRequestProto;
@ -124,6 +128,7 @@
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.UpgradeStatusReportProto;
import org.apache.hadoop.hdfs.protocolR23Compatible.ProtocolSignatureWritable;
import org.apache.hadoop.hdfs.server.common.UpgradeStatusReport;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.ipc.ProtocolSignature;
import org.apache.hadoop.ipc.RPC;
@ -208,11 +213,16 @@ public GetBlockLocationsResponseProto getBlockLocations(
RpcController controller, GetBlockLocationsRequestProto req)
throws ServiceException {
try {
return GetBlockLocationsResponseProto
.newBuilder()
.setLocations(
LocatedBlocks b = server.getBlockLocations(req.getSrc(), req.getOffset(),
req.getLength());
Builder builder = GetBlockLocationsResponseProto
.newBuilder();
if (b != null) {
builder.setLocations(
PBHelper.convert(server.getBlockLocations(req.getSrc(),
req.getOffset(), req.getLength()))).build();
}
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
}
@ -325,7 +335,7 @@ public AddBlockResponseProto addBlock(RpcController controller,
return AddBlockResponseProto.newBuilder().setBlock(
PBHelper.convert(
server.addBlock(req.getSrc(), req.getClientName(),
PBHelper.convert(req.getPrevious()),
req.hasPrevious() ? PBHelper.convert(req.getPrevious()) : null,
PBHelper.convert(
(DatanodeInfoProto[]) req.getExcludeNodesList().toArray()))))
.build();
@ -594,10 +604,14 @@ public DistributedUpgradeProgressResponseProto distributedUpgradeProgress(
RpcController controller, DistributedUpgradeProgressRequestProto req)
throws ServiceException {
try {
UpgradeStatusReportProto result = PBHelper.convert(server
.distributedUpgradeProgress(PBHelper.convert(req.getAction())));
return DistributedUpgradeProgressResponseProto.newBuilder()
.setReport(result).build();
UpgradeStatusReport result = server.distributedUpgradeProgress(PBHelper
.convert(req.getAction()));
DistributedUpgradeProgressResponseProto.Builder builder =
DistributedUpgradeProgressResponseProto.newBuilder();
if (result != null) {
builder.setReport(PBHelper.convert(result));
}
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
}
@ -636,9 +650,13 @@ public MetaSaveResponseProto metaSave(RpcController controller,
public GetFileInfoResponseProto getFileInfo(RpcController controller,
GetFileInfoRequestProto req) throws ServiceException {
try {
HdfsFileStatusProto result =
PBHelper.convert(server.getFileInfo(req.getSrc()));
return GetFileInfoResponseProto.newBuilder().setFs(result).build();
HdfsFileStatus res = server.getFileInfo(req.getSrc());
GetFileInfoResponseProto.Builder builder =
GetFileInfoResponseProto.newBuilder();
if (res != null) {
builder.setFs(PBHelper.convert(res));
}
return builder.build();
} catch (IOException e) {
throw new ServiceException(e);
}

View File

@ -83,14 +83,17 @@
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateSymlinkRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DeleteRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DistributedUpgradeProgressRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DistributedUpgradeProgressResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FinalizeUpgradeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.FsyncRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetAdditionalDatanodeRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetBlockLocationsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetContentSummaryRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDatanodeReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetDelegationTokenRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileInfoResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFileLinkInfoRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatusRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetLinkTargetRequestProto;
@ -205,7 +208,10 @@ public LocatedBlocks getBlockLocations(String src, long offset, long length)
.setLength(length)
.build();
try {
return PBHelper.convert(rpcProxy.getBlockLocations(null, req).getLocations());
GetBlockLocationsResponseProto resp = rpcProxy.getBlockLocations(null,
req);
return resp.hasLocations() ?
PBHelper.convert(resp.getLocations()) : null;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@ -329,12 +335,15 @@ public LocatedBlock addBlock(String src, String clientName,
throws AccessControlException, FileNotFoundException,
NotReplicatedYetException, SafeModeException, UnresolvedLinkException,
IOException {
AddBlockRequestProto req = AddBlockRequestProto.newBuilder().setSrc(src)
.setClientName(clientName).setPrevious(PBHelper.convert(previous))
.addAllExcludeNodes(Arrays.asList(PBHelper.convert(excludeNodes)))
.build();
AddBlockRequestProto.Builder builder = AddBlockRequestProto.newBuilder();
builder.setSrc(src)
.setClientName(clientName)
.addAllExcludeNodes(Arrays.asList(PBHelper.convert(excludeNodes)));
if (previous != null) {
builder.setPrevious(PBHelper.convert(previous));
}
try {
return PBHelper.convert(rpcProxy.addBlock(null, req).getBlock());
return PBHelper.convert(rpcProxy.addBlock(null, builder.build()).getBlock());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@ -615,8 +624,9 @@ public UpgradeStatusReport distributedUpgradeProgress(UpgradeAction action)
DistributedUpgradeProgressRequestProto.newBuilder().
setAction(PBHelper.convert(action)).build();
try {
return PBHelper.convert(
rpcProxy.distributedUpgradeProgress(null, req).getReport());
DistributedUpgradeProgressResponseProto res = rpcProxy
.distributedUpgradeProgress(null, req);
return res.hasReport() ? PBHelper.convert(res.getReport()) : null;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@ -653,7 +663,8 @@ public HdfsFileStatus getFileInfo(String src) throws AccessControlException,
GetFileInfoRequestProto req = GetFileInfoRequestProto.newBuilder()
.setSrc(src).build();
try {
return PBHelper.convert(rpcProxy.getFileInfo(null, req).getFs());
GetFileInfoResponseProto res = rpcProxy.getFileInfo(null, req);
return res.hasFs() ? PBHelper.convert(res.getFs()) : null;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}

View File

@ -206,7 +206,7 @@ public DatanodeCommand blockReport(DatanodeRegistration registration,
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
return PBHelper.convert(resp.getCmd());
return resp.hasCmd() ? PBHelper.convert(resp.getCmd()) : null;
}
@Override
@ -264,7 +264,7 @@ public UpgradeCommand processUpgradeCommand(UpgradeCommand comm)
} catch (ServiceException se) {
throw ProtobufHelper.getRemoteException(se);
}
return PBHelper.convert(resp.getCmd());
return resp.hasCmd() ? PBHelper.convert(resp.getCmd()) : null;
}
@Override

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.CommitBlockSynchronizationResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.DatanodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ErrorReportResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.HeartbeatRequestProto;
@ -110,7 +111,9 @@ public HeartbeatResponseProto sendHeartbeat(RpcController controller,
DatanodeCommand[] cmds = response.getCommands();
if (cmds != null) {
for (int i = 0; i < cmds.length; i++) {
builder.addCmds(i, PBHelper.convert(cmds[i]));
if (cmds[i] != null) {
builder.addCmds(PBHelper.convert(cmds[i]));
}
}
}
return builder.build();
@ -131,8 +134,12 @@ public BlockReportResponseProto blockReport(RpcController controller,
} catch (IOException e) {
throw new ServiceException(e);
}
return BlockReportResponseProto.newBuilder().setCmd(PBHelper.convert(cmd))
.build();
BlockReportResponseProto.Builder builder =
BlockReportResponseProto.newBuilder();
if (cmd != null) {
builder.setCmd(PBHelper.convert(cmd));
}
return builder.build();
}
@Override
@ -182,14 +189,20 @@ public VersionResponseProto versionRequest(RpcController controller,
@Override
public ProcessUpgradeResponseProto processUpgrade(RpcController controller,
ProcessUpgradeRequestProto request) throws ServiceException {
UpgradeCommand cmd;
UpgradeCommand ret;
try {
cmd = impl.processUpgradeCommand(PBHelper.convert(request.getCmd()));
UpgradeCommand cmd = request.hasCmd() ? PBHelper
.convert(request.getCmd()) : null;
ret = impl.processUpgradeCommand(cmd);
} catch (IOException e) {
throw new ServiceException(e);
}
return ProcessUpgradeResponseProto.newBuilder()
.setCmd(PBHelper.convert(cmd)).build();
ProcessUpgradeResponseProto.Builder builder =
ProcessUpgradeResponseProto.newBuilder();
if (ret != null) {
builder.setCmd(PBHelper.convert(ret));
}
return builder.build();
}
@Override

View File

@ -129,6 +129,10 @@
/**
* Utilities for converting protobuf classes to and from implementation classes.
*
* Note that when converting from an internal type to protobuf type, the
* converter never return null for protobuf type. The check for internal type
* being null must be done before calling the convert() method.
*/
public class PBHelper {
private static final RegisterCommandProto REG_CMD_PROTO =
@ -341,16 +345,19 @@ public static RemoteEditLogManifest convert(
public static CheckpointCommandProto convert(CheckpointCommand cmd) {
return CheckpointCommandProto.newBuilder()
.setSignature(convert(cmd.getSignature())).build();
.setSignature(convert(cmd.getSignature()))
.setNeedToReturnImage(cmd.needToReturnImage()).build();
}
public static NamenodeCommandProto convert(NamenodeCommand cmd) {
if (cmd instanceof CheckpointCommand) {
return NamenodeCommandProto.newBuilder().setAction(cmd.getAction())
.setType(NamenodeCommandProto.Type.NamenodeCommand)
.setType(NamenodeCommandProto.Type.CheckPointCommand)
.setCheckpointCmd(convert((CheckpointCommand) cmd)).build();
}
return NamenodeCommandProto.newBuilder().setAction(cmd.getAction()).build();
return NamenodeCommandProto.newBuilder()
.setType(NamenodeCommandProto.Type.NamenodeCommand)
.setAction(cmd.getAction()).build();
}
public static BlockKey[] convertBlockKeys(List<BlockKeyProto> list) {
@ -369,6 +376,7 @@ public static NamespaceInfo convert(NamespaceInfoProto info) {
}
public static NamenodeCommand convert(NamenodeCommandProto cmd) {
if (cmd == null) return null;
switch (cmd.getType()) {
case CheckPointCommand:
CheckpointCommandProto chkPt = cmd.getCheckpointCmd();
@ -425,7 +433,8 @@ static public DatanodeInfo convert(DatanodeInfoProto di) {
if (di == null) return null;
return new DatanodeInfo(
PBHelper.convert(di.getId()),
di.getLocation(), di.getHostName(),
di.hasLocation() ? di.getLocation() : null ,
di.hasHostName() ? di.getHostName() : null,
di.getCapacity(), di.getDfsUsed(), di.getRemaining(),
di.getBlockPoolUsed() , di.getLastUpdate() , di.getXceiverCount() ,
PBHelper.convert(di.getAdminState()));
@ -433,10 +442,16 @@ static public DatanodeInfo convert(DatanodeInfoProto di) {
static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) {
if (di == null) return null;
return DatanodeInfoProto.newBuilder().
DatanodeInfoProto.Builder builder = DatanodeInfoProto.newBuilder();
if (di.getHostName() != null) {
builder.setHostName(di.getHostName());
}
if (di.getNetworkLocation() != null) {
builder.setLocation(di.getNetworkLocation());
}
return builder.
setId(PBHelper.convert((DatanodeID) di)).
setLocation(di.getNetworkLocation()).
setHostName(di.getHostName()).
setCapacity(di.getCapacity()).
setDfsUsed(di.getDfsUsed()).
setRemaining(di.getRemaining()).
@ -776,9 +791,14 @@ public static BalancerBandwidthCommand convert(
public static ReceivedDeletedBlockInfoProto convert(
ReceivedDeletedBlockInfo receivedDeletedBlockInfo) {
return ReceivedDeletedBlockInfoProto.newBuilder()
.setBlock(PBHelper.convert(receivedDeletedBlockInfo.getBlock()))
.setDeleteHint(receivedDeletedBlockInfo.getDelHints()).build();
ReceivedDeletedBlockInfoProto.Builder builder =
ReceivedDeletedBlockInfoProto.newBuilder();
if (receivedDeletedBlockInfo.getDelHints() != null) {
builder.setDeleteHint(receivedDeletedBlockInfo.getDelHints());
}
return builder.setBlock(PBHelper.convert(receivedDeletedBlockInfo.getBlock()))
.build();
}
public static UpgradeCommandProto convert(UpgradeCommand comm) {
@ -802,7 +822,7 @@ public static UpgradeCommandProto convert(UpgradeCommand comm) {
public static ReceivedDeletedBlockInfo convert(
ReceivedDeletedBlockInfoProto proto) {
return new ReceivedDeletedBlockInfo(PBHelper.convert(proto.getBlock()),
proto.getDeleteHint());
proto.hasDeleteHint() ? proto.getDeleteHint() : null);
}
public static NamespaceInfoProto convert(NamespaceInfo info) {
@ -862,13 +882,10 @@ public static List<LocatedBlockProto> convertLocatedBlock2(List<LocatedBlock> lb
// LocatedBlocks
public static LocatedBlocks convert(LocatedBlocksProto lb) {
if (lb == null) {
return null;
}
return new LocatedBlocks(
lb.getFileLength(), lb.getUnderConstruction(),
PBHelper.convertLocatedBlock(lb.getBlocksList()),
PBHelper.convert(lb.getLastBlock()),
lb.hasLastBlock() ? PBHelper.convert(lb.getLastBlock()) : null,
lb.getIsLastBlockComplete());
}
@ -876,11 +893,15 @@ public static LocatedBlocksProto convert(LocatedBlocks lb) {
if (lb == null) {
return null;
}
return LocatedBlocksProto.newBuilder().
setFileLength(lb.getFileLength()).
setUnderConstruction(lb.isUnderConstruction()).
addAllBlocks(PBHelper.convertLocatedBlock2(lb.getLocatedBlocks())).
setLastBlock(PBHelper.convert(lb.getLastLocatedBlock())).setIsLastBlockComplete(lb.isLastBlockComplete()).build();
LocatedBlocksProto.Builder builder =
LocatedBlocksProto.newBuilder();
if (lb.getLastLocatedBlock() != null) {
builder.setLastBlock(PBHelper.convert(lb.getLastLocatedBlock()));
}
return builder.setFileLength(lb.getFileLength())
.setUnderConstruction(lb.isUnderConstruction())
.addAllBlocks(PBHelper.convertLocatedBlock2(lb.getLocatedBlocks()))
.setIsLastBlockComplete(lb.isLastBlockComplete()).build();
}
public static FsServerDefaults convert(FsServerDefaultsProto fs) {
@ -981,12 +1002,17 @@ public static HdfsFileStatusProto convert(HdfsFileStatus fs) {
setPermission(PBHelper.convert(fs.getPermission())).
setOwner(fs.getOwner()).
setGroup(fs.getGroup()).
setSymlink(ByteString.copyFrom(fs.getSymlinkInBytes())).
setPath(ByteString.copyFrom(fs.getLocalNameInBytes()));
LocatedBlocks locations = null;
if (fs.getSymlink() != null) {
builder.setSymlink(ByteString.copyFrom(fs.getSymlinkInBytes()));
}
if (fs instanceof HdfsLocatedFileStatus) {
LocatedBlocks locations = ((HdfsLocatedFileStatus)fs).getBlockLocations();
if (locations != null) {
builder.setLocations(PBHelper.convert(locations));
}
}
return builder.build();
}

View File

@ -112,7 +112,6 @@ public boolean equals(Object obj) {
return (this == obj) || super.equals(obj);
}
/** {@inheritDoc} */
@Override
public String toString() {
final StringBuilder b = new StringBuilder(getClass().getSimpleName());
@ -271,7 +270,6 @@ public boolean equals(Object obj) {
return (this == obj) || super.equals(obj);
}
/** {@inheritDoc} */
@Override
public String toString() {
final StringBuilder b = new StringBuilder(super.toString());

View File

@ -168,9 +168,6 @@ public long getExcessBlocksCount() {
/** variable to enable check for enough racks */
final boolean shouldCheckForEnoughRacks;
/** Last block index used for replication work. */
private int replIndex = 0;
/** for block replicas placement */
private BlockPlacementPolicy blockplacement;
@ -923,74 +920,16 @@ int computeInvalidateWork(int nodesToProcess) {
* @return number of blocks scheduled for replication during this iteration.
*/
private int computeReplicationWork(int blocksToProcess) throws IOException {
// Choose the blocks to be replicated
List<List<Block>> blocksToReplicate =
chooseUnderReplicatedBlocks(blocksToProcess);
// replicate blocks
return computeReplicationWorkForBlocks(blocksToReplicate);
}
/**
* Get a list of block lists to be replicated The index of block lists
* represents the
*
* @param blocksToProcess
* @return Return a list of block lists to be replicated. The block list index
* represents its replication priority.
*/
private List<List<Block>> chooseUnderReplicatedBlocks(int blocksToProcess) {
// initialize data structure for the return value
List<List<Block>> blocksToReplicate = new ArrayList<List<Block>>(
UnderReplicatedBlocks.LEVEL);
for (int i = 0; i < UnderReplicatedBlocks.LEVEL; i++) {
blocksToReplicate.add(new ArrayList<Block>());
}
List<List<Block>> blocksToReplicate = null;
namesystem.writeLock();
try {
synchronized (neededReplications) {
if (neededReplications.size() == 0) {
return blocksToReplicate;
}
// Go through all blocks that need replications.
UnderReplicatedBlocks.BlockIterator neededReplicationsIterator =
neededReplications.iterator();
// skip to the first unprocessed block, which is at replIndex
for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
neededReplicationsIterator.next();
}
// # of blocks to process equals either twice the number of live
// data-nodes or the number of under-replicated blocks whichever is less
blocksToProcess = Math.min(blocksToProcess, neededReplications.size());
for (int blkCnt = 0; blkCnt < blocksToProcess; blkCnt++, replIndex++) {
if (!neededReplicationsIterator.hasNext()) {
// start from the beginning
replIndex = 0;
blocksToProcess = Math.min(blocksToProcess, neededReplications
.size());
if (blkCnt >= blocksToProcess)
break;
neededReplicationsIterator = neededReplications.iterator();
assert neededReplicationsIterator.hasNext() : "neededReplications should not be empty.";
}
Block block = neededReplicationsIterator.next();
int priority = neededReplicationsIterator.getPriority();
if (priority < 0 || priority >= blocksToReplicate.size()) {
LOG.warn("Unexpected replication priority: "
+ priority + " " + block);
} else {
blocksToReplicate.get(priority).add(block);
}
} // end for
} // end synchronized neededReplication
// Choose the blocks to be replicated
blocksToReplicate = neededReplications
.chooseUnderReplicatedBlocks(blocksToProcess);
} finally {
namesystem.writeUnlock();
}
return blocksToReplicate;
return computeReplicationWorkForBlocks(blocksToReplicate);
}
/** Replicate a set of blocks
@ -1019,7 +958,7 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
// abandoned block or block reopened for append
if(fileINode == null || fileINode.isUnderConstruction()) {
neededReplications.remove(block, priority); // remove from neededReplications
replIndex--;
neededReplications.decrementReplicationIndex(priority);
continue;
}
@ -1043,7 +982,7 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
if ( (pendingReplications.getNumReplicas(block) > 0) ||
(blockHasEnoughRacks(block)) ) {
neededReplications.remove(block, priority); // remove from neededReplications
replIndex--;
neededReplications.decrementReplicationIndex(priority);
NameNode.stateChangeLog.info("BLOCK* "
+ "Removing block " + block
+ " from neededReplications as it has enough replicas.");
@ -1104,7 +1043,7 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
if(fileINode == null || fileINode.isUnderConstruction()) {
neededReplications.remove(block, priority); // remove from neededReplications
rw.targets = null;
replIndex--;
neededReplications.decrementReplicationIndex(priority);
continue;
}
requiredReplication = fileINode.getReplication();
@ -1118,7 +1057,7 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
if ( (pendingReplications.getNumReplicas(block) > 0) ||
(blockHasEnoughRacks(block)) ) {
neededReplications.remove(block, priority); // remove from neededReplications
replIndex--;
neededReplications.decrementReplicationIndex(priority);
rw.targets = null;
NameNode.stateChangeLog.info("BLOCK* "
+ "Removing block " + block
@ -1156,7 +1095,7 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
// remove from neededReplications
if(numEffectiveReplicas + targets.length >= requiredReplication) {
neededReplications.remove(block, priority); // remove from neededReplications
replIndex--;
neededReplications.decrementReplicationIndex(priority);
}
}
}

View File

@ -63,7 +63,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
BlockPlacementPolicyDefault() {
}
/** {@inheritDoc} */
@Override
public void initialize(Configuration conf, FSClusterStats stats,
NetworkTopology clusterMap) {
this.considerLoad = conf.getBoolean(DFSConfigKeys.DFS_NAMENODE_REPLICATION_CONSIDERLOAD_KEY, true);
@ -79,7 +79,7 @@ protected StringBuilder initialValue() {
}
};
/** {@inheritDoc} */
@Override
public DatanodeDescriptor[] chooseTarget(String srcPath,
int numOfReplicas,
DatanodeDescriptor writer,
@ -89,7 +89,6 @@ public DatanodeDescriptor[] chooseTarget(String srcPath,
null, blocksize);
}
/** {@inheritDoc} */
@Override
public DatanodeDescriptor[] chooseTarget(String srcPath,
int numOfReplicas,
@ -525,7 +524,7 @@ private DatanodeDescriptor[] getPipeline(
return nodes;
}
/** {@inheritDoc} */
@Override
public int verifyBlockPlacement(String srcPath,
LocatedBlock lBlk,
int minRacks) {
@ -544,7 +543,7 @@ public int verifyBlockPlacement(String srcPath,
return minRacks - racks.size();
}
/** {@inheritDoc} */
@Override
public DatanodeDescriptor chooseReplicaToDelete(FSInodeInfo inode,
Block block,
short replicationFactor,

View File

@ -18,8 +18,11 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.util.LightWeightLinkedSet;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
@ -81,10 +84,14 @@ class UnderReplicatedBlocks implements Iterable<Block> {
private List<LightWeightLinkedSet<Block>> priorityQueues
= new ArrayList<LightWeightLinkedSet<Block>>();
/** Stores the replication index for each priority */
private Map<Integer, Integer> priorityToReplIdx = new HashMap<Integer, Integer>(LEVEL);
/** Create an object. */
UnderReplicatedBlocks() {
for (int i = 0; i < LEVEL; i++) {
priorityQueues.add(new LightWeightLinkedSet<Block>());
priorityToReplIdx.put(i, 0);
}
}
@ -301,6 +308,70 @@ synchronized void update(Block block, int curReplicas,
}
}
/**
* Get a list of block lists to be replicated. The index of block lists
* represents its replication priority. Replication index will be tracked for
* each priority list separately in priorityToReplIdx map. Iterates through
* all priority lists and find the elements after replication index. Once the
* last priority lists reaches to end, all replication indexes will be set to
* 0 and start from 1st priority list to fulfill the blockToProces count.
*
* @param blocksToProcess - number of blocks to fetch from underReplicated blocks.
* @return Return a list of block lists to be replicated. The block list index
* represents its replication priority.
*/
public synchronized List<List<Block>> chooseUnderReplicatedBlocks(
int blocksToProcess) {
// initialize data structure for the return value
List<List<Block>> blocksToReplicate = new ArrayList<List<Block>>(LEVEL);
for (int i = 0; i < LEVEL; i++) {
blocksToReplicate.add(new ArrayList<Block>());
}
if (size() == 0) { // There are no blocks to collect.
return blocksToReplicate;
}
int blockCount = 0;
for (int priority = 0; priority < LEVEL; priority++) {
// Go through all blocks that need replications with current priority.
BlockIterator neededReplicationsIterator = iterator(priority);
Integer replIndex = priorityToReplIdx.get(priority);
// skip to the first unprocessed block, which is at replIndex
for (int i = 0; i < replIndex && neededReplicationsIterator.hasNext(); i++) {
neededReplicationsIterator.next();
}
blocksToProcess = Math.min(blocksToProcess, size());
if (blockCount == blocksToProcess) {
break; // break if already expected blocks are obtained
}
// Loop through all remaining blocks in the list.
while (blockCount < blocksToProcess
&& neededReplicationsIterator.hasNext()) {
Block block = neededReplicationsIterator.next();
blocksToReplicate.get(priority).add(block);
replIndex++;
blockCount++;
}
if (!neededReplicationsIterator.hasNext()
&& neededReplicationsIterator.getPriority() == LEVEL - 1) {
// reset all priorities replication index to 0 because there is no
// recently added blocks in any list.
for (int i = 0; i < LEVEL; i++) {
priorityToReplIdx.put(i, 0);
}
break;
}
priorityToReplIdx.put(priority, replIndex);
}
return blocksToReplicate;
}
/** returns an iterator of all blocks in a given priority queue */
synchronized BlockIterator iterator(int level) {
return new BlockIterator(level);
@ -380,4 +451,14 @@ int getPriority() {
return level;
}
}
/**
* This method is to decrement the replication index for the given priority
*
* @param priority - int priority level
*/
public void decrementReplicationIndex(int priority) {
Integer replIdx = priorityToReplIdx.get(priority);
priorityToReplIdx.put(priority, --replIdx);
}
}

View File

@ -1788,7 +1788,7 @@ public ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock,
return new ExtendedBlock(oldBlock.getBlockPoolId(), r);
}
/** {@inheritDoc} */
@Override
public long getProtocolVersion(String protocol, long clientVersion
) throws IOException {
if (protocol.equals(InterDatanodeProtocol.class.getName())) {
@ -1821,7 +1821,7 @@ static class BlockRecord {
this.rInfo = rInfo;
}
/** {@inheritDoc} */
@Override
public String toString() {
return "block:" + rInfo + " node:" + id;
}
@ -2006,7 +2006,6 @@ private static void logRecoverBlock(String who,
}
// ClientDataNodeProtocol implementation
/** {@inheritDoc} */
@Override // ClientDataNodeProtocol
public long getReplicaVisibleLength(final ExtendedBlock block) throws IOException {
checkWriteAccess(block);

View File

@ -193,7 +193,7 @@ static class BlockInputStreams implements Closeable {
this.checksumIn = checksumIn;
}
/** {@inheritDoc} */
@Override
public void close() {
IOUtils.closeStream(dataIn);
IOUtils.closeStream(checksumIn);

View File

@ -40,7 +40,7 @@ public class ContentSummaryServlet extends DfsServlet {
/** For java.io.Serializable */
private static final long serialVersionUID = 1L;
/** {@inheritDoc} */
@Override
public void doGet(final HttpServletRequest request,
final HttpServletResponse response) throws ServletException, IOException {
final Configuration conf =

View File

@ -71,7 +71,6 @@ class EditLogFileOutputStream extends EditLogOutputStream {
fc.position(fc.size());
}
/** {@inheritDoc} */
@Override
public void write(FSEditLogOp op) throws IOException {
doubleBuf.writeOp(op);

View File

@ -80,7 +80,7 @@ private URL createRedirectURL(UserGroupInformation ugi, DatanodeID host,
dtParam + addrParam);
}
/** {@inheritDoc} */
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response
) throws ServletException, IOException {
final ServletContext context = getServletContext();
@ -104,7 +104,7 @@ public static class GetServlet extends DfsServlet {
/** For java.io.Serializable */
private static final long serialVersionUID = 1L;
/** {@inheritDoc} */
@Override
public void doGet(HttpServletRequest request, HttpServletResponse response
) throws ServletException, IOException {
final PrintWriter out = response.getWriter();

View File

@ -261,13 +261,13 @@ void setLocalName(byte[] name) {
this.name = name;
}
/** {@inheritDoc} */
@Override
public String getFullPathName() {
// Get the full path name of this inode.
return FSDirectory.getFullPathName(this);
}
/** {@inheritDoc} */
@Override
public String toString() {
return "\"" + getFullPathName() + "\":"
+ getUserName() + ":" + getGroupName() + ":"

View File

@ -372,7 +372,7 @@ INodeDirectory addToParent( byte[][] pathComponents,
return parent;
}
/** {@inheritDoc} */
@Override
DirCounts spaceConsumedInTree(DirCounts counts) {
counts.nsCount += 1;
if (children != null) {
@ -383,7 +383,7 @@ DirCounts spaceConsumedInTree(DirCounts counts) {
return counts;
}
/** {@inheritDoc} */
@Override
long[] computeContentSummary(long[] summary) {
// Walk through the children of this node, using a new summary array
// for the (sub)tree rooted at this node

View File

@ -168,7 +168,7 @@ int collectSubtreeBlocksAndClear(List<Block> v) {
return 1;
}
/** {@inheritDoc} */
@Override
long[] computeContentSummary(long[] summary) {
summary[0] += computeFileSize(true);
summary[1]++;

View File

@ -252,13 +252,13 @@ boolean removePath(String src) {
return paths.remove(src);
}
/** {@inheritDoc} */
@Override
public String toString() {
return "[Lease. Holder: " + holder
+ ", pendingcreates: " + paths.size() + "]";
}
/** {@inheritDoc} */
@Override
public int compareTo(Lease o) {
Lease l1 = this;
Lease l2 = o;
@ -273,7 +273,7 @@ public int compareTo(Lease o) {
}
}
/** {@inheritDoc} */
@Override
public boolean equals(Object o) {
if (!(o instanceof Lease)) {
return false;
@ -286,7 +286,7 @@ public boolean equals(Object o) {
return false;
}
/** {@inheritDoc} */
@Override
public int hashCode() {
return holder.hashCode();
}
@ -436,7 +436,7 @@ private synchronized void checkLeases() {
}
}
/** {@inheritDoc} */
@Override
public synchronized String toString() {
return getClass().getSimpleName() + "= {"
+ "\n leases=" + leases

View File

@ -647,7 +647,7 @@ float getReplicationFactor() {
return (float) (totalReplicas) / (float) totalBlocks;
}
/** {@inheritDoc} */
@Override
public String toString() {
StringBuilder res = new StringBuilder();
res.append("Status: ").append((isHealthy() ? "HEALTHY" : "CORRUPT"))

View File

@ -129,7 +129,7 @@ public class SecondaryNameNode implements Runnable {
private FSNamesystem namesystem;
/** {@inheritDoc} */
@Override
public String toString() {
return getClass().getSimpleName() + " Status"
+ "\nName Node Address : " + nameNodeAddr

View File

@ -64,7 +64,7 @@ synchronized T get(int i) {
return i2t.get(i);
}
/** {@inheritDoc} */
@Override
public String toString() {
return "max=" + max + ",\n t2i=" + t2i + ",\n i2t=" + i2t;
}

View File

@ -113,7 +113,7 @@ public String toString() {
/////////////////////////////////////////////////
// Writable
/////////////////////////////////////////////////
/** {@inheritDoc} */
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
@ -124,7 +124,7 @@ public void write(DataOutput out) throws IOException {
exportedKeys.write(out);
}
/** {@inheritDoc} */
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);

View File

@ -74,7 +74,7 @@ public DatanodeRegistrationWritable(String nodeName, StorageInfo info,
/////////////////////////////////////////////////
// Writable
/////////////////////////////////////////////////
/** {@inheritDoc} */
@Override
public void write(DataOutput out) throws IOException {
datanodeId.write(out);
@ -85,7 +85,7 @@ public void write(DataOutput out) throws IOException {
exportedKeys.write(out);
}
/** {@inheritDoc} */
@Override
public void readFields(DataInput in) throws IOException {
datanodeId.readFields(in);

View File

@ -39,7 +39,7 @@ message GetBlockLocationsRequestProto {
}
message GetBlockLocationsResponseProto {
required LocatedBlocksProto locations = 1;
optional LocatedBlocksProto locations = 1;
}
message GetServerDefaultsRequestProto { // No parameters
@ -115,7 +115,7 @@ message AbandonBlockResponseProto { // void response
message AddBlockRequestProto {
required string src = 1;
required string clientName = 2;
required ExtendedBlockProto previous = 3;
optional ExtendedBlockProto previous = 3;
repeated DatanodeInfoProto excludeNodes = 4;
}
@ -306,7 +306,7 @@ message DistributedUpgradeProgressRequestProto {
required UpgradeActionProto action = 1;
}
message DistributedUpgradeProgressResponseProto {
required UpgradeStatusReportProto report = 1;
optional UpgradeStatusReportProto report = 1;
}
message ListCorruptFileBlocksRequestProto {
@ -330,7 +330,7 @@ message GetFileInfoRequestProto {
}
message GetFileInfoResponseProto {
required HdfsFileStatusProto fs = 1;
optional HdfsFileStatusProto fs = 1;
}
message GetFileLinkInfoRequestProto {

View File

@ -184,7 +184,7 @@ message NNHAStatusHeartbeatProto {
* haStatus - Status (from an HA perspective) of the NN sending this response
*/
message HeartbeatResponseProto {
repeated DatanodeCommandProto cmds = 1;
repeated DatanodeCommandProto cmds = 1; // Returned commands can be null
required NNHAStatusHeartbeatProto haStatus = 2;
}

View File

@ -30,7 +30,8 @@ message ExtendedBlockProto {
required string poolId = 1; // Block pool id - gloablly unique across clusters
required uint64 blockId = 2; // the local id within a pool
required uint64 generationStamp = 3;
optional uint64 numBytes = 4; // block len does not belong in ebid - here for historical reasons
optional uint64 numBytes = 4 [default = 0]; // len does not belong in ebid
// here for historical reasons
}
/**
@ -65,12 +66,12 @@ message DatanodeInfosProto {
*/
message DatanodeInfoProto {
required DatanodeIDProto id = 1;
optional uint64 capacity = 2;
optional uint64 dfsUsed = 3;
optional uint64 remaining = 4;
optional uint64 blockPoolUsed = 5;
optional uint64 lastUpdate = 6;
optional uint32 xceiverCount = 7;
optional uint64 capacity = 2 [default = 0];
optional uint64 dfsUsed = 3 [default = 0];
optional uint64 remaining = 4 [default = 0];
optional uint64 blockPoolUsed = 5 [default = 0];
optional uint64 lastUpdate = 6 [default = 0];
optional uint32 xceiverCount = 7 [default = 0];
optional string location = 8;
optional string hostName = 9;
enum AdminState {
@ -79,7 +80,7 @@ message DatanodeInfoProto {
DECOMMISSIONED = 2;
}
optional AdminState adminState = 10;
optional AdminState adminState = 10 [default = NORMAL];
}
/**
@ -162,8 +163,8 @@ message HdfsFileStatusProto {
optional bytes symlink = 9; // if symlink, target encoded java UTF8
// Optional fields for file
optional uint32 block_replication = 10; // Actually a short - only 16bits used
optional uint64 blocksize = 11;
optional uint32 block_replication = 10 [default = 0]; // only 16bits used
optional uint64 blocksize = 11 [default = 0];
optional LocatedBlocksProto locations = 12; // suppled only if asked by client
}
@ -218,7 +219,7 @@ message NamenodeRegistrationProto {
CHECKPOINT = 3;
}
required StorageInfoProto storageInfo = 3; // Node information
optional NamenodeRoleProto role = 4; // Namenode role
optional NamenodeRoleProto role = 4 [default = NAMENODE]; // Namenode role
}
/**
@ -264,7 +265,7 @@ message CheckpointCommandProto {
message BlockProto {
required uint64 blockId = 1;
required uint64 genStamp = 2;
optional uint64 numBytes = 3;
optional uint64 numBytes = 3 [default = 0];
}
/**
@ -313,7 +314,7 @@ message NamespaceInfoProto {
message BlockKeyProto {
required uint32 keyId = 1; // Key identifier
required uint64 expiryDate = 2; // Expiry time in milliseconds
required bytes keyBytes = 3; // Key secret
optional bytes keyBytes = 3; // Key secret
}
/**

View File

@ -515,6 +515,11 @@ private void initMiniDFSCluster(
this.waitSafeMode = waitSafeMode;
// use alternate RPC engine if spec'd
/*
Turned off - see HDFS-2647 and HDFS-2660 for related comments.
This test can be turned on when Avro RPC is enabled using mechanism
similar to protobuf.
String rpcEngineName = System.getProperty("hdfs.rpc.engine");
if (rpcEngineName != null && !"".equals(rpcEngineName)) {
@ -538,6 +543,7 @@ private void initMiniDFSCluster(
conf.setBoolean(HADOOP_SECURITY_AUTHORIZATION,
false);
}
*/
int replication = conf.getInt(DFS_REPLICATION_KEY, 3);
conf.setInt(DFS_REPLICATION_KEY, Math.min(replication, numDataNodes));

View File

@ -28,9 +28,16 @@ public class TestDfsOverAvroRpc extends TestLocalDFS {
@Test(timeout=20000)
public void testWorkingDirectory() throws IOException {
/*
Test turned off - see HDFS-2647 and HDFS-2660 for related comments.
This test can be turned on when Avro RPC is enabled using mechanism
similar to protobuf.
*/
/*
System.setProperty("hdfs.rpc.engine",
"org.apache.hadoop.ipc.AvroRpcEngine");
super.testWorkingDirectory();
*/
}
}

View File

@ -17,26 +17,32 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.junit.Assert.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import junit.framework.TestCase;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.junit.Test;
public class TestReplicationPolicy extends TestCase {
public class TestReplicationPolicy {
private Random random= DFSUtil.getRandom();
private static final int BLOCK_SIZE = 1024;
private static final int NUM_OF_DATANODES = 6;
private static final Configuration CONF = new HdfsConfiguration();
@ -90,6 +96,7 @@ public class TestReplicationPolicy extends TestCase {
* the 1st is on dataNodes[0] and the 2nd is on a different rack.
* @throws Exception
*/
@Test
public void testChooseTarget1() throws Exception {
dataNodes[0].updateHeartbeat(
2*HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
@ -150,6 +157,7 @@ private static DatanodeDescriptor[] chooseTarget(
* should be placed on a third rack.
* @throws Exception
*/
@Test
public void testChooseTarget2() throws Exception {
HashMap<Node, Node> excludedNodes;
DatanodeDescriptor[] targets;
@ -225,6 +233,7 @@ public void testChooseTarget2() throws Exception {
* and the rest should be placed on the third rack.
* @throws Exception
*/
@Test
public void testChooseTarget3() throws Exception {
// make data node 0 to be not qualified to choose
dataNodes[0].updateHeartbeat(
@ -278,6 +287,7 @@ public void testChooseTarget3() throws Exception {
* the 3rd replica should be placed on the same rack as the 1st replica,
* @throws Exception
*/
@Test
public void testChoooseTarget4() throws Exception {
// make data node 0 & 1 to be not qualified to choose: not enough disk space
for(int i=0; i<2; i++) {
@ -325,6 +335,7 @@ public void testChoooseTarget4() throws Exception {
* the 3rd replica should be placed on the same rack as the 2nd replica,
* @throws Exception
*/
@Test
public void testChooseTarget5() throws Exception {
DatanodeDescriptor[] targets;
targets = replicator.chooseTarget(filename,
@ -354,6 +365,7 @@ public void testChooseTarget5() throws Exception {
* the 1st replica. The 3rd replica can be placed randomly.
* @throws Exception
*/
@Test
public void testRereplicate1() throws Exception {
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
chosenNodes.add(dataNodes[0]);
@ -388,6 +400,7 @@ public void testRereplicate1() throws Exception {
* the rest replicas can be placed randomly,
* @throws Exception
*/
@Test
public void testRereplicate2() throws Exception {
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
chosenNodes.add(dataNodes[0]);
@ -417,6 +430,7 @@ public void testRereplicate2() throws Exception {
* the rest replicas can be placed randomly,
* @throws Exception
*/
@Test
public void testRereplicate3() throws Exception {
List<DatanodeDescriptor> chosenNodes = new ArrayList<DatanodeDescriptor>();
chosenNodes.add(dataNodes[0]);
@ -450,4 +464,122 @@ public void testRereplicate3() throws Exception {
assertTrue(cluster.isOnSameRack(dataNodes[2], targets[0]));
}
/**
* Test for the high priority blocks are processed before the low priority
* blocks.
*/
@Test(timeout = 60000)
public void testReplicationWithPriority() throws Exception {
int DFS_NAMENODE_REPLICATION_INTERVAL = 1000;
int HIGH_PRIORITY = 0;
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY, 1);
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2)
.format(true).build();
try {
cluster.waitActive();
final UnderReplicatedBlocks neededReplications = (UnderReplicatedBlocks) cluster
.getNameNode().getNamesystem().getBlockManager().neededReplications;
for (int i = 0; i < 100; i++) {
// Adding the blocks directly to normal priority
neededReplications.add(new Block(random.nextLong()), 2, 0, 3);
}
// Lets wait for the replication interval, to start process normal
// priority blocks
Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
// Adding the block directly to high priority list
neededReplications.add(new Block(random.nextLong()), 1, 0, 3);
// Lets wait for the replication interval
Thread.sleep(DFS_NAMENODE_REPLICATION_INTERVAL);
// Check replication completed successfully. Need not wait till it process
// all the 100 normal blocks.
assertFalse("Not able to clear the element from high priority list",
neededReplications.iterator(HIGH_PRIORITY).hasNext());
} finally {
cluster.shutdown();
}
}
/**
* Test for the ChooseUnderReplicatedBlocks are processed based on priority
*/
@Test
public void testChooseUnderReplicatedBlocks() throws Exception {
UnderReplicatedBlocks underReplicatedBlocks = new UnderReplicatedBlocks();
for (int i = 0; i < 5; i++) {
// Adding QUEUE_HIGHEST_PRIORITY block
underReplicatedBlocks.add(new Block(random.nextLong()), 1, 0, 3);
// Adding QUEUE_VERY_UNDER_REPLICATED block
underReplicatedBlocks.add(new Block(random.nextLong()), 2, 0, 7);
// Adding QUEUE_UNDER_REPLICATED block
underReplicatedBlocks.add(new Block(random.nextLong()), 6, 0, 6);
// Adding QUEUE_REPLICAS_BADLY_DISTRIBUTED block
underReplicatedBlocks.add(new Block(random.nextLong()), 5, 0, 6);
// Adding QUEUE_WITH_CORRUPT_BLOCKS block
underReplicatedBlocks.add(new Block(random.nextLong()), 0, 0, 3);
}
// Choose 6 blocks from UnderReplicatedBlocks. Then it should pick 5 blocks
// from
// QUEUE_HIGHEST_PRIORITY and 1 block from QUEUE_VERY_UNDER_REPLICATED.
List<List<Block>> chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(6);
assertTheChosenBlocks(chosenBlocks, 5, 1, 0, 0, 0);
// Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 4 blocks from
// QUEUE_VERY_UNDER_REPLICATED, 5 blocks from QUEUE_UNDER_REPLICATED and 1
// block from QUEUE_REPLICAS_BADLY_DISTRIBUTED.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(10);
assertTheChosenBlocks(chosenBlocks, 0, 4, 5, 1, 0);
// Adding QUEUE_HIGHEST_PRIORITY
underReplicatedBlocks.add(new Block(random.nextLong()), 1, 0, 3);
// Choose 10 blocks from UnderReplicatedBlocks. Then it should pick 1 block from
// QUEUE_HIGHEST_PRIORITY, 4 blocks from QUEUE_REPLICAS_BADLY_DISTRIBUTED
// and 5 blocks from QUEUE_WITH_CORRUPT_BLOCKS.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(10);
assertTheChosenBlocks(chosenBlocks, 1, 0, 0, 4, 5);
// Since it is reached to end of all lists,
// should start picking the blocks from start.
// Choose 7 blocks from UnderReplicatedBlocks. Then it should pick 6 blocks from
// QUEUE_HIGHEST_PRIORITY, 1 block from QUEUE_VERY_UNDER_REPLICATED.
chosenBlocks = underReplicatedBlocks.chooseUnderReplicatedBlocks(7);
assertTheChosenBlocks(chosenBlocks, 6, 1, 0, 0, 0);
}
/** asserts the chosen blocks with expected priority blocks */
private void assertTheChosenBlocks(
List<List<Block>> chosenBlocks, int firstPrioritySize,
int secondPrioritySize, int thirdPrioritySize, int fourthPrioritySize,
int fifthPrioritySize) {
assertEquals(
"Not returned the expected number of QUEUE_HIGHEST_PRIORITY blocks",
firstPrioritySize, chosenBlocks.get(
UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY).size());
assertEquals(
"Not returned the expected number of QUEUE_VERY_UNDER_REPLICATED blocks",
secondPrioritySize, chosenBlocks.get(
UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED).size());
assertEquals(
"Not returned the expected number of QUEUE_UNDER_REPLICATED blocks",
thirdPrioritySize, chosenBlocks.get(
UnderReplicatedBlocks.QUEUE_UNDER_REPLICATED).size());
assertEquals(
"Not returned the expected number of QUEUE_REPLICAS_BADLY_DISTRIBUTED blocks",
fourthPrioritySize, chosenBlocks.get(
UnderReplicatedBlocks.QUEUE_REPLICAS_BADLY_DISTRIBUTED).size());
assertEquals(
"Not returned the expected number of QUEUE_WITH_CORRUPT_BLOCKS blocks",
fifthPrioritySize, chosenBlocks.get(
UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS).size());
}
}

View File

@ -145,9 +145,7 @@ public void testFileAdd() throws Exception {
fs.delete(file, true);
filesTotal--; // reduce the filecount for deleted file
// Wait for more than DATANODE_COUNT replication intervals to ensure all
// the blocks pending deletion are sent for deletion to the datanodes.
Thread.sleep(DFS_REPLICATION_INTERVAL * (DATANODE_COUNT + 1) * 1000);
waitForDeletion();
updateMetrics();
rb = getMetrics(NS_METRICS);
assertGauge("FilesTotal", filesTotal, rb);
@ -176,7 +174,7 @@ public void testCorruptBlock() throws Exception {
assertGauge("PendingReplicationBlocks", 1L, rb);
assertGauge("ScheduledReplicationBlocks", 1L, rb);
fs.delete(file, true);
updateMetrics();
waitForDeletion();
rb = getMetrics(NS_METRICS);
assertGauge("CorruptBlocks", 0L, rb);
assertGauge("PendingReplicationBlocks", 0L, rb);
@ -212,10 +210,16 @@ public void testMissingBlock() throws Exception {
assertGauge("UnderReplicatedBlocks", 1L, rb);
assertGauge("MissingBlocks", 1L, rb);
fs.delete(file, true);
updateMetrics();
waitForDeletion();
assertGauge("UnderReplicatedBlocks", 0L, getMetrics(NS_METRICS));
}
private void waitForDeletion() throws InterruptedException {
// Wait for more than DATANODE_COUNT replication intervals to ensure all
// the blocks pending deletion are sent for deletion to the datanodes.
Thread.sleep(DFS_REPLICATION_INTERVAL * (DATANODE_COUNT + 1) * 1000);
}
public void testRenameMetrics() throws Exception {
Path src = getTestPath("src");
createFile(src, 100, (short)1);

View File

@ -144,6 +144,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3369. Migrate MR1 tests to run on MR2 using the new interfaces
introduced in MAPREDUCE-3169. (Ahmed Radwan via tomwhite)
MAPREDUCE-3518. mapred queue -info <queue> -showJobs throws NPE.
(Jonathan Eagles via mahadev)
OPTIMIZATIONS
BUG FIXES
@ -264,6 +267,15 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3527. Fix minor API incompatibilities between 1.0 and 0.23.
(tomwhite)
MAPREDUCE-3328. mapred queue -list output inconsistent and missing child
queues. (Ravi Prakash via mahadev)
MAPREDUCE-3510. Capacity Scheduler inherited ACLs not displayed by mapred queue
-showacls (Jonathan Eagles via mahadev)
MAPREDUCE-3537. Fix race condition in DefaultContainerExecutor which led
to container localization occuring in wrong directories. (acmurthy)
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES

View File

@ -450,9 +450,19 @@ public static JobStatus[] fromYarnApps(List<ApplicationReport> applications,
public static QueueInfo fromYarn(org.apache.hadoop.yarn.api.records.QueueInfo
queueInfo, Configuration conf) {
return new QueueInfo(queueInfo.getQueueName(),queueInfo.toString(),
fromYarn(queueInfo.getQueueState()), TypeConverter.fromYarnApps(
queueInfo.getApplications(), conf));
QueueInfo toReturn = new QueueInfo(queueInfo.getQueueName(), "Capacity: " +
queueInfo.getCapacity() * 100 + ", MaximumCapacity: " +
(queueInfo.getMaximumCapacity() < 0 ? "UNDEFINED" :
queueInfo.getMaximumCapacity()) + ", CurrentCapacity: " +
queueInfo.getCurrentCapacity() * 100, fromYarn(queueInfo.getQueueState()),
TypeConverter.fromYarnApps(queueInfo.getApplications(), conf));
List<QueueInfo> childQueues = new ArrayList<QueueInfo>();
for(org.apache.hadoop.yarn.api.records.QueueInfo childQueue :
queueInfo.getChildQueues()) {
childQueues.add(fromYarn(childQueue, conf));
}
toReturn.setQueueChildren(childQueues);
return toReturn;
}
public static QueueInfo[] fromYarnQueueInfo(

View File

@ -17,6 +17,9 @@
*/
package org.apache.hadoop.mapreduce;
import java.util.ArrayList;
import java.util.List;
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
@ -36,6 +39,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import org.junit.Test;
import org.mockito.Mockito;
public class TestTypeConverter {
@Test
@ -134,4 +138,33 @@ public void testFromYarnQueueInfo() {
Assert.assertEquals("queueInfo translation didn't work.",
returned.getState().toString(), queueInfo.getQueueState().toString().toLowerCase());
}
/**
* Test that child queues are converted too during conversion of the parent
* queue
*/
@Test
public void testFromYarnQueue() {
//Define child queue
org.apache.hadoop.yarn.api.records.QueueInfo child =
Mockito.mock(org.apache.hadoop.yarn.api.records.QueueInfo.class);
Mockito.when(child.getQueueState()).thenReturn(QueueState.RUNNING);
//Define parent queue
org.apache.hadoop.yarn.api.records.QueueInfo queueInfo =
Mockito.mock(org.apache.hadoop.yarn.api.records.QueueInfo.class);
List<org.apache.hadoop.yarn.api.records.QueueInfo> children =
new ArrayList<org.apache.hadoop.yarn.api.records.QueueInfo>();
children.add(child); //Add one child
Mockito.when(queueInfo.getChildQueues()).thenReturn(children);
Mockito.when(queueInfo.getQueueState()).thenReturn(QueueState.RUNNING);
//Call the function we're testing
org.apache.hadoop.mapreduce.QueueInfo returned =
TypeConverter.fromYarn(queueInfo, new Configuration());
//Verify that the converted queue has the 1 child we had added
Assert.assertEquals("QueueInfo children weren't properly converted",
returned.getQueueChildren().size(), 1);
}
}

View File

@ -432,7 +432,6 @@ public String getFailureInfo() throws IOException {
}
Cluster cluster;
/**
* Ugi of the client. We store this ugi when the client is created and
* then make sure that the same ugi is used to run the various protocols.

View File

@ -110,40 +110,33 @@ public int run(String[] argv) throws Exception {
}
// format and print information about the passed in job queue.
void printJobQueueInfo(JobQueueInfo jobQueueInfo, Writer writer)
throws IOException {
void printJobQueueInfo(JobQueueInfo jobQueueInfo, Writer writer,
String prefix) throws IOException {
if (jobQueueInfo == null) {
writer.write("No queue found.\n");
writer.flush();
return;
}
writer.write(String.format("Queue Name : %s \n",
writer.write(String.format(prefix + "======================\n"));
writer.write(String.format(prefix + "Queue Name : %s \n",
jobQueueInfo.getQueueName()));
writer.write(String.format("Queue State : %s \n",
writer.write(String.format(prefix + "Queue State : %s \n",
jobQueueInfo.getQueueState()));
writer.write(String.format("Scheduling Info : %s \n",
writer.write(String.format(prefix + "Scheduling Info : %s \n",
jobQueueInfo.getSchedulingInfo()));
List<JobQueueInfo> childQueues = jobQueueInfo.getChildren();
if (childQueues != null && childQueues.size() > 0) {
writer.write(String.format("Child Queues : "));
for (int i = 0; i < childQueues.size(); i++) {
JobQueueInfo childQueue = childQueues.get(i);
writer.write(String.format("%s", childQueue.getQueueName()));
if (i != childQueues.size() - 1) {
writer.write(String.format(", "));
printJobQueueInfo(childQueues.get(i), writer, " " + prefix);
}
}
writer.write("\n");
}
writer.write(String.format("======================\n"));
writer.flush();
}
private void displayQueueList() throws IOException {
JobQueueInfo[] rootQueues = jc.getRootQueues();
List<JobQueueInfo> allQueues = expandQueueList(rootQueues);
for (JobQueueInfo queue : allQueues) {
printJobQueueInfo(queue, new PrintWriter(System.out));
for (JobQueueInfo queue : rootQueues) {
printJobQueueInfo(queue, new PrintWriter(System.out), "");
}
}
@ -181,7 +174,7 @@ private void displayQueueInfo(String queue, boolean showJobs)
System.out.println("Queue \"" + queue + "\" does not exist.");
return;
}
printJobQueueInfo(jobQueueInfo, new PrintWriter(System.out));
printJobQueueInfo(jobQueueInfo, new PrintWriter(System.out), "");
if (showJobs && (jobQueueInfo.getChildren() == null ||
jobQueueInfo.getChildren().size() == 0)) {
JobStatus[] jobs = jc.getJobsFromQueue(queue);

View File

@ -105,7 +105,7 @@ protected void setChildren(List<JobQueueInfo> children) {
public List<JobQueueInfo> getChildren() {
List<JobQueueInfo> list = new ArrayList<JobQueueInfo>();
for (QueueInfo q : super.getQueueChildren()) {
list.add((JobQueueInfo)q);
list.add(new JobQueueInfo(q));
}
return list;
}

View File

@ -55,7 +55,7 @@
@InterfaceStability.Stable
public class CLI extends Configured implements Tool {
private static final Log LOG = LogFactory.getLog(CLI.class);
private Cluster cluster;
protected Cluster cluster;
public CLI() {
}

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import junit.framework.Assert;
import org.junit.Test;
public class TestJobQueueClient {
/**
* Test that print job queue recursively prints child queues
*/
@Test
@SuppressWarnings("deprecation")
public void testPrintJobQueueInfo() throws IOException {
JobQueueClient queueClient = new JobQueueClient();
JobQueueInfo parent = new JobQueueInfo();
JobQueueInfo child = new JobQueueInfo();
JobQueueInfo grandChild = new JobQueueInfo();
child.addChild(grandChild);
parent.addChild(child);
grandChild.setQueueName("GrandChildQueue");
ByteArrayOutputStream bbos = new ByteArrayOutputStream();
PrintWriter writer = new PrintWriter(bbos);
queueClient.printJobQueueInfo(parent, writer, "");
Assert.assertTrue("printJobQueueInfo did not print grandchild's name",
bbos.toString().contains("GrandChildQueue"));
}
}

View File

@ -198,13 +198,16 @@ public QueueInfo getQueue(String queueName) throws IOException,
}
private void getChildQueues(org.apache.hadoop.yarn.api.records.QueueInfo parent,
List<org.apache.hadoop.yarn.api.records.QueueInfo> queues) {
List<org.apache.hadoop.yarn.api.records.QueueInfo> queues,
boolean recursive) {
List<org.apache.hadoop.yarn.api.records.QueueInfo> childQueues =
parent.getChildQueues();
for (org.apache.hadoop.yarn.api.records.QueueInfo child : childQueues) {
queues.add(child);
getChildQueues(child, queues);
if(recursive) {
getChildQueues(child, queues, recursive);
}
}
}
@ -226,7 +229,7 @@ public QueueInfo[] getQueues() throws IOException, InterruptedException {
org.apache.hadoop.yarn.api.records.QueueInfo rootQueue =
applicationsManager.getQueueInfo(
getQueueInfoRequest(ROOT, false, true, true)).getQueueInfo();
getChildQueues(rootQueue, queues);
getChildQueues(rootQueue, queues, true);
return TypeConverter.fromYarnQueueInfo(queues, this.conf);
}
@ -238,8 +241,8 @@ public QueueInfo[] getRootQueues() throws IOException, InterruptedException {
org.apache.hadoop.yarn.api.records.QueueInfo rootQueue =
applicationsManager.getQueueInfo(
getQueueInfoRequest(ROOT, false, true, false)).getQueueInfo();
getChildQueues(rootQueue, queues);
getQueueInfoRequest(ROOT, false, true, true)).getQueueInfo();
getChildQueues(rootQueue, queues, false);
return TypeConverter.fromYarnQueueInfo(queues, this.conf);
}
@ -252,7 +255,7 @@ public QueueInfo[] getChildQueues(String parent) throws IOException,
org.apache.hadoop.yarn.api.records.QueueInfo parentQueue =
applicationsManager.getQueueInfo(
getQueueInfoRequest(parent, false, true, false)).getQueueInfo();
getChildQueues(parentQueue, queues);
getChildQueues(parentQueue, queues, true);
return TypeConverter.fromYarnQueueInfo(queues, this.conf);
}

View File

@ -0,0 +1,63 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapred;
import java.io.IOException;
import junit.framework.Assert;
import org.apache.hadoop.yarn.api.ClientRMProtocol;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetQueueInfoResponse;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
public class TestResourceMgrDelegate {
/**
* Tests that getRootQueues makes a request for the (recursive) child queues
*/
@Test
public void testGetRootQueues() throws IOException, InterruptedException {
ClientRMProtocol applicationsManager = Mockito.mock(ClientRMProtocol.class);
GetQueueInfoResponse response = Mockito.mock(GetQueueInfoResponse.class);
org.apache.hadoop.yarn.api.records.QueueInfo queueInfo =
Mockito.mock(org.apache.hadoop.yarn.api.records.QueueInfo.class);
Mockito.when(response.getQueueInfo()).thenReturn(queueInfo);
Mockito.when(applicationsManager.getQueueInfo(Mockito.any(
GetQueueInfoRequest.class))).thenReturn(response);
ResourceMgrDelegate delegate = new ResourceMgrDelegate(
new YarnConfiguration(), applicationsManager);
delegate.getRootQueues();
ArgumentCaptor<GetQueueInfoRequest> argument =
ArgumentCaptor.forClass(GetQueueInfoRequest.class);
Mockito.verify(delegate.applicationsManager).getQueueInfo(
argument.capture());
Assert.assertTrue("Children of root queue not requested",
argument.getValue().getIncludeChildQueues());
Assert.assertTrue("Request wasn't to recurse through children",
argument.getValue().getRecursive());
}
}

View File

@ -75,7 +75,7 @@ public void init() throws IOException {
}
@Override
public void startLocalizer(Path nmPrivateContainerTokensPath,
public synchronized void startLocalizer(Path nmPrivateContainerTokensPath,
InetSocketAddress nmAddr, String user, String appId, String locId,
List<String> localDirs, List<String> logDirs)
throws IOException, InterruptedException {

View File

@ -199,18 +199,32 @@ private static String getAclKey(QueueACL acl) {
return "acl_" + acl.toString().toLowerCase();
}
public AccessControlList getAcl(String queue, QueueACL acl) {
String queuePrefix = getQueuePrefix(queue);
String aclString = get(queuePrefix + getAclKey(acl), DEFAULT_ACL);
return new AccessControlList(aclString);
}
public void setAcl(String queue, QueueACL acl, String aclString) {
String queuePrefix = getQueuePrefix(queue);
set(queuePrefix + getAclKey(acl), aclString);
}
public Map<QueueACL, AccessControlList> getAcls(String queue) {
Map<QueueACL, AccessControlList> acls =
new HashMap<QueueACL, AccessControlList>();
String queuePrefix = getQueuePrefix(queue);
for (QueueACL acl : QueueACL.values()) {
acls.put(acl,
new AccessControlList(get(queuePrefix + getAclKey(acl),
DEFAULT_ACL)));
acls.put(acl, getAcl(queue, acl));
}
return acls;
}
public void setAcls(String queue, Map<QueueACL, AccessControlList> acls) {
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
setAcl(queue, e.getKey(), e.getValue().getAclString());
}
}
public String[] getQueues(String queue) {
LOG.info("CSConf - getQueues called for: queuePrefix=" + getQueuePrefix(queue));
String[] queues = getStrings(getQueuePrefix(queue) + QUEUES);

View File

@ -492,11 +492,8 @@ public synchronized QueueInfo getQueueInfo(
QueueUserACLInfo userAclInfo =
recordFactory.newRecordInstance(QueueUserACLInfo.class);
List<QueueACL> operations = new ArrayList<QueueACL>();
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
QueueACL operation = e.getKey();
AccessControlList acl = e.getValue();
if (acl.isUserAllowed(user)) {
for (QueueACL operation : QueueACL.values()) {
if (hasAccess(operation, user)) {
operations.add(operation);
}
}

View File

@ -21,6 +21,7 @@
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@ -30,11 +31,14 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -102,20 +106,29 @@ public void setUp() throws Exception {
private static final String A = "a";
private static final String B = "b";
private static final String C = "c";
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
// Define top-level queues
conf.setQueues(CapacityScheduler.ROOT, new String[] {A, B});
conf.setQueues(CapacityScheduler.ROOT, new String[] {A, B, C});
conf.setCapacity(CapacityScheduler.ROOT, 100);
conf.setMaximumCapacity(CapacityScheduler.ROOT, 100);
conf.setAcl(CapacityScheduler.ROOT, QueueACL.SUBMIT_APPLICATIONS, " ");
final String Q_A = CapacityScheduler.ROOT + "." + A;
conf.setCapacity(Q_A, 10);
conf.setCapacity(Q_A, 9);
conf.setMaximumCapacity(Q_A, 20);
conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
final String Q_B = CapacityScheduler.ROOT + "." + B;
conf.setCapacity(Q_B, 90);
conf.setMaximumCapacity(Q_B, 99);
conf.setAcl(Q_B, QueueACL.SUBMIT_APPLICATIONS, "*");
final String Q_C = CapacityScheduler.ROOT + "." + C;
conf.setCapacity(Q_C, 1);
conf.setMaximumCapacity(Q_C, 10);
conf.setAcl(Q_C, QueueACL.SUBMIT_APPLICATIONS, " ");
LOG.info("Setup top-level queues a and b");
}
@ -167,8 +180,8 @@ public void testInitializeQueue() throws Exception {
//can add more sturdy test with 3-layer queues
//once MAPREDUCE:3410 is resolved
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
assertEquals(0.1, a.getCapacity(), epsilon);
assertEquals(0.1, a.getAbsoluteCapacity(), epsilon);
assertEquals(0.09, a.getCapacity(), epsilon);
assertEquals(0.09, a.getAbsoluteCapacity(), epsilon);
assertEquals(0.2, a.getMaximumCapacity(), epsilon);
assertEquals(0.2, a.getAbsoluteMaximumCapacity(), epsilon);
@ -177,6 +190,12 @@ public void testInitializeQueue() throws Exception {
assertEquals(0.9, b.getAbsoluteCapacity(), epsilon);
assertEquals(0.99, b.getMaximumCapacity(), epsilon);
assertEquals(0.99, b.getAbsoluteMaximumCapacity(), epsilon);
LeafQueue c = stubLeafQueue((LeafQueue)queues.get(C));
assertEquals(0.01, c.getCapacity(), epsilon);
assertEquals(0.01, c.getAbsoluteCapacity(), epsilon);
assertEquals(0.1, c.getMaximumCapacity(), epsilon);
assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon);
}
@Test
@ -1083,6 +1102,37 @@ public void testSchedulingConstraints() throws Exception {
}
public boolean hasQueueACL(List<QueueUserACLInfo> aclInfos, QueueACL acl) {
for (QueueUserACLInfo aclInfo : aclInfos) {
if (aclInfo.getUserAcls().contains(acl)) {
return true;
}
}
return false;
}
@Test
public void testInheritedQueueAcls() throws IOException {
UserGroupInformation user = UserGroupInformation.getCurrentUser();
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
LeafQueue b = stubLeafQueue((LeafQueue)queues.get(B));
LeafQueue c = stubLeafQueue((LeafQueue)queues.get(C));
assertFalse(root.hasAccess(QueueACL.SUBMIT_APPLICATIONS, user));
assertTrue(a.hasAccess(QueueACL.SUBMIT_APPLICATIONS, user));
assertTrue(b.hasAccess(QueueACL.SUBMIT_APPLICATIONS, user));
assertFalse(c.hasAccess(QueueACL.SUBMIT_APPLICATIONS, user));
assertTrue(hasQueueACL(
a.getQueueUserAclInfo(user), QueueACL.SUBMIT_APPLICATIONS));
assertTrue(hasQueueACL(
b.getQueueUserAclInfo(user), QueueACL.SUBMIT_APPLICATIONS));
assertFalse(hasQueueACL(
c.getQueueUserAclInfo(user), QueueACL.SUBMIT_APPLICATIONS));
}
@After
public void tearDown() throws Exception {
}

View File

@ -19,21 +19,41 @@
package org.apache.hadoop.mapred;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.atLeastOnce;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobPriority;
import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.TaskReport;
import org.junit.Test;
public class JobClientUnitTest {
public class TestJobClient extends JobClient {
TestJobClient(JobConf jobConf) throws IOException {
super(jobConf);
}
void setCluster(Cluster cluster) {
this.cluster = cluster;
}
}
@SuppressWarnings("deprecation")
@Test
public void testMapTaskReportsWithNullJob() throws Exception {
JobClient client = new JobClient();
TestJobClient client = new TestJobClient(new JobConf());
Cluster mockCluster = mock(Cluster.class);
client.cluster = mockCluster;
client.setCluster(mockCluster);
JobID id = new JobID("test",0);
when(mockCluster.getJob(id)).thenReturn(null);
@ -47,9 +67,9 @@ public void testMapTaskReportsWithNullJob() throws Exception {
@SuppressWarnings("deprecation")
@Test
public void testReduceTaskReportsWithNullJob() throws Exception {
JobClient client = new JobClient();
TestJobClient client = new TestJobClient(new JobConf());
Cluster mockCluster = mock(Cluster.class);
client.cluster = mockCluster;
client.setCluster(mockCluster);
JobID id = new JobID("test",0);
when(mockCluster.getJob(id)).thenReturn(null);
@ -63,9 +83,9 @@ public void testReduceTaskReportsWithNullJob() throws Exception {
@SuppressWarnings("deprecation")
@Test
public void testSetupTaskReportsWithNullJob() throws Exception {
JobClient client = new JobClient();
TestJobClient client = new TestJobClient(new JobConf());
Cluster mockCluster = mock(Cluster.class);
client.cluster = mockCluster;
client.setCluster(mockCluster);
JobID id = new JobID("test",0);
when(mockCluster.getJob(id)).thenReturn(null);
@ -79,9 +99,9 @@ public void testSetupTaskReportsWithNullJob() throws Exception {
@SuppressWarnings("deprecation")
@Test
public void testCleanupTaskReportsWithNullJob() throws Exception {
JobClient client = new JobClient();
TestJobClient client = new TestJobClient(new JobConf());
Cluster mockCluster = mock(Cluster.class);
client.cluster = mockCluster;
client.setCluster(mockCluster);
JobID id = new JobID("test",0);
when(mockCluster.getJob(id)).thenReturn(null);
@ -91,4 +111,49 @@ public void testCleanupTaskReportsWithNullJob() throws Exception {
verify(mockCluster).getJob(id);
}
@Test
public void testShowJob() throws Exception {
TestJobClient client = new TestJobClient(new JobConf());
JobID jobID = new JobID("test", 0);
JobStatus mockJobStatus = mock(JobStatus.class);
when(mockJobStatus.getJobID()).thenReturn(jobID);
when(mockJobStatus.getState()).thenReturn(JobStatus.State.RUNNING);
when(mockJobStatus.getStartTime()).thenReturn(0L);
when(mockJobStatus.getUsername()).thenReturn("mockuser");
when(mockJobStatus.getQueue()).thenReturn("mockqueue");
when(mockJobStatus.getPriority()).thenReturn(JobPriority.NORMAL);
when(mockJobStatus.getNumUsedSlots()).thenReturn(1);
when(mockJobStatus.getNumReservedSlots()).thenReturn(1);
when(mockJobStatus.getUsedMem()).thenReturn(1024);
when(mockJobStatus.getReservedMem()).thenReturn(512);
when(mockJobStatus.getNeededMem()).thenReturn(2048);
when(mockJobStatus.getSchedulingInfo()).thenReturn("NA");
Job mockJob = mock(Job.class);
when(mockJob.getTaskReports(isA(TaskType.class))).thenReturn(new TaskReport[0]);
Cluster mockCluster = mock(Cluster.class);
when(mockCluster.getJob(jobID)).thenReturn(mockJob);
client.setCluster(mockCluster);
client.displayJobList(new JobStatus[] {mockJobStatus});
verify(mockJobStatus, atLeastOnce()).getJobID();
verify(mockJob, atLeastOnce()).getTaskReports(isA(TaskType.class));
verify(mockCluster, atLeastOnce()).getJob(jobID);
verify(mockJobStatus).getState();
verify(mockJobStatus).getStartTime();
verify(mockJobStatus).getUsername();
verify(mockJobStatus).getQueue();
verify(mockJobStatus).getPriority();
verify(mockJobStatus).getNumUsedSlots();
verify(mockJobStatus).getNumReservedSlots();
verify(mockJobStatus).getUsedMem();
verify(mockJobStatus).getReservedMem();
verify(mockJobStatus).getNeededMem();
verify(mockJobStatus).getSchedulingInfo();
}
}

View File

@ -137,13 +137,6 @@
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-examples</artifactId>