Merge trunk into HA branch.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1294255 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2012-02-27 18:13:19 +00:00
commit 5fd47e77ab
11 changed files with 660 additions and 462 deletions

File diff suppressed because it is too large Load Diff

View File

@ -326,12 +326,10 @@ public abstract class AbstractCounters<C extends Counter,
*/ */
public synchronized void incrAllCounters(AbstractCounters<C, G> other) { public synchronized void incrAllCounters(AbstractCounters<C, G> other) {
for(G right : other) { for(G right : other) {
G left = groups.get(right.getName()); String groupName = right.getName();
G left = (isFrameworkGroup(groupName) ? fgroups : groups).get(groupName);
if (left == null) { if (left == null) {
limits.checkGroups(groups.size() + 1); left = addGroup(groupName, right.getDisplayName());
left = groupFactory.newGroup(right.getName(), right.getDisplayName(),
limits);
groups.put(right.getName(), left);
} }
left.incrAllCounters(right); left.incrAllCounters(right);
} }

View File

@ -107,6 +107,8 @@ public abstract class CounterGroupFactory<C extends Counter,
if (gf != null) return gf.newGroup(name); if (gf != null) return gf.newGroup(name);
if (name.equals(FS_GROUP_NAME)) { if (name.equals(FS_GROUP_NAME)) {
return newFileSystemGroup(); return newFileSystemGroup();
} else if (s2i.get(name) != null) {
return newFrameworkGroup(s2i.get(name));
} }
return newGenericGroup(name, displayName, limits); return newGenericGroup(name, displayName, limits);
} }

View File

@ -70,6 +70,29 @@ public class TestCounters {
testMaxGroups(new Counters()); testMaxGroups(new Counters());
} }
} }
@Test
public void testCountersIncrement() {
Counters fCounters = new Counters();
Counter fCounter = fCounters.findCounter(FRAMEWORK_COUNTER);
fCounter.setValue(100);
Counter gCounter = fCounters.findCounter("test", "foo");
gCounter.setValue(200);
Counters counters = new Counters();
counters.incrAllCounters(fCounters);
Counter counter;
for (CounterGroup cg : fCounters) {
CounterGroup group = counters.getGroup(cg.getName());
if (group.getName().equals("test")) {
counter = counters.findCounter("test", "foo");
assertEquals(200, counter.getValue());
} else {
counter = counters.findCounter(FRAMEWORK_COUNTER);
assertEquals(100, counter.getValue());
}
}
}
static final Enum<?> FRAMEWORK_COUNTER = TaskCounter.CPU_MILLISECONDS; static final Enum<?> FRAMEWORK_COUNTER = TaskCounter.CPU_MILLISECONDS;
static final long FRAMEWORK_COUNTER_VALUE = 8; static final long FRAMEWORK_COUNTER_VALUE = 8;

View File

