YARN-4675. Reorganize TimelineClient and TimelineClientImpl into separate classes for ATSv1.x and ATSv2. Contributed by Naganarasimha G R.
(cherry picked from commit 73235ab30361b41293846189f3c5fef321ae7cac)
This commit is contained in:
parent
a16a684fbb
commit
bb100d35fd
|
@ -73,13 +73,12 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
|||
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.util.TimelineServiceHelper;
|
||||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
import org.codehaus.jackson.node.JsonNodeFactory;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.sun.jersey.api.client.ClientHandlerException;
|
||||
|
@ -92,8 +91,6 @@ import com.sun.jersey.api.client.ClientHandlerException;
|
|||
*/
|
||||
public class JobHistoryEventHandler extends AbstractService
|
||||
implements EventHandler<JobHistoryEvent> {
|
||||
private static final JsonNodeFactory FACTORY =
|
||||
new ObjectMapper().getNodeFactory();
|
||||
|
||||
private final AppContext context;
|
||||
private final int startCount;
|
||||
|
@ -135,9 +132,10 @@ public class JobHistoryEventHandler extends AbstractService
|
|||
// should job completion be force when the AM shuts down?
|
||||
protected volatile boolean forceJobCompletion = false;
|
||||
|
||||
@VisibleForTesting
|
||||
protected TimelineClient timelineClient;
|
||||
|
||||
private boolean timelineServiceV2Enabled = false;
|
||||
@VisibleForTesting
|
||||
protected TimelineV2Client timelineV2Client;
|
||||
|
||||
private static String MAPREDUCE_JOB_ENTITY_TYPE = "MAPREDUCE_JOB";
|
||||
private static String MAPREDUCE_TASK_ENTITY_TYPE = "MAPREDUCE_TASK";
|
||||
|
@ -270,12 +268,17 @@ public class JobHistoryEventHandler extends AbstractService
|
|||
MRJobConfig.DEFAULT_MAPREDUCE_JOB_EMIT_TIMELINE_DATA)) {
|
||||
LOG.info("Emitting job history data to the timeline service is enabled");
|
||||
if (YarnConfiguration.timelineServiceEnabled(conf)) {
|
||||
|
||||
timelineClient =
|
||||
((MRAppMaster.RunningAppContext)context).getTimelineClient();
|
||||
timelineClient.init(conf);
|
||||
timelineServiceV2Enabled =
|
||||
YarnConfiguration.timelineServiceV2Enabled(conf);
|
||||
boolean timelineServiceV2Enabled =
|
||||
((int) YarnConfiguration.getTimelineServiceVersion(conf) == 2);
|
||||
if(timelineServiceV2Enabled) {
|
||||
timelineV2Client =
|
||||
((MRAppMaster.RunningAppContext)context).getTimelineV2Client();
|
||||
timelineV2Client.init(conf);
|
||||
} else {
|
||||
timelineClient =
|
||||
((MRAppMaster.RunningAppContext) context).getTimelineClient();
|
||||
timelineClient.init(conf);
|
||||
}
|
||||
LOG.info("Timeline service is enabled; version: " +
|
||||
YarnConfiguration.getTimelineServiceVersion(conf));
|
||||
} else {
|
||||
|
@ -326,6 +329,8 @@ public class JobHistoryEventHandler extends AbstractService
|
|||
protected void serviceStart() throws Exception {
|
||||
if (timelineClient != null) {
|
||||
timelineClient.start();
|
||||
} else if (timelineV2Client != null) {
|
||||
timelineV2Client.start();
|
||||
}
|
||||
eventHandlingThread = new Thread(new Runnable() {
|
||||
@Override
|
||||
|
@ -450,6 +455,8 @@ public class JobHistoryEventHandler extends AbstractService
|
|||
}
|
||||
if (timelineClient != null) {
|
||||
timelineClient.stop();
|
||||
} else if (timelineV2Client != null) {
|
||||
timelineV2Client.stop();
|
||||
}
|
||||
LOG.info("Stopped JobHistoryEventHandler. super.stop()");
|
||||
super.serviceStop();
|
||||
|
@ -607,14 +614,12 @@ public class JobHistoryEventHandler extends AbstractService
|
|||
}
|
||||
processEventForJobSummary(event.getHistoryEvent(), mi.getJobSummary(),
|
||||
event.getJobID());
|
||||
if (timelineClient != null) {
|
||||
if (timelineServiceV2Enabled) {
|
||||
processEventForNewTimelineService(historyEvent, event.getJobID(),
|
||||
event.getTimestamp());
|
||||
} else {
|
||||
processEventForTimelineServer(historyEvent, event.getJobID(),
|
||||
event.getTimestamp());
|
||||
}
|
||||
if (timelineV2Client != null) {
|
||||
processEventForNewTimelineService(historyEvent, event.getJobID(),
|
||||
event.getTimestamp());
|
||||
} else if (timelineClient != null) {
|
||||
processEventForTimelineServer(historyEvent, event.getJobID(),
|
||||
event.getTimestamp());
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("In HistoryEventHandler "
|
||||
|
@ -1167,8 +1172,8 @@ public class JobHistoryEventHandler extends AbstractService
|
|||
configSize += size;
|
||||
if (configSize > JobHistoryEventUtils.ATS_CONFIG_PUBLISH_SIZE_BYTES) {
|
||||
if (jobEntityForConfigs.getConfigs().size() > 0) {
|
||||
timelineClient.putEntities(jobEntityForConfigs);
|
||||
timelineClient.putEntities(appEntityForConfigs);
|
||||
timelineV2Client.putEntities(jobEntityForConfigs);
|
||||
timelineV2Client.putEntities(appEntityForConfigs);
|
||||
jobEntityForConfigs = createJobEntity(jobId);
|
||||
appEntityForConfigs = new ApplicationEntity();
|
||||
appEntityForConfigs.setId(appId);
|
||||
|
@ -1179,8 +1184,8 @@ public class JobHistoryEventHandler extends AbstractService
|
|||
appEntityForConfigs.addConfig(entry.getKey(), entry.getValue());
|
||||
}
|
||||
if (configSize > 0) {
|
||||
timelineClient.putEntities(jobEntityForConfigs);
|
||||
timelineClient.putEntities(appEntityForConfigs);
|
||||
timelineV2Client.putEntities(jobEntityForConfigs);
|
||||
timelineV2Client.putEntities(appEntityForConfigs);
|
||||
}
|
||||
} catch (IOException | YarnException e) {
|
||||
LOG.error("Exception while publishing configs on JOB_SUBMITTED Event " +
|
||||
|
@ -1316,9 +1321,9 @@ public class JobHistoryEventHandler extends AbstractService
|
|||
}
|
||||
try {
|
||||
if (appEntityWithJobMetrics == null) {
|
||||
timelineClient.putEntitiesAsync(tEntity);
|
||||
timelineV2Client.putEntitiesAsync(tEntity);
|
||||
} else {
|
||||
timelineClient.putEntities(tEntity, appEntityWithJobMetrics);
|
||||
timelineV2Client.putEntities(tEntity, appEntityWithJobMetrics);
|
||||
}
|
||||
} catch (IOException | YarnException e) {
|
||||
LOG.error("Failed to process Event " + event.getEventType()
|
||||
|
|
|
@ -38,6 +38,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import javax.crypto.KeyGenerator;
|
||||
|
||||
import org.apache.commons.io.IOUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
@ -138,6 +140,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
|
@ -151,8 +154,6 @@ import org.apache.hadoop.yarn.util.SystemClock;
|
|||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
import javax.crypto.KeyGenerator;
|
||||
|
||||
/**
|
||||
* The Map-Reduce Application Master.
|
||||
* The state machine is encapsulated in the implementation of Job interface.
|
||||
|
@ -1059,6 +1060,7 @@ public class MRAppMaster extends CompositeService {
|
|||
private final ClientToAMTokenSecretManager clientToAMTokenSecretManager;
|
||||
private String historyUrl = null;
|
||||
private TimelineClient timelineClient = null;
|
||||
private TimelineV2Client timelineV2Client = null;
|
||||
|
||||
private final TaskAttemptFinishingMonitor taskAttemptFinishingMonitor;
|
||||
|
||||
|
@ -1074,7 +1076,7 @@ public class MRAppMaster extends CompositeService {
|
|||
|
||||
if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
|
||||
// create new version TimelineClient
|
||||
timelineClient = TimelineClient.createTimelineClient(
|
||||
timelineV2Client = TimelineV2Client.createTimelineClient(
|
||||
appAttemptID.getApplicationId());
|
||||
} else {
|
||||
timelineClient = TimelineClient.createTimelineClient();
|
||||
|
@ -1180,10 +1182,14 @@ public class MRAppMaster extends CompositeService {
|
|||
this.historyUrl = historyUrl;
|
||||
}
|
||||
|
||||
// Get Timeline Collector's address (get sync from RM)
|
||||
public TimelineClient getTimelineClient() {
|
||||
return timelineClient;
|
||||
}
|
||||
|
||||
// Get Timeline Collector's address (get sync from RM)
|
||||
public TimelineV2Client getTimelineV2Client() {
|
||||
return timelineV2Client;
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
|
|
@ -864,8 +864,8 @@ public class RMContainerAllocator extends RMContainerRequestor
|
|||
MRAppMaster.RunningAppContext appContext =
|
||||
(MRAppMaster.RunningAppContext)this.getContext();
|
||||
if (collectorAddr != null && !collectorAddr.isEmpty()
|
||||
&& appContext.getTimelineClient() != null) {
|
||||
appContext.getTimelineClient().setTimelineServiceAddress(
|
||||
&& appContext.getTimelineV2Client() != null) {
|
||||
appContext.getTimelineV2Client().setTimelineServiceAddress(
|
||||
response.getCollectorAddr());
|
||||
}
|
||||
|
||||
|
|
|
@ -32,8 +32,8 @@ import static org.mockito.Mockito.when;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.InputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.HashMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
|
@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.server.MiniYARNCluster;
|
||||
|
@ -832,6 +833,9 @@ public class TestJobHistoryEventHandler {
|
|||
if (mockContext instanceof RunningAppContext) {
|
||||
when(((RunningAppContext)mockContext).getTimelineClient()).
|
||||
thenReturn(TimelineClient.createTimelineClient());
|
||||
when(((RunningAppContext) mockContext).getTimelineV2Client())
|
||||
.thenReturn(TimelineV2Client
|
||||
.createTimelineClient(ApplicationId.newInstance(0, 1)));
|
||||
}
|
||||
return mockContext;
|
||||
}
|
||||
|
@ -1038,6 +1042,8 @@ class JHEvenHandlerForTest extends JobHistoryEventHandler {
|
|||
protected void serviceStart() {
|
||||
if (timelineClient != null) {
|
||||
timelineClient.start();
|
||||
} else if (timelineV2Client != null) {
|
||||
timelineV2Client.start();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -97,6 +97,7 @@ import org.apache.hadoop.yarn.api.records.timeline.TimelineEvent;
|
|||
import org.apache.hadoop.yarn.api.records.timeline.TimelinePutResponse;
|
||||
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
|
||||
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
|
||||
import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
|
||||
import org.apache.hadoop.yarn.client.api.async.impl.NMClientAsyncImpl;
|
||||
|
@ -221,7 +222,9 @@ public class ApplicationMaster {
|
|||
// Tracking url to which app master publishes info for clients to monitor
|
||||
private String appMasterTrackingUrl = "";
|
||||
|
||||
private boolean timelineServiceV2 = false;
|
||||
private boolean timelineServiceV2Enabled = false;
|
||||
|
||||
private boolean timelineServiceV1Enabled = false;
|
||||
|
||||
// App Master configuration
|
||||
// No. of containers to run shell command on
|
||||
|
@ -295,6 +298,10 @@ public class ApplicationMaster {
|
|||
// Timeline Client
|
||||
@VisibleForTesting
|
||||
TimelineClient timelineClient;
|
||||
|
||||
// Timeline v2 Client
|
||||
private TimelineV2Client timelineV2Client;
|
||||
|
||||
static final String CONTAINER_ENTITY_GROUP_ID = "CONTAINERS";
|
||||
static final String APPID_TIMELINE_FILTER_NAME = "appId";
|
||||
static final String USER_TIMELINE_FILTER_NAME = "user";
|
||||
|
@ -569,9 +576,12 @@ public class ApplicationMaster {
|
|||
"container_retry_interval", "0"));
|
||||
|
||||
if (YarnConfiguration.timelineServiceEnabled(conf)) {
|
||||
timelineServiceV2 = YarnConfiguration.timelineServiceV2Enabled(conf);
|
||||
timelineServiceV2Enabled =
|
||||
((int) YarnConfiguration.getTimelineServiceVersion(conf) == 2);
|
||||
timelineServiceV1Enabled = !timelineServiceV2Enabled;
|
||||
} else {
|
||||
timelineClient = null;
|
||||
timelineV2Client = null;
|
||||
LOG.warn("Timeline service is not enabled");
|
||||
}
|
||||
|
||||
|
@ -634,18 +644,17 @@ public class ApplicationMaster {
|
|||
nmClientAsync.start();
|
||||
|
||||
startTimelineClient(conf);
|
||||
if (timelineServiceV2) {
|
||||
if (timelineServiceV2Enabled) {
|
||||
// need to bind timelineClient
|
||||
amRMClient.registerTimelineClient(timelineClient);
|
||||
amRMClient.registerTimelineV2Client(timelineV2Client);
|
||||
}
|
||||
if(timelineClient != null) {
|
||||
if (timelineServiceV2) {
|
||||
publishApplicationAttemptEventOnTimelineServiceV2(
|
||||
DSEvent.DS_APP_ATTEMPT_START);
|
||||
} else {
|
||||
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
|
||||
DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
|
||||
}
|
||||
|
||||
if (timelineServiceV2Enabled) {
|
||||
publishApplicationAttemptEventOnTimelineServiceV2(
|
||||
DSEvent.DS_APP_ATTEMPT_START);
|
||||
} else if (timelineServiceV1Enabled) {
|
||||
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
|
||||
DSEvent.DS_APP_ATTEMPT_START, domainId, appSubmitterUgi);
|
||||
}
|
||||
|
||||
// Setup local RPC Server to accept status requests directly from clients
|
||||
|
@ -717,18 +726,21 @@ public class ApplicationMaster {
|
|||
public Void run() throws Exception {
|
||||
if (YarnConfiguration.timelineServiceEnabled(conf)) {
|
||||
// Creating the Timeline Client
|
||||
if (timelineServiceV2) {
|
||||
timelineClient = TimelineClient.createTimelineClient(
|
||||
if (timelineServiceV2Enabled) {
|
||||
timelineV2Client = TimelineV2Client.createTimelineClient(
|
||||
appAttemptID.getApplicationId());
|
||||
timelineV2Client.init(conf);
|
||||
timelineV2Client.start();
|
||||
LOG.info("Timeline service V2 client is enabled");
|
||||
} else {
|
||||
timelineClient = TimelineClient.createTimelineClient();
|
||||
timelineClient.init(conf);
|
||||
timelineClient.start();
|
||||
LOG.info("Timeline service V1 client is enabled");
|
||||
}
|
||||
timelineClient.init(conf);
|
||||
timelineClient.start();
|
||||
} else {
|
||||
timelineClient = null;
|
||||
timelineV2Client = null;
|
||||
LOG.warn("Timeline service is not enabled");
|
||||
}
|
||||
return null;
|
||||
|
@ -754,14 +766,12 @@ public class ApplicationMaster {
|
|||
} catch (InterruptedException ex) {}
|
||||
}
|
||||
|
||||
if (timelineClient != null) {
|
||||
if (timelineServiceV2) {
|
||||
publishApplicationAttemptEventOnTimelineServiceV2(
|
||||
DSEvent.DS_APP_ATTEMPT_END);
|
||||
} else {
|
||||
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
|
||||
DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
|
||||
}
|
||||
if (timelineServiceV2Enabled) {
|
||||
publishApplicationAttemptEventOnTimelineServiceV2(
|
||||
DSEvent.DS_APP_ATTEMPT_END);
|
||||
} else if (timelineServiceV1Enabled) {
|
||||
publishApplicationAttemptEvent(timelineClient, appAttemptID.toString(),
|
||||
DSEvent.DS_APP_ATTEMPT_END, domainId, appSubmitterUgi);
|
||||
}
|
||||
|
||||
// Join all launched threads
|
||||
|
@ -810,8 +820,10 @@ public class ApplicationMaster {
|
|||
amRMClient.stop();
|
||||
|
||||
// Stop Timeline Client
|
||||
if(timelineClient != null) {
|
||||
if(timelineServiceV1Enabled) {
|
||||
timelineClient.stop();
|
||||
} else if (timelineServiceV2Enabled) {
|
||||
timelineV2Client.stop();
|
||||
}
|
||||
|
||||
return success;
|
||||
|
@ -866,21 +878,19 @@ public class ApplicationMaster {
|
|||
LOG.info("Container completed successfully." + ", containerId="
|
||||
+ containerStatus.getContainerId());
|
||||
}
|
||||
if(timelineClient != null) {
|
||||
if (timelineServiceV2) {
|
||||
Long containerStartTime =
|
||||
containerStartTimes.get(containerStatus.getContainerId());
|
||||
if (containerStartTime == null) {
|
||||
containerStartTime = SystemClock.getInstance().getTime();
|
||||
containerStartTimes.put(containerStatus.getContainerId(),
|
||||
containerStartTime);
|
||||
}
|
||||
publishContainerEndEventOnTimelineServiceV2(containerStatus,
|
||||
if (timelineServiceV2Enabled) {
|
||||
Long containerStartTime =
|
||||
containerStartTimes.get(containerStatus.getContainerId());
|
||||
if (containerStartTime == null) {
|
||||
containerStartTime = SystemClock.getInstance().getTime();
|
||||
containerStartTimes.put(containerStatus.getContainerId(),
|
||||
containerStartTime);
|
||||
} else {
|
||||
publishContainerEndEvent(
|
||||
timelineClient, containerStatus, domainId, appSubmitterUgi);
|
||||
}
|
||||
publishContainerEndEventOnTimelineServiceV2(containerStatus,
|
||||
containerStartTime);
|
||||
} else if (timelineServiceV1Enabled) {
|
||||
publishContainerEndEvent(timelineClient, containerStatus, domainId,
|
||||
appSubmitterUgi);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1004,18 +1014,15 @@ public class ApplicationMaster {
|
|||
applicationMaster.nmClientAsync.getContainerStatusAsync(
|
||||
containerId, container.getNodeId());
|
||||
}
|
||||
if(applicationMaster.timelineClient != null) {
|
||||
if (applicationMaster.timelineServiceV2) {
|
||||
long startTime = SystemClock.getInstance().getTime();
|
||||
applicationMaster.getContainerStartTimes().put(
|
||||
containerId, startTime);
|
||||
applicationMaster.publishContainerStartEventOnTimelineServiceV2(
|
||||
container, startTime);
|
||||
} else {
|
||||
applicationMaster.publishContainerStartEvent(
|
||||
applicationMaster.timelineClient, container,
|
||||
applicationMaster.domainId, applicationMaster.appSubmitterUgi);
|
||||
}
|
||||
if (applicationMaster.timelineServiceV2Enabled) {
|
||||
long startTime = SystemClock.getInstance().getTime();
|
||||
applicationMaster.getContainerStartTimes().put(containerId, startTime);
|
||||
applicationMaster.publishContainerStartEventOnTimelineServiceV2(
|
||||
container, startTime);
|
||||
} else if (applicationMaster.timelineServiceV1Enabled) {
|
||||
applicationMaster.publishContainerStartEvent(
|
||||
applicationMaster.timelineClient, container,
|
||||
applicationMaster.domainId, applicationMaster.appSubmitterUgi);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1406,7 +1413,7 @@ public class ApplicationMaster {
|
|||
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public TimelinePutResponse run() throws Exception {
|
||||
timelineClient.putEntities(entity);
|
||||
timelineV2Client.putEntities(entity);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
@ -1440,7 +1447,7 @@ public class ApplicationMaster {
|
|||
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public TimelinePutResponse run() throws Exception {
|
||||
timelineClient.putEntities(entity);
|
||||
timelineV2Client.putEntities(entity);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
@ -1476,7 +1483,7 @@ public class ApplicationMaster {
|
|||
appSubmitterUgi.doAs(new PrivilegedExceptionAction<Object>() {
|
||||
@Override
|
||||
public TimelinePutResponse run() throws Exception {
|
||||
timelineClient.putEntitiesAsync(entity);
|
||||
timelineV2Client.putEntitiesAsync(entity);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.classification.InterfaceAudience;
|
|||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
|
@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
|
|||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
||||
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
@ -54,7 +56,8 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
|||
AbstractService {
|
||||
private static final Log LOG = LogFactory.getLog(AMRMClient.class);
|
||||
|
||||
private TimelineClient timelineClient;
|
||||
private TimelineV2Client timelineV2Client;
|
||||
private boolean timelineServiceV2Enabled;
|
||||
|
||||
/**
|
||||
* Create a new instance of AMRMClient.
|
||||
|
@ -79,6 +82,12 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
|||
nmTokenCache = NMTokenCache.getSingleton();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
super.serviceInit(conf);
|
||||
timelineServiceV2Enabled = YarnConfiguration.timelineServiceV2Enabled(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Object to represent a single container request for resources. Scheduler
|
||||
* documentation should be consulted for the specifics of how the parameters
|
||||
|
@ -683,19 +692,30 @@ public abstract class AMRMClient<T extends AMRMClient.ContainerRequest> extends
|
|||
}
|
||||
|
||||
/**
|
||||
* Register TimelineClient to AMRMClient.
|
||||
* @param client the timeline client to register
|
||||
* Register TimelineV2Client to AMRMClient. Writer's address for the timeline
|
||||
* V2 client will be updated dynamically if registered.
|
||||
*
|
||||
* @param client the timeline v2 client to register
|
||||
* @throws YarnException when this method is invoked even when ATS V2 is not
|
||||
* configured.
|
||||
*/
|
||||
public void registerTimelineClient(TimelineClient client) {
|
||||
this.timelineClient = client;
|
||||
public void registerTimelineV2Client(TimelineV2Client client)
|
||||
throws YarnException {
|
||||
if (timelineServiceV2Enabled) {
|
||||
timelineV2Client = client;
|
||||
} else {
|
||||
LOG.error("Trying to register timeline v2 client when not configured.");
|
||||
throw new YarnException(
|
||||
"register timeline v2 client when not configured.");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get registered timeline client.
|
||||
* @return the registered timeline client
|
||||
* Get registered timeline v2 client.
|
||||
* @return the registered timeline v2 client
|
||||
*/
|
||||
public TimelineClient getRegisteredTimeineClient() {
|
||||
return this.timelineClient;
|
||||
public TimelineV2Client getRegisteredTimelineV2Client() {
|
||||
return this.timelineV2Client;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -18,8 +18,6 @@
|
|||
|
||||
package org.apache.hadoop.yarn.client.api.async;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Supplier;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
|
@ -29,8 +27,8 @@ import org.apache.commons.logging.Log;
|
|||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||
import org.apache.hadoop.yarn.api.records.Container;
|
||||
|
@ -46,13 +44,15 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
|||
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
||||
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
||||
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
|
||||
import org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl;
|
||||
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.base.Supplier;
|
||||
|
||||
/**
|
||||
* <code>AMRMClientAsync</code> handles communication with the ResourceManager
|
||||
|
@ -346,17 +346,20 @@ extends AbstractService {
|
|||
/**
|
||||
* Register TimelineClient to AMRMClient.
|
||||
* @param timelineClient
|
||||
* @throws YarnException when this method is invoked even when ATS V2 is not
|
||||
* configured.
|
||||
*/
|
||||
public void registerTimelineClient(TimelineClient timelineClient) {
|
||||
client.registerTimelineClient(timelineClient);
|
||||
public void registerTimelineV2Client(TimelineV2Client timelineClient)
|
||||
throws YarnException {
|
||||
client.registerTimelineV2Client(timelineClient);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get registered timeline client.
|
||||
* @return the registered timeline client
|
||||
*/
|
||||
public TimelineClient getRegisteredTimeineClient() {
|
||||
return client.getRegisteredTimeineClient();
|
||||
public TimelineV2Client getRegisteredTimelineV2Client() {
|
||||
return client.getRegisteredTimelineV2Client();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -43,7 +43,7 @@ import org.apache.hadoop.yarn.api.records.UpdateContainerRequest;
|
|||
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
||||
import org.apache.hadoop.yarn.client.api.AMRMClient;
|
||||
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
|
||||
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
|
||||
import org.apache.hadoop.yarn.client.api.impl.AMRMClientImpl;
|
||||
import org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException;
|
||||
|
@ -325,7 +325,8 @@ extends AMRMClientAsync<T> {
|
|||
AllocateResponse response = (AllocateResponse) object;
|
||||
|
||||
String collectorAddress = response.getCollectorAddr();
|
||||
TimelineClient timelineClient = client.getRegisteredTimeineClient();
|
||||
TimelineV2Client timelineClient =
|
||||
client.getRegisteredTimelineV2Client();
|
||||
if (timelineClient != null && collectorAddress != null
|
||||
&& !collectorAddress.isEmpty()) {
|
||||
if (collectorAddr == null
|
||||
|
|
|
@ -141,7 +141,7 @@ public class YarnClientImpl extends YarnClient {
|
|||
Text timelineService;
|
||||
@VisibleForTesting
|
||||
String timelineDTRenewer;
|
||||
protected boolean timelineServiceEnabled;
|
||||
private boolean timelineV1ServiceEnabled;
|
||||
protected boolean timelineServiceBestEffort;
|
||||
|
||||
private static final String ROOT = "root";
|
||||
|
@ -167,9 +167,14 @@ public class YarnClientImpl extends YarnClient {
|
|||
YarnConfiguration.DEFAULT_YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_INTERVAL_MS);
|
||||
}
|
||||
|
||||
float timelineServiceVersion =
|
||||
conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
|
||||
if (conf.getBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)) {
|
||||
timelineServiceEnabled = true;
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_ENABLED)
|
||||
&& ((Float.compare(timelineServiceVersion, 1.0f) == 0)
|
||||
|| (Float.compare(timelineServiceVersion, 1.5f) == 0))) {
|
||||
timelineV1ServiceEnabled = true;
|
||||
timelineDTRenewer = getTimelineDelegationTokenRenewer(conf);
|
||||
timelineService = TimelineUtils.buildTimelineTokenService(conf);
|
||||
}
|
||||
|
@ -178,7 +183,7 @@ public class YarnClientImpl extends YarnClient {
|
|||
// TimelineServer which means we are able to get history information
|
||||
// for applications/applicationAttempts/containers by using ahsClient
|
||||
// when the TimelineServer is running.
|
||||
if (timelineServiceEnabled || conf.getBoolean(
|
||||
if (timelineV1ServiceEnabled || conf.getBoolean(
|
||||
YarnConfiguration.APPLICATION_HISTORY_ENABLED,
|
||||
YarnConfiguration.DEFAULT_APPLICATION_HISTORY_ENABLED)) {
|
||||
historyServiceEnabled = true;
|
||||
|
@ -257,7 +262,7 @@ public class YarnClientImpl extends YarnClient {
|
|||
|
||||
// Automatically add the timeline DT into the CLC
|
||||
// Only when the security and the timeline service are both enabled
|
||||
if (isSecurityEnabled() && timelineServiceEnabled) {
|
||||
if (isSecurityEnabled() && timelineV1ServiceEnabled) {
|
||||
addTimelineDelegationToken(appContext.getAMContainerSpec());
|
||||
}
|
||||
|
||||
|
|
|
@ -21,14 +21,12 @@ package org.apache.hadoop.yarn.client.api;
|
|||
import java.io.Flushable;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.service.CompositeService;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntityGroupId;
|
||||
|
@ -39,24 +37,22 @@ import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
|
|||
|
||||
/**
|
||||
* A client library that can be used to post some information in terms of a
|
||||
* number of conceptual entities.
|
||||
* number of conceptual entities. This client library needs to be used along
|
||||
* with Timeline V.1.x server versions.
|
||||
* Refer {@link TimelineV2Client} for ATS V2 interface.
|
||||
*/
|
||||
@Public
|
||||
@Evolving
|
||||
public abstract class TimelineClient extends AbstractService implements
|
||||
public abstract class TimelineClient extends CompositeService implements
|
||||
Flushable {
|
||||
|
||||
/**
|
||||
* Create a timeline client. The current UGI when the user initialize the
|
||||
* client will be used to do the put and the delegation token operations. The
|
||||
* current user may use {@link UserGroupInformation#doAs} another user to
|
||||
* construct and initialize a timeline client if the following operations are
|
||||
* supposed to be conducted by that user.
|
||||
*/
|
||||
private ApplicationId contextAppId;
|
||||
|
||||
/**
|
||||
* Creates an instance of the timeline v.1.x client.
|
||||
* The current UGI when the user initialize the client will be used to do the
|
||||
* put and the delegation token operations. The current user may use
|
||||
* {@link UserGroupInformation#doAs} another user to construct and initialize
|
||||
* a timeline client if the following operations are supposed to be conducted
|
||||
* by that user.
|
||||
*
|
||||
* @return the created timeline client instance
|
||||
*/
|
||||
|
@ -66,23 +62,8 @@ public abstract class TimelineClient extends AbstractService implements
|
|||
return client;
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates an instance of the timeline v.2 client.
|
||||
*
|
||||
* @param appId the application id with which the timeline client is
|
||||
* associated
|
||||
* @return the created timeline client instance
|
||||
*/
|
||||
@Public
|
||||
public static TimelineClient createTimelineClient(ApplicationId appId) {
|
||||
TimelineClient client = new TimelineClientImpl(appId);
|
||||
return client;
|
||||
}
|
||||
|
||||
@Private
|
||||
protected TimelineClient(String name, ApplicationId appId) {
|
||||
protected TimelineClient(String name) {
|
||||
super(name);
|
||||
setContextAppId(appId);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -207,57 +188,4 @@ public abstract class TimelineClient extends AbstractService implements
|
|||
public abstract void cancelDelegationToken(
|
||||
Token<TimelineDelegationTokenIdentifier> timelineDT)
|
||||
throws IOException, YarnException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Send the information of a number of conceptual entities to the timeline
|
||||
* service v.2 collector. It is a blocking API. The method will not return
|
||||
* until all the put entities have been persisted. If this method is invoked
|
||||
* for a non-v.2 timeline client instance, a YarnException is thrown.
|
||||
* </p>
|
||||
*
|
||||
* @param entities the collection of {@link
|
||||
* org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
|
||||
* @throws IOException
|
||||
* @throws YarnException
|
||||
*/
|
||||
@Public
|
||||
public abstract void putEntities(
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
|
||||
entities) throws IOException, YarnException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Send the information of a number of conceptual entities to the timeline
|
||||
* service v.2 collector. It is an asynchronous API. The method will return
|
||||
* once all the entities are received. If this method is invoked for a
|
||||
* non-v.2 timeline client instance, a YarnException is thrown.
|
||||
* </p>
|
||||
*
|
||||
* @param entities the collection of {@link
|
||||
* org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity}
|
||||
* @throws IOException
|
||||
* @throws YarnException
|
||||
*/
|
||||
@Public
|
||||
public abstract void putEntitiesAsync(
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
|
||||
entities) throws IOException, YarnException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Update the timeline service address where the request will be sent to.
|
||||
* </p>
|
||||
* @param address
|
||||
* the timeline service address
|
||||
*/
|
||||
public abstract void setTimelineServiceAddress(String address);
|
||||
|
||||
protected ApplicationId getContextAppId() {
|
||||
return contextAppId;
|
||||
}
|
||||
|
||||
protected void setContextAppId(ApplicationId appId) {
|
||||
this.contextAppId = appId;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,92 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client.api;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||
import org.apache.hadoop.service.CompositeService;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
/**
|
||||
* A client library that can be used to post some information in terms of a
|
||||
* number of conceptual entities. This client library needs to be used along
|
||||
* with time line v.2 server version.
|
||||
* Refer {@link TimelineClient} for ATS V1 interface.
|
||||
*/
|
||||
public abstract class TimelineV2Client extends CompositeService {
|
||||
/**
|
||||
* Creates an instance of the timeline v.2 client.
|
||||
*
|
||||
* @param appId the application id with which the timeline client is
|
||||
* associated
|
||||
* @return the created timeline client instance
|
||||
*/
|
||||
@Public
|
||||
public static TimelineV2Client createTimelineClient(ApplicationId appId) {
|
||||
TimelineV2Client client = new TimelineV2ClientImpl(appId);
|
||||
return client;
|
||||
}
|
||||
|
||||
protected TimelineV2Client(String name) {
|
||||
super(name);
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Send the information of a number of conceptual entities to the timeline
|
||||
* service v.2 collector. It is a blocking API. The method will not return
|
||||
* until all the put entities have been persisted.
|
||||
* </p>
|
||||
*
|
||||
* @param entities the collection of {@link TimelineEntity}
|
||||
* @throws IOException if there are I/O errors
|
||||
* @throws YarnException if entities are incomplete/invalid
|
||||
*/
|
||||
@Public
|
||||
public abstract void putEntities(TimelineEntity... entities)
|
||||
throws IOException, YarnException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Send the information of a number of conceptual entities to the timeline
|
||||
* service v.2 collector. It is an asynchronous API. The method will return
|
||||
* once all the entities are received.
|
||||
* </p>
|
||||
*
|
||||
* @param entities the collection of {@link TimelineEntity}
|
||||
* @throws IOException if there are I/O errors
|
||||
* @throws YarnException if entities are incomplete/invalid
|
||||
*/
|
||||
@Public
|
||||
public abstract void putEntitiesAsync(TimelineEntity... entities)
|
||||
throws IOException, YarnException;
|
||||
|
||||
/**
|
||||
* <p>
|
||||
* Update the timeline service address where the request will be sent to.
|
||||
* </p>
|
||||
*
|
||||
* @param address the timeline service address
|
||||
*/
|
||||
public abstract void setTimelineServiceAddress(String address);
|
||||
}
|
|
@ -20,31 +20,9 @@ package org.apache.hadoop.yarn.client.api.impl;
|
|||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.SocketException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.net.URLConnection;
|
||||
import java.security.GeneralSecurityException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.FutureTask;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.net.ssl.HostnameVerifier;
|
||||
import javax.net.ssl.HttpsURLConnection;
|
||||
import javax.net.ssl.SSLSocketFactory;
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.MultivaluedMap;
|
||||
|
||||
import org.apache.commons.cli.CommandLine;
|
||||
import org.apache.commons.cli.GnuParser;
|
||||
|
@ -57,16 +35,9 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
|||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.SecurityUtil;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
|
||||
import org.apache.hadoop.security.ssl.SSLFactory;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
|
||||
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
|
||||
import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomain;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineDomains;
|
||||
import org.apache.hadoop.yarn.api.records.timeline.TimelineEntities;
|
||||
|
@ -81,30 +52,15 @@ import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
|||
import org.codehaus.jackson.map.ObjectMapper;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.sun.jersey.api.client.Client;
|
||||
import com.sun.jersey.api.client.ClientHandlerException;
|
||||
import com.sun.jersey.api.client.ClientRequest;
|
||||
import com.sun.jersey.api.client.ClientResponse;
|
||||
import com.sun.jersey.api.client.config.ClientConfig;
|
||||
import com.sun.jersey.api.client.config.DefaultClientConfig;
|
||||
import com.sun.jersey.api.client.filter.ClientFilter;
|
||||
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
|
||||
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
|
||||
import com.sun.jersey.core.util.MultivaluedMapImpl;
|
||||
|
||||
@Private
|
||||
@Evolving
|
||||
public class TimelineClientImpl extends TimelineClient {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(TimelineClientImpl.class);
|
||||
private static final String RESOURCE_URI_STR = "/ws/v1/timeline/";
|
||||
private static final ObjectMapper MAPPER = new ObjectMapper();
|
||||
private static final String RESOURCE_URI_STR_V1 = "/ws/v1/timeline/";
|
||||
private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
|
||||
private static final Joiner JOINER = Joiner.on("");
|
||||
public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
|
||||
|
||||
private static Options opts;
|
||||
private static final String ENTITY_DATA_TYPE = "entity";
|
||||
|
@ -119,180 +75,38 @@ public class TimelineClientImpl extends TimelineClient {
|
|||
opts.addOption("help", false, "Print usage");
|
||||
}
|
||||
|
||||
private Client client;
|
||||
private ConnectionConfigurator connConfigurator;
|
||||
private DelegationTokenAuthenticator authenticator;
|
||||
private DelegationTokenAuthenticatedURL.Token token;
|
||||
private UserGroupInformation authUgi;
|
||||
private String doAsUser;
|
||||
private Configuration configuration;
|
||||
@VisibleForTesting
|
||||
protected DelegationTokenAuthenticatedURL.Token token;
|
||||
@VisibleForTesting
|
||||
protected UserGroupInformation authUgi;
|
||||
@VisibleForTesting
|
||||
protected String doAsUser;
|
||||
|
||||
private float timelineServiceVersion;
|
||||
private TimelineWriter timelineWriter;
|
||||
private SSLFactory sslFactory;
|
||||
|
||||
private volatile String timelineServiceAddress;
|
||||
|
||||
// Retry parameters for identifying new timeline service
|
||||
// TODO consider to merge with connection retry
|
||||
private int maxServiceRetries;
|
||||
private long serviceRetryInterval;
|
||||
private boolean timelineServiceV2 = false;
|
||||
private String timelineServiceAddress;
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
TimelineClientConnectionRetry connectionRetry;
|
||||
|
||||
private TimelineEntityDispatcher entityDispatcher;
|
||||
|
||||
// Abstract class for an operation that should be retried by timeline client
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public static abstract class TimelineClientRetryOp {
|
||||
// The operation that should be retried
|
||||
public abstract Object run() throws IOException;
|
||||
// The method to indicate if we should retry given the incoming exception
|
||||
public abstract boolean shouldRetryOn(Exception e);
|
||||
}
|
||||
|
||||
// Class to handle retry
|
||||
// Outside this class, only visible to tests
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
static class TimelineClientConnectionRetry {
|
||||
|
||||
// maxRetries < 0 means keep trying
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public int maxRetries;
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public long retryInterval;
|
||||
|
||||
// Indicates if retries happened last time. Only tests should read it.
|
||||
// In unit tests, retryOn() calls should _not_ be concurrent.
|
||||
private boolean retried = false;
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
boolean getRetired() {
|
||||
return retried;
|
||||
}
|
||||
|
||||
// Constructor with default retry settings
|
||||
public TimelineClientConnectionRetry(Configuration conf) {
|
||||
Preconditions.checkArgument(conf.getInt(
|
||||
YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES) >= -1,
|
||||
"%s property value should be greater than or equal to -1",
|
||||
YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
|
||||
Preconditions
|
||||
.checkArgument(
|
||||
conf.getLong(
|
||||
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS) > 0,
|
||||
"%s property value should be greater than zero",
|
||||
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
|
||||
maxRetries = conf.getInt(
|
||||
YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
|
||||
retryInterval = conf.getLong(
|
||||
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
|
||||
}
|
||||
|
||||
public Object retryOn(TimelineClientRetryOp op)
|
||||
throws RuntimeException, IOException {
|
||||
int leftRetries = maxRetries;
|
||||
retried = false;
|
||||
|
||||
// keep trying
|
||||
while (true) {
|
||||
try {
|
||||
// try perform the op, if fail, keep retrying
|
||||
return op.run();
|
||||
} catch (IOException | RuntimeException e) {
|
||||
// break if there's no retries left
|
||||
if (leftRetries == 0) {
|
||||
break;
|
||||
}
|
||||
if (op.shouldRetryOn(e)) {
|
||||
logException(e, leftRetries);
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
if (leftRetries > 0) {
|
||||
leftRetries--;
|
||||
}
|
||||
retried = true;
|
||||
try {
|
||||
// sleep for the given time interval
|
||||
Thread.sleep(retryInterval);
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.warn("Client retry sleep interrupted! ");
|
||||
}
|
||||
}
|
||||
throw new RuntimeException("Failed to connect to timeline server. "
|
||||
+ "Connection retries limit exceeded. "
|
||||
+ "The posted timeline event may be missing");
|
||||
};
|
||||
|
||||
private void logException(Exception e, int leftRetries) {
|
||||
if (leftRetries > 0) {
|
||||
LOG.info("Exception caught by TimelineClientConnectionRetry,"
|
||||
+ " will try " + leftRetries + " more time(s).\nMessage: "
|
||||
+ e.getMessage());
|
||||
} else {
|
||||
// note that maxRetries may be -1 at the very beginning
|
||||
LOG.info("ConnectionException caught by TimelineClientConnectionRetry,"
|
||||
+ " will keep retrying.\nMessage: "
|
||||
+ e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class TimelineJerseyRetryFilter extends ClientFilter {
|
||||
@Override
|
||||
public ClientResponse handle(final ClientRequest cr)
|
||||
throws ClientHandlerException {
|
||||
// Set up the retry operation
|
||||
TimelineClientRetryOp jerseyRetryOp = new TimelineClientRetryOp() {
|
||||
@Override
|
||||
public Object run() {
|
||||
// Try pass the request, if fail, keep retrying
|
||||
return getNext().handle(cr);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldRetryOn(Exception e) {
|
||||
// Only retry on connection exceptions
|
||||
return (e instanceof ClientHandlerException)
|
||||
&& (e.getCause() instanceof ConnectException ||
|
||||
e.getCause() instanceof SocketTimeoutException ||
|
||||
e.getCause() instanceof SocketException);
|
||||
}
|
||||
};
|
||||
try {
|
||||
return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp);
|
||||
} catch (IOException e) {
|
||||
throw new ClientHandlerException("Jersey retry failed!\nMessage: "
|
||||
+ e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
TimelineConnector connector;
|
||||
|
||||
public TimelineClientImpl() {
|
||||
super(TimelineClientImpl.class.getName(), null);
|
||||
}
|
||||
|
||||
public TimelineClientImpl(ApplicationId applicationId) {
|
||||
super(TimelineClientImpl.class.getName(), applicationId);
|
||||
this.timelineServiceV2 = true;
|
||||
super(TimelineClientImpl.class.getName());
|
||||
}
|
||||
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
this.configuration = conf;
|
||||
timelineServiceVersion =
|
||||
conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
|
||||
LOG.info("Timeline service address: " + getTimelineServiceAddress());
|
||||
if (!YarnConfiguration.timelineServiceEnabled(conf)
|
||||
|| !((Float.compare(this.timelineServiceVersion, 1.0f) == 0)
|
||||
|| (Float.compare(this.timelineServiceVersion, 1.5f) == 0))) {
|
||||
throw new IOException("Timeline V1 client is not properly configured. "
|
||||
+ "Either timeline service is not enabled or version is not set to"
|
||||
+ " 1.x");
|
||||
}
|
||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
UserGroupInformation realUgi = ugi.getRealUser();
|
||||
if (realUgi != null) {
|
||||
|
@ -302,62 +116,34 @@ public class TimelineClientImpl extends TimelineClient {
|
|||
authUgi = ugi;
|
||||
doAsUser = null;
|
||||
}
|
||||
ClientConfig cc = new DefaultClientConfig();
|
||||
cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
|
||||
connConfigurator = initConnConfigurator(conf);
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
authenticator = new KerberosDelegationTokenAuthenticator();
|
||||
} else {
|
||||
authenticator = new PseudoDelegationTokenAuthenticator();
|
||||
}
|
||||
authenticator.setConnectionConfigurator(connConfigurator);
|
||||
token = new DelegationTokenAuthenticatedURL.Token();
|
||||
connector = createTimelineConnector();
|
||||
|
||||
connectionRetry = new TimelineClientConnectionRetry(conf);
|
||||
client = new Client(new URLConnectionClientHandler(
|
||||
new TimelineURLConnectionFactory()), cc);
|
||||
TimelineJerseyRetryFilter retryFilter = new TimelineJerseyRetryFilter();
|
||||
// TODO need to cleanup filter retry later.
|
||||
if (!timelineServiceV2) {
|
||||
client.addFilter(retryFilter);
|
||||
}
|
||||
|
||||
// old version timeline service need to get address from configuration
|
||||
// while new version need to auto discovery (with retry).
|
||||
if (timelineServiceV2) {
|
||||
maxServiceRetries = conf.getInt(
|
||||
YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
|
||||
serviceRetryInterval = conf.getLong(
|
||||
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
|
||||
entityDispatcher = new TimelineEntityDispatcher(conf);
|
||||
if (YarnConfiguration.useHttps(conf)) {
|
||||
timelineServiceAddress =
|
||||
conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS);
|
||||
} else {
|
||||
if (YarnConfiguration.useHttps(conf)) {
|
||||
setTimelineServiceAddress(conf.get(
|
||||
YarnConfiguration.TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS));
|
||||
} else {
|
||||
setTimelineServiceAddress(conf.get(
|
||||
YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS));
|
||||
}
|
||||
timelineServiceVersion =
|
||||
conf.getFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_VERSION);
|
||||
LOG.info("Timeline service address: " + getTimelineServiceAddress());
|
||||
timelineServiceAddress =
|
||||
conf.get(YarnConfiguration.TIMELINE_SERVICE_WEBAPP_ADDRESS,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_WEBAPP_ADDRESS);
|
||||
}
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected TimelineConnector createTimelineConnector() {
|
||||
TimelineConnector newConnector =
|
||||
new TimelineConnector(true, authUgi, doAsUser, token);
|
||||
addIfService(newConnector);
|
||||
return newConnector;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
if (timelineServiceV2) {
|
||||
entityDispatcher.start();
|
||||
} else {
|
||||
timelineWriter = createTimelineWriter(configuration, authUgi, client,
|
||||
constructResURI(getConfig(), timelineServiceAddress, false));
|
||||
}
|
||||
timelineWriter = createTimelineWriter(getConfig(), authUgi,
|
||||
connector.getClient(), TimelineConnector.constructResURI(getConfig(),
|
||||
timelineServiceAddress, RESOURCE_URI_STR_V1));
|
||||
}
|
||||
|
||||
protected TimelineWriter createTimelineWriter(Configuration conf,
|
||||
|
@ -376,12 +162,6 @@ public class TimelineClientImpl extends TimelineClient {
|
|||
if (this.timelineWriter != null) {
|
||||
this.timelineWriter.close();
|
||||
}
|
||||
if (this.sslFactory != null) {
|
||||
this.sslFactory.destroy();
|
||||
}
|
||||
if (timelineServiceV2) {
|
||||
entityDispatcher.stop();
|
||||
}
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
|
@ -393,131 +173,17 @@ public class TimelineClientImpl extends TimelineClient {
|
|||
}
|
||||
|
||||
@Override
|
||||
public TimelinePutResponse putEntities(
|
||||
TimelineEntity... entities) throws IOException, YarnException {
|
||||
public TimelinePutResponse putEntities(TimelineEntity... entities)
|
||||
throws IOException, YarnException {
|
||||
return timelineWriter.putEntities(entities);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putEntities(
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
|
||||
entities) throws IOException, YarnException {
|
||||
if (!timelineServiceV2) {
|
||||
throw new YarnException("v.2 method is invoked on a v.1.x client");
|
||||
}
|
||||
entityDispatcher.dispatchEntities(true, entities);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putEntitiesAsync(
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity...
|
||||
entities) throws IOException, YarnException {
|
||||
if (!timelineServiceV2) {
|
||||
throw new YarnException("v.2 method is invoked on a v.1.x client");
|
||||
}
|
||||
entityDispatcher.dispatchEntities(false, entities);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putDomain(TimelineDomain domain) throws IOException,
|
||||
YarnException {
|
||||
timelineWriter.putDomain(domain);
|
||||
}
|
||||
|
||||
// Used for new timeline service only
|
||||
@Private
|
||||
protected void putObjects(String path, MultivaluedMap<String, String> params,
|
||||
Object obj) throws IOException, YarnException {
|
||||
|
||||
int retries = verifyRestEndPointAvailable();
|
||||
|
||||
// timelineServiceAddress could be stale, add retry logic here.
|
||||
boolean needRetry = true;
|
||||
while (needRetry) {
|
||||
try {
|
||||
URI uri = constructResURI(getConfig(), timelineServiceAddress, true);
|
||||
putObjects(uri, path, params, obj);
|
||||
needRetry = false;
|
||||
} catch (IOException e) {
|
||||
// handle exception for timelineServiceAddress being updated.
|
||||
checkRetryWithSleep(retries, e);
|
||||
retries--;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private int verifyRestEndPointAvailable() throws YarnException {
|
||||
// timelineServiceAddress could haven't be initialized yet
|
||||
// or stale (only for new timeline service)
|
||||
int retries = pollTimelineServiceAddress(this.maxServiceRetries);
|
||||
if (timelineServiceAddress == null) {
|
||||
String errMessage = "TimelineClient has reached to max retry times : "
|
||||
+ this.maxServiceRetries
|
||||
+ ", but failed to fetch timeline service address. Please verify"
|
||||
+ " Timeline Auxillary Service is configured in all the NMs";
|
||||
LOG.error(errMessage);
|
||||
throw new YarnException(errMessage);
|
||||
}
|
||||
return retries;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if reaching to maximum of retries.
|
||||
* @param retries
|
||||
* @param e
|
||||
*/
|
||||
private void checkRetryWithSleep(int retries, IOException e)
|
||||
throws YarnException, IOException {
|
||||
if (retries > 0) {
|
||||
try {
|
||||
Thread.sleep(this.serviceRetryInterval);
|
||||
} catch (InterruptedException ex) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new YarnException("Interrupted while retrying to connect to ATS");
|
||||
}
|
||||
} else {
|
||||
StringBuilder msg =
|
||||
new StringBuilder("TimelineClient has reached to max retry times : ");
|
||||
msg.append(this.maxServiceRetries);
|
||||
msg.append(" for service address: ");
|
||||
msg.append(timelineServiceAddress);
|
||||
LOG.error(msg.toString());
|
||||
throw new IOException(msg.toString(), e);
|
||||
}
|
||||
}
|
||||
|
||||
protected void putObjects(
|
||||
URI base, String path, MultivaluedMap<String, String> params, Object obj)
|
||||
throws IOException, YarnException {
|
||||
ClientResponse resp;
|
||||
try {
|
||||
resp = client.resource(base).path(path).queryParams(params)
|
||||
.accept(MediaType.APPLICATION_JSON)
|
||||
.type(MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class, obj);
|
||||
} catch (RuntimeException re) {
|
||||
// runtime exception is expected if the client cannot connect the server
|
||||
String msg =
|
||||
"Failed to get the response from the timeline server.";
|
||||
LOG.error(msg, re);
|
||||
throw new IOException(re);
|
||||
}
|
||||
if (resp == null ||
|
||||
resp.getClientResponseStatus() != ClientResponse.Status.OK) {
|
||||
String msg = "Response from the timeline server is " +
|
||||
((resp == null) ? "null":
|
||||
"not successful," + " HTTP error code: " + resp.getStatus()
|
||||
+ ", Server response:\n" + resp.getEntity(String.class));
|
||||
LOG.error(msg);
|
||||
throw new YarnException(msg);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTimelineServiceAddress(String address) {
|
||||
this.timelineServiceAddress = address;
|
||||
}
|
||||
|
||||
private String getTimelineServiceAddress() {
|
||||
return this.timelineServiceAddress;
|
||||
}
|
||||
|
@ -534,17 +200,17 @@ public class TimelineClientImpl extends TimelineClient {
|
|||
public Token<TimelineDelegationTokenIdentifier> run()
|
||||
throws Exception {
|
||||
DelegationTokenAuthenticatedURL authUrl =
|
||||
new DelegationTokenAuthenticatedURL(authenticator,
|
||||
connConfigurator);
|
||||
connector.getDelegationTokenAuthenticatedURL();
|
||||
// TODO we should add retry logic here if timelineServiceAddress is
|
||||
// not available immediately.
|
||||
return (Token) authUrl.getDelegationToken(
|
||||
constructResURI(getConfig(),
|
||||
getTimelineServiceAddress(), false).toURL(),
|
||||
TimelineConnector.constructResURI(getConfig(),
|
||||
getTimelineServiceAddress(), RESOURCE_URI_STR_V1).toURL(),
|
||||
token, renewer, doAsUser);
|
||||
}
|
||||
};
|
||||
return (Token<TimelineDelegationTokenIdentifier>) operateDelegationToken(getDTAction);
|
||||
return (Token<TimelineDelegationTokenIdentifier>) connector
|
||||
.operateDelegationToken(getDTAction);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
|
@ -570,26 +236,26 @@ public class TimelineClientImpl extends TimelineClient {
|
|||
token.setDelegationToken((Token) timelineDT);
|
||||
}
|
||||
DelegationTokenAuthenticatedURL authUrl =
|
||||
new DelegationTokenAuthenticatedURL(authenticator,
|
||||
connConfigurator);
|
||||
connector.getDelegationTokenAuthenticatedURL();
|
||||
// If the token service address is not available, fall back to use
|
||||
// the configured service address.
|
||||
final URI serviceURI = isTokenServiceAddrEmpty ?
|
||||
constructResURI(getConfig(), getTimelineServiceAddress(), false)
|
||||
final URI serviceURI = isTokenServiceAddrEmpty
|
||||
? TimelineConnector.constructResURI(getConfig(),
|
||||
getTimelineServiceAddress(), RESOURCE_URI_STR_V1)
|
||||
: new URI(scheme, null, address.getHostName(),
|
||||
address.getPort(), RESOURCE_URI_STR_V1, null, null);
|
||||
address.getPort(), RESOURCE_URI_STR_V1, null, null);
|
||||
return authUrl
|
||||
.renewDelegationToken(serviceURI.toURL(), token, doAsUser);
|
||||
}
|
||||
};
|
||||
return (Long) operateDelegationToken(renewDTAction);
|
||||
return (Long) connector.operateDelegationToken(renewDTAction);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public void cancelDelegationToken(
|
||||
final Token<TimelineDelegationTokenIdentifier> timelineDT)
|
||||
throws IOException, YarnException {
|
||||
throws IOException, YarnException {
|
||||
final boolean isTokenServiceAddrEmpty =
|
||||
timelineDT.getService().toString().isEmpty();
|
||||
final String scheme = isTokenServiceAddrEmpty ? null
|
||||
|
@ -609,134 +275,29 @@ public class TimelineClientImpl extends TimelineClient {
|
|||
token.setDelegationToken((Token) timelineDT);
|
||||
}
|
||||
DelegationTokenAuthenticatedURL authUrl =
|
||||
new DelegationTokenAuthenticatedURL(authenticator,
|
||||
connConfigurator);
|
||||
connector.getDelegationTokenAuthenticatedURL();
|
||||
// If the token service address is not available, fall back to use
|
||||
// the configured service address.
|
||||
final URI serviceURI = isTokenServiceAddrEmpty ?
|
||||
constructResURI(getConfig(), getTimelineServiceAddress(), false)
|
||||
final URI serviceURI = isTokenServiceAddrEmpty
|
||||
? TimelineConnector.constructResURI(getConfig(),
|
||||
getTimelineServiceAddress(), RESOURCE_URI_STR_V1)
|
||||
: new URI(scheme, null, address.getHostName(),
|
||||
address.getPort(), RESOURCE_URI_STR_V1, null, null);
|
||||
address.getPort(), RESOURCE_URI_STR_V1, null, null);
|
||||
authUrl.cancelDelegationToken(serviceURI.toURL(), token, doAsUser);
|
||||
return null;
|
||||
}
|
||||
};
|
||||
operateDelegationToken(cancelDTAction);
|
||||
connector.operateDelegationToken(cancelDTAction);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return super.toString() + " with timeline server "
|
||||
+ constructResURI(getConfig(), getTimelineServiceAddress(), false)
|
||||
+ TimelineConnector.constructResURI(getConfig(),
|
||||
getTimelineServiceAddress(), RESOURCE_URI_STR_V1)
|
||||
+ " and writer " + timelineWriter;
|
||||
}
|
||||
|
||||
private Object operateDelegationToken(
|
||||
final PrivilegedExceptionAction<?> action)
|
||||
throws IOException, YarnException {
|
||||
// Set up the retry operation
|
||||
TimelineClientRetryOp tokenRetryOp =
|
||||
createTimelineClientRetryOpForOperateDelegationToken(action);
|
||||
|
||||
return connectionRetry.retryOn(tokenRetryOp);
|
||||
}
|
||||
|
||||
/**
|
||||
* Poll TimelineServiceAddress for maximum of retries times if it is null.
|
||||
*
|
||||
* @param retries
|
||||
* @return the left retry times
|
||||
* @throws IOException
|
||||
*/
|
||||
private int pollTimelineServiceAddress(int retries) throws YarnException {
|
||||
while (timelineServiceAddress == null && retries > 0) {
|
||||
try {
|
||||
Thread.sleep(this.serviceRetryInterval);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new YarnException("Interrupted while trying to connect ATS");
|
||||
}
|
||||
retries--;
|
||||
}
|
||||
return retries;
|
||||
}
|
||||
|
||||
private class TimelineURLConnectionFactory
|
||||
implements HttpURLConnectionFactory {
|
||||
|
||||
@Override
|
||||
public HttpURLConnection getHttpURLConnection(final URL url) throws IOException {
|
||||
authUgi.checkTGTAndReloginFromKeytab();
|
||||
try {
|
||||
return new DelegationTokenAuthenticatedURL(
|
||||
authenticator, connConfigurator).openConnection(url, token,
|
||||
doAsUser);
|
||||
} catch (UndeclaredThrowableException e) {
|
||||
throw new IOException(e.getCause());
|
||||
} catch (AuthenticationException ae) {
|
||||
throw new IOException(ae);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private ConnectionConfigurator initConnConfigurator(Configuration conf) {
|
||||
try {
|
||||
return initSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, conf);
|
||||
} catch (Exception e) {
|
||||
LOG.debug("Cannot load customized ssl related configuration. " +
|
||||
"Fallback to system-generic settings.", e);
|
||||
return DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
|
||||
}
|
||||
}
|
||||
|
||||
private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR =
|
||||
new ConnectionConfigurator() {
|
||||
@Override
|
||||
public HttpURLConnection configure(HttpURLConnection conn)
|
||||
throws IOException {
|
||||
setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
|
||||
return conn;
|
||||
}
|
||||
};
|
||||
|
||||
private ConnectionConfigurator initSslConnConfigurator(final int timeout,
|
||||
Configuration conf) throws IOException, GeneralSecurityException {
|
||||
final SSLSocketFactory sf;
|
||||
final HostnameVerifier hv;
|
||||
|
||||
sslFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
|
||||
sslFactory.init();
|
||||
sf = sslFactory.createSSLSocketFactory();
|
||||
hv = sslFactory.getHostnameVerifier();
|
||||
|
||||
return new ConnectionConfigurator() {
|
||||
@Override
|
||||
public HttpURLConnection configure(HttpURLConnection conn)
|
||||
throws IOException {
|
||||
if (conn instanceof HttpsURLConnection) {
|
||||
HttpsURLConnection c = (HttpsURLConnection) conn;
|
||||
c.setSSLSocketFactory(sf);
|
||||
c.setHostnameVerifier(hv);
|
||||
}
|
||||
setTimeouts(conn, timeout);
|
||||
return conn;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
private static void setTimeouts(URLConnection connection, int socketTimeout) {
|
||||
connection.setConnectTimeout(socketTimeout);
|
||||
connection.setReadTimeout(socketTimeout);
|
||||
}
|
||||
|
||||
private static URI constructResURI(
|
||||
Configuration conf, String address, boolean v2) {
|
||||
return URI.create(
|
||||
JOINER.join(YarnConfiguration.useHttps(conf) ? "https://" : "http://",
|
||||
address, v2 ? RESOURCE_URI_STR_V2 : RESOURCE_URI_STR_V1));
|
||||
}
|
||||
|
||||
public static void main(String[] argv) throws Exception {
|
||||
CommandLine cliParser = new GnuParser().parse(opts, argv);
|
||||
if (cliParser.hasOption("put")) {
|
||||
|
@ -872,266 +433,4 @@ public class TimelineClientImpl extends TimelineClient {
|
|||
public void setTimelineWriter(TimelineWriter writer) {
|
||||
this.timelineWriter = writer;
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public TimelineClientRetryOp
|
||||
createTimelineClientRetryOpForOperateDelegationToken(
|
||||
final PrivilegedExceptionAction<?> action) throws IOException {
|
||||
return new TimelineClientRetryOpForOperateDelegationToken(
|
||||
this.authUgi, action);
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public class TimelineClientRetryOpForOperateDelegationToken
|
||||
extends TimelineClientRetryOp {
|
||||
|
||||
private final UserGroupInformation authUgi;
|
||||
private final PrivilegedExceptionAction<?> action;
|
||||
|
||||
public TimelineClientRetryOpForOperateDelegationToken(
|
||||
UserGroupInformation authUgi, PrivilegedExceptionAction<?> action) {
|
||||
this.authUgi = authUgi;
|
||||
this.action = action;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object run() throws IOException {
|
||||
// Try pass the request, if fail, keep retrying
|
||||
authUgi.checkTGTAndReloginFromKeytab();
|
||||
try {
|
||||
return authUgi.doAs(action);
|
||||
} catch (UndeclaredThrowableException e) {
|
||||
throw new IOException(e.getCause());
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldRetryOn(Exception e) {
|
||||
// retry on connection exceptions
|
||||
// and SocketTimeoutException
|
||||
return (e instanceof ConnectException
|
||||
|| e instanceof SocketTimeoutException);
|
||||
}
|
||||
}
|
||||
|
||||
private final class EntitiesHolder extends FutureTask<Void> {
|
||||
private final
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
|
||||
entities;
|
||||
private final boolean isSync;
|
||||
|
||||
EntitiesHolder(
|
||||
final
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
|
||||
entities,
|
||||
final boolean isSync) {
|
||||
super(new Callable<Void>() {
|
||||
// publishEntities()
|
||||
public Void call() throws Exception {
|
||||
MultivaluedMap<String, String> params = new MultivaluedMapImpl();
|
||||
params.add("appid", getContextAppId().toString());
|
||||
params.add("async", Boolean.toString(!isSync));
|
||||
putObjects("entities", params, entities);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
this.entities = entities;
|
||||
this.isSync = isSync;
|
||||
}
|
||||
|
||||
public boolean isSync() {
|
||||
return isSync;
|
||||
}
|
||||
|
||||
public org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
|
||||
getEntities() {
|
||||
return entities;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This class is responsible for collecting the timeline entities and
|
||||
* publishing them in async.
|
||||
*/
|
||||
private class TimelineEntityDispatcher {
|
||||
/**
|
||||
* Time period for which the timelineclient will wait for draining after
|
||||
* stop.
|
||||
*/
|
||||
private static final long DRAIN_TIME_PERIOD = 2000L;
|
||||
|
||||
private int numberOfAsyncsToMerge;
|
||||
private final BlockingQueue<EntitiesHolder> timelineEntityQueue;
|
||||
private ExecutorService executor;
|
||||
|
||||
TimelineEntityDispatcher(Configuration conf) {
|
||||
timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>();
|
||||
numberOfAsyncsToMerge =
|
||||
conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE,
|
||||
YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE);
|
||||
}
|
||||
|
||||
Runnable createRunnable() {
|
||||
return new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
EntitiesHolder entitiesHolder;
|
||||
while (!Thread.currentThread().isInterrupted()) {
|
||||
// Merge all the async calls and make one push, but if its sync
|
||||
// call push immediately
|
||||
try {
|
||||
entitiesHolder = timelineEntityQueue.take();
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.info("Timeline dispatcher thread was interrupted ");
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
}
|
||||
if (entitiesHolder != null) {
|
||||
publishWithoutBlockingOnQueue(entitiesHolder);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (!timelineEntityQueue.isEmpty()) {
|
||||
LOG.info("Yet to publish " + timelineEntityQueue.size()
|
||||
+ " timelineEntities, draining them now. ");
|
||||
}
|
||||
// Try to drain the remaining entities to be published @ the max for
|
||||
// 2 seconds
|
||||
long timeTillweDrain =
|
||||
System.currentTimeMillis() + DRAIN_TIME_PERIOD;
|
||||
while (!timelineEntityQueue.isEmpty()) {
|
||||
publishWithoutBlockingOnQueue(timelineEntityQueue.poll());
|
||||
if (System.currentTimeMillis() > timeTillweDrain) {
|
||||
// time elapsed stop publishing further....
|
||||
if (!timelineEntityQueue.isEmpty()) {
|
||||
LOG.warn("Time to drain elapsed! Remaining "
|
||||
+ timelineEntityQueue.size() + "timelineEntities will not"
|
||||
+ " be published");
|
||||
// if some entities were not drained then we need interrupt
|
||||
// the threads which had put sync EntityHolders to the queue.
|
||||
EntitiesHolder nextEntityInTheQueue = null;
|
||||
while ((nextEntityInTheQueue =
|
||||
timelineEntityQueue.poll()) != null) {
|
||||
nextEntityInTheQueue.cancel(true);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Publishes the given EntitiesHolder and return immediately if sync
|
||||
* call, else tries to fetch the EntitiesHolder from the queue in non
|
||||
* blocking fashion and collate the Entities if possible before
|
||||
* publishing through REST.
|
||||
*
|
||||
* @param entitiesHolder
|
||||
*/
|
||||
private void publishWithoutBlockingOnQueue(
|
||||
EntitiesHolder entitiesHolder) {
|
||||
if (entitiesHolder.isSync()) {
|
||||
entitiesHolder.run();
|
||||
return;
|
||||
}
|
||||
int count = 1;
|
||||
while (true) {
|
||||
// loop till we find a sync put Entities or there is nothing
|
||||
// to take
|
||||
EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll();
|
||||
if (nextEntityInTheQueue == null) {
|
||||
// Nothing in the queue just publish and get back to the
|
||||
// blocked wait state
|
||||
entitiesHolder.run();
|
||||
break;
|
||||
} else if (nextEntityInTheQueue.isSync()) {
|
||||
// flush all the prev async entities first
|
||||
entitiesHolder.run();
|
||||
// and then flush the sync entity
|
||||
nextEntityInTheQueue.run();
|
||||
break;
|
||||
} else {
|
||||
// append all async entities together and then flush
|
||||
entitiesHolder.getEntities().addEntities(
|
||||
nextEntityInTheQueue.getEntities().getEntities());
|
||||
count++;
|
||||
if (count == numberOfAsyncsToMerge) {
|
||||
// Flush the entities if the number of the async
|
||||
// putEntites merged reaches the desired limit. To avoid
|
||||
// collecting multiple entities and delaying for a long
|
||||
// time.
|
||||
entitiesHolder.run();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public void dispatchEntities(boolean sync,
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity[]
|
||||
entitiesTobePublished) throws YarnException {
|
||||
if (executor.isShutdown()) {
|
||||
throw new YarnException("Timeline client is in the process of stopping,"
|
||||
+ " not accepting any more TimelineEntities");
|
||||
}
|
||||
|
||||
// wrap all TimelineEntity into TimelineEntities object
|
||||
org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities
|
||||
entities =
|
||||
new org.apache.hadoop.yarn.api.records.timelineservice.
|
||||
TimelineEntities();
|
||||
for (org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity
|
||||
entity : entitiesTobePublished) {
|
||||
entities.addEntity(entity);
|
||||
}
|
||||
|
||||
// created a holder and place it in queue
|
||||
EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync);
|
||||
try {
|
||||
timelineEntityQueue.put(entitiesHolder);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new YarnException(
|
||||
"Failed while adding entity to the queue for publishing", e);
|
||||
}
|
||||
|
||||
if (sync) {
|
||||
// In sync call we need to wait till its published and if any error then
|
||||
// throw it back
|
||||
try {
|
||||
entitiesHolder.get();
|
||||
} catch (ExecutionException e) {
|
||||
throw new YarnException("Failed while publishing entity",
|
||||
e.getCause());
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new YarnException("Interrupted while publishing entity", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void start() {
|
||||
executor = Executors.newSingleThreadExecutor();
|
||||
executor.execute(createRunnable());
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
LOG.info("Stopping TimelineClient.");
|
||||
executor.shutdownNow();
|
||||
try {
|
||||
executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,442 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client.api.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.UndeclaredThrowableException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.SocketException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.URI;
|
||||
import java.net.URL;
|
||||
import java.net.URLConnection;
|
||||
import java.security.GeneralSecurityException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
|
||||
import javax.net.ssl.HostnameVerifier;
|
||||
import javax.net.ssl.HttpsURLConnection;
|
||||
import javax.net.ssl.SSLSocketFactory;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.authentication.client.AuthenticationException;
|
||||
import org.apache.hadoop.security.authentication.client.ConnectionConfigurator;
|
||||
import org.apache.hadoop.security.ssl.SSLFactory;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL.Token;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticator;
|
||||
import org.apache.hadoop.security.token.delegation.web.KerberosDelegationTokenAuthenticator;
|
||||
import org.apache.hadoop.security.token.delegation.web.PseudoDelegationTokenAuthenticator;
|
||||
import org.apache.hadoop.service.AbstractService;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.webapp.YarnJacksonJaxbJsonProvider;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Joiner;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.sun.jersey.api.client.Client;
|
||||
import com.sun.jersey.api.client.ClientHandlerException;
|
||||
import com.sun.jersey.api.client.ClientRequest;
|
||||
import com.sun.jersey.api.client.ClientResponse;
|
||||
import com.sun.jersey.api.client.config.ClientConfig;
|
||||
import com.sun.jersey.api.client.config.DefaultClientConfig;
|
||||
import com.sun.jersey.api.client.filter.ClientFilter;
|
||||
import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
|
||||
import com.sun.jersey.client.urlconnection.URLConnectionClientHandler;
|
||||
|
||||
/**
|
||||
* Utility Connector class which is used by timeline clients to securely get
|
||||
* connected to the timeline server.
|
||||
*
|
||||
*/
|
||||
public class TimelineConnector extends AbstractService {
|
||||
|
||||
private static final Joiner JOINER = Joiner.on("");
|
||||
private static final Log LOG = LogFactory.getLog(TimelineConnector.class);
|
||||
public final static int DEFAULT_SOCKET_TIMEOUT = 1 * 60 * 1000; // 1 minute
|
||||
|
||||
private SSLFactory sslFactory;
|
||||
private Client client;
|
||||
private ConnectionConfigurator connConfigurator;
|
||||
private DelegationTokenAuthenticator authenticator;
|
||||
private DelegationTokenAuthenticatedURL.Token token;
|
||||
private UserGroupInformation authUgi;
|
||||
private String doAsUser;
|
||||
@VisibleForTesting
|
||||
TimelineClientConnectionRetry connectionRetry;
|
||||
private boolean requireConnectionRetry;
|
||||
|
||||
public TimelineConnector(boolean requireConnectionRetry,
|
||||
UserGroupInformation authUgi, String doAsUser,
|
||||
DelegationTokenAuthenticatedURL.Token token) {
|
||||
super("TimelineConnector");
|
||||
this.requireConnectionRetry = requireConnectionRetry;
|
||||
this.authUgi = authUgi;
|
||||
this.doAsUser = doAsUser;
|
||||
this.token = token;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
super.serviceInit(conf);
|
||||
ClientConfig cc = new DefaultClientConfig();
|
||||
cc.getClasses().add(YarnJacksonJaxbJsonProvider.class);
|
||||
|
||||
sslFactory = getSSLFactory(conf);
|
||||
connConfigurator = getConnConfigurator(sslFactory);
|
||||
|
||||
if (UserGroupInformation.isSecurityEnabled()) {
|
||||
authenticator = new KerberosDelegationTokenAuthenticator();
|
||||
} else {
|
||||
authenticator = new PseudoDelegationTokenAuthenticator();
|
||||
}
|
||||
authenticator.setConnectionConfigurator(connConfigurator);
|
||||
|
||||
connectionRetry = new TimelineClientConnectionRetry(conf);
|
||||
client =
|
||||
new Client(
|
||||
new URLConnectionClientHandler(new TimelineURLConnectionFactory(
|
||||
authUgi, authenticator, connConfigurator, token, doAsUser)),
|
||||
cc);
|
||||
if (requireConnectionRetry) {
|
||||
TimelineJerseyRetryFilter retryFilter =
|
||||
new TimelineJerseyRetryFilter(connectionRetry);
|
||||
client.addFilter(retryFilter);
|
||||
}
|
||||
}
|
||||
|
||||
private static final ConnectionConfigurator DEFAULT_TIMEOUT_CONN_CONFIGURATOR
|
||||
= new ConnectionConfigurator() {
|
||||
@Override
|
||||
public HttpURLConnection configure(HttpURLConnection conn)
|
||||
throws IOException {
|
||||
setTimeouts(conn, DEFAULT_SOCKET_TIMEOUT);
|
||||
return conn;
|
||||
}
|
||||
};
|
||||
|
||||
private ConnectionConfigurator getConnConfigurator(SSLFactory sslFactoryObj) {
|
||||
try {
|
||||
return initSslConnConfigurator(DEFAULT_SOCKET_TIMEOUT, sslFactoryObj);
|
||||
} catch (Exception e) {
|
||||
LOG.debug("Cannot load customized ssl related configuration. "
|
||||
+ "Fallback to system-generic settings.", e);
|
||||
return DEFAULT_TIMEOUT_CONN_CONFIGURATOR;
|
||||
}
|
||||
}
|
||||
|
||||
private static ConnectionConfigurator initSslConnConfigurator(
|
||||
final int timeout, SSLFactory sslFactory)
|
||||
throws IOException, GeneralSecurityException {
|
||||
final SSLSocketFactory sf;
|
||||
final HostnameVerifier hv;
|
||||
|
||||
sf = sslFactory.createSSLSocketFactory();
|
||||
hv = sslFactory.getHostnameVerifier();
|
||||
|
||||
return new ConnectionConfigurator() {
|
||||
@Override
|
||||
public HttpURLConnection configure(HttpURLConnection conn)
|
||||
throws IOException {
|
||||
if (conn instanceof HttpsURLConnection) {
|
||||
HttpsURLConnection c = (HttpsURLConnection) conn;
|
||||
c.setSSLSocketFactory(sf);
|
||||
c.setHostnameVerifier(hv);
|
||||
}
|
||||
setTimeouts(conn, timeout);
|
||||
return conn;
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
protected SSLFactory getSSLFactory(Configuration conf)
|
||||
throws GeneralSecurityException, IOException {
|
||||
SSLFactory newSSLFactory = new SSLFactory(SSLFactory.Mode.CLIENT, conf);
|
||||
newSSLFactory.init();
|
||||
return newSSLFactory;
|
||||
}
|
||||
|
||||
private static void setTimeouts(URLConnection connection, int socketTimeout) {
|
||||
connection.setConnectTimeout(socketTimeout);
|
||||
connection.setReadTimeout(socketTimeout);
|
||||
}
|
||||
|
||||
public static URI constructResURI(Configuration conf, String address,
|
||||
String uri) {
|
||||
return URI.create(
|
||||
JOINER.join(YarnConfiguration.useHttps(conf) ? "https://" : "http://",
|
||||
address, uri));
|
||||
}
|
||||
|
||||
DelegationTokenAuthenticatedURL getDelegationTokenAuthenticatedURL() {
|
||||
return new DelegationTokenAuthenticatedURL(authenticator, connConfigurator);
|
||||
}
|
||||
|
||||
protected void serviceStop() {
|
||||
if (this.sslFactory != null) {
|
||||
this.sslFactory.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
public Client getClient() {
|
||||
return client;
|
||||
}
|
||||
|
||||
public Object operateDelegationToken(
|
||||
final PrivilegedExceptionAction<?> action)
|
||||
throws IOException, YarnException {
|
||||
// Set up the retry operation
|
||||
TimelineClientRetryOp tokenRetryOp =
|
||||
createRetryOpForOperateDelegationToken(action);
|
||||
|
||||
return connectionRetry.retryOn(tokenRetryOp);
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
TimelineClientRetryOp createRetryOpForOperateDelegationToken(
|
||||
final PrivilegedExceptionAction<?> action) throws IOException {
|
||||
return new TimelineClientRetryOpForOperateDelegationToken(this.authUgi,
|
||||
action);
|
||||
}
|
||||
|
||||
/**
|
||||
* Abstract class for an operation that should be retried by timeline client.
|
||||
*/
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public static abstract class TimelineClientRetryOp {
|
||||
// The operation that should be retried
|
||||
public abstract Object run() throws IOException;
|
||||
|
||||
// The method to indicate if we should retry given the incoming exception
|
||||
public abstract boolean shouldRetryOn(Exception e);
|
||||
}
|
||||
|
||||
private static class TimelineURLConnectionFactory
|
||||
implements HttpURLConnectionFactory {
|
||||
private DelegationTokenAuthenticator authenticator;
|
||||
private UserGroupInformation authUgi;
|
||||
private ConnectionConfigurator connConfigurator;
|
||||
private Token token;
|
||||
private String doAsUser;
|
||||
|
||||
public TimelineURLConnectionFactory(UserGroupInformation authUgi,
|
||||
DelegationTokenAuthenticator authenticator,
|
||||
ConnectionConfigurator connConfigurator,
|
||||
DelegationTokenAuthenticatedURL.Token token, String doAsUser) {
|
||||
this.authUgi = authUgi;
|
||||
this.authenticator = authenticator;
|
||||
this.connConfigurator = connConfigurator;
|
||||
this.token = token;
|
||||
this.doAsUser = doAsUser;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpURLConnection getHttpURLConnection(final URL url)
|
||||
throws IOException {
|
||||
authUgi.checkTGTAndReloginFromKeytab();
|
||||
try {
|
||||
return new DelegationTokenAuthenticatedURL(authenticator,
|
||||
connConfigurator).openConnection(url, token, doAsUser);
|
||||
} catch (UndeclaredThrowableException e) {
|
||||
throw new IOException(e.getCause());
|
||||
} catch (AuthenticationException ae) {
|
||||
throw new IOException(ae);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Class to handle retry
|
||||
// Outside this class, only visible to tests
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
static class TimelineClientConnectionRetry {
|
||||
|
||||
// maxRetries < 0 means keep trying
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public int maxRetries;
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public long retryInterval;
|
||||
|
||||
// Indicates if retries happened last time. Only tests should read it.
|
||||
// In unit tests, retryOn() calls should _not_ be concurrent.
|
||||
private boolean retried = false;
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
boolean getRetired() {
|
||||
return retried;
|
||||
}
|
||||
|
||||
// Constructor with default retry settings
|
||||
public TimelineClientConnectionRetry(Configuration conf) {
|
||||
Preconditions.checkArgument(
|
||||
conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES)
|
||||
>= -1,
|
||||
"%s property value should be greater than or equal to -1",
|
||||
YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
|
||||
Preconditions.checkArgument(
|
||||
conf.getLong(
|
||||
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
|
||||
YarnConfiguration.
|
||||
DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS) > 0,
|
||||
"%s property value should be greater than zero",
|
||||
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
|
||||
maxRetries =
|
||||
conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
|
||||
retryInterval = conf.getLong(
|
||||
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
|
||||
}
|
||||
|
||||
public Object retryOn(TimelineClientRetryOp op)
|
||||
throws RuntimeException, IOException {
|
||||
int leftRetries = maxRetries;
|
||||
retried = false;
|
||||
|
||||
// keep trying
|
||||
while (true) {
|
||||
try {
|
||||
// try perform the op, if fail, keep retrying
|
||||
return op.run();
|
||||
} catch (IOException | RuntimeException e) {
|
||||
// break if there's no retries left
|
||||
if (leftRetries == 0) {
|
||||
break;
|
||||
}
|
||||
if (op.shouldRetryOn(e)) {
|
||||
logException(e, leftRetries);
|
||||
} else {
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
if (leftRetries > 0) {
|
||||
leftRetries--;
|
||||
}
|
||||
retried = true;
|
||||
try {
|
||||
// sleep for the given time interval
|
||||
Thread.sleep(retryInterval);
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.warn("Client retry sleep interrupted! ");
|
||||
}
|
||||
}
|
||||
throw new RuntimeException("Failed to connect to timeline server. "
|
||||
+ "Connection retries limit exceeded. "
|
||||
+ "The posted timeline event may be missing");
|
||||
};
|
||||
|
||||
private void logException(Exception e, int leftRetries) {
|
||||
if (leftRetries > 0) {
|
||||
LOG.info(
|
||||
"Exception caught by TimelineClientConnectionRetry," + " will try "
|
||||
+ leftRetries + " more time(s).\nMessage: " + e.getMessage());
|
||||
} else {
|
||||
// note that maxRetries may be -1 at the very beginning
|
||||
LOG.info("ConnectionException caught by TimelineClientConnectionRetry,"
|
||||
+ " will keep retrying.\nMessage: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private static class TimelineJerseyRetryFilter extends ClientFilter {
|
||||
private TimelineClientConnectionRetry connectionRetry;
|
||||
|
||||
public TimelineJerseyRetryFilter(
|
||||
TimelineClientConnectionRetry connectionRetry) {
|
||||
this.connectionRetry = connectionRetry;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ClientResponse handle(final ClientRequest cr)
|
||||
throws ClientHandlerException {
|
||||
// Set up the retry operation
|
||||
TimelineClientRetryOp jerseyRetryOp = new TimelineClientRetryOp() {
|
||||
@Override
|
||||
public Object run() {
|
||||
// Try pass the request, if fail, keep retrying
|
||||
return getNext().handle(cr);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldRetryOn(Exception e) {
|
||||
// Only retry on connection exceptions
|
||||
return (e instanceof ClientHandlerException)
|
||||
&& (e.getCause() instanceof ConnectException
|
||||
|| e.getCause() instanceof SocketTimeoutException
|
||||
|| e.getCause() instanceof SocketException);
|
||||
}
|
||||
};
|
||||
try {
|
||||
return (ClientResponse) connectionRetry.retryOn(jerseyRetryOp);
|
||||
} catch (IOException e) {
|
||||
throw new ClientHandlerException(
|
||||
"Jersey retry failed!\nMessage: " + e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public static class TimelineClientRetryOpForOperateDelegationToken
|
||||
extends TimelineClientRetryOp {
|
||||
|
||||
private final UserGroupInformation authUgi;
|
||||
private final PrivilegedExceptionAction<?> action;
|
||||
|
||||
public TimelineClientRetryOpForOperateDelegationToken(
|
||||
UserGroupInformation authUgi, PrivilegedExceptionAction<?> action) {
|
||||
this.authUgi = authUgi;
|
||||
this.action = action;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Object run() throws IOException {
|
||||
// Try pass the request, if fail, keep retrying
|
||||
authUgi.checkTGTAndReloginFromKeytab();
|
||||
try {
|
||||
return authUgi.doAs(action);
|
||||
} catch (UndeclaredThrowableException e) {
|
||||
throw new IOException(e.getCause());
|
||||
} catch (InterruptedException e) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shouldRetryOn(Exception e) {
|
||||
// retry on connection exceptions
|
||||
// and SocketTimeoutException
|
||||
return (e instanceof ConnectException
|
||||
|| e instanceof SocketTimeoutException);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,459 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.client.api.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.FutureTask;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import javax.ws.rs.core.MediaType;
|
||||
import javax.ws.rs.core.MultivaluedMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.delegation.web.DelegationTokenAuthenticatedURL;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
|
||||
import com.sun.jersey.api.client.ClientResponse;
|
||||
import com.sun.jersey.core.util.MultivaluedMapImpl;
|
||||
|
||||
/**
|
||||
* Implementation of timeline v2 client interface.
|
||||
*
|
||||
*/
|
||||
public class TimelineV2ClientImpl extends TimelineV2Client {
|
||||
private static final Log LOG = LogFactory.getLog(TimelineV2ClientImpl.class);
|
||||
|
||||
private static final String RESOURCE_URI_STR_V2 = "/ws/v2/timeline/";
|
||||
|
||||
private TimelineEntityDispatcher entityDispatcher;
|
||||
private volatile String timelineServiceAddress;
|
||||
|
||||
// Retry parameters for identifying new timeline service
|
||||
// TODO consider to merge with connection retry
|
||||
private int maxServiceRetries;
|
||||
private long serviceRetryInterval;
|
||||
|
||||
private TimelineConnector connector;
|
||||
|
||||
private ApplicationId contextAppId;
|
||||
|
||||
public TimelineV2ClientImpl(ApplicationId appId) {
|
||||
super(TimelineV2ClientImpl.class.getName());
|
||||
this.contextAppId = appId;
|
||||
}
|
||||
|
||||
public ApplicationId getContextAppId() {
|
||||
return contextAppId;
|
||||
}
|
||||
|
||||
protected void serviceInit(Configuration conf) throws Exception {
|
||||
if (!YarnConfiguration.timelineServiceEnabled(conf)
|
||||
|| (int) YarnConfiguration.getTimelineServiceVersion(conf) != 2) {
|
||||
throw new IOException("Timeline V2 client is not properly configured. "
|
||||
+ "Either timeline service is not enabled or version is not set to"
|
||||
+ " 2");
|
||||
}
|
||||
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
|
||||
UserGroupInformation realUgi = ugi.getRealUser();
|
||||
String doAsUser = null;
|
||||
UserGroupInformation authUgi = null;
|
||||
if (realUgi != null) {
|
||||
authUgi = realUgi;
|
||||
doAsUser = ugi.getShortUserName();
|
||||
} else {
|
||||
authUgi = ugi;
|
||||
doAsUser = null;
|
||||
}
|
||||
|
||||
// TODO need to add/cleanup filter retry later for ATSV2. similar to V1
|
||||
DelegationTokenAuthenticatedURL.Token token =
|
||||
new DelegationTokenAuthenticatedURL.Token();
|
||||
connector = new TimelineConnector(false, authUgi, doAsUser, token);
|
||||
addIfService(connector);
|
||||
|
||||
// new version need to auto discovery (with retry till ATS v2 address is
|
||||
// got).
|
||||
maxServiceRetries =
|
||||
conf.getInt(YarnConfiguration.TIMELINE_SERVICE_CLIENT_MAX_RETRIES,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_MAX_RETRIES);
|
||||
serviceRetryInterval = conf.getLong(
|
||||
YarnConfiguration.TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS,
|
||||
YarnConfiguration.DEFAULT_TIMELINE_SERVICE_CLIENT_RETRY_INTERVAL_MS);
|
||||
entityDispatcher = new TimelineEntityDispatcher(conf);
|
||||
super.serviceInit(conf);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStart() throws Exception {
|
||||
super.serviceStart();
|
||||
entityDispatcher.start();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void serviceStop() throws Exception {
|
||||
entityDispatcher.stop();
|
||||
super.serviceStop();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putEntities(TimelineEntity... entities)
|
||||
throws IOException, YarnException {
|
||||
entityDispatcher.dispatchEntities(true, entities);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putEntitiesAsync(TimelineEntity... entities)
|
||||
throws IOException, YarnException {
|
||||
entityDispatcher.dispatchEntities(false, entities);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void setTimelineServiceAddress(String address) {
|
||||
this.timelineServiceAddress = address;
|
||||
}
|
||||
|
||||
@Private
|
||||
protected void putObjects(String path, MultivaluedMap<String, String> params,
|
||||
Object obj) throws IOException, YarnException {
|
||||
|
||||
int retries = verifyRestEndPointAvailable();
|
||||
|
||||
// timelineServiceAddress could be stale, add retry logic here.
|
||||
boolean needRetry = true;
|
||||
while (needRetry) {
|
||||
try {
|
||||
URI uri = TimelineConnector.constructResURI(getConfig(),
|
||||
timelineServiceAddress, RESOURCE_URI_STR_V2);
|
||||
putObjects(uri, path, params, obj);
|
||||
needRetry = false;
|
||||
} catch (IOException e) {
|
||||
// handle exception for timelineServiceAddress being updated.
|
||||
checkRetryWithSleep(retries, e);
|
||||
retries--;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if reaching to maximum of retries.
|
||||
*
|
||||
* @param retries
|
||||
* @param e
|
||||
*/
|
||||
private void checkRetryWithSleep(int retries, IOException e)
|
||||
throws YarnException, IOException {
|
||||
if (retries > 0) {
|
||||
try {
|
||||
Thread.sleep(this.serviceRetryInterval);
|
||||
} catch (InterruptedException ex) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new YarnException("Interrupted while retrying to connect to ATS");
|
||||
}
|
||||
} else {
|
||||
StringBuilder msg =
|
||||
new StringBuilder("TimelineClient has reached to max retry times : ");
|
||||
msg.append(this.maxServiceRetries);
|
||||
msg.append(" for service address: ");
|
||||
msg.append(timelineServiceAddress);
|
||||
LOG.error(msg.toString());
|
||||
throw new IOException(msg.toString(), e);
|
||||
}
|
||||
}
|
||||
|
||||
protected void putObjects(URI base, String path,
|
||||
MultivaluedMap<String, String> params, Object obj)
|
||||
throws IOException, YarnException {
|
||||
ClientResponse resp;
|
||||
try {
|
||||
resp = connector.getClient().resource(base).path(path).queryParams(params)
|
||||
.accept(MediaType.APPLICATION_JSON).type(MediaType.APPLICATION_JSON)
|
||||
.put(ClientResponse.class, obj);
|
||||
} catch (RuntimeException re) {
|
||||
// runtime exception is expected if the client cannot connect the server
|
||||
String msg = "Failed to get the response from the timeline server.";
|
||||
LOG.error(msg, re);
|
||||
throw new IOException(re);
|
||||
}
|
||||
if (resp == null ||
|
||||
resp.getClientResponseStatus() != ClientResponse.Status.OK) {
|
||||
String msg =
|
||||
"Response from the timeline server is " + ((resp == null) ? "null"
|
||||
: "not successful," + " HTTP error code: " + resp.getStatus()
|
||||
+ ", Server response:\n" + resp.getEntity(String.class));
|
||||
LOG.error(msg);
|
||||
throw new YarnException(msg);
|
||||
}
|
||||
}
|
||||
|
||||
private int verifyRestEndPointAvailable() throws YarnException {
|
||||
// timelineServiceAddress could haven't be initialized yet
|
||||
// or stale (only for new timeline service)
|
||||
int retries = pollTimelineServiceAddress(this.maxServiceRetries);
|
||||
if (timelineServiceAddress == null) {
|
||||
String errMessage = "TimelineClient has reached to max retry times : "
|
||||
+ this.maxServiceRetries
|
||||
+ ", but failed to fetch timeline service address. Please verify"
|
||||
+ " Timeline Auxiliary Service is configured in all the NMs";
|
||||
LOG.error(errMessage);
|
||||
throw new YarnException(errMessage);
|
||||
}
|
||||
return retries;
|
||||
}
|
||||
|
||||
/**
|
||||
* Poll TimelineServiceAddress for maximum of retries times if it is null.
|
||||
*
|
||||
* @param retries
|
||||
* @return the left retry times
|
||||
* @throws IOException
|
||||
*/
|
||||
private int pollTimelineServiceAddress(int retries) throws YarnException {
|
||||
while (timelineServiceAddress == null && retries > 0) {
|
||||
try {
|
||||
Thread.sleep(this.serviceRetryInterval);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new YarnException("Interrupted while trying to connect ATS");
|
||||
}
|
||||
retries--;
|
||||
}
|
||||
return retries;
|
||||
}
|
||||
|
||||
private final class EntitiesHolder extends FutureTask<Void> {
|
||||
private final TimelineEntities entities;
|
||||
private final boolean isSync;
|
||||
|
||||
EntitiesHolder(final TimelineEntities entities, final boolean isSync) {
|
||||
super(new Callable<Void>() {
|
||||
// publishEntities()
|
||||
public Void call() throws Exception {
|
||||
MultivaluedMap<String, String> params = new MultivaluedMapImpl();
|
||||
params.add("appid", getContextAppId().toString());
|
||||
params.add("async", Boolean.toString(!isSync));
|
||||
putObjects("entities", params, entities);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
this.entities = entities;
|
||||
this.isSync = isSync;
|
||||
}
|
||||
|
||||
public boolean isSync() {
|
||||
return isSync;
|
||||
}
|
||||
|
||||
public TimelineEntities getEntities() {
|
||||
return entities;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This class is responsible for collecting the timeline entities and
|
||||
* publishing them in async.
|
||||
*/
|
||||
private class TimelineEntityDispatcher {
|
||||
/**
|
||||
* Time period for which the timelineclient will wait for draining after
|
||||
* stop.
|
||||
*/
|
||||
private static final long DRAIN_TIME_PERIOD = 2000L;
|
||||
|
||||
private int numberOfAsyncsToMerge;
|
||||
private final BlockingQueue<EntitiesHolder> timelineEntityQueue;
|
||||
private ExecutorService executor;
|
||||
|
||||
TimelineEntityDispatcher(Configuration conf) {
|
||||
timelineEntityQueue = new LinkedBlockingQueue<EntitiesHolder>();
|
||||
numberOfAsyncsToMerge =
|
||||
conf.getInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE,
|
||||
YarnConfiguration.DEFAULT_NUMBER_OF_ASYNC_ENTITIES_TO_MERGE);
|
||||
}
|
||||
|
||||
Runnable createRunnable() {
|
||||
return new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
EntitiesHolder entitiesHolder;
|
||||
while (!Thread.currentThread().isInterrupted()) {
|
||||
// Merge all the async calls and make one push, but if its sync
|
||||
// call push immediately
|
||||
try {
|
||||
entitiesHolder = timelineEntityQueue.take();
|
||||
} catch (InterruptedException ie) {
|
||||
LOG.info("Timeline dispatcher thread was interrupted ");
|
||||
Thread.currentThread().interrupt();
|
||||
return;
|
||||
}
|
||||
if (entitiesHolder != null) {
|
||||
publishWithoutBlockingOnQueue(entitiesHolder);
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (!timelineEntityQueue.isEmpty()) {
|
||||
LOG.info("Yet to publish " + timelineEntityQueue.size()
|
||||
+ " timelineEntities, draining them now. ");
|
||||
}
|
||||
// Try to drain the remaining entities to be published @ the max for
|
||||
// 2 seconds
|
||||
long timeTillweDrain =
|
||||
System.currentTimeMillis() + DRAIN_TIME_PERIOD;
|
||||
while (!timelineEntityQueue.isEmpty()) {
|
||||
publishWithoutBlockingOnQueue(timelineEntityQueue.poll());
|
||||
if (System.currentTimeMillis() > timeTillweDrain) {
|
||||
// time elapsed stop publishing further....
|
||||
if (!timelineEntityQueue.isEmpty()) {
|
||||
LOG.warn("Time to drain elapsed! Remaining "
|
||||
+ timelineEntityQueue.size() + "timelineEntities will not"
|
||||
+ " be published");
|
||||
// if some entities were not drained then we need interrupt
|
||||
// the threads which had put sync EntityHolders to the queue.
|
||||
EntitiesHolder nextEntityInTheQueue = null;
|
||||
while ((nextEntityInTheQueue =
|
||||
timelineEntityQueue.poll()) != null) {
|
||||
nextEntityInTheQueue.cancel(true);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Publishes the given EntitiesHolder and return immediately if sync
|
||||
* call, else tries to fetch the EntitiesHolder from the queue in non
|
||||
* blocking fashion and collate the Entities if possible before
|
||||
* publishing through REST.
|
||||
*
|
||||
* @param entitiesHolder
|
||||
*/
|
||||
private void publishWithoutBlockingOnQueue(
|
||||
EntitiesHolder entitiesHolder) {
|
||||
if (entitiesHolder.isSync()) {
|
||||
entitiesHolder.run();
|
||||
return;
|
||||
}
|
||||
int count = 1;
|
||||
while (true) {
|
||||
// loop till we find a sync put Entities or there is nothing
|
||||
// to take
|
||||
EntitiesHolder nextEntityInTheQueue = timelineEntityQueue.poll();
|
||||
if (nextEntityInTheQueue == null) {
|
||||
// Nothing in the queue just publish and get back to the
|
||||
// blocked wait state
|
||||
entitiesHolder.run();
|
||||
break;
|
||||
} else if (nextEntityInTheQueue.isSync()) {
|
||||
// flush all the prev async entities first
|
||||
entitiesHolder.run();
|
||||
// and then flush the sync entity
|
||||
nextEntityInTheQueue.run();
|
||||
break;
|
||||
} else {
|
||||
// append all async entities together and then flush
|
||||
entitiesHolder.getEntities().addEntities(
|
||||
nextEntityInTheQueue.getEntities().getEntities());
|
||||
count++;
|
||||
if (count == numberOfAsyncsToMerge) {
|
||||
// Flush the entities if the number of the async
|
||||
// putEntites merged reaches the desired limit. To avoid
|
||||
// collecting multiple entities and delaying for a long
|
||||
// time.
|
||||
entitiesHolder.run();
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
public void dispatchEntities(boolean sync,
|
||||
TimelineEntity[] entitiesTobePublished) throws YarnException {
|
||||
if (executor.isShutdown()) {
|
||||
throw new YarnException("Timeline client is in the process of stopping,"
|
||||
+ " not accepting any more TimelineEntities");
|
||||
}
|
||||
|
||||
// wrap all TimelineEntity into TimelineEntities object
|
||||
TimelineEntities entities = new TimelineEntities();
|
||||
for (TimelineEntity entity : entitiesTobePublished) {
|
||||
entities.addEntity(entity);
|
||||
}
|
||||
|
||||
// created a holder and place it in queue
|
||||
EntitiesHolder entitiesHolder = new EntitiesHolder(entities, sync);
|
||||
try {
|
||||
timelineEntityQueue.put(entitiesHolder);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new YarnException(
|
||||
"Failed while adding entity to the queue for publishing", e);
|
||||
}
|
||||
|
||||
if (sync) {
|
||||
// In sync call we need to wait till its published and if any error then
|
||||
// throw it back
|
||||
try {
|
||||
entitiesHolder.get();
|
||||
} catch (ExecutionException e) {
|
||||
throw new YarnException("Failed while publishing entity",
|
||||
e.getCause());
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new YarnException("Interrupted while publishing entity", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void start() {
|
||||
executor = Executors.newSingleThreadExecutor();
|
||||
executor.execute(createRunnable());
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
LOG.info("Stopping TimelineClient.");
|
||||
executor.shutdownNow();
|
||||
try {
|
||||
executor.awaitTermination(DRAIN_TIME_PERIOD, TimeUnit.MILLISECONDS);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -215,11 +215,11 @@ public class TestTimelineClient {
|
|||
+ "Timeline server should be off to run this test. ");
|
||||
} catch (RuntimeException ce) {
|
||||
Assert.assertTrue(
|
||||
"Handler exception for reason other than retry: " + ce.getMessage(),
|
||||
ce.getMessage().contains("Connection retries limit exceeded"));
|
||||
"Handler exception for reason other than retry: " + ce.getMessage(),
|
||||
ce.getMessage().contains("Connection retries limit exceeded"));
|
||||
// we would expect this exception here, check if the client has retried
|
||||
Assert.assertTrue("Retry filter didn't perform any retries! ", client
|
||||
.connectionRetry.getRetired());
|
||||
Assert.assertTrue("Retry filter didn't perform any retries! ",
|
||||
client.connector.connectionRetry.getRetired());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -318,7 +318,7 @@ public class TestTimelineClient {
|
|||
.getMessage().contains("Connection retries limit exceeded"));
|
||||
// we would expect this exception here, check if the client has retried
|
||||
Assert.assertTrue("Retry filter didn't perform any retries! ",
|
||||
client.connectionRetry.getRetired());
|
||||
client.connector.connectionRetry.getRetired());
|
||||
}
|
||||
|
||||
public static ClientResponse mockEntityClientResponse(
|
||||
|
@ -419,17 +419,26 @@ public class TestTimelineClient {
|
|||
private TimelineClientImpl createTimelineClientFakeTimelineClientRetryOp(
|
||||
YarnConfiguration conf) {
|
||||
TimelineClientImpl client = new TimelineClientImpl() {
|
||||
|
||||
@Override
|
||||
public TimelineClientRetryOp
|
||||
createTimelineClientRetryOpForOperateDelegationToken(
|
||||
final PrivilegedExceptionAction<?> action) throws IOException {
|
||||
TimelineClientRetryOpForOperateDelegationToken op =
|
||||
spy(new TimelineClientRetryOpForOperateDelegationToken(
|
||||
UserGroupInformation.getCurrentUser(), action));
|
||||
doThrow(new SocketTimeoutException("Test socketTimeoutException"))
|
||||
.when(op).run();
|
||||
return op;
|
||||
protected TimelineConnector createTimelineConnector() {
|
||||
TimelineConnector connector =
|
||||
new TimelineConnector(true, authUgi, doAsUser, token) {
|
||||
@Override
|
||||
public TimelineClientRetryOp
|
||||
createRetryOpForOperateDelegationToken(
|
||||
final PrivilegedExceptionAction<?> action)
|
||||
throws IOException {
|
||||
TimelineClientRetryOpForOperateDelegationToken op =
|
||||
spy(new TimelineClientRetryOpForOperateDelegationToken(
|
||||
UserGroupInformation.getCurrentUser(), action));
|
||||
doThrow(
|
||||
new SocketTimeoutException("Test socketTimeoutException"))
|
||||
.when(op).run();
|
||||
return op;
|
||||
}
|
||||
};
|
||||
addIfService(connector);
|
||||
return connector;
|
||||
}
|
||||
};
|
||||
client.init(conf);
|
||||
|
|
|
@ -50,7 +50,7 @@ public class TestTimelineClientV2Impl {
|
|||
public void setup() {
|
||||
conf = new YarnConfiguration();
|
||||
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.0f);
|
||||
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
|
||||
conf.setInt(YarnConfiguration.NUMBER_OF_ASYNC_ENTITIES_TO_MERGE, 3);
|
||||
if (!currTestName.getMethodName()
|
||||
.contains("testRetryOnConnectionFailure")) {
|
||||
|
@ -71,7 +71,7 @@ public class TestTimelineClientV2Impl {
|
|||
}
|
||||
|
||||
private class TestV2TimelineClientForExceptionHandling
|
||||
extends TimelineClientImpl {
|
||||
extends TimelineV2ClientImpl {
|
||||
public TestV2TimelineClientForExceptionHandling(ApplicationId id) {
|
||||
super(id);
|
||||
}
|
||||
|
|
|
@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
|
|||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEvent;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetricOperation;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
|
||||
import org.apache.hadoop.yarn.event.AsyncDispatcher;
|
||||
import org.apache.hadoop.yarn.event.Dispatcher;
|
||||
import org.apache.hadoop.yarn.event.EventHandler;
|
||||
|
@ -77,7 +77,7 @@ public class NMTimelinePublisher extends CompositeService {
|
|||
|
||||
private String httpAddress;
|
||||
|
||||
private final Map<ApplicationId, TimelineClient> appToClientMap;
|
||||
private final Map<ApplicationId, TimelineV2Client> appToClientMap;
|
||||
|
||||
public NMTimelinePublisher(Context context) {
|
||||
super(NMTimelinePublisher.class.getName());
|
||||
|
@ -103,7 +103,7 @@ public class NMTimelinePublisher extends CompositeService {
|
|||
}
|
||||
|
||||
@VisibleForTesting
|
||||
Map<ApplicationId, TimelineClient> getAppToClientMap() {
|
||||
Map<ApplicationId, TimelineV2Client> getAppToClientMap() {
|
||||
return appToClientMap;
|
||||
}
|
||||
|
||||
|
@ -148,7 +148,7 @@ public class NMTimelinePublisher extends CompositeService {
|
|||
try {
|
||||
// no need to put it as part of publisher as timeline client already has
|
||||
// Queuing concept
|
||||
TimelineClient timelineClient = getTimelineClient(appId);
|
||||
TimelineV2Client timelineClient = getTimelineClient(appId);
|
||||
if (timelineClient != null) {
|
||||
timelineClient.putEntitiesAsync(entity);
|
||||
} else {
|
||||
|
@ -242,7 +242,7 @@ public class NMTimelinePublisher extends CompositeService {
|
|||
try {
|
||||
// no need to put it as part of publisher as timeline client already has
|
||||
// Queuing concept
|
||||
TimelineClient timelineClient = getTimelineClient(appId);
|
||||
TimelineV2Client timelineClient = getTimelineClient(appId);
|
||||
if (timelineClient != null) {
|
||||
timelineClient.putEntitiesAsync(entity);
|
||||
} else {
|
||||
|
@ -273,7 +273,7 @@ public class NMTimelinePublisher extends CompositeService {
|
|||
LOG.debug("Publishing the entity " + entity + ", JSON-style content: "
|
||||
+ TimelineUtils.dumpTimelineRecordtoJSON(entity));
|
||||
}
|
||||
TimelineClient timelineClient = getTimelineClient(appId);
|
||||
TimelineV2Client timelineClient = getTimelineClient(appId);
|
||||
if (timelineClient != null) {
|
||||
timelineClient.putEntities(entity);
|
||||
} else {
|
||||
|
@ -390,8 +390,8 @@ public class NMTimelinePublisher extends CompositeService {
|
|||
|
||||
public void createTimelineClient(ApplicationId appId) {
|
||||
if (!appToClientMap.containsKey(appId)) {
|
||||
TimelineClient timelineClient =
|
||||
TimelineClient.createTimelineClient(appId);
|
||||
TimelineV2Client timelineClient =
|
||||
TimelineV2Client.createTimelineClient(appId);
|
||||
timelineClient.init(getConfig());
|
||||
timelineClient.start();
|
||||
appToClientMap.put(appId, timelineClient);
|
||||
|
@ -399,7 +399,7 @@ public class NMTimelinePublisher extends CompositeService {
|
|||
}
|
||||
|
||||
public void stopTimelineClient(ApplicationId appId) {
|
||||
TimelineClient client = appToClientMap.remove(appId);
|
||||
TimelineV2Client client = appToClientMap.remove(appId);
|
||||
if (client != null) {
|
||||
client.stop();
|
||||
}
|
||||
|
@ -407,13 +407,13 @@ public class NMTimelinePublisher extends CompositeService {
|
|||
|
||||
public void setTimelineServiceAddress(ApplicationId appId,
|
||||
String collectorAddr) {
|
||||
TimelineClient client = appToClientMap.get(appId);
|
||||
TimelineV2Client client = appToClientMap.get(appId);
|
||||
if (client != null) {
|
||||
client.setTimelineServiceAddress(collectorAddr);
|
||||
}
|
||||
}
|
||||
|
||||
private TimelineClient getTimelineClient(ApplicationId appId) {
|
||||
private TimelineV2Client getTimelineClient(ApplicationId appId) {
|
||||
return appToClientMap.get(appId);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
|
|||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||
import org.apache.hadoop.yarn.client.api.impl.TimelineClientImpl;
|
||||
import org.apache.hadoop.yarn.client.api.impl.TimelineV2ClientImpl;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
|
||||
|
@ -50,7 +50,7 @@ public class TestNMTimelinePublisher {
|
|||
public void testContainerResourceUsage() {
|
||||
Context context = mock(Context.class);
|
||||
@SuppressWarnings("unchecked")
|
||||
final DummyTimelineClient timelineClient = new DummyTimelineClient();
|
||||
final DummyTimelineClient timelineClient = new DummyTimelineClient(null);
|
||||
when(context.getNodeId()).thenReturn(NodeId.newInstance("localhost", 0));
|
||||
when(context.getHttpPort()).thenReturn(0);
|
||||
NMTimelinePublisher publisher = new NMTimelinePublisher(context) {
|
||||
|
@ -137,7 +137,11 @@ public class TestNMTimelinePublisher {
|
|||
}
|
||||
}
|
||||
|
||||
protected static class DummyTimelineClient extends TimelineClientImpl {
|
||||
protected static class DummyTimelineClient extends TimelineV2ClientImpl {
|
||||
public DummyTimelineClient(ApplicationId appId) {
|
||||
super(appId);
|
||||
}
|
||||
|
||||
private TimelineEntity[] lastPublishedEntities;
|
||||
|
||||
@Override
|
||||
|
|
|
@ -43,7 +43,7 @@ import org.apache.hadoop.yarn.api.records.timelineservice.QueueEntity;
|
|||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
|
||||
import org.apache.hadoop.yarn.api.records.timelineservice.UserEntity;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineClient;
|
||||
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
|
||||
|
@ -96,8 +96,8 @@ public class TestTimelineServiceClientIntegration {
|
|||
|
||||
@Test
|
||||
public void testPutEntities() throws Exception {
|
||||
TimelineClient client =
|
||||
TimelineClient.createTimelineClient(ApplicationId.newInstance(0, 1));
|
||||
TimelineV2Client client =
|
||||
TimelineV2Client.createTimelineClient(ApplicationId.newInstance(0, 1));
|
||||
try {
|
||||
// set the timeline service address manually
|
||||
client.setTimelineServiceAddress(
|
||||
|
@ -123,8 +123,8 @@ public class TestTimelineServiceClientIntegration {
|
|||
@Test
|
||||
public void testPutExtendedEntities() throws Exception {
|
||||
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||
TimelineClient client =
|
||||
TimelineClient.createTimelineClient(appId);
|
||||
TimelineV2Client client =
|
||||
TimelineV2Client.createTimelineClient(appId);
|
||||
try {
|
||||
// set the timeline service address manually
|
||||
client.setTimelineServiceAddress(
|
||||
|
|
Loading…
Reference in New Issue