HDFS-6557. Move the reference of fsimage to FSNamesystem. Contributed by Haohui Mai.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1604242 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
61bf9f7799
commit
9ca79e8d32
@ -456,6 +456,8 @@ Release 2.5.0 - UNRELEASED
|
|||||||
HDFS-6403. Add metrics for log warnings reported by JVM pauses. (Yongjun
|
HDFS-6403. Add metrics for log warnings reported by JVM pauses. (Yongjun
|
||||||
Zhang via atm)
|
Zhang via atm)
|
||||||
|
|
||||||
|
HDFS-6557. Move the reference of fsimage to FSNamesystem. (wheat9)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)
|
HDFS-6214. Webhdfs has poor throughput for files >2GB (daryn)
|
||||||
|
@ -117,7 +117,6 @@ private static INodeDirectorySnapshottable createRoot(FSNamesystem namesystem) {
|
|||||||
public final static byte[] DOT_INODES =
|
public final static byte[] DOT_INODES =
|
||||||
DFSUtil.string2Bytes(DOT_INODES_STRING);
|
DFSUtil.string2Bytes(DOT_INODES_STRING);
|
||||||
INodeDirectory rootDir;
|
INodeDirectory rootDir;
|
||||||
FSImage fsImage;
|
|
||||||
private final FSNamesystem namesystem;
|
private final FSNamesystem namesystem;
|
||||||
private volatile boolean skipQuotaCheck = false; //skip while consuming edits
|
private volatile boolean skipQuotaCheck = false; //skip while consuming edits
|
||||||
private final int maxComponentLength;
|
private final int maxComponentLength;
|
||||||
@ -170,11 +169,10 @@ public int getWriteHoldCount() {
|
|||||||
*/
|
*/
|
||||||
private final NameCache<ByteArray> nameCache;
|
private final NameCache<ByteArray> nameCache;
|
||||||
|
|
||||||
FSDirectory(FSImage fsImage, FSNamesystem ns, Configuration conf) {
|
FSDirectory(FSNamesystem ns, Configuration conf) {
|
||||||
this.dirLock = new ReentrantReadWriteLock(true); // fair
|
this.dirLock = new ReentrantReadWriteLock(true); // fair
|
||||||
rootDir = createRoot(ns);
|
rootDir = createRoot(ns);
|
||||||
inodeMap = INodeMap.newInstance(rootDir);
|
inodeMap = INodeMap.newInstance(rootDir);
|
||||||
this.fsImage = fsImage;
|
|
||||||
int configuredLimit = conf.getInt(
|
int configuredLimit = conf.getInt(
|
||||||
DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
|
DFSConfigKeys.DFS_LIST_LIMIT, DFSConfigKeys.DFS_LIST_LIMIT_DEFAULT);
|
||||||
this.lsLimit = configuredLimit>0 ?
|
this.lsLimit = configuredLimit>0 ?
|
||||||
@ -231,9 +229,7 @@ public INodeDirectory getRoot() {
|
|||||||
* Shutdown the filestore
|
* Shutdown the filestore
|
||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
public void close() throws IOException {
|
public void close() throws IOException {}
|
||||||
fsImage.close();
|
|
||||||
}
|
|
||||||
|
|
||||||
void markNameCacheInitialized() {
|
void markNameCacheInitialized() {
|
||||||
writeLock();
|
writeLock();
|
||||||
|
@ -495,7 +495,6 @@ void doImportCheckpoint(FSNamesystem target) throws IOException {
|
|||||||
FSImage realImage = target.getFSImage();
|
FSImage realImage = target.getFSImage();
|
||||||
FSImage ckptImage = new FSImage(conf,
|
FSImage ckptImage = new FSImage(conf,
|
||||||
checkpointDirs, checkpointEditsDirs);
|
checkpointDirs, checkpointEditsDirs);
|
||||||
target.dir.fsImage = ckptImage;
|
|
||||||
// load from the checkpoint dirs
|
// load from the checkpoint dirs
|
||||||
try {
|
try {
|
||||||
ckptImage.recoverTransitionRead(StartupOption.REGULAR, target, null);
|
ckptImage.recoverTransitionRead(StartupOption.REGULAR, target, null);
|
||||||
@ -507,7 +506,6 @@ void doImportCheckpoint(FSNamesystem target) throws IOException {
|
|||||||
realImage.getEditLog().setNextTxId(ckptImage.getEditLog().getLastWrittenTxId()+1);
|
realImage.getEditLog().setNextTxId(ckptImage.getEditLog().getLastWrittenTxId()+1);
|
||||||
realImage.initEditLog(StartupOption.IMPORT);
|
realImage.initEditLog(StartupOption.IMPORT);
|
||||||
|
|
||||||
target.dir.fsImage = realImage;
|
|
||||||
realImage.getStorage().setBlockPoolID(ckptImage.getBlockPoolID());
|
realImage.getStorage().setBlockPoolID(ckptImage.getBlockPoolID());
|
||||||
|
|
||||||
// and save it but keep the same checkpointTime
|
// and save it but keep the same checkpointTime
|
||||||
|
@ -517,6 +517,9 @@ private void logAuditEvent(boolean succeeded,
|
|||||||
|
|
||||||
private volatile boolean imageLoaded = false;
|
private volatile boolean imageLoaded = false;
|
||||||
private final Condition cond;
|
private final Condition cond;
|
||||||
|
|
||||||
|
private final FSImage fsImage;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Notify that loading of this FSDirectory is complete, and
|
* Notify that loading of this FSDirectory is complete, and
|
||||||
* it is imageLoaded for use
|
* it is imageLoaded for use
|
||||||
@ -738,6 +741,7 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
|
|||||||
LOG.info("fsLock is fair:" + fair);
|
LOG.info("fsLock is fair:" + fair);
|
||||||
fsLock = new FSNamesystemLock(fair);
|
fsLock = new FSNamesystemLock(fair);
|
||||||
cond = fsLock.writeLock().newCondition();
|
cond = fsLock.writeLock().newCondition();
|
||||||
|
this.fsImage = fsImage;
|
||||||
try {
|
try {
|
||||||
resourceRecheckInterval = conf.getLong(
|
resourceRecheckInterval = conf.getLong(
|
||||||
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
|
DFS_NAMENODE_RESOURCE_CHECK_INTERVAL_KEY,
|
||||||
@ -827,7 +831,7 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
|
|||||||
DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);
|
DFS_NAMENODE_DELEGATION_TOKEN_ALWAYS_USE_DEFAULT);
|
||||||
|
|
||||||
this.dtSecretManager = createDelegationTokenSecretManager(conf);
|
this.dtSecretManager = createDelegationTokenSecretManager(conf);
|
||||||
this.dir = new FSDirectory(fsImage, this, conf);
|
this.dir = new FSDirectory(this, conf);
|
||||||
this.snapshotManager = new SnapshotManager(dir);
|
this.snapshotManager = new SnapshotManager(dir);
|
||||||
this.cacheManager = new CacheManager(this, conf, blockManager);
|
this.cacheManager = new CacheManager(this, conf, blockManager);
|
||||||
this.safeMode = new SafeModeInfo(conf);
|
this.safeMode = new SafeModeInfo(conf);
|
||||||
@ -1055,7 +1059,7 @@ void startActiveServices() throws IOException {
|
|||||||
LOG.info("Starting services required for active state");
|
LOG.info("Starting services required for active state");
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
FSEditLog editLog = dir.fsImage.getEditLog();
|
FSEditLog editLog = getFSImage().getEditLog();
|
||||||
|
|
||||||
if (!editLog.isOpenForWrite()) {
|
if (!editLog.isOpenForWrite()) {
|
||||||
// During startup, we're already open for write during initialization.
|
// During startup, we're already open for write during initialization.
|
||||||
@ -1084,12 +1088,12 @@ void startActiveServices() throws IOException {
|
|||||||
metaSaveAsString());
|
metaSaveAsString());
|
||||||
}
|
}
|
||||||
|
|
||||||
long nextTxId = dir.fsImage.getLastAppliedTxId() + 1;
|
long nextTxId = getFSImage().getLastAppliedTxId() + 1;
|
||||||
LOG.info("Will take over writing edit logs at txnid " +
|
LOG.info("Will take over writing edit logs at txnid " +
|
||||||
nextTxId);
|
nextTxId);
|
||||||
editLog.setNextTxId(nextTxId);
|
editLog.setNextTxId(nextTxId);
|
||||||
|
|
||||||
dir.fsImage.editLog.openForWrite();
|
getFSImage().editLog.openForWrite();
|
||||||
}
|
}
|
||||||
|
|
||||||
// Enable quota checks.
|
// Enable quota checks.
|
||||||
@ -1164,13 +1168,13 @@ void stopActiveServices() {
|
|||||||
((NameNodeEditLogRoller)nnEditLogRoller.getRunnable()).stop();
|
((NameNodeEditLogRoller)nnEditLogRoller.getRunnable()).stop();
|
||||||
nnEditLogRoller.interrupt();
|
nnEditLogRoller.interrupt();
|
||||||
}
|
}
|
||||||
if (dir != null && dir.fsImage != null) {
|
if (dir != null && getFSImage() != null) {
|
||||||
if (dir.fsImage.editLog != null) {
|
if (getFSImage().editLog != null) {
|
||||||
dir.fsImage.editLog.close();
|
getFSImage().editLog.close();
|
||||||
}
|
}
|
||||||
// Update the fsimage with the last txid that we wrote
|
// Update the fsimage with the last txid that we wrote
|
||||||
// so that the tailer starts from the right spot.
|
// so that the tailer starts from the right spot.
|
||||||
dir.fsImage.updateLastAppliedTxIdFromWritten();
|
getFSImage().updateLastAppliedTxIdFromWritten();
|
||||||
}
|
}
|
||||||
if (cacheManager != null) {
|
if (cacheManager != null) {
|
||||||
cacheManager.stopMonitorThread();
|
cacheManager.stopMonitorThread();
|
||||||
@ -1193,9 +1197,9 @@ void stopActiveServices() {
|
|||||||
*/
|
*/
|
||||||
void startStandbyServices(final Configuration conf) throws IOException {
|
void startStandbyServices(final Configuration conf) throws IOException {
|
||||||
LOG.info("Starting services required for standby state");
|
LOG.info("Starting services required for standby state");
|
||||||
if (!dir.fsImage.editLog.isOpenForRead()) {
|
if (!getFSImage().editLog.isOpenForRead()) {
|
||||||
// During startup, we're already open for read.
|
// During startup, we're already open for read.
|
||||||
dir.fsImage.editLog.initSharedJournalsForRead();
|
getFSImage().editLog.initSharedJournalsForRead();
|
||||||
}
|
}
|
||||||
|
|
||||||
blockManager.setPostponeBlocksFromFuture(true);
|
blockManager.setPostponeBlocksFromFuture(true);
|
||||||
@ -1242,8 +1246,8 @@ void stopStandbyServices() throws IOException {
|
|||||||
if (editLogTailer != null) {
|
if (editLogTailer != null) {
|
||||||
editLogTailer.stop();
|
editLogTailer.stop();
|
||||||
}
|
}
|
||||||
if (dir != null && dir.fsImage != null && dir.fsImage.editLog != null) {
|
if (dir != null && getFSImage() != null && getFSImage().editLog != null) {
|
||||||
dir.fsImage.editLog.close();
|
getFSImage().editLog.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1486,9 +1490,9 @@ NamespaceInfo getNamespaceInfo() {
|
|||||||
* Version of @see #getNamespaceInfo() that is not protected by a lock.
|
* Version of @see #getNamespaceInfo() that is not protected by a lock.
|
||||||
*/
|
*/
|
||||||
NamespaceInfo unprotectedGetNamespaceInfo() {
|
NamespaceInfo unprotectedGetNamespaceInfo() {
|
||||||
return new NamespaceInfo(dir.fsImage.getStorage().getNamespaceID(),
|
return new NamespaceInfo(getFSImage().getStorage().getNamespaceID(),
|
||||||
getClusterId(), getBlockPoolId(),
|
getClusterId(), getBlockPoolId(),
|
||||||
dir.fsImage.getStorage().getCTime());
|
getFSImage().getStorage().getCTime());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1506,12 +1510,10 @@ void close() {
|
|||||||
try {
|
try {
|
||||||
stopActiveServices();
|
stopActiveServices();
|
||||||
stopStandbyServices();
|
stopStandbyServices();
|
||||||
if (dir != null) {
|
|
||||||
dir.close();
|
|
||||||
}
|
|
||||||
} catch (IOException ie) {
|
} catch (IOException ie) {
|
||||||
LOG.error("Error closing FSDirectory", ie);
|
} finally {
|
||||||
IOUtils.cleanup(LOG, dir);
|
IOUtils.cleanup(LOG, dir);
|
||||||
|
IOUtils.cleanup(LOG, fsImage);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -4503,7 +4505,7 @@ void registerDatanode(DatanodeRegistration nodeReg) throws IOException {
|
|||||||
* @return registration ID
|
* @return registration ID
|
||||||
*/
|
*/
|
||||||
String getRegistrationID() {
|
String getRegistrationID() {
|
||||||
return Storage.getRegistrationID(dir.fsImage.getStorage());
|
return Storage.getRegistrationID(getFSImage().getStorage());
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -4719,7 +4721,7 @@ public void stop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public FSImage getFSImage() {
|
public FSImage getFSImage() {
|
||||||
return dir.fsImage;
|
return fsImage;
|
||||||
}
|
}
|
||||||
|
|
||||||
public FSEditLog getEditLog() {
|
public FSEditLog getEditLog() {
|
||||||
@ -7044,7 +7046,7 @@ private long getDfsUsed(DatanodeDescriptor alivenode) {
|
|||||||
|
|
||||||
@Override // NameNodeMXBean
|
@Override // NameNodeMXBean
|
||||||
public String getClusterId() {
|
public String getClusterId() {
|
||||||
return dir.fsImage.getStorage().getClusterID();
|
return getFSImage().getStorage().getClusterID();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // NameNodeMXBean
|
@Override // NameNodeMXBean
|
||||||
|
@ -831,7 +831,7 @@ public boolean isInSafeMode() {
|
|||||||
/** get FSImage */
|
/** get FSImage */
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public FSImage getFSImage() {
|
public FSImage getFSImage() {
|
||||||
return namesystem.dir.fsImage;
|
return namesystem.getFSImage();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -1141,7 +1141,7 @@ public static boolean doRollback(Configuration conf,
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
nsys.dir.fsImage.doRollback(nsys);
|
nsys.getFSImage().doRollback(nsys);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -45,6 +45,7 @@
|
|||||||
import org.apache.hadoop.ipc.StandbyException;
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.internal.util.reflection.Whitebox;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This is a utility class to expose NameNode functionality for unit tests.
|
* This is a utility class to expose NameNode functionality for unit tests.
|
||||||
@ -177,8 +178,9 @@ public static ReentrantReadWriteLock spyOnFsLock(FSNamesystem fsn) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static FSImage spyOnFsImage(NameNode nn1) {
|
public static FSImage spyOnFsImage(NameNode nn1) {
|
||||||
FSImage spy = Mockito.spy(nn1.getNamesystem().dir.fsImage);
|
FSNamesystem fsn = nn1.getNamesystem();
|
||||||
nn1.getNamesystem().dir.fsImage = spy;
|
FSImage spy = Mockito.spy(fsn.getFSImage());
|
||||||
|
Whitebox.setInternalState(fsn, "fsImage", spy);
|
||||||
return spy;
|
return spy;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -75,8 +75,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
|
|||||||
return new PermissionStatus(SUPERUSER, SUPERGROUP, perm);
|
return new PermissionStatus(SUPERUSER, SUPERGROUP, perm);
|
||||||
}
|
}
|
||||||
}).when(fsn).createFsOwnerPermissions(any(FsPermission.class));
|
}).when(fsn).createFsOwnerPermissions(any(FsPermission.class));
|
||||||
FSImage image = mock(FSImage.class);
|
dir = new FSDirectory(fsn, conf);
|
||||||
dir = new FSDirectory(image, fsn, conf);
|
|
||||||
inodeRoot = dir.getRoot();
|
inodeRoot = dir.getRoot();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -61,6 +61,7 @@
|
|||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
import org.mockito.internal.util.reflection.Whitebox;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
@ -124,14 +125,14 @@ private void saveNamespaceWithInjectedFault(Fault fault) throws Exception {
|
|||||||
FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
|
FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
|
||||||
|
|
||||||
// Replace the FSImage with a spy
|
// Replace the FSImage with a spy
|
||||||
FSImage originalImage = fsn.dir.fsImage;
|
FSImage originalImage = fsn.getFSImage();
|
||||||
NNStorage storage = originalImage.getStorage();
|
NNStorage storage = originalImage.getStorage();
|
||||||
|
|
||||||
NNStorage spyStorage = spy(storage);
|
NNStorage spyStorage = spy(storage);
|
||||||
originalImage.storage = spyStorage;
|
originalImage.storage = spyStorage;
|
||||||
|
|
||||||
FSImage spyImage = spy(originalImage);
|
FSImage spyImage = spy(originalImage);
|
||||||
fsn.dir.fsImage = spyImage;
|
Whitebox.setInternalState(fsn, "fsImage", spyImage);
|
||||||
|
|
||||||
boolean shouldFail = false; // should we expect the save operation to fail
|
boolean shouldFail = false; // should we expect the save operation to fail
|
||||||
// inject fault
|
// inject fault
|
||||||
@ -233,11 +234,11 @@ public void testReinsertnamedirsInSavenamespace() throws Exception {
|
|||||||
FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
|
FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
|
||||||
|
|
||||||
// Replace the FSImage with a spy
|
// Replace the FSImage with a spy
|
||||||
FSImage originalImage = fsn.dir.fsImage;
|
FSImage originalImage = fsn.getFSImage();
|
||||||
NNStorage storage = originalImage.getStorage();
|
NNStorage storage = originalImage.getStorage();
|
||||||
|
|
||||||
FSImage spyImage = spy(originalImage);
|
FSImage spyImage = spy(originalImage);
|
||||||
fsn.dir.fsImage = spyImage;
|
Whitebox.setInternalState(fsn, "fsImage", spyImage);
|
||||||
|
|
||||||
FileSystem fs = FileSystem.getLocal(conf);
|
FileSystem fs = FileSystem.getLocal(conf);
|
||||||
File rootDir = storage.getStorageDir(0).getRoot();
|
File rootDir = storage.getStorageDir(0).getRoot();
|
||||||
@ -367,14 +368,15 @@ public void doTestFailedSaveNamespace(boolean restoreStorageAfterFailure)
|
|||||||
FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
|
FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
|
||||||
|
|
||||||
// Replace the FSImage with a spy
|
// Replace the FSImage with a spy
|
||||||
final FSImage originalImage = fsn.dir.fsImage;
|
final FSImage originalImage = fsn.getFSImage();
|
||||||
NNStorage storage = originalImage.getStorage();
|
NNStorage storage = originalImage.getStorage();
|
||||||
storage.close(); // unlock any directories that FSNamesystem's initialization may have locked
|
storage.close(); // unlock any directories that FSNamesystem's initialization may have locked
|
||||||
|
|
||||||
NNStorage spyStorage = spy(storage);
|
NNStorage spyStorage = spy(storage);
|
||||||
originalImage.storage = spyStorage;
|
originalImage.storage = spyStorage;
|
||||||
FSImage spyImage = spy(originalImage);
|
FSImage spyImage = spy(originalImage);
|
||||||
fsn.dir.fsImage = spyImage;
|
Whitebox.setInternalState(fsn, "fsImage", spyImage);
|
||||||
|
|
||||||
spyImage.storage.setStorageDirectories(
|
spyImage.storage.setStorageDirectories(
|
||||||
FSNamesystem.getNamespaceDirs(conf),
|
FSNamesystem.getNamespaceDirs(conf),
|
||||||
FSNamesystem.getNamespaceEditsDirs(conf));
|
FSNamesystem.getNamespaceEditsDirs(conf));
|
||||||
@ -504,7 +506,7 @@ public void testCancelSaveNamespace() throws Exception {
|
|||||||
FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
|
FSNamesystem fsn = FSNamesystem.loadFromDisk(conf);
|
||||||
|
|
||||||
// Replace the FSImage with a spy
|
// Replace the FSImage with a spy
|
||||||
final FSImage image = fsn.dir.fsImage;
|
final FSImage image = fsn.getFSImage();
|
||||||
NNStorage storage = image.getStorage();
|
NNStorage storage = image.getStorage();
|
||||||
storage.close(); // unlock any directories that FSNamesystem's initialization may have locked
|
storage.close(); // unlock any directories that FSNamesystem's initialization may have locked
|
||||||
storage.setStorageDirectories(
|
storage.setStorageDirectories(
|
||||||
|
Loading…
x
Reference in New Issue
Block a user