diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 2e702a9bb94..3c472f3b3f8 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -4,6 +4,9 @@ Trunk (unreleased changes) INCOMPATIBLE CHANGES + HADOOP-8124. Remove the deprecated FSDataOutputStream constructor, + FSDataOutputStream.sync() and Syncable.sync(). (szetszwo) + NEW FEATURES IMPROVEMENTS diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java index 62b0f966a27..e75bef5509b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FSDataOutputStream.java @@ -17,7 +17,11 @@ */ package org.apache.hadoop.fs; -import java.io.*; +import java.io.BufferedOutputStream; +import java.io.DataOutputStream; +import java.io.FilterOutputStream; +import java.io.IOException; +import java.io.OutputStream; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -28,20 +32,19 @@ @InterfaceAudience.Public @InterfaceStability.Stable public class FSDataOutputStream extends DataOutputStream implements Syncable { - private OutputStream wrappedStream; + private final OutputStream wrappedStream; private static class PositionCache extends FilterOutputStream { - private FileSystem.Statistics statistics; - long position; + private final FileSystem.Statistics statistics; + private long position; - public PositionCache(OutputStream out, - FileSystem.Statistics stats, - long pos) throws IOException { + PositionCache(OutputStream out, FileSystem.Statistics stats, long pos) { super(out); statistics = stats; position = pos; } + @Override public void write(int b) throws IOException { out.write(b); position++; @@ -50,6 +53,7 @@ public void write(int b) throws IOException { } } + @Override public void write(byte b[], int off, int len) throws IOException { out.write(b, off, len); position += len; // update position @@ -58,27 +62,22 @@ public void write(byte b[], int off, int len) throws IOException { } } - public long getPos() throws IOException { + long getPos() { return position; // return cached position } - + + @Override public void close() throws IOException { out.close(); } } - @Deprecated - public FSDataOutputStream(OutputStream out) throws IOException { - this(out, null); - } - - public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats) - throws IOException { + public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats) { this(out, stats, 0); } public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats, - long startPosition) throws IOException { + long startPosition) { super(new PositionCache(out, stats, startPosition)); wrappedStream = out; } @@ -88,13 +87,14 @@ public FSDataOutputStream(OutputStream out, FileSystem.Statistics stats, * * @return the current position in the output stream */ - public long getPos() throws IOException { + public long getPos() { return ((PositionCache)out).getPos(); } /** * Close the underlying output stream. */ + @Override public void close() throws IOException { out.close(); // This invokes PositionCache.close() } @@ -109,14 +109,6 @@ public OutputStream getWrappedStream() { return wrappedStream; } - @Override // Syncable - @Deprecated - public void sync() throws IOException { - if (wrappedStream instanceof Syncable) { - ((Syncable)wrappedStream).sync(); - } - } - @Override // Syncable public void hflush() throws IOException { if (wrappedStream instanceof Syncable) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java index 85abe067f31..7ec3509ce1d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Syncable.java @@ -27,11 +27,6 @@ @InterfaceAudience.Public @InterfaceStability.Evolving public interface Syncable { - /** - * @deprecated As of HADOOP 0.21.0, replaced by hflush - * @see #hflush() - */ - @Deprecated public void sync() throws IOException; /** Flush out the data in client's user buffer. After the return of * this call, new readers will see the data. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java index 476eaeb14b5..0ba5dbff906 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/SequenceFile.java @@ -1196,7 +1196,7 @@ public void sync() throws IOException { /** flush all currently written data to the file system */ public void syncFs() throws IOException { if (out != null) { - out.sync(); // flush contents to file system + out.hflush(); // flush contents to file system } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index f748a853ed2..2a8a5a76c95 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -4,6 +4,8 @@ Trunk (unreleased changes) INCOMPATIBLE CHANGES + HDFS-3034. Remove the deprecated DFSOutputStream.sync() method. (szetszwo) + NEW FEATURES HDFS-2430. The number of failed or low-resource volumes the NN can tolerate @@ -68,6 +70,8 @@ Trunk (unreleased changes) HDFS-3030. Remove getProtocolVersion and getProtocolSignature from translators. (jitendra) + HDFS-3036. Remove unused method DFSUtil#isDefaultNamenodeAddress. (atm) + OPTIMIZATIONS HDFS-2477. Optimize computing the diff between a block report and the @@ -117,6 +121,9 @@ Trunk (unreleased changes) HDFS-2908. Add apache license header for StorageReport.java. (Brandon Li via jitendra) + HDFS-3037. TestMulitipleNNDataBlockScanner#testBlockScannerAfterRestart is + racy. (atm) + Release 0.23.3 - UNRELEASED INCOMPATIBLE CHANGES @@ -162,6 +169,8 @@ Release 0.23.3 - UNRELEASED HDFS-2899. Service protocol changes in DatanodeProtocol to add multiple storages. (suresh) + HDFS-3021. Use generic type to declare FSDatasetInterface. (szetszwo) + IMPROVEMENTS HDFS-2018. Move all journal stream management code into one place. @@ -252,6 +261,8 @@ Release 0.23.3 - UNRELEASED HDFS-3020. Fix editlog to automatically sync when buffer is full. (todd) + HDFS-3038. Add FSEditLog.metrics to findbugs exclude list. (todd via atm) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml index 5590055539f..301d3028252 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml @@ -247,6 +247,15 @@ + + + + + + diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index ff81d7752b9..08e69e78a90 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -1410,12 +1410,6 @@ protected synchronized void writeChunk(byte[] b, int offset, int len, byte[] che } } } - - @Override - @Deprecated - public synchronized void sync() throws IOException { - hflush(); - } /** * Flushes out to all replicas of the block. The data is in the buffers diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java index e63ed0d26b6..cbc0f0ea238 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSUtil.java @@ -676,8 +676,6 @@ public static Collection getNameServiceUris(Configuration conf, * corresponding to the key with matching address, by doing a reverse * lookup on the list of nameservices until it finds a match. * - * If null is returned, client should try {@link #isDefaultNamenodeAddress} - * to check pre-Federation, non-HA configurations. * Since the process of resolving URIs to Addresses is slightly expensive, * this utility method should not be used in performance-critical routines. * @@ -768,38 +766,6 @@ private static String getSuffixedConf(Configuration conf, return conf.get(key, defaultVal); } - /** - * Given the InetSocketAddress for any configured communication with a - * namenode, this method determines whether it is the configured - * communication channel for the "default" namenode. - * It does a reverse lookup on the list of default communication parameters - * to see if the given address matches any of them. - * Since the process of resolving URIs to Addresses is slightly expensive, - * this utility method should not be used in performance-critical routines. - * - * @param conf - configuration - * @param address - InetSocketAddress for configured communication with NN. - * Configured addresses are typically given as URIs, but we may have to - * compare against a URI typed in by a human, or the server name may be - * aliased, so we compare unambiguous InetSocketAddresses instead of just - * comparing URI substrings. - * @param keys - list of configured communication parameters that should - * be checked for matches. For example, to compare against RPC addresses, - * provide the list DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, - * DFS_NAMENODE_RPC_ADDRESS_KEY - * @return - boolean confirmation if matched generic parameter - */ - public static boolean isDefaultNamenodeAddress(Configuration conf, - InetSocketAddress address, String... keys) { - for (String key : keys) { - String candidateAddress = conf.get(key); - if (candidateAddress != null - && address.equals(NetUtils.createSocketAddr(candidateAddress))) - return true; - } - return false; - } - /** * Sets the node specific setting into generic configuration key. Looks up * value of "key.nameserviceId.namenodeId" and if found sets that value into diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java index 54c1b6f3952..50e45750c2b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockPoolSliceScanner.java @@ -74,7 +74,7 @@ class BlockPoolSliceScanner { private long scanPeriod = DEFAULT_SCAN_PERIOD_HOURS * 3600 * 1000; private DataNode datanode; - private final FSDatasetInterface dataset; + private final FSDatasetInterface dataset; // sorted set private TreeSet blockInfoSet; @@ -133,7 +133,8 @@ public int compareTo(BlockScanInfo other) { } } - BlockPoolSliceScanner(DataNode datanode, FSDatasetInterface dataset, + BlockPoolSliceScanner(DataNode datanode, + FSDatasetInterface dataset, Configuration conf, String bpid) { this.datanode = datanode; this.dataset = dataset; @@ -216,7 +217,7 @@ void init() throws IOException { * otherwise, pick the first directory. */ File dir = null; - List volumes = dataset.getVolumes(); + final List volumes = dataset.getVolumes(); for (FSVolumeInterface vol : volumes) { File bpDir = vol.getDirectory(blockPoolId); if (LogFileHandler.isFilePresent(bpDir, verificationLogFile)) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java index 31cf30a925d..c96be75f125 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockVolumeChoosingPolicy.java @@ -21,7 +21,6 @@ import java.util.List; import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; /************************************************** @@ -34,7 +33,7 @@ * ***************************************************/ @InterfaceAudience.Private -public interface BlockVolumeChoosingPolicy { +public interface BlockVolumeChoosingPolicy { /** * Returns a specific FSVolume after applying a suitable choice algorithm @@ -48,7 +47,5 @@ public interface BlockVolumeChoosingPolicy { * @return the chosen volume to store the block. * @throws IOException when disks are unavailable or are full. */ - public FSVolumeInterface chooseVolume(List volumes, long blockSize) - throws IOException; - + public V chooseVolume(List volumes, long blockSize) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java index c0d0bff23c2..9f772bc60bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataBlockScanner.java @@ -33,6 +33,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; /** * DataBlockScanner manages block scanning for all the block pools. For each @@ -44,7 +45,7 @@ public class DataBlockScanner implements Runnable { public static final Log LOG = LogFactory.getLog(DataBlockScanner.class); private final DataNode datanode; - private final FSDatasetInterface dataset; + private final FSDatasetInterface dataset; private final Configuration conf; /** @@ -55,7 +56,9 @@ public class DataBlockScanner implements Runnable { new TreeMap(); Thread blockScannerThread = null; - DataBlockScanner(DataNode datanode, FSDatasetInterface dataset, Configuration conf) { + DataBlockScanner(DataNode datanode, + FSDatasetInterface dataset, + Configuration conf) { this.datanode = datanode; this.dataset = dataset; this.conf = conf; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 79bccef626f..098809cb3a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -123,6 +123,7 @@ import org.apache.hadoop.hdfs.server.common.JspHelper; import org.apache.hadoop.hdfs.server.common.StorageInfo; import org.apache.hadoop.hdfs.server.common.Util; +import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; import org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter.SecureResources; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods; @@ -139,7 +140,6 @@ import org.apache.hadoop.http.HttpServer; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.ProtobufRpcEngine; -import org.apache.hadoop.ipc.ProtocolSignature; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.Server; @@ -234,7 +234,7 @@ public static InetSocketAddress createSocketAddr(String target) { volatile boolean shouldRun = true; private BlockPoolManager blockPoolManager; - public volatile FSDatasetInterface data = null; + public volatile FSDatasetInterface data = null; private String clusterId = null; public final static String EMPTY_DEL_HINT = ""; @@ -812,7 +812,7 @@ int getBpOsCount() { * handshake with the the first namenode is completed. */ private void initStorage(final NamespaceInfo nsInfo) throws IOException { - final FSDatasetInterface.Factory factory + final FSDatasetInterface.Factory> factory = FSDatasetInterface.Factory.getFactory(conf); if (!factory.isSimulated()) { @@ -1694,11 +1694,11 @@ public void scheduleAllBlockReport(long delay) { /** * This method is used for testing. * Examples are adding and deleting blocks directly. - * The most common usage will be when the data node's storage is similated. + * The most common usage will be when the data node's storage is simulated. * * @return the fsdataset that stores the blocks */ - public FSDatasetInterface getFSDataset() { + FSDatasetInterface getFSDataset() { return data; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java index 40b51a28b2b..97ff5a8416e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DirectoryScanner.java @@ -55,7 +55,7 @@ public class DirectoryScanner implements Runnable { private static final Log LOG = LogFactory.getLog(DirectoryScanner.class); private final DataNode datanode; - private final FSDatasetInterface dataset; + private final FSDatasetInterface dataset; private final ExecutorService reportCompileThreadPool; private final ScheduledExecutorService masterThread; private final long scanPeriodMsecs; @@ -219,7 +219,7 @@ public long getGenStamp() { } } - DirectoryScanner(DataNode dn, FSDatasetInterface dataset, Configuration conf) { + DirectoryScanner(DataNode dn, FSDatasetInterface dataset, Configuration conf) { this.datanode = dn; this.dataset = dataset; int interval = conf.getInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_INTERVAL_KEY, @@ -411,7 +411,7 @@ private void addDifference(LinkedList diffRecord, } /** Is the given volume still valid in the dataset? */ - private static boolean isValid(final FSDatasetInterface dataset, + private static boolean isValid(final FSDatasetInterface dataset, final FSVolumeInterface volume) { for (FSVolumeInterface vol : dataset.getVolumes()) { if (vol == volume) { @@ -424,7 +424,7 @@ private static boolean isValid(final FSDatasetInterface dataset, /** Get lists of blocks on the disk sorted by blockId, per blockpool */ private Map getDiskReport() { // First get list of data directories - final List volumes = dataset.getVolumes(); + final List volumes = dataset.getVolumes(); ArrayList dirReports = new ArrayList(volumes.size()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java index b1540fe5a82..457be4158e0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDataset.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.server.common.GenerationStamp; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; +import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; @@ -74,13 +75,13 @@ * ***************************************************/ @InterfaceAudience.Private -class FSDataset implements FSDatasetInterface { +class FSDataset implements FSDatasetInterface { /** * A factory for creating FSDataset objects. */ - static class Factory extends FSDatasetInterface.Factory { + static class Factory extends FSDatasetInterface.Factory { @Override - public FSDatasetInterface createFSDatasetInterface(DataNode datanode, + public FSDataset createFSDatasetInterface(DataNode datanode, DataStorage storage, Configuration conf) throws IOException { return new FSDataset(datanode, storage, conf); } @@ -786,13 +787,13 @@ static class FSVolumeSet { * Read access to this unmodifiable list is not synchronized. * This list is replaced on modification holding "this" lock. */ - private volatile List volumes = null; + private volatile List volumes = null; - BlockVolumeChoosingPolicy blockChooser; + BlockVolumeChoosingPolicy blockChooser; int numFailedVolumes; - FSVolumeSet(List volumes, int failedVols, - BlockVolumeChoosingPolicy blockChooser) { + FSVolumeSet(List volumes, int failedVols, + BlockVolumeChoosingPolicy blockChooser) { this.volumes = Collections.unmodifiableList(volumes); this.blockChooser = blockChooser; this.numFailedVolumes = failedVols; @@ -810,29 +811,29 @@ private int numberOfFailedVolumes() { * @return next volume to store the block in. */ synchronized FSVolume getNextVolume(long blockSize) throws IOException { - return (FSVolume)blockChooser.chooseVolume(volumes, blockSize); + return blockChooser.chooseVolume(volumes, blockSize); } private long getDfsUsed() throws IOException { long dfsUsed = 0L; - for (FSVolumeInterface v : volumes) { - dfsUsed += ((FSVolume)v).getDfsUsed(); + for (FSVolume v : volumes) { + dfsUsed += v.getDfsUsed(); } return dfsUsed; } private long getBlockPoolUsed(String bpid) throws IOException { long dfsUsed = 0L; - for (FSVolumeInterface v : volumes) { - dfsUsed += ((FSVolume)v).getBlockPoolUsed(bpid); + for (FSVolume v : volumes) { + dfsUsed += v.getBlockPoolUsed(bpid); } return dfsUsed; } private long getCapacity() { long capacity = 0L; - for (FSVolumeInterface v : volumes) { - capacity += ((FSVolume)v).getCapacity(); + for (FSVolume v : volumes) { + capacity += v.getCapacity(); } return capacity; } @@ -845,17 +846,16 @@ private long getRemaining() throws IOException { return remaining; } - private void getVolumeMap(ReplicasMap volumeMap) - throws IOException { - for (FSVolumeInterface v : volumes) { - ((FSVolume)v).getVolumeMap(volumeMap); + private void getVolumeMap(ReplicasMap volumeMap) throws IOException { + for (FSVolume v : volumes) { + v.getVolumeMap(volumeMap); } } private void getVolumeMap(String bpid, ReplicasMap volumeMap) throws IOException { - for (FSVolumeInterface v : volumes) { - ((FSVolume)v).getVolumeMap(bpid, volumeMap); + for (FSVolume v : volumes) { + v.getVolumeMap(bpid, volumeMap); } } @@ -871,10 +871,10 @@ private synchronized List checkDirs() { ArrayList removedVols = null; // Make a copy of volumes for performing modification - final List volumeList = new ArrayList(volumes); + final List volumeList = new ArrayList(volumes); for (int idx = 0; idx < volumeList.size(); idx++) { - FSVolume fsv = (FSVolume)volumeList.get(idx); + FSVolume fsv = volumeList.get(idx); try { fsv.checkDirs(); } catch (DiskErrorException e) { @@ -891,8 +891,8 @@ private synchronized List checkDirs() { // Remove null volumes from the volumes array if (removedVols != null && removedVols.size() > 0) { - List newVols = new ArrayList(); - for (FSVolumeInterface vol : volumeList) { + final List newVols = new ArrayList(); + for (FSVolume vol : volumeList) { if (vol != null) { newVols.add(vol); } @@ -914,21 +914,21 @@ public String toString() { private void addBlockPool(String bpid, Configuration conf) throws IOException { - for (FSVolumeInterface v : volumes) { - ((FSVolume)v).addBlockPool(bpid, conf); + for (FSVolume v : volumes) { + v.addBlockPool(bpid, conf); } } private void removeBlockPool(String bpid) { - for (FSVolumeInterface v : volumes) { - ((FSVolume)v).shutdownBlockPool(bpid); + for (FSVolume v : volumes) { + v.shutdownBlockPool(bpid); } } private void shutdown() { - for (FSVolumeInterface volume : volumes) { + for (FSVolume volume : volumes) { if(volume != null) { - ((FSVolume)volume).shutdown(); + volume.shutdown(); } } } @@ -991,7 +991,7 @@ private static long parseGenerationStamp(File blockFile, File metaFile } @Override // FSDatasetInterface - public List getVolumes() { + public List getVolumes() { return volumes.volumes; } @@ -1099,7 +1099,7 @@ private FSDataset(DataNode datanode, DataStorage storage, Configuration conf + ", volume failures tolerated: " + volFailuresTolerated); } - final List volArray = new ArrayList( + final List volArray = new ArrayList( storage.getNumStorageDirs()); for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { final File dir = storage.getStorageDir(idx).getCurrentDir(); @@ -1108,12 +1108,12 @@ private FSDataset(DataNode datanode, DataStorage storage, Configuration conf } volumeMap = new ReplicasMap(this); - BlockVolumeChoosingPolicy blockChooserImpl = - (BlockVolumeChoosingPolicy) ReflectionUtils.newInstance( - conf.getClass(DFSConfigKeys.DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY, + @SuppressWarnings("unchecked") + final BlockVolumeChoosingPolicy blockChooserImpl = + ReflectionUtils.newInstance(conf.getClass( + DFSConfigKeys.DFS_DATANODE_BLOCKVOLUMECHOICEPOLICY, RoundRobinVolumesPolicy.class, - BlockVolumeChoosingPolicy.class), - conf); + BlockVolumeChoosingPolicy.class), conf); volumes = new FSVolumeSet(volArray, volsFailed, blockChooserImpl); volumes.getVolumeMap(volumeMap); @@ -2001,7 +2001,7 @@ public void invalidate(String bpid, Block invalidBlks[]) throws IOException { boolean error = false; for (int i = 0; i < invalidBlks.length; i++) { File f = null; - FSVolume v; + final FSVolume v; synchronized (this) { f = getFile(bpid, invalidBlks[i].getBlockId()); ReplicaInfo dinfo = volumeMap.get(bpid, invalidBlks[i]); @@ -2553,8 +2553,7 @@ private static class VolumeInfo { private Collection getVolumeInfo() { Collection info = new ArrayList(); - for (FSVolumeInterface v : volumes.volumes) { - final FSVolume volume = (FSVolume)v; + for (FSVolume volume : volumes.volumes) { long used = 0; long free = 0; try { @@ -2590,8 +2589,8 @@ public Map getVolumeInfoMap() { public synchronized void deleteBlockPool(String bpid, boolean force) throws IOException { if (!force) { - for (FSVolumeInterface volume : volumes.volumes) { - if (!((FSVolume)volume).isBPDirEmpty(bpid)) { + for (FSVolume volume : volumes.volumes) { + if (!volume.isBPDirEmpty(bpid)) { DataNode.LOG.warn(bpid + " has some block files, cannot delete unless forced"); throw new IOException("Cannot delete block pool, " @@ -2599,8 +2598,8 @@ public synchronized void deleteBlockPool(String bpid, boolean force) } } } - for (FSVolumeInterface volume : volumes.volumes) { - ((FSVolume)volume).deleteBPDirectories(bpid, force); + for (FSVolume volume : volumes.volumes) { + volume.deleteBPDirectories(bpid, force); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java index 2487ca6ed9b..6e2fb201c84 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java @@ -50,13 +50,15 @@ * */ @InterfaceAudience.Private -public interface FSDatasetInterface extends FSDatasetMBean { +public interface FSDatasetInterface + extends FSDatasetMBean { /** * A factory for creating FSDatasetInterface objects. */ - public abstract class Factory { + public abstract class Factory> { /** @return the configured factory. */ - public static Factory getFactory(Configuration conf) { + public static Factory getFactory(Configuration conf) { + @SuppressWarnings("rawtypes") final Class clazz = conf.getClass( DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY, FSDataset.Factory.class, @@ -65,7 +67,7 @@ public static Factory getFactory(Configuration conf) { } /** Create a FSDatasetInterface object. */ - public abstract FSDatasetInterface createFSDatasetInterface( + public abstract D createFSDatasetInterface( DataNode datanode, DataStorage storage, Configuration conf ) throws IOException; @@ -94,7 +96,7 @@ interface FSVolumeInterface { } /** @return a list of volumes. */ - public List getVolumes(); + public List getVolumes(); /** @return a volume information map (name => info). */ public Map getVolumeInfoMap(); @@ -234,7 +236,7 @@ static class BlockWriteStreams { this.checksum = checksum; } - void close() throws IOException { + void close() { IOUtils.closeStream(dataOut); IOUtils.closeStream(checksumOut); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java index 1463287268f..00fdffab2f7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/RoundRobinVolumesPolicy.java @@ -23,13 +23,14 @@ import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException; -public class RoundRobinVolumesPolicy implements BlockVolumeChoosingPolicy { +public class RoundRobinVolumesPolicy + implements BlockVolumeChoosingPolicy { private int curVolume = 0; @Override - public synchronized FSVolumeInterface chooseVolume( - List volumes, long blockSize) throws IOException { + public synchronized V chooseVolume(final List volumes, final long blockSize + ) throws IOException { if(volumes.size() < 1) { throw new DiskOutOfSpaceException("No more available volumes"); } @@ -44,7 +45,7 @@ public synchronized FSVolumeInterface chooseVolume( long maxAvailable = 0; while (true) { - FSVolumeInterface volume = volumes.get(curVolume); + final V volume = volumes.get(curVolume); curVolume = (curVolume + 1) % volumes.size(); long availableVolumeSize = volume.getAvailable(); if (availableVolumeSize > blockSize) { return volume; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java index 584446ac13c..8888bec1cf4 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/MiniDFSCluster.java @@ -1768,8 +1768,8 @@ public Iterable getBlockReport(String bpid, int dataNodeIndex) { if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) { throw new IndexOutOfBoundsException(); } - return dataNodes.get(dataNodeIndex).datanode.getFSDataset().getBlockReport( - bpid); + final DataNode dn = dataNodes.get(dataNodeIndex).datanode; + return DataNodeTestUtils.getFSDataset(dn).getBlockReport(bpid); } @@ -1801,7 +1801,8 @@ public void injectBlocks(int dataNodeIndex, Iterable blocksToInject) thro if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) { throw new IndexOutOfBoundsException(); } - FSDatasetInterface dataSet = dataNodes.get(dataNodeIndex).datanode.getFSDataset(); + final DataNode dn = dataNodes.get(dataNodeIndex).datanode; + final FSDatasetInterface dataSet = DataNodeTestUtils.getFSDataset(dn); if (!(dataSet instanceof SimulatedFSDataset)) { throw new IOException("injectBlocks is valid only for SimilatedFSDataset"); } @@ -1819,7 +1820,8 @@ public void injectBlocks(int nameNodeIndex, int dataNodeIndex, if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) { throw new IndexOutOfBoundsException(); } - FSDatasetInterface dataSet = dataNodes.get(dataNodeIndex).datanode.getFSDataset(); + final DataNode dn = dataNodes.get(dataNodeIndex).datanode; + final FSDatasetInterface dataSet = DataNodeTestUtils.getFSDataset(dn); if (!(dataSet instanceof SimulatedFSDataset)) { throw new IOException("injectBlocks is valid only for SimilatedFSDataset"); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRemove.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRemove.java index 8a8d404658e..1b23c5f319d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRemove.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSRemove.java @@ -25,6 +25,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; public class TestDFSRemove extends junit.framework.TestCase { final Path dir = new Path("/test/remove/"); @@ -45,7 +46,7 @@ static void createFile(FileSystem fs, Path f) throws IOException { static long getTotalDfsUsed(MiniDFSCluster cluster) throws IOException { long total = 0; for(DataNode node : cluster.getDataNodes()) { - total += node.getFSDataset().getDfsUsed(); + total += DataNodeTestUtils.getFSDataset(node).getDfsUsed(); } return total; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java index a9b62c3aead..ef8f8503958 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSUtil.java @@ -232,32 +232,6 @@ public void checkNameServiceId(Configuration conf, String addr, assertEquals(expectedNameServiceId, nameserviceId); } - /** - * Test for - * {@link DFSUtil#isDefaultNamenodeAddress(Configuration, InetSocketAddress, String...)} - */ - @Test - public void testSingleNamenode() throws URISyntaxException { - HdfsConfiguration conf = new HdfsConfiguration(); - final String DEFAULT_ADDRESS = "localhost:9000"; - final String NN2_ADDRESS = "localhost:9001"; - conf.set(DFS_NAMENODE_RPC_ADDRESS_KEY, DEFAULT_ADDRESS); - conf.set(CommonConfigurationKeys.FS_DEFAULT_NAME_KEY, DEFAULT_ADDRESS); - - InetSocketAddress testAddress1 = NetUtils.createSocketAddr(DEFAULT_ADDRESS); - boolean isDefault = DFSUtil.isDefaultNamenodeAddress(conf, testAddress1, - DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY); - assertTrue(isDefault); - InetSocketAddress testAddress2 = NetUtils.createSocketAddr(NN2_ADDRESS); - isDefault = DFSUtil.isDefaultNamenodeAddress(conf, testAddress2, - DFS_NAMENODE_SERVICE_RPC_ADDRESS_KEY, DFS_NAMENODE_RPC_ADDRESS_KEY); - assertFalse(isDefault); - - Collection uris = DFSUtil.getNameServiceUris(conf, DFS_NAMENODE_RPC_ADDRESS_KEY); - assertEquals(1, uris.size()); - assertTrue(uris.contains(new URI("hdfs://" + DEFAULT_ADDRESS))); - } - /** Tests to ensure default namenode is used as fallback */ @Test public void testDefaultNamenode() throws IOException { @@ -554,4 +528,4 @@ public void testGetNNUris() throws Exception { assertTrue(uris.contains(new URI("hdfs://" + NS2_NN_HOST))); assertTrue(uris.contains(new URI("hdfs://" + NN_HOST))); } -} \ No newline at end of file +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java index 89b65f5e057..bc43f845bdd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestFileCreation.java @@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; +import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.LeaseManager; @@ -210,8 +211,10 @@ public void testFileCreation() throws IOException { // can't check capacities for real storage since the OS file system may be changing under us. if (simulatedStorage) { DataNode dn = cluster.getDataNodes().get(0); - assertEquals(fileSize, dn.getFSDataset().getDfsUsed()); - assertEquals(SimulatedFSDataset.DEFAULT_CAPACITY-fileSize, dn.getFSDataset().getRemaining()); + FSDatasetInterface dataset = DataNodeTestUtils.getFSDataset(dn); + assertEquals(fileSize, dataset.getDfsUsed()); + assertEquals(SimulatedFSDataset.DEFAULT_CAPACITY-fileSize, + dataset.getRemaining()); } } finally { cluster.shutdown(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java index 26acd0560ac..c9be8f9524c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/DataNodeTestUtils.java @@ -41,6 +41,17 @@ public class DataNodeTestUtils { return dn.getDNRegistrationForBP(bpid); } + /** + * This method is used for testing. + * Examples are adding and deleting blocks directly. + * The most common usage will be when the data node's storage is simulated. + * + * @return the fsdataset that stores the blocks + */ + public static FSDatasetInterface getFSDataset(DataNode dn) { + return dn.getFSDataset(); + } + public static File getFile(DataNode dn, String bpid, long bid) { return ((FSDataset)dn.getFSDataset()).getFile(bpid, bid); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index 04a93b7bb2e..bd873440081 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -61,10 +61,11 @@ * * Note the synchronization is coarse grained - it is at each method. */ -public class SimulatedFSDataset implements FSDatasetInterface { - static class Factory extends FSDatasetInterface.Factory { +public class SimulatedFSDataset + implements FSDatasetInterface { + static class Factory extends FSDatasetInterface.Factory { @Override - public FSDatasetInterface createFSDatasetInterface(DataNode datanode, + public SimulatedFSDataset createFSDatasetInterface(DataNode datanode, DataStorage storage, Configuration conf) throws IOException { return new SimulatedFSDataset(datanode, storage, conf); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java index a12cb334801..f8318762757 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockReport.java @@ -210,13 +210,14 @@ public void blockReport_02() throws IOException { LOG.debug("Number of blocks allocated " + lBlocks.size()); } + final DataNode dn0 = cluster.getDataNodes().get(DN_N0); for (ExtendedBlock b : blocks2Remove) { if(LOG.isDebugEnabled()) { LOG.debug("Removing the block " + b.getBlockName()); } for (File f : findAllFiles(dataDir, new MyFileFilter(b.getBlockName(), true))) { - cluster.getDataNodes().get(DN_N0).getFSDataset().unfinalizeBlock(b); + DataNodeTestUtils.getFSDataset(dn0).unfinalizeBlock(b); if (!f.delete()) LOG.warn("Couldn't delete " + b.getBlockName()); } @@ -225,9 +226,8 @@ public void blockReport_02() throws IOException { waitTil(DN_RESCAN_EXTRA_WAIT); // all blocks belong to the same file, hence same BP - DataNode dn = cluster.getDataNodes().get(DN_N0); String poolId = cluster.getNamesystem().getBlockPoolId(); - DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId); + DatanodeRegistration dnR = dn0.getDNRegistrationForBP(poolId); StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(), new BlockListAsLongs(blocks, null).getBlockListAsLongs()) }; cluster.getNameNodeRpc().blockReport(dnR, poolId, report); @@ -602,15 +602,15 @@ private void waitForTempReplica(Block bl, int DN_N1) throws IOException { cluster.waitActive(); // Look about specified DN for the replica of the block from 1st DN + final DataNode dn1 = cluster.getDataNodes().get(DN_N1); + final FSDataset dataset1 = (FSDataset)DataNodeTestUtils.getFSDataset(dn1); String bpid = cluster.getNamesystem().getBlockPoolId(); - Replica r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()). - fetchReplicaInfo(bpid, bl.getBlockId()); + Replica r = dataset1.fetchReplicaInfo(bpid, bl.getBlockId()); long start = System.currentTimeMillis(); int count = 0; while (r == null) { waitTil(5); - r = ((FSDataset) cluster.getDataNodes().get(DN_N1).getFSDataset()). - fetchReplicaInfo(bpid, bl.getBlockId()); + r = dataset1.fetchReplicaInfo(bpid, bl.getBlockId()); long waiting_period = System.currentTimeMillis() - start; if (count++ % 100 == 0) if(LOG.isDebugEnabled()) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java index 1ebee2f89ae..1d9f803e917 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailure.java @@ -145,8 +145,11 @@ public void testVolumeFailure() throws IOException { DataNode dn = cluster.getDataNodes().get(1); //corresponds to dir data3 String bpid = cluster.getNamesystem().getBlockPoolId(); DatanodeRegistration dnR = dn.getDNRegistrationForBP(bpid); - StorageBlockReport[] report = { new StorageBlockReport(dnR.getStorageID(), - dn.getFSDataset().getBlockReport(bpid).getBlockListAsLongs()) }; + final StorageBlockReport[] report = { + new StorageBlockReport(dnR.getStorageID(), + DataNodeTestUtils.getFSDataset(dn).getBlockReport(bpid + ).getBlockListAsLongs()) + }; cluster.getNameNodeRpc().blockReport(dnR, bpid, report); // verify number of blocks and files... diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java index 47bfa703db0..ea256853f98 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeVolumeFailureToleration.java @@ -24,11 +24,7 @@ import java.io.File; import java.io.IOException; -import java.util.concurrent.TimeoutException; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; @@ -38,7 +34,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; -import org.apache.log4j.Level; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -47,12 +42,6 @@ * Test the ability of a DN to tolerate volume failures. */ public class TestDataNodeVolumeFailureToleration { - - private static final Log LOG = LogFactory.getLog(TestDataNodeVolumeFailureToleration.class); - { - ((Log4JLogger)TestDataNodeVolumeFailureToleration.LOG).getLogger().setLevel(Level.ALL); - } - private FileSystem fs; private MiniDFSCluster cluster; private Configuration conf; @@ -130,7 +119,7 @@ public void testValidVolumesAtStartup() throws Exception { assertTrue("The DN should have started up fine.", cluster.isDataNodeUp()); DataNode dn = cluster.getDataNodes().get(0); - String si = dn.getFSDataset().getStorageInfo(); + String si = DataNodeTestUtils.getFSDataset(dn).getStorageInfo(); assertTrue("The DN should have started with this directory", si.contains(dataDir1Actual.getPath())); assertFalse("The DN shouldn't have a bad directory.", @@ -227,7 +216,7 @@ public void testVolumeAndTolerableConfiguration() throws Exception { */ private void testVolumeConfig(int volumesTolerated, int volumesFailed, boolean expectedBPServiceState, boolean manageDfsDirs) - throws IOException, InterruptedException, TimeoutException { + throws IOException, InterruptedException { assumeTrue(!System.getProperty("os.name").startsWith("Windows")); final int dnIndex = 0; // Fail the current directory since invalid storage directory perms diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java index 1b0c158740f..8707805613c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java @@ -38,7 +38,7 @@ import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.server.common.GenerationStamp; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.FSVolumeInterface; +import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume; /** * Tests {@link DirectoryScanner} handling of differences @@ -142,7 +142,7 @@ private String getMetaFile(long id) { /** Create a block file in a random volume*/ private long createBlockFile() throws IOException { - List volumes = fds.getVolumes(); + List volumes = fds.getVolumes(); int index = rand.nextInt(volumes.size() - 1); long id = getFreeBlockId(); File finalizedDir = volumes.get(index).getFinalizedDir(bpid); @@ -155,7 +155,7 @@ private long createBlockFile() throws IOException { /** Create a metafile in a random volume*/ private long createMetaFile() throws IOException { - List volumes = fds.getVolumes(); + List volumes = fds.getVolumes(); int index = rand.nextInt(volumes.size() - 1); long id = getFreeBlockId(); File finalizedDir = volumes.get(index).getFinalizedDir(bpid); @@ -168,7 +168,7 @@ private long createMetaFile() throws IOException { /** Create block file and corresponding metafile in a rondom volume */ private long createBlockMetaFile() throws IOException { - List volumes = fds.getVolumes(); + List volumes = fds.getVolumes(); int index = rand.nextInt(volumes.size() - 1); long id = getFreeBlockId(); File finalizedDir = volumes.get(index).getFinalizedDir(bpid); @@ -228,7 +228,8 @@ public void runTest(int parallelism) throws Exception { try { cluster.waitActive(); bpid = cluster.getNamesystem().getBlockPoolId(); - fds = (FSDataset) cluster.getDataNodes().get(0).getFSDataset(); + fds = (FSDataset)DataNodeTestUtils.getFSDataset( + cluster.getDataNodes().get(0)); CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY, parallelism); DataNode dn = cluster.getDataNodes().get(0); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMulitipleNNDataBlockScanner.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMulitipleNNDataBlockScanner.java index 8441e184068..a21cab57568 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMulitipleNNDataBlockScanner.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestMulitipleNNDataBlockScanner.java @@ -149,6 +149,9 @@ public void testBlockScannerAfterRestart() throws IOException, cluster.waitActive(); DataNode dn = cluster.getDataNodes().get(0); for (int i = 0; i < 3; i++) { + while (!dn.blockScanner.isInitialized(bpids[i])) { + Thread.sleep(1000); + } long blocksScanned = 0; while (blocksScanned != 20) { if (dn.blockScanner != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java index 73937efbc39..f401be3af15 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestRoundRobinVolumesPolicy.java @@ -43,8 +43,10 @@ public void testRR() throws Exception { volumes.add(Mockito.mock(FSVolumeInterface.class)); Mockito.when(volumes.get(1).getAvailable()).thenReturn(200L); - RoundRobinVolumesPolicy policy = ReflectionUtils.newInstance( - RoundRobinVolumesPolicy.class, null); + @SuppressWarnings("unchecked") + final RoundRobinVolumesPolicy policy = + (RoundRobinVolumesPolicy)ReflectionUtils.newInstance( + RoundRobinVolumesPolicy.class, null); // Test two rounds of round-robin choosing Assert.assertEquals(volumes.get(0), policy.chooseVolume(volumes, 0)); @@ -79,7 +81,8 @@ public void testRRPolicyExceptionMessage() volumes.add(Mockito.mock(FSVolumeInterface.class)); Mockito.when(volumes.get(1).getAvailable()).thenReturn(600L); - RoundRobinVolumesPolicy policy = new RoundRobinVolumesPolicy(); + final RoundRobinVolumesPolicy policy + = new RoundRobinVolumesPolicy(); int blockSize = 700; try { policy.chooseVolume(volumes, blockSize); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java index 6a6c81a6fa2..752419fe288 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java @@ -21,6 +21,7 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; + import junit.framework.TestCase; import org.apache.hadoop.conf.Configuration; @@ -28,8 +29,6 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface; -import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams; import org.apache.hadoop.util.DataChecksum; @@ -56,7 +55,7 @@ long blockIdToLen(long blkid) { return blkid*BLOCK_LENGTH_MULTIPLIER; } - int addSomeBlocks(FSDatasetInterface fsdataset, int startingBlockId) + int addSomeBlocks(SimulatedFSDataset fsdataset, int startingBlockId) throws IOException { int bytesAdded = 0; for (int i = startingBlockId; i < startingBlockId+NUMBLOCKS; ++i) { @@ -83,24 +82,24 @@ int addSomeBlocks(FSDatasetInterface fsdataset, int startingBlockId) } return bytesAdded; } - int addSomeBlocks(FSDatasetInterface fsdataset ) throws IOException { + int addSomeBlocks(SimulatedFSDataset fsdataset ) throws IOException { return addSomeBlocks(fsdataset, 1); } public void testFSDatasetFactory() { final Configuration conf = new Configuration(); - FSDatasetInterface.Factory f = FSDatasetInterface.Factory.getFactory(conf); + FSDatasetInterface.Factory f = FSDatasetInterface.Factory.getFactory(conf); assertEquals(FSDataset.Factory.class, f.getClass()); assertFalse(f.isSimulated()); SimulatedFSDataset.setFactory(conf); - FSDatasetInterface.Factory s = FSDatasetInterface.Factory.getFactory(conf); + FSDatasetInterface.Factory s = FSDatasetInterface.Factory.getFactory(conf); assertEquals(SimulatedFSDataset.Factory.class, s.getClass()); assertTrue(s.isSimulated()); } public void testGetMetaData() throws IOException { - FSDatasetInterface fsdataset = getSimulatedFSDataset(); + final SimulatedFSDataset fsdataset = getSimulatedFSDataset(); ExtendedBlock b = new ExtendedBlock(bpid, 1, 5, 0); try { assertFalse(fsdataset.metaFileExists(b)); @@ -121,7 +120,7 @@ public void testGetMetaData() throws IOException { public void testStorageUsage() throws IOException { - FSDatasetInterface fsdataset = getSimulatedFSDataset(); + final SimulatedFSDataset fsdataset = getSimulatedFSDataset(); assertEquals(fsdataset.getDfsUsed(), 0); assertEquals(fsdataset.getRemaining(), fsdataset.getCapacity()); int bytesAdded = addSomeBlocks(fsdataset); @@ -131,7 +130,7 @@ public void testStorageUsage() throws IOException { - void checkBlockDataAndSize(FSDatasetInterface fsdataset, ExtendedBlock b, + void checkBlockDataAndSize(SimulatedFSDataset fsdataset, ExtendedBlock b, long expectedLen) throws IOException { InputStream input = fsdataset.getBlockInputStream(b); long lengthRead = 0; @@ -144,7 +143,7 @@ void checkBlockDataAndSize(FSDatasetInterface fsdataset, ExtendedBlock b, } public void testWriteRead() throws IOException { - FSDatasetInterface fsdataset = getSimulatedFSDataset(); + final SimulatedFSDataset fsdataset = getSimulatedFSDataset(); addSomeBlocks(fsdataset); for (int i=1; i <= NUMBLOCKS; ++i) { ExtendedBlock b = new ExtendedBlock(bpid, i, 0, 0); @@ -244,7 +243,7 @@ public void testInjectionNonEmpty() throws IOException { } public void checkInvalidBlock(ExtendedBlock b) throws IOException { - FSDatasetInterface fsdataset = getSimulatedFSDataset(); + final SimulatedFSDataset fsdataset = getSimulatedFSDataset(); assertFalse(fsdataset.isValidBlock(b)); try { fsdataset.getLength(b); @@ -269,7 +268,7 @@ public void checkInvalidBlock(ExtendedBlock b) throws IOException { } public void testInValidBlocks() throws IOException { - FSDatasetInterface fsdataset = getSimulatedFSDataset(); + final SimulatedFSDataset fsdataset = getSimulatedFSDataset(); ExtendedBlock b = new ExtendedBlock(bpid, 1, 5, 0); checkInvalidBlock(b); @@ -280,7 +279,7 @@ public void testInValidBlocks() throws IOException { } public void testInvalidate() throws IOException { - FSDatasetInterface fsdataset = getSimulatedFSDataset(); + final SimulatedFSDataset fsdataset = getSimulatedFSDataset(); int bytesAdded = addSomeBlocks(fsdataset); Block[] deleteBlocks = new Block[2]; deleteBlocks[0] = new Block(1, 0, 0); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java index 2004e8f6aab..ea769c057e1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestDNFencing.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.DataNodeAdapter; +import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils; import org.apache.hadoop.hdfs.server.namenode.FSInodeInfo; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.NameNode; @@ -564,7 +565,8 @@ private int getTrueReplication(MiniDFSCluster cluster, ExtendedBlock block) throws IOException { int count = 0; for (DataNode dn : cluster.getDataNodes()) { - if (dn.getFSDataset().getStoredBlock(block.getBlockPoolId(), block.getBlockId()) != null) { + if (DataNodeTestUtils.getFSDataset(dn).getStoredBlock( + block.getBlockPoolId(), block.getBlockId()) != null) { count++; } } diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 9535449de50..1365973c1ba 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -44,6 +44,9 @@ Trunk (unreleased changes) MAPREDUCE-2944. Improve checking of input for JobClient.displayTasks() (XieXianshan via harsh) + MAPREDUCE-3956. Remove the use of the deprecated Syncable.sync() method from + TeraOutputFormat in the terasort example. (szetszwo) + BUG FIXES MAPREDUCE-3757. [Rumen] Fixed Rumen Folder to adjust shuffleFinished and @@ -97,6 +100,9 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-3909 Javadoc the Service interfaces (stevel) + MAPREDUCE-3885. Avoid an unnecessary copy for all requests/responses in + MRs ProtoOverHadoopRpcEngine. (Devaraj Das via sseth) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraOutputFormat.java index 1900e117f5a..9cde04a78c9 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraOutputFormat.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-examples/src/main/java/org/apache/hadoop/examples/terasort/TeraOutputFormat.java @@ -71,7 +71,7 @@ public synchronized void write(Text key, public void close(TaskAttemptContext context) throws IOException { if (finalSync) { - out.sync(); + out.hsync(); } out.close(); } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java index bbece2f34ff..ca65a27beb9 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/ipc/ProtoOverHadoopRpcEngine.java @@ -34,6 +34,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.DataOutputOutputStream; +import org.apache.hadoop.io.ObjectWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.ipc.Client; import org.apache.hadoop.ipc.ProtocolMetaInfoPB; @@ -46,6 +48,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; +import org.apache.hadoop.util.ProtoUtil; import org.apache.hadoop.yarn.exceptions.impl.pb.YarnRemoteExceptionPBImpl; import org.apache.hadoop.yarn.ipc.RpcProtos.ProtoSpecificRpcRequest; import org.apache.hadoop.yarn.ipc.RpcProtos.ProtoSpecificRpcResponse; @@ -213,13 +216,13 @@ public ProtoSpecificRequestWritable() { @Override public void write(DataOutput out) throws IOException { - out.writeInt(message.toByteArray().length); - out.write(message.toByteArray()); + ((Message)message).writeDelimitedTo( + DataOutputOutputStream.constructOutputStream(out)); } @Override public void readFields(DataInput in) throws IOException { - int length = in.readInt(); + int length = ProtoUtil.readRawVarint32(in); byte[] bytes = new byte[length]; in.readFully(bytes); message = ProtoSpecificRpcRequest.parseFrom(bytes); @@ -241,13 +244,13 @@ public ProtoSpecificResponseWritable(ProtoSpecificRpcResponse message) { @Override public void write(DataOutput out) throws IOException { - out.writeInt(message.toByteArray().length); - out.write(message.toByteArray()); + ((Message)message).writeDelimitedTo( + DataOutputOutputStream.constructOutputStream(out)); } @Override public void readFields(DataInput in) throws IOException { - int length = in.readInt(); + int length = ProtoUtil.readRawVarint32(in); byte[] bytes = new byte[length]; in.readFully(bytes); message = ProtoSpecificRpcResponse.parseFrom(bytes);