Merge trunk into branch

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1373573 13f79535-47bb-0310-9956-ffa450edef68
commit d5abe22844
Todd Lipcon, 2012-08-15 19:04:24 +00:00
66 changed files with 1011 additions and 263 deletions

View File

@@ -213,7 +213,10 @@ Branch-2 ( Unreleased changes )
 HADOOP-8644. AuthenticatedURL should be able to use SSLFactory. (tucu)
-HADOOP-8681. add support for HTTPS to the web UIs. (tucu)
+HADOOP-8581. add support for HTTPS to the web UIs. (tucu)
+HADOOP-7754. Expose file descriptors from Hadoop-wrapped local
+FileSystems (todd and ahmed via tucu)
 IMPROVEMENTS
@@ -284,6 +287,8 @@ Branch-2 ( Unreleased changes )
 HADOOP-8620. Add -Drequire.fuse and -Drequire.snappy. (Colin
 Patrick McCabe via eli)
+HADOOP-8687. Upgrade log4j to 1.2.17. (eli)
 BUG FIXES
 HADOOP-8372. NetUtils.normalizeHostName() incorrectly handles hostname
@@ -418,6 +423,9 @@ Branch-2 ( Unreleased changes )
 HADOOP-8660. TestPseudoAuthenticator failing with NPE. (tucu)
+HADOOP-8699. some common testcases create core-site.xml in test-classes
+making other testcases to fail. (tucu)
 Release 2.0.0-alpha - 05-23-2012
 INCOMPATIBLE CHANGES

View File

@@ -18,6 +18,8 @@
 package org.apache.hadoop.fs;
 import java.io.BufferedInputStream;
+import java.io.FileDescriptor;
+import java.io.FileInputStream;
 import java.io.IOException;
 import org.apache.hadoop.classification.InterfaceAudience;
@@ -31,7 +33,7 @@
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 public class BufferedFSInputStream extends BufferedInputStream
-implements Seekable, PositionedReadable {
+implements Seekable, PositionedReadable, HasFileDescriptor {
 /**
 * Creates a <code>BufferedFSInputStream</code>
 * with the specified buffer size,
@@ -97,4 +99,13 @@ public void readFully(long position, byte[] buffer, int offset, int length) thro
 public void readFully(long position, byte[] buffer) throws IOException {
 ((FSInputStream)in).readFully(position, buffer);
 }
@Override
public FileDescriptor getFileDescriptor() throws IOException {
if (in instanceof HasFileDescriptor) {
return ((HasFileDescriptor) in).getFileDescriptor();
} else {
return null;
}
}
 }

View File

@@ -28,7 +28,7 @@
 @InterfaceAudience.Public
 @InterfaceStability.Stable
 public class FSDataInputStream extends DataInputStream
-implements Seekable, PositionedReadable, Closeable, ByteBufferReadable {
+implements Seekable, PositionedReadable, Closeable, ByteBufferReadable, HasFileDescriptor {
 public FSDataInputStream(InputStream in)
 throws IOException {
@@ -125,4 +125,15 @@ public int read(ByteBuffer buf) throws IOException {
 throw new UnsupportedOperationException("Byte-buffer read unsupported by input stream");
 }
@Override
public FileDescriptor getFileDescriptor() throws IOException {
if (in instanceof HasFileDescriptor) {
return ((HasFileDescriptor) in).getFileDescriptor();
} else if (in instanceof FileInputStream) {
return ((FileInputStream) in).getFD();
} else {
return null;
}
}
 }

View File

@@ -0,0 +1,40 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.fs;
import java.io.FileDescriptor;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
/**
* Having a FileDescriptor
*/
@InterfaceAudience.Private
@InterfaceStability.Evolving
public interface HasFileDescriptor {
/**
* @return the FileDescriptor
* @throws IOException
*/
public FileDescriptor getFileDescriptor() throws IOException;
}
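
For orientation only (not part of this commit), a minimal sketch of how a caller might use the new interface through FSDataInputStream once the patch is applied; the file path and class name below are invented for the example, and getFileDescriptor() may legitimately return null when the wrapped stream does not expose a descriptor.

import java.io.FileDescriptor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class FdSketch {
  public static void main(String[] args) throws Exception {
    // Open a file on the local file system and ask for its FileDescriptor.
    FileSystem fs = FileSystem.getLocal(new Configuration());
    FSDataInputStream in = fs.open(new Path("/tmp/example.txt"));
    try {
      FileDescriptor fd = in.getFileDescriptor(); // null if unsupported
      System.out.println("descriptor available: " + (fd != null && fd.valid()));
    } finally {
      in.close();
    }
  }
}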

View File

@@ -26,6 +26,7 @@
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.io.FileDescriptor;
 import java.net.URI;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
@@ -111,7 +112,7 @@ public int read(byte[] data, int offset, int length) throws IOException {
 /*******************************************************
 * For open()'s FSInputStream.
 *******************************************************/
-class LocalFSFileInputStream extends FSInputStream {
+class LocalFSFileInputStream extends FSInputStream implements HasFileDescriptor {
 private FileInputStream fis;
 private long position;
@@ -181,6 +182,11 @@ public long skip(long n) throws IOException {
 }
 return value;
 }
+@Override
+public FileDescriptor getFileDescriptor() throws IOException {
+  return fis.getFD();
+}
 }
 public FSDataInputStream open(Path f, int bufferSize) throws IOException {

View File

@@ -88,7 +88,7 @@ public abstract class HAAdmin extends Configured implements Tool {
 /** Output stream for errors, for use in tests */
 protected PrintStream errOut = System.err;
-PrintStream out = System.out;
+protected PrintStream out = System.out;
 private RequestSource requestSource = RequestSource.REQUEST_BY_USER;
 protected abstract HAServiceTarget resolveTarget(String string);
@@ -439,7 +439,10 @@ private CommandLine parseOpts(String cmdName, Options opts, String[] argv) {
 }
 private int help(String[] argv) {
-if (argv.length != 2) {
+if (argv.length == 1) { // only -help
+  printUsage(out);
+  return 0;
+} else if (argv.length != 2) {
 printUsage(errOut, "-help");
 return -1;
 }
@@ -454,7 +457,7 @@ private int help(String[] argv) {
 return -1;
 }
-errOut.println(cmd + " [" + usageInfo.args + "]: " + usageInfo.help);
+out.println(cmd + " [" + usageInfo.args + "]: " + usageInfo.help);
 return 0;
 }

View File

@@ -80,6 +80,8 @@ public abstract class ZKFailoverController {
 ZK_AUTH_KEY
 };
+protected static final String USAGE =
+  "Usage: java zkfc [ -formatZK [-force] [-nonInteractive] ]";
 /** Unable to format the parent znode in ZK */
 static final int ERR_CODE_FORMAT_DENIED = 2;
@@ -248,8 +250,7 @@ private void badArg(String arg) {
 }
 private void printUsage() {
-System.err.println("Usage: " + this.getClass().getSimpleName() +
-  " [-formatZK [-force | -nonInteractive]]");
+System.err.println(USAGE + "\n");
 }
 private int formatZK(boolean force, boolean interactive)

View File

@@ -17,6 +17,7 @@
 */
 package org.apache.hadoop.http;
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -37,6 +38,11 @@ public class HttpConfig {
 CommonConfigurationKeysPublic.HADOOP_SSL_ENABLED_DEFAULT);
 }
+@VisibleForTesting
+static void setSecure(boolean secure) {
+  sslEnabled = secure;
+}
 public static boolean isSecure() {
 return sslEnabled;
 }
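
As a hedged sketch of how a test might use this new hook (TestSSLHttpServer further down does essentially the same thing): because setSecure is package-private and @VisibleForTesting, the test has to live in the org.apache.hadoop.http package; the class name here is invented.

package org.apache.hadoop.http;

import org.junit.After;
import org.junit.Before;

public class HttpsModeTestSketch {
  @Before
  public void setup() {
    // Force the static flag on so HttpServer picks HTTPS for this test.
    HttpConfig.setSecure(true);
  }

  @After
  public void cleanup() {
    // Restore the default so later tests are not affected.
    HttpConfig.setSecure(false);
  }
}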

View File

@@ -73,6 +73,15 @@ public static void resetFirstExitException() {
 firstExitException = null;
 }
/**
* Reset the tracking of process termination. This is for use
* in unit tests where one test in the suite expects an exit
* but others do not.
*/
public static void resetFirstExitException() {
firstExitException = null;
}
 /**
 * Terminate the current process. Note that terminate is the *only* method
 * that should be used to terminate the daemon processes.
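
For context, a small sketch (my own example, not from this commit) of the test pattern this helper supports: a suite records exits through ExitUtil instead of calling System.exit(), one test observes the simulated exit, and resetFirstExitException() clears the recorded state so later tests start clean. disableSystemExit(), terminate(int, String) and ExitException are assumed to be the existing ExitUtil members; the test class name is invented.

import org.apache.hadoop.util.ExitUtil;
import org.junit.Before;
import org.junit.Test;

public class ExitTrackingSketch {
  @Before
  public void clearExitStatus() {
    // Same pattern the BookKeeper journal test below adopts.
    ExitUtil.resetFirstExitException();
  }

  @Test
  public void testExpectedExit() {
    ExitUtil.disableSystemExit();            // record exits instead of exiting
    try {
      ExitUtil.terminate(1, "simulated fatal error");
    } catch (ExitUtil.ExitException ee) {
      // expected: terminate() throws instead of calling System.exit(1)
    }
  }
}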

View File

@@ -248,4 +248,14 @@ public void testStatistics() throws Exception {
 }
 assertEquals(1, fileSchemeCount);
 }
public void testHasFileDescriptor() throws IOException {
Configuration conf = new Configuration();
LocalFileSystem fs = FileSystem.getLocal(conf);
Path path = new Path(TEST_ROOT_DIR, "test-file");
writeFile(fs, path, 1);
BufferedFSInputStream bis = new BufferedFSInputStream(
new RawLocalFileSystem().new LocalFSFileInputStream(path), 1024);
assertNotNull(bis.getFileDescriptor());
}
 }

View File

@@ -40,7 +40,9 @@ public class TestHAAdmin {
 private HAAdmin tool;
 private ByteArrayOutputStream errOutBytes = new ByteArrayOutputStream();
+private ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
 private String errOutput;
+private String output;
 @Before
 public void setup() throws IOException {
@@ -53,12 +55,14 @@ protected HAServiceTarget resolveTarget(String target) {
 };
 tool.setConf(new Configuration());
 tool.errOut = new PrintStream(errOutBytes);
+tool.out = new PrintStream(outBytes);
 }
 private void assertOutputContains(String string) {
-if (!errOutput.contains(string)) {
-fail("Expected output to contain '" + string + "' but was:\n" +
-  errOutput);
+if (!errOutput.contains(string) && !output.contains(string)) {
+fail("Expected output to contain '" + string +
+  "' but err_output was:\n" + errOutput +
+  "\n and output was: \n" + output);
 }
 }
@@ -88,17 +92,19 @@ public void testAdminUsage() throws Exception {
 @Test
 public void testHelp() throws Exception {
-assertEquals(-1, runTool("-help"));
+assertEquals(0, runTool("-help"));
 assertEquals(0, runTool("-help", "transitionToActive"));
 assertOutputContains("Transitions the service into Active");
 }
 private Object runTool(String ... args) throws Exception {
 errOutBytes.reset();
+outBytes.reset();
 LOG.info("Running: HAAdmin " + Joiner.on(" ").join(args));
 int ret = tool.run(args);
 errOutput = new String(errOutBytes.toByteArray(), Charsets.UTF_8);
-LOG.info("Output:\n" + errOutput);
+output = new String(outBytes.toByteArray(), Charsets.UTF_8);
+LOG.info("Err_output:\n" + errOutput + "\nOutput:\n" + output);
 return ret;
 }
 }

View File

@@ -41,6 +41,8 @@
 * corresponding HTTPS URL.
 */
 public class TestSSLHttpServer extends HttpServerFunctionalTest {
+private static final String CONFIG_SITE_XML = "sslhttpserver-site.xml";
 private static final String BASEDIR =
 System.getProperty("test.build.dir", "target/test-dir") + "/" +
 TestSSLHttpServer.class.getSimpleName();
@@ -49,8 +51,10 @@ public class TestSSLHttpServer extends HttpServerFunctionalTest {
 private static HttpServer server;
 private static URL baseUrl;
 @Before
 public void setup() throws Exception {
+HttpConfig.setSecure(true);
 File base = new File(BASEDIR);
 FileUtil.fullyDelete(base);
 base.mkdirs();
@@ -66,11 +70,12 @@ public void setup() throws Exception {
 //we do this trick because the MR AppMaster is started in another VM and
 //the HttpServer configuration is not loaded from the job.xml but from the
 //site.xml files in the classpath
-Writer writer = new FileWriter(classpathDir + "/core-site.xml");
+Writer writer = new FileWriter(new File(classpathDir, CONFIG_SITE_XML));
 conf.writeXml(writer);
 writer.close();
 conf.setInt(HttpServer.HTTP_MAX_THREADS, 10);
+conf.addResource(CONFIG_SITE_XML);
 server = createServer("test", conf);
 server.addServlet("echo", "/echo", TestHttpServer.EchoServlet.class);
 server.start();
@@ -83,7 +88,8 @@ public void cleanup() throws Exception {
 server.stop();
 String classpathDir =
 KeyStoreTestUtil.getClasspathDir(TestSSLHttpServer.class);
-new File(classpathDir + "/core-site.xml").delete();
+new File(classpathDir, CONFIG_SITE_XML).delete();
+HttpConfig.setSecure(false);
 }
@@ -98,7 +104,9 @@ public void testEcho() throws Exception {
 private static String readOut(URL url) throws Exception {
 StringBuilder out = new StringBuilder();
 HttpsURLConnection conn = (HttpsURLConnection) url.openConnection();
-SSLFactory sslf = new SSLFactory(SSLFactory.Mode.CLIENT, new Configuration());
+Configuration conf = new Configuration();
+conf.addResource(CONFIG_SITE_XML);
+SSLFactory sslf = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
 sslf.init();
 conn.setSSLSocketFactory(sslf.createSSLSocketFactory());
 InputStream in = conn.getInputStream();

View File

@@ -114,6 +114,12 @@ Trunk (unreleased changes)
 HDFS-3789. JournalManager#format() should be able to throw IOException
 (Ivan Kelly via todd)
+HDFS-3723. Add support -h, -help to all the commands. (Jing Zhao via
+suresh)
+HDFS-3803. Change BlockPoolSliceScanner chatty INFO log to DEBUG.
+(Andrew Purtell via suresh)
 OPTIMIZATIONS
 BUG FIXES
@@ -183,6 +189,8 @@ Trunk (unreleased changes)
 HDFS-3625. Fix TestBackupNode by properly initializing edit log during
 startup. (Junping Du via todd)
+HDFS-3792. Fix two findbugs introduced by HDFS-3695 (todd)
 Branch-2 ( Unreleased changes )
 INCOMPATIBLE CHANGES
@@ -209,6 +217,8 @@ Branch-2 ( Unreleased changes )
 HDFS-3637. Add support for encrypting the DataTransferProtocol. (atm)
+HDFS-3150. Add option for clients to contact DNs via hostname. (eli)
 IMPROVEMENTS
 HDFS-3390. DFSAdmin should print full stack traces of errors when DEBUG
@@ -381,6 +391,14 @@ Branch-2 ( Unreleased changes )
 HDFS-3276. initializeSharedEdits should have a -nonInteractive flag (todd)
+HDFS-3765. namenode -initializeSharedEdits should be able to initialize
+all shared storages. (Vinay and todd via todd)
+HDFS-3802. StartupOption.name in HdfsServerConstants should be final.
+(Jing Zhao via szetszwo)
+HDFS-3796. Speed up edit log tests by avoiding fsync() (todd)
 OPTIMIZATIONS
 HDFS-2982. Startup performance suffers when there are many edit log
@@ -587,6 +605,15 @@ Branch-2 ( Unreleased changes )
 IOExceptions of stream closures can mask root exceptions. (Uma Maheswara
 Rao G via szetszwo)
+HDFS-3790. test_fuse_dfs.c doesn't compile on centos 5. (Colin Patrick
+McCabe via atm)
+HDFS-3658. Fix bugs in TestDFSClientRetries and add more tests. (szetszwo)
+HDFS-3794. WebHDFS OPEN returns the incorrect Content-Length in the HTTP
+header when offset is specified and length is omitted.
+(Ravi Prakash via szetszwo)
 BREAKDOWN OF HDFS-3042 SUBTASKS
 HDFS-2185. HDFS portion of ZK-based FailoverController (todd)
@@ -1462,6 +1489,9 @@ Release 0.23.3 - UNRELEASED
 HDFS-3553. Hftp proxy tokens are broken (daryn)
+HDFS-3718. Datanode won't shutdown because of runaway DataBlockScanner
+thread (Kihwal Lee via daryn)
 Release 0.23.2 - UNRELEASED
 INCOMPATIBLE CHANGES

View File

@@ -18,6 +18,7 @@
 package org.apache.hadoop.contrib.bkjournal;
 import static org.junit.Assert.*;
 import org.junit.Test;
 import org.junit.Before;
 import org.junit.After;
@@ -25,6 +26,9 @@
 import org.junit.AfterClass;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.ha.ServiceFailedException;
+import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
+import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HAUtil;
@@ -35,12 +39,16 @@
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
 import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogTestUtil;
+import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.ExitUtil;
 import org.apache.hadoop.util.ExitUtil.ExitException;
 import org.apache.bookkeeper.proto.BookieServer;
@@ -48,7 +56,9 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import java.io.File;
 import java.io.IOException;
+import java.net.URISyntaxException;
 /**
 * Integration test to ensure that the BookKeeper JournalManager
@@ -68,6 +78,11 @@ public static void setupBookkeeper() throws Exception {
 bkutil.start();
 }
+@Before
+public void clearExitStatus() {
+  ExitUtil.resetFirstExitException();
+}
 @AfterClass
 public static void teardownBookkeeper() throws Exception {
 bkutil.teardown();
@@ -244,4 +259,97 @@ public void testMultiplePrimariesStarted() throws Exception {
 }
 }
 }
/**
* Use NameNode INTIALIZESHAREDEDITS to initialize the shared edits. i.e. copy
* the edits log segments to new bkjm shared edits.
*
* @throws Exception
*/
@Test
public void testInitializeBKSharedEdits() throws Exception {
MiniDFSCluster cluster = null;
try {
Configuration conf = new Configuration();
HAUtil.setAllowStandbyReads(conf, true);
conf.setInt(DFSConfigKeys.DFS_HA_TAILEDITS_PERIOD_KEY, 1);
MiniDFSNNTopology topology = MiniDFSNNTopology.simpleHATopology();
cluster = new MiniDFSCluster.Builder(conf).nnTopology(topology)
.numDataNodes(0).build();
cluster.waitActive();
// Shutdown and clear the current filebased shared dir.
cluster.shutdownNameNodes();
File shareddir = new File(cluster.getSharedEditsDir(0, 1));
assertTrue("Initial Shared edits dir not fully deleted",
FileUtil.fullyDelete(shareddir));
// Check namenodes should not start without shared dir.
assertCanNotStartNamenode(cluster, 0);
assertCanNotStartNamenode(cluster, 1);
// Configure bkjm as new shared edits dir in both namenodes
Configuration nn1Conf = cluster.getConfiguration(0);
Configuration nn2Conf = cluster.getConfiguration(1);
nn1Conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
.createJournalURI("/initializeSharedEdits").toString());
nn2Conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, BKJMUtil
.createJournalURI("/initializeSharedEdits").toString());
BKJMUtil.addJournalManagerDefinition(nn1Conf);
BKJMUtil.addJournalManagerDefinition(nn2Conf);
// Initialize the BKJM shared edits.
assertFalse(NameNode.initializeSharedEdits(nn1Conf));
// NameNode should be able to start and should be in sync with BKJM as
// shared dir
assertCanStartHANameNodes(cluster, conf, "/testBKJMInitialize");
} finally {
if (cluster != null) {
cluster.shutdown();
}
}
}
private void assertCanNotStartNamenode(MiniDFSCluster cluster, int nnIndex) {
try {
cluster.restartNameNode(nnIndex, false);
fail("Should not have been able to start NN" + (nnIndex)
+ " without shared dir");
} catch (IOException ioe) {
LOG.info("Got expected exception", ioe);
GenericTestUtils.assertExceptionContains(
"Cannot start an HA namenode with name dirs that need recovery", ioe);
}
}
private void assertCanStartHANameNodes(MiniDFSCluster cluster,
Configuration conf, String path) throws ServiceFailedException,
IOException, URISyntaxException, InterruptedException {
// Now should be able to start both NNs. Pass "false" here so that we don't
// try to waitActive on all NNs, since the second NN doesn't exist yet.
cluster.restartNameNode(0, false);
cluster.restartNameNode(1, true);
// Make sure HA is working.
cluster
.getNameNode(0)
.getRpcServer()
.transitionToActive(
new StateChangeRequestInfo(RequestSource.REQUEST_BY_USER));
FileSystem fs = null;
try {
Path newPath = new Path(path);
fs = HATestUtil.configureFailoverFs(cluster, conf);
assertTrue(fs.mkdirs(newPath));
HATestUtil.waitForStandbyToCatchUp(cluster.getNameNode(0),
cluster.getNameNode(1));
assertTrue(NameNodeAdapter.getFileInfo(cluster.getNameNode(1),
newPath.toString(), false).isDir());
} finally {
if (fs != null) {
fs.close();
}
}
}
 }

View File

@@ -86,11 +86,11 @@ protected boolean removeEldestEntry(
 }
 private synchronized ClientDatanodeProtocol getDatanodeProxy(
-DatanodeInfo node, Configuration conf, int socketTimeout)
-throws IOException {
+DatanodeInfo node, Configuration conf, int socketTimeout,
+boolean connectToDnViaHostname) throws IOException {
 if (proxy == null) {
 proxy = DFSUtil.createClientDatanodeProtocolProxy(node, conf,
-socketTimeout);
+socketTimeout, connectToDnViaHostname);
 }
 return proxy;
 }
@@ -156,14 +156,16 @@ private void removeBlockLocalPathInfo(ExtendedBlock b) {
 */
 static BlockReaderLocal newBlockReader(Configuration conf, String file,
 ExtendedBlock blk, Token<BlockTokenIdentifier> token, DatanodeInfo node,
-int socketTimeout, long startOffset, long length) throws IOException {
+int socketTimeout, long startOffset, long length,
+boolean connectToDnViaHostname) throws IOException {
 LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node
 .getIpcPort());
 // check the cache first
 BlockLocalPathInfo pathinfo = localDatanodeInfo.getBlockLocalPathInfo(blk);
 if (pathinfo == null) {
-pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token);
+pathinfo = getBlockPathInfo(blk, node, conf, socketTimeout, token,
+connectToDnViaHostname);
 }
 // check to see if the file exists. It may so happen that the
@@ -241,11 +243,12 @@ private static synchronized LocalDatanodeInfo getLocalDatanodeInfo(int port) {
 private static BlockLocalPathInfo getBlockPathInfo(ExtendedBlock blk,
 DatanodeInfo node, Configuration conf, int timeout,
-Token<BlockTokenIdentifier> token) throws IOException {
+Token<BlockTokenIdentifier> token, boolean connectToDnViaHostname)
+throws IOException {
 LocalDatanodeInfo localDatanodeInfo = getLocalDatanodeInfo(node.getIpcPort());
 BlockLocalPathInfo pathinfo = null;
 ClientDatanodeProtocol proxy = localDatanodeInfo.getDatanodeProxy(node,
-conf, timeout);
+conf, timeout, connectToDnViaHostname);
 try {
 // make RPC to local datanode to find local pathnames of blocks
 pathinfo = proxy.getBlockLocalPathInfo(blk, token);

View File

@@ -49,6 +49,8 @@
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_ENCRYPT_DATA_TRANSFER_KEY;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
 import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT;
 import java.io.BufferedOutputStream;
 import java.io.DataInputStream;
@@ -213,6 +215,7 @@ static class Conf {
 final String taskId;
 final FsPermission uMask;
 final boolean useLegacyBlockReader;
+final boolean connectToDnViaHostname;
 Conf(Configuration conf) {
 maxFailoverAttempts = conf.getInt(
@@ -263,6 +266,8 @@ static class Conf {
 useLegacyBlockReader = conf.getBoolean(
 DFS_CLIENT_USE_LEGACY_BLOCKREADER,
 DFS_CLIENT_USE_LEGACY_BLOCKREADER_DEFAULT);
+connectToDnViaHostname = conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME,
+  DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
 }
 private int getChecksumType(Configuration conf) {
@@ -473,6 +478,14 @@ String getClientName() {
 return clientName;
 }
+/**
+ * @return whether the client should use hostnames instead of IPs
+ * when connecting to DataNodes
+ */
+boolean connectToDnViaHostname() {
+  return dfsClientConf.connectToDnViaHostname;
+}
 void checkOpen() throws IOException {
 if (!clientRunning) {
 IOException result = new IOException("Filesystem closed");
@@ -729,12 +742,12 @@ public long renewDelegationToken(Token<DelegationTokenIdentifier> token)
 */
 static BlockReader getLocalBlockReader(Configuration conf,
 String src, ExtendedBlock blk, Token<BlockTokenIdentifier> accessToken,
-DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock)
-throws InvalidToken, IOException {
+DatanodeInfo chosenNode, int socketTimeout, long offsetIntoBlock,
+boolean connectToDnViaHostname) throws InvalidToken, IOException {
 try {
 return BlockReaderLocal.newBlockReader(conf, src, blk, accessToken,
 chosenNode, socketTimeout, offsetIntoBlock, blk.getNumBytes()
-  - offsetIntoBlock);
+  - offsetIntoBlock, connectToDnViaHostname);
 } catch (RemoteException re) {
 throw re.unwrapRemoteException(InvalidToken.class,
 AccessControlException.class);
@@ -1425,7 +1438,8 @@ public HdfsFileStatus getFileLinkInfo(String src) throws IOException {
 public MD5MD5CRC32FileChecksum getFileChecksum(String src) throws IOException {
 checkOpen();
 return getFileChecksum(src, namenode, socketFactory,
-dfsClientConf.socketTimeout, getDataEncryptionKey());
+dfsClientConf.socketTimeout, getDataEncryptionKey(),
+dfsClientConf.connectToDnViaHostname);
 }
 @InterfaceAudience.Private
@@ -1471,7 +1485,8 @@ public DataEncryptionKey getDataEncryptionKey()
 */
 public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
 ClientProtocol namenode, SocketFactory socketFactory, int socketTimeout,
-DataEncryptionKey encryptionKey) throws IOException {
+DataEncryptionKey encryptionKey, boolean connectToDnViaHostname)
+throws IOException {
 //get all block locations
 LocatedBlocks blockLocations = callGetBlockLocations(namenode, src, 0, Long.MAX_VALUE);
 if (null == blockLocations) {
@@ -1509,9 +1524,11 @@ public static MD5MD5CRC32FileChecksum getFileChecksum(String src,
 try {
 //connect to a datanode
 sock = socketFactory.createSocket();
-NetUtils.connect(sock,
-  NetUtils.createSocketAddr(datanodes[j].getXferAddr()),
-  timeout);
+String dnAddr = datanodes[j].getXferAddr(connectToDnViaHostname);
+if (LOG.isDebugEnabled()) {
+  LOG.debug("Connecting to datanode " + dnAddr);
+}
+NetUtils.connect(sock, NetUtils.createSocketAddr(dnAddr), timeout);
 sock.setSoTimeout(timeout);
 OutputStream unbufOut = NetUtils.getOutputStream(sock);

View File

@@ -52,6 +52,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
 public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_POLICY_DEFAULT = "DEFAULT";
 public static final String DFS_CLIENT_SOCKET_CACHE_CAPACITY_KEY = "dfs.client.socketcache.capacity";
 public static final int DFS_CLIENT_SOCKET_CACHE_CAPACITY_DEFAULT = 16;
+public static final String DFS_CLIENT_USE_DN_HOSTNAME = "dfs.client.use.datanode.hostname";
+public static final boolean DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT = false;
 // HA related configuration
 public static final String DFS_CLIENT_FAILOVER_PROXY_PROVIDER_KEY_PREFIX = "dfs.client.failover.proxy.provider";
@@ -81,6 +83,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
 public static final boolean DFS_DATANODE_SYNC_BEHIND_WRITES_DEFAULT = false;
 public static final String DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY = "dfs.datanode.drop.cache.behind.reads";
 public static final boolean DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT = false;
+public static final String DFS_DATANODE_USE_DN_HOSTNAME = "dfs.datanode.use.datanode.hostname";
+public static final boolean DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT = false;
 public static final String DFS_NAMENODE_HTTP_PORT_KEY = "dfs.http.port";
 public static final int DFS_NAMENODE_HTTP_PORT_DEFAULT = 50070;
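
A short sketch of my own (not from the patch) showing how a client could turn on the new behavior programmatically; the same effect comes from setting dfs.client.use.datanode.hostname in hdfs-site.xml.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class UseHostnameSketch {
  public static Configuration clientConf() {
    Configuration conf = new Configuration();
    // Ask the DFS client to dial DataNodes by hostname instead of IP,
    // e.g. for multihomed clusters or NAT'd test environments.
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME, true);
    return conf;
  }
}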

View File

@@ -199,7 +199,8 @@ private long readBlockLength(LocatedBlock locatedblock) throws IOException {
 try {
 cdp = DFSUtil.createClientDatanodeProtocolProxy(
-datanode, dfsClient.conf, dfsClient.getConf().socketTimeout, locatedblock);
+datanode, dfsClient.conf, dfsClient.getConf().socketTimeout,
+dfsClient.getConf().connectToDnViaHostname, locatedblock);
 final long n = cdp.getReplicaVisibleLength(locatedblock.getBlock());
@@ -716,8 +717,12 @@ private DNAddrPair chooseDataNode(LocatedBlock block)
 DatanodeInfo[] nodes = block.getLocations();
 try {
 DatanodeInfo chosenNode = bestNode(nodes, deadNodes);
-InetSocketAddress targetAddr =
-  NetUtils.createSocketAddr(chosenNode.getXferAddr());
+final String dnAddr =
+  chosenNode.getXferAddr(dfsClient.connectToDnViaHostname());
+if (DFSClient.LOG.isDebugEnabled()) {
+  DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
+}
+InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
 return new DNAddrPair(chosenNode, targetAddr);
 } catch (IOException ie) {
 String blockInfo = block.getBlock() + " file=" + src;
@@ -875,7 +880,8 @@ protected BlockReader getBlockReader(InetSocketAddress dnAddr,
 if (dfsClient.shouldTryShortCircuitRead(dnAddr)) {
 return DFSClient.getLocalBlockReader(dfsClient.conf, src, block,
-blockToken, chosenNode, dfsClient.hdfsTimeout, startOffset);
+blockToken, chosenNode, dfsClient.hdfsTimeout, startOffset,
+dfsClient.connectToDnViaHostname());
 }
 IOException err = null;
@@ -1183,7 +1189,7 @@ static DatanodeInfo bestNode(DatanodeInfo nodes[],
 throw new IOException("No live nodes contain current block");
 }
-/** Utility class to encapsulate data node info and its ip address. */
+/** Utility class to encapsulate data node info and its address. */
 static class DNAddrPair {
 DatanodeInfo info;
 InetSocketAddress addr;

View File

@@ -1100,7 +1100,7 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
 if (ie instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
 DFSClient.LOG.info("Will fetch a new encryption key and retry, "
 + "encryption key was invalid when connecting to "
-  + nodes[0].getXferAddr() + " : " + ie);
+  + nodes[0] + " : " + ie);
 // The encryption key used is invalid.
 refetchEncryptionKey--;
 dfsClient.clearDataEncryptionKey();
@@ -1112,7 +1112,8 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes, long newGS,
 // find the datanode that matches
 if (firstBadLink.length() != 0) {
 for (int i = 0; i < nodes.length; i++) {
-if (nodes[i].getXferAddr().equals(firstBadLink)) {
+// NB: Unconditionally using the xfer addr w/o hostname
+if (firstBadLink.equals(nodes[i].getXferAddr())) {
 errorIndex = i;
 break;
 }
@@ -1216,11 +1217,11 @@ private void setLastException(IOException e) {
 */
 static Socket createSocketForPipeline(final DatanodeInfo first,
 final int length, final DFSClient client) throws IOException {
-if(DFSClient.LOG.isDebugEnabled()) {
-DFSClient.LOG.debug("Connecting to datanode " + first);
+final String dnAddr = first.getXferAddr(client.connectToDnViaHostname());
+if (DFSClient.LOG.isDebugEnabled()) {
+DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
 }
-final InetSocketAddress isa =
-  NetUtils.createSocketAddr(first.getXferAddr());
+final InetSocketAddress isa = NetUtils.createSocketAddr(dnAddr);
 final Socket sock = client.socketFactory.createSocket();
 final int timeout = client.getDatanodeReadTimeout(length);
 NetUtils.connect(sock, isa, client.getRandomLocalInterfaceAddr(), timeout);

View File

@@ -18,8 +18,21 @@
 package org.apache.hadoop.hdfs;
-import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HA_NAMENODE_ID_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTPS_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_DEFAULT;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SECONDARY_HTTP_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICES;
+import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMESERVICE_ID;
 import java.io.IOException;
+import java.io.PrintStream;
 import java.io.UnsupportedEncodingException;
 import java.net.InetSocketAddress;
 import java.net.URI;
@@ -33,10 +46,17 @@
 import java.util.Map;
 import java.util.Random;
 import java.util.Set;
+import java.util.StringTokenizer;
 import javax.net.SocketFactory;
+import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.CommandLineParser;
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.Options;
+import org.apache.commons.cli.ParseException;
+import org.apache.commons.cli.PosixParser;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.HadoopIllegalArgumentException;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
@@ -57,8 +77,7 @@
 import org.apache.hadoop.net.NodeBase;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.util.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.util.ToolRunner;
 import com.google.common.base.Joiner;
 import com.google.common.collect.Lists;
@@ -424,7 +443,6 @@ private static Map<String, InetSocketAddress> getAddressesForNameserviceId(
 *
 * @param conf configuration
 * @return list of InetSocketAddresses
-* @throws IOException if no addresses are configured
 */
 public static Map<String, Map<String, InetSocketAddress>> getHaNnRpcAddresses(
 Configuration conf) {
@@ -841,17 +859,17 @@ public static int roundBytesToGB(long bytes) {
 /** Create a {@link ClientDatanodeProtocol} proxy */
 public static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
 DatanodeID datanodeid, Configuration conf, int socketTimeout,
-LocatedBlock locatedBlock) throws IOException {
+boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException {
 return new ClientDatanodeProtocolTranslatorPB(datanodeid, conf, socketTimeout,
-locatedBlock);
+connectToDnViaHostname, locatedBlock);
 }
 /** Create {@link ClientDatanodeProtocol} proxy using kerberos ticket */
 static ClientDatanodeProtocol createClientDatanodeProtocolProxy(
-DatanodeID datanodeid, Configuration conf, int socketTimeout)
-throws IOException {
+DatanodeID datanodeid, Configuration conf, int socketTimeout,
+boolean connectToDnViaHostname) throws IOException {
 return new ClientDatanodeProtocolTranslatorPB(
-datanodeid, conf, socketTimeout);
+datanodeid, conf, socketTimeout, connectToDnViaHostname);
 }
 /** Create a {@link ClientDatanodeProtocol} proxy */
@@ -1073,4 +1091,44 @@ public static String getOnlyNameServiceIdOrNull(Configuration conf) {
 return null;
 }
 }
public static Options helpOptions = new Options();
public static Option helpOpt = new Option("h", "help", false,
"get help information");
static {
helpOptions.addOption(helpOpt);
}
/**
* Parse the arguments for commands
*
* @param args the argument to be parsed
* @param helpDescription help information to be printed out
* @param out Printer
* @param printGenericCommandUsage whether to print the
* generic command usage defined in ToolRunner
* @return true when the argument matches help option, false if not
*/
public static boolean parseHelpArgument(String[] args,
String helpDescription, PrintStream out, boolean printGenericCommandUsage) {
if (args.length == 1) {
try {
CommandLineParser parser = new PosixParser();
CommandLine cmdLine = parser.parse(helpOptions, args);
if (cmdLine.hasOption(helpOpt.getOpt())
|| cmdLine.hasOption(helpOpt.getLongOpt())) {
// should print out the help information
out.println(helpDescription + "\n");
if (printGenericCommandUsage) {
ToolRunner.printGenericCommandUsage(out);
}
return true;
}
} catch (ParseException pe) {
return false;
}
}
return false;
}
 }
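
To show where this helper is meant to plug in, here is a hedged sketch of a daemon-style main(); the class name and usage string are invented, and the Balancer change further down wires the helper up in the same way.

import org.apache.hadoop.hdfs.DFSUtil;

public class SomeHdfsTool {
  private static final String USAGE = "Usage: java SomeHdfsTool [options]";

  public static void main(String[] args) {
    // Print the usage text and exit 0 when the only argument is -h/--help.
    if (DFSUtil.parseHelpArgument(args, USAGE, System.out, true)) {
      System.exit(0);
    }
    // ... normal startup would continue here ...
  }
}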

View File

@@ -104,7 +104,7 @@ public String getXferAddr() {
 /**
 * @return IP:ipcPort string
 */
-public String getIpcAddr() {
+private String getIpcAddr() {
 return ipAddr + ":" + ipcPort;
 }
@@ -122,6 +122,29 @@ public String getXferAddrWithHostname() {
 return hostName + ":" + xferPort;
 }
/**
* @return hostname:ipcPort
*/
private String getIpcAddrWithHostname() {
return hostName + ":" + ipcPort;
}
/**
* @param useHostname true to use the DN hostname, use the IP otherwise
* @return name:xferPort
*/
public String getXferAddr(boolean useHostname) {
return useHostname ? getXferAddrWithHostname() : getXferAddr();
}
/**
* @param useHostname true to use the DN hostname, use the IP otherwise
* @return name:ipcPort
*/
public String getIpcAddr(boolean useHostname) {
return useHostname ? getIpcAddrWithHostname() : getIpcAddr();
}
 /**
 * @return data storage ID.
 */
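
A quick sketch of my own (not from the patch) of how a caller combines the new accessor with the client-side key; the DatanodeID instance would come from the cluster, so the helper below only shows the lookup and selection.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.DatanodeID;

public class DnAddrSketch {
  /** Returns "hostname:xferPort" or "ip:xferPort" depending on configuration. */
  public static String xferTarget(DatanodeID dn, Configuration conf) {
    boolean useHostname = conf.getBoolean(
        DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME,
        DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
    return dn.getXferAddr(useHostname);
  }
}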

View File

@@ -73,10 +73,10 @@ public class ClientDatanodeProtocolTranslatorPB implements
 RefreshNamenodesRequestProto.newBuilder().build();
 public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
-Configuration conf, int socketTimeout, LocatedBlock locatedBlock)
-throws IOException {
+Configuration conf, int socketTimeout, boolean connectToDnViaHostname,
+LocatedBlock locatedBlock) throws IOException {
 rpcProxy = createClientDatanodeProtocolProxy( datanodeid, conf,
-socketTimeout, locatedBlock);
+socketTimeout, connectToDnViaHostname, locatedBlock);
 }
 public ClientDatanodeProtocolTranslatorPB(InetSocketAddress addr,
@@ -90,11 +90,17 @@ public ClientDatanodeProtocolTranslatorPB(InetSocketAddress addr,
 * @param datanodeid Datanode to connect to.
 * @param conf Configuration.
 * @param socketTimeout Socket timeout to use.
+* @param connectToDnViaHostname connect to the Datanode using its hostname
 * @throws IOException
 */
 public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
-Configuration conf, int socketTimeout) throws IOException {
-InetSocketAddress addr = NetUtils.createSocketAddr(datanodeid.getIpcAddr());
+Configuration conf, int socketTimeout, boolean connectToDnViaHostname)
+throws IOException {
+final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
+InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
+if (LOG.isDebugEnabled()) {
+  LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
+}
 rpcProxy = createClientDatanodeProtocolProxy(addr,
 UserGroupInformation.getCurrentUser(), conf,
 NetUtils.getDefaultSocketFactory(conf), socketTimeout);
@@ -102,10 +108,11 @@ public ClientDatanodeProtocolTranslatorPB(DatanodeID datanodeid,
 static ClientDatanodeProtocolPB createClientDatanodeProtocolProxy(
 DatanodeID datanodeid, Configuration conf, int socketTimeout,
-LocatedBlock locatedBlock) throws IOException {
-InetSocketAddress addr = NetUtils.createSocketAddr(datanodeid.getIpcAddr());
+boolean connectToDnViaHostname, LocatedBlock locatedBlock) throws IOException {
+final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
+InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
 if (LOG.isDebugEnabled()) {
-LOG.debug("ClientDatanodeProtocol addr=" + addr);
+LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
 }
 // Since we're creating a new UserGroupInformation here, we know that no

View File

@ -17,6 +17,7 @@
*/ */
package org.apache.hadoop.hdfs.server.balancer; package org.apache.hadoop.hdfs.server.balancer;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed; import static org.apache.hadoop.hdfs.protocol.HdfsProtoUtil.vintPrefixed;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
@ -26,6 +27,7 @@
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.PrintStream;
import java.net.Socket; import java.net.Socket;
import java.net.URI; import java.net.URI;
import java.text.DateFormat; import java.text.DateFormat;
@ -68,7 +70,6 @@
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicy;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault; import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.Util;
import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException; import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
@ -79,7 +80,6 @@
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
import static com.google.common.base.Preconditions.checkArgument;
/** <p>The balancer is a tool that balances disk space usage on an HDFS cluster /** <p>The balancer is a tool that balances disk space usage on an HDFS cluster
* when some datanodes become full or when new empty nodes join the cluster. * when some datanodes become full or when new empty nodes join the cluster.
@ -189,6 +189,13 @@ public class Balancer {
*/ */
public static final int MAX_NUM_CONCURRENT_MOVES = 5; public static final int MAX_NUM_CONCURRENT_MOVES = 5;
private static final String USAGE = "Usage: java "
+ Balancer.class.getSimpleName()
+ "\n\t[-policy <policy>]\tthe balancing policy: "
+ BalancingPolicy.Node.INSTANCE.getName() + " or "
+ BalancingPolicy.Pool.INSTANCE.getName()
+ "\n\t[-threshold <threshold>]\tPercentage of disk capacity";
private final NameNodeConnector nnc; private final NameNodeConnector nnc;
private final BalancingPolicy policy; private final BalancingPolicy policy;
private final double threshold; private final double threshold;
@ -1550,7 +1557,7 @@ static Parameters parse(String[] args) {
} }
} }
} catch(RuntimeException e) { } catch(RuntimeException e) {
printUsage(); printUsage(System.err);
throw e; throw e;
} }
} }
@ -1558,13 +1565,8 @@ static Parameters parse(String[] args) {
return new Parameters(policy, threshold); return new Parameters(policy, threshold);
} }
private static void printUsage() { private static void printUsage(PrintStream out) {
System.out.println("Usage: java " + Balancer.class.getSimpleName()); out.println(USAGE + "\n");
System.out.println(" [-policy <policy>]\tthe balancing policy: "
+ BalancingPolicy.Node.INSTANCE.getName() + " or "
+ BalancingPolicy.Pool.INSTANCE.getName());
System.out.println(
" [-threshold <threshold>]\tPercentage of disk capacity");
} }
} }
@ -1573,6 +1575,10 @@ private static void printUsage() {
* @param args Command line arguments * @param args Command line arguments
*/ */
public static void main(String[] args) { public static void main(String[] args) {
if (DFSUtil.parseHelpArgument(args, USAGE, System.out, true)) {
System.exit(0);
}
try { try {
System.exit(ToolRunner.run(new HdfsConfiguration(), new Cli(), args)); System.exit(ToolRunner.run(new HdfsConfiguration(), new Cli(), args));
} catch (Throwable e) { } catch (Throwable e) {

View File

@@ -61,7 +61,7 @@ static public enum StartupOption{
 FORCE("-force"),
 NONINTERACTIVE("-nonInteractive");
-private String name = null;
+private final String name;
 // Used only with format and upgrade options
 private String clusterId = null;

View File

@@ -554,9 +554,11 @@ private synchronized void updateBytesLeft(long len) {
 }
 private synchronized void startNewPeriod() {
-LOG.info("Starting a new period : work left in prev period : "
+if (LOG.isDebugEnabled()) {
+LOG.debug("Starting a new period : work left in prev period : "
 + String.format("%.2f%%", totalBytesToScan == 0 ? 0
 : (bytesLeft * 100.0) / totalBytesToScan));
+}
 // reset the byte counts :
 bytesLeft = totalBytesToScan;

View File

@ -55,7 +55,7 @@ class DNConf {
final boolean dropCacheBehindReads; final boolean dropCacheBehindReads;
final boolean syncOnClose; final boolean syncOnClose;
final boolean encryptDataTransfer; final boolean encryptDataTransfer;
final boolean connectToDnViaHostname;
final long readaheadLength; final long readaheadLength;
final long heartBeatInterval; final long heartBeatInterval;
@ -97,7 +97,9 @@ public DNConf(Configuration conf) {
dropCacheBehindReads = conf.getBoolean( dropCacheBehindReads = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY, DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_KEY,
DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT); DFSConfigKeys.DFS_DATANODE_DROP_CACHE_BEHIND_READS_DEFAULT);
connectToDnViaHostname = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, this.blockReportInterval = conf.getLong(DFS_BLOCKREPORT_INTERVAL_MSEC_KEY,
DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT); DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT);
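DNConf now also reads the new dfs.datanode.use.datanode.hostname switch (the property is added to hdfs-default.xml further down in this patch). A minimal sketch of the same read with Hadoop's Configuration API, spelling the key and default out literally instead of going through DFSConfigKeys:

import org.apache.hadoop.conf.Configuration;

class DnConfSketch {
  final boolean connectToDnViaHostname;

  DnConfSketch(Configuration conf) {
    // Same value DNConf stores; defaults to false, i.e. keep dialing peers by IP.
    connectToDnViaHostname =
        conf.getBoolean("dfs.datanode.use.datanode.hostname", false);
  }
}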

@ -46,6 +46,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_STARTUP_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_USER_NAME_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTPS_ENABLE_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HTTPS_ENABLE_KEY;
import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.BufferedOutputStream; import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
@ -55,6 +56,7 @@
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.io.PrintStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
@ -98,8 +100,8 @@
import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil; import org.apache.hadoop.hdfs.protocol.HdfsProtoUtil;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair; import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender; import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService; import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
@ -124,9 +126,6 @@
import org.apache.hadoop.hdfs.server.common.JspHelper; import org.apache.hadoop.hdfs.server.common.JspHelper;
import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.common.StorageInfo;
import org.apache.hadoop.hdfs.server.common.Util; import org.apache.hadoop.hdfs.server.common.Util;
import static org.apache.hadoop.util.ExitUtil.terminate;
import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
@ -171,9 +170,9 @@
import org.apache.hadoop.util.VersionInfo; import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON; import org.mortbay.util.ajax.JSON;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner; import com.google.common.base.Joiner;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService; import com.google.protobuf.BlockingService;
/********************************************************** /**********************************************************
@ -231,6 +230,8 @@ public class DataNode extends Configured
static final Log ClientTraceLog = static final Log ClientTraceLog =
LogFactory.getLog(DataNode.class.getName() + ".clienttrace"); LogFactory.getLog(DataNode.class.getName() + ".clienttrace");
private static final String USAGE = "Usage: java DataNode [-rollback | -regular]";
/** /**
* Use {@link NetUtils#createSocketAddr(String)} instead. * Use {@link NetUtils#createSocketAddr(String)} instead.
*/ */
@ -276,6 +277,7 @@ public static InetSocketAddress createSocketAddr(String target) {
private Configuration conf; private Configuration conf;
private final String userWithLocalPathAccess; private final String userWithLocalPathAccess;
private boolean connectToDnViaHostname;
ReadaheadPool readaheadPool; ReadaheadPool readaheadPool;
/** /**
@ -296,8 +298,11 @@ public static InetSocketAddress createSocketAddr(String target) {
final SecureResources resources) throws IOException { final SecureResources resources) throws IOException {
super(conf); super(conf);
this.userWithLocalPathAccess = conf this.userWithLocalPathAccess =
.get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY); conf.get(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY);
this.connectToDnViaHostname = conf.getBoolean(
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,
DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT);
try { try {
hostName = getHostName(conf); hostName = getHostName(conf);
LOG.info("Configured hostname is " + hostName); LOG.info("Configured hostname is " + hostName);
@ -878,7 +883,7 @@ public String getDisplayName() {
/** /**
* NB: The datanode can perform data transfer on the streaming * NB: The datanode can perform data transfer on the streaming
* address however clients are given the IPC IP address for data * address however clients are given the IPC IP address for data
* transfer, and that may be be a different address. * transfer, and that may be a different address.
* *
* @return socket address for data transfer * @return socket address for data transfer
*/ */
@ -925,12 +930,12 @@ DatanodeProtocolClientSideTranslatorPB connectToNN(
} }
public static InterDatanodeProtocol createInterDataNodeProtocolProxy( public static InterDatanodeProtocol createInterDataNodeProtocolProxy(
DatanodeID datanodeid, final Configuration conf, final int socketTimeout) DatanodeID datanodeid, final Configuration conf, final int socketTimeout,
throws IOException { final boolean connectToDnViaHostname) throws IOException {
final InetSocketAddress addr = final String dnAddr = datanodeid.getIpcAddr(connectToDnViaHostname);
NetUtils.createSocketAddr(datanodeid.getIpcAddr()); final InetSocketAddress addr = NetUtils.createSocketAddr(dnAddr);
if (InterDatanodeProtocol.LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
InterDatanodeProtocol.LOG.debug("InterDatanodeProtocol addr=" + addr); LOG.debug("Connecting to datanode " + dnAddr + " addr=" + addr);
} }
final UserGroupInformation loginUgi = UserGroupInformation.getLoginUser(); final UserGroupInformation loginUgi = UserGroupInformation.getLoginUser();
try { try {
@ -1061,6 +1066,7 @@ public void shutdown() {
} }
} }
this.shouldRun = false;
shutdownPeriodicScanners(); shutdownPeriodicScanners();
if (infoServer != null) { if (infoServer != null) {
@ -1074,7 +1080,6 @@ public void shutdown() {
ipcServer.stop(); ipcServer.stop();
} }
this.shouldRun = false;
if (dataXceiverServer != null) { if (dataXceiverServer != null) {
((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill(); ((DataXceiverServer) this.dataXceiverServer.getRunnable()).kill();
this.dataXceiverServer.interrupt(); this.dataXceiverServer.interrupt();
@ -1386,8 +1391,11 @@ public void run() {
final boolean isClient = clientname.length() > 0; final boolean isClient = clientname.length() > 0;
try { try {
InetSocketAddress curTarget = final String dnAddr = targets[0].getXferAddr(connectToDnViaHostname);
NetUtils.createSocketAddr(targets[0].getXferAddr()); InetSocketAddress curTarget = NetUtils.createSocketAddr(dnAddr);
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to datanode " + dnAddr);
}
sock = newSocket(); sock = newSocket();
NetUtils.connect(sock, curTarget, dnConf.socketTimeout); NetUtils.connect(sock, curTarget, dnConf.socketTimeout);
sock.setSoTimeout(targets.length * dnConf.socketTimeout); sock.setSoTimeout(targets.length * dnConf.socketTimeout);
@ -1534,7 +1542,7 @@ public static DataNode instantiateDataNode(String args [], Configuration conf,
} }
if (!parseArguments(args, conf)) { if (!parseArguments(args, conf)) {
printUsage(); printUsage(System.err);
return null; return null;
} }
Collection<URI> dataDirs = getStorageDirs(conf); Collection<URI> dataDirs = getStorageDirs(conf);
@ -1648,9 +1656,8 @@ public String toString() {
+ xmitsInProgress.get() + "}"; + xmitsInProgress.get() + "}";
} }
private static void printUsage() { private static void printUsage(PrintStream out) {
System.err.println("Usage: java DataNode"); out.println(USAGE + "\n");
System.err.println(" [-rollback]");
} }
/** /**
@ -1735,6 +1742,10 @@ public static void secureMain(String args[], SecureResources resources) {
} }
public static void main(String args[]) { public static void main(String args[]) {
if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {
System.exit(0);
}
secureMain(args, null); secureMain(args, null);
} }
@ -1843,7 +1854,7 @@ private void recoverBlock(RecoveringBlock rBlock) throws IOException {
DatanodeRegistration bpReg = bpos.bpRegistration; DatanodeRegistration bpReg = bpos.bpRegistration;
InterDatanodeProtocol datanode = bpReg.equals(id)? InterDatanodeProtocol datanode = bpReg.equals(id)?
this: DataNode.createInterDataNodeProtocolProxy(id, getConf(), this: DataNode.createInterDataNodeProtocolProxy(id, getConf(),
dnConf.socketTimeout); dnConf.socketTimeout, dnConf.connectToDnViaHostname);
ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock); ReplicaRecoveryInfo info = callInitReplicaRecovery(datanode, rBlock);
if (info != null && if (info != null &&
info.getGenerationStamp() >= block.getGenerationStamp() && info.getGenerationStamp() >= block.getGenerationStamp() &&
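Several of the DataNode hunks above swap getIpcAddr()/getXferAddr() for the overloads that take connectToDnViaHostname, so the string handed to NetUtils.createSocketAddr is either host:port or ip:port. A dependency-free sketch of that selection; the Peer class below is a made-up stand-in for DatanodeID, not the real type:

import java.net.InetSocketAddress;

class PeerAddressSketch {
  // Made-up stand-in for DatanodeID, just enough to show the address choice.
  static class Peer {
    final String hostName, ipAddr;
    final int xferPort;
    Peer(String host, String ip, int port) {
      hostName = host; ipAddr = ip; xferPort = port;
    }
    String getXferAddr(boolean useHostname) {
      return (useHostname ? hostName : ipAddr) + ":" + xferPort;
    }
  }

  public static void main(String[] args) {
    Peer target = new Peer("dn1.example.com", "10.0.0.5", 50010);
    boolean connectToDnViaHostname = true; // dfs.datanode.use.datanode.hostname
    String dnAddr = target.getXferAddr(connectToDnViaHostname);
    String[] hostAndPort = dnAddr.split(":");
    // The DataNode uses NetUtils.createSocketAddr(dnAddr); createUnresolved
    // keeps this sketch free of Hadoop classes.
    InetSocketAddress addr = InetSocketAddress.createUnresolved(
        hostAndPort[0], Integer.parseInt(hostAndPort[1]));
    System.out.println("Connecting to datanode " + dnAddr + " addr=" + addr);
  }
}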

@ -86,7 +86,7 @@ class DataXceiver extends Receiver implements Runnable {
private final DataNode datanode; private final DataNode datanode;
private final DNConf dnConf; private final DNConf dnConf;
private final DataXceiverServer dataXceiverServer; private final DataXceiverServer dataXceiverServer;
private final boolean connectToDnViaHostname;
private long opStartTime; //the start time of receiving an Op private long opStartTime; //the start time of receiving an Op
private final SocketInputWrapper socketIn; private final SocketInputWrapper socketIn;
private OutputStream socketOut; private OutputStream socketOut;
@ -113,6 +113,7 @@ private DataXceiver(Socket s,
this.isLocal = s.getInetAddress().equals(s.getLocalAddress()); this.isLocal = s.getInetAddress().equals(s.getLocalAddress());
this.datanode = datanode; this.datanode = datanode;
this.dataXceiverServer = dataXceiverServer; this.dataXceiverServer = dataXceiverServer;
this.connectToDnViaHostname = datanode.getDnConf().connectToDnViaHostname;
remoteAddress = s.getRemoteSocketAddress().toString(); remoteAddress = s.getRemoteSocketAddress().toString();
localAddress = s.getLocalSocketAddress().toString(); localAddress = s.getLocalSocketAddress().toString();
@ -404,7 +405,10 @@ public void writeBlock(final ExtendedBlock block,
if (targets.length > 0) { if (targets.length > 0) {
InetSocketAddress mirrorTarget = null; InetSocketAddress mirrorTarget = null;
// Connect to backup machine // Connect to backup machine
mirrorNode = targets[0].getXferAddr(); mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to datanode " + mirrorNode);
}
mirrorTarget = NetUtils.createSocketAddr(mirrorNode); mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
mirrorSock = datanode.newSocket(); mirrorSock = datanode.newSocket();
try { try {
@ -457,7 +461,8 @@ public void writeBlock(final ExtendedBlock block,
if (isClient) { if (isClient) {
BlockOpResponseProto.newBuilder() BlockOpResponseProto.newBuilder()
.setStatus(ERROR) .setStatus(ERROR)
.setFirstBadLink(mirrorNode) // NB: Unconditionally using the xfer addr w/o hostname
.setFirstBadLink(targets[0].getXferAddr())
.build() .build()
.writeDelimitedTo(replyOut); .writeDelimitedTo(replyOut);
replyOut.flush(); replyOut.flush();
@ -729,8 +734,11 @@ public void replaceBlock(final ExtendedBlock block,
try { try {
// get the output stream to the proxy // get the output stream to the proxy
InetSocketAddress proxyAddr = final String dnAddr = proxySource.getXferAddr(connectToDnViaHostname);
NetUtils.createSocketAddr(proxySource.getXferAddr()); if (LOG.isDebugEnabled()) {
LOG.debug("Connecting to datanode " + dnAddr);
}
InetSocketAddress proxyAddr = NetUtils.createSocketAddr(dnAddr);
proxySock = datanode.newSocket(); proxySock = datanode.newSocket();
NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout); NetUtils.connect(proxySock, proxyAddr, dnConf.socketTimeout);
proxySock.setSoTimeout(dnConf.socketTimeout); proxySock.setSoTimeout(dnConf.socketTimeout);
@ -891,6 +899,7 @@ private void checkAccess(DataOutputStream out, final boolean reply,
if (mode == BlockTokenSecretManager.AccessMode.WRITE) { if (mode == BlockTokenSecretManager.AccessMode.WRITE) {
DatanodeRegistration dnR = DatanodeRegistration dnR =
datanode.getDNRegistrationForBP(blk.getBlockPoolId()); datanode.getDNRegistrationForBP(blk.getBlockPoolId());
// NB: Unconditionally using the xfer addr w/o hostname
resp.setFirstBadLink(dnR.getXferAddr()); resp.setFirstBadLink(dnR.getXferAddr());
} }
resp.build().writeDelimitedTo(out); resp.build().writeDelimitedTo(out);

@ -411,7 +411,7 @@ private Response get(
} }
final long n = length.getValue() != null? length.getValue() final long n = length.getValue() != null? length.getValue()
: in.getVisibleLength(); : in.getVisibleLength() - offset.getValue();
return Response.ok(new OpenEntity(in, n, dfsclient)).type( return Response.ok(new OpenEntity(in, n, dfsclient)).type(
MediaType.APPLICATION_OCTET_STREAM).build(); MediaType.APPLICATION_OCTET_STREAM).build();
} }
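The one-line change above fixes the default byte count for a ranged read: with no explicit length parameter, the response should cover only what remains after the requested offset. A tiny sketch of that arithmetic with concrete numbers:

class OpenLengthSketch {
  // No explicit length requested: stream whatever is left after the offset.
  static long bytesToStream(Long requestedLength, long visibleLength, long offset) {
    return requestedLength != null ? requestedLength : visibleLength - offset;
  }

  public static void main(String[] args) {
    // A 1000-byte file opened at offset 400 with no length parameter: 600 bytes.
    System.out.println(bytesToStream(null, 1000L, 400L));
  }
}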

@ -49,6 +49,8 @@ public class EditLogFileOutputStream extends EditLogOutputStream {
private EditsDoubleBuffer doubleBuf; private EditsDoubleBuffer doubleBuf;
static ByteBuffer fill = ByteBuffer.allocateDirect(MIN_PREALLOCATION_LENGTH); static ByteBuffer fill = ByteBuffer.allocateDirect(MIN_PREALLOCATION_LENGTH);
private static boolean shouldSkipFsyncForTests = false;
static { static {
fill.position(0); fill.position(0);
for (int i = 0; i < fill.capacity(); i++) { for (int i = 0; i < fill.capacity(); i++) {
@ -184,8 +186,10 @@ public void flushAndSync() throws IOException {
} }
preallocate(); // preallocate file if necessay preallocate(); // preallocate file if necessay
doubleBuf.flushTo(fp); doubleBuf.flushTo(fp);
if (!shouldSkipFsyncForTests) {
fc.force(false); // metadata updates not needed fc.force(false); // metadata updates not needed
} }
}
/** /**
* @return true if the number of buffered data exceeds the intial buffer size * @return true if the number of buffered data exceeds the intial buffer size
@ -247,4 +251,15 @@ public void setFileChannelForTesting(FileChannel fc) {
public FileChannel getFileChannelForTesting() { public FileChannel getFileChannelForTesting() {
return fc; return fc;
} }
/**
* For the purposes of unit tests, we don't need to actually
* write durably to disk. So, we can skip the fsync() calls
* for a speed improvement.
* @param skip true if fsync should <em>not</em> be called
*/
@VisibleForTesting
public static void setShouldSkipFsyncForTesting(boolean skip) {
shouldSkipFsyncForTests = skip;
}
} }
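The new static toggle above lets tests skip the fc.force(false) call entirely. A hedged sketch of how a JUnit test might use it; whether any particular test in this patch does exactly this is an assumption, and the import path assumes the class's usual namenode package:

import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
import org.junit.After;
import org.junit.Before;

public class FastEditLogTestSketch {
  @Before
  public void skipFsync() {
    // Durability is not under test here, so trade fsync() for speed.
    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
  }

  @After
  public void restoreFsync() {
    EditLogFileOutputStream.setShouldSkipFsyncForTesting(false);
  }
}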

@ -341,7 +341,7 @@ synchronized void close() {
* File-based journals are skipped, since they are formatted by the * File-based journals are skipped, since they are formatted by the
* Storage format code. * Storage format code.
*/ */
void formatNonFileJournals(NamespaceInfo nsInfo) throws IOException { synchronized void formatNonFileJournals(NamespaceInfo nsInfo) throws IOException {
Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS, Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS,
"Bad state: %s", state); "Bad state: %s", state);
@ -352,7 +352,7 @@ void formatNonFileJournals(NamespaceInfo nsInfo) throws IOException {
} }
} }
List<FormatConfirmable> getFormatConfirmables() { synchronized List<FormatConfirmable> getFormatConfirmables() {
Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS, Preconditions.checkState(state == State.BETWEEN_LOG_SEGMENTS,
"Bad state: %s", state); "Bad state: %s", state);

@ -3742,6 +3742,11 @@ private SafeModeInfo(Configuration conf) {
this.extension = conf.getInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0); this.extension = conf.getInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0);
this.safeReplication = conf.getInt(DFS_NAMENODE_REPLICATION_MIN_KEY, this.safeReplication = conf.getInt(DFS_NAMENODE_REPLICATION_MIN_KEY,
DFS_NAMENODE_REPLICATION_MIN_DEFAULT); DFS_NAMENODE_REPLICATION_MIN_DEFAULT);
LOG.info(DFS_NAMENODE_SAFEMODE_THRESHOLD_PCT_KEY + " = " + threshold);
LOG.info(DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY + " = " + datanodeThreshold);
LOG.info(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY + " = " + extension);
// default to safe mode threshold (i.e., don't populate queues before leaving safe mode) // default to safe mode threshold (i.e., don't populate queues before leaving safe mode)
this.replQueueThreshold = this.replQueueThreshold =
conf.getFloat(DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY, conf.getFloat(DFS_NAMENODE_REPL_QUEUE_THRESHOLD_PCT_KEY,

@ -127,7 +127,7 @@ public void doGet(HttpServletRequest request, HttpServletResponse response
datanode, conf, getUGI(request, conf)); datanode, conf, getUGI(request, conf));
final ClientProtocol nnproxy = dfs.getNamenode(); final ClientProtocol nnproxy = dfs.getNamenode();
final MD5MD5CRC32FileChecksum checksum = DFSClient.getFileChecksum( final MD5MD5CRC32FileChecksum checksum = DFSClient.getFileChecksum(
path, nnproxy, socketFactory, socketTimeout, dfs.getDataEncryptionKey()); path, nnproxy, socketFactory, socketTimeout, dfs.getDataEncryptionKey(), false);
MD5MD5CRC32FileChecksum.write(xml, checksum); MD5MD5CRC32FileChecksum.write(xml, checksum);
} catch(IOException ioe) { } catch(IOException ioe) {
writeXml(ioe, path, xml); writeXml(ioe, path, xml);

@ -17,18 +17,13 @@
*/ */
package org.apache.hadoop.hdfs.server.namenode; package org.apache.hadoop.hdfs.server.namenode;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.PrintStream;
import java.io.OutputStream;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URI; import java.net.URI;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.Collection; import java.util.Collection;
import java.util.Iterator;
import java.util.List; import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -42,10 +37,12 @@
import org.apache.hadoop.ha.ServiceFailedException; import org.apache.hadoop.ha.ServiceFailedException;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Trash; import org.apache.hadoop.fs.Trash;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*; import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.apache.hadoop.util.ExitUtil.terminate;
import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HAUtil; import org.apache.hadoop.hdfs.HAUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -53,9 +50,6 @@
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.JournalSet.JournalAndStream;
import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState; import org.apache.hadoop.hdfs.server.namenode.ha.ActiveState;
import org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby; import org.apache.hadoop.hdfs.server.namenode.ha.BootstrapStandby;
import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext;
@ -68,8 +62,6 @@
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@ -80,14 +72,12 @@
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol; import org.apache.hadoop.security.authorize.RefreshAuthorizationPolicyProtocol;
import org.apache.hadoop.tools.GetUserMappingsProtocol; import org.apache.hadoop.tools.GetUserMappingsProtocol;
import org.apache.hadoop.util.ExitUtil.ExitException;
import org.apache.hadoop.util.ServicePlugin; import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ExitUtil.ExitException;
import static org.apache.hadoop.util.ExitUtil.terminate;
import static org.apache.hadoop.util.ToolRunner.confirmPrompt;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
/********************************************************** /**********************************************************
@ -198,6 +188,22 @@ public static enum OperationCategory {
DFS_HA_AUTO_FAILOVER_ENABLED_KEY DFS_HA_AUTO_FAILOVER_ENABLED_KEY
}; };
private static final String USAGE = "Usage: java NameNode ["
+ StartupOption.BACKUP.getName() + "] | ["
+ StartupOption.CHECKPOINT.getName() + "] | ["
+ StartupOption.FORMAT.getName() + " ["
+ StartupOption.CLUSTERID.getName() + " cid ] ["
+ StartupOption.FORCE.getName() + "] ["
+ StartupOption.NONINTERACTIVE.getName() + "] ] | ["
+ StartupOption.UPGRADE.getName() + "] | ["
+ StartupOption.ROLLBACK.getName() + "] | ["
+ StartupOption.FINALIZE.getName() + "] | ["
+ StartupOption.IMPORT.getName() + "] | ["
+ StartupOption.INITIALIZESHAREDEDITS.getName() + "] | ["
+ StartupOption.BOOTSTRAPSTANDBY.getName() + "] | ["
+ StartupOption.RECOVER.getName() + " [ " + StartupOption.FORCE.getName()
+ " ] ]";
public long getProtocolVersion(String protocol, public long getProtocolVersion(String protocol,
long clientVersion) throws IOException { long clientVersion) throws IOException {
if (protocol.equals(ClientProtocol.class.getName())) { if (protocol.equals(ClientProtocol.class.getName())) {
@ -767,9 +773,18 @@ private static boolean initializeSharedEdits(Configuration conf,
String nsId = DFSUtil.getNamenodeNameServiceId(conf); String nsId = DFSUtil.getNamenodeNameServiceId(conf);
String namenodeId = HAUtil.getNameNodeId(conf, nsId); String namenodeId = HAUtil.getNameNodeId(conf, nsId);
initializeGenericKeys(conf, nsId, namenodeId); initializeGenericKeys(conf, nsId, namenodeId);
if (conf.get(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY) == null) {
LOG.fatal("No shared edits directory configured for namespace " +
nsId + " namenode " + namenodeId);
return false;
}
NNStorage existingStorage = null; NNStorage existingStorage = null;
try { try {
FSNamesystem fsns = FSNamesystem.loadFromDisk(conf, Configuration confWithoutShared = new Configuration(conf);
confWithoutShared.unset(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
FSNamesystem fsns = FSNamesystem.loadFromDisk(confWithoutShared,
FSNamesystem.getNamespaceDirs(conf), FSNamesystem.getNamespaceDirs(conf),
FSNamesystem.getNamespaceEditsDirs(conf, false)); FSNamesystem.getNamespaceEditsDirs(conf, false));
@ -800,10 +815,8 @@ private static boolean initializeSharedEdits(Configuration conf,
fsns.getFSImage().getEditLog().initJournalsForWrite(); fsns.getFSImage().getEditLog().initJournalsForWrite();
fsns.getFSImage().getEditLog().recoverUnclosedStreams(); fsns.getFSImage().getEditLog().recoverUnclosedStreams();
if (copyEditLogSegmentsToSharedDir(fsns, sharedEditsDirs, copyEditLogSegmentsToSharedDir(fsns, sharedEditsDirs, newSharedStorage,
newSharedStorage, conf)) { conf);
return true; // aborted
}
} catch (IOException ioe) { } catch (IOException ioe) {
LOG.error("Could not initialize shared edits dir", ioe); LOG.error("Could not initialize shared edits dir", ioe);
return true; // aborted return true; // aborted
@ -822,43 +835,59 @@ private static boolean initializeSharedEdits(Configuration conf,
return false; // did not abort return false; // did not abort
} }
private static boolean copyEditLogSegmentsToSharedDir(FSNamesystem fsns, private static void copyEditLogSegmentsToSharedDir(FSNamesystem fsns,
Collection<URI> sharedEditsDirs, NNStorage newSharedStorage, Collection<URI> sharedEditsDirs, NNStorage newSharedStorage,
Configuration conf) throws FileNotFoundException, IOException { Configuration conf) throws IOException {
Preconditions.checkArgument(!sharedEditsDirs.isEmpty(),
"No shared edits specified");
// Copy edit log segments into the new shared edits dir. // Copy edit log segments into the new shared edits dir.
for (JournalAndStream jas : fsns.getFSImage().getEditLog().getJournals()) { List<URI> sharedEditsUris = new ArrayList<URI>(sharedEditsDirs);
FileJournalManager fjm = null; FSEditLog newSharedEditLog = new FSEditLog(conf, newSharedStorage,
if (!(jas.getManager() instanceof FileJournalManager)) { sharedEditsUris);
LOG.error("Cannot populate shared edits dir from non-file " + newSharedEditLog.initJournalsForWrite();
"journal manager: " + jas.getManager()); newSharedEditLog.recoverUnclosedStreams();
return true; // aborted
} else { FSEditLog sourceEditLog = fsns.getFSImage().editLog;
fjm = (FileJournalManager) jas.getManager();
long fromTxId = fsns.getFSImage().getMostRecentCheckpointTxId();
Collection<EditLogInputStream> streams = sourceEditLog.selectInputStreams(
fromTxId+1, 0);
// Set the nextTxid to the CheckpointTxId+1
newSharedEditLog.setNextTxId(fromTxId + 1);
// Copy all edits after last CheckpointTxId to shared edits dir
for (EditLogInputStream stream : streams) {
LOG.debug("Beginning to copy stream " + stream + " to shared edits");
FSEditLogOp op;
boolean segmentOpen = false;
while ((op = stream.readOp()) != null) {
if (LOG.isTraceEnabled()) {
LOG.trace("copying op: " + op);
} }
for (EditLogFile elf : fjm.getLogFiles(fsns.getFSImage() if (!segmentOpen) {
.getMostRecentCheckpointTxId())) { newSharedEditLog.startLogSegment(op.txid, false);
File editLogSegment = elf.getFile(); segmentOpen = true;
for (URI sharedEditsUri : sharedEditsDirs) { }
StorageDirectory sharedEditsDir = newSharedStorage
.getStorageDirectory(sharedEditsUri); newSharedEditLog.logEdit(op);
File targetFile = new File(sharedEditsDir.getCurrentDir(),
editLogSegment.getName()); if (op.opCode == FSEditLogOpCodes.OP_END_LOG_SEGMENT) {
if (!targetFile.exists()) { newSharedEditLog.logSync();
InputStream in = null; newSharedEditLog.endCurrentLogSegment(false);
OutputStream out = null; LOG.debug("ending log segment because of END_LOG_SEGMENT op in " + stream);
try { segmentOpen = false;
in = new FileInputStream(editLogSegment);
out = new AtomicFileOutputStream(targetFile);
IOUtils.copyBytes(in, out, conf);
} finally {
IOUtils.cleanup(LOG, in, out);
} }
} }
if (segmentOpen) {
LOG.debug("ending log segment because of end of stream in " + stream);
newSharedEditLog.logSync();
newSharedEditLog.endCurrentLogSegment(false);
segmentOpen = false;
} }
} }
} }
return false; // did not abort
}
private static boolean finalize(Configuration conf, private static boolean finalize(Configuration conf,
boolean isConfirmationNeeded boolean isConfirmationNeeded
@ -882,25 +911,8 @@ private static boolean finalize(Configuration conf,
return false; return false;
} }
private static void printUsage() { private static void printUsage(PrintStream out) {
System.err.println( out.println(USAGE + "\n");
"Usage: java NameNode [" +
StartupOption.BACKUP.getName() + "] | [" +
StartupOption.CHECKPOINT.getName() + "] | [" +
StartupOption.FORMAT.getName() + " [" + StartupOption.CLUSTERID.getName() +
" cid ] [" + StartupOption.FORCE.getName() + "] [" +
StartupOption.NONINTERACTIVE.getName() + "] ] | [" +
StartupOption.UPGRADE.getName() + "] | [" +
StartupOption.ROLLBACK.getName() + "] | [" +
StartupOption.FINALIZE.getName() + "] | [" +
StartupOption.IMPORT.getName() + "] | [" +
StartupOption.INITIALIZESHAREDEDITS.getName() +
" [" + StartupOption.FORCE.getName() + "] [" +
StartupOption.NONINTERACTIVE.getName() + "]" +
"] | [" +
StartupOption.BOOTSTRAPSTANDBY.getName() + "] | [" +
StartupOption.RECOVER.getName() + " [ " +
StartupOption.FORCE.getName() + " ] ]");
} }
private static StartupOption parseArguments(String args[]) { private static StartupOption parseArguments(String args[]) {
@ -1048,7 +1060,7 @@ public static NameNode createNameNode(String argv[], Configuration conf)
conf = new HdfsConfiguration(); conf = new HdfsConfiguration();
StartupOption startOpt = parseArguments(argv); StartupOption startOpt = parseArguments(argv);
if (startOpt == null) { if (startOpt == null) {
printUsage(); printUsage(System.err);
return null; return null;
} }
setStartupOption(conf, startOpt); setStartupOption(conf, startOpt);
@ -1162,6 +1174,10 @@ protected String getNameServiceId(Configuration conf) {
/** /**
*/ */
public static void main(String argv[]) throws Exception { public static void main(String argv[]) throws Exception {
if (DFSUtil.parseHelpArgument(argv, NameNode.USAGE, System.out, true)) {
System.exit(0);
}
try { try {
StringUtils.startupShutdownMessage(NameNode.class, argv, LOG); StringUtils.startupShutdownMessage(NameNode.class, argv, LOG);
NameNode namenode = createNameNode(argv, null); NameNode namenode = createNameNode(argv, null);

@ -562,6 +562,9 @@ public static void main(String[] argv) throws Exception {
if (opts == null) { if (opts == null) {
LOG.fatal("Failed to parse options"); LOG.fatal("Failed to parse options");
terminate(1); terminate(1);
} else if (opts.shouldPrintHelp()) {
opts.usage();
System.exit(0);
} }
StringUtils.startupShutdownMessage(SecondaryNameNode.class, argv, LOG); StringUtils.startupShutdownMessage(SecondaryNameNode.class, argv, LOG);
@ -595,6 +598,7 @@ static class CommandLineOpts {
private final Option geteditsizeOpt; private final Option geteditsizeOpt;
private final Option checkpointOpt; private final Option checkpointOpt;
private final Option formatOpt; private final Option formatOpt;
private final Option helpOpt;
Command cmd; Command cmd;
@ -605,6 +609,7 @@ enum Command {
private boolean shouldForce; private boolean shouldForce;
private boolean shouldFormat; private boolean shouldFormat;
private boolean shouldPrintHelp;
CommandLineOpts() { CommandLineOpts() {
geteditsizeOpt = new Option("geteditsize", geteditsizeOpt = new Option("geteditsize",
@ -612,20 +617,32 @@ enum Command {
checkpointOpt = OptionBuilder.withArgName("force") checkpointOpt = OptionBuilder.withArgName("force")
.hasOptionalArg().withDescription("checkpoint on startup").create("checkpoint");; .hasOptionalArg().withDescription("checkpoint on startup").create("checkpoint");;
formatOpt = new Option("format", "format the local storage during startup"); formatOpt = new Option("format", "format the local storage during startup");
helpOpt = new Option("h", "help", false, "get help information");
options.addOption(geteditsizeOpt); options.addOption(geteditsizeOpt);
options.addOption(checkpointOpt); options.addOption(checkpointOpt);
options.addOption(formatOpt); options.addOption(formatOpt);
options.addOption(helpOpt);
} }
public boolean shouldFormat() { public boolean shouldFormat() {
return shouldFormat; return shouldFormat;
} }
public boolean shouldPrintHelp() {
return shouldPrintHelp;
}
public void parse(String ... argv) throws ParseException { public void parse(String ... argv) throws ParseException {
CommandLineParser parser = new PosixParser(); CommandLineParser parser = new PosixParser();
CommandLine cmdLine = parser.parse(options, argv); CommandLine cmdLine = parser.parse(options, argv);
if (cmdLine.hasOption(helpOpt.getOpt())
|| cmdLine.hasOption(helpOpt.getLongOpt())) {
shouldPrintHelp = true;
return;
}
boolean hasGetEdit = cmdLine.hasOption(geteditsizeOpt.getOpt()); boolean hasGetEdit = cmdLine.hasOption(geteditsizeOpt.getOpt());
boolean hasCheckpoint = cmdLine.hasOption(checkpointOpt.getOpt()); boolean hasCheckpoint = cmdLine.hasOption(checkpointOpt.getOpt());
if (hasGetEdit && hasCheckpoint) { if (hasGetEdit && hasCheckpoint) {
@ -662,8 +679,13 @@ public boolean shouldForceCheckpoint() {
} }
void usage() { void usage() {
String header = "The Secondary NameNode is a helper "
+ "to the primary NameNode. The Secondary is responsible "
+ "for supporting periodic checkpoints of the HDFS metadata. "
+ "The current design allows only one Secondary NameNode "
+ "per HDFS cluster.";
HelpFormatter formatter = new HelpFormatter(); HelpFormatter formatter = new HelpFormatter();
formatter.printHelp("secondarynamenode", options); formatter.printHelp("secondarynamenode", header, options, "", false);
} }
} }
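The SecondaryNameNode options above gain an -h|--help flag and a descriptive header for HelpFormatter. A standalone commons-cli sketch of the same wiring; the header text and the surrounding class are illustrative:

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.CommandLineParser;
import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.commons.cli.PosixParser;

public class HelpOptionSketch {
  public static void main(String[] argv) throws ParseException {
    Options options = new Options();
    Option helpOpt = new Option("h", "help", false, "get help information");
    options.addOption(helpOpt);

    CommandLineParser parser = new PosixParser();
    CommandLine cmdLine = parser.parse(options, argv);

    if (cmdLine.hasOption(helpOpt.getOpt())
        || cmdLine.hasOption(helpOpt.getLongOpt())) {
      String header = "Illustrative header text describing the tool.";
      new HelpFormatter().printHelp("secondarynamenode", header, options, "", false);
    }
  }
}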

@ -43,6 +43,10 @@ protected void setErrOut(PrintStream errOut) {
this.errOut = errOut; this.errOut = errOut;
} }
protected void setOut(PrintStream out) {
this.out = out;
}
@Override @Override
public void setConf(Configuration conf) { public void setConf(Configuration conf) {
if (conf != null) { if (conf != null) {

@ -162,6 +162,10 @@ protected String getScopeInsideParentNode() {
public static void main(String args[]) public static void main(String args[])
throws Exception { throws Exception {
if (DFSUtil.parseHelpArgument(args,
ZKFailoverController.USAGE, System.out, true)) {
System.exit(0);
}
GenericOptionsParser parser = new GenericOptionsParser( GenericOptionsParser parser = new GenericOptionsParser(
new HdfsConfiguration(), args); new HdfsConfiguration(), args);

@ -73,6 +73,25 @@ public class DFSck extends Configured implements Tool {
HdfsConfiguration.init(); HdfsConfiguration.init();
} }
private static final String USAGE = "Usage: DFSck <path> "
+ "[-list-corruptfileblocks | "
+ "[-move | -delete | -openforwrite] "
+ "[-files [-blocks [-locations | -racks]]]]\n"
+ "\t<path>\tstart checking from this path\n"
+ "\t-move\tmove corrupted files to /lost+found\n"
+ "\t-delete\tdelete corrupted files\n"
+ "\t-files\tprint out files being checked\n"
+ "\t-openforwrite\tprint out files opened for write\n"
+ "\t-list-corruptfileblocks\tprint out list of missing "
+ "blocks and files they belong to\n"
+ "\t-blocks\tprint out block report\n"
+ "\t-locations\tprint out locations for every block\n"
+ "\t-racks\tprint out network topology for data-node locations\n"
+ "\t\tBy default fsck ignores files opened for write, "
+ "use -openforwrite to report such files. They are usually "
+ " tagged CORRUPT or HEALTHY depending on their block "
+ "allocation status";
private final UserGroupInformation ugi; private final UserGroupInformation ugi;
private final PrintStream out; private final PrintStream out;
@ -93,25 +112,9 @@ public DFSck(Configuration conf, PrintStream out) throws IOException {
/** /**
* Print fsck usage information * Print fsck usage information
*/ */
static void printUsage() { static void printUsage(PrintStream out) {
System.err.println("Usage: DFSck <path> [-list-corruptfileblocks | " + out.println(USAGE + "\n");
"[-move | -delete | -openforwrite] " + ToolRunner.printGenericCommandUsage(out);
"[-files [-blocks [-locations | -racks]]]]");
System.err.println("\t<path>\tstart checking from this path");
System.err.println("\t-move\tmove corrupted files to /lost+found");
System.err.println("\t-delete\tdelete corrupted files");
System.err.println("\t-files\tprint out files being checked");
System.err.println("\t-openforwrite\tprint out files opened for write");
System.err.println("\t-list-corruptfileblocks\tprint out list of missing "
+ "blocks and files they belong to");
System.err.println("\t-blocks\tprint out block report");
System.err.println("\t-locations\tprint out locations for every block");
System.err.println("\t-racks\tprint out network topology for data-node locations");
System.err.println("\t\tBy default fsck ignores files opened for write, " +
"use -openforwrite to report such files. They are usually " +
" tagged CORRUPT or HEALTHY depending on their block " +
"allocation status");
ToolRunner.printGenericCommandUsage(System.err);
} }
/** /**
* @param args * @param args
@ -119,7 +122,7 @@ static void printUsage() {
@Override @Override
public int run(final String[] args) throws IOException { public int run(final String[] args) throws IOException {
if (args.length == 0) { if (args.length == 0) {
printUsage(); printUsage(System.err);
return -1; return -1;
} }
@ -258,12 +261,12 @@ else if (args[idx].equals("-list-corruptfileblocks")) {
} else { } else {
System.err.println("fsck: can only operate on one path at a time '" System.err.println("fsck: can only operate on one path at a time '"
+ args[idx] + "'"); + args[idx] + "'");
printUsage(); printUsage(System.err);
return -1; return -1;
} }
} else { } else {
System.err.println("fsck: Illegal option '" + args[idx] + "'"); System.err.println("fsck: Illegal option '" + args[idx] + "'");
printUsage(); printUsage(System.err);
return -1; return -1;
} }
} }
@ -304,10 +307,14 @@ public static void main(String[] args) throws Exception {
// -files option is also used by GenericOptionsParser // -files option is also used by GenericOptionsParser
// Make sure that is not the first argument for fsck // Make sure that is not the first argument for fsck
int res = -1; int res = -1;
if ((args.length == 0 ) || ("-files".equals(args[0]))) if ((args.length == 0) || ("-files".equals(args[0]))) {
printUsage(); printUsage(System.err);
else ToolRunner.printGenericCommandUsage(System.err);
} else if (DFSUtil.parseHelpArgument(args, USAGE, System.out, true)) {
res = 0;
} else {
res = ToolRunner.run(new DFSck(new HdfsConfiguration()), args); res = ToolRunner.run(new DFSck(new HdfsConfiguration()), args);
}
System.exit(res); System.exit(res);
} }
} }

@ -40,7 +40,6 @@
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.HftpFileSystem; import org.apache.hadoop.hdfs.HftpFileSystem;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
@ -48,9 +47,7 @@
import org.apache.hadoop.hdfs.server.namenode.CancelDelegationTokenServlet; import org.apache.hadoop.hdfs.server.namenode.CancelDelegationTokenServlet;
import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet; import org.apache.hadoop.hdfs.server.namenode.GetDelegationTokenServlet;
import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet; import org.apache.hadoop.hdfs.server.namenode.RenewDelegationTokenServlet;
import org.apache.hadoop.hdfs.web.URLUtils;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.Credentials;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
@ -71,8 +68,10 @@ public class DelegationTokenFetcher {
private static final String CANCEL = "cancel"; private static final String CANCEL = "cancel";
private static final String RENEW = "renew"; private static final String RENEW = "renew";
private static final String PRINT = "print"; private static final String PRINT = "print";
private static final String HELP = "help";
private static final String HELP_SHORT = "h";
private static void printUsage(PrintStream err) throws IOException { private static void printUsage(PrintStream err) {
err.println("fetchdt retrieves delegation tokens from the NameNode"); err.println("fetchdt retrieves delegation tokens from the NameNode");
err.println(); err.println();
err.println("fetchdt <opts> <token file>"); err.println("fetchdt <opts> <token file>");
@ -107,6 +106,7 @@ public static void main(final String[] args) throws Exception {
fetcherOptions.addOption(CANCEL, false, "cancel the token"); fetcherOptions.addOption(CANCEL, false, "cancel the token");
fetcherOptions.addOption(RENEW, false, "renew the token"); fetcherOptions.addOption(RENEW, false, "renew the token");
fetcherOptions.addOption(PRINT, false, "print the token"); fetcherOptions.addOption(PRINT, false, "print the token");
fetcherOptions.addOption(HELP_SHORT, HELP, false, "print out help information");
GenericOptionsParser parser = new GenericOptionsParser(conf, GenericOptionsParser parser = new GenericOptionsParser(conf,
fetcherOptions, args); fetcherOptions, args);
CommandLine cmd = parser.getCommandLine(); CommandLine cmd = parser.getCommandLine();
@ -119,9 +119,14 @@ public static void main(final String[] args) throws Exception {
final boolean cancel = cmd.hasOption(CANCEL); final boolean cancel = cmd.hasOption(CANCEL);
final boolean renew = cmd.hasOption(RENEW); final boolean renew = cmd.hasOption(RENEW);
final boolean print = cmd.hasOption(PRINT); final boolean print = cmd.hasOption(PRINT);
final boolean help = cmd.hasOption(HELP);
String[] remaining = parser.getRemainingArgs(); String[] remaining = parser.getRemainingArgs();
// check option validity // check option validity
if (help) {
printUsage(System.out);
System.exit(0);
}
if (cancel && renew || cancel && print || renew && print || cancel && renew if (cancel && renew || cancel && print || renew && print || cancel && renew
&& print) { && print) {
System.err.println("ERROR: Only specify cancel, renew or print."); System.err.println("ERROR: Only specify cancel, renew or print.");

@ -324,6 +324,10 @@ public Integer run() throws Exception {
} }
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
if (DFSUtil.parseHelpArgument(args, USAGE, System.out, true)) {
System.exit(0);
}
int res = ToolRunner.run(new GetConf(new HdfsConfiguration()), args); int res = ToolRunner.run(new GetConf(new HdfsConfiguration()), args);
System.exit(res); System.exit(res);
} }

@ -28,6 +28,7 @@
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.namenode.NameNode;
@ -44,6 +45,8 @@ public class GetGroups extends GetGroupsBase {
private static final Log LOG = LogFactory.getLog(GetGroups.class); private static final Log LOG = LogFactory.getLog(GetGroups.class);
static final String USAGE = "Usage: hdfs groups [username ...]";
static{ static{
HdfsConfiguration.init(); HdfsConfiguration.init();
} }
@ -86,6 +89,10 @@ protected GetUserMappingsProtocol getUgmProtocol() throws IOException {
} }
public static void main(String[] argv) throws Exception { public static void main(String[] argv) throws Exception {
if (DFSUtil.parseHelpArgument(argv, USAGE, System.out, true)) {
System.exit(0);
}
int res = ToolRunner.run(new GetGroups(new HdfsConfiguration()), argv); int res = ToolRunner.run(new GetGroups(new HdfsConfiguration()), argv);
System.exit(res); System.exit(res);
} }

@ -75,7 +75,7 @@ static int fuserMount(int *procRet, ...)
{ {
int ret, status; int ret, status;
size_t i = 0; size_t i = 0;
char *args[64], *c, *env[] = { NULL }; char *args[64], *c;
va_list ap; va_list ap;
pid_t pid, pret; pid_t pid, pret;
@ -99,7 +99,7 @@ static int fuserMount(int *procRet, ...)
ret, strerror(ret)); ret, strerror(ret));
return -ret; return -ret;
} else if (pid == 0) { } else if (pid == 0) {
if (execvpe("fusermount", args, env)) { if (execvp("fusermount", args)) {
ret = errno; ret = errno;
fprintf(stderr, "FUSE_TEST: failed to execute fusermount: " fprintf(stderr, "FUSE_TEST: failed to execute fusermount: "
"error %d: %s\n", ret, strerror(ret)); "error %d: %s\n", ret, strerror(ret));

@ -53,7 +53,7 @@
<name>dfs.datanode.address</name> <name>dfs.datanode.address</name>
<value>0.0.0.0:50010</value> <value>0.0.0.0:50010</value>
<description> <description>
The address where the datanode server will listen to. The datanode server address and port for data transfer.
If the port is 0 then the server will start on a free port. If the port is 0 then the server will start on a free port.
</description> </description>
</property> </property>
@ -925,6 +925,22 @@
</description> </description>
</property> </property>
<property>
<name>dfs.client.use.datanode.hostname</name>
<value>false</value>
<description>Whether clients should use datanode hostnames when
connecting to datanodes.
</description>
</property>
<property>
<name>dfs.datanode.use.datanode.hostname</name>
<value>false</value>
<description>Whether datanodes should use datanode hostnames when
connecting to other datanodes for data transfer.
</description>
</property>
<property> <property>
<name>dfs.client.local.interfaces</name> <name>dfs.client.local.interfaces</name>
<value></value> <value></value>

@ -118,6 +118,8 @@ public class MiniDFSCluster {
public static final String PROP_TEST_BUILD_DATA = "test.build.data"; public static final String PROP_TEST_BUILD_DATA = "test.build.data";
/** Configuration option to set the data dir: {@value} */ /** Configuration option to set the data dir: {@value} */
public static final String HDFS_MINIDFS_BASEDIR = "hdfs.minidfs.basedir"; public static final String HDFS_MINIDFS_BASEDIR = "hdfs.minidfs.basedir";
public static final String DFS_NAMENODE_SAFEMODE_EXTENSION_TESTING_KEY
= DFS_NAMENODE_SAFEMODE_EXTENSION_KEY + ".testing";
static { DefaultMetricsSystem.setMiniClusterMode(true); } static { DefaultMetricsSystem.setMiniClusterMode(true); }
@ -143,6 +145,7 @@ public static class Builder {
private boolean setupHostsFile = false; private boolean setupHostsFile = false;
private MiniDFSNNTopology nnTopology = null; private MiniDFSNNTopology nnTopology = null;
private boolean checkExitOnShutdown = true; private boolean checkExitOnShutdown = true;
private boolean checkDataNodeHostConfig = false;
public Builder(Configuration conf) { public Builder(Configuration conf) {
this.conf = conf; this.conf = conf;
@ -260,6 +263,14 @@ public Builder checkExitOnShutdown(boolean val) {
return this; return this;
} }
/**
* Default: false
*/
public Builder checkDataNodeHostConfig(boolean val) {
this.checkDataNodeHostConfig = val;
return this;
}
/** /**
* Default: null * Default: null
*/ */
@ -324,7 +335,8 @@ private MiniDFSCluster(Builder builder) throws IOException {
builder.waitSafeMode, builder.waitSafeMode,
builder.setupHostsFile, builder.setupHostsFile,
builder.nnTopology, builder.nnTopology,
builder.checkExitOnShutdown); builder.checkExitOnShutdown,
builder.checkDataNodeHostConfig);
} }
public class DataNodeProperties { public class DataNodeProperties {
@ -561,7 +573,7 @@ public MiniDFSCluster(int nameNodePort,
manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs, manageNameDfsDirs, true, manageDataDfsDirs, manageDataDfsDirs,
operation, racks, hosts, operation, racks, hosts,
simulatedCapacities, null, true, false, simulatedCapacities, null, true, false,
MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true); MiniDFSNNTopology.simpleSingleNN(nameNodePort, 0), true, false);
} }
private void initMiniDFSCluster( private void initMiniDFSCluster(
@ -571,7 +583,8 @@ private void initMiniDFSCluster(
boolean manageDataDfsDirs, StartupOption operation, String[] racks, boolean manageDataDfsDirs, StartupOption operation, String[] racks,
String[] hosts, long[] simulatedCapacities, String clusterId, String[] hosts, long[] simulatedCapacities, String clusterId,
boolean waitSafeMode, boolean setupHostsFile, boolean waitSafeMode, boolean setupHostsFile,
MiniDFSNNTopology nnTopology, boolean checkExitOnShutdown) MiniDFSNNTopology nnTopology, boolean checkExitOnShutdown,
boolean checkDataNodeHostConfig)
throws IOException { throws IOException {
ExitUtil.disableSystemExit(); ExitUtil.disableSystemExit();
@ -587,7 +600,9 @@ private void initMiniDFSCluster(
int replication = conf.getInt(DFS_REPLICATION_KEY, 3); int replication = conf.getInt(DFS_REPLICATION_KEY, 3);
conf.setInt(DFS_REPLICATION_KEY, Math.min(replication, numDataNodes)); conf.setInt(DFS_REPLICATION_KEY, Math.min(replication, numDataNodes));
conf.setInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, 0); int safemodeExtension = conf.getInt(
DFS_NAMENODE_SAFEMODE_EXTENSION_TESTING_KEY, 0);
conf.setInt(DFS_NAMENODE_SAFEMODE_EXTENSION_KEY, safemodeExtension);
conf.setInt(DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 3); // 3 second conf.setInt(DFS_NAMENODE_DECOMMISSION_INTERVAL_KEY, 3); // 3 second
conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY, conf.setClass(NET_TOPOLOGY_NODE_SWITCH_MAPPING_IMPL_KEY,
StaticMapping.class, DNSToSwitchMapping.class); StaticMapping.class, DNSToSwitchMapping.class);
@ -626,7 +641,7 @@ private void initMiniDFSCluster(
// Start the DataNodes // Start the DataNodes
startDataNodes(conf, numDataNodes, manageDataDfsDirs, operation, racks, startDataNodes(conf, numDataNodes, manageDataDfsDirs, operation, racks,
hosts, simulatedCapacities, setupHostsFile); hosts, simulatedCapacities, setupHostsFile, false, checkDataNodeHostConfig);
waitClusterUp(); waitClusterUp();
//make sure ProxyUsers uses the latest conf //make sure ProxyUsers uses the latest conf
ProxyUsers.refreshSuperUserGroupsConfiguration(conf); ProxyUsers.refreshSuperUserGroupsConfiguration(conf);
@ -978,7 +993,21 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
long[] simulatedCapacities, long[] simulatedCapacities,
boolean setupHostsFile) throws IOException { boolean setupHostsFile) throws IOException {
startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, hosts, startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, hosts,
simulatedCapacities, setupHostsFile, false); simulatedCapacities, setupHostsFile, false, false);
}
/**
* @see MiniDFSCluster#startDataNodes(Configuration, int, boolean, StartupOption,
* String[], String[], long[], boolean, boolean, boolean)
*/
public synchronized void startDataNodes(Configuration conf, int numDataNodes,
boolean manageDfsDirs, StartupOption operation,
String[] racks, String[] hosts,
long[] simulatedCapacities,
boolean setupHostsFile,
boolean checkDataNodeAddrConfig) throws IOException {
startDataNodes(conf, numDataNodes, manageDfsDirs, operation, racks, hosts,
simulatedCapacities, setupHostsFile, checkDataNodeAddrConfig, false);
} }
/** /**
@ -1004,6 +1033,7 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
* @param simulatedCapacities array of capacities of the simulated data nodes * @param simulatedCapacities array of capacities of the simulated data nodes
* @param setupHostsFile add new nodes to dfs hosts files * @param setupHostsFile add new nodes to dfs hosts files
* @param checkDataNodeAddrConfig if true, only set DataNode port addresses if not already set in config * @param checkDataNodeAddrConfig if true, only set DataNode port addresses if not already set in config
* @param checkDataNodeHostConfig if true, only set DataNode hostname key if not already set in config
* *
* @throws IllegalStateException if NameNode has been shutdown * @throws IllegalStateException if NameNode has been shutdown
*/ */
@ -1012,11 +1042,16 @@ public synchronized void startDataNodes(Configuration conf, int numDataNodes,
String[] racks, String[] hosts, String[] racks, String[] hosts,
long[] simulatedCapacities, long[] simulatedCapacities,
boolean setupHostsFile, boolean setupHostsFile,
boolean checkDataNodeAddrConfig) throws IOException { boolean checkDataNodeAddrConfig,
boolean checkDataNodeHostConfig) throws IOException {
if (operation == StartupOption.RECOVER) { if (operation == StartupOption.RECOVER) {
return; return;
} }
if (checkDataNodeHostConfig) {
conf.setIfUnset(DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1");
} else {
conf.set(DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1"); conf.set(DFS_DATANODE_HOST_NAME_KEY, "127.0.0.1");
}
int curDatanodesNum = dataNodes.size(); int curDatanodesNum = dataNodes.size();
// for mincluster's the default initialDelay for BRs is 0 // for mincluster's the default initialDelay for BRs is 0
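MiniDFSCluster gains both the checkDataNodeHostConfig builder switch (only force 127.0.0.1 as the datanode hostname when the test has not set one itself) and the dfs.namenode.safemode.extension.testing override used by the namenode-restart test below. A hedged sketch of a test combining the two; the hostname and the 5000 ms value are illustrative, not taken from any specific test:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class HostConfigClusterSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // The test picks its own datanode hostname ...
    conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost");
    // ... and a short, testing-only safemode extension.
    conf.setInt(MiniDFSCluster.DFS_NAMENODE_SAFEMODE_EXTENSION_TESTING_KEY, 5000);

    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
        .numDataNodes(1)
        .checkDataNodeHostConfig(true) // honor the hostname set above
        .build();
    try {
      cluster.waitActive();
    } finally {
      cluster.shutdown();
    }
  }
}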

@@ -41,6 +41,7 @@
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
+import java.util.Random;
 import java.util.concurrent.TimeUnit;

 import org.apache.commons.logging.Log;
@@ -79,8 +80,10 @@
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.log4j.Level;
+import org.junit.Assert;
 import org.junit.Test;
 import org.mockito.Mockito;
 import org.mockito.internal.stubbing.answers.ThrowsException;
@@ -765,7 +768,7 @@ public void testClientDNProtocolTimeout() throws IOException {
     try {
       proxy = DFSUtil.createClientDatanodeProtocolProxy(
-          fakeDnId, conf, 500, fakeBlock);
+          fakeDnId, conf, 500, false, fakeBlock);

       proxy.getReplicaVisibleLength(new ExtendedBlock("bpid", 1));
       fail ("Did not get expected exception: SocketTimeoutException");
@@ -842,6 +845,8 @@ public static void namenodeRestartTest(final Configuration conf,
     final Path dir = new Path("/testNamenodeRestart");

     conf.setBoolean(DFSConfigKeys.DFS_CLIENT_RETRY_POLICY_ENABLED_KEY, true);
+    conf.setInt(DFSConfigKeys.DFS_NAMENODE_SAFEMODE_MIN_DATANODES_KEY, 1);
+    conf.setInt(MiniDFSCluster.DFS_NAMENODE_SAFEMODE_EXTENSION_TESTING_KEY, 5000);

     final short numDatanodes = 3;
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
@@ -864,11 +869,38 @@ public static void namenodeRestartTest(final Configuration conf,
     final FileStatus s1 = fs.getFileStatus(file1);
     assertEquals(length, s1.getLen());

+    //create file4, write some data but not close
+    final Path file4 = new Path(dir, "file4");
+    final FSDataOutputStream out4 = fs.create(file4, false, 4096,
+        fs.getDefaultReplication(file4), 1024L, null);
+    final byte[] bytes = new byte[1000];
+    new Random().nextBytes(bytes);
+    out4.write(bytes);
+    out4.write(bytes);
+    out4.hflush();
+
     //shutdown namenode
     assertTrue(HdfsUtils.isHealthy(uri));
     cluster.shutdownNameNode(0);
     assertFalse(HdfsUtils.isHealthy(uri));

+    //namenode is down, continue writing file4 in a thread
+    final Thread file4thread = new Thread(new Runnable() {
+      @Override
+      public void run() {
+        try {
+          //write some more data and then close the file
+          out4.write(bytes);
+          out4.write(bytes);
+          out4.write(bytes);
+          out4.close();
+        } catch (Exception e) {
+          exceptions.add(e);
+        }
+      }
+    });
+    file4thread.start();
+
     //namenode is down, read the file in a thread
     final Thread reader = new Thread(new Runnable() {
       @Override
@@ -927,10 +959,26 @@ public void run() {

       //check file1 and file3
       thread.join();
+      assertEmpty(exceptions);
       assertEquals(s1.getLen(), fs.getFileStatus(file3).getLen());
       assertEquals(fs.getFileChecksum(file1), fs.getFileChecksum(file3));

       reader.join();
+      assertEmpty(exceptions);
+
+      //check file4
+      file4thread.join();
+      assertEmpty(exceptions);
+      {
+        final FSDataInputStream in = fs.open(file4);
+        int count = 0;
+        for(int r; (r = in.read()) != -1; count++) {
+          Assert.assertEquals(String.format("count=%d", count),
+              bytes[count % bytes.length], (byte)r);
+        }
+        Assert.assertEquals(5 * bytes.length, count);
+        in.close();
+      }

       //enter safe mode
       assertTrue(HdfsUtils.isHealthy(uri));
@@ -970,18 +1018,27 @@ public void run() {
         LOG.info("GOOD!", fnfe);
       }

-      if (!exceptions.isEmpty()) {
-        LOG.error("There are " + exceptions.size() + " exception(s):");
-        for(int i = 0; i < exceptions.size(); i++) {
-          LOG.error("Exception " + i, exceptions.get(i));
-        }
-        fail();
-      }
+      assertEmpty(exceptions);
     } finally {
       cluster.shutdown();
     }
   }

+  static void assertEmpty(final List<Exception> exceptions) {
+    if (!exceptions.isEmpty()) {
+      final StringBuilder b = new StringBuilder("There are ")
+          .append(exceptions.size())
+          .append(" exception(s):");
+      for(int i = 0; i < exceptions.size(); i++) {
+        b.append("\n  Exception ")
+            .append(i)
+            .append(": ")
+            .append(StringUtils.stringifyException(exceptions.get(i)));
+      }
+      fail(b.toString());
+    }
+  }
+
   private static FileSystem createFsWithDifferentUsername(
       final Configuration conf, final boolean isWebHDFS
       ) throws IOException, InterruptedException {

View File

@@ -417,7 +417,6 @@ public void testFileChecksum() throws Exception {
     final Configuration conf = getTestConfiguration();
     conf.setBoolean(DFSConfigKeys.DFS_WEBHDFS_ENABLED_KEY, true);
-    conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost");
     final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(2).build();
     final FileSystem hdfs = cluster.getFileSystem();

View File

@@ -171,7 +171,14 @@ public void testServerDefaults() throws IOException {
   @Test
   public void testFileCreation() throws IOException {
-    checkFileCreation(null);
+    checkFileCreation(null, false);
+  }
+
+  /** Same test but the client should use DN hostnames */
+  @Test
+  public void testFileCreationUsingHostname() throws IOException {
+    assumeTrue(System.getProperty("os.name").startsWith("Linux"));
+    checkFileCreation(null, true);
   }

   /** Same test but the client should bind to a local interface */
@@ -180,10 +187,10 @@ public void testFileCreationSetLocalInterface() throws IOException {
     assumeTrue(System.getProperty("os.name").startsWith("Linux"));

     // The mini cluster listens on the loopback so we can use it here
-    checkFileCreation("lo");
+    checkFileCreation("lo", false);

     try {
-      checkFileCreation("bogus-interface");
+      checkFileCreation("bogus-interface", false);
       fail("Able to specify a bogus interface");
     } catch (UnknownHostException e) {
       assertEquals("No such interface bogus-interface", e.getMessage());
@@ -193,16 +200,28 @@ public void testFileCreationSetLocalInterface() throws IOException {
   /**
    * Test if file creation and disk space consumption works right
    * @param netIf the local interface, if any, clients should use to access DNs
+   * @param useDnHostname whether the client should contact DNs by hostname
    */
-  public void checkFileCreation(String netIf) throws IOException {
+  public void checkFileCreation(String netIf, boolean useDnHostname)
+      throws IOException {
     Configuration conf = new HdfsConfiguration();
     if (netIf != null) {
       conf.set(DFSConfigKeys.DFS_CLIENT_LOCAL_INTERFACES, netIf);
     }
+    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME, useDnHostname);
+    if (useDnHostname) {
+      // Since the mini cluster only listens on the loopback we have to
+      // ensure the hostname used to access DNs maps to the loopback. We
+      // do this by telling the DN to advertise localhost as its hostname
+      // instead of the default hostname.
+      conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost");
+    }
     if (simulatedStorage) {
       SimulatedFSDataset.setFactory(conf);
     }
-    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).build();
+    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .checkDataNodeHostConfig(true)
+        .build();
     FileSystem fs = cluster.getFileSystem();
     try {
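For context, a minimal sketch of the client-side switch this test exercises: setting DFS_CLIENT_USE_DN_HOSTNAME makes the client contact DataNodes by hostname rather than IP. The class name, URI, and path below are illustrative only.

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;

public class HostnameClientExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    // Opt in to hostname-based DataNode access.
    conf.setBoolean(DFSConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME, true);
    FileSystem fs =
        FileSystem.get(URI.create("hdfs://nn.example.com:8020"), conf);
    try {
      FSDataInputStream in = fs.open(new Path("/tmp/example.txt"));
      System.out.println(in.read());  // block reads go to DNs by hostname
      in.close();
    } finally {
      fs.close();
    }
  }
}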

View File

@@ -92,7 +92,6 @@ public static void setUp() throws IOException {
     RAN.setSeed(seed);

     config = new Configuration();
-    config.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost");
     cluster = new MiniDFSCluster.Builder(config).numDataNodes(2).build();
     hdfs = cluster.getFileSystem();
     blockPoolId = cluster.getNamesystem().getBlockPoolId();

View File

@@ -20,6 +20,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assume.assumeTrue;

 import java.io.File;
@@ -41,6 +42,7 @@ public class TestMiniDFSCluster {
   private static final String CLUSTER_2 = "cluster2";
   private static final String CLUSTER_3 = "cluster3";
   private static final String CLUSTER_4 = "cluster4";
+  private static final String CLUSTER_5 = "cluster5";
   protected String testDataPath;
   protected File testDataDir;
   @Before
@@ -125,4 +127,25 @@ public void testIsClusterUpAfterShutdown() throws Throwable {
       }
     }
   }
+
+  /** MiniDFSCluster should not clobber dfs.datanode.hostname if requested */
+  @Test(timeout=100000)
+  public void testClusterSetDatanodeHostname() throws Throwable {
+    assumeTrue(System.getProperty("os.name").startsWith("Linux"));
+    Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "MYHOST");
+    File testDataCluster5 = new File(testDataPath, CLUSTER_5);
+    String c5Path = testDataCluster5.getAbsolutePath();
+    conf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, c5Path);
+    MiniDFSCluster cluster5 = new MiniDFSCluster.Builder(conf)
+      .numDataNodes(1)
+      .checkDataNodeHostConfig(true)
+      .build();
+    try {
+      assertEquals("DataNode hostname config not respected", "MYHOST",
+          cluster5.getDataNodes().get(0).getDatanodeId().getHostName());
+    } finally {
+      MiniDFSCluster.shutdownCluster(cluster5);
+    }
+  }
 }

View File

@@ -246,7 +246,7 @@ public void testGetBlockLocalPathInfo() throws IOException, InterruptedException
           @Override
           public ClientDatanodeProtocol run() throws Exception {
             return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
-                60000);
+                60000, false);
           }
         });
@@ -264,7 +264,7 @@ public ClientDatanodeProtocol run() throws Exception {
           @Override
           public ClientDatanodeProtocol run() throws Exception {
             return DFSUtil.createClientDatanodeProtocolProxy(dnInfo, conf,
-                60000);
+                60000, false);
           }
         });
     try {

View File

@@ -304,7 +304,7 @@ public void testBlockTokenRpcLeak() throws Exception {
     long endTime = Time.now() + 3000;
     while (Time.now() < endTime) {
       proxy = DFSUtil.createClientDatanodeProtocolProxy(fakeDnId, conf, 1000,
-          fakeBlock);
+          false, fakeBlock);
       assertEquals(block3.getBlockId(), proxy.getReplicaVisibleLength(block3));
       if (proxy != null) {
         RPC.stopProxy(proxy);

View File

@@ -105,10 +105,13 @@ public static DatanodeProtocolClientSideTranslatorPB spyOnBposToNN(
   }

   public static InterDatanodeProtocol createInterDatanodeProtocolProxy(
-      DataNode dn, DatanodeID datanodeid, final Configuration conf
-      ) throws IOException {
+      DataNode dn, DatanodeID datanodeid, final Configuration conf,
+      boolean connectToDnViaHostname) throws IOException {
+    if (connectToDnViaHostname != dn.getDnConf().connectToDnViaHostname) {
+      throw new AssertionError("Unexpected DN hostname configuration");
+    }
     return DataNode.createInterDataNodeProtocolProxy(datanodeid, conf,
-        dn.getDnConf().socketTimeout);
+        dn.getDnConf().socketTimeout, dn.getDnConf().connectToDnViaHostname);
   }

   public static void shutdownBlockScanner(DataNode dn) {

View File

@@ -29,6 +29,7 @@
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSClientAdapter;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
@@ -59,6 +60,8 @@
 import org.junit.Assert;
 import org.junit.Test;

+import static org.junit.Assume.assumeTrue;
+
 /**
  * This tests InterDataNodeProtocol for block handling.
  */
@@ -125,17 +128,42 @@ public static LocatedBlock getLastLocatedBlock(
     return blocks.get(blocks.size() - 1);
   }

+  /** Test block MD access via a DN */
+  @Test
+  public void testBlockMetaDataInfo() throws Exception {
+    checkBlockMetaDataInfo(false);
+  }
+
+  /** The same as above, but use hostnames for DN<->DN communication */
+  @Test
+  public void testBlockMetaDataInfoWithHostname() throws Exception {
+    assumeTrue(System.getProperty("os.name").startsWith("Linux"));
+    checkBlockMetaDataInfo(true);
+  }
+
   /**
    * The following test first creates a file.
    * It verifies the block information from a datanode.
    * Then, it updates the block with new information and verifies again.
+   * @param useDnHostname whether DNs should connect to other DNs by hostname
    */
-  @Test
-  public void testBlockMetaDataInfo() throws Exception {
+  private void checkBlockMetaDataInfo(boolean useDnHostname) throws Exception {
     MiniDFSCluster cluster = null;

+    conf.setBoolean(DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME, useDnHostname);
+    if (useDnHostname) {
+      // Since the mini cluster only listens on the loopback we have to
+      // ensure the hostname used to access DNs maps to the loopback. We
+      // do this by telling the DN to advertise localhost as its hostname
+      // instead of the default hostname.
+      conf.set(DFSConfigKeys.DFS_DATANODE_HOST_NAME_KEY, "localhost");
+    }
+
     try {
-      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3)
+        .checkDataNodeHostConfig(true)
+        .build();
       cluster.waitActive();

       //create a file
@@ -154,7 +182,7 @@ public void testBlockMetaDataInfo() throws Exception {
       //connect to a data node
       DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
       InterDatanodeProtocol idp = DataNodeTestUtils.createInterDatanodeProtocolProxy(
-          datanode, datanodeinfo[0], conf);
+          datanode, datanodeinfo[0], conf, useDnHostname);

       //stop block scanner, so we could compare lastScanTime
       DataNodeTestUtils.shutdownBlockScanner(datanode);
@@ -364,7 +392,7 @@ public void testInterDNProtocolTimeout() throws Throwable {
     try {
       proxy = DataNode.createInterDataNodeProtocolProxy(
-          dInfo, conf, 500);
+          dInfo, conf, 500, false);
       proxy.initReplicaRecovery(new RecoveringBlock(
           new ExtendedBlock("bpid", 1), null, 100));
       fail ("Expected SocketTimeoutException exception, but did not get.");

View File

@@ -119,6 +119,11 @@ public class TestEditLog {
     "a4ff 0000 0000 0000 0000 0000 0000 0000"
   ).replace(" ",""));

+  static {
+    // No need to fsync for the purposes of tests. This makes
+    // the tests run much faster.
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+  }
+
   static final byte TRAILER_BYTE = FSEditLogOpCodes.OP_INVALID.getOpCode();
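The same static-initializer pattern recurs in the edit-log test classes below. As a minimal sketch, a new edit-log-heavy test class could adopt it like this (the class name and test body here are hypothetical, not part of the patch):

package org.apache.hadoop.hdfs.server.namenode;

import org.junit.Test;

public class TestMyEditLogFeature {  // hypothetical test class
  static {
    // Skip fsync during tests so edit-log heavy cases run quickly;
    // durability is not what these tests assert.
    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
  }

  @Test
  public void testSomething() throws Exception {
    // ... exercise edit-log code paths here ...
  }
}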

View File

@@ -40,6 +40,12 @@ public class TestEditLogFileOutputStream {
   final static int MIN_PREALLOCATION_LENGTH =
       EditLogFileOutputStream.MIN_PREALLOCATION_LENGTH;

+  static {
+    // No need to fsync for the purposes of tests. This makes
+    // the tests run much faster.
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+  }
+
   @Before
   @After
   public void deleteEditsFile() {

View File

@@ -51,6 +51,12 @@
 public class TestFileJournalManager {
   static final Log LOG = LogFactory.getLog(TestFileJournalManager.class);

+  static {
+    // No need to fsync for the purposes of tests. This makes
+    // the tests run much faster.
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+  }
+
   /**
    * Find out how many transactions we can read from a
    * FileJournalManager, starting at a given transaction ID.

View File

@@ -57,6 +57,7 @@ public class TestNameNodeRecovery {
   static {
     recoverStartOpt.setForce(MetaRecoveryContext.FORCE_ALL);
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
   }

   static void runEditLogTest(EditLogTestSetup elts) throws IOException {

View File

@@ -49,6 +49,12 @@ public class TestSecurityTokenEditLog {
   static final int NUM_THREADS = 100;
   static final int opsPerTrans = 3;

+  static {
+    // No need to fsync for the purposes of tests. This makes
+    // the tests run much faster.
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+  }
+
   //
   // an object that does a bunch of transactions
   //

View File

@@ -65,7 +65,7 @@ public abstract class HATestUtil {
    * @throws CouldNotCatchUpException if the standby doesn't catch up to the
    * active in NN_LAG_TIMEOUT milliseconds
    */
-  static void waitForStandbyToCatchUp(NameNode active,
+  public static void waitForStandbyToCatchUp(NameNode active,
       NameNode standby) throws InterruptedException, IOException, CouldNotCatchUpException {
     long activeTxId = active.getNamesystem().getFSImage().getEditLog()

View File

@@ -34,6 +34,7 @@
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
+import org.apache.hadoop.hdfs.server.namenode.EditLogFileOutputStream;
 import org.apache.hadoop.hdfs.server.namenode.FSImageTestUtil;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage;
 import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
@@ -53,6 +54,12 @@ public class TestEditLogsDuringFailover {
       LogFactory.getLog(TestEditLogsDuringFailover.class);
   private static final int NUM_DIRS_IN_LOG = 5;

+  static {
+    // No need to fsync for the purposes of tests. This makes
+    // the tests run much faster.
+    EditLogFileOutputStream.setShouldSkipFsyncForTesting(true);
+  }
+
   @Test
   public void testStartup() throws Exception {
     Configuration conf = new Configuration();

View File

@@ -158,6 +158,13 @@ public void testInitializeSharedEdits() throws Exception {
     assertCanStartHaNameNodes("2");
   }

+  @Test
+  public void testFailWhenNoSharedEditsSpecified() throws Exception {
+    Configuration confNoShared = new Configuration(conf);
+    confNoShared.unset(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY);
+    assertFalse(NameNode.initializeSharedEdits(confNoShared, true));
+  }
+
   @Test
   public void testDontOverWriteExistingDir() {
     assertFalse(NameNode.initializeSharedEdits(conf, false));

View File

@@ -55,7 +55,9 @@ public class TestDFSHAAdmin {
   private DFSHAAdmin tool;
   private ByteArrayOutputStream errOutBytes = new ByteArrayOutputStream();
+  private ByteArrayOutputStream outBytes = new ByteArrayOutputStream();
   private String errOutput;
+  private String output;
   private HAServiceProtocol mockProtocol;
   private ZKFCProtocol mockZkfcProtocol;
@@ -111,12 +113,14 @@ protected HAServiceTarget resolveTarget(String nnId) {
     };
     tool.setConf(getHAConf());
     tool.setErrOut(new PrintStream(errOutBytes));
+    tool.setOut(new PrintStream(outBytes));
   }

   private void assertOutputContains(String string) {
-    if (!errOutput.contains(string)) {
-      fail("Expected output to contain '" + string + "' but was:\n" +
-          errOutput);
+    if (!errOutput.contains(string) && !output.contains(string)) {
+      fail("Expected output to contain '" + string +
+          "' but err_output was:\n" + errOutput +
+          "\n and output was: \n" + output);
     }
   }
@@ -143,7 +147,7 @@ public void testNamenodeResolution() throws Exception {
   @Test
   public void testHelp() throws Exception {
-    assertEquals(-1, runTool("-help"));
+    assertEquals(0, runTool("-help"));
     assertEquals(0, runTool("-help", "transitionToActive"));
     assertOutputContains("Transitions the service into Active");
   }
@@ -378,10 +382,12 @@ public void testFencingConfigPerNameNode() throws Exception {
   private Object runTool(String ... args) throws Exception {
     errOutBytes.reset();
+    outBytes.reset();
     LOG.info("Running: DFSHAAdmin " + Joiner.on(" ").join(args));
     int ret = tool.run(args);
     errOutput = new String(errOutBytes.toByteArray(), Charsets.UTF_8);
-    LOG.info("Output:\n" + errOutput);
+    output = new String(outBytes.toByteArray(), Charsets.UTF_8);
+    LOG.info("Err_output:\n" + errOutput + "\nOutput:\n" + output);
     return ret;
   }

View File

@@ -809,6 +809,9 @@ Release 0.23.3 - UNRELEASED
     MAPREDUCE-3782. teragen terasort jobs fail when using webhdfs:// (Jason
     Lowe via bobby)

+    MAPREDUCE-4053. Counters group names deprecation is wrong, iterating over
+    group names deprecated names don't show up (Robert Evans via tgraves)
+
 Release 0.23.2 - UNRELEASED

   INCOMPATIBLE CHANGES

View File

@@ -24,6 +24,7 @@
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentSkipListMap;
@@ -185,7 +186,15 @@ public synchronized C findCounter(String scheme, FileSystemCounter key) {
    * @return Set of counter names.
    */
   public synchronized Iterable<String> getGroupNames() {
-    return Iterables.concat(fgroups.keySet(), groups.keySet());
+    HashSet<String> deprecated = new HashSet<String>();
+    for(Map.Entry<String, String> entry : legacyMap.entrySet()) {
+      String newGroup = entry.getValue();
+      boolean isFGroup = isFrameworkGroup(newGroup);
+      if(isFGroup ? fgroups.containsKey(newGroup) : groups.containsKey(newGroup)) {
+        deprecated.add(entry.getKey());
+      }
+    }
+    return Iterables.concat(fgroups.keySet(), groups.keySet(), deprecated);
   }

   @Override
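A minimal sketch of the behavior this change restores, mirroring the test added below: iterating getGroupNames() now also yields the deprecated legacy alias for any group that is actually present. The class name and imports here are assumed from the mapred test context shown in the next file.

import java.util.HashSet;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapreduce.FileSystemCounter;

public class LegacyGroupNamesExample {
  public static void main(String[] args) {
    Counters counters = new Counters();
    counters.findCounter("fs1", FileSystemCounter.BYTES_READ).increment(1);
    counters.incrCounter("group1", "counter1", 1);

    // After the fix this set contains "group1",
    // "org.apache.hadoop.mapreduce.FileSystemCounter", and its legacy
    // alias "FileSystemCounter".
    HashSet<String> names = new HashSet<String>();
    for (String name : counters.getGroupNames()) {
      names.add(name);
    }
    System.out.println(names);
  }
}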

View File

@@ -22,6 +22,7 @@
 import java.io.IOException;
 import java.text.ParseException;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Random;
@@ -224,6 +225,23 @@ public void testFileSystemGroupIteratorConcurrency() {
       iterator.next();
     }
   }

+  @Test
+  public void testLegacyGetGroupNames() {
+    Counters counters = new Counters();
+    // create 2 filesystem counter groups
+    counters.findCounter("fs1", FileSystemCounter.BYTES_READ).increment(1);
+    counters.findCounter("fs2", FileSystemCounter.BYTES_READ).increment(1);
+    counters.incrCounter("group1", "counter1", 1);
+
+    HashSet<String> groups = new HashSet<String>(counters.getGroupNames());
+    HashSet<String> expectedGroups = new HashSet<String>();
+    expectedGroups.add("group1");
+    expectedGroups.add("FileSystemCounter"); //Legacy Name
+    expectedGroups.add("org.apache.hadoop.mapreduce.FileSystemCounter");
+
+    assertEquals(expectedGroups, groups);
+  }
+
   @Test
   public void testMakeCompactString() {
     final String GC1 = "group1.counter1:1";

View File

@@ -458,7 +458,7 @@
       <dependency>
         <groupId>log4j</groupId>
         <artifactId>log4j</artifactId>
-        <version>1.2.15</version>
+        <version>1.2.17</version>
         <exclusions>
           <exclusion>
             <groupId>com.sun.jdmk</groupId>