HDFS-3190. Simple refactors in existing NN code to assist QuorumJournalManager extension. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1356525 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
161ed29087
commit
8dd3148e73
|
@ -104,6 +104,9 @@ Trunk (unreleased changes)
|
|||
|
||||
HDFS-3573. Supply NamespaceInfo when instantiating JournalManagers (todd)
|
||||
|
||||
HDFS-3190. Simple refactors in existing NN code to assist
|
||||
QuorumJournalManager extension. (todd)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
BUG FIXES
|
||||
|
|
|
@ -158,6 +158,21 @@ public abstract class Storage extends StorageInfo {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @return A list of the given File in every available storage directory,
|
||||
* regardless of whether it might exist.
|
||||
*/
|
||||
public List<File> getFiles(StorageDirType dirType, String fileName) {
|
||||
ArrayList<File> list = new ArrayList<File>();
|
||||
Iterator<StorageDirectory> it =
|
||||
(dirType == null) ? dirIterator() : dirIterator(dirType);
|
||||
for ( ;it.hasNext(); ) {
|
||||
list.add(new File(it.next().getCurrentDir(), fileName));
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Return default iterator
|
||||
* This iterator returns all entries in storageDirs
|
||||
|
|
|
@ -0,0 +1,39 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.common;
|
||||
|
||||
import java.io.File;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
|
||||
|
||||
/**
|
||||
* Interface which implementations of {@link JournalManager} can use to report
|
||||
* errors on underlying storage directories. This avoids a circular dependency
|
||||
* between journal managers and the storage which instantiates them.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public interface StorageErrorReporter {
|
||||
|
||||
/**
|
||||
* Indicate that some error occurred on the given file.
|
||||
*
|
||||
* @param f the file which had an error.
|
||||
*/
|
||||
public void reportErrorOnFile(File f);
|
||||
}
|
|
@ -32,6 +32,7 @@ import java.util.regex.Pattern;
|
|||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage.StorageDirectory;
|
||||
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorageRetentionManager.StoragePurger;
|
||||
import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.EditLogValidation;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeFile;
|
||||
|
@ -53,7 +54,7 @@ class FileJournalManager implements JournalManager {
|
|||
private static final Log LOG = LogFactory.getLog(FileJournalManager.class);
|
||||
|
||||
private final StorageDirectory sd;
|
||||
private final NNStorage storage;
|
||||
private final StorageErrorReporter errorReporter;
|
||||
private int outputBufferCapacity = 512*1024;
|
||||
|
||||
private static final Pattern EDITS_REGEX = Pattern.compile(
|
||||
|
@ -67,9 +68,10 @@ class FileJournalManager implements JournalManager {
|
|||
StoragePurger purger
|
||||
= new NNStorageRetentionManager.DeletionStoragePurger();
|
||||
|
||||
public FileJournalManager(StorageDirectory sd, NNStorage storage) {
|
||||
public FileJournalManager(StorageDirectory sd,
|
||||
StorageErrorReporter errorReporter) {
|
||||
this.sd = sd;
|
||||
this.storage = storage;
|
||||
this.errorReporter = errorReporter;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -85,7 +87,10 @@ class FileJournalManager implements JournalManager {
|
|||
stm.create();
|
||||
return stm;
|
||||
} catch (IOException e) {
|
||||
storage.reportErrorsOnDirectory(sd);
|
||||
LOG.warn("Unable to start log segment " + txid +
|
||||
" at " + currentInProgress + ": " +
|
||||
e.getLocalizedMessage());
|
||||
errorReporter.reportErrorOnFile(currentInProgress);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
|
@ -103,7 +108,7 @@ class FileJournalManager implements JournalManager {
|
|||
"Can't finalize edits file " + inprogressFile + " since finalized file " +
|
||||
"already exists");
|
||||
if (!inprogressFile.renameTo(dstFile)) {
|
||||
storage.reportErrorsOnDirectory(sd);
|
||||
errorReporter.reportErrorOnFile(dstFile);
|
||||
throw new IllegalStateException("Unable to finalize edits file " + inprogressFile);
|
||||
}
|
||||
if (inprogressFile.equals(currentInProgress)) {
|
||||
|
|
|
@ -40,6 +40,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.HAUtil;
|
||||
import org.apache.hadoop.hdfs.server.common.JspHelper;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
|
||||
import org.apache.hadoop.hdfs.util.DataTransferThrottler;
|
||||
|
@ -292,7 +293,7 @@ public class GetImageServlet extends HttpServlet {
|
|||
}
|
||||
|
||||
static String getParamStringToPutImage(long txid,
|
||||
InetSocketAddress imageListenAddress, NNStorage storage) {
|
||||
InetSocketAddress imageListenAddress, Storage storage) {
|
||||
|
||||
return "putimage=1" +
|
||||
"&" + TXID_PARAM + "=" + txid +
|
||||
|
|
|
@ -17,13 +17,10 @@
|
|||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.Closeable;
|
||||
import java.io.File;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.io.OutputStream;
|
||||
import java.net.URI;
|
||||
import java.net.UnknownHostException;
|
||||
import java.util.ArrayList;
|
||||
|
@ -48,10 +45,11 @@ import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
|
|||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
|
||||
import org.apache.hadoop.hdfs.server.common.UpgradeManager;
|
||||
import org.apache.hadoop.hdfs.server.common.Util;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.hdfs.util.AtomicFileOutputStream;
|
||||
import org.apache.hadoop.hdfs.util.PersistentLongFile;
|
||||
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.net.DNS;
|
||||
|
@ -66,7 +64,8 @@ import com.google.common.collect.Maps;
|
|||
* the NameNode.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class NNStorage extends Storage implements Closeable {
|
||||
public class NNStorage extends Storage implements Closeable,
|
||||
StorageErrorReporter {
|
||||
private static final Log LOG = LogFactory.getLog(NNStorage.class.getName());
|
||||
|
||||
static final String DEPRECATED_MESSAGE_DIGEST_PROPERTY = "imageMD5Digest";
|
||||
|
@ -422,18 +421,7 @@ public class NNStorage extends Storage implements Closeable {
|
|||
*/
|
||||
static long readTransactionIdFile(StorageDirectory sd) throws IOException {
|
||||
File txidFile = getStorageFile(sd, NameNodeFile.SEEN_TXID);
|
||||
long txid = 0L;
|
||||
if (txidFile.exists() && txidFile.canRead()) {
|
||||
BufferedReader br = new BufferedReader(new FileReader(txidFile));
|
||||
try {
|
||||
txid = Long.valueOf(br.readLine());
|
||||
br.close();
|
||||
br = null;
|
||||
} finally {
|
||||
IOUtils.cleanup(LOG, br);
|
||||
}
|
||||
}
|
||||
return txid;
|
||||
return PersistentLongFile.readFile(txidFile, 0);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -446,15 +434,7 @@ public class NNStorage extends Storage implements Closeable {
|
|||
Preconditions.checkArgument(txid >= 0, "bad txid: " + txid);
|
||||
|
||||
File txIdFile = getStorageFile(sd, NameNodeFile.SEEN_TXID);
|
||||
OutputStream fos = new AtomicFileOutputStream(txIdFile);
|
||||
try {
|
||||
fos.write(String.valueOf(txid).getBytes());
|
||||
fos.write('\n');
|
||||
fos.close();
|
||||
fos = null;
|
||||
} finally {
|
||||
IOUtils.cleanup(LOG, fos);
|
||||
}
|
||||
PersistentLongFile.writeFile(txIdFile, txid);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -772,20 +752,6 @@ public class NNStorage extends Storage implements Closeable {
|
|||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return A list of the given File in every available storage directory,
|
||||
* regardless of whether it might exist.
|
||||
*/
|
||||
List<File> getFiles(NameNodeDirType dirType, String fileName) {
|
||||
ArrayList<File> list = new ArrayList<File>();
|
||||
Iterator<StorageDirectory> it =
|
||||
(dirType == null) ? dirIterator() : dirIterator(dirType);
|
||||
for ( ;it.hasNext(); ) {
|
||||
list.add(new File(it.next().getCurrentDir(), fileName));
|
||||
}
|
||||
return list;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the upgrade manager for use in a distributed upgrade.
|
||||
* @param um The upgrade manager
|
||||
|
@ -884,7 +850,7 @@ public class NNStorage extends Storage implements Closeable {
|
|||
* @param sd A storage directory to mark as errored.
|
||||
* @throws IOException
|
||||
*/
|
||||
void reportErrorsOnDirectory(StorageDirectory sd) {
|
||||
private void reportErrorsOnDirectory(StorageDirectory sd) {
|
||||
LOG.error("Error reported on storage directory " + sd);
|
||||
|
||||
String lsd = listStorageDirectories();
|
||||
|
@ -945,7 +911,8 @@ public class NNStorage extends Storage implements Closeable {
|
|||
* Report that an IOE has occurred on some file which may
|
||||
* or may not be within one of the NN image storage directories.
|
||||
*/
|
||||
void reportErrorOnFile(File f) {
|
||||
@Override
|
||||
public void reportErrorOnFile(File f) {
|
||||
// We use getAbsolutePath here instead of getCanonicalPath since we know
|
||||
// that there is some IO problem on that drive.
|
||||
// getCanonicalPath may need to call stat() or readlink() and it's likely
|
||||
|
|
|
@ -32,6 +32,8 @@ import org.apache.commons.logging.LogFactory;
|
|||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.common.Util;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NNStorage.NameNodeDirType;
|
||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
|
||||
|
@ -60,7 +62,7 @@ public class TransferFsImage {
|
|||
}
|
||||
|
||||
public static MD5Hash downloadImageToStorage(
|
||||
String fsName, long imageTxId, NNStorage dstStorage, boolean needDigest)
|
||||
String fsName, long imageTxId, Storage dstStorage, boolean needDigest)
|
||||
throws IOException {
|
||||
String fileid = GetImageServlet.getParamStringForImage(
|
||||
imageTxId, dstStorage);
|
||||
|
@ -116,7 +118,7 @@ public class TransferFsImage {
|
|||
*/
|
||||
public static void uploadImageFromStorage(String fsName,
|
||||
InetSocketAddress imageListenAddress,
|
||||
NNStorage storage, long txid) throws IOException {
|
||||
Storage storage, long txid) throws IOException {
|
||||
|
||||
String fileid = GetImageServlet.getParamStringToPutImage(
|
||||
txid, imageListenAddress, storage);
|
||||
|
@ -199,17 +201,20 @@ public class TransferFsImage {
|
|||
*/
|
||||
static MD5Hash getFileClient(String nnHostPort,
|
||||
String queryString, List<File> localPaths,
|
||||
NNStorage dstStorage, boolean getChecksum) throws IOException {
|
||||
byte[] buf = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
|
||||
Storage dstStorage, boolean getChecksum) throws IOException {
|
||||
|
||||
String str = "http://" + nnHostPort + "/getimage?" + queryString;
|
||||
LOG.info("Opening connection to " + str);
|
||||
//
|
||||
// open connection to remote server
|
||||
//
|
||||
long startTime = Util.monotonicNow();
|
||||
URL url = new URL(str);
|
||||
|
||||
return doGetUrl(url, localPaths, dstStorage, getChecksum);
|
||||
}
|
||||
|
||||
public static MD5Hash doGetUrl(URL url, List<File> localPaths,
|
||||
Storage dstStorage, boolean getChecksum) throws IOException {
|
||||
long startTime = Util.monotonicNow();
|
||||
HttpURLConnection connection = (HttpURLConnection)
|
||||
SecurityUtil.openSecureHttpConnection(url);
|
||||
|
||||
|
@ -227,7 +232,7 @@ public class TransferFsImage {
|
|||
advertisedSize = Long.parseLong(contentLength);
|
||||
} else {
|
||||
throw new IOException(CONTENT_LENGTH + " header is not provided " +
|
||||
"by the namenode when trying to fetch " + str);
|
||||
"by the namenode when trying to fetch " + url);
|
||||
}
|
||||
|
||||
if (localPaths != null) {
|
||||
|
@ -268,15 +273,16 @@ public class TransferFsImage {
|
|||
try {
|
||||
if (f.exists()) {
|
||||
LOG.warn("Overwriting existing file " + f
|
||||
+ " with file downloaded from " + str);
|
||||
+ " with file downloaded from " + url);
|
||||
}
|
||||
outputStreams.add(new FileOutputStream(f));
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Unable to download file " + f, ioe);
|
||||
// This will be null if we're downloading the fsimage to a file
|
||||
// outside of an NNStorage directory.
|
||||
if (dstStorage != null) {
|
||||
dstStorage.reportErrorOnFile(f);
|
||||
if (dstStorage != null &&
|
||||
(dstStorage instanceof StorageErrorReporter)) {
|
||||
((StorageErrorReporter)dstStorage).reportErrorOnFile(f);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -288,6 +294,7 @@ public class TransferFsImage {
|
|||
}
|
||||
|
||||
int num = 1;
|
||||
byte[] buf = new byte[HdfsConstants.IO_FILE_BUFFER_SIZE];
|
||||
while (num > 0) {
|
||||
num = stream.read(buf);
|
||||
if (num > 0) {
|
||||
|
@ -308,7 +315,7 @@ public class TransferFsImage {
|
|||
// only throw this exception if we think we read all of it on our end
|
||||
// -- otherwise a client-side IOException would be masked by this
|
||||
// exception that makes it look like a server-side problem!
|
||||
throw new IOException("File " + str + " received length " + received +
|
||||
throw new IOException("File " + url + " received length " + received +
|
||||
" is not of the advertised size " +
|
||||
advertisedSize);
|
||||
}
|
||||
|
@ -324,7 +331,7 @@ public class TransferFsImage {
|
|||
|
||||
if (advertisedDigest != null &&
|
||||
!computedDigest.equals(advertisedDigest)) {
|
||||
throw new IOException("File " + str + " computed digest " +
|
||||
throw new IOException("File " + url + " computed digest " +
|
||||
computedDigest + " does not match advertised digest " +
|
||||
advertisedDigest);
|
||||
}
|
||||
|
|
|
@ -91,4 +91,20 @@ public class AtomicFileOutputStream extends FilterOutputStream {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the atomic file, but do not "commit" the temporary file
|
||||
* on top of the destination. This should be used if there is a failure
|
||||
* in writing.
|
||||
*/
|
||||
public void abort() {
|
||||
try {
|
||||
super.close();
|
||||
} catch (IOException ioe) {
|
||||
LOG.warn("Unable to abort file " + tmpFile, ioe);
|
||||
}
|
||||
if (!tmpFile.delete()) {
|
||||
LOG.warn("Unable to delete tmp file during abort " + tmpFile);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -0,0 +1,100 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.util;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
|
||||
/**
|
||||
* Class that represents a file on disk which persistently stores
|
||||
* a single <code>long</code> value. The file is updated atomically
|
||||
* and durably (i.e fsynced).
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class PersistentLongFile {
|
||||
private static final Log LOG = LogFactory.getLog(
|
||||
PersistentLongFile.class);
|
||||
|
||||
private final File file;
|
||||
private final long defaultVal;
|
||||
|
||||
private long value;
|
||||
private boolean loaded = false;
|
||||
|
||||
public PersistentLongFile(File file, long defaultVal) {
|
||||
this.file = file;
|
||||
this.defaultVal = defaultVal;
|
||||
}
|
||||
|
||||
public long get() throws IOException {
|
||||
if (!loaded) {
|
||||
value = readFile(file, defaultVal);
|
||||
loaded = true;
|
||||
}
|
||||
return value;
|
||||
}
|
||||
|
||||
public void set(long newVal) throws IOException {
|
||||
writeFile(file, newVal);
|
||||
value = newVal;
|
||||
loaded = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Atomically write the given value to the given file, including fsyncing.
|
||||
*
|
||||
* @param file destination file
|
||||
* @param val value to write
|
||||
* @throws IOException if the file cannot be written
|
||||
*/
|
||||
public static void writeFile(File file, long val) throws IOException {
|
||||
AtomicFileOutputStream fos = new AtomicFileOutputStream(file);
|
||||
try {
|
||||
fos.write(String.valueOf(val).getBytes());
|
||||
fos.write('\n');
|
||||
fos.close();
|
||||
fos = null;
|
||||
} finally {
|
||||
if (fos != null) {
|
||||
fos.abort();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static long readFile(File file, long defaultVal) throws IOException {
|
||||
long val = defaultVal;
|
||||
if (file.exists()) {
|
||||
BufferedReader br = new BufferedReader(new FileReader(file));
|
||||
try {
|
||||
val = Long.valueOf(br.readLine());
|
||||
br.close();
|
||||
br = null;
|
||||
} finally {
|
||||
IOUtils.cleanup(LOG, br);
|
||||
}
|
||||
}
|
||||
return val;
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue