YARN-11102. Fix spotbugs error in hadoop-sls module. Contributed by Szilard Nemeth, Andras Gyori.
This commit is contained in:
parent
15a5ea2c95
commit
4b1a6bfb10
|
@ -44,8 +44,11 @@ import java.io.IOException;
|
||||||
import java.io.InputStreamReader;
|
import java.io.InputStreamReader;
|
||||||
import java.io.Reader;
|
import java.io.Reader;
|
||||||
import java.nio.charset.StandardCharsets;
|
import java.nio.charset.StandardCharsets;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
@ -53,7 +56,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
public class AMRunner {
|
public class AMRunner {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(AMRunner.class);
|
private static final Logger LOG = LoggerFactory.getLogger(AMRunner.class);
|
||||||
static int remainingApps = 0;
|
int remainingApps = 0;
|
||||||
|
|
||||||
private final Configuration conf;
|
private final Configuration conf;
|
||||||
private int AM_ID;
|
private int AM_ID;
|
||||||
|
@ -263,7 +266,7 @@ public class AMRunner {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setInputTraces(String[] inputTraces) {
|
public void setInputTraces(String[] inputTraces) {
|
||||||
this.inputTraces = inputTraces;
|
this.inputTraces = inputTraces.clone();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setResourceManager(ResourceManager rm) {
|
public void setResourceManager(ResourceManager rm) {
|
||||||
|
|
|
@ -205,7 +205,7 @@ public class NMRunner {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setInputTraces(String[] inputTraces) {
|
public void setInputTraces(String[] inputTraces) {
|
||||||
this.inputTraces = inputTraces;
|
this.inputTraces = inputTraces.clone();
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getNumNMs() {
|
public int getNumNMs() {
|
||||||
|
|
|
@ -98,8 +98,8 @@ public class SLSRunner extends Configured implements Tool {
|
||||||
public static final String NETWORK_NEGATIVE_CACHE_TTL =
|
public static final String NETWORK_NEGATIVE_CACHE_TTL =
|
||||||
"networkaddress.cache.negative.ttl";
|
"networkaddress.cache.negative.ttl";
|
||||||
|
|
||||||
public static int getRemainingApps() {
|
public int getRemainingApps() {
|
||||||
return AMRunner.remainingApps;
|
return amRunner.remainingApps;
|
||||||
}
|
}
|
||||||
|
|
||||||
public SLSRunner() throws ClassNotFoundException, YarnException {
|
public SLSRunner() throws ClassNotFoundException, YarnException {
|
||||||
|
@ -204,6 +204,7 @@ public class SLSRunner extends Configured implements Tool {
|
||||||
// set queue & tracked apps information
|
// set queue & tracked apps information
|
||||||
SchedulerWrapper resourceScheduler =
|
SchedulerWrapper resourceScheduler =
|
||||||
(SchedulerWrapper) rmRunner.getRm().getResourceScheduler();
|
(SchedulerWrapper) rmRunner.getRm().getResourceScheduler();
|
||||||
|
resourceScheduler.setSLSRunner(this);
|
||||||
Tracker tracker = resourceScheduler.getTracker();
|
Tracker tracker = resourceScheduler.getTracker();
|
||||||
tracker.setQueueSet(rmRunner.getQueueAppNumMap().keySet());
|
tracker.setQueueSet(rmRunner.getQueueAppNumMap().keySet());
|
||||||
tracker.setTrackedAppSet(amRunner.getTrackedApps());
|
tracker.setTrackedAppSet(amRunner.getTrackedApps());
|
||||||
|
@ -301,9 +302,9 @@ public class SLSRunner extends Configured implements Tool {
|
||||||
return nmRunner.getNmMap();
|
return nmRunner.getNmMap();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void decreaseRemainingApps() {
|
public void decreaseRemainingApps() {
|
||||||
AMRunner.remainingApps--;
|
amRunner.remainingApps--;
|
||||||
if (AMRunner.remainingApps == 0) {
|
if (amRunner.remainingApps == 0) {
|
||||||
exitSLSRunner();
|
exitSLSRunner();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
|
import org.apache.hadoop.yarn.sls.SLSRunner;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
|
@ -43,6 +44,7 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
|
||||||
|
|
||||||
private final SLSSchedulerCommons schedulerCommons;
|
private final SLSSchedulerCommons schedulerCommons;
|
||||||
private Configuration conf;
|
private Configuration conf;
|
||||||
|
private SLSRunner runner;
|
||||||
|
|
||||||
public SLSCapacityScheduler() {
|
public SLSCapacityScheduler() {
|
||||||
schedulerCommons = new SLSSchedulerCommons(this);
|
schedulerCommons = new SLSSchedulerCommons(this);
|
||||||
|
@ -138,5 +140,15 @@ public class SLSCapacityScheduler extends CapacityScheduler implements
|
||||||
public Tracker getTracker() {
|
public Tracker getTracker() {
|
||||||
return schedulerCommons.getTracker();
|
return schedulerCommons.getTracker();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setSLSRunner(SLSRunner runner) {
|
||||||
|
this.runner = runner;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SLSRunner getSLSRunner() {
|
||||||
|
return this.runner;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -30,6 +30,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||||
|
import org.apache.hadoop.yarn.sls.SLSRunner;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
|
@ -37,6 +39,7 @@ import java.util.List;
|
||||||
public class SLSFairScheduler extends FairScheduler
|
public class SLSFairScheduler extends FairScheduler
|
||||||
implements SchedulerWrapper, Configurable {
|
implements SchedulerWrapper, Configurable {
|
||||||
private final SLSSchedulerCommons schedulerCommons;
|
private final SLSSchedulerCommons schedulerCommons;
|
||||||
|
private SLSRunner runner;
|
||||||
|
|
||||||
public SLSFairScheduler() {
|
public SLSFairScheduler() {
|
||||||
schedulerCommons = new SLSSchedulerCommons(this);
|
schedulerCommons = new SLSSchedulerCommons(this);
|
||||||
|
@ -99,4 +102,14 @@ public class SLSFairScheduler extends FairScheduler
|
||||||
public Tracker getTracker() {
|
public Tracker getTracker() {
|
||||||
return schedulerCommons.getTracker();
|
return schedulerCommons.getTracker();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setSLSRunner(SLSRunner runner) {
|
||||||
|
this.runner = runner;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public SLSRunner getSLSRunner() {
|
||||||
|
return this.runner;
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -205,6 +205,7 @@ public class SLSSchedulerCommons {
|
||||||
}
|
}
|
||||||
|
|
||||||
public void handle(SchedulerEvent schedulerEvent) {
|
public void handle(SchedulerEvent schedulerEvent) {
|
||||||
|
SchedulerWrapper wrapper = (SchedulerWrapper) scheduler;
|
||||||
if (!metricsON) {
|
if (!metricsON) {
|
||||||
((SchedulerWrapper)scheduler).propagatedHandle(schedulerEvent);
|
((SchedulerWrapper)scheduler).propagatedHandle(schedulerEvent);
|
||||||
return;
|
return;
|
||||||
|
@ -259,11 +260,11 @@ public class SLSSchedulerCommons {
|
||||||
|
|
||||||
if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
|
if (schedulerEvent.getType() == SchedulerEventType.APP_ATTEMPT_REMOVED
|
||||||
&& schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
|
&& schedulerEvent instanceof AppAttemptRemovedSchedulerEvent) {
|
||||||
SLSRunner.decreaseRemainingApps();
|
wrapper.getSLSRunner().decreaseRemainingApps();
|
||||||
AppAttemptRemovedSchedulerEvent appRemoveEvent =
|
AppAttemptRemovedSchedulerEvent appRemoveEvent =
|
||||||
(AppAttemptRemovedSchedulerEvent) schedulerEvent;
|
(AppAttemptRemovedSchedulerEvent) schedulerEvent;
|
||||||
appQueueMap.remove(appRemoveEvent.getApplicationAttemptID());
|
appQueueMap.remove(appRemoveEvent.getApplicationAttemptID());
|
||||||
if (SLSRunner.getRemainingApps() == 0) {
|
if (wrapper.getSLSRunner().getRemainingApps() == 0) {
|
||||||
try {
|
try {
|
||||||
schedulerMetrics.tearDown();
|
schedulerMetrics.tearDown();
|
||||||
SLSRunner.exitSLSRunner();
|
SLSRunner.exitSLSRunner();
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
|
||||||
|
import org.apache.hadoop.yarn.sls.SLSRunner;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -48,4 +49,8 @@ public interface SchedulerWrapper {
|
||||||
List<String> blacklistAdditions,
|
List<String> blacklistAdditions,
|
||||||
List<String> blacklistRemovals,
|
List<String> blacklistRemovals,
|
||||||
ContainerUpdates updateRequests);
|
ContainerUpdates updateRequests);
|
||||||
|
|
||||||
|
void setSLSRunner(SLSRunner runner);
|
||||||
|
|
||||||
|
SLSRunner getSLSRunner();
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue