diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 2148381bbef..ad2e6fa9692 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -499,6 +499,9 @@ Release 2.1.1-beta - 2013-09-23 HADOOP-9961. versions of a few transitive dependencies diverged between hadoop subprojects. (rvs via tucu) + HADOOP-9977. Hadoop services won't start with different keypass and + keystorepass when https is enabled. (cnauroth) + Release 2.1.0-beta - 2013-08-22 INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/SetReplication.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/SetReplication.java index 3a5f919b78e..1f65fed913c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/SetReplication.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/shell/SetReplication.java @@ -39,11 +39,14 @@ class SetReplication extends FsCommand { } public static final String NAME = "setrep"; - public static final String USAGE = "[-R] [-w] ..."; + public static final String USAGE = "[-R] [-w] ..."; public static final String DESCRIPTION = - "Set the replication level of a file.\n" + - "The -R flag requests a recursive change of replication level\n" + - "for an entire tree."; + "Set the replication level of a file. If is a directory\n" + + "then the command recursively changes the replication factor of\n" + + "all files under the directory tree rooted at .\n" + + "The -w flag requests that the command wait for the replication\n" + + "to complete. This can potentially take a very long time.\n" + + "The -R flag is accepted for backwards compatibility. 
It has no effect."; protected short newRep = 0; protected List waitList = new LinkedList(); @@ -54,7 +57,7 @@ class SetReplication extends FsCommand { CommandFormat cf = new CommandFormat(2, Integer.MAX_VALUE, "R", "w"); cf.parse(args); waitOpt = cf.getOpt("w"); - setRecursive(cf.getOpt("R")); + setRecursive(true); try { newRep = Short.parseShort(args.removeFirst()); @@ -126,4 +129,4 @@ class SetReplication extends FsCommand { out.println(" done"); } } -} \ No newline at end of file +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileBasedKeyStoresFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileBasedKeyStoresFactory.java index c6a7c6d5eec..ef4fad041ea 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileBasedKeyStoresFactory.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/ssl/FileBasedKeyStoresFactory.java @@ -53,6 +53,8 @@ public class FileBasedKeyStoresFactory implements KeyStoresFactory { "ssl.{0}.keystore.location"; public static final String SSL_KEYSTORE_PASSWORD_TPL_KEY = "ssl.{0}.keystore.password"; + public static final String SSL_KEYSTORE_KEYPASSWORD_TPL_KEY = + "ssl.{0}.keystore.keypassword"; public static final String SSL_KEYSTORE_TYPE_TPL_KEY = "ssl.{0}.keystore.type"; @@ -136,7 +138,7 @@ public class FileBasedKeyStoresFactory implements KeyStoresFactory { conf.get(resolvePropertyName(mode, SSL_KEYSTORE_TYPE_TPL_KEY), DEFAULT_KEYSTORE_TYPE); KeyStore keystore = KeyStore.getInstance(keystoreType); - String keystorePassword = null; + String keystoreKeyPassword = null; if (requireClientCert || mode == SSLFactory.Mode.SERVER) { String locationProperty = resolvePropertyName(mode, SSL_KEYSTORE_LOCATION_TPL_KEY); @@ -147,11 +149,17 @@ public class FileBasedKeyStoresFactory implements KeyStoresFactory { } String passwordProperty = resolvePropertyName(mode, SSL_KEYSTORE_PASSWORD_TPL_KEY); - keystorePassword = conf.get(passwordProperty, ""); + String keystorePassword = conf.get(passwordProperty, ""); if (keystorePassword.isEmpty()) { throw new GeneralSecurityException("The property '" + passwordProperty + "' has not been set in the ssl configuration file."); } + String keyPasswordProperty = + resolvePropertyName(mode, SSL_KEYSTORE_KEYPASSWORD_TPL_KEY); + // Key password defaults to the same value as store password for + // compatibility with legacy configurations that did not use a separate + // configuration property for key password. + keystoreKeyPassword = conf.get(keyPasswordProperty, keystorePassword); LOG.debug(mode.toString() + " KeyStore: " + keystoreLocation); InputStream is = new FileInputStream(keystoreLocation); @@ -167,8 +175,8 @@ public class FileBasedKeyStoresFactory implements KeyStoresFactory { KeyManagerFactory keyMgrFactory = KeyManagerFactory .getInstance(SSLFactory.SSLCERTIFICATE); - keyMgrFactory.init(keystore, (keystorePassword != null) ? - keystorePassword.toCharArray() : null); + keyMgrFactory.init(keystore, (keystoreKeyPassword != null) ? 
+ keystoreKeyPassword.toCharArray() : null); keyManagers = keyMgrFactory.getKeyManagers(); //trust store diff --git a/hadoop-common-project/hadoop-common/src/site/apt/FileSystemShell.apt.vm b/hadoop-common-project/hadoop-common/src/site/apt/FileSystemShell.apt.vm index 5c0869c0ae2..78cd880a67a 100644 --- a/hadoop-common-project/hadoop-common/src/site/apt/FileSystemShell.apt.vm +++ b/hadoop-common-project/hadoop-common/src/site/apt/FileSystemShell.apt.vm @@ -381,17 +381,22 @@ rmr setrep - Usage: << >>> + Usage: << >>> - Changes the replication factor of a file. + Changes the replication factor of a file. If is a directory then + the command recursively changes the replication factor of all files under + the directory tree rooted at . Options: - * The -R option will recursively increase the replication factor of files within a directory. + * The -w flag requests that the command wait for the replication + to complete. This can potentially take a very long time. + + * The -R flag is accepted for backwards compatibility. It has no effect. Example: - * <<>> + * <<>> Exit Code: diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/KeyStoreTestUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/KeyStoreTestUtil.java index c57cbfdd96e..937b437a3d3 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/KeyStoreTestUtil.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/KeyStoreTestUtil.java @@ -145,6 +145,28 @@ public class KeyStoreTestUtil { saveKeyStore(ks, filename, password); } + /** + * Creates a keystore with a single key and saves it to a file. + * + * @param filename String file to save + * @param password String store password to set on keystore + * @param keyPassword String key password to set on key + * @param alias String alias to use for the key + * @param privateKey Key to save in keystore + * @param cert Certificate to use as certificate chain associated to key + * @throws GeneralSecurityException for any error with the security APIs + * @throws IOException if there is an I/O error saving the file + */ + public static void createKeyStore(String filename, + String password, String keyPassword, String alias, + Key privateKey, Certificate cert) + throws GeneralSecurityException, IOException { + KeyStore ks = createEmptyKeyStore(); + ks.setKeyEntry(alias, privateKey, keyPassword.toCharArray(), + new Certificate[]{cert}); + saveKeyStore(ks, filename, password); + } + public static void createTrustStore(String filename, String password, String alias, Certificate cert) @@ -178,6 +200,19 @@ public class KeyStoreTestUtil { f.delete(); } + /** + * Performs complete setup of SSL configuration in preparation for testing an + * SSLFactory. This includes keys, certs, keystores, truststores, the server + * SSL configuration file, the client SSL configuration file, and the master + * configuration file read by the SSLFactory. 
+ * + * @param keystoresDir String directory to save keystores + * @param sslConfDir String directory to save SSL configuration files + * @param conf Configuration master configuration to be used by an SSLFactory, + * which will be mutated by this method + * @param useClientCert boolean true to make the client present a cert in the + * SSL handshake + */ public static void setupSSLConfig(String keystoresDir, String sslConfDir, Configuration conf, boolean useClientCert) throws Exception { @@ -213,53 +248,13 @@ public class KeyStoreTestUtil { KeyStoreTestUtil.createTrustStore(trustKS, trustPassword, certs); - Configuration clientSSLConf = new Configuration(false); - clientSSLConf.set(FileBasedKeyStoresFactory.resolvePropertyName( - SSLFactory.Mode.CLIENT, - FileBasedKeyStoresFactory.SSL_KEYSTORE_LOCATION_TPL_KEY), clientKS); - clientSSLConf.set(FileBasedKeyStoresFactory.resolvePropertyName( - SSLFactory.Mode.CLIENT, - FileBasedKeyStoresFactory.SSL_KEYSTORE_PASSWORD_TPL_KEY), clientPassword); - clientSSLConf.set(FileBasedKeyStoresFactory.resolvePropertyName( - SSLFactory.Mode.CLIENT, - FileBasedKeyStoresFactory.SSL_TRUSTSTORE_LOCATION_TPL_KEY), trustKS); - clientSSLConf.set(FileBasedKeyStoresFactory.resolvePropertyName( - SSLFactory.Mode.CLIENT, - FileBasedKeyStoresFactory.SSL_TRUSTSTORE_PASSWORD_TPL_KEY), trustPassword); - clientSSLConf.set(FileBasedKeyStoresFactory.resolvePropertyName( - SSLFactory.Mode.CLIENT, - FileBasedKeyStoresFactory.SSL_TRUSTSTORE_RELOAD_INTERVAL_TPL_KEY), "1000"); + Configuration clientSSLConf = createClientSSLConfig(clientKS, clientPassword, + clientPassword, trustKS); + Configuration serverSSLConf = createServerSSLConfig(serverKS, serverPassword, + serverPassword, trustKS); - Configuration serverSSLConf = new Configuration(false); - serverSSLConf.set(FileBasedKeyStoresFactory.resolvePropertyName( - SSLFactory.Mode.SERVER, - FileBasedKeyStoresFactory.SSL_KEYSTORE_LOCATION_TPL_KEY), serverKS); - serverSSLConf.set(FileBasedKeyStoresFactory.resolvePropertyName( - SSLFactory.Mode.SERVER, - FileBasedKeyStoresFactory.SSL_KEYSTORE_PASSWORD_TPL_KEY), serverPassword); - serverSSLConf.set(FileBasedKeyStoresFactory.resolvePropertyName( - SSLFactory.Mode.SERVER, - FileBasedKeyStoresFactory.SSL_TRUSTSTORE_LOCATION_TPL_KEY), trustKS); - serverSSLConf.set(FileBasedKeyStoresFactory.resolvePropertyName( - SSLFactory.Mode.SERVER, - FileBasedKeyStoresFactory.SSL_TRUSTSTORE_PASSWORD_TPL_KEY), trustPassword); - serverSSLConf.set(FileBasedKeyStoresFactory.resolvePropertyName( - SSLFactory.Mode.SERVER, - FileBasedKeyStoresFactory.SSL_TRUSTSTORE_RELOAD_INTERVAL_TPL_KEY), "1000"); - - Writer writer = new FileWriter(sslClientConfFile); - try { - clientSSLConf.writeXml(writer); - } finally { - writer.close(); - } - - writer = new FileWriter(sslServerConfFile); - try { - serverSSLConf.writeXml(writer); - } finally { - writer.close(); - } + saveConfig(sslClientConfFile, clientSSLConf); + saveConfig(sslServerConfFile, serverSSLConf); conf.set(SSLFactory.SSL_HOSTNAME_VERIFIER_KEY, "ALLOW_ALL"); conf.set(SSLFactory.SSL_CLIENT_CONF_KEY, sslClientConfFile.getName()); @@ -267,4 +262,101 @@ public class KeyStoreTestUtil { conf.setBoolean(SSLFactory.SSL_REQUIRE_CLIENT_CERT_KEY, useClientCert); } + /** + * Creates SSL configuration for a client. 
+ * + * @param clientKS String client keystore file + * @param password String store password, or null to avoid setting store + * password + * @param keyPassword String key password, or null to avoid setting key + * password + * @param trustKS String truststore file + * @return Configuration for client SSL + */ + public static Configuration createClientSSLConfig(String clientKS, + String password, String keyPassword, String trustKS) { + Configuration clientSSLConf = createSSLConfig(SSLFactory.Mode.CLIENT, + clientKS, password, keyPassword, trustKS); + return clientSSLConf; + } + + /** + * Creates SSL configuration for a server. + * + * @param serverKS String server keystore file + * @param password String store password, or null to avoid setting store + * password + * @param keyPassword String key password, or null to avoid setting key + * password + * @param trustKS String truststore file + * @return Configuration for server SSL + */ + public static Configuration createServerSSLConfig(String serverKS, + String password, String keyPassword, String trustKS) throws IOException { + Configuration serverSSLConf = createSSLConfig(SSLFactory.Mode.SERVER, + serverKS, password, keyPassword, trustKS); + return serverSSLConf; + } + + /** + * Creates SSL configuration. + * + * @param mode SSLFactory.Mode mode to configure + * @param keystore String keystore file + * @param password String store password, or null to avoid setting store + * password + * @param keyPassword String key password, or null to avoid setting key + * password + * @param trustKS String truststore file + * @return Configuration for SSL + */ + private static Configuration createSSLConfig(SSLFactory.Mode mode, + String keystore, String password, String keyPassword, String trustKS) { + String trustPassword = "trustP"; + + Configuration sslConf = new Configuration(false); + if (keystore != null) { + sslConf.set(FileBasedKeyStoresFactory.resolvePropertyName(mode, + FileBasedKeyStoresFactory.SSL_KEYSTORE_LOCATION_TPL_KEY), keystore); + } + if (password != null) { + sslConf.set(FileBasedKeyStoresFactory.resolvePropertyName(mode, + FileBasedKeyStoresFactory.SSL_KEYSTORE_PASSWORD_TPL_KEY), password); + } + if (keyPassword != null) { + sslConf.set(FileBasedKeyStoresFactory.resolvePropertyName(mode, + FileBasedKeyStoresFactory.SSL_KEYSTORE_KEYPASSWORD_TPL_KEY), + keyPassword); + } + if (trustKS != null) { + sslConf.set(FileBasedKeyStoresFactory.resolvePropertyName(mode, + FileBasedKeyStoresFactory.SSL_TRUSTSTORE_LOCATION_TPL_KEY), trustKS); + } + if (trustPassword != null) { + sslConf.set(FileBasedKeyStoresFactory.resolvePropertyName(mode, + FileBasedKeyStoresFactory.SSL_TRUSTSTORE_PASSWORD_TPL_KEY), + trustPassword); + } + sslConf.set(FileBasedKeyStoresFactory.resolvePropertyName(mode, + FileBasedKeyStoresFactory.SSL_TRUSTSTORE_RELOAD_INTERVAL_TPL_KEY), "1000"); + + return sslConf; + } + + /** + * Saves configuration to a file. 
+ * + * @param file File to save + * @param conf Configuration contents to write to file + * @throws IOException if there is an I/O error saving the file + */ + public static void saveConfig(File file, Configuration conf) + throws IOException { + Writer writer = new FileWriter(file); + try { + conf.writeXml(writer); + } finally { + writer.close(); + } + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestSSLFactory.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestSSLFactory.java index 784fb1f2894..1711a742260 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestSSLFactory.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/security/ssl/TestSSLFactory.java @@ -29,12 +29,19 @@ import javax.net.ssl.HttpsURLConnection; import java.io.File; import java.net.URL; import java.security.GeneralSecurityException; +import java.security.KeyPair; +import java.security.cert.X509Certificate; +import java.util.Collections; +import java.util.Map; public class TestSSLFactory { private static final String BASEDIR = System.getProperty("test.build.dir", "target/test-dir") + "/" + TestSSLFactory.class.getSimpleName(); + private static final String KEYSTORES_DIR = + new File(BASEDIR).getAbsolutePath(); + private String sslConfsDir; @BeforeClass public static void setUp() throws Exception { @@ -46,18 +53,16 @@ public class TestSSLFactory { private Configuration createConfiguration(boolean clientCert) throws Exception { Configuration conf = new Configuration(); - String keystoresDir = new File(BASEDIR).getAbsolutePath(); - String sslConfsDir = KeyStoreTestUtil.getClasspathDir(TestSSLFactory.class); - KeyStoreTestUtil.setupSSLConfig(keystoresDir, sslConfsDir, conf, clientCert); + KeyStoreTestUtil.setupSSLConfig(KEYSTORES_DIR, sslConfsDir, conf, + clientCert); return conf; } @After @Before public void cleanUp() throws Exception { - String keystoresDir = new File(BASEDIR).getAbsolutePath(); - String sslConfsDir = KeyStoreTestUtil.getClasspathDir(TestSSLFactory.class); - KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfsDir); + sslConfsDir = KeyStoreTestUtil.getClasspathDir(TestSSLFactory.class); + KeyStoreTestUtil.cleanupSSLConfig(KEYSTORES_DIR, sslConfsDir); } @Test(expected = IllegalStateException.class) @@ -181,4 +186,90 @@ public class TestSSLFactory { } } + @Test + public void testServerDifferentPasswordAndKeyPassword() throws Exception { + checkSSLFactoryInitWithPasswords(SSLFactory.Mode.SERVER, "password", + "keyPassword", "password", "keyPassword"); + } + + @Test + public void testServerKeyPasswordDefaultsToPassword() throws Exception { + checkSSLFactoryInitWithPasswords(SSLFactory.Mode.SERVER, "password", + "password", "password", null); + } + + @Test + public void testClientDifferentPasswordAndKeyPassword() throws Exception { + checkSSLFactoryInitWithPasswords(SSLFactory.Mode.CLIENT, "password", + "keyPassword", "password", "keyPassword"); + } + + @Test + public void testClientKeyPasswordDefaultsToPassword() throws Exception { + checkSSLFactoryInitWithPasswords(SSLFactory.Mode.CLIENT, "password", + "password", "password", null); + } + + /** + * Checks that SSLFactory initialization is successful with the given + * arguments. This is a helper method for writing test cases that cover + * different combinations of settings for the store password and key password. 
+ * It takes care of bootstrapping a keystore, a truststore, and SSL client or + * server configuration. Then, it initializes an SSLFactory. If no exception + * is thrown, then initialization was successful. + * + * @param mode SSLFactory.Mode mode to test + * @param password String store password to set on keystore + * @param keyPassword String key password to set on keystore + * @param confPassword String store password to set in SSL config file, or null + * to avoid setting in SSL config file + * @param confKeyPassword String key password to set in SSL config file, or + * null to avoid setting in SSL config file + * @throws Exception for any error + */ + private void checkSSLFactoryInitWithPasswords(SSLFactory.Mode mode, + String password, String keyPassword, String confPassword, + String confKeyPassword) throws Exception { + String keystore = new File(KEYSTORES_DIR, "keystore.jks").getAbsolutePath(); + String truststore = new File(KEYSTORES_DIR, "truststore.jks") + .getAbsolutePath(); + String trustPassword = "trustP"; + + // Create keys, certs, keystore, and truststore. + KeyPair keyPair = KeyStoreTestUtil.generateKeyPair("RSA"); + X509Certificate cert = KeyStoreTestUtil.generateCertificate("CN=Test", + keyPair, 30, "SHA1withRSA"); + KeyStoreTestUtil.createKeyStore(keystore, password, keyPassword, "Test", + keyPair.getPrivate(), cert); + Map certs = Collections.singletonMap("server", + cert); + KeyStoreTestUtil.createTrustStore(truststore, trustPassword, certs); + + // Create SSL configuration file, for either server or client. + final String sslConfFileName; + final Configuration sslConf; + if (mode == SSLFactory.Mode.SERVER) { + sslConfFileName = "ssl-server.xml"; + sslConf = KeyStoreTestUtil.createServerSSLConfig(keystore, confPassword, + confKeyPassword, truststore); + } else { + sslConfFileName = "ssl-client.xml"; + sslConf = KeyStoreTestUtil.createClientSSLConfig(keystore, confPassword, + confKeyPassword, truststore); + } + KeyStoreTestUtil.saveConfig(new File(sslConfsDir, sslConfFileName), sslConf); + + // Create the master configuration for use by the SSLFactory, which by + // default refers to the ssl-server.xml or ssl-client.xml created above. + Configuration conf = new Configuration(); + conf.setBoolean(SSLFactory.SSL_REQUIRE_CLIENT_CERT_KEY, true); + + // Try initializing an SSLFactory. + SSLFactory sslFactory = new SSLFactory(mode, conf); + try { + sslFactory.init(); + } finally { + sslFactory.destroy(); + } + } } diff --git a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml index 62d94474f55..544494d01b8 100644 --- a/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml +++ b/hadoop-common-project/hadoop-common/src/test/resources/testConf.xml @@ -601,16 +601,28 @@ RegexpComparator - ^-setrep \[-R\] \[-w\] <rep> <path/file> \.\.\.:( |\t)*Set the replication level of a file.( )* + ^-setrep \[-R\] \[-w\] <rep> <path> \.\.\.:( |\t)*Set the replication level of a file. If <path> is a directory( )* RegexpComparator - ^( |\t)*The -R flag requests a recursive change of replication level( )* + ^( |\t)*then the command recursively changes the replication factor of( )* RegexpComparator - ^( |\t)*for an entire tree.( )* + ^( |\t)*all files under the directory tree rooted at <path>\.( )* + + RegexpComparator + ^( |\t)*The -w flag requests that the command wait for the replication( )* + + + RegexpComparator + ^( |\t)*to complete. 
This can potentially take a very long time\.( )* + + + RegexpComparator + ^( |\t)*The -R flag is accepted for backwards compatibility\. It has no effect\.( )* + diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java index 457ffbd621c..420ab8825f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/AsyncDataService.java @@ -97,7 +97,7 @@ public class AsyncDataService { void writeAsync(OpenFileCtx openFileCtx) { if (LOG.isDebugEnabled()) { LOG.debug("Scheduling write back task for fileId: " - + openFileCtx.copyLatestAttr().getFileId()); + + openFileCtx.getLatestAttr().getFileId()); } WriteBackTask wbTask = new WriteBackTask(openFileCtx); execute(wbTask); @@ -125,7 +125,7 @@ public class AsyncDataService { public String toString() { // Called in AsyncDataService.execute for displaying error messages. return "write back data for fileId" - + openFileCtx.copyLatestAttr().getFileId() + " with nextOffset " + + openFileCtx.getLatestAttr().getFileId() + " with nextOffset " + openFileCtx.getNextOffset(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java index ce00cd720d2..f02dcc0e77a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OffsetRange.java @@ -17,19 +17,34 @@ */ package org.apache.hadoop.hdfs.nfs.nfs3; +import java.util.Comparator; + +import com.google.common.base.Preconditions; + /** * OffsetRange is the range of read/write request. A single point (e.g.,[5,5]) * is not a valid range. */ -public class OffsetRange implements Comparable { +public class OffsetRange { + + public static final Comparator ReverseComparatorOnMin = + new Comparator() { + @Override + public int compare(OffsetRange o1, OffsetRange o2) { + if (o1.getMin() == o2.getMin()) { + return o1.getMax() < o2.getMax() ? + 1 : (o1.getMax() > o2.getMax() ? -1 : 0); + } else { + return o1.getMin() < o2.getMin() ? 1 : -1; + } + } + }; + private final long min; private final long max; OffsetRange(long min, long max) { - if ((min >= max) || (min < 0) || (max < 0)) { - throw new IllegalArgumentException("Wrong offset range: (" + min + "," - + max + ")"); - } + Preconditions.checkArgument(min >= 0 && max >= 0 && min < max); this.min = min; this.max = max; } @@ -49,24 +64,10 @@ public class OffsetRange implements Comparable { @Override public boolean equals(Object o) { - assert (o instanceof OffsetRange); - OffsetRange range = (OffsetRange) o; - return (min == range.getMin()) && (max == range.getMax()); - } - - private static int compareTo(long left, long right) { - if (left < right) { - return -1; - } else if (left > right) { - return 1; - } else { - return 0; + if (o instanceof OffsetRange) { + OffsetRange range = (OffsetRange) o; + return (min == range.getMin()) && (max == range.getMax()); } - } - - @Override - public int compareTo(OffsetRange other) { - final int d = compareTo(min, other.getMin()); - return d != 0 ? 
d : compareTo(max, other.getMax()); + return false; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java index 62ee4902c2e..8ab39302f36 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/OpenFileCtx.java @@ -22,12 +22,14 @@ import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.RandomAccessFile; +import java.nio.channels.ClosedChannelException; import java.security.InvalidParameterException; import java.util.EnumSet; import java.util.Iterator; -import java.util.SortedMap; -import java.util.TreeMap; -import java.util.concurrent.locks.ReentrantLock; +import java.util.Map.Entry; +import java.util.concurrent.ConcurrentNavigableMap; +import java.util.concurrent.ConcurrentSkipListMap; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -50,8 +52,11 @@ import org.apache.hadoop.nfs.nfs3.response.WccAttr; import org.apache.hadoop.nfs.nfs3.response.WccData; import org.apache.hadoop.oncrpc.XDR; import org.apache.hadoop.oncrpc.security.VerifierNone; +import org.apache.hadoop.util.Daemon; import org.jboss.netty.channel.Channel; +import com.google.common.base.Preconditions; + /** * OpenFileCtx saves the context of one HDFS file output stream. Access to it is * synchronized by its member lock. @@ -59,34 +64,42 @@ import org.jboss.netty.channel.Channel; class OpenFileCtx { public static final Log LOG = LogFactory.getLog(OpenFileCtx.class); - /** - * Lock to synchronize OpenFileCtx changes. Thread should get this lock before - * any read/write operation to an OpenFileCtx object - */ - private final ReentrantLock ctxLock; + // Pending writes water mark for dump, 1MB + private static long DUMP_WRITE_WATER_MARK = 1024 * 1024; + + public final static int COMMIT_FINISHED = 0; + public final static int COMMIT_WAIT = 1; + public final static int COMMIT_INACTIVE_CTX = 2; + public final static int COMMIT_INACTIVE_WITH_PENDING_WRITE = 3; + public final static int COMMIT_ERROR = 4; // The stream status. False means the stream is closed. - private boolean activeState; + private volatile boolean activeState; // The stream write-back status. True means one thread is doing write back. - private boolean asyncStatus; + private volatile boolean asyncStatus; + /** + * The current offset of the file in HDFS. All the content before this offset + * has been written back to HDFS. + */ + private AtomicLong nextOffset; private final HdfsDataOutputStream fos; + + // TODO: make it mutable and update it after each writing back to HDFS private final Nfs3FileAttributes latestAttr; - private long nextOffset; - private final SortedMap pendingWrites; + private final ConcurrentNavigableMap pendingWrites; // The last write, commit request or write-back event. Updating time to keep // output steam alive. 
private long lastAccessTime; - // Pending writes water mark for dump, 1MB - private static int DUMP_WRITE_WATER_MARK = 1024 * 1024; + private volatile boolean enabledDump; private FileOutputStream dumpOut; - private long nonSequentialWriteInMemory; - private boolean enabledDump; + private AtomicLong nonSequentialWriteInMemory; private RandomAccessFile raf; private final String dumpFilePath; + private Daemon dumpThread; private void updateLastAccessTime() { lastAccessTime = System.currentTimeMillis(); @@ -96,89 +109,50 @@ class OpenFileCtx { return System.currentTimeMillis() - lastAccessTime > streamTimeout; } + public long getNextOffset() { + return nextOffset.get(); + } + // Increase or decrease the memory occupation of non-sequential writes private long updateNonSequentialWriteInMemory(long count) { - nonSequentialWriteInMemory += count; + long newValue = nonSequentialWriteInMemory.addAndGet(count); if (LOG.isDebugEnabled()) { LOG.debug("Update nonSequentialWriteInMemory by " + count + " new value:" - + nonSequentialWriteInMemory); + + newValue); } - if (nonSequentialWriteInMemory < 0) { - LOG.error("nonSequentialWriteInMemory is negative after update with count " - + count); - throw new IllegalArgumentException( - "nonSequentialWriteInMemory is negative after update with count " - + count); - } - return nonSequentialWriteInMemory; + Preconditions.checkState(newValue >= 0, + "nonSequentialWriteInMemory is negative after update with count " + + count); + return newValue; } OpenFileCtx(HdfsDataOutputStream fos, Nfs3FileAttributes latestAttr, String dumpFilePath) { this.fos = fos; this.latestAttr = latestAttr; - pendingWrites = new TreeMap(); + // We use the ReverseComparatorOnMin as the comparator of the map. In this + // way, we first dump the data with larger offset. In the meanwhile, we + // retrieve the last element to write back to HDFS. + pendingWrites = new ConcurrentSkipListMap( + OffsetRange.ReverseComparatorOnMin); updateLastAccessTime(); activeState = true; asyncStatus = false; dumpOut = null; raf = null; - nonSequentialWriteInMemory = 0; + nonSequentialWriteInMemory = new AtomicLong(0); + this.dumpFilePath = dumpFilePath; enabledDump = dumpFilePath == null ? 
false: true; - nextOffset = latestAttr.getSize(); - assert(nextOffset == this.fos.getPos()); - - ctxLock = new ReentrantLock(true); + nextOffset = new AtomicLong(); + nextOffset.set(latestAttr.getSize()); + assert(nextOffset.get() == this.fos.getPos()); + dumpThread = null; } - private void lockCtx() { - if (LOG.isTraceEnabled()) { - StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace(); - StackTraceElement e = stacktrace[2]; - String methodName = e.getMethodName(); - LOG.trace("lock ctx, caller:" + methodName); - } - ctxLock.lock(); - } - - private void unlockCtx() { - ctxLock.unlock(); - if (LOG.isTraceEnabled()) { - StackTraceElement[] stacktrace = Thread.currentThread().getStackTrace(); - StackTraceElement e = stacktrace[2]; - String methodName = e.getMethodName(); - LOG.info("unlock ctx, caller:" + methodName); - } - } - - // Make a copy of the latestAttr - public Nfs3FileAttributes copyLatestAttr() { - Nfs3FileAttributes ret; - lockCtx(); - try { - ret = new Nfs3FileAttributes(latestAttr); - } finally { - unlockCtx(); - } - return ret; - } - - private long getNextOffsetUnprotected() { - assert(ctxLock.isLocked()); - return nextOffset; - } - - public long getNextOffset() { - long ret; - lockCtx(); - try { - ret = getNextOffsetUnprotected(); - } finally { - unlockCtx(); - } - return ret; + public Nfs3FileAttributes getLatestAttr() { + return latestAttr; } // Get flushed offset. Note that flushed data may not be persisted. @@ -187,12 +161,7 @@ class OpenFileCtx { } // Check if need to dump the new writes - private void checkDump(long count) { - assert (ctxLock.isLocked()); - - // Always update the in memory count - updateNonSequentialWriteInMemory(count); - + private void checkDump() { if (!enabledDump) { if (LOG.isDebugEnabled()) { LOG.debug("Do nothing, dump is disabled."); @@ -200,66 +169,111 @@ class OpenFileCtx { return; } - if (nonSequentialWriteInMemory < DUMP_WRITE_WATER_MARK) { + if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) { return; } - // Create dump outputstream for the first time - if (dumpOut == null) { - LOG.info("Create dump file:" + dumpFilePath); - File dumpFile = new File(dumpFilePath); - try { - if (dumpFile.exists()) { - LOG.fatal("The dump file should not exist:" + dumpFilePath); - throw new RuntimeException("The dump file should not exist:" - + dumpFilePath); + // wake up the dumper thread to dump the data + synchronized (this) { + if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) { + if (LOG.isDebugEnabled()) { + LOG.debug("Asking dumper to dump..."); } - dumpOut = new FileOutputStream(dumpFile); - } catch (IOException e) { - LOG.error("Got failure when creating dump stream " + dumpFilePath - + " with error:" + e); - enabledDump = false; - IOUtils.cleanup(LOG, dumpOut); - return; - } - } - // Get raf for the first dump - if (raf == null) { - try { - raf = new RandomAccessFile(dumpFilePath, "r"); - } catch (FileNotFoundException e) { - LOG.error("Can't get random access to file " + dumpFilePath); - // Disable dump - enabledDump = false; - return; - } - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Start dump, current write number:" + pendingWrites.size()); - } - Iterator it = pendingWrites.keySet().iterator(); - while (it.hasNext()) { - OffsetRange key = it.next(); - WriteCtx writeCtx = pendingWrites.get(key); - try { - long dumpedDataSize = writeCtx.dumpData(dumpOut, raf); - if (dumpedDataSize > 0) { - updateNonSequentialWriteInMemory(-dumpedDataSize); + if (dumpThread == null) { + dumpThread = new Daemon(new 
Dumper()); + dumpThread.start(); + } else { + this.notifyAll(); } - } catch (IOException e) { - LOG.error("Dump data failed:" + writeCtx + " with error:" + e); - // Disable dump - enabledDump = false; - return; } } - if (nonSequentialWriteInMemory != 0) { - LOG.fatal("After dump, nonSequentialWriteInMemory is not zero: " - + nonSequentialWriteInMemory); - throw new RuntimeException( - "After dump, nonSequentialWriteInMemory is not zero: " - + nonSequentialWriteInMemory); + } + + class Dumper implements Runnable { + /** Dump data into a file */ + private void dump() { + // Create dump outputstream for the first time + if (dumpOut == null) { + LOG.info("Create dump file:" + dumpFilePath); + File dumpFile = new File(dumpFilePath); + try { + synchronized (this) { + // check if alive again + Preconditions.checkState(dumpFile.createNewFile(), + "The dump file should not exist: %s", dumpFilePath); + dumpOut = new FileOutputStream(dumpFile); + } + } catch (IOException e) { + LOG.error("Got failure when creating dump stream " + dumpFilePath, e); + enabledDump = false; + if (dumpOut != null) { + try { + dumpOut.close(); + } catch (IOException e1) { + LOG.error("Can't close dump stream " + dumpFilePath, e); + } + } + return; + } + } + + // Get raf for the first dump + if (raf == null) { + try { + raf = new RandomAccessFile(dumpFilePath, "r"); + } catch (FileNotFoundException e) { + LOG.error("Can't get random access to file " + dumpFilePath); + // Disable dump + enabledDump = false; + return; + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("Start dump. Before dump, nonSequentialWriteInMemory == " + + nonSequentialWriteInMemory.get()); + } + + Iterator it = pendingWrites.keySet().iterator(); + while (activeState && it.hasNext() + && nonSequentialWriteInMemory.get() > 0) { + OffsetRange key = it.next(); + WriteCtx writeCtx = pendingWrites.get(key); + try { + long dumpedDataSize = writeCtx.dumpData(dumpOut, raf); + if (dumpedDataSize > 0) { + updateNonSequentialWriteInMemory(-dumpedDataSize); + } + } catch (IOException e) { + LOG.error("Dump data failed:" + writeCtx + " with error:" + e + + " OpenFileCtx state:" + activeState); + // Disable dump + enabledDump = false; + return; + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("After dump, nonSequentialWriteInMemory == " + + nonSequentialWriteInMemory.get()); + } + } + + @Override + public void run() { + while (activeState && enabledDump) { + if (nonSequentialWriteInMemory.get() >= DUMP_WRITE_WATER_MARK) { + dump(); + } + synchronized (OpenFileCtx.this) { + if (nonSequentialWriteInMemory.get() < DUMP_WRITE_WATER_MARK) { + try { + OpenFileCtx.this.wait(); + } catch (InterruptedException e) { + } + } + } + } } } @@ -283,148 +297,196 @@ class OpenFileCtx { public void receivedNewWrite(DFSClient dfsClient, WRITE3Request request, Channel channel, int xid, AsyncDataService asyncDataService, IdUserGroup iug) { - - lockCtx(); - try { - if (!activeState) { - LOG.info("OpenFileCtx is inactive, fileId:" - + request.getHandle().getFileId()); - WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr); - WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, - fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF); - Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse( - new XDR(), xid, new VerifierNone()), xid); - } else { - // Handle repeated write requests(same xid or not). - // If already replied, send reply again. If not replied, drop the - // repeated request. 
- WriteCtx existantWriteCtx = checkRepeatedWriteRequest(request, channel, - xid); - if (existantWriteCtx != null) { - if (!existantWriteCtx.getReplied()) { - if (LOG.isDebugEnabled()) { - LOG.debug("Repeated write request which hasn't be served: xid=" - + xid + ", drop it."); - } - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Repeated write request which is already served: xid=" - + xid + ", resend response."); - } - WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr); - WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK, - fileWcc, request.getCount(), request.getStableHow(), - Nfs3Constant.WRITE_COMMIT_VERF); - Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse( - new XDR(), xid, new VerifierNone()), xid); + + if (!activeState) { + LOG.info("OpenFileCtx is inactive, fileId:" + + request.getHandle().getFileId()); + WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr); + WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, + fileWcc, 0, request.getStableHow(), Nfs3Constant.WRITE_COMMIT_VERF); + Nfs3Utils.writeChannel(channel, + response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()), + xid); + } else { + // Update the write time first + updateLastAccessTime(); + + // Handle repeated write requests (same xid or not). + // If already replied, send reply again. If not replied, drop the + // repeated request. + WriteCtx existantWriteCtx = checkRepeatedWriteRequest(request, channel, + xid); + if (existantWriteCtx != null) { + if (!existantWriteCtx.getReplied()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Repeated write request which hasn't be served: xid=" + + xid + ", drop it."); } - updateLastAccessTime(); - } else { - receivedNewWriteInternal(dfsClient, request, channel, xid, - asyncDataService, iug); + if (LOG.isDebugEnabled()) { + LOG.debug("Repeated write request which is already served: xid=" + + xid + ", resend response."); + } + WccData fileWcc = new WccData(latestAttr.getWccAttr(), latestAttr); + WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK, + fileWcc, request.getCount(), request.getStableHow(), + Nfs3Constant.WRITE_COMMIT_VERF); + Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse( + new XDR(), xid, new VerifierNone()), xid); + } + } else { + // not a repeated write request + receivedNewWriteInternal(dfsClient, request, channel, xid, + asyncDataService, iug); + } + } + } + + /** + * Creates and adds a WriteCtx into the pendingWrites map. This is a + * synchronized method to handle concurrent writes. + * + * @return A non-null {@link WriteCtx} instance if the incoming write + * request's offset >= nextOffset. Otherwise null. + */ + private synchronized WriteCtx addWritesToCache(WRITE3Request request, + Channel channel, int xid) { + long offset = request.getOffset(); + int count = request.getCount(); + long cachedOffset = nextOffset.get(); + + if (LOG.isDebugEnabled()) { + LOG.debug("requesed offset=" + offset + " and current offset=" + + cachedOffset); + } + + // Fail non-append call + if (offset < cachedOffset) { + LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + "," + + nextOffset + ")"); + return null; + } else { + DataState dataState = offset == cachedOffset ? 
WriteCtx.DataState.NO_DUMP + : WriteCtx.DataState.ALLOW_DUMP; + WriteCtx writeCtx = new WriteCtx(request.getHandle(), + request.getOffset(), request.getCount(), request.getStableHow(), + request.getData().array(), channel, xid, false, dataState); + if (LOG.isDebugEnabled()) { + LOG.debug("Add new write to the list with nextOffset " + cachedOffset + + " and requesed offset=" + offset); + } + if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) { + // update the memory size + updateNonSequentialWriteInMemory(count); + } + // check if there is a WriteCtx with the same range in pendingWrites + WriteCtx oldWriteCtx = checkRepeatedWriteRequest(request, channel, xid); + if (oldWriteCtx == null) { + addWrite(writeCtx); + } else { + LOG.warn("Got a repeated request, same range, with xid:" + + writeCtx.getXid()); + } + return writeCtx; + } + } + + /** Process an overwrite write request */ + private void processOverWrite(DFSClient dfsClient, WRITE3Request request, + Channel channel, int xid, IdUserGroup iug) { + WccData wccData = new WccData(latestAttr.getWccAttr(), null); + long offset = request.getOffset(); + int count = request.getCount(); + WriteStableHow stableHow = request.getStableHow(); + WRITE3Response response; + long cachedOffset = nextOffset.get(); + if (offset + count > cachedOffset) { + LOG.warn("Haven't noticed any partial overwrite for a sequential file" + + " write requests. Treat it as a real random write, no support."); + response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0, + WriteStableHow.UNSTABLE, 0); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Process perfectOverWrite"); + } + // TODO: let executor handle perfect overwrite + response = processPerfectOverWrite(dfsClient, offset, count, stableHow, + request.getData().array(), + Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug); + } + updateLastAccessTime(); + Nfs3Utils.writeChannel(channel, + response.writeHeaderAndResponse(new XDR(), xid, new VerifierNone()), + xid); + } + + /** + * Check if we can start the write (back to HDFS) now. If there is no hole for + * writing, and there is no other threads writing (i.e., asyncStatus is + * false), start the writing and set asyncStatus to true. + * + * @return True if the new write is sequencial and we can start writing + * (including the case that there is already a thread writing). + */ + private synchronized boolean checkAndStartWrite( + AsyncDataService asyncDataService, WriteCtx writeCtx) { + + if (writeCtx.getOffset() == nextOffset.get()) { + if (!asyncStatus) { + if (LOG.isDebugEnabled()) { + LOG.debug("Trigger the write back task. 
Current nextOffset: " + + nextOffset.get()); + } + asyncStatus = true; + asyncDataService.execute(new AsyncDataService.WriteBackTask(this)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("The write back thread is working."); } } - - } finally { - unlockCtx(); + return true; + } else { + return false; } } private void receivedNewWriteInternal(DFSClient dfsClient, WRITE3Request request, Channel channel, int xid, AsyncDataService asyncDataService, IdUserGroup iug) { - long offset = request.getOffset(); - int count = request.getCount(); WriteStableHow stableHow = request.getStableHow(); - - // Get file length, fail non-append call WccAttr preOpAttr = latestAttr.getWccAttr(); - if (LOG.isDebugEnabled()) { - LOG.debug("requesed offset=" + offset + " and current filesize=" - + preOpAttr.getSize()); - } + int count = request.getCount(); - long nextOffset = getNextOffsetUnprotected(); - if (offset == nextOffset) { - LOG.info("Add to the list, update nextOffset and notify the writer," - + " nextOffset:" + nextOffset); - WriteCtx writeCtx = new WriteCtx(request.getHandle(), - request.getOffset(), request.getCount(), request.getStableHow(), - request.getData().array(), channel, xid, false, DataState.NO_DUMP); - addWrite(writeCtx); - - // Create an async task and change openFileCtx status to indicate async - // task pending - if (!asyncStatus) { - asyncStatus = true; - asyncDataService.execute(new AsyncDataService.WriteBackTask(this)); - } - - // Update the write time first - updateLastAccessTime(); - Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr); - - // Send response immediately for unstable write - if (request.getStableHow() == WriteStableHow.UNSTABLE) { - WccData fileWcc = new WccData(preOpAttr, postOpAttr); - WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK, - fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF); - Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse( - new XDR(), xid, new VerifierNone()), xid); - writeCtx.setReplied(true); - } - - } else if (offset > nextOffset) { - LOG.info("Add new write to the list but not update nextOffset:" - + nextOffset); - WriteCtx writeCtx = new WriteCtx(request.getHandle(), - request.getOffset(), request.getCount(), request.getStableHow(), - request.getData().array(), channel, xid, false, DataState.ALLOW_DUMP); - addWrite(writeCtx); - - // Check if need to dump some pending requests to file - checkDump(request.getCount()); - updateLastAccessTime(); - Nfs3FileAttributes postOpAttr = new Nfs3FileAttributes(latestAttr); - - // In test, noticed some Linux client sends a batch (e.g., 1MB) - // of reordered writes and won't send more writes until it gets - // responses of the previous batch. 
So here send response immediately for - // unstable non-sequential write - if (request.getStableHow() == WriteStableHow.UNSTABLE) { - WccData fileWcc = new WccData(preOpAttr, postOpAttr); - WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK, - fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF); - Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse( - new XDR(), xid, new VerifierNone()), xid); - writeCtx.setReplied(true); - } - - } else { + WriteCtx writeCtx = addWritesToCache(request, channel, xid); + if (writeCtx == null) { // offset < nextOffset - LOG.warn("(offset,count,nextOffset):" + "(" + offset + "," + count + "," - + nextOffset + ")"); - WccData wccData = new WccData(preOpAttr, null); - WRITE3Response response; - - if (offset + count > nextOffset) { - LOG.warn("Haven't noticed any partial overwrite out of a sequential file" - + "write requests, so treat it as a real random write, no support."); - response = new WRITE3Response(Nfs3Status.NFS3ERR_INVAL, wccData, 0, - WriteStableHow.UNSTABLE, 0); - } else { - if (LOG.isDebugEnabled()) { - LOG.debug("Process perfectOverWrite"); + processOverWrite(dfsClient, request, channel, xid, iug); + } else { + // The writes is added to pendingWrites. + // Check and start writing back if necessary + boolean startWriting = checkAndStartWrite(asyncDataService, writeCtx); + if (!startWriting) { + // offset > nextOffset. check if we need to dump data + checkDump(); + + // In test, noticed some Linux client sends a batch (e.g., 1MB) + // of reordered writes and won't send more writes until it gets + // responses of the previous batch. So here send response immediately + // for unstable non-sequential write + if (request.getStableHow() == WriteStableHow.UNSTABLE) { + if (LOG.isDebugEnabled()) { + LOG.debug("UNSTABLE write request, send response for offset: " + + writeCtx.getOffset()); + } + WccData fileWcc = new WccData(preOpAttr, latestAttr); + WRITE3Response response = new WRITE3Response(Nfs3Status.NFS3_OK, + fileWcc, count, stableHow, Nfs3Constant.WRITE_COMMIT_VERF); + Nfs3Utils + .writeChannel(channel, response.writeHeaderAndResponse(new XDR(), + xid, new VerifierNone()), xid); + writeCtx.setReplied(true); } - response = processPerfectOverWrite(dfsClient, offset, count, stableHow, - request.getData().array(), - Nfs3Utils.getFileIdPath(request.getHandle()), wccData, iug); } - - updateLastAccessTime(); - Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse( - new XDR(), xid, new VerifierNone()), xid); } } @@ -436,7 +498,6 @@ class OpenFileCtx { private WRITE3Response processPerfectOverWrite(DFSClient dfsClient, long offset, int count, WriteStableHow stableHow, byte[] data, String path, WccData wccData, IdUserGroup iug) { - assert (ctxLock.isLocked()); WRITE3Response response = null; // Read the content back @@ -447,21 +508,30 @@ class OpenFileCtx { try { // Sync file data and length to avoid partial read failure fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); - + } catch (ClosedChannelException closedException) { + LOG.info("The FSDataOutputStream has been closed. 
" + + "Continue processing the perfect overwrite."); + } catch (IOException e) { + LOG.info("hsync failed when processing possible perfect overwrite, path=" + + path + " error:" + e); + return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow, + Nfs3Constant.WRITE_COMMIT_VERF); + } + + try { fis = new FSDataInputStream(dfsClient.open(path)); readCount = fis.read(offset, readbuffer, 0, count); if (readCount < count) { LOG.error("Can't read back " + count + " bytes, partial read size:" + readCount); - return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, - stableHow, Nfs3Constant.WRITE_COMMIT_VERF); + return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow, + Nfs3Constant.WRITE_COMMIT_VERF); } - } catch (IOException e) { LOG.info("Read failed when processing possible perfect overwrite, path=" + path + " error:" + e); - return response = new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, - stableHow, Nfs3Constant.WRITE_COMMIT_VERF); + return new WRITE3Response(Nfs3Status.NFS3ERR_IO, wccData, 0, stableHow, + Nfs3Constant.WRITE_COMMIT_VERF); } finally { IOUtils.cleanup(LOG, fis); } @@ -492,40 +562,26 @@ class OpenFileCtx { } return response; } - - public final static int COMMIT_FINISHED = 0; - public final static int COMMIT_WAIT = 1; - public final static int COMMIT_INACTIVE_CTX = 2; - public final static int COMMIT_ERROR = 3; /** * return one commit status: COMMIT_FINISHED, COMMIT_WAIT, * COMMIT_INACTIVE_CTX, COMMIT_ERROR */ public int checkCommit(long commitOffset) { - int ret = COMMIT_WAIT; - - lockCtx(); - try { - if (!activeState) { - ret = COMMIT_INACTIVE_CTX; - } else { - ret = checkCommitInternal(commitOffset); - } - } finally { - unlockCtx(); - } - return ret; + return activeState ? checkCommitInternal(commitOffset) + : COMMIT_INACTIVE_CTX; } private int checkCommitInternal(long commitOffset) { if (commitOffset == 0) { // Commit whole file - commitOffset = getNextOffsetUnprotected(); + commitOffset = nextOffset.get(); } long flushed = getFlushedOffset(); - LOG.info("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset); + if (LOG.isDebugEnabled()) { + LOG.debug("getFlushedOffset=" + flushed + " commitOffset=" + commitOffset); + } if (flushed < commitOffset) { // Keep stream active updateLastAccessTime(); @@ -538,6 +594,13 @@ class OpenFileCtx { fos.hsync(EnumSet.of(SyncFlag.UPDATE_LENGTH)); // Nothing to do for metadata since attr related change is pass-through ret = COMMIT_FINISHED; + } catch (ClosedChannelException cce) { + ret = COMMIT_INACTIVE_CTX; + if (pendingWrites.isEmpty()) { + ret = COMMIT_INACTIVE_CTX; + } else { + ret = COMMIT_INACTIVE_WITH_PENDING_WRITE; + } } catch (IOException e) { LOG.error("Got stream error during data sync:" + e); // Do nothing. Stream will be closed eventually by StreamMonitor. 
@@ -550,18 +613,16 @@ class OpenFileCtx { } private void addWrite(WriteCtx writeCtx) { - assert (ctxLock.isLocked()); long offset = writeCtx.getOffset(); int count = writeCtx.getCount(); pendingWrites.put(new OffsetRange(offset, offset + count), writeCtx); } - /** * Check stream status to decide if it should be closed * @return true, remove stream; false, keep stream */ - public boolean streamCleanup(long fileId, long streamTimeout) { + public synchronized boolean streamCleanup(long fileId, long streamTimeout) { if (streamTimeout < WriteManager.MINIMIUM_STREAM_TIMEOUT) { throw new InvalidParameterException("StreamTimeout" + streamTimeout + "ms is less than MINIMIUM_STREAM_TIMEOUT " @@ -569,107 +630,97 @@ class OpenFileCtx { } boolean flag = false; - if (!ctxLock.tryLock()) { - if (LOG.isTraceEnabled()) { - LOG.trace("Another thread is working on it" + ctxLock.toString()); + // Check the stream timeout + if (checkStreamTimeout(streamTimeout)) { + if (LOG.isDebugEnabled()) { + LOG.debug("closing stream for fileId:" + fileId); } - return flag; - } - - try { - // Check the stream timeout - if (checkStreamTimeout(streamTimeout)) { - LOG.info("closing stream for fileId:" + fileId); - cleanup(); - flag = true; - } - } finally { - unlockCtx(); + cleanup(); + flag = true; } return flag; } - // Invoked by AsynDataService to do the write back - public void executeWriteBack() { - long nextOffset; - OffsetRange key; - WriteCtx writeCtx; - + /** + * Get (and remove) the next WriteCtx from {@link #pendingWrites} if possible. + * + * @return Null if {@link #pendingWrites} is null, or the next WriteCtx's + * offset is larger than nextOffSet. + */ + private synchronized WriteCtx offerNextToWrite() { + if (pendingWrites.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("The asyn write task has no pending writes, fileId: " + + latestAttr.getFileId()); + } + this.asyncStatus = false; + } else { + Entry lastEntry = pendingWrites.lastEntry(); + OffsetRange range = lastEntry.getKey(); + WriteCtx toWrite = lastEntry.getValue(); + + if (LOG.isTraceEnabled()) { + LOG.trace("range.getMin()=" + range.getMin() + " nextOffset=" + + nextOffset); + } + + long offset = nextOffset.get(); + if (range.getMin() > offset) { + if (LOG.isDebugEnabled()) { + LOG.debug("The next sequencial write has not arrived yet"); + } + this.asyncStatus = false; + } else if (range.getMin() < offset && range.getMax() > offset) { + // shouldn't happen since we do sync for overlapped concurrent writers + LOG.warn("Got a overlapping write (" + range.getMin() + "," + + range.getMax() + "), nextOffset=" + offset + + ". Silently drop it now"); + pendingWrites.remove(range); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Remove write(" + range.getMin() + "-" + range.getMax() + + ") from the list"); + } + // after writing, remove the WriteCtx from cache + pendingWrites.remove(range); + // update nextOffset + nextOffset.addAndGet(toWrite.getCount()); + if (LOG.isDebugEnabled()) { + LOG.debug("Change nextOffset to " + nextOffset.get()); + } + return toWrite; + } + } + return null; + } + + /** Invoked by AsynDataService to write back to HDFS */ + void executeWriteBack() { + Preconditions.checkState(asyncStatus, + "The openFileCtx has false async status"); try { - // Don't lock OpenFileCtx for all writes to reduce the timeout of other - // client request to the same file - while (true) { - lockCtx(); - if (!asyncStatus) { - // This should never happen. There should be only one thread working - // on one OpenFileCtx anytime. 
- LOG.fatal("The openFileCtx has false async status"); - throw new RuntimeException("The openFileCtx has false async status"); - } - // Any single write failure can change activeState to false, so do the - // check each loop. - if (pendingWrites.isEmpty()) { - if (LOG.isDebugEnabled()) { - LOG.debug("The asyn write task has no pendding writes, fileId: " - + latestAttr.getFileId()); - } - break; - } - if (!activeState) { - if (LOG.isDebugEnabled()) { - LOG.debug("The openFileCtx is not active anymore, fileId: " - + latestAttr.getFileId()); - } - break; - } - - // Get the next sequential write - nextOffset = getNextOffsetUnprotected(); - key = pendingWrites.firstKey(); - if (LOG.isTraceEnabled()) { - LOG.trace("key.getMin()=" + key.getMin() + " nextOffset=" - + nextOffset); - } - - if (key.getMin() > nextOffset) { - if (LOG.isDebugEnabled()) { - LOG.info("The next sequencial write has not arrived yet"); - } - break; - - } else if (key.getMin() < nextOffset && key.getMax() > nextOffset) { - // Can't handle overlapping write. Didn't see it in tests yet. - LOG.fatal("Got a overlapping write (" + key.getMin() + "," - + key.getMax() + "), nextOffset=" + nextOffset); - throw new RuntimeException("Got a overlapping write (" + key.getMin() - + "," + key.getMax() + "), nextOffset=" + nextOffset); - - } else { - if (LOG.isTraceEnabled()) { - LOG.trace("Remove write(" + key.getMin() + "-" + key.getMax() - + ") from the list"); - } - writeCtx = pendingWrites.remove(key); + while (activeState) { + WriteCtx toWrite = offerNextToWrite(); + if (toWrite != null) { // Do the write - doSingleWrite(writeCtx); + doSingleWrite(toWrite); updateLastAccessTime(); + } else { + break; } - - unlockCtx(); } - + + if (!activeState && LOG.isDebugEnabled()) { + LOG.debug("The openFileCtx is not active anymore, fileId: " + + +latestAttr.getFileId()); + } } finally { - // Always reset the async status so another async task can be created - // for this file + // make sure we reset asyncStatus to false asyncStatus = false; - if (ctxLock.isHeldByCurrentThread()) { - unlockCtx(); - } } } private void doSingleWrite(final WriteCtx writeCtx) { - assert(ctxLock.isLocked()); Channel channel = writeCtx.getChannel(); int xid = writeCtx.getXid(); @@ -679,20 +730,25 @@ class OpenFileCtx { byte[] data = null; try { data = writeCtx.getData(); - } catch (IOException e1) { + } catch (Exception e1) { LOG.error("Failed to get request data offset:" + offset + " count:" + count + " error:" + e1); // Cleanup everything cleanup(); return; } - assert (data.length == count); + + Preconditions.checkState(data.length == count); FileHandle handle = writeCtx.getHandle(); - LOG.info("do write, fileId: " + handle.getFileId() + " offset: " + offset - + " length:" + count + " stableHow:" + stableHow.getValue()); + if (LOG.isDebugEnabled()) { + LOG.debug("do write, fileId: " + handle.getFileId() + " offset: " + + offset + " length:" + count + " stableHow:" + stableHow.getValue()); + } try { + // The write is not protected by lock. 
asyncStatus is used to make sure + there is one thread doing write back at any time fos.write(data, 0, count); long flushedOffset = getFlushedOffset(); @@ -701,11 +757,20 @@ + flushedOffset + " and nextOffset should be" + (offset + count)); } - nextOffset = flushedOffset; + + if (LOG.isDebugEnabled()) { + LOG.debug("After writing " + handle.getFileId() + " at offset " + + offset + ", update the memory count."); + } // Reduce memory occupation size if request was allowed dumped - if (writeCtx.getDataState() == DataState.ALLOW_DUMP) { - updateNonSequentialWriteInMemory(-count); + if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) { + synchronized (writeCtx) { + if (writeCtx.getDataState() == WriteCtx.DataState.ALLOW_DUMP) { + writeCtx.setDataState(WriteCtx.DataState.NO_DUMP); + updateNonSequentialWriteInMemory(-count); + } + } } if (!writeCtx.getReplied()) { @@ -716,7 +781,6 @@ Nfs3Utils.writeChannel(channel, response.writeHeaderAndResponse( new XDR(), xid, new VerifierNone()), xid); } - } catch (IOException e) { LOG.error("Error writing to fileId " + handle.getFileId() + " at offset " + offset + " and length " + data.length, e); @@ -733,9 +797,21 @@ } } - private void cleanup() { - assert(ctxLock.isLocked()); + private synchronized void cleanup() { + if (!activeState) { + LOG.info("Current OpenFileCtx is already inactive, no need to cleanup."); + return; + } activeState = false; + + // stop the dump thread + if (dumpThread != null) { + dumpThread.interrupt(); + try { + dumpThread.join(3000); + } catch (InterruptedException e) { + } + } // Close stream try { @@ -753,7 +829,7 @@ while (!pendingWrites.isEmpty()) { OffsetRange key = pendingWrites.firstKey(); LOG.info("Fail pending write: (" + key.getMin() + "," + key.getMax() - + "), nextOffset=" + getNextOffsetUnprotected()); + + "), nextOffset=" + nextOffset.get()); WriteCtx writeCtx = pendingWrites.remove(key); if (!writeCtx.getReplied()) { @@ -767,23 +843,23 @@ } // Cleanup dump file - if (dumpOut!=null){ + if (dumpOut != null) { try { dumpOut.close(); } catch (IOException e) { e.printStackTrace(); } + File dumpFile = new File(dumpFilePath); + if (dumpFile.exists() && !dumpFile.delete()) { + LOG.error("Failed to delete dumpfile: " + dumpFile); + } } - if (raf!=null) { + if (raf != null) { try { raf.close(); } catch (IOException e) { e.printStackTrace(); } } - File dumpFile = new File(dumpFilePath); - if (dumpFile.delete()) { - LOG.error("Failed to delete dumpfile: "+ dumpFile); - } } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java index a034aba61be..f1af6520940 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteCtx.java @@ -27,6 +27,8 @@ import org.apache.hadoop.nfs.nfs3.FileHandle; import org.apache.hadoop.nfs.nfs3.Nfs3Constant.WriteStableHow; import org.jboss.netty.channel.Channel; +import com.google.common.base.Preconditions; + /** * WriteCtx saves the context of one write request, such as request, channel, * xid and reply status.
@@ -49,13 +51,21 @@ class WriteCtx { private final long offset; private final int count; private final WriteStableHow stableHow; - private byte[] data; + private volatile byte[] data; private final Channel channel; private final int xid; private boolean replied; - private DataState dataState; + /** + * Data belonging to the same {@link OpenFileCtx} may be dumped to a file. + * After being dumped to the file, the corresponding {@link WriteCtx} records + * the dump file and the offset. + */ + private RandomAccessFile raf; + private long dumpFileOffset; + + private volatile DataState dataState; public DataState getDataState() { return dataState; @@ -64,12 +74,13 @@ class WriteCtx { public void setDataState(DataState dataState) { this.dataState = dataState; } - - private RandomAccessFile raf; - private long dumpFileOffset; - // Return the dumped data size - public long dumpData(FileOutputStream dumpOut, RandomAccessFile raf) + /** + * Writing the data into a local file. After the writing, if + * {@link #dataState} is still ALLOW_DUMP, set {@link #data} to null and set + * {@link #dataState} to DUMPED. + */ + long dumpData(FileOutputStream dumpOut, RandomAccessFile raf) throws IOException { if (dataState != DataState.ALLOW_DUMP) { if (LOG.isTraceEnabled()) { @@ -84,48 +95,63 @@ class WriteCtx { if (LOG.isDebugEnabled()) { LOG.debug("After dump, new dumpFileOffset:" + dumpFileOffset); } - data = null; - dataState = DataState.DUMPED; - return count; + // it is possible that while we dump the data, the data is also being + // written back to HDFS. After dump, if the writing back has not finished + // yet, we change its flag to DUMPED and set the data to null. Otherwise + // this WriteCtx instance should have been removed from the buffer. + if (dataState == DataState.ALLOW_DUMP) { + synchronized (this) { + if (dataState == DataState.ALLOW_DUMP) { + data = null; + dataState = DataState.DUMPED; + return count; + } + } + } + return 0; } - public FileHandle getHandle() { + FileHandle getHandle() { return handle; } - public long getOffset() { + long getOffset() { return offset; } - public int getCount() { + int getCount() { return count; } - public WriteStableHow getStableHow() { + WriteStableHow getStableHow() { return stableHow; } - public byte[] getData() throws IOException { + byte[] getData() throws IOException { if (dataState != DataState.DUMPED) { - if (data == null) { - throw new IOException("Data is not dumpted but has null:" + this); - } - } else { - // read back - if (data != null) { - throw new IOException("Data is dumpted but not null"); - } - data = new byte[count]; - raf.seek(dumpFileOffset); - int size = raf.read(data, 0, count); - if (size != count) { - throw new IOException("Data count is " + count + ", but read back " - + size + "bytes"); + synchronized (this) { + if (dataState != DataState.DUMPED) { + Preconditions.checkState(data != null); + return data; + } } } + // read back from dumped file + this.loadData(); return data; } + private void loadData() throws IOException { + Preconditions.checkState(data == null); + data = new byte[count]; + raf.seek(dumpFileOffset); + int size = raf.read(data, 0, count); + if (size != count) { + throw new IOException("Data count is " + count + ", but read back " + + size + "bytes"); + } + } + Channel getChannel() { return channel; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java 
index 4cfc3aeb4ce..1471ddfc1f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/nfs3/WriteManager.java @@ -67,8 +67,8 @@ public class WriteManager { */ private long streamTimeout; - public static final long DEFAULT_STREAM_TIMEOUT = 10 * 1000; // 10 second - public static final long MINIMIUM_STREAM_TIMEOUT = 1 * 1000; // 1 second + public static final long DEFAULT_STREAM_TIMEOUT = 10 * 60 * 1000; //10 minutes + public static final long MINIMIUM_STREAM_TIMEOUT = 10 * 1000; //10 seconds void addOpenFileStream(FileHandle h, OpenFileCtx ctx) { openFileMap.put(h, ctx); @@ -215,6 +215,10 @@ public class WriteManager { LOG.info("Inactive stream, fileId=" + fileHandle.getFileId() + " commitOffset=" + commitOffset); return true; + } else if (ret == OpenFileCtx.COMMIT_INACTIVE_WITH_PENDING_WRITE) { + LOG.info("Inactive stream with pending writes, fileId=" + + fileHandle.getFileId() + " commitOffset=" + commitOffset); + return false; } assert (ret == OpenFileCtx.COMMIT_WAIT || ret == OpenFileCtx.COMMIT_ERROR); if (ret == OpenFileCtx.COMMIT_ERROR) { diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java index 0759a7a6647..034ffcd2721 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/test/java/org/apache/hadoop/hdfs/nfs/nfs3/TestOffsetRange.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.nfs.nfs3; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; import java.io.IOException; @@ -51,8 +52,9 @@ public class TestOffsetRange { OffsetRange r3 = new OffsetRange(1, 3); OffsetRange r4 = new OffsetRange(3, 4); - assertTrue(r2.compareTo(r3) == 0); - assertTrue(r2.compareTo(r1) == 1); - assertTrue(r2.compareTo(r4) == -1); + assertEquals(0, OffsetRange.ReverseComparatorOnMin.compare(r2, r3)); + assertEquals(0, OffsetRange.ReverseComparatorOnMin.compare(r2, r2)); + assertTrue(OffsetRange.ReverseComparatorOnMin.compare(r2, r1) < 0); + assertTrue(OffsetRange.ReverseComparatorOnMin.compare(r2, r4) > 0); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 443090787df..cb313b6f5ef 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -286,8 +286,12 @@ Release 2.3.0 - UNRELEASED and the excludedNodes parameter types respectively to Node and Set. (Junping Du via szetszwo) + HDFS-5240. Separate formatting from logging in the audit logger API (daryn) + OPTIMIZATIONS + HDFS-5239. Allow FSNamesystem lock fairness to be configurable (daryn) + BUG FIXES HDFS-5034. Remove debug prints from GetFileLinkInfo (Andrew Wang via Colin Patrick McCabe) @@ -321,6 +325,8 @@ Release 2.2.0 - UNRELEASED BUG FIXES + HDFS-5139. Remove redundant -R option from setrep. + Release 2.1.1-beta - 2013-09-23 INCOMPATIBLE CHANGES @@ -408,6 +414,9 @@ Release 2.1.1-beta - 2013-09-23 HDFS-5212. Refactor RpcMessage and NFS3Response to support different types of authentication information. (jing9) + HDFS-4971. Move IO operations out of locking in OpenFileCtx. 
(brandonli and + jing9) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 9798eae18b8..ebf69dff20e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -443,7 +443,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, private final long accessTimePrecision; /** Lock to protect FSNamesystem. */ - private ReentrantReadWriteLock fsLock = new ReentrantReadWriteLock(true); + private ReentrantReadWriteLock fsLock; /** * Used when this NN is in standby state to read from the shared edit log. @@ -618,6 +618,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats, */ FSNamesystem(Configuration conf, FSImage fsImage, boolean ignoreRetryCache) throws IOException { + boolean fair = conf.getBoolean("dfs.namenode.fslock.fair", true); + LOG.info("fsLock is fair:" + fair); + fsLock = new ReentrantReadWriteLock(fair); try { resourceRecheckInterval = conf.getLong( DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY, @@ -6893,10 +6896,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } sb.append(trackingId); } - auditLog.info(sb); + logAuditMessage(sb.toString()); } } + public void logAuditMessage(String message) { + auditLog.info(message); + } } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java index 42873785f37..63bf317a5a3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSShell.java @@ -61,6 +61,8 @@ import static org.junit.Assert.*; public class TestDFSShell { private static final Log LOG = LogFactory.getLog(TestDFSShell.class); private static AtomicInteger counter = new AtomicInteger(); + private final int SUCCESS = 0; + private final int ERROR = 1; static final String TEST_ROOT_DIR = PathUtils.getTestDirName(TestDFSShell.class); @@ -1619,9 +1621,6 @@ public class TestDFSShell { // force Copy Option is -f @Test (timeout = 30000) public void testCopyCommandsWithForceOption() throws Exception { - final int SUCCESS = 0; - final int ERROR = 1; - Configuration conf = new Configuration(); MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1) .format(true).build(); @@ -1682,7 +1681,55 @@ public class TestDFSShell { } cluster.shutdown(); } + } + // setrep for file and directory. 
+ @Test (timeout = 30000) + public void testSetrep() throws Exception { + + Configuration conf = new Configuration(); + MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(1) + .format(true).build(); + FsShell shell = null; + FileSystem fs = null; + + final String testdir1 = "/tmp/TestDFSShell-testSetrep-" + counter.getAndIncrement(); + final String testdir2 = testdir1 + "/nestedDir"; + final Path hdfsFile1 = new Path(testdir1, "testFileForSetrep"); + final Path hdfsFile2 = new Path(testdir2, "testFileForSetrep"); + final Short oldRepFactor = new Short((short) 1); + final Short newRepFactor = new Short((short) 3); + try { + String[] argv; + cluster.waitActive(); + fs = cluster.getFileSystem(); + assertThat(fs.mkdirs(new Path(testdir2)), is(true)); + shell = new FsShell(conf); + + fs.create(hdfsFile1, true).close(); + fs.create(hdfsFile2, true).close(); + + // Tests for setrep on a file. + argv = new String[] { "-setrep", newRepFactor.toString(), hdfsFile1.toString() }; + assertThat(shell.run(argv), is(SUCCESS)); + assertThat(fs.getFileStatus(hdfsFile1).getReplication(), is(newRepFactor)); + assertThat(fs.getFileStatus(hdfsFile2).getReplication(), is(oldRepFactor)); + + // Tests for setrep + + // Tests for setrep on a directory and make sure it is applied recursively. + argv = new String[] { "-setrep", newRepFactor.toString(), testdir1 }; + assertThat(shell.run(argv), is(SUCCESS)); + assertThat(fs.getFileStatus(hdfsFile1).getReplication(), is(newRepFactor)); + assertThat(fs.getFileStatus(hdfsFile2).getReplication(), is(newRepFactor)); + + } finally { + if (shell != null) { + shell.close(); + } + + cluster.shutdown(); + } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java index 01ea90a32dd..a2bd172d5e1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java @@ -20,8 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_NAME_DIR_KEY; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; +import static org.junit.Assert.*; import java.io.File; import java.io.IOException; @@ -142,4 +141,21 @@ public class TestFSNamesystem { assertTrue("Replication queues weren't being populated after entering " + "safemode 2nd time", fsn.isPopulatingReplQueues()); } + + @Test + public void testFsLockFairness() throws IOException, InterruptedException{ + Configuration conf = new Configuration(); + + FSEditLog fsEditLog = Mockito.mock(FSEditLog.class); + FSImage fsImage = Mockito.mock(FSImage.class); + Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog); + + conf.setBoolean("dfs.namenode.fslock.fair", true); + FSNamesystem fsNamesystem = new FSNamesystem(conf, fsImage); + assertTrue(fsNamesystem.getFsLockForTests().isFair()); + + conf.setBoolean("dfs.namenode.fslock.fair", false); + fsNamesystem = new FSNamesystem(conf, fsImage); + assertFalse(fsNamesystem.getFsLockForTests().isFair()); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml index 
563d51a841d..16a44ff4cf8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/testHDFSConf.xml @@ -6049,7 +6049,7 @@ -fs NAMENODE -mkdir /dir0 -fs NAMENODE -touchz /dir0/file0 -fs NAMENODE -touchz /dir0/file1 - -fs NAMENODE -setrep -R 2 /dir0 + -fs NAMENODE -setrep 2 /dir0 -fs NAMENODE -rm -r /user @@ -6072,7 +6072,7 @@ -fs NAMENODE -mkdir -p dir0 -fs NAMENODE -touchz dir0/file0 -fs NAMENODE -touchz dir0/file1 - -fs NAMENODE -setrep -R 2 dir0 + -fs NAMENODE -setrep 2 dir0 -fs NAMENODE -rm -r /user @@ -6089,6 +6089,24 @@ + + setrep: -R ignored for existing file + + -fs NAMENODE -mkdir -p dir0 + -fs NAMENODE -touchz dir0/file0 + -fs NAMENODE -setrep -R 2 dir0/file0 + + + -fs NAMENODE -rm -r /user + + + + RegexpComparator + ^Replication 2 set: dir0/file0 + + + + setrep: non existent file (absolute path) @@ -6145,7 +6163,7 @@ -fs NAMENODE -mkdir hdfs:///dir0/ -fs NAMENODE -touchz hdfs:///dir0/file0 -fs NAMENODE -touchz hdfs:///dir0/file1 - -fs NAMENODE -setrep -R 2 hdfs:///dir0 + -fs NAMENODE -setrep 2 hdfs:///dir0 -fs NAMENODE -rm -r hdfs:///* @@ -6203,7 +6221,7 @@ -fs NAMENODE -mkdir -p NAMENODE/dir0 -fs NAMENODE -touchz NAMENODE/dir0/file0 -fs NAMENODE -touchz NAMENODE/dir0/file1 - -fs NAMENODE -setrep -R 2 NAMENODE/dir0 + -fs NAMENODE -setrep 2 NAMENODE/dir0 -fs NAMENODE -rm -r NAMENODE/* diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index c10f96013fc..5bee07164d9 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -178,6 +178,9 @@ Release 2.3.0 - UNRELEASED MAPREDUCE-5404. HSAdminServer does not use ephemeral ports in minicluster mode (Ted Yu via jlowe) + MAPREDUCE-5522. Incorrect order expected from JobQueueInfo (Jinghui Wang + via bobby) + Release 2.2.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestQueue.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestQueue.java index 4313e4080e2..6f3424707b9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestQueue.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestQueue.java @@ -128,10 +128,15 @@ public class TestQueue { assertEquals(secondSubQueue.getJobQueueInfo().getChildren().size(), 0); // test assertEquals(manager.getSchedulerInfo("first"), "queueInfo"); - assertEquals(manager.getJobQueueInfos()[0].getQueueName(), secondSubQueue - .getJobQueueInfo().getQueueName()); - assertEquals(manager.getJobQueueInfos()[1].getQueueName(), firstSubQueue - .getJobQueueInfo().getQueueName()); + Set<String> queueJobQueueInfos = new HashSet<String>(); + for(JobQueueInfo jobInfo : manager.getJobQueueInfos()){ + queueJobQueueInfos.add(jobInfo.getQueueName()); + } + Set<String> rootJobQueueInfos = new HashSet<String>(); + for(Queue queue : root.getChildren()){ + rootJobQueueInfos.add(queue.getJobQueueInfo().getQueueName()); + } + assertEquals(queueJobQueueInfos, rootJobQueueInfos); // test getJobQueueInfoMapping assertEquals( manager.getJobQueueInfoMapping().get("first").getQueueName(), "first"); diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 4b41f79b697..d3e5158dc3d 100644 --- a/hadoop-yarn-project/CHANGES.txt +++
b/hadoop-yarn-project/CHANGES.txt @@ -43,6 +43,9 @@ Release 2.3.0 - UNRELEASED YARN-1060. Two tests in TestFairScheduler are missing @Test annotation (Niranjan Singh via Sandy Ryza) + YARN-1188. The context of QueueMetrics becomes default when using + FairScheduler (Tsuyoshi Ozawa via Sandy Ryza) + Release 2.2.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java index 6305a6c2348..ff0956e5f74 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueMetrics.java @@ -21,12 +21,14 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +@Metrics(context="yarn") public class FSQueueMetrics extends QueueMetrics { @Metric("Fair share of memory in MB") MutableGaugeInt fairShareMB;