HDFS-6557. Merge r1604242 from trunk.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1604244 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
5f0a84d919
commit
9ba06f53fc
|
@ -202,6 +202,8 @@ Release 2.5.0 - UNRELEASED
|
||||||
HDFS-6403. Add metrics for log warnings reported by JVM pauses. (Yongjun
|
HDFS-6403. Add metrics for log warnings reported by JVM pauses. (Yongjun
|
||||||
Zhang via atm)
|
Zhang via atm)
|
||||||
|
|
||||||
|
HDFS-6557. Move the reference of fsimage to FSNamesystem. (wheat9)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)
|
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)
|
||||||
|
|
|
@ -118,7 +118,6 @@ public class FSDirectory implements Closeable {
|
||||||
public final static byte[] DOT_INODES =
|
public final static byte[] DOT_INODES =
|
||||||
DFSUtil.string2Bytes(DOT_INODES_STRING);
|
DFSUtil.string2Bytes(DOT_INODES_STRING);
|
||||||
INodeDirectory rootDir;
|
INodeDirectory rootDir;
|
||||||
FSImage fsImage;
|
|
||||||
private final FSNamesystem namesystem;
|
private final FSNamesystem namesystem;
|
||||||
private volatile boolean skipQuotaCheck = false; //skip while consuming edits
|
private volatile boolean skipQuotaCheck = false; //skip while consuming edits
|
||||||
private final int maxComponentLength;
|
private final int maxComponentLength;
|
||||||
|
@ -171,11 +170,10 @@ public class FSDirectory implements Closeable {
|
||||||
*/
|
*/
|
||||||
private final NameCache<ByteArray> nameCache;
|
private final NameCache<ByteArray> nameCache;
|
||||||
|
|
||||||
FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) {
|
FSDirectory(FSNamesystem ns, Configuration conf) {
|
||||||
this.dirLock = new ReentrantReadWriteLock(true); // fair
|
this.dirLock = new ReentrantReadWriteLock(true); // fair
|
||||||
rootDir = createRoot(ns);
|
rootDir = createRoot(ns);
|
||||||
inodeMap = INodeMap.newInstance(rootDir);
|
inodeMap = INodeMap.newInstance(rootDir);
|
||||||
this.fsImage = fsImage;
|
|
||||||
int configuredLimit = conf.getInt(
|
int configuredLimit = conf.getInt(
|
||||||
DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
|
DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
|
||||||
this.lsLimit = configuredLimit>0 ?
|
this.lsLimit = configuredLimit>0 ?
|
||||||
|
@ -232,9 +230,7 @@ public class FSDirectory implements Closeable {
|
||||||
* Shutdown the filestore
|
* Shutdown the filestore
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {}
|
||||||
fsImage.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
void markNameCacheInitialized() {
|
void markNameCacheInitialized() {
|
||||||
writeLock();
|
writeLock();
|
||||||
|
|
|
@ -495,7 +495,6 @@ public class FSImage implements Closeable {
|
||||||
FSImage realImage = target.getFSImage();
|
FSImage realImage = target.getFSImage();
|
||||||
FSImage ckptImage = new FSImage(conf,
|
FSImage ckptImage = new FSImage(conf,
|
||||||
checkpointDirs, checkpointEditsDirs);
|
checkpointDirs, checkpointEditsDirs);
|
||||||
target.dir.fsImage = ckptImage;
|
|
||||||
// load from the checkpoint dirs
|
// load from the checkpoint dirs
|
||||||
try {
|
try {
|
||||||
ckptImage.recoverTransitionRead(StartupOption.REGULAR, target, null);
|
ckptImage.recoverTransitionRead(StartupOption.REGULAR, target, null);
|
||||||
|
@ -507,7 +506,6 @@ public class FSImage implements Closeable {
|
||||||
realImage.getEditLog().setNextTxId(ckptImage.getEditLog().getLastWrittenTxId()+1);
|
realImage.getEditLog().setNextTxId(ckptImage.getEditLog().getLastWrittenTxId()+1);
|
||||||
realImage.initEditLog(StartupOption.IMPORT);
|
realImage.initEditLog(StartupOption.IMPORT);
|
||||||
|
|
||||||
target.dir.fsImage = realImage;
|
|
||||||
realImage.getStorage().setBlockPoolID(ckptImage.getBlockPoolID());
|
realImage.getStorage().setBlockPoolID(ckptImage.getBlockPoolID());
|
||||||
|
|
||||||
// and save it but keep the same checkpointTime
|
// and save it but keep the same checkpointTime
|
||||||
|
|
|
@ -506,6 +506,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
|
|
||||||
private volatile boolean imageLoaded = false;
|
private volatile boolean imageLoaded = false;
|
||||||
private final Condition cond;
|
private final Condition cond;
|
||||||
|
|
||||||
|
private final FSImage fsImage;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Notify that loading of this FSDirectory is complete, and
|
* Notify that loading of this FSDirectory is complete, and
|
||||||
* it is imageLoaded for use
|
* it is imageLoaded for use
|
||||||
|
@ -727,6 +730,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
LOG.info("fsLock is fair:" + fair);
|
LOG.info("fsLock is fair:" + fair);
|
||||||
fsLock = new FSNamesystemLock(fair);
|
fsLock = new FSNamesystemLock(fair);
|
||||||
cond = fsLock.writeLock().newCondition();
|
cond = fsLock.writeLock().newCondition();
|
||||||
|
this.fsImage = fsImage;
|
||||||
try {
|
try {
|
||||||
resourceRecheckInterval = conf.getLong(
|
resourceRecheckInterval = conf.getLong(
|
||||||
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
|
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
|
||||||
|
@ -818,7 +822,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);
|
DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);
|
||||||
|
|
||||||
this.dtSecretManager = createDelegationTokenSecretManager(conf);
|
this.dtSecretManager = createDelegationTokenSecretManager(conf);
|
||||||
this.dir = new FSDirectory(fsImage, this, conf);
|
this.dir = new FSDirectory(this, conf);
|
||||||
this.snapshotManager = new SnapshotManager(dir);
|
this.snapshotManager = new SnapshotManager(dir);
|
||||||
this.cacheManager = new CacheManager(this, conf, blockManager);
|
this.cacheManager = new CacheManager(this, conf, blockManager);
|
||||||
this.safeMode = new SafeModeInfo(conf);
|
this.safeMode = new SafeModeInfo(conf);
|
||||||
|
@ -1046,7 +1050,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
LOG.info("Starting services required for active state");
|
LOG.info("Starting services required for active state");
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
FSEditLog editLog = dir.fsImage.getEditLog();
|
FSEditLog editLog = getFSImage().getEditLog();
|
||||||
|
|
||||||
if (!editLog.isOpenForWrite()) {
|
if (!editLog.isOpenForWrite()) {
|
||||||
// During startup, we're already open for write during initialization.
|
// During startup, we're already open for write during initialization.
|
||||||
|
@ -1075,12 +1079,12 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
metaSaveAsString());
|
metaSaveAsString());
|
||||||
}
|
}
|
||||||
|
|
||||||
long nextTxId = dir.fsImage.getLastAppliedTxId() + 1;
|
long nextTxId = getFSImage().getLastAppliedTxId() + 1;
|
||||||
LOG.info("Will take over writing edit logs at txnid " +
|
LOG.info("Will take over writing edit logs at txnid " +
|
||||||
nextTxId);
|
nextTxId);
|
||||||
editLog.setNextTxId(nextTxId);
|
editLog.setNextTxId(nextTxId);
|
||||||
|
|
||||||
dir.fsImage.editLog.openForWrite();
|
getFSImage().editLog.openForWrite();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enable quota checks.
|
// Enable quota checks.
|
||||||
|
@ -1155,13 +1159,13 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
((NameNodeEditLogRoller)nnEditLogRoller.getRunnable()).stop();
|
((NameNodeEditLogRoller)nnEditLogRoller.getRunnable()).stop();
|
||||||
nnEditLogRoller.interrupt();
|
nnEditLogRoller.interrupt();
|
||||||
}
|
}
|
||||||
if (dir != null && dir.fsImage != null) {
|
if (dir != null && getFSImage() != null) {
|
||||||
if (dir.fsImage.editLog != null) {
|
if (getFSImage().editLog != null) {
|
||||||
dir.fsImage.editLog.close();
|
getFSImage().editLog.close();
|
||||||
}
|
}
|
||||||
// Update the fsimage with the last txid that we wrote
|
// Update the fsimage with the last txid that we wrote
|
||||||
// so that the tailer starts from the right spot.
|
// so that the tailer starts from the right spot.
|
||||||
dir.fsImage.updateLastAppliedTxIdFromWritten();
|
getFSImage().updateLastAppliedTxIdFromWritten();
|
||||||
}
|
}
|
||||||
if (cacheManager != null) {
|
if (cacheManager != null) {
|
||||||
cacheManager.stopMonitorThread();
|
cacheManager.stopMonitorThread();
|
||||||
|
@ -1184,9 +1188,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
*/
|
*/
|
||||||
void startStandbyServices(final Configuration conf) throws IOException {
|
void startStandbyServices(final Configuration conf) throws IOException {
|
||||||
LOG.info("Starting services required for standby state");
|
LOG.info("Starting services required for standby state");
|
||||||
if (!dir.fsImage.editLog.isOpenForRead()) {
|
if (!getFSImage().editLog.isOpenForRead()) {
|
||||||
// During startup, we're already open for read.
|
// During startup, we're already open for read.
|
||||||
dir.fsImage.editLog.initSharedJournalsForRead();
|
getFSImage().editLog.initSharedJournalsForRead();
|
||||||
}
|
}
|
||||||
|
|
||||||
blockManager.setPostponeBlocksFromFuture(true);
|
blockManager.setPostponeBlocksFromFuture(true);
|
||||||
|
@ -1233,8 +1237,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
if (editLogTailer != null) {
|
if (editLogTailer != null) {
|
||||||
editLogTailer.stop();
|
editLogTailer.stop();
|
||||||
}
|
}
|
||||||
if (dir != null && dir.fsImage != null && dir.fsImage.editLog != null) {
|
if (dir != null && getFSImage() != null && getFSImage().editLog != null) {
|
||||||
dir.fsImage.editLog.close();
|
getFSImage().editLog.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1477,9 +1481,9 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
* Version of @see #getNamespaceInfo() that is not protected by a lock.
|
* Version of @see #getNamespaceInfo() that is not protected by a lock.
|
||||||
*/
|
*/
|
||||||
NamespaceInfo unprotectedGetNamespaceInfo() {
|
NamespaceInfo unprotectedGetNamespaceInfo() {
|
||||||
return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
|
return new NamespaceInfo(getFSImage().getStorage().getNamespaceID(),
|
||||||
getClusterId(), getBlockPoolId(),
|
getClusterId(), getBlockPoolId(),
|
||||||
dir.fsImage.getStorage().getCTime());
|
getFSImage().getStorage().getCTime());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1497,12 +1501,10 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
try {
|
try {
|
||||||
stopActiveServices();
|
stopActiveServices();
|
||||||
stopStandbyServices();
|
stopStandbyServices();
|
||||||
if (dir != null) {
|
|
||||||
dir.close();
|
|
||||||
}
|
|
||||||
} catch (IOException ie) {
|
} catch (IOException ie) {
|
||||||
LOG.error("Error closing FSDirectory", ie);
|
} finally {
|
||||||
IOUtils.cleanup(LOG, dir);
|
IOUtils.cleanup(LOG, dir);
|
||||||
|
IOUtils.cleanup(LOG, fsImage);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4502,7 +4504,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
* @return registration ID
|
* @return registration ID
|
||||||
*/
|
*/
|
||||||
String getRegistrationID() {
|
String getRegistrationID() {
|
||||||
return Storage.getRegistrationID(dir.fsImage.getStorage());
|
return Storage.getRegistrationID(getFSImage().getStorage());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -4718,7 +4720,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
}
|
}
|
||||||
|
|
||||||
public FSImage getFSImage() {
|
public FSImage getFSImage() {
|
||||||
return dir.fsImage;
|
return fsImage;
|
||||||
}
|
}
|
||||||
|
|
||||||
public FSEditLog getEditLog() {
|
public FSEditLog getEditLog() {
|
||||||
|
@ -7043,7 +7045,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
||||||
|
|
||||||
@Override // NameNodeMXBean
|
@Override // NameNodeMXBean
|
||||||
public String getClusterId() {
|
public String getClusterId() {
|
||||||
return dir.fsImage.getStorage().getClusterID();
|
return getFSImage().getStorage().getClusterID();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // NameNodeMXBean
|
@Override // NameNodeMXBean
|
||||||
|
|
|
@ -830,7 +830,7 @@ public class NameNode implements NameNodeStatusMXBean {
|
||||||
/** get FSImage */
|
/** get FSImage */
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public FSImage getFSImage() {
|
public FSImage getFSImage() {
|
||||||
return namesystem.dir.fsImage;
|
return namesystem.getFSImage();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1140,7 +1140,7 @@ public class NameNode implements NameNodeStatusMXBean {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
nsys.dir.fsImage.doRollback(nsys);
|
nsys.getFSImage().doRollback(nsys);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -45,6 +45,7 @@ import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.ipc.StandbyException;
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.internal.util.reflection.Whitebox;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is a utility class to expose NameNode functionality for unit tests.
|
* This is a utility class to expose NameNode functionality for unit tests.
|
||||||
|
@ -177,8 +178,9 @@ public class NameNodeAdapter {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static FSImage spyOnFsImage(NameNode nn1) {
|
public static FSImage spyOnFsImage(NameNode nn1) {
|
||||||
FSImage spy = Mockito.spy(nn1.getNamesystem().dir.fsImage);
|
FSNamesystem fsn = nn1.getNamesystem();
|
||||||
nn1.getNamesystem().dir.fsImage = spy;
|
FSImage spy = Mockito.spy(fsn.getFSImage());
|
||||||
|
Whitebox.setInternalState(fsn, "fsImage", spy);
|
||||||
return spy;
|
return spy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -75,8 +75,7 @@ public class TestFSPermissionChecker {
|
||||||
return new PermissionStatus(SUPERUSER, SUPERGROUP, perm);
|
return new PermissionStatus(SUPERUSER, SUPERGROUP, perm);
|
||||||
}
|
}
|
||||||
}).when(fsn).createFsOwnerPermissions(any(FsPermission.class));
|
}).when(fsn).createFsOwnerPermissions(any(FsPermission.class));
|
||||||
FSImage image = mock(FSImage.class);
|
dir = new FSDirectory(fsn, conf);
|
||||||
dir = new FSDirectory(image, fsn, conf);
|
|
||||||
inodeRoot = dir.getRoot();
|
inodeRoot = dir.getRoot();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -61,6 +61,7 @@ import org.apache.hadoop.test.GenericTestUtils.DelayAnswer;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.internal.util.reflection.Whitebox;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
@ -124,14 +125,14 @@ public class TestSaveNamespace {
|
||||||
FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
|
FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
|
||||||
|
|
||||||
// Replace the FSImage with a spy
|
// Replace the FSImage with a spy
|
||||||
FSImage originalImage = fsn.dir.fsImage;
|
FSImage originalImage = fsn.getFSImage();
|
||||||
NNStorage storage = originalImage.getStorage();
|
NNStorage storage = originalImage.getStorage();
|
||||||
|
|
||||||
NNStorage spyStorage = spy(storage);
|
NNStorage spyStorage = spy(storage);
|
||||||
originalImage.storage = spyStorage;
|
originalImage.storage = spyStorage;
|
||||||
|
|
||||||
FSImage spyImage = spy(originalImage);
|
FSImage spyImage = spy(originalImage);
|
||||||
fsn.dir.fsImage = spyImage;
|
Whitebox.setInternalState(fsn, "fsImage", spyImage);
|
||||||
|
|
||||||
boolean shouldFail = false; // should we expect the save operation to fail
|
boolean shouldFail = false; // should we expect the save operation to fail
|
||||||
// inject fault
|
// inject fault
|
||||||
|
@ -233,11 +234,11 @@ public class TestSaveNamespace {
|
||||||
FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
|
FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
|
||||||
|
|
||||||
// Replace the FSImage with a spy
|
// Replace the FSImage with a spy
|
||||||
FSImage originalImage = fsn.dir.fsImage;
|
FSImage originalImage = fsn.getFSImage();
|
||||||
NNStorage storage = originalImage.getStorage();
|
NNStorage storage = originalImage.getStorage();
|
||||||
|
|
||||||
FSImage spyImage = spy(originalImage);
|
FSImage spyImage = spy(originalImage);
|
||||||
fsn.dir.fsImage = spyImage;
|
Whitebox.setInternalState(fsn, "fsImage", spyImage);
|
||||||
|
|
||||||
FileSystem fs = FileSystem.getLocal(conf);
|
FileSystem fs = FileSystem.getLocal(conf);
|
||||||
File rootDir = storage.getStorageDir(0).getRoot();
|
File rootDir = storage.getStorageDir(0).getRoot();
|
||||||
|
@ -367,14 +368,15 @@ public class TestSaveNamespace {
|
||||||
FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
|
FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
|
||||||
|
|
||||||
// Replace the FSImage with a spy
|
// Replace the FSImage with a spy
|
||||||
final FSImage originalImage = fsn.dir.fsImage;
|
final FSImage originalImage = fsn.getFSImage();
|
||||||
NNStorage storage = originalImage.getStorage();
|
NNStorage storage = originalImage.getStorage();
|
||||||
storage.close(); // unlock any directories that FSNamesystem's initialization may have locked
|
storage.close(); // unlock any directories that FSNamesystem's initialization may have locked
|
||||||
|
|
||||||
NNStorage spyStorage = spy(storage);
|
NNStorage spyStorage = spy(storage);
|
||||||
originalImage.storage = spyStorage;
|
originalImage.storage = spyStorage;
|
||||||
FSImage spyImage = spy(originalImage);
|
FSImage spyImage = spy(originalImage);
|
||||||
fsn.dir.fsImage = spyImage;
|
Whitebox.setInternalState(fsn, "fsImage", spyImage);
|
||||||
|
|
||||||
spyImage.storage.setStorageDirectories(
|
spyImage.storage.setStorageDirectories(
|
||||||
FSNamesystem.getNamespaceDirs(conf),
|
FSNamesystem.getNamespaceDirs(conf),
|
||||||
FSNamesystem.getNamespaceEditsDirs(conf));
|
FSNamesystem.getNamespaceEditsDirs(conf));
|
||||||
|
@ -532,7 +534,7 @@ public class TestSaveNamespace {
|
||||||
FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
|
FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
|
||||||
|
|
||||||
// Replace the FSImage with a spy
|
// Replace the FSImage with a spy
|
||||||
final FSImage image = fsn.dir.fsImage;
|
final FSImage image = fsn.getFSImage();
|
||||||
NNStorage storage = image.getStorage();
|
NNStorage storage = image.getStorage();
|
||||||
storage.close(); // unlock any directories that FSNamesystem's initialization may have locked
|
storage.close(); // unlock any directories that FSNamesystem's initialization may have locked
|
||||||
storage.setStorageDirectories(
|
storage.setStorageDirectories(
|
||||||
|
|
Loading…
Reference in New Issue