HDFS-8856. Make LeaseManager#countPath O(1). (Contributed by Arpit Agarwal)
This commit is contained in:
parent
8e23a7997f
commit
42a05d29ee
|
@ -424,6 +424,8 @@ Release 2.8.0 - UNRELEASED
|
||||||
HDFS-8815. DFS getStoragePolicy implementation using single RPC call
|
HDFS-8815. DFS getStoragePolicy implementation using single RPC call
|
||||||
(Surendra Singh Lilhore via vinayakumarb)
|
(Surendra Singh Lilhore via vinayakumarb)
|
||||||
|
|
||||||
|
HDFS-8856. Make LeaseManager#countPath O(1). (Arpit Agarwal)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
HDFS-8026. Trace FSOutputSummer#writeChecksumChunks rather than
|
||||||
|
|
|
@ -254,7 +254,9 @@ class Checkpointer extends Daemon {
|
||||||
try {
|
try {
|
||||||
backupNode.namesystem.setImageLoaded();
|
backupNode.namesystem.setImageLoaded();
|
||||||
if(backupNode.namesystem.getBlocksTotal() > 0) {
|
if(backupNode.namesystem.getBlocksTotal() > 0) {
|
||||||
backupNode.namesystem.setBlockTotal();
|
long completeBlocksTotal =
|
||||||
|
backupNode.namesystem.getCompleteBlocksTotal();
|
||||||
|
backupNode.namesystem.setBlockTotal(completeBlocksTotal);
|
||||||
}
|
}
|
||||||
bnImage.saveFSImageInAllDirs(backupNode.getNamesystem(), txid);
|
bnImage.saveFSImageInAllDirs(backupNode.getNamesystem(), txid);
|
||||||
if (!backupNode.namesystem.isRollingUpgrade()) {
|
if (!backupNode.namesystem.isRollingUpgrade()) {
|
||||||
|
|
|
@ -1034,9 +1034,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
assert safeMode != null && !isPopulatingReplQueues();
|
assert safeMode != null && !isPopulatingReplQueues();
|
||||||
StartupProgress prog = NameNode.getStartupProgress();
|
StartupProgress prog = NameNode.getStartupProgress();
|
||||||
prog.beginPhase(Phase.SAFEMODE);
|
prog.beginPhase(Phase.SAFEMODE);
|
||||||
|
long completeBlocksTotal = getCompleteBlocksTotal();
|
||||||
prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
|
prog.setTotal(Phase.SAFEMODE, STEP_AWAITING_REPORTED_BLOCKS,
|
||||||
getCompleteBlocksTotal());
|
completeBlocksTotal);
|
||||||
setBlockTotal();
|
setBlockTotal(completeBlocksTotal);
|
||||||
blockManager.activate(conf);
|
blockManager.activate(conf);
|
||||||
} finally {
|
} finally {
|
||||||
writeUnlock();
|
writeUnlock();
|
||||||
|
@ -4681,12 +4682,12 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
/**
|
/**
|
||||||
* Set the total number of blocks in the system.
|
* Set the total number of blocks in the system.
|
||||||
*/
|
*/
|
||||||
public void setBlockTotal() {
|
public void setBlockTotal(long completeBlocksTotal) {
|
||||||
// safeMode is volatile, and may be set to null at any time
|
// safeMode is volatile, and may be set to null at any time
|
||||||
SafeModeInfo safeMode = this.safeMode;
|
SafeModeInfo safeMode = this.safeMode;
|
||||||
if (safeMode == null)
|
if (safeMode == null)
|
||||||
return;
|
return;
|
||||||
safeMode.setBlockTotal((int) getCompleteBlocksTotal());
|
safeMode.setBlockTotal((int) completeBlocksTotal);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -4718,13 +4719,14 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
/**
|
/**
|
||||||
* Get the total number of COMPLETE blocks in the system.
|
* Get the total number of COMPLETE blocks in the system.
|
||||||
* For safe mode only complete blocks are counted.
|
* For safe mode only complete blocks are counted.
|
||||||
|
* This is invoked only during NN startup and checkpointing.
|
||||||
*/
|
*/
|
||||||
private long getCompleteBlocksTotal() {
|
public long getCompleteBlocksTotal() {
|
||||||
// Calculate number of blocks under construction
|
// Calculate number of blocks under construction
|
||||||
long numUCBlocks = 0;
|
long numUCBlocks = 0;
|
||||||
readLock();
|
readLock();
|
||||||
numUCBlocks = leaseManager.getNumUnderConstructionBlocks();
|
|
||||||
try {
|
try {
|
||||||
|
numUCBlocks = leaseManager.getNumUnderConstructionBlocks();
|
||||||
return getBlocksTotal() - numUCBlocks;
|
return getBlocksTotal() - numUCBlocks;
|
||||||
} finally {
|
} finally {
|
||||||
readUnlock();
|
readUnlock();
|
||||||
|
|
|
@ -22,6 +22,7 @@ import static org.apache.hadoop.util.Time.monotonicNow;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
|
@ -128,15 +129,13 @@ public class LeaseManager {
|
||||||
|
|
||||||
/** @return the number of leases currently in the system */
|
/** @return the number of leases currently in the system */
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public synchronized int countLease() {return sortedLeases.size();}
|
public synchronized int countLease() {
|
||||||
|
return sortedLeases.size();
|
||||||
|
}
|
||||||
|
|
||||||
/** @return the number of paths contained in all leases */
|
/** @return the number of paths contained in all leases */
|
||||||
synchronized int countPath() {
|
synchronized long countPath() {
|
||||||
int count = 0;
|
return leasesById.size();
|
||||||
for (Lease lease : sortedLeases) {
|
|
||||||
count += lease.getFiles().size();
|
|
||||||
}
|
|
||||||
return count;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -280,7 +279,9 @@ public class LeaseManager {
|
||||||
return holder.hashCode();
|
return holder.hashCode();
|
||||||
}
|
}
|
||||||
|
|
||||||
private Collection<Long> getFiles() { return files; }
|
private Collection<Long> getFiles() {
|
||||||
|
return Collections.unmodifiableCollection(files);
|
||||||
|
}
|
||||||
|
|
||||||
String getHolder() {
|
String getHolder() {
|
||||||
return holder;
|
return holder;
|
||||||
|
|
|
@ -17,19 +17,28 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.is;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNotNull;
|
import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
|
import org.apache.hadoop.fs.permission.PermissionStatus;
|
||||||
|
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo;
|
||||||
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.junit.rules.Timeout;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
import static org.mockito.Mockito.*;
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
public class TestLeaseManager {
|
public class TestLeaseManager {
|
||||||
|
@Rule
|
||||||
|
public Timeout timeout = new Timeout(300000);
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRemoveLeases() throws Exception {
|
public void testRemoveLeases() throws Exception {
|
||||||
FSNamesystem fsn = mock(FSNamesystem.class);
|
FSNamesystem fsn = mock(FSNamesystem.class);
|
||||||
|
@ -52,14 +61,9 @@ public class TestLeaseManager {
|
||||||
* leases, the Namenode does't enter an infinite loop while holding the FSN
|
* leases, the Namenode does't enter an infinite loop while holding the FSN
|
||||||
* write lock and thus become unresponsive
|
* write lock and thus become unresponsive
|
||||||
*/
|
*/
|
||||||
@Test (timeout=1000)
|
@Test
|
||||||
public void testCheckLeaseNotInfiniteLoop() {
|
public void testCheckLeaseNotInfiniteLoop() {
|
||||||
FSDirectory dir = Mockito.mock(FSDirectory.class);
|
LeaseManager lm = new LeaseManager(makeMockFsNameSystem());
|
||||||
FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
|
|
||||||
Mockito.when(fsn.isRunning()).thenReturn(true);
|
|
||||||
Mockito.when(fsn.hasWriteLock()).thenReturn(true);
|
|
||||||
Mockito.when(fsn.getFSDirectory()).thenReturn(dir);
|
|
||||||
LeaseManager lm = new LeaseManager(fsn);
|
|
||||||
|
|
||||||
//Make sure the leases we are going to add exceed the hard limit
|
//Make sure the leases we are going to add exceed the hard limit
|
||||||
lm.setLeasePeriod(0, 0);
|
lm.setLeasePeriod(0, 0);
|
||||||
|
@ -73,4 +77,49 @@ public class TestLeaseManager {
|
||||||
//Initiate a call to checkLease. This should exit within the test timeout
|
//Initiate a call to checkLease. This should exit within the test timeout
|
||||||
lm.checkLeases();
|
lm.checkLeases();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testCountPath() {
|
||||||
|
LeaseManager lm = new LeaseManager(makeMockFsNameSystem());
|
||||||
|
|
||||||
|
lm.addLease("holder1", 1);
|
||||||
|
assertThat(lm.countPath(), is(1L));
|
||||||
|
|
||||||
|
lm.addLease("holder2", 2);
|
||||||
|
assertThat(lm.countPath(), is(2L));
|
||||||
|
lm.addLease("holder2", 2); // Duplicate addition
|
||||||
|
assertThat(lm.countPath(), is(2L));
|
||||||
|
|
||||||
|
assertThat(lm.countPath(), is(2L));
|
||||||
|
|
||||||
|
// Remove a couple of non-existing leases. countPath should not change.
|
||||||
|
lm.removeLease("holder2", stubInodeFile(3));
|
||||||
|
lm.removeLease("InvalidLeaseHolder", stubInodeFile(1));
|
||||||
|
assertThat(lm.countPath(), is(2L));
|
||||||
|
|
||||||
|
INodeFile file = stubInodeFile(1);
|
||||||
|
lm.reassignLease(lm.getLease(file), file, "holder2");
|
||||||
|
assertThat(lm.countPath(), is(2L)); // Count unchanged on reassign
|
||||||
|
|
||||||
|
lm.removeLease("holder2", stubInodeFile(2)); // Remove existing
|
||||||
|
assertThat(lm.countPath(), is(1L));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static FSNamesystem makeMockFsNameSystem() {
|
||||||
|
FSDirectory dir = mock(FSDirectory.class);
|
||||||
|
FSNamesystem fsn = mock(FSNamesystem.class);
|
||||||
|
when(fsn.isRunning()).thenReturn(true);
|
||||||
|
when(fsn.hasWriteLock()).thenReturn(true);
|
||||||
|
when(fsn.getFSDirectory()).thenReturn(dir);
|
||||||
|
return fsn;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static INodeFile stubInodeFile(long inodeId) {
|
||||||
|
PermissionStatus p = new PermissionStatus(
|
||||||
|
"dummy", "dummy", new FsPermission((short) 0777));
|
||||||
|
return new INodeFile(
|
||||||
|
inodeId, "/foo".getBytes(), p, 0L, 0L,
|
||||||
|
BlockInfo.EMPTY_ARRAY, (short) 1, 1L);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue