From db437b7cbb7cab883302c88a95f6388fae8c7afa Mon Sep 17 00:00:00 2001 From: Chris Nauroth Date: Wed, 18 Jun 2014 23:27:45 +0000 Subject: [PATCH 1/8] HADOOP-10716. Cannot use more than 1 har filesystem. Contributed by Rushabh Shah. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1603668 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-common-project/hadoop-common/CHANGES.txt | 3 +++ .../hadoop-common/src/main/resources/core-default.xml | 5 +++++ 2 files changed, 8 insertions(+) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 121d5d6d1e5..f732e80bc2f 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -573,6 +573,9 @@ Release 2.5.0 - UNRELEASED HADOOP-10660. GraphiteSink should implement Closeable (Chen He and Ted Yu via raviprak) + HADOOP-10716. Cannot use more than 1 har filesystem. + (Rushabh Shah via cnauroth) + BREAKDOWN OF HADOOP-10514 SUBTASKS AND RELATED JIRAS HADOOP-10520. Extended attributes definition and FileSystem APIs for diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 0dbb53f3bd8..aef6582431e 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -1358,4 +1358,9 @@ true. + + fs.har.impl.disable.cache + true + Don't cache 'har' filesystem instances. + From bd23a2ff22dba8a5203e8e498244f985e728da51 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Wed, 18 Jun 2014 23:28:50 +0000 Subject: [PATCH 2/8] MAPREDUCE-5896. InputSplits should indicate which locations have the block cached in memory. (Sandy Ryza via kasha) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1603670 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../apache/hadoop/mapred/FileInputFormat.java | 50 +++++++++++++++---- .../org/apache/hadoop/mapred/FileSplit.java | 22 +++++++- .../mapred/InputSplitWithLocationInfo.java | 39 +++++++++++++++ .../hadoop/mapred/SplitLocationInfo.java | 46 +++++++++++++++++ .../apache/hadoop/mapreduce/InputSplit.java | 17 +++++++ .../mapreduce/lib/input/FileInputFormat.java | 19 +++++-- .../hadoop/mapreduce/lib/input/FileSplit.java | 34 +++++++++++++ .../hadoop/mapred/TestFileInputFormat.java | 28 ++++++++++- .../lib/input/TestFileInputFormat.java | 29 +++++++++-- 10 files changed, 269 insertions(+), 18 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputSplitWithLocationInfo.java create mode 100644 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SplitLocationInfo.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index f3c4d0aa473..ae327063310 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -213,6 +213,9 @@ Release 2.5.0 - UNRELEASED MAPREDUCE-5834. Increased test-timeouts in TestGridMixClasses to avoid occassional failures. (Mit Desai via vinodkv) + MAPREDUCE-5896. InputSplits should indicate which locations have the block + cached in memory. 
(Sandy Ryza via kasha) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java index 9863427076e..0ae56717ab9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileInputFormat.java @@ -295,6 +295,15 @@ public abstract class FileInputFormat implements InputFormat { String[] hosts) { return new FileSplit(file, start, length, hosts); } + + /** + * A factory that makes the split for this class. It can be overridden + * by sub-classes to make sub-types + */ + protected FileSplit makeSplit(Path file, long start, long length, + String[] hosts, String[] inMemoryHosts) { + return new FileSplit(file, start, length, hosts, inMemoryHosts); + } /** Splits files returned by {@link #listStatus(JobConf)} when * they're too big.*/ @@ -337,22 +346,22 @@ public abstract class FileInputFormat implements InputFormat { long bytesRemaining = length; while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { - String[] splitHosts = getSplitHosts(blkLocations, + String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length-bytesRemaining, splitSize, clusterMap); splits.add(makeSplit(path, length-bytesRemaining, splitSize, - splitHosts)); + splitHosts[0], splitHosts[1])); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { - String[] splitHosts = getSplitHosts(blkLocations, length + String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap); splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining, - splitHosts)); + splitHosts[0], splitHosts[1])); } } else { - String[] splitHosts = getSplitHosts(blkLocations,0,length,clusterMap); - splits.add(makeSplit(path, 0, length, splitHosts)); + String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap); + splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1])); } } else { //Create empty hosts array for zero length files @@ -538,10 +547,30 @@ public abstract class FileInputFormat implements InputFormat { * @param blkLocations The list of block locations * @param offset * @param splitSize - * @return array of hosts that contribute most to this split + * @return an array of hosts that contribute most to this split * @throws IOException */ protected String[] getSplitHosts(BlockLocation[] blkLocations, + long offset, long splitSize, NetworkTopology clusterMap) throws IOException { + return getSplitHostsAndCachedHosts(blkLocations, offset, splitSize, + clusterMap)[0]; + } + + /** + * This function identifies and returns the hosts that contribute + * most for a given split. 
For calculating the contribution, rack + * locality is treated on par with host locality, so hosts from racks + * that contribute the most are preferred over hosts on racks that + * contribute less + * @param blkLocations The list of block locations + * @param offset + * @param splitSize + * @return two arrays - one of hosts that contribute most to this split, and + * one of hosts that contribute most to this split that have the data + * cached on them + * @throws IOException + */ + private String[][] getSplitHostsAndCachedHosts(BlockLocation[] blkLocations, long offset, long splitSize, NetworkTopology clusterMap) throws IOException { @@ -552,7 +581,8 @@ public abstract class FileInputFormat implements InputFormat { //If this is the only block, just return if (bytesInThisBlock >= splitSize) { - return blkLocations[startIndex].getHosts(); + return new String[][] { blkLocations[startIndex].getHosts(), + blkLocations[startIndex].getCachedHosts() }; } long bytesInFirstBlock = bytesInThisBlock; @@ -639,7 +669,9 @@ public abstract class FileInputFormat implements InputFormat { } // for all indices - return identifyHosts(allTopos.length, racksMap); + // We don't yet support cached hosts when bytesInThisBlock > splitSize + return new String[][] { identifyHosts(allTopos.length, racksMap), + new String[0]}; } private String[] identifyHosts(int replicationFactor, diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java index fb1c651a9a9..c38f2f78f83 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileSplit.java @@ -24,6 +24,7 @@ import java.io.DataOutput; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.fs.Path; /** A section of an input file. 
Returned by {@link @@ -33,7 +34,7 @@ import org.apache.hadoop.fs.Path; @InterfaceAudience.Public @InterfaceStability.Stable public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit - implements InputSplit { + implements InputSplitWithLocationInfo { org.apache.hadoop.mapreduce.lib.input.FileSplit fs; protected FileSplit() { fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit(); @@ -62,6 +63,20 @@ public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit length, hosts); } + /** Constructs a split with host information + * + * @param file the file name + * @param start the position of the first byte in the file to process + * @param length the number of bytes in the file to process + * @param hosts the list of hosts containing the block, possibly null + * @param inMemoryHosts the list of hosts containing the block in memory + */ + public FileSplit(Path file, long start, long length, String[] hosts, + String[] inMemoryHosts) { + fs = new org.apache.hadoop.mapreduce.lib.input.FileSplit(file, start, + length, hosts, inMemoryHosts); + } + public FileSplit(org.apache.hadoop.mapreduce.lib.input.FileSplit fs) { this.fs = fs; } @@ -92,4 +107,9 @@ public class FileSplit extends org.apache.hadoop.mapreduce.InputSplit return fs.getLocations(); } + @Override + @Evolving + public SplitLocationInfo[] getLocationInfo() throws IOException { + return fs.getLocationInfo(); + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputSplitWithLocationInfo.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputSplitWithLocationInfo.java new file mode 100644 index 00000000000..bb95882188a --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/InputSplitWithLocationInfo.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +@Public +@Evolving +public interface InputSplitWithLocationInfo extends InputSplit { + /** + * Gets info about which nodes the input split is stored on and how it is + * stored at each location. + * + * @return list of SplitLocationInfos describing how the split + * data is stored at each location. A null value indicates that all the + * locations have the data stored on disk. 
+ * @throws IOException + */ + SplitLocationInfo[] getLocationInfo() throws IOException; +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SplitLocationInfo.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SplitLocationInfo.java new file mode 100644 index 00000000000..a8e69fb52dd --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/SplitLocationInfo.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapred; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +@Public +@Evolving +public class SplitLocationInfo { + private boolean inMemory; + private String location; + + public SplitLocationInfo(String location, boolean inMemory) { + this.location = location; + this.inMemory = inMemory; + } + + public boolean isOnDisk() { + return true; + } + + public boolean isInMemory() { + return inMemory; + } + + public String getLocation() { + return location; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java index 95d4a8c4796..515b423f935 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/InputSplit.java @@ -22,6 +22,8 @@ import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceStability.Evolving; +import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.RecordReader; @@ -51,10 +53,25 @@ public abstract class InputSplit { /** * Get the list of nodes by name where the data for the split would be local. * The locations do not need to be serialized. + * * @return a new array of the node nodes. * @throws IOException * @throws InterruptedException */ public abstract String[] getLocations() throws IOException, InterruptedException; + + /** + * Gets info about which nodes the input split is stored on and how it is + * stored at each location. 
+ * + * @return list of SplitLocationInfos describing how the split + * data is stored at each location. A null value indicates that all the + * locations have the data stored on disk. + * @throws IOException + */ + @Evolving + public SplitLocationInfo[] getLocationInfo() throws IOException { + return null; + } } \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java index 5f32f11ca0c..56fb9fcdf11 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileInputFormat.java @@ -35,6 +35,7 @@ import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.RemoteIterator; import org.apache.hadoop.mapred.LocatedFileStatusFetcher; +import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; @@ -359,6 +360,15 @@ public abstract class FileInputFormat extends InputFormat { String[] hosts) { return new FileSplit(file, start, length, hosts); } + + /** + * A factory that makes the split for this class. It can be overridden + * by sub-classes to make sub-types + */ + protected FileSplit makeSplit(Path file, long start, long length, + String[] hosts, String[] inMemoryHosts) { + return new FileSplit(file, start, length, hosts, inMemoryHosts); + } /** * Generate the list of files and make them into FileSplits. 
@@ -392,17 +402,20 @@ public abstract class FileInputFormat extends InputFormat { while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, splitSize, - blkLocations[blkIndex].getHosts())); + blkLocations[blkIndex].getHosts(), + blkLocations[blkIndex].getCachedHosts())); bytesRemaining -= splitSize; } if (bytesRemaining != 0) { int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining); splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining, - blkLocations[blkIndex].getHosts())); + blkLocations[blkIndex].getHosts(), + blkLocations[blkIndex].getCachedHosts())); } } else { // not splitable - splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts())); + splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(), + blkLocations[0].getCachedHosts())); } } else { //Create empty hosts array for zero length files diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java index 72c845060e3..9fba79c2a42 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/input/FileSplit.java @@ -22,11 +22,13 @@ import java.io.IOException; import java.io.DataInput; import java.io.DataOutput; +import org.apache.hadoop.mapred.SplitLocationInfo; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.InterfaceStability.Evolving; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; @@ -41,6 +43,7 @@ public class FileSplit extends InputSplit implements Writable { private long start; private long length; private String[] hosts; + private SplitLocationInfo[] hostInfos; public FileSplit() {} @@ -57,6 +60,31 @@ public class FileSplit extends InputSplit implements Writable { this.length = length; this.hosts = hosts; } + + /** Constructs a split with host and cached-blocks information + * + * @param file the file name + * @param start the position of the first byte in the file to process + * @param length the number of bytes in the file to process + * @param hosts the list of hosts containing the block + * @param inMemoryHosts the list of hosts containing the block in memory + */ + public FileSplit(Path file, long start, long length, String[] hosts, + String[] inMemoryHosts) { + this(file, start, length, hosts); + hostInfos = new SplitLocationInfo[hosts.length]; + for (int i = 0; i < hosts.length; i++) { + // because N will be tiny, scanning is probably faster than a HashSet + boolean inMemory = false; + for (String inMemoryHost : inMemoryHosts) { + if (inMemoryHost.equals(hosts[i])) { + inMemory = true; + break; + } + } + hostInfos[i] = new SplitLocationInfo(hosts[i], inMemory); + } + } /** The file containing this split's data. 
*/ public Path getPath() { return file; } @@ -98,4 +126,10 @@ public class FileSplit extends InputSplit implements Writable { return this.hosts; } } + + @Override + @Evolving + public SplitLocationInfo[] getLocationInfo() throws IOException { + return hostInfos; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java index 0bb4e96470f..ba636b60db7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileInputFormat.java @@ -102,6 +102,29 @@ public class TestFileInputFormat { FileSystem.closeAll(); } + @Test + public void testSplitLocationInfo() throws Exception { + Configuration conf = getConfiguration(); + conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, + "test:///a1/a2"); + JobConf job = new JobConf(conf); + TextInputFormat fileInputFormat = new TextInputFormat(); + fileInputFormat.configure(job); + FileSplit[] splits = (FileSplit[]) fileInputFormat.getSplits(job, 1); + String[] locations = splits[0].getLocations(); + Assert.assertEquals(2, locations.length); + SplitLocationInfo[] locationInfo = splits[0].getLocationInfo(); + Assert.assertEquals(2, locationInfo.length); + SplitLocationInfo localhostInfo = locations[0].equals("localhost") ? + locationInfo[0] : locationInfo[1]; + SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ? + locationInfo[0] : locationInfo[1]; + Assert.assertTrue(localhostInfo.isOnDisk()); + Assert.assertTrue(localhostInfo.isInMemory()); + Assert.assertTrue(otherhostInfo.isOnDisk()); + Assert.assertFalse(otherhostInfo.isInMemory()); + } + @Test public void testListStatusSimple() throws IOException { Configuration conf = new Configuration(); @@ -223,8 +246,9 @@ public class TestFileInputFormat { public BlockLocation[] getFileBlockLocations(Path p, long start, long len) throws IOException { return new BlockLocation[] { - new BlockLocation(new String[] { "localhost:50010" }, - new String[] { "localhost" }, 0, len) }; + new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" }, + new String[] { "localhost", "otherhost" }, new String[] { "localhost" }, + new String[0], 0, len, false) }; } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java index 246c158732f..3f877f11eb9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/input/TestFileInputFormat.java @@ -39,6 +39,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.PathFilter; import org.apache.hadoop.fs.RawLocalFileSystem; import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hadoop.mapred.SplitLocationInfo; import 
org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.Job; import org.junit.After; @@ -139,6 +140,28 @@ public class TestFileInputFormat { 1, mockFs.numListLocatedStatusCalls); FileSystem.closeAll(); } + + @Test + public void testSplitLocationInfo() throws Exception { + Configuration conf = getConfiguration(); + conf.set(org.apache.hadoop.mapreduce.lib.input.FileInputFormat.INPUT_DIR, + "test:///a1/a2"); + Job job = Job.getInstance(conf); + TextInputFormat fileInputFormat = new TextInputFormat(); + List splits = fileInputFormat.getSplits(job); + String[] locations = splits.get(0).getLocations(); + Assert.assertEquals(2, locations.length); + SplitLocationInfo[] locationInfo = splits.get(0).getLocationInfo(); + Assert.assertEquals(2, locationInfo.length); + SplitLocationInfo localhostInfo = locations[0].equals("localhost") ? + locationInfo[0] : locationInfo[1]; + SplitLocationInfo otherhostInfo = locations[0].equals("otherhost") ? + locationInfo[0] : locationInfo[1]; + Assert.assertTrue(localhostInfo.isOnDisk()); + Assert.assertTrue(localhostInfo.isInMemory()); + Assert.assertTrue(otherhostInfo.isOnDisk()); + Assert.assertFalse(otherhostInfo.isInMemory()); + } @Test public void testListStatusSimple() throws IOException { @@ -402,9 +425,9 @@ public class TestFileInputFormat { public BlockLocation[] getFileBlockLocations(Path p, long start, long len) throws IOException { return new BlockLocation[] { - new BlockLocation(new String[] { "localhost:50010" }, - new String[] { "localhost" }, 0, len) }; - } + new BlockLocation(new String[] { "localhost:50010", "otherhost:50010" }, + new String[] { "localhost", "otherhost" }, new String[] { "localhost" }, + new String[0], 0, len, false) }; } @Override protected RemoteIterator listLocatedStatus(Path f, From eb93f73ea85cd017786d848fa7e52a31c7d5b199 Mon Sep 17 00:00:00 2001 From: Brandon Li Date: Wed, 18 Jun 2014 23:51:49 +0000 Subject: [PATCH 3/8] HDFS-6553. Add missing DeprecationDeltas for NFS Kerberos configurations. 
Contributed by Stephen Chu git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1603677 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop/hdfs/nfs/conf/NfsConfiguration.java | 14 +++++++++++++- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 +++ 2 files changed, 16 insertions(+), 1 deletion(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfiguration.java b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfiguration.java index 5dc53d83ddb..ff927946d16 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfiguration.java +++ b/hadoop-hdfs-project/hadoop-hdfs-nfs/src/main/java/org/apache/hadoop/hdfs/nfs/conf/NfsConfiguration.java @@ -36,6 +36,8 @@ public class NfsConfiguration extends HdfsConfiguration { NfsConfigKeys.DFS_NFS_SERVER_PORT_KEY), new DeprecationDelta("nfs3.mountd.port", NfsConfigKeys.DFS_NFS_MOUNTD_PORT_KEY), + new DeprecationDelta("dfs.nfs.exports.cache.size", + Nfs3Constant.NFS_EXPORTS_CACHE_SIZE_KEY), new DeprecationDelta("dfs.nfs.exports.cache.expirytime.millis", Nfs3Constant.NFS_EXPORTS_CACHE_EXPIRYTIME_MILLIS_KEY), new DeprecationDelta("hadoop.nfs.userupdate.milly", @@ -51,6 +53,16 @@ public class NfsConfiguration extends HdfsConfiguration { new DeprecationDelta("dfs.nfs3.export.point", NfsConfigKeys.DFS_NFS_EXPORT_POINT_KEY), new DeprecationDelta("nfs.allow.insecure.ports", - NfsConfigKeys.DFS_NFS_PORT_MONITORING_DISABLED_KEY) }); + NfsConfigKeys.DFS_NFS_PORT_MONITORING_DISABLED_KEY), + new DeprecationDelta("dfs.nfs.keytab.file", + NfsConfigKeys.DFS_NFS_KEYTAB_FILE_KEY), + new DeprecationDelta("dfs.nfs.kerberos.principal", + NfsConfigKeys.DFS_NFS_KERBEROS_PRINCIPAL_KEY), + new DeprecationDelta("dfs.nfs.rtmax", + NfsConfigKeys.DFS_NFS_MAX_READ_TRANSFER_SIZE_KEY), + new DeprecationDelta("dfs.nfs.wtmax", + NfsConfigKeys.DFS_NFS_MAX_WRITE_TRANSFER_SIZE_KEY), + new DeprecationDelta("dfs.nfs.dtmax", + NfsConfigKeys.DFS_NFS_MAX_READDIR_TRANSFER_SIZE_KEY) }); } } \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index bd848218ef7..d25e8468c1c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -665,6 +665,9 @@ Release 2.5.0 - UNRELEASED HDFS-6559. Fix wrong option "dfsadmin -rollingUpgrade start" in the document. (Akira Ajisaka via Arpit Agarwal) + HDFS-6553. Add missing DeprecationDeltas for NFS Kerberos configurations + (Stephen Chu via brandonli) + BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS HDFS-6299. Protobuf for XAttr and client-side implementation. (Yi Liu via umamahesh) From a4e0ff5e052abad498595ee198b49c5310c9ec0d Mon Sep 17 00:00:00 2001 From: Haohui Mai Date: Thu, 19 Jun 2014 04:13:56 +0000 Subject: [PATCH 4/8] HDFS-6480. Move waitForReady() from FSDirectory to FSNamesystem. Contributed by Haohui Mai. 
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1603705 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 2 + .../hdfs/server/namenode/Checkpointer.java | 2 +- .../hdfs/server/namenode/FSDirectory.java | 98 ++++--------------- .../hdfs/server/namenode/FSNamesystem.java | 81 ++++++++++++++- .../server/namenode/SecondaryNameNode.java | 2 +- .../TestCommitBlockSynchronization.java | 2 +- .../hdfs/server/namenode/TestFSDirectory.java | 11 --- .../server/namenode/TestFSNamesystem.java | 19 ++++ .../hdfs/server/namenode/TestFsLimits.java | 5 +- 9 files changed, 122 insertions(+), 100 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index d25e8468c1c..b6dc8960b65 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -454,6 +454,8 @@ Release 2.5.0 - UNRELEASED HDFS-6530. Fix Balancer documentation. (szetszwo) + HDFS-6480. Move waitForReady() from FSDirectory to FSNamesystem. (wheat9) + OPTIMIZATIONS HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java index 62aefb9c1dd..9327f4382ec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/Checkpointer.java @@ -252,7 +252,7 @@ class Checkpointer extends Daemon { backupNode.namesystem.writeLock(); try { - backupNode.namesystem.dir.setReady(); + backupNode.namesystem.setImageLoaded(); if(backupNode.namesystem.getBlocksTotal() > 0) { backupNode.namesystem.setBlockTotal(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index dc553ab73b9..37d8825c8f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -26,11 +26,10 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; import java.util.List; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.FileAlreadyExistsException; @@ -83,15 +82,14 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; -/************************************************* - * FSDirectory stores the filesystem directory state. - * It handles writing/loading values to disk, and logging - * changes as we go. - * - * It keeps the filename->blockset mapping always-current - * and logged to disk. - * - *************************************************/ +/** + * Both FSDirectory and FSNamesystem manage the state of the namespace. 
+ * FSDirectory is a pure in-memory data structure, all of whose operations + * happen entirely in memory. In contrast, FSNamesystem persists the operations + * to the disk. + * @see org.apache.hadoop.hdfs.server.namenode.FSNamesystem + **/ +@InterfaceAudience.Private public class FSDirectory implements Closeable { private static INodeDirectorySnapshottable createRoot(FSNamesystem namesystem) { final INodeDirectory r = new INodeDirectory( @@ -120,7 +118,6 @@ public class FSDirectory implements Closeable { INodeDirectory rootDir; FSImage fsImage; private final FSNamesystem namesystem; - private volatile boolean ready = false; private volatile boolean skipQuotaCheck = false; //skip while consuming edits private final int maxComponentLength; private final int maxDirItems; @@ -132,7 +129,6 @@ public class FSDirectory implements Closeable { // lock to protect the directory and BlockMap private final ReentrantReadWriteLock dirLock; - private final Condition cond; // utility methods to acquire and release read lock and write lock void readLock() { @@ -175,7 +171,6 @@ public class FSDirectory implements Closeable { FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) { this.dirLock = new ReentrantReadWriteLock(true); // fair - this.cond = dirLock.writeLock().newCondition(); rootDir = createRoot(ns); inodeMap = INodeMap.newInstance(rootDir); this.fsImage = fsImage; @@ -231,38 +226,6 @@ public class FSDirectory implements Closeable { return rootDir; } - /** - * Notify that loading of this FSDirectory is complete, and - * it is ready for use - */ - void imageLoadComplete() { - Preconditions.checkState(!ready, "FSDirectory already loaded"); - setReady(); - } - - void setReady() { - if(ready) return; - writeLock(); - try { - setReady(true); - this.nameCache.initialized(); - cond.signalAll(); - } finally { - writeUnlock(); - } - } - - //This is for testing purposes only - @VisibleForTesting - boolean isReady() { - return ready; - } - - // exposed for unit tests - protected void setReady(boolean flag) { - ready = flag; - } - /** * Shutdown the filestore */ @@ -271,22 +234,12 @@ public class FSDirectory implements Closeable { fsImage.close(); } - /** - * Block until the object is ready to be used. 
- */ - void waitForReady() { - if (!ready) { - writeLock(); - try { - while (!ready) { - try { - cond.await(5000, TimeUnit.MILLISECONDS); - } catch (InterruptedException ignored) { - } - } - } finally { - writeUnlock(); - } + void markNameCacheInitialized() { + writeLock(); + try { + nameCache.initialized(); + } finally { + writeUnlock(); } } @@ -312,7 +265,6 @@ public class FSDirectory implements Closeable { String clientMachine, DatanodeDescriptor clientNode) throws FileAlreadyExistsException, QuotaExceededException, UnresolvedLinkException, SnapshotAccessControlException, AclException { - waitForReady(); long modTime = now(); INodeFile newNode = new INodeFile(namesystem.allocateNewInodeId(), null, @@ -385,8 +337,6 @@ public class FSDirectory implements Closeable { */ BlockInfo addBlock(String path, INodesInPath inodesInPath, Block block, DatanodeStorageInfo[] targets) throws IOException { - waitForReady(); - writeLock(); try { final INodeFile fileINode = inodesInPath.getLastINode().asFile(); @@ -424,8 +374,6 @@ public class FSDirectory implements Closeable { boolean removeBlock(String path, INodeFile fileNode, Block block) throws IOException { Preconditions.checkArgument(fileNode.isUnderConstruction()); - waitForReady(); - writeLock(); try { return unprotectedRemoveBlock(path, fileNode, block); @@ -469,7 +417,6 @@ public class FSDirectory implements Closeable { NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: " +src+" to "+dst); } - waitForReady(); writeLock(); try { if (!unprotectedRenameTo(src, dst, mtime)) @@ -492,7 +439,6 @@ public class FSDirectory implements Closeable { NameNode.stateChangeLog.debug("DIR* FSDirectory.renameTo: " + src + " to " + dst); } - waitForReady(); writeLock(); try { if (unprotectedRenameTo(src, dst, mtime, options)) { @@ -1024,7 +970,6 @@ public class FSDirectory implements Closeable { Block[] setReplication(String src, short replication, short[] blockRepls) throws QuotaExceededException, UnresolvedLinkException, SnapshotAccessControlException { - waitForReady(); writeLock(); try { return unprotectedSetReplication(src, replication, blockRepls); @@ -1147,7 +1092,6 @@ public class FSDirectory implements Closeable { writeLock(); try { // actual move - waitForReady(); unprotectedConcat(target, srcs, timestamp); } finally { writeUnlock(); @@ -1230,7 +1174,6 @@ public class FSDirectory implements Closeable { if (NameNode.stateChangeLog.isDebugEnabled()) { NameNode.stateChangeLog.debug("DIR* FSDirectory.delete: " + src); } - waitForReady(); final long filesRemoved; writeLock(); try { @@ -1703,7 +1646,7 @@ public class FSDirectory implements Closeable { long nsDelta, long dsDelta, boolean checkQuota) throws QuotaExceededException { assert hasWriteLock(); - if (!ready) { + if (!namesystem.isImageLoaded()) { //still initializing. do not check or update quotas. 
return; } @@ -1896,7 +1839,7 @@ public class FSDirectory implements Closeable { */ private void verifyQuotaForRename(INode[] src, INode[] dst) throws QuotaExceededException { - if (!ready || skipQuotaCheck) { + if (!namesystem.isImageLoaded() || skipQuotaCheck) { // Do not check quota if edits log is still being processed return; } @@ -1952,7 +1895,7 @@ public class FSDirectory implements Closeable { void verifyINodeName(byte[] childName) throws HadoopIllegalArgumentException { if (Arrays.equals(HdfsConstants.DOT_SNAPSHOT_DIR_BYTES, childName)) { String s = "\"" + HdfsConstants.DOT_SNAPSHOT_DIR + "\" is a reserved name."; - if (!ready) { + if (!namesystem.isImageLoaded()) { s += " Please rename it before upgrade."; } throw new HadoopIllegalArgumentException(s); @@ -1979,7 +1922,7 @@ public class FSDirectory implements Closeable { getFullPathName((INode[])parentPath, pos - 1): (String)parentPath; final PathComponentTooLongException e = new PathComponentTooLongException( maxComponentLength, length, p, DFSUtil.bytes2String(childName)); - if (ready) { + if (namesystem.isImageLoaded()) { throw e; } else { // Do not throw if edits log is still being processed @@ -2003,7 +1946,7 @@ public class FSDirectory implements Closeable { if (count >= maxDirItems) { final MaxDirectoryItemsExceededException e = new MaxDirectoryItemsExceededException(maxDirItems, count); - if (ready) { + if (namesystem.isImageLoaded()) { e.setPathName(getFullPathName(pathComponents, pos - 1)); throw e; } else { @@ -2339,7 +2282,6 @@ public class FSDirectory implements Closeable { void reset() { writeLock(); try { - setReady(false); rootDir = createRoot(getFSNamesystem()); inodeMap.clear(); addToInodeMap(rootDir); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index b7698b62df4..0658a3c3eed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -103,6 +103,7 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -514,6 +515,59 @@ public class FSNamesystem implements Namesystem, FSClusterStats, private final NNConf nnConf; + private volatile boolean imageLoaded = false; + private final Condition cond; + /** + * Notify that loading of this FSDirectory is complete, and + * it is imageLoaded for use + */ + void imageLoadComplete() { + Preconditions.checkState(!imageLoaded, "FSDirectory already loaded"); + setImageLoaded(); + } + + void setImageLoaded() { + if(imageLoaded) return; + writeLock(); + try { + setImageLoaded(true); + dir.markNameCacheInitialized(); + cond.signalAll(); + } finally { + writeUnlock(); + } + } + + //This is for testing purposes only + @VisibleForTesting + boolean isImageLoaded() { + return imageLoaded; + } + + // exposed for unit tests + protected void setImageLoaded(boolean flag) { + imageLoaded = flag; + } + + /** + * Block until the object is imageLoaded to be used. 
+ */ + void waitForLoadingFSImage() { + if (!imageLoaded) { + writeLock(); + try { + while (!imageLoaded) { + try { + cond.await(5000, TimeUnit.MILLISECONDS); + } catch (InterruptedException ignored) { + } + } + } finally { + writeUnlock(); + } + } + } + /** * Set the last allocated inode id when fsimage or editlog is loaded. */ @@ -555,6 +609,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, inodeId.setCurrentValue(INodeId.LAST_RESERVED_ID); snapshotManager.clearSnapshottableDirs(); cacheManager.clear(); + setImageLoaded(false); } @VisibleForTesting @@ -682,6 +737,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, boolean fair = conf.getBoolean("dfs.namenode.fslock.fair", true); LOG.info("fsLock is fair:" + fair); fsLock = new FSNamesystemLock(fair); + cond = fsLock.writeLock().newCondition(); try { resourceRecheckInterval = conf.getLong( DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY, @@ -921,7 +977,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } writeUnlock(); } - dir.imageLoadComplete(); + imageLoadComplete(); } private void startSecretManager() { @@ -1840,6 +1896,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, HdfsFileStatus resultingStat = null; FSPermissionChecker pc = getPermissionChecker(); checkOperation(OperationCategory.WRITE); + waitForLoadingFSImage(); writeLock(); try { checkOperation(OperationCategory.WRITE); @@ -2115,6 +2172,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, FSPermissionChecker pc = getPermissionChecker(); checkOperation(OperationCategory.WRITE); byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + waitForLoadingFSImage(); writeLock(); try { checkOperation(OperationCategory.WRITE); @@ -2242,6 +2300,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); boolean create = flag.contains(CreateFlag.CREATE); boolean overwrite = flag.contains(CreateFlag.OVERWRITE); + + waitForLoadingFSImage(); writeLock(); try { checkOperation(OperationCategory.WRITE); @@ -2730,6 +2790,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, Block newBlock = null; long offset; checkOperation(OperationCategory.WRITE); + waitForLoadingFSImage(); writeLock(); try { checkOperation(OperationCategory.WRITE); @@ -2952,6 +3013,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, } checkOperation(OperationCategory.WRITE); byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + waitForLoadingFSImage(); writeLock(); try { checkOperation(OperationCategory.WRITE); @@ -3050,6 +3112,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, boolean success = false; checkOperation(OperationCategory.WRITE); byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + waitForLoadingFSImage(); writeLock(); try { checkOperation(OperationCategory.WRITE); @@ -3249,6 +3312,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, try { checkOperation(OperationCategory.WRITE); checkNameNodeSafeMode("Cannot rename " + src); + waitForLoadingFSImage(); src = FSDirectory.resolvePath(src, srcComponents, dir); dst = FSDirectory.resolvePath(dst, dstComponents, dir); checkOperation(OperationCategory.WRITE); @@ -3356,6 +3420,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, false); } + waitForLoadingFSImage(); long mtime = now(); dir.renameTo(src, dst, mtime, options); 
getEditLog().logRename(src, dst, mtime, logRetryCache, options); @@ -3429,6 +3494,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, checkOperation(OperationCategory.WRITE); byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); boolean ret = false; + + waitForLoadingFSImage(); writeLock(); try { checkOperation(OperationCategory.WRITE); @@ -3902,6 +3969,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats, NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName); checkOperation(OperationCategory.WRITE); byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src); + + waitForLoadingFSImage(); writeLock(); try { checkOperation(OperationCategory.WRITE); @@ -4103,6 +4172,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, INodeFile pendingFile, int latestSnapshot) throws IOException, UnresolvedLinkException { assert hasWriteLock(); + FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature(); Preconditions.checkArgument(uc != null); leaseManager.removeLease(uc.getClientName(), src); @@ -4114,6 +4184,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, // since we just remove the uc feature from pendingFile final INodeFile newFile = pendingFile.toCompleteFile(now()); + waitForLoadingFSImage(); // close file and persist block allocations for this file closeFile(src, newFile); @@ -4172,6 +4243,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, + ")"); checkOperation(OperationCategory.WRITE); String src = ""; + waitForLoadingFSImage(); writeLock(); try { checkOperation(OperationCategory.WRITE); @@ -4517,7 +4589,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, */ private void closeFile(String path, INodeFile file) { assert hasWriteLock(); - dir.waitForReady(); + waitForLoadingFSImage(); // file is closed getEditLog().logCloseFile(path, file); if (NameNode.stateChangeLog.isDebugEnabled()) { @@ -4541,7 +4613,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, boolean createParent, boolean logRetryCache) throws UnresolvedLinkException, FileAlreadyExistsException, QuotaExceededException, SnapshotAccessControlException, AclException { - dir.waitForReady(); + waitForLoadingFSImage(); final long modTime = now(); if (createParent) { @@ -5804,7 +5876,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, boolean ignoreEmptyDir, boolean resolveLink) throws AccessControlException, UnresolvedLinkException { if (!pc.isSuperUser()) { - dir.waitForReady(); + waitForLoadingFSImage(); readLock(); try { pc.checkPermission(path, dir, doCheckOwner, ancestorAccess, @@ -6271,6 +6343,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats, + ", newNodes=" + Arrays.asList(newNodes) + ", clientName=" + clientName + ")"); + waitForLoadingFSImage(); writeLock(); boolean success = false; try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java index 2f3e2902eb0..10f1720dfb0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/SecondaryNameNode.java @@ -1064,7 +1064,7 @@ public class SecondaryNameNode implements Runnable, } finally { dstNamesystem.writeUnlock(); } 
- dstNamesystem.dir.imageLoadComplete(); + dstNamesystem.imageLoadComplete(); } // error simulation code for junit test CheckpointFaultInjector.getInstance().duringMerge(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java index dac8c0fc364..45be905f89d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestCommitBlockSynchronization.java @@ -32,7 +32,6 @@ import java.io.IOException; import static org.junit.Assert.fail; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyBoolean; import static org.mockito.Mockito.*; /** @@ -50,6 +49,7 @@ public class TestCommitBlockSynchronization { final DatanodeStorageInfo[] targets = {}; FSNamesystem namesystem = new FSNamesystem(conf, image); + namesystem.setImageLoaded(true); FSNamesystem namesystemSpy = spy(namesystem); BlockInfoUnderConstruction blockInfo = new BlockInfoUnderConstruction( block, 1, HdfsServerConstants.BlockUCState.UNDER_CONSTRUCTION, targets); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirectory.java index 6e31f2cb3b0..bd35c901be6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirectory.java @@ -36,7 +36,6 @@ import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; -import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.apache.hadoop.test.GenericTestUtils; import org.junit.After; import org.junit.Assert; @@ -127,16 +126,6 @@ public class TestFSDirectory { } } - @Test - public void testReset() throws Exception { - fsdir.reset(); - Assert.assertFalse(fsdir.isReady()); - final INodeDirectory root = (INodeDirectory) fsdir.getINode("/"); - Assert.assertTrue(root.getChildrenList(Snapshot.CURRENT_STATE_ID).isEmpty()); - fsdir.imageLoadComplete(); - Assert.assertTrue(fsdir.isReady()); - } - @Test public void testSkipQuotaCheck() throws Exception { try { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java index 3af20a7303f..3d0259ea86d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSNamesystem.java @@ -35,6 +35,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.namenode.ha.HAContext; import org.apache.hadoop.hdfs.server.namenode.ha.HAState; +import org.apache.hadoop.hdfs.server.namenode.snapshot.Snapshot; import org.junit.After; import org.junit.Test; import org.mockito.Mockito; @@ -194,4 +195,22 @@ public 
class TestFSNamesystem { assertFalse(rwLock.isWriteLockedByCurrentThread()); assertEquals(0, rwLock.getWriteHoldCount()); } + + @Test + public void testReset() throws Exception { + Configuration conf = new Configuration(); + FSEditLog fsEditLog = Mockito.mock(FSEditLog.class); + FSImage fsImage = Mockito.mock(FSImage.class); + Mockito.when(fsImage.getEditLog()).thenReturn(fsEditLog); + FSNamesystem fsn = new FSNamesystem(conf, fsImage); + fsn.imageLoadComplete(); + assertTrue(fsn.isImageLoaded()); + fsn.clear(); + assertFalse(fsn.isImageLoaded()); + final INodeDirectory root = (INodeDirectory) fsn.getFSDirectory() + .getINode("/"); + assertTrue(root.getChildrenList(Snapshot.CURRENT_STATE_ID).isEmpty()); + fsn.imageLoadComplete(); + assertTrue(fsn.isImageLoaded()); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java index 0f4a2b889ca..577d505ca60 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFsLimits.java @@ -19,12 +19,9 @@ package org.apache.hadoop.hdfs.server.namenode; import static org.apache.hadoop.hdfs.server.common.Util.fileAsURI; -import static org.apache.hadoop.util.Time.now; import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.anyObject; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; import java.io.File; import java.io.IOException; @@ -57,7 +54,7 @@ public class TestFsLimits { FSEditLog editLog = mock(FSEditLog.class); doReturn(editLog).when(fsImage).getEditLog(); FSNamesystem fsn = new FSNamesystem(conf, fsImage); - fsn.getFSDirectory().setReady(fsIsReady); + fsn.setImageLoaded(fsIsReady); return fsn; } From 3f82484218d5694e62ddcb23376d0e4e332aa8b8 Mon Sep 17 00:00:00 2001 From: Aaron Myers Date: Thu, 19 Jun 2014 05:17:58 +0000 Subject: [PATCH 5/8] HDFS-6563. NameNode cannot save fsimage in certain circumstances when snapshots are in use. Contributed by Aaron T. Myers. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1603712 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 ++ .../server/namenode/FSImageFormatPBINode.java | 6 ++- .../snapshot/FileWithSnapshotFeature.java | 2 +- .../snapshot/TestSnapshotBlocksMap.java | 37 +++++++++++++++++++ 4 files changed, 45 insertions(+), 3 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index b6dc8960b65..a4c408fd9b5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -670,6 +670,9 @@ Release 2.5.0 - UNRELEASED HDFS-6553. Add missing DeprecationDeltas for NFS Kerberos configurations (Stephen Chu via brandonli) + HDFS-6563. NameNode cannot save fsimage in certain circumstances when + snapshots are in use. (atm) + BREAKDOWN OF HDFS-2006 SUBTASKS AND RELATED JIRAS HDFS-6299. Protobuf for XAttr and client-side implementation. 
(Yi Liu via umamahesh) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java index fb8e3f6675b..077570e8eee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSImageFormatPBINode.java @@ -533,8 +533,10 @@ public final class FSImageFormatPBINode { INodeSection.INodeFile.Builder b = buildINodeFile(n, parent.getSaverContext()); - for (Block block : n.getBlocks()) { - b.addBlocks(PBHelper.convert(block)); + if (n.getBlocks() != null) { + for (Block block : n.getBlocks()) { + b.addBlocks(PBHelper.convert(block)); + } } FileUnderConstructionFeature uc = n.getFileUnderConstructionFeature(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java index e32f78a4575..52adfc6dd66 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/snapshot/FileWithSnapshotFeature.java @@ -159,7 +159,7 @@ public class FileWithSnapshotFeature implements INode.Feature { // resize the array. final BlockInfo[] newBlocks; if (n == 0) { - newBlocks = null; + newBlocks = BlockInfo.EMPTY_ARRAY; } else { newBlocks = new BlockInfo[n]; System.arraycopy(oldBlocks, 0, newBlocks, 0, n); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java index fba48fc61ad..c7b6b7ff0f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestSnapshotBlocksMap.java @@ -28,12 +28,14 @@ import static org.junit.Assert.fail; import java.io.IOException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.namenode.FSDirectory; @@ -396,4 +398,39 @@ public class TestSnapshotBlocksMap { assertEquals(1, blks.length); assertEquals(BLOCKSIZE, blks[0].getNumBytes()); } + + /** + * Make sure that a delete of a non-zero-length file which results in a + * zero-length file in a snapshot works. 
+ */ + @Test + public void testDeletionOfLaterBlocksWithZeroSizeFirstBlock() throws Exception { + final Path foo = new Path("/foo"); + final Path bar = new Path(foo, "bar"); + final byte[] testData = "foo bar baz".getBytes(); + + // Create a zero-length file. + DFSTestUtil.createFile(hdfs, bar, 0, REPLICATION, 0L); + assertEquals(0, fsdir.getINode4Write(bar.toString()).asFile().getBlocks().length); + + // Create a snapshot that includes that file. + SnapshotTestHelper.createSnapshot(hdfs, foo, "s0"); + + // Extend that file. + FSDataOutputStream out = hdfs.append(bar); + out.write(testData); + out.close(); + INodeFile barNode = fsdir.getINode4Write(bar.toString()).asFile(); + BlockInfo[] blks = barNode.getBlocks(); + assertEquals(1, blks.length); + assertEquals(testData.length, blks[0].getNumBytes()); + + // Delete the file. + hdfs.delete(bar, true); + + // Now make sure that the NN can still save an fsimage successfully. + cluster.getNameNode().getRpcServer().setSafeMode( + SafeModeAction.SAFEMODE_ENTER, false); + cluster.getNameNode().getRpcServer().saveNamespace(); + } } From 7b9c074b7635e3dcdc38d4e7fb1afbff7145e698 Mon Sep 17 00:00:00 2001 From: Karthik Kambatla Date: Thu, 19 Jun 2014 17:22:56 +0000 Subject: [PATCH 6/8] MAPREDUCE-5844. Add a configurable delay to reducer-preemption. (Maysam Yabandeh via kasha) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1603957 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../dev-support/findbugs-exclude.xml | 4 +- .../v2/app/rm/RMContainerAllocator.java | 156 ++++++++++++------ .../v2/app/rm/RMContainerRequestor.java | 31 +++- .../{ => rm}/TestRMContainerAllocator.java | 120 +++++++++++++- .../apache/hadoop/mapreduce/MRJobConfig.java | 12 +- .../src/main/resources/mapred-default.xml | 10 ++ 7 files changed, 277 insertions(+), 59 deletions(-) rename hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/{ => rm}/TestRMContainerAllocator.java (93%) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index ae327063310..8feab76d100 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -216,6 +216,9 @@ Release 2.5.0 - UNRELEASED MAPREDUCE-5896. InputSplits should indicate which locations have the block cached in memory. (Sandy Ryza via kasha) + MAPREDUCE-5844. Add a configurable delay to reducer-preemption. 
+ (Maysam Yabandeh via kasha) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml b/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml index 5bdd618eed5..dd4892b1e28 100644 --- a/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml +++ b/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml @@ -475,8 +475,8 @@ - - + + diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index b9d283fe80e..11bc4063fff 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.client.api.NMTokenCache; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.RackResolver; import com.google.common.annotations.VisibleForTesting; @@ -143,15 +144,21 @@ public class RMContainerAllocator extends RMContainerRequestor private int lastCompletedTasks = 0; private boolean recalculateReduceSchedule = false; - private int mapResourceReqt;//memory - private int reduceResourceReqt;//memory + private int mapResourceRequest;//memory + private int reduceResourceRequest;//memory private boolean reduceStarted = false; private float maxReduceRampupLimit = 0; private float maxReducePreemptionLimit = 0; + /** + * after this threshold, if the container request is not allocated, it is + * considered delayed. 
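A standalone sketch of the delay semantics described in that comment, written in plain Java rather than with the allocator's Clock and ContainerRequest types; the names below (thresholdMs, countHanging) are illustrative. A request only counts toward reducer preemption once it has waited longer than mapreduce.job.reducer.preempt.delay.sec, and a threshold of 0 keeps the old behaviour of counting every pending request.

    import java.util.Arrays;
    import java.util.List;

    public class PreemptDelaySketch {
      // Mirrors the sec -> ms conversion done when the config is read.
      static long thresholdMs(int delaySec) {
        return delaySec * 1000L;
      }

      // Counts requests that have been outstanding longer than the threshold.
      static int countHanging(List<Long> requestTimesMs, long nowMs, long thresholdMs) {
        if (thresholdMs <= 0) {
          return requestTimesMs.size();   // delay disabled: every request is "hanging"
        }
        int hanging = 0;
        for (long requested : requestTimesMs) {
          if (nowMs - requested > thresholdMs) {
            hanging++;
          }
        }
        return hanging;
      }

      public static void main(String[] args) {
        long now = 10_000L;
        List<Long> requests = Arrays.asList(1_000L, 9_500L);   // one stale, one fresh
        // With a 2 second delay only the stale request feeds the preemption math.
        System.out.println(countHanging(requests, now, thresholdMs(2)));   // prints 1
      }
    }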
+ */ + private long allocationDelayThresholdMs = 0; private float reduceSlowStart = 0; private long retryInterval; private long retrystartTime; + private Clock clock; private final AMPreemptionPolicy preemptionPolicy; @@ -166,6 +173,7 @@ public class RMContainerAllocator extends RMContainerRequestor super(clientService, context); this.preemptionPolicy = preemptionPolicy; this.stopped = new AtomicBoolean(false); + this.clock = context.getClock(); } @Override @@ -180,6 +188,9 @@ public class RMContainerAllocator extends RMContainerRequestor maxReducePreemptionLimit = conf.getFloat( MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT, MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT); + allocationDelayThresholdMs = conf.getInt( + MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC, + MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms RackResolver.init(conf); retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS, MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS); @@ -246,7 +257,7 @@ public class RMContainerAllocator extends RMContainerRequestor getJob().getTotalMaps(), completedMaps, scheduledRequests.maps.size(), scheduledRequests.reduces.size(), assignedRequests.maps.size(), assignedRequests.reduces.size(), - mapResourceReqt, reduceResourceReqt, + mapResourceRequest, reduceResourceRequest, pendingReduces.size(), maxReduceRampupLimit, reduceSlowStart); recalculateReduceSchedule = false; @@ -268,6 +279,18 @@ public class RMContainerAllocator extends RMContainerRequestor scheduleStats.log("Final Stats: "); } + @Private + @VisibleForTesting + AssignedRequests getAssignedRequests() { + return assignedRequests; + } + + @Private + @VisibleForTesting + ScheduledRequests getScheduledRequests() { + return scheduledRequests; + } + public boolean getIsReduceStarted() { return reduceStarted; } @@ -303,16 +326,16 @@ public class RMContainerAllocator extends RMContainerRequestor int supportedMaxContainerCapability = getMaxContainerCapability().getMemory(); if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) { - if (mapResourceReqt == 0) { - mapResourceReqt = reqEvent.getCapability().getMemory(); + if (mapResourceRequest == 0) { + mapResourceRequest = reqEvent.getCapability().getMemory(); eventHandler.handle(new JobHistoryEvent(jobId, new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP, - mapResourceReqt))); - LOG.info("mapResourceReqt:"+mapResourceReqt); - if (mapResourceReqt > supportedMaxContainerCapability) { + mapResourceRequest))); + LOG.info("mapResourceRequest:"+ mapResourceRequest); + if (mapResourceRequest > supportedMaxContainerCapability) { String diagMsg = "MAP capability required is more than the supported " + - "max container capability in the cluster. Killing the Job. mapResourceReqt: " + - mapResourceReqt + " maxContainerCapability:" + supportedMaxContainerCapability; + "max container capability in the cluster. Killing the Job. 
mapResourceRequest: " + + mapResourceRequest + " maxContainerCapability:" + supportedMaxContainerCapability; LOG.info(diagMsg); eventHandler.handle(new JobDiagnosticsUpdateEvent( jobId, diagMsg)); @@ -320,20 +343,20 @@ public class RMContainerAllocator extends RMContainerRequestor } } //set the rounded off memory - reqEvent.getCapability().setMemory(mapResourceReqt); + reqEvent.getCapability().setMemory(mapResourceRequest); scheduledRequests.addMap(reqEvent);//maps are immediately scheduled } else { - if (reduceResourceReqt == 0) { - reduceResourceReqt = reqEvent.getCapability().getMemory(); + if (reduceResourceRequest == 0) { + reduceResourceRequest = reqEvent.getCapability().getMemory(); eventHandler.handle(new JobHistoryEvent(jobId, new NormalizedResourceEvent( org.apache.hadoop.mapreduce.TaskType.REDUCE, - reduceResourceReqt))); - LOG.info("reduceResourceReqt:"+reduceResourceReqt); - if (reduceResourceReqt > supportedMaxContainerCapability) { + reduceResourceRequest))); + LOG.info("reduceResourceRequest:"+ reduceResourceRequest); + if (reduceResourceRequest > supportedMaxContainerCapability) { String diagMsg = "REDUCE capability required is more than the " + "supported max container capability in the cluster. Killing the " + - "Job. reduceResourceReqt: " + reduceResourceReqt + + "Job. reduceResourceRequest: " + reduceResourceRequest + " maxContainerCapability:" + supportedMaxContainerCapability; LOG.info(diagMsg); eventHandler.handle(new JobDiagnosticsUpdateEvent( @@ -342,7 +365,7 @@ public class RMContainerAllocator extends RMContainerRequestor } } //set the rounded off memory - reqEvent.getCapability().setMemory(reduceResourceReqt); + reqEvent.getCapability().setMemory(reduceResourceRequest); if (reqEvent.getEarlierAttemptFailed()) { //add to the front of queue for fail fast pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE)); @@ -394,8 +417,22 @@ public class RMContainerAllocator extends RMContainerRequestor return host; } - private void preemptReducesIfNeeded() { - if (reduceResourceReqt == 0) { + @Private + @VisibleForTesting + synchronized void setReduceResourceRequest(int mem) { + this.reduceResourceRequest = mem; + } + + @Private + @VisibleForTesting + synchronized void setMapResourceRequest(int mem) { + this.mapResourceRequest = mem; + } + + @Private + @VisibleForTesting + void preemptReducesIfNeeded() { + if (reduceResourceRequest == 0) { return; //no reduces } //check if reduces have taken over the whole cluster and there are @@ -403,9 +440,9 @@ public class RMContainerAllocator extends RMContainerRequestor if (scheduledRequests.maps.size() > 0) { int memLimit = getMemLimit(); int availableMemForMap = memLimit - ((assignedRequests.reduces.size() - - assignedRequests.preemptionWaitingReduces.size()) * reduceResourceReqt); + assignedRequests.preemptionWaitingReduces.size()) * reduceResourceRequest); //availableMemForMap must be sufficient to run atleast 1 map - if (availableMemForMap < mapResourceReqt) { + if (availableMemForMap < mapResourceRequest) { //to make sure new containers are given to maps and not reduces //ramp down all scheduled reduces if any //(since reduces are scheduled at higher priority than maps) @@ -414,22 +451,40 @@ public class RMContainerAllocator extends RMContainerRequestor pendingReduces.add(req); } scheduledRequests.reduces.clear(); - - //preempt for making space for at least one map - int premeptionLimit = Math.max(mapResourceReqt, - (int) (maxReducePreemptionLimit * memLimit)); - - int preemptMem = 
Math.min(scheduledRequests.maps.size() * mapResourceReqt, - premeptionLimit); - - int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt); - toPreempt = Math.min(toPreempt, assignedRequests.reduces.size()); - - LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps"); - assignedRequests.preemptReduce(toPreempt); + + //do further checking to find the number of map requests that were + //hanging around for a while + int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps); + if (hangingMapRequests > 0) { + //preempt for making space for at least one map + int premeptionLimit = Math.max(mapResourceRequest, + (int) (maxReducePreemptionLimit * memLimit)); + + int preemptMem = Math.min(hangingMapRequests * mapResourceRequest, + premeptionLimit); + + int toPreempt = (int) Math.ceil((float) preemptMem / reduceResourceRequest); + toPreempt = Math.min(toPreempt, assignedRequests.reduces.size()); + + LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps"); + assignedRequests.preemptReduce(toPreempt); + } } } } + + private int getNumOfHangingRequests(Map requestMap) { + if (allocationDelayThresholdMs <= 0) + return requestMap.size(); + int hangingRequests = 0; + long currTime = clock.getTime(); + for (ContainerRequest request: requestMap.values()) { + long delay = currTime - request.requestTimeMs; + if (delay > allocationDelayThresholdMs) + hangingRequests++; + } + return hangingRequests; + } @Private public void scheduleReduces( @@ -715,11 +770,13 @@ public class RMContainerAllocator extends RMContainerRequestor @Private public int getMemLimit() { int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0; - return headRoom + assignedRequests.maps.size() * mapResourceReqt + - assignedRequests.reduces.size() * reduceResourceReqt; + return headRoom + assignedRequests.maps.size() * mapResourceRequest + + assignedRequests.reduces.size() * reduceResourceRequest; } - - private class ScheduledRequests { + + @Private + @VisibleForTesting + class ScheduledRequests { private final LinkedList earlierFailedMaps = new LinkedList(); @@ -729,7 +786,8 @@ public class RMContainerAllocator extends RMContainerRequestor new HashMap>(); private final Map> mapsRackMapping = new HashMap>(); - private final Map maps = + @VisibleForTesting + final Map maps = new LinkedHashMap(); private final LinkedHashMap reduces = @@ -825,22 +883,22 @@ public class RMContainerAllocator extends RMContainerRequestor int allocatedMemory = allocated.getResource().getMemory(); if (PRIORITY_FAST_FAIL_MAP.equals(priority) || PRIORITY_MAP.equals(priority)) { - if (allocatedMemory < mapResourceReqt + if (allocatedMemory < mapResourceRequest || maps.isEmpty()) { LOG.info("Cannot assign container " + allocated + " for a map as either " - + " container memory less than required " + mapResourceReqt + + " container memory less than required " + mapResourceRequest + " or no pending map tasks - maps.isEmpty=" + maps.isEmpty()); isAssignable = false; } } else if (PRIORITY_REDUCE.equals(priority)) { - if (allocatedMemory < reduceResourceReqt + if (allocatedMemory < reduceResourceRequest || reduces.isEmpty()) { LOG.info("Cannot assign container " + allocated + " for a reduce as either " - + " container memory less than required " + reduceResourceReqt + + " container memory less than required " + reduceResourceRequest + " or no pending reduce tasks - reduces.isEmpty=" + reduces.isEmpty()); isAssignable = false; @@ -1119,14 +1177,18 @@ public class 
RMContainerAllocator extends RMContainerRequestor } } - private class AssignedRequests { + @Private + @VisibleForTesting + class AssignedRequests { private final Map containerToAttemptMap = new HashMap(); private final LinkedHashMap maps = new LinkedHashMap(); - private final LinkedHashMap reduces = + @VisibleForTesting + final LinkedHashMap reduces = new LinkedHashMap(); - private final Set preemptionWaitingReduces = + @VisibleForTesting + final Set preemptionWaitingReduces = new HashSet(); void add(Container container, TaskAttemptId tId) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java index a9b5ce58479..18242119451 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java @@ -29,8 +29,10 @@ import java.util.TreeSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; +import com.google.common.annotations.VisibleForTesting; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; @@ -96,6 +98,8 @@ public abstract class RMContainerRequestor extends RMCommunicator { super(clientService, context); } + @Private + @VisibleForTesting static class ContainerRequest { final TaskAttemptId attemptID; final Resource capability; @@ -103,20 +107,39 @@ public abstract class RMContainerRequestor extends RMCommunicator { final String[] racks; //final boolean earlierAttemptFailed; final Priority priority; - + /** + * the time when this request object was formed; can be used to avoid + * aggressive preemption for recently placed requests + */ + final long requestTimeMs; + public ContainerRequest(ContainerRequestEvent event, Priority priority) { this(event.getAttemptID(), event.getCapability(), event.getHosts(), event.getRacks(), priority); } - + + public ContainerRequest(ContainerRequestEvent event, Priority priority, + long requestTimeMs) { + this(event.getAttemptID(), event.getCapability(), event.getHosts(), + event.getRacks(), priority, requestTimeMs); + } + public ContainerRequest(TaskAttemptId attemptID, - Resource capability, String[] hosts, String[] racks, - Priority priority) { + Resource capability, String[] hosts, String[] racks, + Priority priority) { + this(attemptID, capability, hosts, racks, priority, + System.currentTimeMillis()); + } + + public ContainerRequest(TaskAttemptId attemptID, + Resource capability, String[] hosts, String[] racks, + Priority priority, long requestTimeMs) { this.attemptID = attemptID; this.capability = capability; this.hosts = hosts; this.racks = racks; this.priority = priority; + this.requestTimeMs = requestTimeMs; } public String toString() { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java similarity index 93% rename from hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java rename to hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java index 9c041870758..4c74d7b5c52 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.hadoop.mapreduce.v2.app; +package org.apache.hadoop.mapreduce.v2.app.rm; import static org.mockito.Matchers.anyFloat; import static org.mockito.Matchers.anyInt; @@ -40,6 +40,10 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.mapreduce.v2.app.AppContext; +import org.apache.hadoop.mapreduce.v2.app.ClusterInfo; +import org.apache.hadoop.mapreduce.v2.app.ControlledClock; +import org.apache.hadoop.mapreduce.v2.app.MRApp; import org.junit.Assert; import org.apache.commons.logging.Log; @@ -65,10 +69,6 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl; import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl; -import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; -import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent; -import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent; -import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator; import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; @@ -80,6 +80,7 @@ import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerState; @@ -422,6 +423,115 @@ public class TestRMContainerAllocator { killEventMessage.contains(RMContainerAllocator.RAMPDOWN_DIAGNOSTIC)); } + @Test(timeout = 30000) + public void testPreemptReducers() throws Exception { + LOG.info("Running testPreemptReducers"); + + Configuration conf = new Configuration(); + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); + amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + 
dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, mockJob, new SystemClock()); + allocator.setMapResourceRequest(1024); + allocator.setReduceResourceRequest(1024); + RMContainerAllocator.AssignedRequests assignedRequests = + allocator.getAssignedRequests(); + RMContainerAllocator.ScheduledRequests scheduledRequests = + allocator.getScheduledRequests(); + ContainerRequestEvent event1 = + createReq(jobId, 1, 2048, new String[] { "h1" }, false, false); + scheduledRequests.maps.put(mock(TaskAttemptId.class), + new RMContainerRequestor.ContainerRequest(event1, null)); + assignedRequests.reduces.put(mock(TaskAttemptId.class), + mock(Container.class)); + + allocator.preemptReducesIfNeeded(); + Assert.assertEquals("The reducer is not preempted", + 1, assignedRequests.preemptionWaitingReduces.size()); + } + + @Test(timeout = 30000) + public void testNonAggressivelyPreemptReducers() throws Exception { + LOG.info("Running testPreemptReducers"); + + final int preemptThreshold = 2; //sec + Configuration conf = new Configuration(); + conf.setInt( + MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC, + preemptThreshold); + + MyResourceManager rm = new MyResourceManager(conf); + rm.start(); + DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext() + .getDispatcher(); + + // Submit the application + RMApp app = rm.submitApp(1024); + dispatcher.await(); + + MockNM amNodeManager = rm.registerNode("amNM:1234", 2048); + amNodeManager.nodeHeartbeat(true); + dispatcher.await(); + + ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt() + .getAppAttemptId(); + rm.sendAMLaunched(appAttemptId); + dispatcher.await(); + + JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0); + Job mockJob = mock(Job.class); + when(mockJob.getReport()).thenReturn( + MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0, + 0, 0, 0, 0, 0, 0, "jobfile", null, false, "")); + ControlledClock clock = new ControlledClock(null); + clock.setTime(1); + MyContainerAllocator allocator = new MyContainerAllocator(rm, conf, + appAttemptId, mockJob, clock); + allocator.setMapResourceRequest(1024); + allocator.setReduceResourceRequest(1024); + RMContainerAllocator.AssignedRequests assignedRequests = + allocator.getAssignedRequests(); + RMContainerAllocator.ScheduledRequests scheduledRequests = + allocator.getScheduledRequests(); + ContainerRequestEvent event1 = + createReq(jobId, 1, 2048, new String[] { "h1" }, false, false); + scheduledRequests.maps.put(mock(TaskAttemptId.class), + new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime())); + assignedRequests.reduces.put(mock(TaskAttemptId.class), + mock(Container.class)); + + clock.setTime(clock.getTime() + 1); + allocator.preemptReducesIfNeeded(); + Assert.assertEquals("The reducer is aggressively preeempted", 0, + assignedRequests.preemptionWaitingReduces.size()); + + clock.setTime(clock.getTime() + (preemptThreshold) * 1000); + allocator.preemptReducesIfNeeded(); + Assert.assertEquals("The reducer is not preeempted", 1, + assignedRequests.preemptionWaitingReduces.size()); + } + @Test public void testMapReduceScheduling() throws Exception { diff --git 
a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java index 33d79ee4118..4795af78d2a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java @@ -579,7 +579,17 @@ public interface MRJobConfig { MR_AM_PREFIX + "history.use-batched-flush.queue-size.threshold"; public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD = 50; - + + /** + * The threshold in terms of seconds after which an unsatisfied mapper request + * triggers reducer preemption to free space. Default 0 implies that the reduces + * should be preempted immediately after allocation if there is currently no + * room for newly allocated mappers. + */ + public static final String MR_JOB_REDUCER_PREEMPT_DELAY_SEC = + "mapreduce.job.reducer.preempt.delay.sec"; + public static final int DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC = 0; + public static final String MR_AM_ENV = MR_AM_PREFIX + "env"; diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml index 9034064cf39..508b0331024 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml @@ -82,6 +82,16 @@ + + mapreduce.job.reducer.preempt.delay.sec + 0 + The threshold in terms of seconds after which an unsatisfied mapper + request triggers reducer preemption to free space. Default 0 implies that the + reduces should be preempted immediately after allocation if there is currently no + room for newly allocated mappers. + + + mapreduce.job.max.split.locations 10 From d417e49ce4db119cdeb01be526cdb07f24baf388 Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Thu, 19 Jun 2014 17:37:31 +0000 Subject: [PATCH 7/8] HDFS-6492. Support create-time xattrs and atomically setting multiple xattrs. 
(wang) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1603971 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 3 + .../main/java/org/apache/hadoop/fs/XAttr.java | 65 +++-- .../hadoop/hdfs/protocolPB/PBHelper.java | 3 + .../hdfs/server/namenode/FSDirectory.java | 170 ++++++++--- .../hdfs/server/namenode/FSEditLog.java | 22 +- .../hdfs/server/namenode/FSEditLogLoader.java | 6 +- .../hdfs/server/namenode/FSEditLogOp.java | 120 +++++--- .../hdfs/server/namenode/FSNamesystem.java | 16 +- .../hadoop-hdfs/src/main/proto/xattr.proto | 4 +- .../hdfs/server/namenode/TestFSDirectory.java | 189 +++++++++++- .../snapshot/TestXAttrWithSnapshot.java | 4 +- .../src/test/resources/editsStored | Bin 4805 -> 4970 bytes .../src/test/resources/editsStored.xml | 276 ++++++++++-------- 13 files changed, 621 insertions(+), 257 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index a4c408fd9b5..8e6a9ee7031 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -736,6 +736,9 @@ Release 2.5.0 - UNRELEASED HDFS-6374. setXAttr should require the user to be the owner of the file or directory (Charles Lamb via wang) + HDFS-6492. Support create-time xattrs and atomically setting multiple + xattrs. (wang) + Release 2.4.1 - 2014-06-23 INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/XAttr.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/XAttr.java index 35a768062f6..82272e22a31 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/XAttr.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/fs/XAttr.java @@ -19,6 +19,8 @@ package org.apache.hadoop.fs; import java.util.Arrays; +import org.apache.commons.lang.builder.EqualsBuilder; +import org.apache.commons.lang.builder.HashCodeBuilder; import org.apache.hadoop.classification.InterfaceAudience; /** @@ -105,42 +107,47 @@ public class XAttr { @Override public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + ((name == null) ? 0 : name.hashCode()); - result = prime * result + ((ns == null) ? 0 : ns.hashCode()); - result = prime * result + Arrays.hashCode(value); - return result; + return new HashCodeBuilder(811, 67) + .append(name) + .append(ns) + .append(value) + .toHashCode(); } @Override public boolean equals(Object obj) { - if (this == obj) { - return true; - } - if (obj == null) { + if (obj == null) { return false; } + if (obj == this) { return true; } + if (obj.getClass() != getClass()) { return false; } - if (getClass() != obj.getClass()) { - return false; - } - XAttr other = (XAttr) obj; - if (name == null) { - if (other.name != null) { - return false; - } - } else if (!name.equals(other.name)) { - return false; - } - if (ns != other.ns) { - return false; - } - if (!Arrays.equals(value, other.value)) { - return false; - } - return true; + XAttr rhs = (XAttr) obj; + return new EqualsBuilder() + .append(ns, rhs.ns) + .append(name, rhs.name) + .append(value, rhs.value) + .isEquals(); } - + + /** + * Similar to {@link #equals(Object)}, except ignores the XAttr value. 
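The value-blind comparison is what lets multi-set and multi-remove decide whether "the same" attribute is already present. Below is a minimal sketch of the pattern, reusing the commons-lang builders the patch imports; the SimpleAttr class is an illustrative stand-in, not the real XAttr.

    import org.apache.commons.lang.builder.EqualsBuilder;
    import org.apache.commons.lang.builder.HashCodeBuilder;

    public class SimpleAttr {
      private final String ns;      // namespace, e.g. "USER"
      private final String name;
      private final byte[] value;

      public SimpleAttr(String ns, String name, byte[] value) {
        this.ns = ns; this.name = name; this.value = value;
      }

      @Override public int hashCode() {
        return new HashCodeBuilder(811, 67).append(name).append(ns).append(value).toHashCode();
      }

      @Override public boolean equals(Object obj) {
        if (obj == null) { return false; }
        if (obj == this) { return true; }
        if (obj.getClass() != getClass()) { return false; }
        SimpleAttr rhs = (SimpleAttr) obj;
        return new EqualsBuilder()
            .append(ns, rhs.ns).append(name, rhs.name).append(value, rhs.value).isEquals();
      }

      // Same identity test, but ignoring the value: two attrs with the same
      // namespace and name are "the same" xattr even if their payloads differ.
      public boolean equalsIgnoreValue(Object obj) {
        if (obj == null) { return false; }
        if (obj == this) { return true; }
        if (obj.getClass() != getClass()) { return false; }
        SimpleAttr rhs = (SimpleAttr) obj;
        return new EqualsBuilder().append(ns, rhs.ns).append(name, rhs.name).isEquals();
      }

      public static void main(String[] args) {
        SimpleAttr a = new SimpleAttr("USER", "a1", new byte[]{1});
        SimpleAttr b = new SimpleAttr("USER", "a1", new byte[]{2});
        System.out.println(a.equals(b));             // false: values differ
        System.out.println(a.equalsIgnoreValue(b));  // true: same namespace and name
      }
    }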
+ * + * @param obj to compare equality + * @return if the XAttrs are equal, ignoring the XAttr value + */ + public boolean equalsIgnoreValue(Object obj) { + if (obj == null) { return false; } + if (obj == this) { return true; } + if (obj.getClass() != getClass()) { + return false; + } + XAttr rhs = (XAttr) obj; + return new EqualsBuilder() + .append(ns, rhs.ns) + .append(name, rhs.name) + .isEquals(); + } + @Override public String toString() { return "XAttr [ns=" + ns + ", name=" + name + ", value=" diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index a897577a85b..f3b62c07f11 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -2093,6 +2093,9 @@ public class PBHelper { public static List convertXAttrProto( List xAttrSpec) { + if (xAttrSpec == null) { + return Lists.newArrayListWithCapacity(0); + } ArrayList xAttrs = Lists.newArrayListWithCapacity( xAttrSpec.size()); for (XAttr a : xAttrSpec) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 37d8825c8f3..da8d11e9aba 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -26,6 +26,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.EnumSet; import java.util.List; +import java.util.ListIterator; import java.util.concurrent.locks.ReentrantReadWriteLock; import org.apache.hadoop.HadoopIllegalArgumentException; @@ -294,6 +295,7 @@ public class FSDirectory implements Closeable { String path, PermissionStatus permissions, List aclEntries, + List xAttrs, short replication, long modificationTime, long atime, @@ -320,6 +322,10 @@ public class FSDirectory implements Closeable { AclStorage.updateINodeAcl(newNode, aclEntries, Snapshot.CURRENT_STATE_ID); } + if (xAttrs != null) { + XAttrStorage.updateINodeXAttrs(newNode, xAttrs, + Snapshot.CURRENT_STATE_ID); + } return newNode; } } catch (IOException e) { @@ -2563,101 +2569,171 @@ public class FSDirectory implements Closeable { } } - XAttr removeXAttr(String src, XAttr xAttr) throws IOException { + /** + * Removes a list of XAttrs from an inode at a path. 
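The removal path below is easier to follow once the filter contract is clear: matched attributes are moved out of the request list and collected, and whatever survives the filter is written back to the inode. A toy version over plain strings, with case-insensitive matching standing in for equalsIgnoreValue; all names here are illustrative.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;

    public class FilterSketch {
      static List<String> filter(List<String> existing, List<String> toFilter,
                                 List<String> filtered) {
        List<String> remaining = new ArrayList<>(existing.size());
        for (String attr : existing) {
          boolean keep = true;
          for (Iterator<String> it = toFilter.iterator(); it.hasNext();) {
            String candidate = it.next();
            if (attr.equalsIgnoreCase(candidate)) {   // stand-in for equalsIgnoreValue
              keep = false;
              it.remove();            // consumed: anything left over was not found
              filtered.add(candidate);
              break;
            }
          }
          if (keep) {
            remaining.add(attr);
          }
        }
        return remaining;
      }

      public static void main(String[] args) {
        List<String> existing = Arrays.asList("user.a1", "user.a2");
        List<String> toRemove = new ArrayList<>(Arrays.asList("USER.A2", "user.a3"));
        List<String> removed = new ArrayList<>();
        System.out.println(filter(existing, toRemove, removed));  // [user.a1]
        System.out.println(removed);                              // [USER.A2]
        System.out.println(toRemove);                             // [user.a3], i.e. not found
      }
    }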
+ * + * @param src path of inode + * @param toRemove XAttrs to be removed + * @return List of XAttrs that were removed + * @throws IOException if the inode does not exist, if quota is exceeded + */ + List removeXAttrs(final String src, final List toRemove) + throws IOException { writeLock(); try { - return unprotectedRemoveXAttr(src, xAttr); + return unprotectedRemoveXAttrs(src, toRemove); } finally { writeUnlock(); } } - - XAttr unprotectedRemoveXAttr(String src, - XAttr xAttr) throws IOException { + + List unprotectedRemoveXAttrs(final String src, + final List toRemove) throws IOException { assert hasWriteLock(); INodesInPath iip = getINodesInPath4Write(normalizePath(src), true); INode inode = resolveLastINode(src, iip); int snapshotId = iip.getLatestSnapshotId(); List existingXAttrs = XAttrStorage.readINodeXAttrs(inode); - List newXAttrs = filterINodeXAttr(existingXAttrs, xAttr); + List removedXAttrs = Lists.newArrayListWithCapacity(toRemove.size()); + List newXAttrs = filterINodeXAttrs(existingXAttrs, toRemove, + removedXAttrs); if (existingXAttrs.size() != newXAttrs.size()) { XAttrStorage.updateINodeXAttrs(inode, newXAttrs, snapshotId); - return xAttr; + return removedXAttrs; } return null; } - - List filterINodeXAttr(List existingXAttrs, - XAttr xAttr) throws QuotaExceededException { - if (existingXAttrs == null || existingXAttrs.isEmpty()) { + + /** + * Filter XAttrs from a list of existing XAttrs. Removes matched XAttrs from + * toFilter and puts them into filtered. Upon completion, + * toFilter contains the filter XAttrs that were not found, while + * fitleredXAttrs contains the XAttrs that were found. + * + * @param existingXAttrs Existing XAttrs to be filtered + * @param toFilter XAttrs to filter from the existing XAttrs + * @param filtered Return parameter, XAttrs that were filtered + * @return List of XAttrs that does not contain filtered XAttrs + */ + @VisibleForTesting + List filterINodeXAttrs(final List existingXAttrs, + final List toFilter, final List filtered) { + if (existingXAttrs == null || existingXAttrs.isEmpty() || + toFilter == null || toFilter.isEmpty()) { return existingXAttrs; } - - List xAttrs = Lists.newArrayListWithCapacity(existingXAttrs.size()); + + // Populate a new list with XAttrs that pass the filter + List newXAttrs = + Lists.newArrayListWithCapacity(existingXAttrs.size()); for (XAttr a : existingXAttrs) { - if (!(a.getNameSpace() == xAttr.getNameSpace() - && a.getName().equals(xAttr.getName()))) { - xAttrs.add(a); + boolean add = true; + for (ListIterator it = toFilter.listIterator(); it.hasNext() + ;) { + XAttr filter = it.next(); + if (a.equalsIgnoreValue(filter)) { + add = false; + it.remove(); + filtered.add(filter); + break; + } + } + if (add) { + newXAttrs.add(a); } } - - return xAttrs; + + return newXAttrs; } - void setXAttr(String src, XAttr xAttr, EnumSet flag) - throws IOException { + void setXAttrs(final String src, final List xAttrs, + final EnumSet flag) throws IOException { writeLock(); try { - unprotectedSetXAttr(src, xAttr, flag); + unprotectedSetXAttrs(src, xAttrs, flag); } finally { writeUnlock(); } } - void unprotectedSetXAttr(String src, XAttr xAttr, - EnumSet flag) throws IOException { + void unprotectedSetXAttrs(final String src, final List xAttrs, + final EnumSet flag) + throws QuotaExceededException, IOException { assert hasWriteLock(); INodesInPath iip = getINodesInPath4Write(normalizePath(src), true); INode inode = resolveLastINode(src, iip); int snapshotId = iip.getLatestSnapshotId(); List existingXAttrs = 
XAttrStorage.readINodeXAttrs(inode); - List newXAttrs = setINodeXAttr(existingXAttrs, xAttr, flag); + List newXAttrs = setINodeXAttrs(existingXAttrs, xAttrs, flag); XAttrStorage.updateINodeXAttrs(inode, newXAttrs, snapshotId); } - - List setINodeXAttr(List existingXAttrs, XAttr xAttr, - EnumSet flag) throws QuotaExceededException, IOException { - List xAttrs = Lists.newArrayListWithCapacity( - existingXAttrs != null ? existingXAttrs.size() + 1 : 1); + + List setINodeXAttrs(final List existingXAttrs, + final List toSet, final EnumSet flag) + throws IOException { + // Check for duplicate XAttrs in toSet + // We need to use a custom comparator, so using a HashSet is not suitable + for (int i = 0; i < toSet.size(); i++) { + for (int j = i + 1; j < toSet.size(); j++) { + if (toSet.get(i).equalsIgnoreValue(toSet.get(j))) { + throw new IOException("Cannot specify the same XAttr to be set " + + "more than once"); + } + } + } + + // Count the current number of user-visible XAttrs for limit checking int userVisibleXAttrsNum = 0; // Number of user visible xAttrs - boolean exist = false; + + // The XAttr list is copied to an exactly-sized array when it's stored, + // so there's no need to size it precisely here. + int newSize = (existingXAttrs != null) ? existingXAttrs.size() : 0; + newSize += toSet.size(); + List xAttrs = Lists.newArrayListWithCapacity(newSize); + + // Check if the XAttr already exists to validate with the provided flag + for (XAttr xAttr: toSet) { + boolean exist = false; + if (existingXAttrs != null) { + for (XAttr a : existingXAttrs) { + if (a.equalsIgnoreValue(xAttr)) { + exist = true; + break; + } + } + } + XAttrSetFlag.validate(xAttr.getName(), exist, flag); + // add the new XAttr since it passed validation + xAttrs.add(xAttr); + if (isUserVisible(xAttr)) { + userVisibleXAttrsNum++; + } + } + + // Add the existing xattrs back in, if they weren't already set if (existingXAttrs != null) { - for (XAttr a: existingXAttrs) { - if ((a.getNameSpace() == xAttr.getNameSpace() - && a.getName().equals(xAttr.getName()))) { - exist = true; - } else { - xAttrs.add(a); - - if (isUserVisible(a)) { + for (XAttr existing : existingXAttrs) { + boolean alreadySet = false; + for (XAttr set : toSet) { + if (set.equalsIgnoreValue(existing)) { + alreadySet = true; + break; + } + } + if (!alreadySet) { + xAttrs.add(existing); + if (isUserVisible(existing)) { userVisibleXAttrsNum++; } } } } - - XAttrSetFlag.validate(xAttr.getName(), exist, flag); - xAttrs.add(xAttr); - - if (isUserVisible(xAttr)) { - userVisibleXAttrsNum++; - } - + if (userVisibleXAttrsNum > inodeXAttrsLimit) { throw new IOException("Cannot add additional XAttr to inode, " + "would exceed limit of " + inodeXAttrsLimit); } - + return xAttrs; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index e88cc53dab5..85cfc1c7746 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -700,12 +700,19 @@ public class FSEditLog implements LogsPurgeable { .setBlocks(newNode.getBlocks()) .setPermissionStatus(permissions) .setClientName(newNode.getFileUnderConstructionFeature().getClientName()) - .setClientMachine(newNode.getFileUnderConstructionFeature().getClientMachine()); + .setClientMachine( + 
newNode.getFileUnderConstructionFeature().getClientMachine()); AclFeature f = newNode.getAclFeature(); if (f != null) { op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode)); } + + XAttrFeature x = newNode.getXAttrFeature(); + if (x != null) { + op.setXAttrs(x.getXAttrs()); + } + logRpcIds(op, toLogRpcIds); logEdit(op); } @@ -761,6 +768,11 @@ public class FSEditLog implements LogsPurgeable { if (f != null) { op.setAclEntries(AclStorage.readINodeLogicalAcl(newNode)); } + + XAttrFeature x = newNode.getXAttrFeature(); + if (x != null) { + op.setXAttrs(x.getXAttrs()); + } logEdit(op); } @@ -1054,18 +1066,18 @@ public class FSEditLog implements LogsPurgeable { logEdit(op); } - void logSetXAttr(String src, XAttr xAttr, boolean toLogRpcIds) { + void logSetXAttrs(String src, List xAttrs, boolean toLogRpcIds) { final SetXAttrOp op = SetXAttrOp.getInstance(); op.src = src; - op.xAttr = xAttr; + op.xAttrs = xAttrs; logRpcIds(op, toLogRpcIds); logEdit(op); } - void logRemoveXAttr(String src, XAttr xAttr) { + void logRemoveXAttrs(String src, List xAttrs) { final RemoveXAttrOp op = RemoveXAttrOp.getInstance(); op.src = src; - op.xAttr = xAttr; + op.xAttrs = xAttrs; logEdit(op); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java index 262fec055cd..04785c231e7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogLoader.java @@ -355,6 +355,7 @@ public class FSEditLogLoader { lastInodeId); newFile = fsDir.unprotectedAddFile(inodeId, path, addCloseOp.permissions, addCloseOp.aclEntries, + addCloseOp.xAttrs, replication, addCloseOp.mtime, addCloseOp.atime, addCloseOp.blockSize, true, addCloseOp.clientName, addCloseOp.clientMachine); @@ -804,7 +805,7 @@ public class FSEditLogLoader { } case OP_SET_XATTR: { SetXAttrOp setXAttrOp = (SetXAttrOp) op; - fsDir.unprotectedSetXAttr(setXAttrOp.src, setXAttrOp.xAttr, + fsDir.unprotectedSetXAttrs(setXAttrOp.src, setXAttrOp.xAttrs, EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE)); if (toAddRetryCache) { fsNamesys.addCacheEntry(setXAttrOp.rpcClientId, setXAttrOp.rpcCallId); @@ -813,7 +814,8 @@ public class FSEditLogLoader { } case OP_REMOVE_XATTR: { RemoveXAttrOp removeXAttrOp = (RemoveXAttrOp) op; - fsDir.unprotectedRemoveXAttr(removeXAttrOp.src, removeXAttrOp.xAttr); + fsDir.unprotectedRemoveXAttrs(removeXAttrOp.src, + removeXAttrOp.xAttrs); break; } default: diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java index 39e9ca2e4d2..14cf7ac7b4f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLogOp.java @@ -382,6 +382,16 @@ public abstract class FSEditLogOp { } } + private static List readXAttrsFromEditLog(DataInputStream in, + int logVersion) throws IOException { + if (!NameNodeLayoutVersion.supports(NameNodeLayoutVersion.Feature.XATTRS, + logVersion)) { + return null; + } + XAttrEditLogProto proto = XAttrEditLogProto.parseDelimitedFrom(in); + return PBHelper.convertXAttrs(proto.getXAttrsList()); + } + 
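The helper that closes just above reads one length-delimited XAttrEditLogProto from the op stream. Protobuf aside, the framing idea is simple enough to show with plain streams; this is not Hadoop or protobuf code, and the names are illustrative. Each record carries its own length, so a reader positioned at an op boundary consumes exactly one record even though many ops share the stream.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class DelimitedRecordSketch {
      static void writeDelimited(DataOutputStream out, byte[] payload) throws IOException {
        out.writeInt(payload.length);   // length prefix (protobuf uses a varint instead)
        out.write(payload);
      }

      static byte[] readDelimited(DataInputStream in) throws IOException {
        int len = in.readInt();
        byte[] payload = new byte[len];
        in.readFully(payload);
        return payload;
      }

      public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        writeDelimited(out, "xattr-op-1".getBytes(StandardCharsets.UTF_8));
        writeDelimited(out, "xattr-op-2".getBytes(StandardCharsets.UTF_8));

        DataInputStream in = new DataInputStream(new ByteArrayInputStream(buf.toByteArray()));
        System.out.println(new String(readDelimited(in), StandardCharsets.UTF_8)); // xattr-op-1
        System.out.println(new String(readDelimited(in), StandardCharsets.UTF_8)); // xattr-op-2
      }
    }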
@SuppressWarnings("unchecked") static abstract class AddCloseOp extends FSEditLogOp implements BlockListUpdatingOp { int length; @@ -394,6 +404,7 @@ public abstract class FSEditLogOp { Block[] blocks; PermissionStatus permissions; List aclEntries; + List xAttrs; String clientName; String clientMachine; @@ -461,6 +472,11 @@ public abstract class FSEditLogOp { return (T)this; } + T setXAttrs(List xAttrs) { + this.xAttrs = xAttrs; + return (T)this; + } + T setClientName(String clientName) { this.clientName = clientName; return (T)this; @@ -484,6 +500,9 @@ public abstract class FSEditLogOp { if (this.opCode == OP_ADD) { AclEditLogUtil.write(aclEntries, out); + XAttrEditLogProto.Builder b = XAttrEditLogProto.newBuilder(); + b.addAllXAttrs(PBHelper.convertXAttrProto(xAttrs)); + b.build().writeDelimitedTo(out); FSImageSerialization.writeString(clientName,out); FSImageSerialization.writeString(clientMachine,out); // write clientId and callId @@ -546,9 +565,9 @@ public abstract class FSEditLogOp { this.blocks = readBlocks(in, logVersion); this.permissions = PermissionStatus.read(in); - // clientname, clientMachine and block locations of last block. if (this.opCode == OP_ADD) { aclEntries = AclEditLogUtil.read(in, logVersion); + this.xAttrs = readXAttrsFromEditLog(in, logVersion); this.clientName = FSImageSerialization.readString(in); this.clientMachine = FSImageSerialization.readString(in); // read clientId and callId @@ -1343,6 +1362,7 @@ public abstract class FSEditLogOp { long timestamp; PermissionStatus permissions; List aclEntries; + List xAttrs; private MkdirOp() { super(OP_MKDIR); @@ -1377,6 +1397,11 @@ public abstract class FSEditLogOp { return this; } + MkdirOp setXAttrs(List xAttrs) { + this.xAttrs = xAttrs; + return this; + } + @Override public void writeFields(DataOutputStream out) throws IOException { @@ -1386,6 +1411,9 @@ public abstract class FSEditLogOp { FSImageSerialization.writeLong(timestamp, out); // atime, unused at this permissions.write(out); AclEditLogUtil.write(aclEntries, out); + XAttrEditLogProto.Builder b = XAttrEditLogProto.newBuilder(); + b.addAllXAttrs(PBHelper.convertXAttrProto(xAttrs)); + b.build().writeDelimitedTo(out); } @Override @@ -1430,6 +1458,8 @@ public abstract class FSEditLogOp { this.permissions = PermissionStatus.read(in); aclEntries = AclEditLogUtil.read(in, logVersion); + + xAttrs = readXAttrsFromEditLog(in, logVersion); } @Override @@ -1451,6 +1481,8 @@ public abstract class FSEditLogOp { builder.append(opCode); builder.append(", txid="); builder.append(txid); + builder.append(", xAttrs="); + builder.append(xAttrs); builder.append("]"); return builder.toString(); } @@ -1468,6 +1500,9 @@ public abstract class FSEditLogOp { if (aclEntries != null) { appendAclEntriesToXml(contentHandler, aclEntries); } + if (xAttrs != null) { + appendXAttrsToXml(contentHandler, xAttrs); + } } @Override void fromXml(Stanza st) throws InvalidXmlException { @@ -1477,6 +1512,7 @@ public abstract class FSEditLogOp { this.timestamp = Long.parseLong(st.getValue("TIMESTAMP")); this.permissions = permissionStatusFromXml(st); aclEntries = readAclEntriesFromXml(st); + xAttrs = readXAttrsFromXml(st); } } @@ -3499,7 +3535,7 @@ public abstract class FSEditLogOp { } static class RemoveXAttrOp extends FSEditLogOp { - XAttr xAttr; + List xAttrs; String src; private RemoveXAttrOp() { @@ -3514,7 +3550,7 @@ public abstract class FSEditLogOp { void readFields(DataInputStream in, int logVersion) throws IOException { XAttrEditLogProto p = XAttrEditLogProto.parseDelimitedFrom(in); src = 
p.getSrc(); - xAttr = PBHelper.convertXAttr(p.getXAttr()); + xAttrs = PBHelper.convertXAttrs(p.getXAttrsList()); } @Override @@ -3523,25 +3559,25 @@ public abstract class FSEditLogOp { if (src != null) { b.setSrc(src); } - b.setXAttr(PBHelper.convertXAttrProto(xAttr)); + b.addAllXAttrs(PBHelper.convertXAttrProto(xAttrs)); b.build().writeDelimitedTo(out); } @Override protected void toXml(ContentHandler contentHandler) throws SAXException { XMLUtils.addSaxString(contentHandler, "SRC", src); - appendXAttrToXml(contentHandler, xAttr); + appendXAttrsToXml(contentHandler, xAttrs); } @Override void fromXml(Stanza st) throws InvalidXmlException { src = st.getValue("SRC"); - xAttr = readXAttrFromXml(st); + xAttrs = readXAttrsFromXml(st); } } static class SetXAttrOp extends FSEditLogOp { - XAttr xAttr; + List xAttrs; String src; private SetXAttrOp() { @@ -3556,7 +3592,7 @@ public abstract class FSEditLogOp { void readFields(DataInputStream in, int logVersion) throws IOException { XAttrEditLogProto p = XAttrEditLogProto.parseDelimitedFrom(in); src = p.getSrc(); - xAttr = PBHelper.convertXAttr(p.getXAttr()); + xAttrs = PBHelper.convertXAttrs(p.getXAttrsList()); readRpcIds(in, logVersion); } @@ -3566,7 +3602,7 @@ public abstract class FSEditLogOp { if (src != null) { b.setSrc(src); } - b.setXAttr(PBHelper.convertXAttrProto(xAttr)); + b.addAllXAttrs(PBHelper.convertXAttrProto(xAttrs)); b.build().writeDelimitedTo(out); // clientId and callId writeRpcIds(rpcClientId, rpcCallId, out); @@ -3575,14 +3611,14 @@ public abstract class FSEditLogOp { @Override protected void toXml(ContentHandler contentHandler) throws SAXException { XMLUtils.addSaxString(contentHandler, "SRC", src); - appendXAttrToXml(contentHandler, xAttr); + appendXAttrsToXml(contentHandler, xAttrs); appendRpcIdsToXml(contentHandler, rpcClientId, rpcCallId); } @Override void fromXml(Stanza st) throws InvalidXmlException { src = st.getValue("SRC"); - xAttr = readXAttrFromXml(st); + xAttrs = readXAttrsFromXml(st); readRpcIdsFromXml(st); } } @@ -4202,42 +4238,48 @@ public abstract class FSEditLogOp { } return aclEntries; } - - private static void appendXAttrToXml(ContentHandler contentHandler, - XAttr xAttr) throws SAXException { - contentHandler.startElement("", "", "XATTR", new AttributesImpl()); - XMLUtils.addSaxString(contentHandler, "NAMESPACE", - xAttr.getNameSpace().toString()); - XMLUtils.addSaxString(contentHandler, "NAME", xAttr.getName()); - if (xAttr.getValue() != null) { - try { - XMLUtils.addSaxString(contentHandler, "VALUE", - XAttrCodec.encodeValue(xAttr.getValue(), XAttrCodec.HEX)); - } catch (IOException e) { - throw new SAXException(e); + + private static void appendXAttrsToXml(ContentHandler contentHandler, + List xAttrs) throws SAXException { + for (XAttr xAttr: xAttrs) { + contentHandler.startElement("", "", "XATTR", new AttributesImpl()); + XMLUtils.addSaxString(contentHandler, "NAMESPACE", + xAttr.getNameSpace().toString()); + XMLUtils.addSaxString(contentHandler, "NAME", xAttr.getName()); + if (xAttr.getValue() != null) { + try { + XMLUtils.addSaxString(contentHandler, "VALUE", + XAttrCodec.encodeValue(xAttr.getValue(), XAttrCodec.HEX)); + } catch (IOException e) { + throw new SAXException(e); + } } + contentHandler.endElement("", "", "XATTR"); } - contentHandler.endElement("", "", "XATTR"); } - - private static XAttr readXAttrFromXml(Stanza st) + + private static List readXAttrsFromXml(Stanza st) throws InvalidXmlException { if (!st.hasChildren("XATTR")) { return null; } - - Stanza a = st.getChildren("XATTR").get(0); - 
XAttr.Builder builder = new XAttr.Builder(); - builder.setNameSpace(XAttr.NameSpace.valueOf(a.getValue("NAMESPACE"))). - setName(a.getValue("NAME")); - String v = a.getValueOrNull("VALUE"); - if (v != null) { - try { - builder.setValue(XAttrCodec.decodeValue(v)); - } catch (IOException e) { - throw new InvalidXmlException(e.toString()); + + List stanzas = st.getChildren("XATTR"); + List xattrs = Lists.newArrayListWithCapacity(stanzas.size()); + for (Stanza a: stanzas) { + XAttr.Builder builder = new XAttr.Builder(); + builder.setNameSpace(XAttr.NameSpace.valueOf(a.getValue("NAMESPACE"))). + setName(a.getValue("NAME")); + String v = a.getValueOrNull("VALUE"); + if (v != null) { + try { + builder.setValue(XAttrCodec.decodeValue(v)); + } catch (IOException e) { + throw new InvalidXmlException(e.toString()); + } } + xattrs.add(builder.build()); } - return builder.build(); + return xattrs; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 0658a3c3eed..3846add44ee 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -8194,8 +8194,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats, checkOwner(pc, src); checkPathAccess(pc, src, FsAction.WRITE); } - dir.setXAttr(src, xAttr, flag); - getEditLog().logSetXAttr(src, xAttr, logRetryCache); + List xAttrs = Lists.newArrayListWithCapacity(1); + xAttrs.add(xAttr); + dir.setXAttrs(src, xAttrs, flag); + getEditLog().logSetXAttrs(src, xAttrs, logRetryCache); resultingStat = getAuditFileInfo(src, false); } finally { writeUnlock(); @@ -8315,10 +8317,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats, checkOwner(pc, src); checkPathAccess(pc, src, FsAction.WRITE); } - - XAttr removedXAttr = dir.removeXAttr(src, xAttr); - if (removedXAttr != null) { - getEditLog().logRemoveXAttr(src, removedXAttr); + + List xAttrs = Lists.newArrayListWithCapacity(1); + xAttrs.add(xAttr); + List removedXAttrs = dir.removeXAttrs(src, xAttrs); + if (removedXAttrs != null && !removedXAttrs.isEmpty()) { + getEditLog().logRemoveXAttrs(src, removedXAttrs); } resultingStat = getAuditFileInfo(src, false); } catch (AccessControlException e) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/xattr.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/xattr.proto index 9e70a372946..cb86ff27731 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/xattr.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/xattr.proto @@ -35,8 +35,8 @@ message XAttrProto { } message XAttrEditLogProto { - required string src = 1; - optional XAttrProto xAttr = 2; + optional string src = 1; + repeated XAttrProto xAttrs = 2; } enum XAttrSetFlagProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirectory.java index bd35c901be6..011901ddfa7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestFSDirectory.java @@ -24,7 +24,9 @@ import java.io.IOException; import java.io.StringReader; 
import java.util.EnumSet; import java.util.List; +import java.util.Random; +import com.google.common.collect.ImmutableList; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -44,6 +46,11 @@ import org.junit.Test; import com.google.common.collect.Lists; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + /** * Test {@link FSDirectory}, the in-memory namespace tree. */ @@ -73,6 +80,10 @@ public class TestFSDirectory { private DistributedFileSystem hdfs; + private static final int numGeneratedXAttrs = 256; + private static final ImmutableList generatedXAttrs = + ImmutableList.copyOf(generateXAttrs(numGeneratedXAttrs)); + @Before public void setUp() throws Exception { conf = new Configuration(); @@ -118,9 +129,10 @@ public class TestFSDirectory { for(; (line = in.readLine()) != null; ) { line = line.trim(); if (!line.isEmpty() && !line.contains("snapshot")) { - Assert.assertTrue("line=" + line, + assertTrue("line=" + line, line.startsWith(INodeDirectory.DUMPTREE_LAST_ITEM) - || line.startsWith(INodeDirectory.DUMPTREE_EXCEPT_LAST_ITEM)); + || line.startsWith(INodeDirectory.DUMPTREE_EXCEPT_LAST_ITEM) + ); checkClassName(line); } } @@ -165,7 +177,7 @@ public class TestFSDirectory { int i = line.lastIndexOf('('); int j = line.lastIndexOf('@'); final String classname = line.substring(i+1, j); - Assert.assertTrue(classname.startsWith(INodeFile.class.getSimpleName()) + assertTrue(classname.startsWith(INodeFile.class.getSimpleName()) || classname.startsWith(INodeDirectory.class.getSimpleName())); } @@ -182,22 +194,185 @@ public class TestFSDirectory { // Adding a system namespace xAttr, isn't affected by inode xAttrs limit. XAttr newXAttr = (new XAttr.Builder()).setNameSpace(XAttr.NameSpace.SYSTEM). setName("a3").setValue(new byte[]{0x33, 0x33, 0x33}).build(); - List xAttrs = fsdir.setINodeXAttr(existingXAttrs, newXAttr, + List newXAttrs = Lists.newArrayListWithCapacity(1); + newXAttrs.add(newXAttr); + List xAttrs = fsdir.setINodeXAttrs(existingXAttrs, newXAttrs, EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE)); - Assert.assertEquals(xAttrs.size(), 3); + assertEquals(xAttrs.size(), 3); // Adding a trusted namespace xAttr, is affected by inode xAttrs limit. XAttr newXAttr1 = (new XAttr.Builder()).setNameSpace( XAttr.NameSpace.TRUSTED).setName("a4"). setValue(new byte[]{0x34, 0x34, 0x34}).build(); + newXAttrs.set(0, newXAttr1); try { - fsdir.setINodeXAttr(existingXAttrs, newXAttr1, + fsdir.setINodeXAttrs(existingXAttrs, newXAttrs, EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE)); - Assert.fail("Setting user visable xattr on inode should fail if " + + fail("Setting user visible xattr on inode should fail if " + "reaching limit."); } catch (IOException e) { GenericTestUtils.assertExceptionContains("Cannot add additional XAttr " + "to inode, would exceed limit", e); } } + + /** + * Verify that the first num generatedXAttrs are present in + * newXAttrs. 
+   */
+  private static void verifyXAttrsPresent(List<XAttr> newXAttrs,
+      final int num) {
+    assertEquals("Unexpected number of XAttrs after multiset", num,
+        newXAttrs.size());
+    for (int i=0; i<num; i++) {
+      XAttr search = generatedXAttrs.get(i);
+      assertTrue("Did not find set XAttr " + search + " after multiset",
+          newXAttrs.contains(search));
+    }
+  }
+
+  private static List<XAttr> generateXAttrs(final int numXAttrs) {
+    List<XAttr> generatedXAttrs = Lists.newArrayListWithCapacity(numXAttrs);
+    for (int i=0; i<numXAttrs; i++) {
+      XAttr xAttr = (new XAttr.Builder())
+          .setNameSpace(XAttr.NameSpace.SYSTEM)
+          .setName("a" + i)
+          .setValue(new byte[] { (byte) i, (byte) (i+1), (byte) (i+2) })
+          .build();
+      generatedXAttrs.add(xAttr);
+    }
+    return generatedXAttrs;
+  }
+
+  /**
+   * Test setting and removing multiple xattrs via single operations
+   */
+  @Test(timeout=300000)
+  public void testXAttrMultiSetRemove() throws Exception {
+    List<XAttr> existingXAttrs = Lists.newArrayListWithCapacity(0);
+
+    // Keep adding a random number of xattrs and verifying until exhausted
+    final Random rand = new Random(0xFEEDA);
+    int numExpectedXAttrs = 0;
+    while (numExpectedXAttrs < numGeneratedXAttrs) {
+      LOG.info("Currently have " + numExpectedXAttrs + " xattrs");
+      final int numToAdd = rand.nextInt(5)+1;
+
+      List<XAttr> toAdd = Lists.newArrayListWithCapacity(numToAdd);
+      for (int i = 0; i < numToAdd; i++) {
+        if (numExpectedXAttrs >= numGeneratedXAttrs) {
+          break;
+        }
+        toAdd.add(generatedXAttrs.get(numExpectedXAttrs));
+        numExpectedXAttrs++;
+      }
+      LOG.info("Attempting to add " + toAdd.size() + " XAttrs");
+      for (int i = 0; i < toAdd.size(); i++) {
+        LOG.info("Will add XAttr " + toAdd.get(i));
+      }
+      List<XAttr> newXAttrs = fsdir.setINodeXAttrs(existingXAttrs, toAdd,
+          EnumSet.of(XAttrSetFlag.CREATE));
+      verifyXAttrsPresent(newXAttrs, numExpectedXAttrs);
+      existingXAttrs = newXAttrs;
+    }
+
+    // Keep removing a random number of xattrs and verifying until all gone
+    while (numExpectedXAttrs > 0) {
+      LOG.info("Currently have " + numExpectedXAttrs + " xattrs");
+      final int numToRemove = rand.nextInt(5)+1;
+      List<XAttr> toRemove = Lists.newArrayListWithCapacity(numToRemove);
+      for (int i = 0; i < numToRemove; i++) {
+        if (numExpectedXAttrs == 0) {
+          break;
+        }
+        toRemove.add(generatedXAttrs.get(numExpectedXAttrs-1));
+        numExpectedXAttrs--;
+      }
+      final int expectedNumToRemove = toRemove.size();
+      LOG.info("Attempting to remove " + expectedNumToRemove + " XAttrs");
+      List<XAttr> removedXAttrs = Lists.newArrayList();
+      List<XAttr> newXAttrs = fsdir.filterINodeXAttrs(existingXAttrs,
+          toRemove, removedXAttrs);
+      assertEquals("Unexpected number of removed XAttrs",
+          expectedNumToRemove, removedXAttrs.size());
+      verifyXAttrsPresent(newXAttrs, numExpectedXAttrs);
+      existingXAttrs = newXAttrs;
+    }
+  }
+
+  @Test(timeout=300000)
+  public void testXAttrMultiAddRemoveErrors() throws Exception {
+
+    // Test that the same XAttr can not be multiset twice
+    List<XAttr> existingXAttrs = Lists.newArrayList();
+    List<XAttr> toAdd = Lists.newArrayList();
+    toAdd.add(generatedXAttrs.get(0));
+    toAdd.add(generatedXAttrs.get(1));
+    toAdd.add(generatedXAttrs.get(2));
+    toAdd.add(generatedXAttrs.get(0));
+    try {
+      fsdir.setINodeXAttrs(existingXAttrs, toAdd, EnumSet.of(XAttrSetFlag
+          .CREATE));
+      fail("Specified the same xattr to be set twice");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains("Cannot specify the same " +
+          "XAttr to be set", e);
+    }
+
+    // Test that CREATE and REPLACE flags are obeyed
+    toAdd.remove(generatedXAttrs.get(0));
+    existingXAttrs.add(generatedXAttrs.get(0));
+    try {
+      fsdir.setINodeXAttrs(existingXAttrs, toAdd, EnumSet.of(XAttrSetFlag
+          .CREATE));
+      fail("Set XAttr that is already set without REPLACE flag");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains("already exists", e);
+    }
+    try {
+      fsdir.setINodeXAttrs(existingXAttrs, toAdd, EnumSet.of(XAttrSetFlag
+          .REPLACE));
+      fail("Set XAttr that does not exist without the CREATE flag");
+    } catch (IOException e) {
+      GenericTestUtils.assertExceptionContains("does not exist", e);
+    }
+
+    // Sanity test for CREATE
+    toAdd.remove(generatedXAttrs.get(0));
+    List<XAttr> newXAttrs =
fsdir.setINodeXAttrs(existingXAttrs, toAdd, + EnumSet.of(XAttrSetFlag.CREATE)); + assertEquals("Unexpected toAdd size", 2, toAdd.size()); + for (XAttr x : toAdd) { + assertTrue("Did not find added XAttr " + x, newXAttrs.contains(x)); + } + existingXAttrs = newXAttrs; + + // Sanity test for REPLACE + toAdd = Lists.newArrayList(); + for (int i=0; i<3; i++) { + XAttr xAttr = (new XAttr.Builder()) + .setNameSpace(XAttr.NameSpace.SYSTEM) + .setName("a" + i) + .setValue(new byte[] { (byte) (i*2) }) + .build(); + toAdd.add(xAttr); + } + newXAttrs = fsdir.setINodeXAttrs(existingXAttrs, toAdd, + EnumSet.of(XAttrSetFlag.REPLACE)); + assertEquals("Unexpected number of new XAttrs", 3, newXAttrs.size()); + for (int i=0; i<3; i++) { + assertArrayEquals("Unexpected XAttr value", + new byte[] {(byte)(i*2)}, newXAttrs.get(i).getValue()); + } + existingXAttrs = newXAttrs; + + // Sanity test for CREATE+REPLACE + toAdd = Lists.newArrayList(); + for (int i=0; i<4; i++) { + toAdd.add(generatedXAttrs.get(i)); + } + newXAttrs = fsdir.setINodeXAttrs(existingXAttrs, toAdd, + EnumSet.of(XAttrSetFlag.CREATE, XAttrSetFlag.REPLACE)); + verifyXAttrsPresent(newXAttrs, 4); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestXAttrWithSnapshot.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestXAttrWithSnapshot.java index 87b856e2216..85c2fda126c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestXAttrWithSnapshot.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/snapshot/TestXAttrWithSnapshot.java @@ -249,10 +249,10 @@ public class TestXAttrWithSnapshot { private static void doSnapshotRootRemovalAssertions(Path path, Path snapshotPath) throws Exception { Map xattrs = hdfs.getXAttrs(path); - Assert.assertEquals(xattrs.size(), 0); + Assert.assertEquals(0, xattrs.size()); xattrs = hdfs.getXAttrs(snapshotPath); - Assert.assertEquals(xattrs.size(), 2); + Assert.assertEquals(2, xattrs.size()); Assert.assertArrayEquals(value1, xattrs.get(name1)); Assert.assertArrayEquals(value2, xattrs.get(name2)); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored b/hadoop-hdfs-project/hadoop-hdfs/src/test/resources/editsStored index c41b65be4cd3b3da524866ca03e7e696e96d5a43..a3561cda528a9bae7b6fb6e16180ca9a9d344d7a 100644 GIT binary patch literal 4970 zcmcgwdrXs86u<3fOL-R*CMZ&1Zm2^j=m4GL6BHGtFz4ocEKLKS$|x;qi;9}C=yaQl zI$bv0GBpn7jGMt%;+%6a>gMKr$H=B;TjC?W#mBNNPT4v4-mm>A7s4OLL)!E8-gAGy zbM86!@zLJiewcvtg---Nfaq$A+IGjmLlXT-h+el`SA2BwxKpP1`x75-S~zg>j>eV) zS_?_fELu(HucW6>DPEiNU(Vv6aQFs}(TizWPa-mBRXG)Vg-3Dt6ruyZ;-<3d%K?p+ zdTMTTxGFu$yfANFt>T&Oao5%Ani)>onBrH9oK=d;XSbEvN=KJZ9y{i>qOo>sc1})K zPJUjF)ovxB*6iG&S@35i=D!L`CXZTbGrV7z?pS&ye;>e@-dpfPLMS{;VVK0gHB5!b zM`v_TeOzp9y9t4ju^iAC1Q^cXYus~{N`~o-7rGesR@e+L&P6Uq3L%2Aslu`0-=BzH zDZ#ihKGwvvED>{NWtB&%@VPzp_L^#V=1vS#wP50J(b0eI$y$YFF@R%+cE(EGbtVD3 zRg?H9jZZWM+05^KW|2j!ZLmihlBL) zXh2LZWnS4lmjVTC|9yU9B$y1QhJ^G9G9|hpBR6>80N<(J+B>N_3QUJ`^Eg3rX_s`h zoy*1j2M%HW)bffCV(7e5Yr?@CQY+ex60iRD=tpGOgkK=fFeMvEZ@wG3T@?vuc_ zo?wTek|+!c_e_dIR!Y|hfq8yBwQ1*h3$&_;xB$^ae=TwEOf40!8R@#TC3+-M6{00b zl8{P|T{5`5QqRa3>M~tBIDy1_3VtcJfQ?E_w=)$hlgUcou-L zI>4#NUoy-ZYE8S}k)Zb5qS}#Tetik;HZo(9!L%F_xWy77Qj=JM~@wukdQ}bkJz-y<5ei#sazD&gvnFOGAlS99dzRCg+N)}VP$!4?}=Os zw5lvtct#V`0Q2+)ElW0bKnB`&J#4AfmPk%83!NPqSZIj=4+9L8$jR`dBP`IWat`Ar 
zp=obtZqjlN#}44U`_;O8nOe>lyW%ua&Ndt+6A~KWoUVM?sx3?Pwj5lRft1F~|aBod+9KFKu&|v58Iy7}HhQV6A$JS!pEZ1GU4ynUm?#b|Q|L~*c z>JHbP0`wQ$x_@C`!a6*MXVBjmLyzXCYu`!aczC(C+Z;8D&0VR?51X2dYpb5)kLH?m zV!|_&rS^l0KNf{q5vj;MTS)>x}ukCy|m;(DvT`V-i+ZILy9Jb%vy(U4#dt} zw9wEOoD)J#ytCr-bi52?aR~Ka1qh{wKN0_=eGRRr)4+hA=zc$RLi18r)!H3+6QMT) z(BsrVMc}pp?t>T7!KxfSDC3L5L>JCjqluVFUt??p)2Y4cyov`}bo|m2o5$G1QJy-r=q*bN0cvw+Q&4*tVC~FZmx! Cv|RrH literal 4805 zcmcIodr(wW7(ct0$1ZO~OnK;rnxyeqKtWNtyp)${mL+I08(bdoY*`~5)11;Nov>_* z@j*^WN={~_Gv<&}12m?ire>B}(!(i55X9Gvn)H3=oV$Cydtv-Tzn$f~d%o}de&2V_ zcOI94fq?@AWF&mO@Bu_uliIg85FSG5PeSy%HM+F+H^0d<#Q(WX%xvwQ9QS#|N~MRO zXC8|U%RH|O>r?JLosc+e6n5XlFtzJ^2Yo^U%X|tY6ttgYKYz4Vi%e3XoQFP6p;{Du3;Fo;_LLg zzxe*?ZP)-Qk#r8{rh@Zi;FZ-cOGON^bGT-JhC{pB>v0s~Ig&fk=o_Tj*VupJ?JWZI z%i(L;p1$e_^Z=RIeu)NI~Zh#VfQ-d25vs*GXtylG3*W!aIXU~ zR7rJn>#8KE<(Bv6e_Sd8h20 z%f}ux%EPXWpeNf)UN-^JaAX6)qk@PKMCOO5#*Tq!*+BPuLWGOQ;)0Jp3WhlNq6BFs zgxe&=Aq~PsLST&VKD%}ISrfF%h`8*~MSn4nU*;b`_zw zgE;TKwJo^tK0*@+LNXb$26kxF0B-m+N!Fj{439XrEL~N*JsMT}C8r~x`M_UajXxP@?n>a|+Cq#=l`Ip$2}I~I z@91bmmKpRnA>nEkIx-@#&1eI{s?IUnaPZkNAk^jC6DL<4YM&g-@= zRdP-mjMJ5J(gR+EE0KfqgI#y8V^wZOK#(Vxg1>{!cbmq1%LR!i00_+#G!7<961Ae; z9*C;_vUA=18gQ!^z2t(7U|mFaRe)xDL3Z!}L8Gu>oPnwF0Tg6x-&>P6ngC6nHhRp6 zgt-5A>QJt7+Iam5y+zC1w<-%VTAhnrknT{By{sV5wEh^cWHAFH0T#n(zx)+SmKZd^ z*!S(Zy{kr99IzN{a({7@(s#j(HxQW`b5ZvpDWz9Q#_-HYMj|Gyvz7?M?kr zO3o>RaT+LRANCRnF*-Q&rB8lQ7NmSBRu`mWzRzmRw_K2T;{ahb00G*QRJGd!QMF%j zuA7|A*+qD1DrBxCLGw5Oqh6J}(=7A2J2meNGgrMq=J*3l4dEevDQb} zq*}*yvkYRd@ sIY=2 1 - 1394849922137 - 37e1a64049bbef35 + 1403590428625 + 16f34bfba67b2552 @@ -24,8 +24,8 @@ 3 2 - 1394849922140 - 7c0bf5039242fc54 + 1403590428631 + dbe6282854469833 @@ -37,18 +37,18 @@ 16386 /file_create 1 - 1394158722811 - 1394158722811 + 1402899229669 + 1402899229669 512 - DFSClient_NONMAPREDUCE_221786725_1 + DFSClient_NONMAPREDUCE_1233039831_1 127.0.0.1 - jing + andrew supergroup 420 - 9b85a845-bbfa-42f6-8a16-c433614b8eb9 - 6 + e03f4a52-3d85-4e05-8942-286185e639bd + 8 @@ -59,13 +59,13 @@ 0 /file_create 1 - 1394158722832 - 1394158722811 + 1402899229711 + 1402899229669 512 - jing + andrew supergroup 420 @@ -78,9 +78,9 @@ 0 /file_create /file_moved - 1394158722836 - 9b85a845-bbfa-42f6-8a16-c433614b8eb9 - 8 + 1402899229718 + e03f4a52-3d85-4e05-8942-286185e639bd + 10 @@ -89,9 +89,9 @@ 7 0 /file_moved - 1394158722842 - 9b85a845-bbfa-42f6-8a16-c433614b8eb9 - 9 + 1402899229730 + e03f4a52-3d85-4e05-8942-286185e639bd + 11 @@ -101,9 +101,9 @@ 0 16387 /directory_mkdir - 1394158722848 + 1402899229748 - jing + andrew supergroup 493 @@ -136,8 +136,8 @@ 12 /directory_mkdir snapshot1 - 9b85a845-bbfa-42f6-8a16-c433614b8eb9 - 14 + e03f4a52-3d85-4e05-8942-286185e639bd + 16 @@ -147,8 +147,8 @@ /directory_mkdir snapshot1 snapshot2 - 9b85a845-bbfa-42f6-8a16-c433614b8eb9 - 15 + e03f4a52-3d85-4e05-8942-286185e639bd + 17 @@ -157,8 +157,8 @@ 14 /directory_mkdir snapshot2 - 9b85a845-bbfa-42f6-8a16-c433614b8eb9 - 16 + e03f4a52-3d85-4e05-8942-286185e639bd + 18 @@ -169,18 +169,18 @@ 16388 /file_create 1 - 1394158722872 - 1394158722872 + 1402899229871 + 1402899229871 512 - DFSClient_NONMAPREDUCE_221786725_1 + DFSClient_NONMAPREDUCE_1233039831_1 127.0.0.1 - jing + andrew supergroup 420 - 9b85a845-bbfa-42f6-8a16-c433614b8eb9 - 17 + e03f4a52-3d85-4e05-8942-286185e639bd + 19 @@ -191,13 +191,13 @@ 0 /file_create 1 - 1394158722874 - 1394158722872 + 1402899229881 + 1402899229871 512 - jing + andrew supergroup 420 @@ -253,10 +253,10 @@ 0 /file_create /file_moved - 1394158722890 + 1402899229963 NONE - 9b85a845-bbfa-42f6-8a16-c433614b8eb9 - 24 + 
e03f4a52-3d85-4e05-8942-286185e639bd + 26 @@ -267,18 +267,18 @@ 16389 /file_concat_target 1 - 1394158722895 - 1394158722895 + 1402899229981 + 1402899229981 512 - DFSClient_NONMAPREDUCE_221786725_1 + DFSClient_NONMAPREDUCE_1233039831_1 127.0.0.1 - jing + andrew supergroup 420 - 9b85a845-bbfa-42f6-8a16-c433614b8eb9 - 26 + e03f4a52-3d85-4e05-8942-286185e639bd + 28 @@ -383,8 +383,8 @@ 0 /file_concat_target 1 - 1394158722986 - 1394158722895 + 1402899230219 + 1402899229981 512 @@ -404,7 +404,7 @@ 1003 - jing + andrew supergroup 420 @@ -418,18 +418,18 @@ 16390 /file_concat_0 1 - 1394158722989 - 1394158722989 + 1402899230235 + 1402899230235 512 - DFSClient_NONMAPREDUCE_221786725_1 + DFSClient_NONMAPREDUCE_1233039831_1 127.0.0.1 - jing + andrew supergroup 420 - 9b85a845-bbfa-42f6-8a16-c433614b8eb9 - 39 + e03f4a52-3d85-4e05-8942-286185e639bd + 41 @@ -534,8 +534,8 @@ 0 /file_concat_0 1 - 1394158723010 - 1394158722989 + 1402899230307 + 1402899230235 512 @@ -555,7 +555,7 @@ 1006 - jing + andrew supergroup 420 @@ -569,18 +569,18 @@ 16391 /file_concat_1 1 - 1394158723012 - 1394158723012 + 1402899230320 + 1402899230320 512 - DFSClient_NONMAPREDUCE_221786725_1 + DFSClient_NONMAPREDUCE_1233039831_1 127.0.0.1 - jing + andrew supergroup 420 - 9b85a845-bbfa-42f6-8a16-c433614b8eb9 - 51 + e03f4a52-3d85-4e05-8942-286185e639bd + 53 @@ -685,8 +685,8 @@ 0 /file_concat_1 1 - 1394158723035 - 1394158723012 + 1402899230383 + 1402899230320 512 @@ -706,7 +706,7 @@ 1009 - jing + andrew supergroup 420 @@ -718,13 +718,13 @@ 56 0 /file_concat_target - 1394158723039 + 1402899230394 /file_concat_0 /file_concat_1 - 9b85a845-bbfa-42f6-8a16-c433614b8eb9 - 62 + e03f4a52-3d85-4e05-8942-286185e639bd + 64 @@ -735,15 +735,15 @@ 16392 /file_symlink /file_concat_target - 1394158723044 - 1394158723044 + 1402899230406 + 1402899230406 - jing + andrew supergroup 511 - 9b85a845-bbfa-42f6-8a16-c433614b8eb9 - 63 + e03f4a52-3d85-4e05-8942-286185e639bd + 65 @@ -754,18 +754,18 @@ 16393 /hard-lease-recovery-test 1 - 1394158723047 - 1394158723047 + 1402899230413 + 1402899230413 512 - DFSClient_NONMAPREDUCE_221786725_1 + DFSClient_NONMAPREDUCE_1233039831_1 127.0.0.1 - jing + andrew supergroup 420 - 9b85a845-bbfa-42f6-8a16-c433614b8eb9 - 64 + e03f4a52-3d85-4e05-8942-286185e639bd + 66 @@ -821,7 +821,7 @@ OP_REASSIGN_LEASE 64 - DFSClient_NONMAPREDUCE_221786725_1 + DFSClient_NONMAPREDUCE_1233039831_1 /hard-lease-recovery-test HDFS_NameNode @@ -834,8 +834,8 @@ 0 /hard-lease-recovery-test 1 - 1394158725708 - 1394158723047 + 1402899232526 + 1402899230413 512 @@ -845,7 +845,7 @@ 1011 - jing + andrew supergroup 420 @@ -856,13 +856,13 @@ 66 pool1 - jing - staff + andrew + andrew 493 9223372036854775807 2305843009213693951 - 9b85a845-bbfa-42f6-8a16-c433614b8eb9 - 71 + e03f4a52-3d85-4e05-8942-286185e639bd + 73 @@ -871,8 +871,8 @@ 67 pool1 99 - 9b85a845-bbfa-42f6-8a16-c433614b8eb9 - 72 + e03f4a52-3d85-4e05-8942-286185e639bd + 74 @@ -883,9 +883,9 @@ /path 1 pool1 - 2305844403372420029 - 9b85a845-bbfa-42f6-8a16-c433614b8eb9 - 73 + 2305844412112927450 + e03f4a52-3d85-4e05-8942-286185e639bd + 75 @@ -894,8 +894,8 @@ 69 1 2 - 9b85a845-bbfa-42f6-8a16-c433614b8eb9 - 74 + e03f4a52-3d85-4e05-8942-286185e639bd + 76 @@ -903,8 +903,8 @@ 70 1 - 9b85a845-bbfa-42f6-8a16-c433614b8eb9 - 75 + e03f4a52-3d85-4e05-8942-286185e639bd + 77 @@ -912,8 +912,8 @@ 71 pool1 - 9b85a845-bbfa-42f6-8a16-c433614b8eb9 - 76 + e03f4a52-3d85-4e05-8942-286185e639bd + 78 @@ -921,51 +921,91 @@ 72 /file_concat_target - - - - OP_ROLLING_UPGRADE_START - - 73 - 1394158726098 - - - - 
OP_ROLLING_UPGRADE_FINALIZE - - 74 - 1394158726098 + + ACCESS + USER + rw- + + + ACCESS + USER + user + rw- + + + ACCESS + GROUP + -w- + + + ACCESS + MASK + rw- + + + ACCESS + OTHER + --- + OP_SET_XATTR - 75 + 73 /file_concat_target USER a1 0x313233 - 9b85a845-bbfa-42f6-8a16-c433614b8eb9 + e03f4a52-3d85-4e05-8942-286185e639bd 80 + + OP_SET_XATTR + + 74 + /file_concat_target + + USER + a2 + 0x373839 + + e03f4a52-3d85-4e05-8942-286185e639bd + 81 + + OP_REMOVE_XATTR - 76 + 75 /file_concat_target USER - a1 + a2 + + OP_ROLLING_UPGRADE_START + + 76 + 1402899233646 + + + + OP_ROLLING_UPGRADE_FINALIZE + + 77 + 1402899233647 + + OP_END_LOG_SEGMENT - 77 + 78 From a3908b89f7b09a2870eb12556feb39a7207f3aea Mon Sep 17 00:00:00 2001 From: Andrew Wang Date: Thu, 19 Jun 2014 17:42:04 +0000 Subject: [PATCH 8/8] Move HDFS-6375 down to 2.5 section in CHANGES.txt git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1603974 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 8e6a9ee7031..a88324336f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -254,9 +254,6 @@ Trunk (Unreleased) HDFS-5794. Fix the inconsistency of layout version number of ADD_DATANODE_AND_STORAGE_UUIDS between trunk and branch-2. (jing9) - HDFS-6375. Listing extended attributes with the search permission. - (Charles Lamb via wang) - Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES @@ -736,6 +733,9 @@ Release 2.5.0 - UNRELEASED HDFS-6374. setXAttr should require the user to be the owner of the file or directory (Charles Lamb via wang) + HDFS-6375. Listing extended attributes with the search permission. + (Charles Lamb via wang) + HDFS-6492. Support create-time xattrs and atomically setting multiple xattrs. (wang)
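
A note for readers of the xattr-related entries above (HDFS-6374, HDFS-6375, HDFS-6492): the OP_SET_XATTR and OP_REMOVE_XATTR edit-log records in the regenerated editsStored.xml correspond to ordinary client calls on the public FileSystem xattr API. The sketch below is illustrative only and is not part of this patch series; it assumes a reachable HDFS cluster configured as fs.defaultFS, and the path, class name, and values are made up (the byte values simply mirror the a1/a2 records above).

    import java.util.EnumSet;
    import java.util.Map;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.XAttrSetFlag;

    // Hypothetical demo class, not part of the patches above.
    public class XAttrClientSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);        // assumes fs.defaultFS points at HDFS
        Path file = new Path("/tmp/xattr-demo");     // hypothetical test path
        fs.create(file).close();

        // CREATE fails if the xattr already exists; REPLACE fails if it does not.
        fs.setXAttr(file, "user.a1", new byte[]{0x31, 0x32, 0x33},
            EnumSet.of(XAttrSetFlag.CREATE));
        fs.setXAttr(file, "user.a2", new byte[]{0x37, 0x38, 0x39},
            EnumSet.of(XAttrSetFlag.CREATE));

        // Names come back with their namespace prefix, e.g. "user.a1".
        for (Map.Entry<String, byte[]> e : fs.getXAttrs(file).entrySet()) {
          System.out.println(e.getKey() + " = " + new String(e.getValue(), "UTF-8"));
        }

        fs.removeXAttr(file, "user.a2");
        fs.delete(file, true);
      }
    }

The CREATE/REPLACE flags used here are the client-side counterparts of the XAttrSetFlagProto values carried in the XAttrEditLogProto message changed earlier in this series.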