Merge trunk into auto-HA branch

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3042@1340622 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2012-05-20 06:12:14 +00:00
commit 1a3bd5d42f
114 changed files with 959 additions and 258 deletions

View File

@ -67,6 +67,9 @@ Trunk (unreleased changes)
HADOOP-8297. Writable javadocs don't carry default constructor (harsh)
HADOOP-8360. empty-configuration.xml fails xml validation
(Radim Kolar via harsh)
BUG FIXES
HADOOP-8177. MBeans shouldn't try to register when it fails to create MBeanName.
@ -175,6 +178,16 @@ Release 2.0.1-alpha - UNRELEASED
HADOOP-8400. All commands warn "Kerberos krb5 configuration not found" when security is not enabled. (tucu)
HADOOP-8406. CompressionCodecFactory.CODEC_PROVIDERS iteration is
thread-unsafe (todd)
HADOOP-8287. etc/hadoop is missing hadoop-env.sh (eli)
HADOOP-8408. MR doesn't work with a non-default ViewFS mount table
and security enabled. (atm via eli)
HADOOP-8329. Build fails with Java 7. (eli)
Release 2.0.0-alpha - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -0,0 +1,74 @@
# Copyright 2011 The Apache Software Foundation
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Set Hadoop-specific environment variables here.
# The only required environment variable is JAVA_HOME. All others are
# optional. When running a distributed configuration it is best to
# set JAVA_HOME in this file, so that it is correctly defined on
# remote nodes.
# The java implementation to use.
export JAVA_HOME=${JAVA_HOME}
# The jsvc implementation to use. Jsvc is required to run secure datanodes.
#export JSVC_HOME=${JSVC_HOME}
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-"/etc/hadoop"}
# Extra Java CLASSPATH elements. Automatically insert capacity-scheduler.
for f in $HADOOP_HOME/contrib/capacity-scheduler/*.jar; do
if [ "$HADOOP_CLASSPATH" ]; then
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:$f
else
export HADOOP_CLASSPATH=$f
fi
done
# The maximum amount of heap to use, in MB. Default is 1000.
#export HADOOP_HEAPSIZE=
#export HADOOP_NAMENODE_INIT_HEAPSIZE=""
# Extra Java runtime options. Empty by default.
export HADOOP_OPTS="-Djava.net.preferIPv4Stack=true $HADOOP_CLIENT_OPTS"
# Command specific options appended to HADOOP_OPTS when specified
export HADOOP_NAMENODE_OPTS="-Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,RFAS} -Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER:-INFO,NullAppender} $HADOOP_NAMENODE_OPTS"
export HADOOP_DATANODE_OPTS="-Dhadoop.security.logger=ERROR,RFAS $HADOOP_DATANODE_OPTS"
export HADOOP_SECONDARYNAMENODE_OPTS="-Dhadoop.security.logger=${HADOOP_SECURITY_LOGGER:-INFO,RFAS} -Dhdfs.audit.logger=${HDFS_AUDIT_LOGGER:-INFO,NullAppender} $HADOOP_SECONDARYNAMENODE_OPTS"
# The following applies to multiple commands (fs, dfs, fsck, distcp etc)
export HADOOP_CLIENT_OPTS="-Xmx128m $HADOOP_CLIENT_OPTS"
#HADOOP_JAVA_PLATFORM_OPTS="-XX:-UsePerfData $HADOOP_JAVA_PLATFORM_OPTS"
# On secure datanodes, user to run the datanode as after dropping privileges
export HADOOP_SECURE_DN_USER=${HADOOP_SECURE_DN_USER}
# Where log files are stored. $HADOOP_HOME/logs by default.
export HADOOP_LOG_DIR=${HADOOP_LOG_DIR}/$USER
# Where log files are stored in the secure data environment.
export HADOOP_SECURE_DN_LOG_DIR=${HADOOP_LOG_DIR}/${HADOOP_HDFS_USER}
# The directory where pid files are stored. /tmp by default.
export HADOOP_PID_DIR=${HADOOP_PID_DIR}
export HADOOP_SECURE_DN_PID_DIR=${HADOOP_PID_DIR}
# A string representing this instance of hadoop. $USER by default.
export HADOOP_IDENT_STRING=$USER

View File

@ -233,6 +233,11 @@ public class ViewFileSystem extends FileSystem {
fsState.resolve(getUriPath(f), true);
return res.isInternalDir() ? null : res.targetFileSystem.getHomeDirectory();
}
@Override
public String getCanonicalServiceName() {
return getUri().getHost();
}
@Override
public URI getUri() {

View File

@ -96,7 +96,7 @@ public class HttpServer implements FilterContainer {
// The ServletContext attribute where the daemon Configuration
// gets stored.
public static final String CONF_CONTEXT_ATTRIBUTE = "hadoop.conf";
static final String ADMINS_ACL = "admins.acl";
public static final String ADMINS_ACL = "admins.acl";
public static final String SPNEGO_FILTER = "SpnegoFilter";
public static final String BIND_ADDRESS = "bind.address";
@ -792,7 +792,7 @@ public class HttpServer implements FilterContainer {
*
* @param servletContext
* @param request
* @param response
* @param response used to send the error response if user does not have admin access.
* @return true if admin-authorized, false otherwise
* @throws IOException
*/
@ -814,18 +814,33 @@ public class HttpServer implements FilterContainer {
"authorized to access this page.");
return false;
}
if (servletContext.getAttribute(ADMINS_ACL) != null &&
!userHasAdministratorAccess(servletContext, remoteUser)) {
response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "User "
+ remoteUser + " is unauthorized to access this page.");
return false;
}
return true;
}
/**
* Get the admin ACLs from the given ServletContext and check if the given
* user is in the ACL.
*
* @param servletContext the context containing the admin ACL.
* @param remoteUser the remote user to check for.
* @return true if the user is present in the ACL, false if no ACL is set or
* the user is not present
*/
public static boolean userHasAdministratorAccess(ServletContext servletContext,
String remoteUser) {
AccessControlList adminsAcl = (AccessControlList) servletContext
.getAttribute(ADMINS_ACL);
UserGroupInformation remoteUserUGI =
UserGroupInformation.createRemoteUser(remoteUser);
if (adminsAcl != null) {
if (!adminsAcl.isUserAllowed(remoteUserUGI)) {
response.sendError(HttpServletResponse.SC_UNAUTHORIZED, "User "
+ remoteUser + " is unauthorized to access this page.");
return false;
}
}
return true;
return adminsAcl != null && adminsAcl.isUserAllowed(remoteUserUGI);
}
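For orientation, a minimal sketch of how a servlet handler could use the helper above, given the web application's ServletContext (the wrapper class is illustrative, not part of this change):

import javax.servlet.ServletContext;
import org.apache.hadoop.http.HttpServer;

/** Illustrative gate: let a request proceed only for users in the admin ACL. */
public class AdminOnlyCheck {
  // remoteUser would normally be request.getRemoteUser() or the Kerberos principal.
  static boolean allow(ServletContext context, String remoteUser) {
    // Consults whatever AccessControlList the HttpServer stored under the
    // now-public ADMINS_ACL attribute; returns false if no ACL is configured.
    return HttpServer.userHasAdministratorAccess(context, remoteUser);
  }
}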
/**

View File

@ -109,8 +109,12 @@ public class CompressionCodecFactory {
List<Class<? extends CompressionCodec>> result
= new ArrayList<Class<? extends CompressionCodec>>();
// Add codec classes discovered via service loading
for (CompressionCodec codec : CODEC_PROVIDERS) {
result.add(codec.getClass());
synchronized (CODEC_PROVIDERS) {
// CODEC_PROVIDERS is a lazy collection. Synchronize so it is
// thread-safe. See HADOOP-8406.
for (CompressionCodec codec : CODEC_PROVIDERS) {
result.add(codec.getClass());
}
}
// Add codec classes from configuration
String codecsString = conf.get("io.compression.codecs");
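The synchronized block above is needed because CODEC_PROVIDERS is backed by a ServiceLoader, which fills its cache lazily while being iterated and is not safe for concurrent iteration. A standalone sketch of the same pattern, with a made-up Codec interface standing in for CompressionCodec (the names here are illustrative, not Hadoop's):

import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class LazyProviderIteration {
  /** Hypothetical provider interface; stands in for CompressionCodec. */
  public interface Codec {}

  // ServiceLoader populates its internal cache lazily during iteration, so two
  // threads iterating the same instance at once can corrupt that cache.
  private static final ServiceLoader<Codec> PROVIDERS = ServiceLoader.load(Codec.class);

  public static List<Class<? extends Codec>> providerClasses() {
    List<Class<? extends Codec>> result = new ArrayList<Class<? extends Codec>>();
    synchronized (PROVIDERS) { // serialize iterations, as in the fix above
      for (Codec codec : PROVIDERS) {
        result.add(codec.getClass());
      }
    }
    return result;
  }
}

Only the iteration itself needs the lock; the returned list is a private copy, so callers can use it freely.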

View File

@ -1,3 +1,4 @@
<?xml version="1.0"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
@ -14,7 +15,6 @@
See the License for the specific language governing permissions and
limitations under the License.
-->
<?xml version="1.0"?>
<configuration>
</configuration>

View File

@ -0,0 +1,73 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs.viewfs;
import static org.junit.Assert.*;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsConstants;
import org.junit.Test;
/**
* Test ViewFileSystem's support for having delegation tokens fetched and cached
* for the file system.
*/
public class TestViewFileSystemDelegationTokenSupport {
private static final String MOUNT_TABLE_NAME = "vfs-cluster";
/**
* Ensure that a canonical service name can be determined for ViewFileSystem
* instances configured with a non-default mount table name.
*
* Regression test for HADOOP-8408.
*/
@Test
public void testGetCanonicalServiceNameWithNonDefaultMountTable()
throws URISyntaxException, IOException {
Configuration conf = new Configuration();
ConfigUtil.addLink(conf, MOUNT_TABLE_NAME, "/user", new URI("file:///"));
FileSystem viewFs = FileSystem.get(new URI(FsConstants.VIEWFS_SCHEME +
"://" + MOUNT_TABLE_NAME), conf);
String serviceName = viewFs.getCanonicalServiceName();
assertNotNull(serviceName);
assertEquals(MOUNT_TABLE_NAME, serviceName);
}
@Test
public void testGetCanonicalServiceNameWithDefaultMountTable()
throws URISyntaxException, IOException {
Configuration conf = new Configuration();
ConfigUtil.addLink(conf, "/user", new URI("file:///"));
FileSystem viewFs = FileSystem.get(FsConstants.VIEWFS_URI, conf);
String serviceName = viewFs.getCanonicalServiceName();
assertNull(serviceName);
}
}

View File

@ -187,6 +187,12 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3419. Cleanup LocatedBlock. (eli)
HDFS-3440. More effectively limit stream memory consumption when reading
corrupt edit logs (Colin Patrick McCabe via todd)
HDFS-3438. BootstrapStandby should not require a rollEdits on active node
(todd)
OPTIMIZATIONS
BUG FIXES
@ -206,6 +212,25 @@ Release 2.0.1-alpha - UNRELEASED
HDFS-3413. TestFailureToReadEdits timing out. (atm)
HDFS-3422. TestStandbyIsHot timeouts too aggressive (todd)
HDFS-3433. GetImageServlet should allow administrative requestors when
security is enabled. (atm)
HDFS-1153. dfsnodelist.jsp should handle invalid input parameters.
(Ravi Phulari via eli)
HDFS-3434. InvalidProtocolBufferException when visiting DN
browseDirectory.jsp (eli)
HDFS-2800. Fix cancellation of checkpoints in the standby node to be more
reliable. (todd)
HDFS-3391. Fix InvalidateBlocks to compare blocks including their
generation stamps. (todd)
HDFS-3444. hdfs groups command doesn't work with security enabled. (atm)
Release 2.0.0-alpha - UNRELEASED
INCOMPATIBLE CHANGES
@ -697,6 +722,9 @@ Release 2.0.0-alpha - UNRELEASED
HDFS-3026. HA: Handle failure during HA state transition. (atm)
HDFS-860. fuse-dfs truncate behavior causes issues with scp.
(Brian Bockelman via eli)
BREAKDOWN OF HDFS-1623 SUBTASKS
HDFS-2179. Add fencing framework and mechanisms for NameNode HA. (todd)

View File

@ -75,7 +75,7 @@ class BookKeeperEditLogInputStream extends EditLogInputStream {
tracker = new FSEditLogLoader.PositionTrackingInputStream(bin);
DataInputStream in = new DataInputStream(tracker);
reader = new FSEditLogOp.Reader(in, logVersion);
reader = new FSEditLogOp.Reader(in, tracker, logVersion);
}
@Override

View File

@ -37,7 +37,7 @@ int dfs_truncate(const char *path, off_t size)
assert(dfs);
if (size != 0) {
return -ENOTSUP;
return 0;
}
int ret = dfs_unlink(path);

View File

@ -20,9 +20,13 @@ package org.apache.hadoop.hdfs.protocolPB;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.hdfs.protocol.proto.GetUserMappingsProtocolProtos.GetUserMappingsProtocolService;
import org.apache.hadoop.ipc.ProtocolInfo;
import org.apache.hadoop.security.KerberosInfo;
@KerberosInfo(
serverPrincipal=CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY)
@ProtocolInfo(
protocolName = "org.apache.hadoop.tools.GetUserMappingsProtocol",
protocolVersion = 1)

View File

@ -32,6 +32,8 @@ import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksReq
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto;
@ -104,6 +106,20 @@ public class NamenodeProtocolServerSideTranslatorPB implements
}
return GetTransactionIdResponseProto.newBuilder().setTxId(txid).build();
}
@Override
public GetMostRecentCheckpointTxIdResponseProto getMostRecentCheckpointTxId(
RpcController unused, GetMostRecentCheckpointTxIdRequestProto request)
throws ServiceException {
long txid;
try {
txid = impl.getMostRecentCheckpointTxId();
} catch (IOException e) {
throw new ServiceException(e);
}
return GetMostRecentCheckpointTxIdResponseProto.newBuilder().setTxId(txid).build();
}
@Override
public RollEditLogResponseProto rollEditLog(RpcController unused,

View File

@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.ErrorReportR
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlockKeysRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetBlocksRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetMostRecentCheckpointTxIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.GetTransactionIdRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RegisterRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.NamenodeProtocolProtos.RollEditLogRequestProto;
@ -119,6 +120,16 @@ public class NamenodeProtocolTranslatorPB implements NamenodeProtocol,
}
}
@Override
public long getMostRecentCheckpointTxId() throws IOException {
try {
return rpcProxy.getMostRecentCheckpointTxId(NULL_CONTROLLER,
GetMostRecentCheckpointTxIdRequestProto.getDefaultInstance()).getTxId();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public CheckpointSignature rollEditLog() throws IOException {
try {

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
@ -54,10 +53,23 @@ class InvalidateBlocks {
return numBlocks;
}
/** Does this contain the block which is associated with the storage? */
/**
* @return true if the given storage has the given block listed for
* invalidation. Blocks are compared including their generation stamps:
* if a block is pending invalidation but with a different generation stamp,
* returns false.
* @param storageID the storage to check
* @param block the block to look for
*
*/
synchronized boolean contains(final String storageID, final Block block) {
final Collection<Block> s = node2blocks.get(storageID);
return s != null && s.contains(block);
final LightWeightHashSet<Block> s = node2blocks.get(storageID);
if (s == null) {
return false; // no invalidate blocks for this storage ID
}
Block blockInSet = s.getElement(block);
return blockInSet != null &&
block.getGenerationStamp() == blockInSet.getGenerationStamp();
}
/**

View File

@ -591,7 +591,8 @@ class BPOfferService {
processDistributedUpgradeCommand((UpgradeCommand)cmd);
break;
case DatanodeProtocol.DNA_RECOVERBLOCK:
dn.recoverBlocks(((BlockRecoveryCommand)cmd).getRecoveringBlocks());
String who = "NameNode at " + actor.getNNSocketAddress();
dn.recoverBlocks(who, ((BlockRecoveryCommand)cmd).getRecoveringBlocks());
break;
case DatanodeProtocol.DNA_ACCESSKEYUPDATE:
LOG.info("DatanodeCommand action: DNA_ACCESSKEYUPDATE");

View File

@ -163,6 +163,7 @@ import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
@ -1706,13 +1707,16 @@ public class DataNode extends Configured
secureMain(args, null);
}
public Daemon recoverBlocks(final Collection<RecoveringBlock> blocks) {
public Daemon recoverBlocks(
final String who,
final Collection<RecoveringBlock> blocks) {
Daemon d = new Daemon(threadGroup, new Runnable() {
/** Recover a list of blocks. It is run by the primary datanode. */
public void run() {
for(RecoveringBlock b : blocks) {
try {
logRecoverBlock("NameNode", b.getBlock(), b.getLocations());
logRecoverBlock(who, b);
recoverBlock(b);
} catch (IOException e) {
LOG.warn("recoverBlocks FAILED: " + b, e);
@ -1973,14 +1977,13 @@ public class DataNode extends Configured
datanodes, storages);
}
private static void logRecoverBlock(String who,
ExtendedBlock block, DatanodeID[] targets) {
StringBuilder msg = new StringBuilder(targets[0].toString());
for (int i = 1; i < targets.length; i++) {
msg.append(", " + targets[i]);
}
private static void logRecoverBlock(String who, RecoveringBlock rb) {
ExtendedBlock block = rb.getBlock();
DatanodeInfo[] targets = rb.getLocations();
LOG.info(who + " calls recoverBlock(block=" + block
+ ", targets=[" + msg + "])");
+ ", targets=[" + Joiner.on(", ").join(targets) + "]"
+ ", newGenerationStamp=" + rb.getNewGenerationStamp() + ")");
}
@Override // ClientDataNodeProtocol

View File

@ -119,7 +119,7 @@ class EditLogBackupInputStream extends EditLogInputStream {
this.version = version;
reader = new FSEditLogOp.Reader(in, version);
reader = new FSEditLogOp.Reader(in, tracker, version);
}
void clear() throws IOException {

View File

@ -83,7 +83,7 @@ public class EditLogFileInputStream extends EditLogInputStream {
throw new LogHeaderCorruptException("No header found in log");
}
reader = new FSEditLogOp.Reader(in, logVersion);
reader = new FSEditLogOp.Reader(in, tracker, logVersion);
this.firstTxId = firstTxId;
this.lastTxId = lastTxId;
this.isInProgress = isInProgress;

View File

@ -721,17 +721,31 @@ public class FSEditLogLoader {
/**
* Stream wrapper that keeps track of the current stream position.
*
* This stream also allows us to set a limit on how many bytes we can read
* without getting an exception.
*/
public static class PositionTrackingInputStream extends FilterInputStream {
public static class PositionTrackingInputStream extends FilterInputStream
implements StreamLimiter {
private long curPos = 0;
private long markPos = -1;
private long limitPos = Long.MAX_VALUE;
public PositionTrackingInputStream(InputStream is) {
super(is);
}
private void checkLimit(long amt) throws IOException {
long extra = (curPos + amt) - limitPos;
if (extra > 0) {
throw new IOException("Tried to read " + amt + " byte(s) past " +
"the limit at offset " + limitPos);
}
}
@Override
public int read() throws IOException {
checkLimit(1);
int ret = super.read();
if (ret != -1) curPos++;
return ret;
@ -739,6 +753,7 @@ public class FSEditLogLoader {
@Override
public int read(byte[] data) throws IOException {
checkLimit(data.length);
int ret = super.read(data);
if (ret > 0) curPos += ret;
return ret;
@ -746,11 +761,17 @@ public class FSEditLogLoader {
@Override
public int read(byte[] data, int offset, int length) throws IOException {
checkLimit(length);
int ret = super.read(data, offset, length);
if (ret > 0) curPos += ret;
return ret;
}
@Override
public void setLimit(long limit) {
limitPos = curPos + limit;
}
@Override
public void mark(int limit) {
super.mark(limit);
@ -773,6 +794,11 @@ public class FSEditLogLoader {
@Override
public long skip(long amt) throws IOException {
long extra = (curPos + amt) - limitPos;
if (extra > 0) {
throw new IOException("Tried to skip " + extra + " bytes past " +
"the limit at offset " + limitPos);
}
long ret = super.skip(amt);
curPos += ret;
return ret;

View File

@ -75,7 +75,10 @@ import java.io.EOFException;
public abstract class FSEditLogOp {
public final FSEditLogOpCodes opCode;
long txid;
private static final int MAX_OP_SIZE = 100 * 1024 * 1024;
/**
* Opcode size is limited to 1.5 megabytes
*/
public static final int MAX_OP_SIZE = (3 * 1024 * 1024) / 2;
@SuppressWarnings("deprecation")
@ -2229,6 +2232,7 @@ public abstract class FSEditLogOp {
*/
public static class Reader {
private final DataInputStream in;
private final StreamLimiter limiter;
private final int logVersion;
private final Checksum checksum;
private final OpInstanceCache cache;
@ -2239,7 +2243,7 @@ public abstract class FSEditLogOp {
* @param logVersion The version of the data coming from the stream.
*/
@SuppressWarnings("deprecation")
public Reader(DataInputStream in, int logVersion) {
public Reader(DataInputStream in, StreamLimiter limiter, int logVersion) {
this.logVersion = logVersion;
if (LayoutVersion.supports(Feature.EDITS_CHESKUM, logVersion)) {
this.checksum = new PureJavaCrc32();
@ -2253,6 +2257,7 @@ public abstract class FSEditLogOp {
} else {
this.in = in;
}
this.limiter = limiter;
this.cache = new OpInstanceCache();
}
@ -2272,6 +2277,7 @@ public abstract class FSEditLogOp {
public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {
while (true) {
try {
limiter.setLimit(MAX_OP_SIZE);
in.mark(MAX_OP_SIZE);
return decodeOp();
} catch (GarbageAfterTerminatorException e) {

View File

@ -54,6 +54,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.hdfs.DFSConfigKeys;
@ -89,9 +90,6 @@ public class FSImage implements Closeable {
private final NNStorageRetentionManager archivalManager;
private SaveNamespaceContext curSaveNamespaceContext = null;
/**
* Construct an FSImage
* @param conf Configuration
@ -804,17 +802,28 @@ public class FSImage implements Closeable {
try {
thread.join();
} catch (InterruptedException iex) {
LOG.error("Caught exception while waiting for thread " +
LOG.error("Caught interrupted exception while waiting for thread " +
thread.getName() + " to finish. Retrying join");
}
}
}
}
/**
* @see #saveNamespace(FSNamesystem, Canceler)
*/
public synchronized void saveNamespace(FSNamesystem source)
throws IOException {
saveNamespace(source, null);
}
/**
* Save the contents of the FS image to a new image file in each of the
* current storage directories.
* @param canceler canceler to check for abort requests; may be null
*/
public synchronized void saveNamespace(FSNamesystem source) throws IOException {
public synchronized void saveNamespace(FSNamesystem source,
Canceler canceler) throws IOException {
assert editLog != null : "editLog must be initialized";
storage.attemptRestoreRemovedStorage();
@ -825,7 +834,7 @@ public class FSImage implements Closeable {
}
long imageTxId = getLastAppliedOrWrittenTxId();
try {
saveFSImageInAllDirs(source, imageTxId);
saveFSImageInAllDirs(source, imageTxId, canceler);
storage.writeAll();
} finally {
if (editLogWasOpen) {
@ -837,27 +846,27 @@ public class FSImage implements Closeable {
storage.writeTransactionIdFileToStorage(imageTxId + 1);
}
}
}
public void cancelSaveNamespace(String reason)
throws InterruptedException {
SaveNamespaceContext ctx = curSaveNamespaceContext;
if (ctx != null) {
ctx.cancel(reason); // waits until complete
}
}
/**
* @see #saveFSImageInAllDirs(FSNamesystem, long, Canceler)
*/
protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid)
throws IOException {
saveFSImageInAllDirs(source, txid, null);
}
protected synchronized void saveFSImageInAllDirs(FSNamesystem source, long txid,
Canceler canceler)
throws IOException {
if (storage.getNumStorageDirs(NameNodeDirType.IMAGE) == 0) {
throw new IOException("No image directories available!");
}
if (canceler == null) {
canceler = new Canceler();
}
SaveNamespaceContext ctx = new SaveNamespaceContext(
source, txid);
curSaveNamespaceContext = ctx;
source, txid, canceler);
try {
List<Thread> saveThreads = new ArrayList<Thread>();
@ -878,7 +887,7 @@ public class FSImage implements Closeable {
throw new IOException(
"Failed to save in any storage directories while saving namespace.");
}
if (ctx.isCancelled()) {
if (canceler.isCancelled()) {
deleteCancelledCheckpoint(txid);
ctx.checkCancelled(); // throws
assert false : "should have thrown above!";

View File

@ -540,7 +540,6 @@ class FSImageFormat {
private void saveImage(ByteBuffer currentDirName,
INodeDirectory current,
DataOutputStream out) throws IOException {
context.checkCancelled();
List<INode> children = current.getChildrenRaw();
if (children == null || children.isEmpty())
return;
@ -554,9 +553,13 @@ class FSImageFormat {
out.write(currentDirName.array(), 0, prefixLen);
}
out.writeInt(children.size());
int i = 0;
for(INode child : children) {
// print all children first
FSImageSerialization.saveINode2Image(child, out);
if (i++ % 50 == 0) {
context.checkCancelled();
}
}
for(INode child : children) {
if(!child.isDirectory())

View File

@ -702,7 +702,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
*/
void prepareToStopStandbyServices() throws ServiceFailedException {
if (standbyCheckpointer != null) {
standbyCheckpointer.cancelAndPreventCheckpoints();
standbyCheckpointer.cancelAndPreventCheckpoints(
"About to leave standby state");
}
}
@ -3372,27 +3373,6 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
}
}
/**
* Cancel an ongoing saveNamespace operation and wait for its
* threads to exit, if one is currently in progress.
*
* If no such operation is in progress, this call does nothing.
*
* @param reason a reason to be communicated to the caller saveNamespace
* @throws IOException
*/
void cancelSaveNamespace(String reason) throws IOException {
readLock();
try {
checkSuperuserPrivilege();
getFSImage().cancelSaveNamespace(reason);
} catch (InterruptedException e) {
throw new IOException(e);
} finally {
readUnlock();
}
}
/**
* Enables/Disables/Checks restoring failed storage replicas if the storage becomes available again.
* Requires superuser privilege.

View File

@ -44,6 +44,7 @@ import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
@ -85,11 +86,12 @@ public class GetImageServlet extends HttpServlet {
final Configuration conf =
(Configuration)getServletContext().getAttribute(JspHelper.CURRENT_CONF);
if(UserGroupInformation.isSecurityEnabled() &&
!isValidRequestor(request.getUserPrincipal().getName(), conf)) {
if (UserGroupInformation.isSecurityEnabled() &&
!isValidRequestor(context, request.getUserPrincipal().getName(), conf)) {
response.sendError(HttpServletResponse.SC_FORBIDDEN,
"Only Namenode and Secondary Namenode may access this servlet");
LOG.warn("Received non-NN/SNN request for image or edits from "
"Only Namenode, Secondary Namenode, and administrators may access " +
"this servlet");
LOG.warn("Received non-NN/SNN/administrator request for image or edits from "
+ request.getUserPrincipal().getName() + " at " + request.getRemoteHost());
return;
}
@ -209,8 +211,8 @@ public class GetImageServlet extends HttpServlet {
}
@VisibleForTesting
static boolean isValidRequestor(String remoteUser, Configuration conf)
throws IOException {
static boolean isValidRequestor(ServletContext context, String remoteUser,
Configuration conf) throws IOException {
if(remoteUser == null) { // This really shouldn't happen...
LOG.warn("Received null remoteUser while authorizing access to getImage servlet");
return false;
@ -237,11 +239,17 @@ public class GetImageServlet extends HttpServlet {
for(String v : validRequestors) {
if(v != null && v.equals(remoteUser)) {
if(LOG.isInfoEnabled()) LOG.info("GetImageServlet allowing: " + remoteUser);
LOG.info("GetImageServlet allowing checkpointer: " + remoteUser);
return true;
}
}
if(LOG.isInfoEnabled()) LOG.info("GetImageServlet rejecting: " + remoteUser);
if (HttpServer.userHasAdministratorAccess(context, remoteUser)) {
LOG.info("GetImageServlet allowing administrator: " + remoteUser);
return true;
}
LOG.info("GetImageServlet rejecting: " + remoteUser);
return false;
}

View File

@ -165,9 +165,9 @@ public class NameNodeHttpServer {
httpServer.setAttribute("datanode.https.port", datanodeSslPort
.getPort());
}
httpServer.setAttribute("name.node", nn);
httpServer.setAttribute("name.node.address", bindAddress);
httpServer.setAttribute("name.system.image", nn.getFSImage());
httpServer.setAttribute(NAMENODE_ATTRIBUTE_KEY, nn);
httpServer.setAttribute(NAMENODE_ADDRESS_ATTRIBUTE_KEY, nn.getNameNodeAddress());
httpServer.setAttribute(FSIMAGE_ATTRIBUTE_KEY, nn.getFSImage());
httpServer.setAttribute(JspHelper.CURRENT_CONF, conf);
setupServlets(httpServer, conf);
httpServer.start();

View File

@ -50,7 +50,6 @@ import org.apache.hadoop.ha.protocolPB.HAServiceProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
@ -712,10 +711,16 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // NamenodeProtocol
public long getTransactionID() throws IOException {
namesystem.checkOperation(OperationCategory.CHECKPOINT);
return namesystem.getEditLog().getSyncTxId();
namesystem.checkOperation(OperationCategory.UNCHECKED);
return namesystem.getFSImage().getLastAppliedOrWrittenTxId();
}
@Override // NamenodeProtocol
public long getMostRecentCheckpointTxId() throws IOException {
namesystem.checkOperation(OperationCategory.UNCHECKED);
return namesystem.getFSImage().getMostRecentCheckpointTxId();
}
@Override // NamenodeProtocol
public CheckpointSignature rollEditLog() throws IOException {
return namesystem.rollEditLog();

View File

@ -587,6 +587,10 @@ class NamenodeJspHelper {
whatNodes = request.getParameter("whatNodes"); // show only live or only
// dead nodes
if (null == whatNodes || whatNodes.isEmpty()) {
out.print("Invalid input");
return;
}
sorterField = request.getParameter("sorter/field");
sorterOrder = request.getParameter("sorter/order");
if (sorterField == null)
@ -714,6 +718,8 @@ class NamenodeJspHelper {
}
out.print("</table>\n");
}
} else {
out.print("Invalid input");
}
out.print("</div>");
}

View File

@ -23,6 +23,7 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.util.Canceler;
import com.google.common.base.Preconditions;
@ -36,20 +37,17 @@ class SaveNamespaceContext {
private final long txid;
private final List<StorageDirectory> errorSDs =
Collections.synchronizedList(new ArrayList<StorageDirectory>());
/**
* If the operation has been canceled, set to the reason why
* it has been canceled (eg standby moving to active)
*/
private volatile String cancelReason = null;
private final Canceler canceller;
private CountDownLatch completionLatch = new CountDownLatch(1);
SaveNamespaceContext(
FSNamesystem sourceNamesystem,
long txid) {
long txid,
Canceler canceller) {
this.sourceNamesystem = sourceNamesystem;
this.txid = txid;
this.canceller = canceller;
}
FSNamesystem getSourceNamesystem() {
@ -68,17 +66,6 @@ class SaveNamespaceContext {
return errorSDs;
}
/**
* Requests that the current saveNamespace operation be
* canceled if it is still running.
* @param reason the reason why cancellation is requested
* @throws InterruptedException
*/
void cancel(String reason) throws InterruptedException {
this.cancelReason = reason;
completionLatch.await();
}
void markComplete() {
Preconditions.checkState(completionLatch.getCount() == 1,
"Context already completed!");
@ -86,13 +73,9 @@ class SaveNamespaceContext {
}
void checkCancelled() throws SaveNamespaceCancelledException {
if (cancelReason != null) {
if (canceller.isCancelled()) {
throw new SaveNamespaceCancelledException(
cancelReason);
canceller.getCancellationReason());
}
}
boolean isCancelled() {
return cancelReason != null;
}
}

View File

@ -0,0 +1,30 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.namenode;
/**
* An object that allows you to set a limit on a stream. This limit
* represents the number of bytes that can be read without getting an
* exception.
*/
interface StreamLimiter {
/**
* Set a limit. Calling this function clears any existing limit.
*/
public void setLimit(long limit);
}
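PositionTrackingInputStream earlier in this commit is the implementor added here; FSEditLogOp.Reader calls setLimit(MAX_OP_SIZE) before decoding each opcode so a corrupt length field cannot trigger an unbounded read (see HDFS-3440 in CHANGES.txt). A toy version of the same idea over a plain stream, kept separate from the Hadoop classes since the real interface is package-private (names below are made up):

import java.io.ByteArrayInputStream;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Toy limiter: tracks the read position and fails once a caller-set limit is passed. */
class LimitedInputStream extends FilterInputStream {
  private long pos = 0;
  private long limitPos = Long.MAX_VALUE;

  LimitedInputStream(InputStream in) { super(in); }

  /** Set a limit relative to the current position; clears any previous limit. */
  public void setLimit(long limit) { limitPos = pos + limit; }

  @Override
  public int read() throws IOException {
    if (pos + 1 > limitPos) {
      throw new IOException("Tried to read 1 byte(s) past the limit at offset " + limitPos);
    }
    int b = super.read();
    if (b != -1) pos++;
    return b;
  }

  public static void main(String[] args) throws IOException {
    LimitedInputStream in = new LimitedInputStream(
        new ByteArrayInputStream(new byte[] { 0x12, 0x12, 0x12 }));
    in.setLimit(2);
    in.read();
    in.read();
    try {
      in.read(); // third read exceeds the 2-byte limit
    } catch (IOException expected) {
      System.out.println("limit enforced: " + expected.getMessage());
    }
    in.close();
  }
}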

View File

@ -33,18 +33,10 @@ import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceStatus;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.CheckpointSignature;
import org.apache.hadoop.hdfs.server.namenode.EditLogInputStream;
import org.apache.hadoop.hdfs.server.namenode.FSImage;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@ -54,10 +46,8 @@ import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.tools.DFSHAAdmin;
import org.apache.hadoop.hdfs.tools.NNHAServiceTarget;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.Tool;
@ -92,7 +82,7 @@ public class BootstrapStandby implements Tool, Configurable {
// Exit/return codes.
static final int ERR_CODE_FAILED_CONNECT = 2;
static final int ERR_CODE_INVALID_VERSION = 3;
static final int ERR_CODE_OTHER_NN_NOT_ACTIVE = 4;
// Skip 4 - was used in previous versions, but no longer returned.
static final int ERR_CODE_ALREADY_FORMATTED = 5;
static final int ERR_CODE_LOGS_UNAVAILABLE = 6;
@ -144,12 +134,6 @@ public class BootstrapStandby implements Tool, Configurable {
.getProxy();
}
private HAServiceProtocol createHAProtocolProxy()
throws IOException {
return new NNHAServiceTarget(new HdfsConfiguration(conf), nsId, otherNNId)
.getProxy(conf, 15000);
}
private int doRun() throws IOException {
NamenodeProtocol proxy = createNNProtocolProxy();
@ -186,29 +170,6 @@ public class BootstrapStandby implements Tool, Configurable {
" Layout version: " + nsInfo.getLayoutVersion() + "\n" +
"=====================================================");
// Ensure the other NN is active - we can't force it to roll edit logs
// below if it's not active.
if (!isOtherNNActive()) {
String err = "NameNode " + nsId + "." + nnId + " at " + otherIpcAddr +
" is not currently in ACTIVE state.";
if (!interactive) {
LOG.fatal(err + " Please transition it to " +
"active before attempting to bootstrap a standby node.");
return ERR_CODE_OTHER_NN_NOT_ACTIVE;
}
System.err.println(err);
if (ToolRunner.confirmPrompt(
"Do you want to automatically transition it to active now?")) {
transitionOtherNNActive();
} else {
LOG.fatal("User aborted. Exiting without bootstrapping standby.");
return ERR_CODE_OTHER_NN_NOT_ACTIVE;
}
}
// Check with the user before blowing away data.
if (!NameNode.confirmFormat(
Sets.union(Sets.newHashSet(dirsToFormat),
@ -216,13 +177,10 @@ public class BootstrapStandby implements Tool, Configurable {
force, interactive)) {
return ERR_CODE_ALREADY_FORMATTED;
}
// Force the active to roll its log
CheckpointSignature csig = proxy.rollEditLog();
long imageTxId = csig.getMostRecentCheckpointTxId();
long rollTxId = csig.getCurSegmentTxId();
long imageTxId = proxy.getMostRecentCheckpointTxId();
long curTxId = proxy.getTransactionID();
// Format the storage (writes VERSION file)
NNStorage storage = new NNStorage(conf, dirsToFormat, editUrisToFormat);
storage.format(nsInfo);
@ -235,11 +193,11 @@ public class BootstrapStandby implements Tool, Configurable {
// Ensure that we have enough edits already in the shared directory to
// start up from the last checkpoint on the active.
if (!checkLogsAvailableForRead(image, imageTxId, rollTxId)) {
if (!checkLogsAvailableForRead(image, imageTxId, curTxId)) {
return ERR_CODE_LOGS_UNAVAILABLE;
}
image.getStorage().writeTransactionIdFileToStorage(rollTxId);
image.getStorage().writeTransactionIdFileToStorage(curTxId);
// Download that checkpoint into our storage directories.
MD5Hash hash = TransferFsImage.downloadImageToStorage(
@ -249,33 +207,31 @@ public class BootstrapStandby implements Tool, Configurable {
return 0;
}
private void transitionOtherNNActive()
throws AccessControlException, ServiceFailedException, IOException {
LOG.info("Transitioning the running namenode to active...");
createHAProtocolProxy().transitionToActive(
new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER));
LOG.info("Successful");
}
private boolean checkLogsAvailableForRead(FSImage image, long imageTxId,
long rollTxId) {
long curTxIdOnOtherNode) {
if (imageTxId == curTxIdOnOtherNode) {
// The other node hasn't written any logs since the last checkpoint.
// This can be the case if the NN was freshly formatted as HA, and
// then started in standby mode, so it has no edit logs at all.
return true;
}
long firstTxIdInLogs = imageTxId + 1;
long lastTxIdInLogs = rollTxId - 1;
assert lastTxIdInLogs >= firstTxIdInLogs;
assert curTxIdOnOtherNode >= firstTxIdInLogs :
"first=" + firstTxIdInLogs + " onOtherNode=" + curTxIdOnOtherNode;
try {
Collection<EditLogInputStream> streams =
image.getEditLog().selectInputStreams(
firstTxIdInLogs, lastTxIdInLogs, false);
firstTxIdInLogs, curTxIdOnOtherNode, true);
for (EditLogInputStream stream : streams) {
IOUtils.closeStream(stream);
}
return true;
} catch (IOException e) {
String msg = "Unable to read transaction ids " +
firstTxIdInLogs + "-" + lastTxIdInLogs +
firstTxIdInLogs + "-" + curTxIdOnOtherNode +
" from the configured shared edits storage " +
Joiner.on(",").join(sharedEditsUris) + ". " +
"Please copy these logs into the shared edits storage " +
@ -294,12 +250,6 @@ public class BootstrapStandby implements Tool, Configurable {
return (nsInfo.getLayoutVersion() == HdfsConstants.LAYOUT_VERSION);
}
private boolean isOtherNNActive()
throws AccessControlException, IOException {
HAServiceStatus status = createHAProtocolProxy().getServiceStatus();
return status.getState() == HAServiceState.ACTIVE;
}
private void parseConfAndFindOtherNN() throws IOException {
Configuration conf = getConf();
nsId = DFSUtil.getNamenodeNameServiceId(conf);

View File

@ -34,6 +34,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.SaveNamespaceCancelledException;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
@ -58,6 +59,9 @@ public class StandbyCheckpointer {
private final CheckpointerThread thread;
private String activeNNAddress;
private InetSocketAddress myNNAddress;
private Object cancelLock = new Object();
private Canceler canceler;
// Keep track of how many checkpoints were canceled.
// This is for use in tests.
@ -123,6 +127,7 @@ public class StandbyCheckpointer {
}
public void stop() throws IOException {
cancelAndPreventCheckpoints("Stopping checkpointer");
thread.setShouldRun(false);
thread.interrupt();
try {
@ -134,6 +139,7 @@ public class StandbyCheckpointer {
}
private void doCheckpoint() throws InterruptedException, IOException {
assert canceler != null;
long txid;
namesystem.writeLockInterruptibly();
@ -153,8 +159,8 @@ public class StandbyCheckpointer {
thisCheckpointTxId + ". Skipping...");
return;
}
img.saveNamespace(namesystem);
img.saveNamespace(namesystem, canceler);
txid = img.getStorage().getMostRecentCheckpointTxId();
assert txid == thisCheckpointTxId : "expected to save checkpoint at txid=" +
thisCheckpointTxId + " but instead saved at txid=" + txid;
@ -173,16 +179,18 @@ public class StandbyCheckpointer {
* and prevent any new checkpoints from starting for the next
* minute or so.
*/
public void cancelAndPreventCheckpoints() throws ServiceFailedException {
try {
thread.preventCheckpointsFor(PREVENT_AFTER_CANCEL_MS);
// TODO(HA): there is a really narrow race here if we are just
// about to start a checkpoint - this won't cancel it!
namesystem.getFSImage().cancelSaveNamespace(
"About to exit standby state");
} catch (InterruptedException e) {
throw new ServiceFailedException(
"Interrupted while trying to cancel checkpoint");
public void cancelAndPreventCheckpoints(String msg) throws ServiceFailedException {
thread.preventCheckpointsFor(PREVENT_AFTER_CANCEL_MS);
synchronized (cancelLock) {
// Before beginning a checkpoint, the checkpointer thread
// takes this lock, and creates a canceler object.
// If the canceler is non-null, then a checkpoint is in
// progress and we need to cancel it. If it's null, then
// the operation has not started, meaning that the above
// time-based prevention will take effect.
if (canceler != null) {
canceler.cancel(msg);
}
}
}
@ -272,10 +280,18 @@ public class StandbyCheckpointer {
"exceeds the configured interval " + checkpointConf.getPeriod());
needCheckpoint = true;
}
if (needCheckpoint && now < preventCheckpointsUntil) {
LOG.info("But skipping this checkpoint since we are about to failover!");
canceledCount++;
} else if (needCheckpoint) {
synchronized (cancelLock) {
if (now < preventCheckpointsUntil) {
LOG.info("But skipping this checkpoint since we are about to failover!");
canceledCount++;
continue;
}
assert canceler == null;
canceler = new Canceler();
}
if (needCheckpoint) {
doCheckpoint();
lastCheckpointTime = now;
}
@ -287,6 +303,10 @@ public class StandbyCheckpointer {
continue;
} catch (Throwable t) {
LOG.error("Exception in doCheckpoint", t);
} finally {
synchronized (cancelLock) {
canceler = null;
}
}
}
}
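The change above replaces the old cancelSaveNamespace() path with a small handshake: the checkpointer thread creates a Canceler under cancelLock just before each checkpoint and clears it in a finally block, while cancelAndPreventCheckpoints() cancels only if one is currently set. A standalone sketch of that handshake, with made-up names rather than the Hadoop API:

/** Toy version of the "create a canceler under a lock, clear it in finally" handshake. */
class CancellableWorker {
  private final Object cancelLock = new Object();
  private Canceler canceler; // non-null only while a run is in progress

  /** Hypothetical stand-in for org.apache.hadoop.hdfs.util.Canceler. */
  static class Canceler {
    private volatile String reason;
    void cancel(String why) { reason = why; }
    boolean isCancelled() { return reason != null; }
  }

  void runOnce() {
    synchronized (cancelLock) {
      canceler = new Canceler(); // announce: a run is now in progress
    }
    try {
      for (int step = 0; step < 1000; step++) {
        if (canceler.isCancelled()) {
          return; // cooperative cancellation point
        }
        // ... one unit of checkpoint-like work ...
      }
    } finally {
      synchronized (cancelLock) {
        canceler = null; // no run in progress any more
      }
    }
  }

  /** Called from another thread, e.g. while leaving standby state. */
  void cancelIfRunning(String reason) {
    synchronized (cancelLock) {
      if (canceler != null) {
        canceler.cancel(reason);
      }
    }
  }
}

If cancelIfRunning() arrives before runOnce() has taken the lock, nothing is cancelled; the real code covers that window with the time-based preventCheckpointsFor() guard.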

View File

@ -87,11 +87,17 @@ public interface NamenodeProtocol {
/**
* @return The most recent transaction ID that has been synced to
* persistent storage.
* persistent storage, or applied from persistent storage in the
* case of a non-active node.
* @throws IOException
*/
public long getTransactionID() throws IOException;
/**
* Get the transaction ID of the most recent checkpoint.
*/
public long getMostRecentCheckpointTxId() throws IOException;
/**
* Closes the current edit log and opens a new one. The
* call fails if the file system is in SafeMode.

View File

@ -21,14 +21,16 @@ import java.io.IOException;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocolPB.GetUserMappingsProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.tools.GetGroupsBase;
import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.util.ToolRunner;
@ -39,6 +41,8 @@ import org.apache.hadoop.util.ToolRunner;
*/
@InterfaceAudience.Private
public class GetGroups extends GetGroupsBase {
private static final Log LOG = LogFactory.getLog(GetGroups.class);
static{
HdfsConfiguration.init();
@ -59,6 +63,22 @@ public class GetGroups extends GetGroupsBase {
return NameNode.getAddress(conf);
}
@Override
public void setConf(Configuration conf) {
conf = new HdfsConfiguration(conf);
String nameNodePrincipal = conf.get(
DFSConfigKeys.DFS_NAMENODE_USER_NAME_KEY, "");
if (LOG.isDebugEnabled()) {
LOG.debug("Using NN principal: " + nameNodePrincipal);
}
conf.set(CommonConfigurationKeys.HADOOP_SECURITY_SERVICE_USER_NAME_KEY,
nameNodePrincipal);
super.setConf(conf);
}
@Override
protected GetUserMappingsProtocol getUgmProtocol() throws IOException {
return NameNodeProxies.createProxy(getConf(), FileSystem.getDefaultUri(getConf()),

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.util;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Provides a simple interface where one thread can mark an operation
* for cancellation, and another thread can poll for whether the
* cancellation has occurred.
*/
@InterfaceAudience.Private
public class Canceler {
/**
* If the operation has been canceled, set to the reason why
* it has been canceled (eg standby moving to active)
*/
private volatile String cancelReason = null;
/**
* Requests that the current operation be canceled if it is still running.
* This does not block until the cancellation is successful.
* @param reason the reason why cancellation is requested
*/
public void cancel(String reason) {
this.cancelReason = reason;
}
public boolean isCancelled() {
return cancelReason != null;
}
public String getCancellationReason() {
return cancelReason;
}
}
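A minimal sketch of the intended use: a worker thread polls isCancelled() at safe points while another thread requests cancellation, which is how the standby checkpointer and saveNamespace cooperate elsewhere in this commit (the worker loop below is illustrative):

import org.apache.hadoop.hdfs.util.Canceler;

public class CancelerSketch {
  public static void main(String[] args) throws InterruptedException {
    final Canceler canceler = new Canceler();

    Thread worker = new Thread(new Runnable() {
      public void run() {
        for (int i = 0; i < 1000000; i++) {
          if (canceler.isCancelled()) {
            System.out.println("stopping: " + canceler.getCancellationReason());
            return; // cooperative exit
          }
          // ... one unit of work ...
        }
      }
    });
    worker.start();

    // Another thread (for example, the failover path) requests cancellation.
    canceler.cancel("standby moving to active");
    worker.join();
  }
}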

View File

@ -173,31 +173,42 @@ public class LightWeightHashSet<T> implements Collection<T> {
* @return true if element present, false otherwise.
*/
@SuppressWarnings("unchecked")
@Override
public boolean contains(final Object key) {
return getElement((T)key) != null;
}
/**
* Return the element in this set which is equal to
* the given key, if such an element exists.
* Otherwise returns null.
*/
public T getElement(final T key) {
// validate key
if (key == null) {
throw new IllegalArgumentException("Null element is not supported.");
}
// find element
final int hashCode = ((T)key).hashCode();
final int hashCode = key.hashCode();
final int index = getIndex(hashCode);
return containsElem(index, (T) key, hashCode);
return getContainedElem(index, key, hashCode);
}
/**
* Check if the set contains given element at given index.
* Check if the set contains the given element at the given index. If it
* does, return that element.
*
* @return true if element present, false otherwise.
* @return the element, or null, if no element matches
*/
protected boolean containsElem(int index, final T key, int hashCode) {
protected T getContainedElem(int index, final T key, int hashCode) {
for (LinkedElement<T> e = entries[index]; e != null; e = e.next) {
// element found
if (hashCode == e.hashCode && e.element.equals(key)) {
return true;
return e.element;
}
}
// element not found
return false;
return null;
}
/**
@ -240,7 +251,7 @@ public class LightWeightHashSet<T> implements Collection<T> {
final int hashCode = element.hashCode();
final int index = getIndex(hashCode);
// return false if already present
if (containsElem(index, element, hashCode)) {
if (getContainedElem(index, element, hashCode) != null) {
return false;
}
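The new getElement() complements contains(): instead of a bare membership answer it hands back the stored element equal to the probe, which is what lets InvalidateBlocks (earlier in this commit) retrieve the stored Block and compare generation stamps itself. A small illustration, assuming the set's no-argument constructor (the values are made up):

import org.apache.hadoop.hdfs.util.LightWeightHashSet;

public class GetElementSketch {
  public static void main(String[] args) {
    LightWeightHashSet<String> set = new LightWeightHashSet<String>();
    String stored = new String("blk_1");
    set.add(stored);

    String probe = new String("blk_1"); // equal to, but not the same object as, stored
    System.out.println(set.contains(probe)); // true: membership only
    System.out.println(set.getElement(probe) == stored); // true: returns the stored instance
    System.out.println(set.getElement("blk_2")); // null: no matching element
  }
}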

View File

@ -88,7 +88,7 @@ public class LightWeightLinkedSet<T> extends LightWeightHashSet<T> {
final int hashCode = element.hashCode();
final int index = getIndex(hashCode);
// return false if already present
if (containsElem(index, element, hashCode)) {
if (getContainedElem(index, element, hashCode) != null) {
return false;
}

View File

@ -84,6 +84,16 @@ message RollEditLogResponseProto {
required CheckpointSignatureProto signature = 1;
}
/**
* void request
*/
message GetMostRecentCheckpointTxIdRequestProto {
}
message GetMostRecentCheckpointTxIdResponseProto{
required uint64 txId = 1;
}
/**
* registration - Namenode reporting the error
* errorCode - error code indicating the error
@ -188,13 +198,19 @@ service NamenodeProtocolService {
rpc getTransactionId(GetTransactionIdRequestProto)
returns(GetTransactionIdResponseProto);
/**
* Get the transaction ID of the most recent checkpoint
*/
rpc getMostRecentCheckpointTxId(GetMostRecentCheckpointTxIdRequestProto)
returns(GetMostRecentCheckpointTxIdResponseProto);
/**
* Close the current editlog and open a new one for checkpointing purposes
*/
rpc rollEditLog(RollEditLogRequestProto) returns(RollEditLogResponseProto);
/**
* Close the current editlog and open a new one for checkpointing purposes
* Request info about the version running on this NameNode
*/
rpc versionRequest(VersionRequestProto) returns(VersionResponseProto);

View File

@ -114,7 +114,7 @@ public class TestJspHelper {
UserGroupInformation ugi = JspHelper.getUGI(context, request, conf);
Token<? extends TokenIdentifier> tokenInUgi = ugi.getTokens().iterator()
.next();
Assert.assertEquals(tokenInUgi.getService().toString(), expected);
Assert.assertEquals(expected, tokenInUgi.getService().toString());
}

View File

@ -425,7 +425,7 @@ public class TestBlockRecovery {
DataNode spyDN = spy(dn);
doThrow(new RecoveryInProgressException("Replica recovery is in progress")).
when(spyDN).initReplicaRecovery(any(RecoveringBlock.class));
Daemon d = spyDN.recoverBlocks(initRecoveringBlocks());
Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
d.join();
verify(spyDN, never()).syncBlock(
any(RecoveringBlock.class), anyListOf(BlockRecord.class));
@ -445,7 +445,7 @@ public class TestBlockRecovery {
DataNode spyDN = spy(dn);
doThrow(new IOException()).
when(spyDN).initReplicaRecovery(any(RecoveringBlock.class));
Daemon d = spyDN.recoverBlocks(initRecoveringBlocks());
Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
d.join();
verify(spyDN, never()).syncBlock(
any(RecoveringBlock.class), anyListOf(BlockRecord.class));
@ -465,7 +465,7 @@ public class TestBlockRecovery {
doReturn(new ReplicaRecoveryInfo(block.getBlockId(), 0,
block.getGenerationStamp(), ReplicaState.FINALIZED)).when(spyDN).
initReplicaRecovery(any(RecoveringBlock.class));
Daemon d = spyDN.recoverBlocks(initRecoveringBlocks());
Daemon d = spyDN.recoverBlocks("fake NN", initRecoveringBlocks());
d.join();
DatanodeProtocol dnP = dn.getActiveNamenodeForBP(POOL_ID);
verify(dnP).commitBlockSynchronization(

View File

@ -765,7 +765,7 @@ public class TestEditLog extends TestCase {
tracker = new FSEditLogLoader.PositionTrackingInputStream(in);
in = new DataInputStream(tracker);
reader = new FSEditLogOp.Reader(in, version);
reader = new FSEditLogOp.Reader(in, tracker, version);
}
@Override

View File

@ -22,8 +22,10 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.channels.FileChannel;
@ -316,4 +318,47 @@ public class TestFSEditLogLoader {
fis.close();
}
}
@Test
public void testStreamLimiter() throws IOException {
final File LIMITER_TEST_FILE = new File(TEST_DIR, "limiter.test");
FileOutputStream fos = new FileOutputStream(LIMITER_TEST_FILE);
try {
fos.write(0x12);
fos.write(0x12);
fos.write(0x12);
} finally {
fos.close();
}
FileInputStream fin = new FileInputStream(LIMITER_TEST_FILE);
BufferedInputStream bin = new BufferedInputStream(fin);
FSEditLogLoader.PositionTrackingInputStream tracker =
new FSEditLogLoader.PositionTrackingInputStream(bin);
try {
tracker.setLimit(2);
tracker.mark(100);
tracker.read();
tracker.read();
try {
tracker.read();
fail("expected to get IOException after reading past the limit");
} catch (IOException e) {
}
tracker.reset();
tracker.mark(100);
byte arr[] = new byte[3];
try {
tracker.read(arr);
fail("expected to get IOException after reading past the limit");
} catch (IOException e) {
}
tracker.reset();
arr = new byte[2];
tracker.read(arr);
} finally {
tracker.close();
}
}
}

View File

@ -21,17 +21,26 @@ import static org.junit.Assert.*;
import java.io.IOException;
import javax.servlet.ServletContext;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authentication.util.KerberosName;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.junit.Test;
import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;
public class TestGetImageServlet {
@Test
public void testIsValidRequestorWithHa() throws IOException {
public void testIsValidRequestor() throws IOException {
Configuration conf = new HdfsConfiguration();
KerberosName.setRules("RULE:[1:$1]\nRULE:[2:$1]");
// Set up generic HA configs.
conf.set(DFSConfigKeys.DFS_FEDERATION_NAMESERVICES, "ns1");
@ -53,8 +62,33 @@ public class TestGetImageServlet {
// Initialize this conf object as though we're running on NN1.
NameNode.initializeGenericKeys(conf, "ns1", "nn1");
AccessControlList acls = Mockito.mock(AccessControlList.class);
Mockito.when(acls.isUserAllowed(Mockito.<UserGroupInformation>any())).thenReturn(false);
ServletContext context = Mockito.mock(ServletContext.class);
Mockito.when(context.getAttribute(HttpServer.ADMINS_ACL)).thenReturn(acls);
// Make sure that NN2 is considered a valid fsimage/edits requestor.
assertTrue(GetImageServlet.isValidRequestor("hdfs/host2@TEST-REALM.COM",
conf));
assertTrue(GetImageServlet.isValidRequestor(context,
"hdfs/host2@TEST-REALM.COM", conf));
// Mark atm as an admin.
Mockito.when(acls.isUserAllowed(Mockito.argThat(new ArgumentMatcher<UserGroupInformation>() {
@Override
public boolean matches(Object argument) {
return ((UserGroupInformation) argument).getShortUserName().equals("atm");
}
}))).thenReturn(true);
// Make sure that NN2 is still considered a valid requestor.
assertTrue(GetImageServlet.isValidRequestor(context,
"hdfs/host2@TEST-REALM.COM", conf));
// Make sure an admin is considered a valid requestor.
assertTrue(GetImageServlet.isValidRequestor(context,
"atm@TEST-REALM.COM", conf));
// Make sure other users are *not* considered valid requestors.
assertFalse(GetImageServlet.isValidRequestor(context,
"todd@TEST-REALM.COM", conf));
}
}
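Note: the test above stubs AccessControlList.isUserAllowed() for one specific user via a custom Mockito ArgumentMatcher. A minimal standalone sketch of that Mockito 1.x pattern (the Checker interface and user names below are hypothetical, not part of the Hadoop code):

import org.mockito.ArgumentMatcher;
import org.mockito.Mockito;

public class ArgMatcherSketch {
  // Hypothetical collaborator standing in for AccessControlList.
  interface Checker {
    boolean isAllowed(String user);
  }

  public static void main(String[] args) {
    Checker checker = Mockito.mock(Checker.class);

    // Return true only when the argument matches one specific user;
    // any other argument keeps the default stub value (false).
    Mockito.when(checker.isAllowed(Mockito.argThat(new ArgumentMatcher<String>() {
      @Override
      public boolean matches(Object argument) {
        return "atm".equals(argument);
      }
    }))).thenReturn(true);

    System.out.println(checker.isAllowed("atm"));  // true
    System.out.println(checker.isAllowed("todd")); // false
  }
}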

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.hdfs.util.MD5FileUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.test.GenericTestUtils;
@ -517,14 +518,15 @@ public class TestSaveNamespace {
try {
doAnEdit(fsn, 1);
final Canceler canceler = new Canceler();
// Save namespace
fsn.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
try {
Future<Void> saverFuture = pool.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
image.saveNamespace(finalFsn);
image.saveNamespace(finalFsn, canceler);
return null;
}
});
@ -534,7 +536,7 @@ public class TestSaveNamespace {
// then cancel the saveNamespace
Future<Void> cancelFuture = pool.submit(new Callable<Void>() {
public Void call() throws Exception {
image.cancelSaveNamespace("cancelled");
canceler.cancel("cancelled");
return null;
}
});
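Note: this hunk reflects an API change in the test: instead of calling image.cancelSaveNamespace(), a shared Canceler is passed into saveNamespace() and cancellation is requested through it. The general shape is a cooperative cancellation token that long-running work polls at checkpoints; a hedged sketch of that pattern (hypothetical CancelToken/CheckpointWorker names, not Hadoop's actual Canceler):

import java.io.IOException;

// Hypothetical cancellation token, illustrating the Canceler-style API
// the updated test passes into saveNamespace().
class CancelToken {
  private volatile String reason; // null means "not cancelled"

  void cancel(String why) {
    this.reason = why;
  }

  boolean isCancelled() {
    return reason != null;
  }

  // Long-running work calls this at convenient checkpoints.
  void checkCancelled() throws IOException {
    if (isCancelled()) {
      throw new IOException("operation cancelled: " + reason);
    }
  }
}

class CheckpointWorker {
  void saveChunks(int chunks, CancelToken canceler) throws IOException {
    for (int i = 0; i < chunks; i++) {
      canceler.checkCancelled(); // abort promptly once someone cancels
      // ... write chunk i ...
    }
  }
}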

View File

@ -23,6 +23,7 @@ import java.io.IOException;
import java.net.URI;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -35,6 +36,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.namenode.NamenodeFsck;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.GenericTestUtils.LogCapturer;
import org.junit.After;
@ -43,6 +45,7 @@ import org.junit.Test;
import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import static org.junit.Assert.*;
@ -177,7 +180,7 @@ public class TestBootstrapStandby {
logs.stopCapturing();
}
GenericTestUtils.assertMatches(logs.getOutput(),
"FATAL.*Unable to read transaction ids 1-4 from the configured shared");
"FATAL.*Unable to read transaction ids 1-3 from the configured shared");
}
@Test
@ -195,30 +198,29 @@ public class TestBootstrapStandby {
assertEquals(0, rc);
}
/**
* Test that, even if the other node is not active, we are able
* to bootstrap standby from it.
*/
@Test(timeout=30000)
public void testOtherNodeNotActive() throws Exception {
cluster.transitionToStandby(0);
int rc = BootstrapStandby.run(
new String[]{"-nonInteractive"},
cluster.getConfiguration(1));
assertEquals(BootstrapStandby.ERR_CODE_OTHER_NN_NOT_ACTIVE, rc);
// Answer "yes" to the prompt about transition to active
System.setIn(new ByteArrayInputStream("yes\n".getBytes()));
rc = BootstrapStandby.run(
new String[]{"-force"},
cluster.getConfiguration(1));
assertEquals(0, rc);
assertFalse(nn0.getNamesystem().isInStandbyState());
}
private void assertNNFilesMatch() throws Exception {
List<File> curDirs = Lists.newArrayList();
curDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 0));
curDirs.addAll(FSImageTestUtil.getNameNodeCurrentDirs(cluster, 1));
// Ignore seen_txid file, since the newly bootstrapped standby
// will have a higher seen_txid than the one it bootstrapped from.
Set<String> ignoredFiles = ImmutableSet.of("seen_txid");
FSImageTestUtil.assertParallelFilesAreIdentical(curDirs,
Collections.<String>emptySet());
ignoredFiles);
}
private void removeStandbyNameDirs() {

View File

@ -21,6 +21,7 @@ import static org.junit.Assert.*;
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.util.List;
@ -36,6 +37,11 @@ import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NNStorage;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.util.Canceler;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@ -52,12 +58,18 @@ public class TestStandbyCheckpoints {
private NameNode nn0, nn1;
private FileSystem fs;
@SuppressWarnings("rawtypes")
@Before
public void setupCluster() throws Exception {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_CHECK_PERIOD_KEY, 1);
conf.setInt(DFSConfigKeys.DFS_NAMENODE_CHECKPOINT_TXNS_KEY, 5);
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
conf.setBoolean(DFSConfigKeys.DFS_IMAGE_COMPRESS_KEY, true);
conf.set(DFSConfigKeys.DFS_IMAGE_COMPRESSION_CODEC_KEY,
SlowCodec.class.getCanonicalName());
CompressionCodecFactory.setCodecClasses(conf,
ImmutableList.<Class>of(SlowCodec.class));
MiniDFSNNTopology topology = new MiniDFSNNTopology()
.addNameservice(new MiniDFSNNTopology.NSConf("ns1")
@ -159,14 +171,15 @@ public class TestStandbyCheckpoints {
// We should make exactly one checkpoint at this new txid.
Mockito.verify(spyImage1, Mockito.times(1))
.saveNamespace((FSNamesystem) Mockito.anyObject());
.saveNamespace((FSNamesystem) Mockito.anyObject(),
(Canceler)Mockito.anyObject());
}
/**
* Test cancellation of ongoing checkpoints when failover happens
* mid-checkpoint.
*/
@Test
@Test(timeout=120000)
public void testCheckpointCancellation() throws Exception {
cluster.transitionToStandby(0);
@ -191,16 +204,18 @@ public class TestStandbyCheckpoints {
cluster.transitionToActive(0);
for (int i = 0; i < 10; i++) {
boolean canceledOne = false;
for (int i = 0; i < 10 && !canceledOne; i++) {
doEdits(i*10, i*10 + 10);
cluster.transitionToStandby(0);
cluster.transitionToActive(1);
cluster.transitionToStandby(1);
cluster.transitionToActive(0);
canceledOne = StandbyCheckpointer.getCanceledCount() > 0;
}
assertTrue(StandbyCheckpointer.getCanceledCount() > 0);
assertTrue(canceledOne);
}
private void doEdits(int start, int stop) throws IOException {
@ -209,5 +224,22 @@ public class TestStandbyCheckpoints {
fs.mkdirs(p);
}
}
/**
* A codec which just slows down the saving of the image significantly
* by sleeping a few milliseconds on every write. This makes it easy to
* catch the standby in the middle of saving a checkpoint.
*/
public static class SlowCodec extends GzipCodec {
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
throws IOException {
CompressionOutputStream ret = super.createOutputStream(out);
CompressionOutputStream spy = Mockito.spy(ret);
Mockito.doAnswer(new GenericTestUtils.SleepAnswer(2))
.when(spy).write(Mockito.<byte[]>any(), Mockito.anyInt(), Mockito.anyInt());
return spy;
}
}
}
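Note: SlowCodec above relies on GenericTestUtils.SleepAnswer to delay every write() on a spied CompressionOutputStream. Presumably that answer sleeps briefly and then delegates to the real method; a self-contained sketch of such an Answer (the SleepThenCallReal class and the ByteArrayOutputStream target are assumptions for illustration, not Hadoop's SleepAnswer):

import java.io.ByteArrayOutputStream;
import java.io.OutputStream;

import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

public class SlowSpySketch {
  // Hypothetical answer: sleep a bit, then invoke the real method on the spy.
  static class SleepThenCallReal implements Answer<Object> {
    private final long millis;
    SleepThenCallReal(long millis) { this.millis = millis; }

    @Override
    public Object answer(InvocationOnMock invocation) throws Throwable {
      Thread.sleep(millis);
      return invocation.callRealMethod();
    }
  }

  public static void main(String[] args) throws Exception {
    OutputStream slow = Mockito.spy(new ByteArrayOutputStream());

    // Every array write now pays a small delay before delegating to the
    // real ByteArrayOutputStream.write().
    Mockito.doAnswer(new SleepThenCallReal(2))
        .when(slow).write(Mockito.<byte[]>any(), Mockito.anyInt(), Mockito.anyInt());

    slow.write(new byte[16], 0, 16);
    System.out.println("wrote 16 bytes through the slowed spy");
  }
}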

View File

@ -42,7 +42,6 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
@ -70,7 +69,7 @@ public class TestStandbyIsHot {
((Log4JLogger)NameNode.stateChangeLog).getLogger().setLevel(Level.ALL);
}
@Test
@Test(timeout=60000)
public void testStandbyIsHot() throws Exception {
Configuration conf = new Configuration();
// We read from the standby to watch block locations
@ -111,6 +110,8 @@ public class TestStandbyIsHot {
// Change replication
LOG.info("Changing replication to 1");
fs.setReplication(TEST_FILE_PATH, (short)1);
BlockManagerTestUtil.computeAllPendingWork(
nn1.getNamesystem().getBlockManager());
waitForBlockLocations(cluster, nn1, TEST_FILE, 1);
nn1.getRpcServer().rollEditLog();
@ -121,6 +122,8 @@ public class TestStandbyIsHot {
// Change back to 3
LOG.info("Changing replication to 3");
fs.setReplication(TEST_FILE_PATH, (short)3);
BlockManagerTestUtil.computeAllPendingWork(
nn1.getNamesystem().getBlockManager());
nn1.getRpcServer().rollEditLog();
LOG.info("Waiting for higher replication to show up on standby");
@ -142,7 +145,7 @@ public class TestStandbyIsHot {
* In the bug, the standby node would only very slowly notice the blocks returning
* to the cluster.
*/
@Test
@Test(timeout=60000)
public void testDatanodeRestarts() throws Exception {
Configuration conf = new Configuration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 1024);
@ -224,17 +227,16 @@ public class TestStandbyIsHot {
LOG.info("Got " + numReplicas + " locs: " + locs);
if (numReplicas > expectedReplicas) {
for (DataNode dn : cluster.getDataNodes()) {
DataNodeTestUtils.triggerDeletionReport(dn);
}
cluster.triggerDeletionReports();
}
cluster.triggerHeartbeats();
return numReplicas == expectedReplicas;
} catch (IOException e) {
LOG.warn("No block locations yet: " + e.getMessage());
return false;
}
}
}, 500, 10000);
}, 500, 20000);
}
}

View File

@ -421,5 +421,48 @@ public class TestLightWeightHashSet{
LOG.info("Test other - DONE");
}
@Test
public void testGetElement() {
LightWeightHashSet<TestObject> objSet = new LightWeightHashSet<TestObject>();
TestObject objA = new TestObject("object A");
TestObject equalToObjA = new TestObject("object A");
TestObject objB = new TestObject("object B");
objSet.add(objA);
objSet.add(objB);
assertSame(objA, objSet.getElement(objA));
assertSame(objA, objSet.getElement(equalToObjA));
assertSame(objB, objSet.getElement(objB));
assertNull(objSet.getElement(new TestObject("not in set")));
}
/**
* Wrapper class which is used in
* {@link TestLightWeightHashSet#testGetElement()}
*/
private static class TestObject {
private final String value;
public TestObject(String value) {
super();
this.value = value;
}
@Override
public int hashCode() {
return value.hashCode();
}
@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null) return false;
if (getClass() != obj.getClass())
return false;
TestObject other = (TestObject) obj;
return this.value.equals(other.value);
}
}
}
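Note: testGetElement() above verifies that LightWeightHashSet.getElement() returns the instance actually stored in the set rather than the equal probe object, which is what makes it usable for canonicalizing duplicates. A hedged sketch of that use, with a plain HashMap standing in for the set (the Interner class and the sample keys are illustrative assumptions):

import java.util.HashMap;
import java.util.Map;

// Why returning the *stored* element matters: equal keys can be collapsed
// onto one canonical instance, saving memory when duplicates are common.
class Interner<T> {
  private final Map<T, T> canonical = new HashMap<T, T>();

  // Returns the previously stored instance if an equal one exists,
  // otherwise stores and returns the argument itself.
  T intern(T value) {
    T existing = canonical.get(value);
    if (existing != null) {
      return existing;
    }
    canonical.put(value, value);
    return value;
  }
}

class InternerDemo {
  public static void main(String[] args) {
    Interner<String> interner = new Interner<String>();
    String a = new String("block-123");
    String b = new String("block-123");
    System.out.println(interner.intern(a) == interner.intern(b)); // true
  }
}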

View File

@ -503,6 +503,11 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4102. job counters not available in Jobhistory webui for
killed jobs (Bhallamudi Venkata Siva Kamesh via tgraves)
MAPREDUCE-3543. Mavenize Gridmix. (tgraves)
MAPREDUCE-4197. Include the hsqldb jar in the hadoop-mapreduce tar
file (Ravi Prakash via tgraves)
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -159,6 +159,11 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
</dependency>
<dependency>
<groupId>org.hsqldb</groupId>
<artifactId>hsqldb</artifactId>
<version>2.0.0</version>
</dependency>
</dependencies>

View File

@ -212,6 +212,11 @@
<artifactId>hadoop-mapreduce-examples</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-gridmix</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>

View File

@ -0,0 +1,131 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<project>
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-project</artifactId>
<version>3.0.0-SNAPSHOT</version>
<relativePath>../../hadoop-project</relativePath>
</parent>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-gridmix</artifactId>
<version>3.0.0-SNAPSHOT</version>
<description>Apache Hadoop Gridmix</description>
<name>Apache Hadoop Gridmix</name>
<packaging>jar</packaging>
<properties>
<hadoop.log.dir>${project.build.directory}/log</hadoop.log.dir>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-annotations</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-hs</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-rumen</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<scope>test</scope>
<type>test-jar</type>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-yarn-server-tests</artifactId>
<type>test-jar</type>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-antrun-plugin</artifactId>
<executions>
<execution>
<id>create-log-dir</id>
<phase>process-test-resources</phase>
<goals>
<goal>run</goal>
</goals>
<configuration>
<target>
<delete dir="${test.build.data}"/>
<mkdir dir="${test.build.data}"/>
<mkdir dir="${hadoop.log.dir}"/>
</target>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<mainClass>org.apache.hadoop.tools.HadoopArchives</mainClass>
</manifest>
</archive>
</configuration>
</plugin>
</plugins>
</build>
</project>

View File

@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.gridmix.Statistics.ClusterStats;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
@ -114,4 +113,4 @@ class ClusterSummarizer implements StatListener<ClusterStats> {
protected String getNamenodeInfo() {
return namenodeInfo;
}
}
}

View File

@ -697,10 +697,10 @@ public class Gridmix extends Configured implements Tool {
}
}
private <T> String getEnumValues(Enum<? extends T>[] e) {
private String getEnumValues(Enum<?>[] e) {
StringBuilder sb = new StringBuilder();
String sep = "";
for (Enum<? extends T> v : e) {
for (Enum<?> v : e) {
sb.append(sep);
sb.append(v.name());
sep = "|";

View File

@ -1,5 +1,7 @@
package org.apache.hadoop.mapred.gridmix;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@ -32,6 +34,7 @@ import java.io.IOException;
* limitations under the License.
*/
public class GridmixTestUtils {
private static final Log LOG = LogFactory.getLog(GridmixTestUtils.class);
static final Path DEST = new Path("/gridmix");
static FileSystem dfs = null;
static MiniDFSCluster dfsCluster = null;
@ -69,15 +72,13 @@ public class GridmixTestUtils {
if(fs.exists(homeDirectory)) {
fs.delete(homeDirectory,true);
}
TestGridmixSubmission.LOG.info(
"Creating Home directory : " + homeDirectory);
LOG.info("Creating Home directory : " + homeDirectory);
fs.mkdirs(homeDirectory);
changePermission(user,homeDirectory, fs);
Path stagingArea =
new Path(conf.get("mapreduce.jobtracker.staging.root.dir",
"/tmp/hadoop/mapred/staging"));
TestGridmixSubmission.LOG.info(
"Creating Staging root directory : " + stagingArea);
LOG.info("Creating Staging root directory : " + stagingArea);
fs.mkdirs(stagingArea);
fs.setPermission(stagingArea, new FsPermission((short) 0777));
} catch (IOException ioe) {

Some files were not shown because too many files have changed in this diff Show More