MAPREDUCE-5268. Improve history server startup performance. Contributed by Karthik Kambatla

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1489012 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2013-06-03 14:46:30 +00:00
parent 6c4f5e47fc
commit ac44e0a0d0
4 changed files with 237 additions and 3 deletions

View File

@ -281,6 +281,9 @@ Release 2.1.0-beta - UNRELEASED
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
(Gelesh via bobby)
MAPREDUCE-5268. Improve history server startup performance (Karthik
Kambatla via jlowe)
BUG FIXES
MAPREDUCE-4671. AM does not tell the RM about container requests which are
@ -1064,6 +1067,21 @@ Release 2.0.0-alpha - 05-23-2012
MAPREDUCE-4444. nodemanager fails to start when one of the local-dirs is
bad (Jason Lowe via bobby)
Release 0.23.9 - UNRELEASED
INCOMPATIBLE CHANGES
NEW FEATURES
IMPROVEMENTS
OPTIMIZATIONS
MAPREDUCE-5268. Improve history server startup performance (Karthik
Kambatla via jlowe)
BUG FIXES
Release 0.23.8 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableSet;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
@ -36,6 +37,7 @@ import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -131,19 +133,73 @@ public class HistoryFileManager extends AbstractService {
}
}
static class JobListCache {
/**
* Wrapper around {@link ConcurrentSkipListMap} that maintains size along
* side for O(1) size() implementation for use in JobListCache.
*
* Note: The size is not updated atomically with changes additions/removals.
* This race can lead to size() returning an incorrect size at times.
*/
static class JobIdHistoryFileInfoMap {
private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache;
private AtomicInteger mapSize;
JobIdHistoryFileInfoMap() {
cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>();
mapSize = new AtomicInteger();
}
public HistoryFileInfo putIfAbsent(JobId key, HistoryFileInfo value) {
HistoryFileInfo ret = cache.putIfAbsent(key, value);
if (ret == null) {
mapSize.incrementAndGet();
}
return ret;
}
public HistoryFileInfo remove(JobId key) {
HistoryFileInfo ret = cache.remove(key);
if (ret != null) {
mapSize.decrementAndGet();
}
return ret;
}
/**
* Returns the recorded size of the internal map. Note that this could be out
* of sync with the actual size of the map
* @return "recorded" size
*/
public int size() {
return mapSize.get();
}
public HistoryFileInfo get(JobId key) {
return cache.get(key);
}
public NavigableSet<JobId> navigableKeySet() {
return cache.navigableKeySet();
}
public Collection<HistoryFileInfo> values() {
return cache.values();
}
}
static class JobListCache {
private JobIdHistoryFileInfoMap cache;
private int maxSize;
private long maxAge;
public JobListCache(int maxSize, long maxAge) {
this.maxSize = maxSize;
this.maxAge = maxAge;
this.cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>();
this.cache = new JobIdHistoryFileInfoMap();
}
public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) {
JobId jobId = fileInfo.getJobIndexInfo().getJobId();
JobId jobId = fileInfo.getJobId();
if (LOG.isDebugEnabled()) {
LOG.debug("Adding " + jobId + " to job list cache with "
+ fileInfo.getJobIndexInfo());

View File

@ -0,0 +1,78 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import java.util.Collection;
import java.util.NavigableSet;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.JobIdHistoryFileInfoMap;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
public class TestJobIdHistoryFileInfoMap {
private boolean checkSize(JobIdHistoryFileInfoMap map, int size)
throws InterruptedException {
for (int i = 0; i < 100; i++) {
if (map.size() != size)
Thread.sleep(20);
else
return true;
}
return false;
}
/**
* Trivial test case that verifies basic functionality of {@link
* JobIdHistoryFileInfoMap}
*/
@Test(timeout = 2000)
public void testWithSingleElement() throws InterruptedException {
JobIdHistoryFileInfoMap mapWithSize = new JobIdHistoryFileInfoMap();
JobId jobId = MRBuilderUtils.newJobId(1, 1, 1);
HistoryFileInfo fileInfo1 = Mockito.mock(HistoryFileInfo.class);
Mockito.when(fileInfo1.getJobId()).thenReturn(jobId);
// add it twice
assertEquals("Incorrect return on putIfAbsent()",
null, mapWithSize.putIfAbsent(jobId, fileInfo1));
assertEquals("Incorrect return on putIfAbsent()",
fileInfo1, mapWithSize.putIfAbsent(jobId, fileInfo1));
// check get()
assertEquals("Incorrect get()", fileInfo1, mapWithSize.get(jobId));
assertTrue("Incorrect size()", checkSize(mapWithSize, 1));
// check navigableKeySet()
NavigableSet<JobId> set = mapWithSize.navigableKeySet();
assertEquals("Incorrect navigableKeySet()", 1, set.size());
assertTrue("Incorrect navigableKeySet()", set.contains(jobId));
// check values()
Collection<HistoryFileInfo> values = mapWithSize.values();
assertEquals("Incorrect values()", 1, values.size());
assertTrue("Incorrect values()", values.contains(fileInfo1));
}
}

View File

@ -0,0 +1,82 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.mapreduce.v2.hs;
import java.lang.InterruptedException;
import java.util.Collection;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.JobListCache;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.junit.Test;
import org.mockito.Mockito;
import static org.junit.Assert.*;
public class TestJobListCache {
@Test (timeout = 1000)
public void testAddExisting() {
JobListCache cache = new JobListCache(2, 1000);
JobId jobId = MRBuilderUtils.newJobId(1, 1, 1);
HistoryFileInfo fileInfo = Mockito.mock(HistoryFileInfo.class);
Mockito.when(fileInfo.getJobId()).thenReturn(jobId);
cache.addIfAbsent(fileInfo);
cache.addIfAbsent(fileInfo);
assertEquals("Incorrect number of cache entries", 1,
cache.values().size());
}
@Test (timeout = 1000)
public void testEviction() throws InterruptedException {
int maxSize = 2;
JobListCache cache = new JobListCache(maxSize, 1000);
JobId jobId1 = MRBuilderUtils.newJobId(1, 1, 1);
HistoryFileInfo fileInfo1 = Mockito.mock(HistoryFileInfo.class);
Mockito.when(fileInfo1.getJobId()).thenReturn(jobId1);
JobId jobId2 = MRBuilderUtils.newJobId(2, 2, 2);
HistoryFileInfo fileInfo2 = Mockito.mock(HistoryFileInfo.class);
Mockito.when(fileInfo2.getJobId()).thenReturn(jobId2);
JobId jobId3 = MRBuilderUtils.newJobId(3, 3, 3);
HistoryFileInfo fileInfo3 = Mockito.mock(HistoryFileInfo.class);
Mockito.when(fileInfo3.getJobId()).thenReturn(jobId3);
cache.addIfAbsent(fileInfo1);
cache.addIfAbsent(fileInfo2);
cache.addIfAbsent(fileInfo3);
Collection <HistoryFileInfo> values;
for (int i = 0; i < 9; i++) {
values = cache.values();
if (values.size() > maxSize) {
Thread.sleep(100);
} else {
assertFalse("fileInfo1 should have been evicted",
values.contains(fileInfo1));
return;
}
}
fail("JobListCache didn't delete the extra entry");
}
}