YARN-5686. DefaultContainerExecutor random working dir algorigthm skews results (Vrushali C via Varun Saxena)

This commit is contained in:
Varun Saxena 2016-10-27 14:03:02 +05:30
parent a8cfa5054c
commit 65c009ca00
2 changed files with 47 additions and 3 deletions

View File

@ -730,6 +730,18 @@ protected Path getWorkingDir(List<String> localDirs, String user,
// make probability to pick a directory proportional to // make probability to pick a directory proportional to
// the available space on the directory. // the available space on the directory.
long randomPosition = RandomUtils.nextLong() % totalAvailable; long randomPosition = RandomUtils.nextLong() % totalAvailable;
int dir = pickDirectory(randomPosition, availableOnDisk);
return getApplicationDir(new Path(localDirs.get(dir)), user, appId);
}
/**
* Picks a directory based on the input random number and
* available size at each dir.
*/
@Private
@VisibleForTesting
int pickDirectory(long randomPosition, final long[] availableOnDisk) {
int dir = 0; int dir = 0;
// skip zero available space directory, // skip zero available space directory,
// because totalAvailable is greater than 0 and randomPosition // because totalAvailable is greater than 0 and randomPosition
@ -738,11 +750,10 @@ protected Path getWorkingDir(List<String> localDirs, String user,
while (availableOnDisk[dir] == 0L) { while (availableOnDisk[dir] == 0L) {
dir++; dir++;
} }
while (randomPosition > availableOnDisk[dir]) { while (randomPosition >= availableOnDisk[dir]) {
randomPosition -= availableOnDisk[dir++]; randomPosition -= availableOnDisk[dir++];
} }
return dir;
return getApplicationDir(new Path(localDirs.get(dir)), user, appId);
} }
/** /**

View File

@ -20,6 +20,7 @@
import static org.apache.hadoop.fs.CreateFlag.CREATE; import static org.apache.hadoop.fs.CreateFlag.CREATE;
import static org.apache.hadoop.fs.CreateFlag.OVERWRITE; import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any; import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean; import static org.mockito.Matchers.anyBoolean;
@ -485,6 +486,38 @@ public ContainerLocalizer createContainerLocalizer(String user,
verify(mockLfs, times(2)).getFsStatus(any(Path.class)); verify(mockLfs, times(2)).getFsStatus(any(Path.class));
} }
@Test
public void testPickDirectory() throws Exception {
Configuration conf = new Configuration();
FileContext lfs = FileContext.getLocalFSFileContext(conf);
DefaultContainerExecutor executor = new DefaultContainerExecutor(lfs);
long[] availableOnDisk = new long[2];
availableOnDisk[0] = 100;
availableOnDisk[1] = 100;
assertEquals(0, executor.pickDirectory(0L, availableOnDisk));
assertEquals(0, executor.pickDirectory(99L, availableOnDisk));
assertEquals(1, executor.pickDirectory(100L, availableOnDisk));
assertEquals(1, executor.pickDirectory(101L, availableOnDisk));
assertEquals(1, executor.pickDirectory(199L, availableOnDisk));
long[] availableOnDisk2 = new long[5];
availableOnDisk2[0] = 100;
availableOnDisk2[1] = 10;
availableOnDisk2[2] = 400;
availableOnDisk2[3] = 200;
availableOnDisk2[4] = 350;
assertEquals(0, executor.pickDirectory(0L, availableOnDisk2));
assertEquals(0, executor.pickDirectory(99L, availableOnDisk2));
assertEquals(1, executor.pickDirectory(100L, availableOnDisk2));
assertEquals(1, executor.pickDirectory(105L, availableOnDisk2));
assertEquals(2, executor.pickDirectory(110L, availableOnDisk2));
assertEquals(2, executor.pickDirectory(259L, availableOnDisk2));
assertEquals(3, executor.pickDirectory(700L, availableOnDisk2));
assertEquals(4, executor.pickDirectory(710L, availableOnDisk2));
assertEquals(4, executor.pickDirectory(910L, availableOnDisk2));
}
// @Test // @Test
// public void testInit() throws IOException, InterruptedException { // public void testInit() throws IOException, InterruptedException {
// Configuration conf = new Configuration(); // Configuration conf = new Configuration();