diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index f78536a0d0f..321b137fcd8 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -104,6 +104,9 @@ Trunk (unreleased changes)
 
     HDFS-3573. Supply NamespaceInfo when instantiating JournalManagers (todd)
 
+    HDFS-3190. Simple refactors in existing NN code to assist
+    QuorumJournalManager extension. (todd)
+
   OPTIMIZATIONS
 
   BUG FIXES
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
index 5ed6e358867..c9ff1f3ab56 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/Storage.java
@@ -158,6 +158,21 @@ public abstract class Storage extends StorageInfo {
     }
   }
 
+  /**
+   * @return A list of the given File in every available storage directory,
+   * regardless of whether it might exist.
+   */
+  public List<File> getFiles(StorageDirType dirType, String fileName) {
+    ArrayList<File> list = new ArrayList<File>();
+    Iterator<StorageDirectory> it =
+      (dirType == null) ? dirIterator() : dirIterator(dirType);
+    for ( ;it.hasNext(); ) {
+      list.add(new File(it.next().getCurrentDir(), fileName));
+    }
+    return list;
+  }
+
+
   /**
    * Return default iterator
    * This iterator returns all entries in storageDirs
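The hunk above promotes a helper that previously lived only in NNStorage (its removal appears further down in this patch) to the generic Storage class, widening the NameNode-specific directory type to StorageDirType. A minimal usage sketch, assuming some Storage instance named storage is in scope; the file name is only an example, and passing null for dirType means "every directory", per the code above:

    // Collect the expected location of a per-directory file in every
    // storage directory, whether or not each copy actually exists yet.
    List<File> copies = storage.getFiles(null, "seen_txid");
    for (File f : copies) {
      if (f.exists()) {
        System.out.println(f + " exists, length=" + f.length());
      }
    }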
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageErrorReporter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageErrorReporter.java
new file mode 100644
index 00000000000..2c11abf8b7c
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/common/StorageErrorReporter.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.common;
+
+import java.io.File;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.server.namenode.JournalManager;
+
+/**
+ * Interface which implementations of {@link JournalManager} can use to report
+ * errors on underlying storage directories. This avoids a circular dependency
+ * between journal managers and the storage which instantiates them.
+ */
+@InterfaceAudience.Private
+public interface StorageErrorReporter {
+
+  /**
+   * Indicate that some error occurred on the given file.
+   *
+   * @param f the file which had an error.
+   */
+  public void reportErrorOnFile(File f);
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
index e9974b63e7d..2e0aaaa2dcb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FileJournalManager.java
@@ -32,6 +32,7 @@ import java.util.regex.Pattern;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
+import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
 import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
 import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
@@ -53,7 +54,7 @@ class FileJournalManager implements JournalManager {
   private static final Log LOG = LogFactory.getLog(FileJournalManager.class);
 
   private final StorageDirectory sd;
-  private final NNStorage storage;
+  private final StorageErrorReporter errorReporter;
   private int outputBufferCapacity = 512*1024;
 
   private static final Pattern EDITS_REGEX = Pattern.compile(
@@ -67,9 +68,10 @@ class FileJournalManager implements JournalManager {
   StoragePurger purger
       = new NNStorageRetentionManager.DeletionStoragePurger();
 
-  public FileJournalManager(StorageDirectory sd, NNStorage storage) {
+  public FileJournalManager(StorageDirectory sd,
+      StorageErrorReporter errorReporter) {
     this.sd = sd;
-    this.storage = storage;
+    this.errorReporter = errorReporter;
   }
 
   @Override
@@ -85,7 +87,10 @@ class FileJournalManager implements JournalManager {
       stm.create();
       return stm;
     } catch (IOException e) {
-      storage.reportErrorsOnDirectory(sd);
+      LOG.warn("Unable to start log segment " + txid +
+          " at " + currentInProgress + ": " +
+          e.getLocalizedMessage());
+      errorReporter.reportErrorOnFile(currentInProgress);
       throw e;
     }
   }
@@ -103,7 +108,7 @@ class FileJournalManager implements JournalManager {
         "Can't finalize edits file " + inprogressFile + " since finalized file " +
         "already exists");
     if (!inprogressFile.renameTo(dstFile)) {
-      storage.reportErrorsOnDirectory(sd);
+      errorReporter.reportErrorOnFile(dstFile);
       throw new IllegalStateException("Unable to finalize edits file " + inprogressFile);
     }
     if (inprogressFile.equals(currentInProgress)) {
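The new StorageErrorReporter interface is what lets FileJournalManager (above) drop its field of type NNStorage: a journal manager only needs a callback for flagging a bad file, not the whole NameNode storage object, and NNStorage itself implements the interface later in this patch. A rough sketch of how some other storage-backed writer might lean on it; the class, method, and file names below are illustrative assumptions, not part of this change:

    import java.io.File;
    import java.io.FileOutputStream;
    import java.io.IOException;

    import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;

    // Illustrative only: report write failures through the interface rather
    // than calling back into a concrete NNStorage.
    class ExampleEditsWriter {
      private final File editsFile;
      private final StorageErrorReporter errorReporter;

      ExampleEditsWriter(File editsFile, StorageErrorReporter errorReporter) {
        this.editsFile = editsFile;
        this.errorReporter = errorReporter;
      }

      void append(byte[] record) throws IOException {
        FileOutputStream out = null;
        try {
          out = new FileOutputStream(editsFile, true);
          out.write(record);
          out.close();
          out = null;
        } catch (IOException e) {
          // Let whoever owns the storage mark this file (and its directory) as bad.
          errorReporter.reportErrorOnFile(editsFile);
          throw e;
        } finally {
          if (out != null) {
            try { out.close(); } catch (IOException ignored) {}
          }
        }
      }
    }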
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
index 958cb14b9db..3656008730c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GetImageServlet.java
@@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.HAUtil;
 import org.apache.hadoop.hdfs.server.common.JspHelper;
+import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.StorageInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.util.DataTransferThrottler;
@@ -292,7 +293,7 @@ public class GetImageServlet extends HttpServlet {
   }
 
   static String getParamStringToPutImage(long txid,
-      InetSocketAddress imageListenAddress, NNStorage storage) {
+      InetSocketAddress imageListenAddress, Storage storage) {
 
     return "putimage=1" +
       "&" + TXID_PARAM + "=" + txid +
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
index d77efd60a5c..40156071158 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NNStorage.java
@@ -17,13 +17,10 @@
  */
 package org.apache.hadoop.hdfs.server.namenode;
 
-import java.io.BufferedReader;
 import java.io.Closeable;
 import java.io.File;
-import java.io.FileReader;
 import java.io.IOException;
 import java.io.RandomAccessFile;
-import java.io.OutputStream;
 import java.net.URI;
 import java.net.UnknownHostException;
 import java.util.ArrayList;
@@ -48,10 +45,11 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
 import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
 import org.apache.hadoop.hdfs.server.common.UpgradeManager;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
-import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
+import org.apache.hadoop.hdfs.util.PersistentLongFile;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.net.DNS;
 
@@ -66,7 +64,8 @@ import com.google.common.collect.Maps;
  * the NameNode.
  */
 @InterfaceAudience.Private
-public class NNStorage extends Storage implements Closeable {
+public class NNStorage extends Storage implements Closeable,
+    StorageErrorReporter {
   private static final Log LOG = LogFactory.getLog(NNStorage.class.getName());
 
   static final String DEPRECATED_MESSAGE_DIGEST_PROPERTY = "imageMD5Digest";
@@ -422,18 +421,7 @@ public class NNStorage extends Storage implements Closeable {
    */
  static long readTransactionIdFile(StorageDirectory sd) throws IOException {
     File txidFile = getStorageFile(sd, NameNodeFile.SEEN_TXID);
-    long txid = 0L;
-    if (txidFile.exists() && txidFile.canRead()) {
-      BufferedReader br = new BufferedReader(new FileReader(txidFile));
-      try {
-        txid = Long.valueOf(br.readLine());
-        br.close();
-        br = null;
-      } finally {
-        IOUtils.cleanup(LOG, br);
-      }
-    }
-    return txid;
+    return PersistentLongFile.readFile(txidFile, 0);
   }
 
   /**
@@ -446,15 +434,7 @@
     Preconditions.checkArgument(txid >= 0, "bad txid: " + txid);
 
     File txIdFile = getStorageFile(sd, NameNodeFile.SEEN_TXID);
-    OutputStream fos = new AtomicFileOutputStream(txIdFile);
-    try {
-      fos.write(String.valueOf(txid).getBytes());
-      fos.write('\n');
-      fos.close();
-      fos = null;
-    } finally {
-      IOUtils.cleanup(LOG, fos);
-    }
+    PersistentLongFile.writeFile(txIdFile, txid);
   }
 
   /**
@@ -772,20 +752,6 @@
     return null;
   }
 
-  /**
-   * @return A list of the given File in every available storage directory,
-   * regardless of whether it might exist.
-   */
-  List<File> getFiles(NameNodeDirType dirType, String fileName) {
-    ArrayList<File> list = new ArrayList<File>();
-    Iterator<StorageDirectory> it =
-      (dirType == null) ? dirIterator() : dirIterator(dirType);
-    for ( ;it.hasNext(); ) {
-      list.add(new File(it.next().getCurrentDir(), fileName));
-    }
-    return list;
-  }
-
   /**
    * Set the upgrade manager for use in a distributed upgrade.
    * @param um The upgrade manager
@@ -884,7 +850,7 @@
    * @param sd A storage directory to mark as errored.
    * @throws IOException
    */
-  void reportErrorsOnDirectory(StorageDirectory sd) {
+  private void reportErrorsOnDirectory(StorageDirectory sd) {
     LOG.error("Error reported on storage directory " + sd);
     String lsd = listStorageDirectories();
 
@@ -945,7 +911,8 @@
    * Report that an IOE has occurred on some file which may
    * or may not be within one of the NN image storage directories.
    */
-  void reportErrorOnFile(File f) {
+  @Override
+  public void reportErrorOnFile(File f) {
     // We use getAbsolutePath here instead of getCanonicalPath since we know
     // that there is some IO problem on that drive.
     // getCanonicalPath may need to call stat() or readlink() and it's likely
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
index 492cf28fbe7..d887d310421 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/TransferFsImage.java
@@ -32,6 +32,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
+import org.apache.hadoop.hdfs.server.common.Storage;
 import org.apache.hadoop.hdfs.server.common.Util;
 import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
@@ -60,7 +62,7 @@ public class TransferFsImage {
   }
 
   public static MD5Hash downloadImageToStorage(
-      String fsName, long imageTxId, NNStorage dstStorage, boolean needDigest)
+      String fsName, long imageTxId, Storage dstStorage, boolean needDigest)
       throws IOException {
     String fileid = GetImageServlet.getParamStringForImage(
         imageTxId, dstStorage);
@@ -116,7 +118,7 @@
    */
   public static void uploadImageFromStorage(String fsName,
       InetSocketAddress imageListenAddress,
-      NNStorage storage, long txid) throws IOException {
+      Storage storage, long txid) throws IOException {
 
     String fileid = GetImageServlet.getParamStringToPutImage(
         txid, imageListenAddress, storage);
@@ -199,17 +201,20 @@
    */
   static MD5Hash getFileClient(String nnHostPort,
       String queryString, List<File> localPaths,
-      NNStorage dstStorage, boolean getChecksum) throws IOException {
-    byte[] buf = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
+      Storage dstStorage, boolean getChecksum) throws IOException {
 
     String str = "http://" + nnHostPort + "/getimage?" + queryString;
     LOG.info("Opening connection to " + str);
     //
     // open connection to remote server
     //
-    long startTime = Util.monotonicNow();
     URL url = new URL(str);
-
+    return doGetUrl(url, localPaths, dstStorage, getChecksum);
+  }
+
+  public static MD5Hash doGetUrl(URL url, List<File> localPaths,
+      Storage dstStorage, boolean getChecksum) throws IOException {
+    long startTime = Util.monotonicNow();
     HttpURLConnection connection = (HttpURLConnection)
       SecurityUtil.openSecureHttpConnection(url);
 
@@ -227,7 +232,7 @@
       advertisedSize = Long.parseLong(contentLength);
     } else {
       throw new IOException(CONTENT_LENGTH + " header is not provided " +
-                            "by the namenode when trying to fetch " + str);
+                            "by the namenode when trying to fetch " + url);
     }
 
     if (localPaths != null) {
@@ -268,15 +273,16 @@
         try {
           if (f.exists()) {
             LOG.warn("Overwriting existing file " + f
-                + " with file downloaded from " + str);
+                + " with file downloaded from " + url);
           }
           outputStreams.add(new FileOutputStream(f));
         } catch (IOException ioe) {
           LOG.warn("Unable to download file " + f, ioe);
           // This will be null if we're downloading the fsimage to a file
           // outside of an NNStorage directory.
-          if (dstStorage != null) {
-            dstStorage.reportErrorOnFile(f);
+          if (dstStorage != null &&
+              (dstStorage instanceof StorageErrorReporter)) {
+            ((StorageErrorReporter)dstStorage).reportErrorOnFile(f);
           }
         }
       }
@@ -288,6 +294,7 @@
       }
 
       int num = 1;
+      byte[] buf = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
       while (num > 0) {
         num = stream.read(buf);
         if (num > 0) {
@@ -308,7 +315,7 @@
       // only throw this exception if we think we read all of it on our end
       // -- otherwise a client-side IOException would be masked by this
      // exception that makes it look like a server-side problem!
-      throw new IOException("File " + str + " received length " + received +
+      throw new IOException("File " + url + " received length " + received +
                             " is not of the advertised size " + advertisedSize);
     }
 
@@ -324,7 +331,7 @@
     if (advertisedDigest != null &&
         !computedDigest.equals(advertisedDigest)) {
-      throw new IOException("File " + str + " computed digest " +
+      throw new IOException("File " + url + " computed digest " +
           computedDigest + " does not match advertised digest " +
           advertisedDigest);
     }
 
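Splitting doGetUrl() out of getFileClient() above means a caller that already holds a complete URL, rather than a NameNode host:port plus query string, can fetch a file directly. A hedged sketch of such a call; the URL, query parameters, and local path are made-up examples, passing null for dstStorage simply skips the error-reporting branch shown in this hunk, and parts of doGetUrl() not shown here may impose further expectations on the arguments:

    import java.io.File;
    import java.net.URL;
    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
    import org.apache.hadoop.io.MD5Hash;

    public class DoGetUrlSketch {
      public static void main(String[] args) throws Exception {
        // Hypothetical remote endpoint serving an image/edits file.
        URL url = new URL("http://remote.example.com:50070/getimage?getimage=1&txid=100");
        List<File> localPaths =
            Collections.singletonList(new File("/tmp/downloaded.fsimage"));
        MD5Hash digest = TransferFsImage.doGetUrl(url, localPaths, null, true);
        System.out.println("Downloaded with digest " + digest);
      }
    }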
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/AtomicFileOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/AtomicFileOutputStream.java
index 9ac4861f384..3c7d8d884b5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/AtomicFileOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/AtomicFileOutputStream.java
@@ -91,4 +91,20 @@ public class AtomicFileOutputStream extends FilterOutputStream {
     }
   }
 
+  /**
+   * Close the atomic file, but do not "commit" the temporary file
+   * on top of the destination. This should be used if there is a failure
+   * in writing.
+   */
+  public void abort() {
+    try {
+      super.close();
+    } catch (IOException ioe) {
+      LOG.warn("Unable to abort file " + tmpFile, ioe);
+    }
+    if (!tmpFile.delete()) {
+      LOG.warn("Unable to delete tmp file during abort " + tmpFile);
+    }
+  }
+
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java
new file mode 100644
index 00000000000..5e776226fa6
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/util/PersistentLongFile.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.util;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.io.IOUtils;
+
+/**
+ * Class that represents a file on disk which persistently stores
+ * a single long value. The file is updated atomically
+ * and durably (i.e fsynced).
+ */
+@InterfaceAudience.Private
+public class PersistentLongFile {
+  private static final Log LOG = LogFactory.getLog(
+      PersistentLongFile.class);
+
+  private final File file;
+  private final long defaultVal;
+
+  private long value;
+  private boolean loaded = false;
+
+  public PersistentLongFile(File file, long defaultVal) {
+    this.file = file;
+    this.defaultVal = defaultVal;
+  }
+
+  public long get() throws IOException {
+    if (!loaded) {
+      value = readFile(file, defaultVal);
+      loaded = true;
+    }
+    return value;
+  }
+
+  public void set(long newVal) throws IOException {
+    writeFile(file, newVal);
+    value = newVal;
+    loaded = true;
+  }
+
+  /**
+   * Atomically write the given value to the given file, including fsyncing.
+   *
+   * @param file destination file
+   * @param val value to write
+   * @throws IOException if the file cannot be written
+   */
+  public static void writeFile(File file, long val) throws IOException {
+    AtomicFileOutputStream fos = new AtomicFileOutputStream(file);
+    try {
+      fos.write(String.valueOf(val).getBytes());
+      fos.write('\n');
+      fos.close();
+      fos = null;
+    } finally {
+      if (fos != null) {
+        fos.abort();
+      }
+    }
+  }
+
+  public static long readFile(File file, long defaultVal) throws IOException {
+    long val = defaultVal;
+    if (file.exists()) {
+      BufferedReader br = new BufferedReader(new FileReader(file));
+      try {
+        val = Long.valueOf(br.readLine());
+        br.close();
+        br = null;
+      } finally {
+        IOUtils.cleanup(LOG, br);
+      }
+    }
+    return val;
+  }
+}
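PersistentLongFile packages the seen_txid read/write logic that the NNStorage hunks above no longer carry inline, pairing AtomicFileOutputStream with the new abort() for failed writes. A short usage sketch; the path is only an example:

    import java.io.File;

    import org.apache.hadoop.hdfs.util.PersistentLongFile;

    public class PersistentLongFileSketch {
      public static void main(String[] args) throws Exception {
        File f = new File("/tmp/seen_txid");   // example location
        PersistentLongFile seenTxid = new PersistentLongFile(f, 0L);

        System.out.println(seenTxid.get());    // default (0) until something is written
        seenTxid.set(42L);                     // written atomically via AtomicFileOutputStream
        System.out.println(PersistentLongFile.readFile(f, 0));  // 42, using the static helper
      }
    }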