@ -28,14 +28,37 @@ import org.apache.hadoop.conf.Configuration;
public abstract class AbstractService implements Service { public abstract class AbstractService implements Service {
private static final Log LOG = LogFactory.getLog(AbstractService.class); private static final Log LOG = LogFactory.getLog(AbstractService.class);
/**
* Service state: initially {@link STATE#NOTINITED}.
*/
private STATE state = STATE.NOTINITED; private STATE state = STATE.NOTINITED;
/**
* Service name.
*/
private final String name; private final String name;
/**
* Service start time. Will be zero until the service is started.
*/
private long startTime; private long startTime;
/**
* The configuration. Will be null until the service is initialized.
*/
private Configuration config; private Configuration config;
/**
* List of state change listeners; it is final to ensure
* that it will never be null.
*/
private List<ServiceStateChangeListener> listeners = private List<ServiceStateChangeListener> listeners =
new ArrayList<ServiceStateChangeListener>(); new ArrayList<ServiceStateChangeListener>();
/**
* Construct the service.
* @param name service name
*/
public AbstractService(String name) { public AbstractService(String name) {
this.name = name; this.name = name;
} }
@ -45,6 +68,11 @@ public abstract class AbstractService implements Service {
return state; return state;
} }
/**
* {@inheritDoc}
* @throws IllegalStateException if the current service state does not permit
* this action
*/
@Override @Override
public synchronized void init(Configuration conf) { public synchronized void init(Configuration conf) {
ensureCurrentState(STATE.NOTINITED); ensureCurrentState(STATE.NOTINITED);
@ -53,6 +81,11 @@ public abstract class AbstractService implements Service {
LOG.info("Service:" + getName() + " is inited."); LOG.info("Service:" + getName() + " is inited.");
} }
/**
* {@inheritDoc}
* @throws IllegalStateException if the current service state does not permit
* this action
*/
@Override @Override
public synchronized void start() { public synchronized void start() {
startTime = System.currentTimeMillis(); startTime = System.currentTimeMillis();
@ -61,6 +94,11 @@ public abstract class AbstractService implements Service {
LOG.info("Service:" + getName() + " is started."); LOG.info("Service:" + getName() + " is started.");
} }
/**
* {@inheritDoc}
* @throws IllegalStateException if the current service state does not permit
* this action
*/
@Override @Override
public synchronized void stop() { public synchronized void stop() {
if (state == STATE.STOPPED || if (state == STATE.STOPPED ||
@ -100,6 +138,12 @@ public abstract class AbstractService implements Service {
return startTime; return startTime;
} }
/**
* Verify that that a service is in a given state.
* @param currentState the desired state
* @throws IllegalStateException if the service state is different from
* the desired state
*/
private void ensureCurrentState(STATE currentState) { private void ensureCurrentState(STATE currentState) {
if (state != currentState) { if (state != currentState) {
throw new IllegalStateException("For this operation, current State must " + throw new IllegalStateException("For this operation, current State must " +
@ -107,6 +151,14 @@ public abstract class AbstractService implements Service {
} }
} }
/**
* Change to a new state and notify all listeners.
* This is a private method that is only invoked from synchronized methods,
* which avoid having to clone the listener list. It does imply that
* the state change listener methods should be short lived, as they
* will delay the state transition.
* @param newState new service state
*/
private void changeState(STATE newState) { private void changeState(STATE newState) {
state = newState; state = newState;
//notify listeners //notify listeners

View File

@ -25,21 +25,87 @@ import org.apache.hadoop.conf.Configuration;
*/ */
public interface Service { public interface Service {
/**
* Service states
*/
public enum STATE { public enum STATE {
/** Constructed but not initialized */
NOTINITED, NOTINITED,
/** Initialized but not started or stopped */
INITED, INITED,
/** started and not stopped */
STARTED, STARTED,
STOPPED;
/** stopped. No further state transitions are permitted */
STOPPED
} }
/**
* Initialize the service.
*
* The transition must be from {@link STATE#NOTINITED} to {@link STATE#INITED}
* unless the operation failed and an exception was raised.
* @param config the configuration of the service
*/
void init(Configuration config); void init(Configuration config);
/**
* Start the service.
*
* The transition should be from {@link STATE#INITED} to {@link STATE#STARTED}
* unless the operation failed and an exception was raised.
*/
void start(); void start();
/**
* Stop the service.
*
* This operation must be designed to complete regardless of the initial state
* of the service, including the state of all its internal fields.
*/
void stop(); void stop();
/**
* Register an instance of the service state change events.
* @param listener a new listener
*/
void register(ServiceStateChangeListener listener); void register(ServiceStateChangeListener listener);
/**
* Unregister a previously instance of the service state change events.
* @param listener the listener to unregister.
*/
void unregister(ServiceStateChangeListener listener); void unregister(ServiceStateChangeListener listener);
/**
* Get the name of this service.
* @return the service name
*/
String getName(); String getName();
/**
* Get the configuration of this service.
* This is normally not a clone and may be manipulated, though there are no
* guarantees as to what the consequences of such actions may be
* @return the current configuration, unless a specific implentation chooses
* otherwise.
*/
Configuration getConfig(); Configuration getConfig();
/**
* Get the current service state
* @return the state of the service
*/
STATE getServiceState(); STATE getServiceState();
/**
* Get the service start time
* @return the start time of the service. This will be zero if the service
* has not yet been started.
*/
long getStartTime(); long getStartTime();
} }

View File

@ -23,6 +23,23 @@ package org.apache.hadoop.yarn.service;
*/ */
public interface ServiceStateChangeListener { public interface ServiceStateChangeListener {
/**
* Callback to notify of a state change. The service will already
* have changed state before this callback is invoked.
*
* This operation is invoked on the thread that initiated the state change,
* while the service itself in in a sychronized section.
* <ol>
* <li>Any long-lived operation here will prevent the service state
* change from completing in a timely manner.</li>
* <li>If another thread is somehow invoked from the listener, and
* that thread invokes the methods of the service (including
* subclass-specific methods), there is a risk of a deadlock.</li>
* </ol>
*
*
* @param service the service that has changed.
*/
void stateChanged(Service service); void stateChanged(Service service);
} }

View File

@ -572,12 +572,7 @@ public class LeafQueue implements CSQueue {
// Careful! Locking order is important! // Careful! Locking order is important!
// Check queue ACLs // Check queue ACLs
UserGroupInformation userUgi; UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(userName);
try {
userUgi = UserGroupInformation.getCurrentUser();
} catch (IOException ioe) {
throw new AccessControlException(ioe);
}
if (!hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)) { if (!hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)) {
throw new AccessControlException("User " + userName + " cannot submit" + throw new AccessControlException("User " + userName + " cannot submit" +
" applications to queue " + getQueuePath()); " applications to queue " + getQueuePath());

View File

@ -30,7 +30,6 @@ import static org.apache.hadoop.yarn.webapp.view.JQueryUI.*;
public class RmView extends TwoColumnLayout { public class RmView extends TwoColumnLayout {
static final int MAX_DISPLAY_ROWS = 100; // direct table rendering static final int MAX_DISPLAY_ROWS = 100; // direct table rendering
static final int MAX_FAST_ROWS = 1000; // inline js array static final int MAX_FAST_ROWS = 1000; // inline js array
static final int MAX_INLINE_ROWS = 2000; // ajax load
@Override @Override
protected void preHead(Page.HTML<_> html) { protected void preHead(Page.HTML<_> html) {
@ -81,11 +80,6 @@ public class RmView extends TwoColumnLayout {
if (list.apps.size() > MAX_FAST_ROWS) { if (list.apps.size() > MAX_FAST_ROWS) {
tableInitProgress(init, list.apps.size() * 6); tableInitProgress(init, list.apps.size() * 6);
} }
if (list.apps.size() > MAX_INLINE_ROWS) {
list.rendering = Render.JS_LOAD;
return init.append(", sAjaxSource:'").append(url("apps", "json")).
append("'}").toString();
}
list.rendering = Render.JS_ARRAY; list.rendering = Render.JS_ARRAY;
return init.append(", aaData:appsData}").toString(); return init.append(", aaData:appsData}").toString();
} }

View File

@ -119,10 +119,11 @@ public class TestLeafQueue {
private static final String B = "b"; private static final String B = "b";
private static final String C = "c"; private static final String C = "c";
private static final String C1 = "c1"; private static final String C1 = "c1";
private static final String D = "d";
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
// Define top-level queues // Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {A, B, C}); conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {A, B, C, D});
conf.setCapacity(CapacitySchedulerConfiguration.ROOT, 100); conf.setCapacity(CapacitySchedulerConfiguration.ROOT, 100);
conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT, 100); conf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT, 100);
conf.setAcl(CapacitySchedulerConfiguration.ROOT, QueueACL.SUBMIT_APPLICATIONS, " "); conf.setAcl(CapacitySchedulerConfiguration.ROOT, QueueACL.SUBMIT_APPLICATIONS, " ");
@ -133,7 +134,7 @@ public class TestLeafQueue {
conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*"); conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B; final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
conf.setCapacity(Q_B, 90); conf.setCapacity(Q_B, 80);
conf.setMaximumCapacity(Q_B, 99); conf.setMaximumCapacity(Q_B, 99);
conf.setAcl(Q_B, QueueACL.SUBMIT_APPLICATIONS, "*"); conf.setAcl(Q_B, QueueACL.SUBMIT_APPLICATIONS, "*");
@ -146,6 +147,11 @@ public class TestLeafQueue {
final String Q_C1 = Q_C + "." + C1; final String Q_C1 = Q_C + "." + C1;
conf.setCapacity(Q_C1, 100); conf.setCapacity(Q_C1, 100);
final String Q_D = CapacitySchedulerConfiguration.ROOT + "." + D;
conf.setCapacity(Q_D, 10);
conf.setMaximumCapacity(Q_D, 11);
conf.setAcl(Q_D, QueueACL.SUBMIT_APPLICATIONS, "user_d");
} }
@ -202,8 +208,8 @@ public class TestLeafQueue {
assertEquals(0.2, a.getAbsoluteMaximumCapacity(), epsilon); assertEquals(0.2, a.getAbsoluteMaximumCapacity(), epsilon);
LeafQueue b = stubLeafQueue((LeafQueue)queues.get(B)); LeafQueue b = stubLeafQueue((LeafQueue)queues.get(B));
assertEquals(0.9, b.getCapacity(), epsilon); assertEquals(0.80, b.getCapacity(), epsilon);
assertEquals(0.9, b.getAbsoluteCapacity(), epsilon); assertEquals(0.80, b.getAbsoluteCapacity(), epsilon);
assertEquals(0.99, b.getMaximumCapacity(), epsilon); assertEquals(0.99, b.getMaximumCapacity(), epsilon);
assertEquals(0.99, b.getAbsoluteMaximumCapacity(), epsilon); assertEquals(0.99, b.getAbsoluteMaximumCapacity(), epsilon);
@ -257,9 +263,34 @@ public class TestLeafQueue {
// Only 1 container // Only 1 container
a.assignContainers(clusterResource, node_0); a.assignContainers(clusterResource, node_0);
assertEquals(7*GB, a.getMetrics().getAvailableMB()); assertEquals(6*GB, a.getMetrics().getAvailableMB());
} }
@Test
public void testUserQueueAcl() throws Exception {
// Manipulate queue 'a'
LeafQueue d = stubLeafQueue((LeafQueue) queues.get(D));
// Users
final String user_d = "user_d";
// Submit applications
final ApplicationAttemptId appAttemptId_0 = TestUtils
.getMockApplicationAttemptId(0, 1);
SchedulerApp app_0 = new SchedulerApp(appAttemptId_0, user_d, d, null,
rmContext, null);
d.submitApplication(app_0, user_d, D);
// Attempt the same application again
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(0, 2);
SchedulerApp app_1 = new SchedulerApp(appAttemptId_1, user_d, d, null,
rmContext, null);
d.submitApplication(app_1, user_d, D); // same user
}
@Test @Test
public void testAppAttemptMetrics() throws Exception { public void testAppAttemptMetrics() throws Exception {

View File

@ -177,8 +177,10 @@ public class TestRMWebApp {
public static ResourceManager mockRm(RMContext rmContext) throws IOException { public static ResourceManager mockRm(RMContext rmContext) throws IOException {
ResourceManager rm = mock(ResourceManager.class); ResourceManager rm = mock(ResourceManager.class);
ResourceScheduler rs = mockCapacityScheduler(); ResourceScheduler rs = mockCapacityScheduler();
ApplicationACLsManager aclMgr = mockAppACLsManager();
when(rm.getResourceScheduler()).thenReturn(rs); when(rm.getResourceScheduler()).thenReturn(rs);
when(rm.getRMContext()).thenReturn(rmContext); when(rm.getRMContext()).thenReturn(rmContext);
when(rm.getApplicationACLsManager()).thenReturn(aclMgr);
return rm; return rm;
} }
@ -192,6 +194,11 @@ public class TestRMWebApp {
return cs; return cs;
} }
public static ApplicationACLsManager mockAppACLsManager() {
Configuration conf = new Configuration();
return new ApplicationACLsManager(conf);
}
static void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { static void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
// Define top-level queues // Define top-level queues
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"}); conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
@ -271,7 +278,7 @@ public class TestRMWebApp {
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
// For manual testing // For manual testing
WebApps.$for("yarn", new TestRMWebApp()).at(8888).inDevMode(). WebApps.$for("yarn", new TestRMWebApp()).at(8888).inDevMode().
start(new RMWebApp(mockRm(101, 8, 8, 8*GiB))).joinThread(); start(new RMWebApp(mockRm(2500, 8, 8, 8*GiB))).joinThread();
WebApps.$for("yarn", new TestRMWebApp()).at(8888).inDevMode(). WebApps.$for("yarn", new TestRMWebApp()).at(8888).inDevMode().
start(new RMWebApp(mockFifoRm(10, 1, 4, 8*GiB))).joinThread(); start(new RMWebApp(mockFifoRm(10, 1, 4, 8*GiB))).joinThread();
} }