svn merge -c 1489012 FIXES: MAPREDUCE-5268. Improve history server startup performance. Contributed by Karthik Kambatla
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1489015 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6bd7b27abf
commit
f09946bd47
|
@ -144,6 +144,9 @@ Release 2.1.0-beta - UNRELEASED
|
||||||
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
|
MAPREDUCE-4974. Optimising the LineRecordReader initialize() method
|
||||||
(Gelesh via bobby)
|
(Gelesh via bobby)
|
||||||
|
|
||||||
|
MAPREDUCE-5268. Improve history server startup performance (Karthik
|
||||||
|
Kambatla via jlowe)
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
|
||||||
MAPREDUCE-4671. AM does not tell the RM about container requests which are
|
MAPREDUCE-4671. AM does not tell the RM about container requests which are
|
||||||
|
@ -936,6 +939,21 @@ Release 2.0.0-alpha - 05-23-2012
|
||||||
MAPREDUCE-4444. nodemanager fails to start when one of the local-dirs is
|
MAPREDUCE-4444. nodemanager fails to start when one of the local-dirs is
|
||||||
bad (Jason Lowe via bobby)
|
bad (Jason Lowe via bobby)
|
||||||
|
|
||||||
|
Release 0.23.9 - UNRELEASED
|
||||||
|
|
||||||
|
INCOMPATIBLE CHANGES
|
||||||
|
|
||||||
|
NEW FEATURES
|
||||||
|
|
||||||
|
IMPROVEMENTS
|
||||||
|
|
||||||
|
OPTIMIZATIONS
|
||||||
|
|
||||||
|
MAPREDUCE-5268. Improve history server startup performance (Karthik
|
||||||
|
Kambatla via jlowe)
|
||||||
|
|
||||||
|
BUG FIXES
|
||||||
|
|
||||||
Release 0.23.8 - UNRELEASED
|
Release 0.23.8 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -26,6 +26,7 @@ import java.util.Collections;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.NavigableSet;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.SortedMap;
|
import java.util.SortedMap;
|
||||||
import java.util.TreeMap;
|
import java.util.TreeMap;
|
||||||
|
@ -36,6 +37,7 @@ import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
@ -131,19 +133,73 @@ public class HistoryFileManager extends AbstractService {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
static class JobListCache {
|
/**
|
||||||
|
* Wrapper around {@link ConcurrentSkipListMap} that maintains size along
|
||||||
|
* side for O(1) size() implementation for use in JobListCache.
|
||||||
|
*
|
||||||
|
* Note: The size is not updated atomically with changes additions/removals.
|
||||||
|
* This race can lead to size() returning an incorrect size at times.
|
||||||
|
*/
|
||||||
|
static class JobIdHistoryFileInfoMap {
|
||||||
private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache;
|
private ConcurrentSkipListMap<JobId, HistoryFileInfo> cache;
|
||||||
|
private AtomicInteger mapSize;
|
||||||
|
|
||||||
|
JobIdHistoryFileInfoMap() {
|
||||||
|
cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>();
|
||||||
|
mapSize = new AtomicInteger();
|
||||||
|
}
|
||||||
|
|
||||||
|
public HistoryFileInfo putIfAbsent(JobId key, HistoryFileInfo value) {
|
||||||
|
HistoryFileInfo ret = cache.putIfAbsent(key, value);
|
||||||
|
if (ret == null) {
|
||||||
|
mapSize.incrementAndGet();
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
public HistoryFileInfo remove(JobId key) {
|
||||||
|
HistoryFileInfo ret = cache.remove(key);
|
||||||
|
if (ret != null) {
|
||||||
|
mapSize.decrementAndGet();
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the recorded size of the internal map. Note that this could be out
|
||||||
|
* of sync with the actual size of the map
|
||||||
|
* @return "recorded" size
|
||||||
|
*/
|
||||||
|
public int size() {
|
||||||
|
return mapSize.get();
|
||||||
|
}
|
||||||
|
|
||||||
|
public HistoryFileInfo get(JobId key) {
|
||||||
|
return cache.get(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
public NavigableSet<JobId> navigableKeySet() {
|
||||||
|
return cache.navigableKeySet();
|
||||||
|
}
|
||||||
|
|
||||||
|
public Collection<HistoryFileInfo> values() {
|
||||||
|
return cache.values();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class JobListCache {
|
||||||
|
private JobIdHistoryFileInfoMap cache;
|
||||||
private int maxSize;
|
private int maxSize;
|
||||||
private long maxAge;
|
private long maxAge;
|
||||||
|
|
||||||
public JobListCache(int maxSize, long maxAge) {
|
public JobListCache(int maxSize, long maxAge) {
|
||||||
this.maxSize = maxSize;
|
this.maxSize = maxSize;
|
||||||
this.maxAge = maxAge;
|
this.maxAge = maxAge;
|
||||||
this.cache = new ConcurrentSkipListMap<JobId, HistoryFileInfo>();
|
this.cache = new JobIdHistoryFileInfoMap();
|
||||||
}
|
}
|
||||||
|
|
||||||
public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) {
|
public HistoryFileInfo addIfAbsent(HistoryFileInfo fileInfo) {
|
||||||
JobId jobId = fileInfo.getJobIndexInfo().getJobId();
|
JobId jobId = fileInfo.getJobId();
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Adding " + jobId + " to job list cache with "
|
LOG.debug("Adding " + jobId + " to job list cache with "
|
||||||
+ fileInfo.getJobIndexInfo());
|
+ fileInfo.getJobIndexInfo());
|
||||||
|
|
|
@ -0,0 +1,78 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.mapreduce.v2.hs;
|
||||||
|
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.NavigableSet;
|
||||||
|
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.JobIdHistoryFileInfoMap;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
public class TestJobIdHistoryFileInfoMap {
|
||||||
|
|
||||||
|
private boolean checkSize(JobIdHistoryFileInfoMap map, int size)
|
||||||
|
throws InterruptedException {
|
||||||
|
for (int i = 0; i < 100; i++) {
|
||||||
|
if (map.size() != size)
|
||||||
|
Thread.sleep(20);
|
||||||
|
else
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Trivial test case that verifies basic functionality of {@link
|
||||||
|
* JobIdHistoryFileInfoMap}
|
||||||
|
*/
|
||||||
|
@Test(timeout = 2000)
|
||||||
|
public void testWithSingleElement() throws InterruptedException {
|
||||||
|
JobIdHistoryFileInfoMap mapWithSize = new JobIdHistoryFileInfoMap();
|
||||||
|
|
||||||
|
JobId jobId = MRBuilderUtils.newJobId(1, 1, 1);
|
||||||
|
HistoryFileInfo fileInfo1 = Mockito.mock(HistoryFileInfo.class);
|
||||||
|
Mockito.when(fileInfo1.getJobId()).thenReturn(jobId);
|
||||||
|
|
||||||
|
// add it twice
|
||||||
|
assertEquals("Incorrect return on putIfAbsent()",
|
||||||
|
null, mapWithSize.putIfAbsent(jobId, fileInfo1));
|
||||||
|
assertEquals("Incorrect return on putIfAbsent()",
|
||||||
|
fileInfo1, mapWithSize.putIfAbsent(jobId, fileInfo1));
|
||||||
|
|
||||||
|
// check get()
|
||||||
|
assertEquals("Incorrect get()", fileInfo1, mapWithSize.get(jobId));
|
||||||
|
assertTrue("Incorrect size()", checkSize(mapWithSize, 1));
|
||||||
|
|
||||||
|
// check navigableKeySet()
|
||||||
|
NavigableSet<JobId> set = mapWithSize.navigableKeySet();
|
||||||
|
assertEquals("Incorrect navigableKeySet()", 1, set.size());
|
||||||
|
assertTrue("Incorrect navigableKeySet()", set.contains(jobId));
|
||||||
|
|
||||||
|
// check values()
|
||||||
|
Collection<HistoryFileInfo> values = mapWithSize.values();
|
||||||
|
assertEquals("Incorrect values()", 1, values.size());
|
||||||
|
assertTrue("Incorrect values()", values.contains(fileInfo1));
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,82 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.mapreduce.v2.hs;
|
||||||
|
|
||||||
|
import java.lang.InterruptedException;
|
||||||
|
import java.util.Collection;
|
||||||
|
|
||||||
|
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.HistoryFileInfo;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.hs.HistoryFileManager.JobListCache;
|
||||||
|
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
public class TestJobListCache {
|
||||||
|
|
||||||
|
@Test (timeout = 1000)
|
||||||
|
public void testAddExisting() {
|
||||||
|
JobListCache cache = new JobListCache(2, 1000);
|
||||||
|
|
||||||
|
JobId jobId = MRBuilderUtils.newJobId(1, 1, 1);
|
||||||
|
HistoryFileInfo fileInfo = Mockito.mock(HistoryFileInfo.class);
|
||||||
|
Mockito.when(fileInfo.getJobId()).thenReturn(jobId);
|
||||||
|
|
||||||
|
cache.addIfAbsent(fileInfo);
|
||||||
|
cache.addIfAbsent(fileInfo);
|
||||||
|
assertEquals("Incorrect number of cache entries", 1,
|
||||||
|
cache.values().size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 1000)
|
||||||
|
public void testEviction() throws InterruptedException {
|
||||||
|
int maxSize = 2;
|
||||||
|
JobListCache cache = new JobListCache(maxSize, 1000);
|
||||||
|
|
||||||
|
JobId jobId1 = MRBuilderUtils.newJobId(1, 1, 1);
|
||||||
|
HistoryFileInfo fileInfo1 = Mockito.mock(HistoryFileInfo.class);
|
||||||
|
Mockito.when(fileInfo1.getJobId()).thenReturn(jobId1);
|
||||||
|
|
||||||
|
JobId jobId2 = MRBuilderUtils.newJobId(2, 2, 2);
|
||||||
|
HistoryFileInfo fileInfo2 = Mockito.mock(HistoryFileInfo.class);
|
||||||
|
Mockito.when(fileInfo2.getJobId()).thenReturn(jobId2);
|
||||||
|
|
||||||
|
JobId jobId3 = MRBuilderUtils.newJobId(3, 3, 3);
|
||||||
|
HistoryFileInfo fileInfo3 = Mockito.mock(HistoryFileInfo.class);
|
||||||
|
Mockito.when(fileInfo3.getJobId()).thenReturn(jobId3);
|
||||||
|
|
||||||
|
cache.addIfAbsent(fileInfo1);
|
||||||
|
cache.addIfAbsent(fileInfo2);
|
||||||
|
cache.addIfAbsent(fileInfo3);
|
||||||
|
|
||||||
|
Collection <HistoryFileInfo> values;
|
||||||
|
for (int i = 0; i < 9; i++) {
|
||||||
|
values = cache.values();
|
||||||
|
if (values.size() > maxSize) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} else {
|
||||||
|
assertFalse("fileInfo1 should have been evicted",
|
||||||
|
values.contains(fileInfo1));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fail("JobListCache didn't delete the extra entry");
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue