YARN-955. Implemented ApplicationHistoryProtocol handler. Contributed by Mayank Bansal.
svn merge --ignore-ancestry -c 1556743 ../YARN-321 git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1562198 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7440b97256
commit
3d227e9544
|
@ -499,6 +499,9 @@ Branch YARN-321: Generic ApplicationHistoryService
|
||||||
YARN-1266. Implemented PB service and client wrappers for
|
YARN-1266. Implemented PB service and client wrappers for
|
||||||
ApplicationHistoryProtocol. (Mayank Bansal via vinodkv)
|
ApplicationHistoryProtocol. (Mayank Bansal via vinodkv)
|
||||||
|
|
||||||
|
YARN-955. Implemented ApplicationHistoryProtocol handler. (Mayank Bansal via
|
||||||
|
vinodkv)
|
||||||
|
|
||||||
Release 2.2.0 - 2013-10-13
|
Release 2.2.0 - 2013-10-13
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -962,6 +962,17 @@ public class YarnConfiguration extends Configuration {
|
||||||
|
|
||||||
/** AHS store class */
|
/** AHS store class */
|
||||||
public static final String AHS_STORE = AHS_PREFIX + "store.class";
|
public static final String AHS_STORE = AHS_PREFIX + "store.class";
|
||||||
|
|
||||||
|
/** host:port address for Application History Server API. */
|
||||||
|
public static final String AHS_ADDRESS = AHS_PREFIX + "address";
|
||||||
|
public static final int DEFAULT_AHS_PORT = 10200;
|
||||||
|
public static final String DEFAULT_AHS_ADDRESS = "0.0.0.0:"
|
||||||
|
+ DEFAULT_AHS_PORT;
|
||||||
|
|
||||||
|
/** The number of threads to handle client API requests. */
|
||||||
|
public static final String AHS_CLIENT_THREAD_COUNT = AHS_PREFIX
|
||||||
|
+ "client.thread-count";
|
||||||
|
public static final int DEFAULT_AHS_CLIENT_THREAD_COUNT = 10;
|
||||||
|
|
||||||
////////////////////////////////
|
////////////////////////////////
|
||||||
// Other Configs
|
// Other Configs
|
||||||
|
|
|
@ -1073,6 +1073,19 @@
|
||||||
<value>${hadoop.log.dir}/yarn/system/ahstore</value>
|
<value>${hadoop.log.dir}/yarn/system/ahstore</value>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>This is default address for the Application History server
|
||||||
|
to start the RPC server.</description>
|
||||||
|
<name>yarn.ahs.address</name>
|
||||||
|
<value>0.0.0.0:10200</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<description>CLient thread count to serve the client requests.</description>
|
||||||
|
<name>yarn.ahs.client.thread-count</name>
|
||||||
|
<value>10</value>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<description>T-file compression types used to compress history data.</description>
|
<description>T-file compression types used to compress history data.</description>
|
||||||
<name>yarn.ahs.fs-history-store.compression-type</name>
|
<name>yarn.ahs.fs-history-store.compression-type</name>
|
||||||
|
|
|
@ -1,115 +0,0 @@
|
||||||
/**
|
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
|
||||||
* or more contributor license agreements. See the NOTICE file
|
|
||||||
* distributed with this work for additional information
|
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
|
||||||
* to you under the Apache License, Version 2.0 (the
|
|
||||||
* "License"); you may not use this file except in compliance
|
|
||||||
* with the License. You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.applicationhistoryservice;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
import org.apache.hadoop.service.AbstractService;
|
|
||||||
import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
|
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
|
||||||
|
|
||||||
public class AHSClientService extends AbstractService implements
|
|
||||||
ApplicationHistoryProtocol {
|
|
||||||
|
|
||||||
public AHSClientService() {
|
|
||||||
super(AHSClientService.class.getName());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public GetApplicationReportResponse getApplicationReport(
|
|
||||||
GetApplicationReportRequest request) throws YarnException {
|
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public GetApplicationsResponse
|
|
||||||
getApplications(GetApplicationsRequest request) throws YarnException {
|
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public GetDelegationTokenResponse getDelegationToken(
|
|
||||||
GetDelegationTokenRequest request) throws YarnException {
|
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public RenewDelegationTokenResponse renewDelegationToken(
|
|
||||||
RenewDelegationTokenRequest request) throws YarnException {
|
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public CancelDelegationTokenResponse cancelDelegationToken(
|
|
||||||
CancelDelegationTokenRequest request) throws YarnException {
|
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public GetApplicationAttemptReportResponse getApplicationAttemptReport(
|
|
||||||
GetApplicationAttemptReportRequest request) throws YarnException,
|
|
||||||
IOException {
|
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public GetApplicationAttemptsResponse getApplicationAttempts(
|
|
||||||
GetApplicationAttemptsRequest request) throws YarnException, IOException {
|
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public GetContainerReportResponse getContainerReport(
|
|
||||||
GetContainerReportRequest request) throws YarnException, IOException {
|
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public GetContainersResponse getContainers(GetContainersRequest request)
|
|
||||||
throws YarnException, IOException {
|
|
||||||
// TODO Auto-generated method stub
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -0,0 +1,190 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.applicationhistoryservice;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.ipc.Server;
|
||||||
|
import org.apache.hadoop.service.AbstractService;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationHistoryProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.CancelDelegationTokenResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetDelegationTokenResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.RenewDelegationTokenResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
|
|
||||||
|
public class ApplicationHistoryClientService extends AbstractService {
|
||||||
|
private static final Log LOG = LogFactory
|
||||||
|
.getLog(ApplicationHistoryClientService.class);
|
||||||
|
private ApplicationHistoryManager history;
|
||||||
|
private ApplicationHistoryProtocol protocolHandler;
|
||||||
|
private Server server;
|
||||||
|
private InetSocketAddress bindAddress;
|
||||||
|
|
||||||
|
public ApplicationHistoryClientService(ApplicationHistoryManager history) {
|
||||||
|
super("ApplicationHistoryClientService");
|
||||||
|
this.history = history;
|
||||||
|
this.protocolHandler = new ApplicationHSClientProtocolHandler();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected void serviceStart() throws Exception {
|
||||||
|
Configuration conf = getConfig();
|
||||||
|
YarnRPC rpc = YarnRPC.create(conf);
|
||||||
|
InetSocketAddress address = conf.getSocketAddr(
|
||||||
|
YarnConfiguration.AHS_ADDRESS, YarnConfiguration.DEFAULT_AHS_ADDRESS,
|
||||||
|
YarnConfiguration.DEFAULT_AHS_PORT);
|
||||||
|
|
||||||
|
server = rpc.getServer(ApplicationHistoryProtocol.class, protocolHandler,
|
||||||
|
address, conf, null, conf.getInt(
|
||||||
|
YarnConfiguration.AHS_CLIENT_THREAD_COUNT,
|
||||||
|
YarnConfiguration.DEFAULT_AHS_CLIENT_THREAD_COUNT));
|
||||||
|
|
||||||
|
server.start();
|
||||||
|
this.bindAddress = conf.updateConnectAddr(YarnConfiguration.AHS_ADDRESS,
|
||||||
|
server.getListenerAddress());
|
||||||
|
LOG.info("Instantiated ApplicationHistoryClientService at "
|
||||||
|
+ this.bindAddress);
|
||||||
|
|
||||||
|
super.serviceStart();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceStop() throws Exception {
|
||||||
|
if (server != null) {
|
||||||
|
server.stop();
|
||||||
|
}
|
||||||
|
super.serviceStop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public ApplicationHistoryProtocol getClientHandler() {
|
||||||
|
return this.protocolHandler;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Private
|
||||||
|
public InetSocketAddress getBindAddress() {
|
||||||
|
return this.bindAddress;
|
||||||
|
}
|
||||||
|
|
||||||
|
private class ApplicationHSClientProtocolHandler implements
|
||||||
|
ApplicationHistoryProtocol {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CancelDelegationTokenResponse cancelDelegationToken(
|
||||||
|
CancelDelegationTokenRequest request) throws YarnException, IOException {
|
||||||
|
// TODO Auto-generated method stub
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetApplicationAttemptReportResponse getApplicationAttemptReport(
|
||||||
|
GetApplicationAttemptReportRequest request) throws YarnException,
|
||||||
|
IOException {
|
||||||
|
GetApplicationAttemptReportResponse response = GetApplicationAttemptReportResponse
|
||||||
|
.newInstance(history.getApplicationAttempt(request
|
||||||
|
.getApplicationAttemptId()));
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetApplicationAttemptsResponse getApplicationAttempts(
|
||||||
|
GetApplicationAttemptsRequest request) throws YarnException,
|
||||||
|
IOException {
|
||||||
|
GetApplicationAttemptsResponse response = GetApplicationAttemptsResponse
|
||||||
|
.newInstance(new ArrayList<ApplicationAttemptReport>(history
|
||||||
|
.getApplicationAttempts(request.getApplicationId()).values()));
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetApplicationReportResponse getApplicationReport(
|
||||||
|
GetApplicationReportRequest request) throws YarnException, IOException {
|
||||||
|
ApplicationId applicationId = request.getApplicationId();
|
||||||
|
GetApplicationReportResponse response = GetApplicationReportResponse
|
||||||
|
.newInstance(history.getApplication(applicationId));
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetApplicationsResponse getApplications(
|
||||||
|
GetApplicationsRequest request) throws YarnException, IOException {
|
||||||
|
GetApplicationsResponse response = GetApplicationsResponse
|
||||||
|
.newInstance(new ArrayList<ApplicationReport>(history
|
||||||
|
.getAllApplications().values()));
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetContainerReportResponse getContainerReport(
|
||||||
|
GetContainerReportRequest request) throws YarnException, IOException {
|
||||||
|
GetContainerReportResponse response = GetContainerReportResponse
|
||||||
|
.newInstance(history.getContainer(request.getContainerId()));
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetContainersResponse getContainers(GetContainersRequest request)
|
||||||
|
throws YarnException, IOException {
|
||||||
|
GetContainersResponse response = GetContainersResponse
|
||||||
|
.newInstance(new ArrayList<ContainerReport>(history.getContainers(
|
||||||
|
request.getApplicationAttemptId()).values()));
|
||||||
|
return response;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public GetDelegationTokenResponse getDelegationToken(
|
||||||
|
GetDelegationTokenRequest request) throws YarnException, IOException {
|
||||||
|
// TODO Auto-generated method stub
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RenewDelegationTokenResponse renewDelegationToken(
|
||||||
|
RenewDelegationTokenRequest request) throws YarnException, IOException {
|
||||||
|
// TODO Auto-generated method stub
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -1,25 +1,38 @@
|
||||||
/**
|
/**
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
* regarding copyright ownership. The ASF licenses this file
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
* to you under the Apache License, Version 2.0 (the
|
* to you under the Apache License, Version 2.0 (the
|
||||||
* "License"); you may not use this file except in compliance
|
* "License"); you may not use this file except in compliance
|
||||||
* with the License. You may obtain a copy of the License at
|
* with the License. You may obtain a copy of the License at
|
||||||
*
|
*
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
*
|
*
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
* See the License for the specific language governing permissions and
|
* See the License for the specific language governing permissions and
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.applicationhistoryservice;
|
package org.apache.hadoop.yarn.server.applicationhistoryservice;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
|
import org.apache.hadoop.metrics2.source.JvmMetrics;
|
||||||
import org.apache.hadoop.service.CompositeService;
|
import org.apache.hadoop.service.CompositeService;
|
||||||
|
import org.apache.hadoop.service.Service;
|
||||||
|
import org.apache.hadoop.util.ExitUtil;
|
||||||
|
import org.apache.hadoop.util.ShutdownHookManager;
|
||||||
|
import org.apache.hadoop.util.StringUtils;
|
||||||
|
import org.apache.hadoop.yarn.YarnUncaughtExceptionHandler;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* History server that keeps track of all types of history in the cluster.
|
* History server that keeps track of all types of history in the cluster.
|
||||||
|
@ -27,16 +40,82 @@ import org.apache.hadoop.service.CompositeService;
|
||||||
*/
|
*/
|
||||||
public class ApplicationHistoryServer extends CompositeService {
|
public class ApplicationHistoryServer extends CompositeService {
|
||||||
|
|
||||||
|
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
|
||||||
|
private static final Log LOG = LogFactory
|
||||||
|
.getLog(ApplicationHistoryServer.class);
|
||||||
|
|
||||||
|
ApplicationHistoryClientService ahsClientService;
|
||||||
|
ApplicationHistoryManager historyManager;
|
||||||
|
|
||||||
public ApplicationHistoryServer() {
|
public ApplicationHistoryServer() {
|
||||||
super(ApplicationHistoryServer.class.getName());
|
super(ApplicationHistoryServer.class.getName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceInit(Configuration conf) throws Exception {
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
super.serviceInit(conf);
|
historyManager = createApplicationHistory();
|
||||||
AHSClientService ahsClientService = new AHSClientService();
|
ahsClientService = createApplicationHistoryClientService(historyManager);
|
||||||
addService(ahsClientService);
|
addService(ahsClientService);
|
||||||
|
addService((Service) historyManager);
|
||||||
AHSWebServer webServer = new AHSWebServer();
|
AHSWebServer webServer = new AHSWebServer();
|
||||||
addService(webServer);
|
addService(webServer);
|
||||||
|
super.serviceInit(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceStart() throws Exception {
|
||||||
|
DefaultMetricsSystem.initialize("ApplicationHistoryServer");
|
||||||
|
JvmMetrics.initSingleton("ApplicationHistoryServer", null);
|
||||||
|
super.serviceStart();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void serviceStop() throws Exception {
|
||||||
|
DefaultMetricsSystem.shutdown();
|
||||||
|
super.serviceStop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Private
|
||||||
|
@VisibleForTesting
|
||||||
|
public ApplicationHistoryClientService getClientService() {
|
||||||
|
return this.ahsClientService;
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ApplicationHistoryClientService createApplicationHistoryClientService(
|
||||||
|
ApplicationHistoryManager historyManager) {
|
||||||
|
return new ApplicationHistoryClientService(historyManager);
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ApplicationHistoryManager createApplicationHistory() {
|
||||||
|
return new ApplicationHistoryManagerImpl();
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ApplicationHistoryManager getApplicationHistory() {
|
||||||
|
return this.historyManager;
|
||||||
|
}
|
||||||
|
|
||||||
|
static ApplicationHistoryServer launchAppHistoryServer(String[] args) {
|
||||||
|
Thread
|
||||||
|
.setDefaultUncaughtExceptionHandler(new YarnUncaughtExceptionHandler());
|
||||||
|
StringUtils.startupShutdownMessage(ApplicationHistoryServer.class, args,
|
||||||
|
LOG);
|
||||||
|
ApplicationHistoryServer appHistoryServer = null;
|
||||||
|
try {
|
||||||
|
appHistoryServer = new ApplicationHistoryServer();
|
||||||
|
ShutdownHookManager.get().addShutdownHook(
|
||||||
|
new CompositeServiceShutdownHook(appHistoryServer),
|
||||||
|
SHUTDOWN_HOOK_PRIORITY);
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
appHistoryServer.init(conf);
|
||||||
|
appHistoryServer.start();
|
||||||
|
} catch (Throwable t) {
|
||||||
|
LOG.fatal("Error starting ApplicationHistoryServer", t);
|
||||||
|
ExitUtil.terminate(-1, "Error starting ApplicationHistoryServer");
|
||||||
|
}
|
||||||
|
return appHistoryServer;
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
launchAppHistoryServer(args);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,189 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.applicationhistoryservice;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import junit.framework.Assert;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptReportResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationAttemptsResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainerReportResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.GetContainersResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerReport;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestApplicationHistoryClientService extends
|
||||||
|
ApplicationHistoryStoreTestUtils {
|
||||||
|
|
||||||
|
ApplicationHistoryServer historyServer = null;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() {
|
||||||
|
historyServer = new ApplicationHistoryServer();
|
||||||
|
Configuration config = new YarnConfiguration();
|
||||||
|
config.setClass(YarnConfiguration.AHS_STORE,
|
||||||
|
MemoryApplicationHistoryStore.class, ApplicationHistoryStore.class);
|
||||||
|
historyServer.init(config);
|
||||||
|
historyServer.start();
|
||||||
|
store = ((ApplicationHistoryManagerImpl) historyServer
|
||||||
|
.getApplicationHistory()).getHistoryStore();
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
historyServer.stop();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testApplicationReport() throws IOException, YarnException {
|
||||||
|
ApplicationId appId = null;
|
||||||
|
appId = ApplicationId.newInstance(0, 1);
|
||||||
|
writeApplicationStartData(appId);
|
||||||
|
writeApplicationFinishData(appId);
|
||||||
|
GetApplicationReportRequest request = GetApplicationReportRequest
|
||||||
|
.newInstance(appId);
|
||||||
|
GetApplicationReportResponse response = historyServer.getClientService()
|
||||||
|
.getClientHandler().getApplicationReport(request);
|
||||||
|
ApplicationReport appReport = response.getApplicationReport();
|
||||||
|
Assert.assertNotNull(appReport);
|
||||||
|
Assert.assertEquals("application_0_0001", appReport.getApplicationId()
|
||||||
|
.toString());
|
||||||
|
Assert.assertEquals("test type", appReport.getApplicationType().toString());
|
||||||
|
Assert.assertEquals("test queue", appReport.getQueue().toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testApplications() throws IOException, YarnException {
|
||||||
|
ApplicationId appId = null;
|
||||||
|
appId = ApplicationId.newInstance(0, 1);
|
||||||
|
writeApplicationStartData(appId);
|
||||||
|
writeApplicationFinishData(appId);
|
||||||
|
ApplicationId appId1 = ApplicationId.newInstance(0, 2);
|
||||||
|
writeApplicationStartData(appId1);
|
||||||
|
writeApplicationFinishData(appId1);
|
||||||
|
GetApplicationsRequest request = GetApplicationsRequest.newInstance();
|
||||||
|
GetApplicationsResponse response = historyServer.getClientService()
|
||||||
|
.getClientHandler().getApplications(request);
|
||||||
|
List<ApplicationReport> appReport = response.getApplicationList();
|
||||||
|
Assert.assertNotNull(appReport);
|
||||||
|
Assert.assertEquals(appId, appReport.get(0).getApplicationId());
|
||||||
|
Assert.assertEquals(appId1, appReport.get(1).getApplicationId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testApplicationAttemptReport() throws IOException, YarnException {
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||||
|
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
|
||||||
|
1);
|
||||||
|
writeApplicationAttemptStartData(appAttemptId);
|
||||||
|
writeApplicationAttemptFinishData(appAttemptId);
|
||||||
|
GetApplicationAttemptReportRequest request = GetApplicationAttemptReportRequest
|
||||||
|
.newInstance(appAttemptId);
|
||||||
|
GetApplicationAttemptReportResponse response = historyServer
|
||||||
|
.getClientService().getClientHandler().getApplicationAttemptReport(
|
||||||
|
request);
|
||||||
|
ApplicationAttemptReport attemptReport = response
|
||||||
|
.getApplicationAttemptReport();
|
||||||
|
Assert.assertNotNull(attemptReport);
|
||||||
|
Assert.assertEquals("appattempt_0_0001_000001", attemptReport
|
||||||
|
.getApplicationAttemptId().toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testApplicationAttempts() throws IOException, YarnException {
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||||
|
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
|
||||||
|
1);
|
||||||
|
ApplicationAttemptId appAttemptId1 = ApplicationAttemptId.newInstance(
|
||||||
|
appId, 2);
|
||||||
|
writeApplicationAttemptStartData(appAttemptId);
|
||||||
|
writeApplicationAttemptFinishData(appAttemptId);
|
||||||
|
writeApplicationAttemptStartData(appAttemptId1);
|
||||||
|
writeApplicationAttemptFinishData(appAttemptId1);
|
||||||
|
GetApplicationAttemptsRequest request = GetApplicationAttemptsRequest
|
||||||
|
.newInstance(appId);
|
||||||
|
GetApplicationAttemptsResponse response = historyServer.getClientService()
|
||||||
|
.getClientHandler().getApplicationAttempts(request);
|
||||||
|
List<ApplicationAttemptReport> attemptReports = response
|
||||||
|
.getApplicationAttemptList();
|
||||||
|
Assert.assertNotNull(attemptReports);
|
||||||
|
Assert.assertEquals(appAttemptId, attemptReports.get(0)
|
||||||
|
.getApplicationAttemptId());
|
||||||
|
Assert.assertEquals(appAttemptId1, attemptReports.get(1)
|
||||||
|
.getApplicationAttemptId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContainerReport() throws IOException, YarnException {
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||||
|
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
|
||||||
|
1);
|
||||||
|
ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
|
||||||
|
writeContainerStartData(containerId);
|
||||||
|
writeContainerFinishData(containerId);
|
||||||
|
GetContainerReportRequest request = GetContainerReportRequest
|
||||||
|
.newInstance(containerId);
|
||||||
|
GetContainerReportResponse response = historyServer.getClientService()
|
||||||
|
.getClientHandler().getContainerReport(request);
|
||||||
|
ContainerReport container = response.getContainerReport();
|
||||||
|
Assert.assertNotNull(container);
|
||||||
|
Assert.assertEquals(containerId, container.getContainerId());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testContainers() throws IOException, YarnException {
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||||
|
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId,
|
||||||
|
1);
|
||||||
|
ContainerId containerId = ContainerId.newInstance(appAttemptId, 1);
|
||||||
|
ContainerId containerId1 = ContainerId.newInstance(appAttemptId, 2);
|
||||||
|
writeContainerStartData(containerId);
|
||||||
|
writeContainerFinishData(containerId);
|
||||||
|
writeContainerStartData(containerId1);
|
||||||
|
writeContainerFinishData(containerId1);
|
||||||
|
GetContainersRequest request = GetContainersRequest
|
||||||
|
.newInstance(appAttemptId);
|
||||||
|
GetContainersResponse response = historyServer.getClientService()
|
||||||
|
.getClientHandler().getContainers(request);
|
||||||
|
List<ContainerReport> containers = response.getContainerList();
|
||||||
|
Assert.assertNotNull(containers);
|
||||||
|
Assert.assertEquals(containerId, containers.get(1).getContainerId());
|
||||||
|
Assert.assertEquals(containerId1, containers.get(0).getContainerId());
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,77 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.applicationhistoryservice;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.service.Service.STATE;
|
||||||
|
import org.apache.hadoop.util.ExitUtil;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestApplicationHistoryServer {
|
||||||
|
|
||||||
|
ApplicationHistoryServer historyServer = null;
|
||||||
|
|
||||||
|
// simple test init/start/stop ApplicationHistoryServer. Status should change.
|
||||||
|
@Test(timeout = 50000)
|
||||||
|
public void testStartStopServer() throws Exception {
|
||||||
|
historyServer = new ApplicationHistoryServer();
|
||||||
|
Configuration config = new YarnConfiguration();
|
||||||
|
historyServer.init(config);
|
||||||
|
assertEquals(STATE.INITED, historyServer.getServiceState());
|
||||||
|
assertEquals(3, historyServer.getServices().size());
|
||||||
|
ApplicationHistoryClientService historyService = historyServer
|
||||||
|
.getClientService();
|
||||||
|
assertNotNull(historyServer.getClientService());
|
||||||
|
assertEquals(STATE.INITED, historyService.getServiceState());
|
||||||
|
|
||||||
|
historyServer.start();
|
||||||
|
assertEquals(STATE.STARTED, historyServer.getServiceState());
|
||||||
|
assertEquals(STATE.STARTED, historyService.getServiceState());
|
||||||
|
historyServer.stop();
|
||||||
|
assertEquals(STATE.STOPPED, historyServer.getServiceState());
|
||||||
|
}
|
||||||
|
|
||||||
|
// test launch method
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testLaunch() throws Exception {
|
||||||
|
|
||||||
|
ExitUtil.disableSystemExit();
|
||||||
|
try {
|
||||||
|
historyServer = ApplicationHistoryServer
|
||||||
|
.launchAppHistoryServer(new String[0]);
|
||||||
|
} catch (ExitUtil.ExitException e) {
|
||||||
|
assertEquals(0, e.status);
|
||||||
|
ExitUtil.resetFirstExitException();
|
||||||
|
fail();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void stop() {
|
||||||
|
if (historyServer != null) {
|
||||||
|
historyServer.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue