YARN-6130. [ATSv2 Security] Generate a delegation token for AM when app collector is created and pass it to AM via NM and RM. Contributed by Varun Saxena.
This commit is contained in:
parent
99781dabff
commit
6e698f4228
|
@ -878,13 +878,16 @@ public class RMContainerAllocator extends RMContainerRequestor
|
||||||
handleUpdatedNodes(response);
|
handleUpdatedNodes(response);
|
||||||
handleJobPriorityChange(response);
|
handleJobPriorityChange(response);
|
||||||
// handle receiving the timeline collector address for this app
|
// handle receiving the timeline collector address for this app
|
||||||
String collectorAddr = response.getCollectorAddr();
|
String collectorAddr = null;
|
||||||
|
if (response.getCollectorInfo() != null) {
|
||||||
|
collectorAddr = response.getCollectorInfo().getCollectorAddr();
|
||||||
|
}
|
||||||
|
|
||||||
MRAppMaster.RunningAppContext appContext =
|
MRAppMaster.RunningAppContext appContext =
|
||||||
(MRAppMaster.RunningAppContext)this.getContext();
|
(MRAppMaster.RunningAppContext)this.getContext();
|
||||||
if (collectorAddr != null && !collectorAddr.isEmpty()
|
if (collectorAddr != null && !collectorAddr.isEmpty()
|
||||||
&& appContext.getTimelineV2Client() != null) {
|
&& appContext.getTimelineV2Client() != null) {
|
||||||
appContext.getTimelineV2Client().setTimelineServiceAddress(
|
appContext.getTimelineV2Client().setTimelineServiceAddress(collectorAddr);
|
||||||
response.getCollectorAddr());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
for (ContainerStatus cont : finishedContainers) {
|
for (ContainerStatus cont : finishedContainers) {
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceStability.Evolving;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
import org.apache.hadoop.classification.InterfaceStability.Stable;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.records.CollectorInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
@ -92,21 +93,21 @@ public abstract class AllocateResponse {
|
||||||
.preemptionMessage(preempt).nmTokens(nmTokens).build();
|
.preemptionMessage(preempt).nmTokens(nmTokens).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Public
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public static AllocateResponse newInstance(int responseId,
|
public static AllocateResponse newInstance(int responseId,
|
||||||
List<ContainerStatus> completedContainers,
|
List<ContainerStatus> completedContainers,
|
||||||
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
|
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
|
||||||
Resource availResources, AMCommand command, int numClusterNodes,
|
Resource availResources, AMCommand command, int numClusterNodes,
|
||||||
PreemptionMessage preempt, List<NMToken> nmTokens,
|
PreemptionMessage preempt, List<NMToken> nmTokens,
|
||||||
List<UpdatedContainer> updatedContainers) {
|
CollectorInfo collectorInfo) {
|
||||||
return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes)
|
return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes)
|
||||||
.responseId(responseId)
|
.responseId(responseId)
|
||||||
.completedContainersStatuses(completedContainers)
|
.completedContainersStatuses(completedContainers)
|
||||||
.allocatedContainers(allocatedContainers).updatedNodes(updatedNodes)
|
.allocatedContainers(allocatedContainers).updatedNodes(updatedNodes)
|
||||||
.availableResources(availResources).amCommand(command)
|
.availableResources(availResources).amCommand(command)
|
||||||
.preemptionMessage(preempt).nmTokens(nmTokens)
|
.preemptionMessage(preempt).nmTokens(nmTokens)
|
||||||
.updatedContainers(updatedContainers).build();
|
.collectorInfo(collectorInfo).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
|
@ -133,7 +134,7 @@ public abstract class AllocateResponse {
|
||||||
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
|
List<Container> allocatedContainers, List<NodeReport> updatedNodes,
|
||||||
Resource availResources, AMCommand command, int numClusterNodes,
|
Resource availResources, AMCommand command, int numClusterNodes,
|
||||||
PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken,
|
PreemptionMessage preempt, List<NMToken> nmTokens, Token amRMToken,
|
||||||
List<UpdatedContainer> updatedContainers, String collectorAddr) {
|
List<UpdatedContainer> updatedContainers, CollectorInfo collectorInfo) {
|
||||||
return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes)
|
return AllocateResponse.newBuilder().numClusterNodes(numClusterNodes)
|
||||||
.responseId(responseId)
|
.responseId(responseId)
|
||||||
.completedContainersStatuses(completedContainers)
|
.completedContainersStatuses(completedContainers)
|
||||||
|
@ -141,7 +142,7 @@ public abstract class AllocateResponse {
|
||||||
.availableResources(availResources).amCommand(command)
|
.availableResources(availResources).amCommand(command)
|
||||||
.preemptionMessage(preempt).nmTokens(nmTokens)
|
.preemptionMessage(preempt).nmTokens(nmTokens)
|
||||||
.updatedContainers(updatedContainers).amRmToken(amRMToken)
|
.updatedContainers(updatedContainers).amRmToken(amRMToken)
|
||||||
.collectorAddr(collectorAddr).build();
|
.collectorInfo(collectorInfo).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -333,17 +334,18 @@ public abstract class AllocateResponse {
|
||||||
public abstract void setApplicationPriority(Priority priority);
|
public abstract void setApplicationPriority(Priority priority);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The address of collector that belong to this app
|
* The data associated with the collector that belongs to this app. Contains
|
||||||
|
* address and token alongwith identification information.
|
||||||
*
|
*
|
||||||
* @return The address of collector that belong to this attempt
|
* @return The data of collector that belong to this attempt
|
||||||
*/
|
*/
|
||||||
@Public
|
@Public
|
||||||
@Unstable
|
@Unstable
|
||||||
public abstract String getCollectorAddr();
|
public abstract CollectorInfo getCollectorInfo();
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public abstract void setCollectorAddr(String collectorAddr);
|
public abstract void setCollectorInfo(CollectorInfo info);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get the list of container update errors to inform the
|
* Get the list of container update errors to inform the
|
||||||
|
@ -559,15 +561,17 @@ public abstract class AllocateResponse {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the <code>collectorAddr</code> of the response.
|
* Set the <code>collectorInfo</code> of the response.
|
||||||
* @see AllocateResponse#setCollectorAddr(String)
|
* @see AllocateResponse#setCollectorInfo(CollectorInfo)
|
||||||
* @param collectorAddr <code>collectorAddr</code> of the response
|
* @param collectorInfo <code>collectorInfo</code> of the response which
|
||||||
|
* contains collector address, RM id, version and collector token.
|
||||||
* @return {@link AllocateResponseBuilder}
|
* @return {@link AllocateResponseBuilder}
|
||||||
*/
|
*/
|
||||||
@Private
|
@Private
|
||||||
@Unstable
|
@Unstable
|
||||||
public AllocateResponseBuilder collectorAddr(String collectorAddr) {
|
public AllocateResponseBuilder collectorInfo(
|
||||||
allocateResponse.setCollectorAddr(collectorAddr);
|
CollectorInfo collectorInfo) {
|
||||||
|
allocateResponse.setCollectorInfo(collectorInfo);
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,55 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.api.records;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Collector info containing collector address and collector token passed from
|
||||||
|
* RM to AM in Allocate Response.
|
||||||
|
*/
|
||||||
|
@Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
public abstract class CollectorInfo {
|
||||||
|
|
||||||
|
protected static final long DEFAULT_TIMESTAMP_VALUE = -1;
|
||||||
|
|
||||||
|
public static CollectorInfo newInstance(String collectorAddr, Token token) {
|
||||||
|
CollectorInfo amCollectorInfo =
|
||||||
|
Records.newRecord(CollectorInfo.class);
|
||||||
|
amCollectorInfo.setCollectorAddr(collectorAddr);
|
||||||
|
amCollectorInfo.setCollectorToken(token);
|
||||||
|
return amCollectorInfo;
|
||||||
|
}
|
||||||
|
|
||||||
|
public abstract String getCollectorAddr();
|
||||||
|
|
||||||
|
public abstract void setCollectorAddr(String addr);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get delegation token for app collector which AM will use to publish
|
||||||
|
* entities.
|
||||||
|
* @return the delegation token for app collector.
|
||||||
|
*/
|
||||||
|
public abstract Token getCollectorToken();
|
||||||
|
|
||||||
|
public abstract void setCollectorToken(Token token);
|
||||||
|
}
|
|
@ -613,3 +613,8 @@ message StringBytesMapProto {
|
||||||
optional string key = 1;
|
optional string key = 1;
|
||||||
optional bytes value = 2;
|
optional bytes value = 2;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message CollectorInfoProto {
|
||||||
|
optional string collector_addr = 1;
|
||||||
|
optional hadoop.common.TokenProto collector_token = 2;
|
||||||
|
}
|
||||||
|
|
|
@ -112,7 +112,7 @@ message AllocateResponseProto {
|
||||||
repeated NMTokenProto nm_tokens = 9;
|
repeated NMTokenProto nm_tokens = 9;
|
||||||
optional hadoop.common.TokenProto am_rm_token = 12;
|
optional hadoop.common.TokenProto am_rm_token = 12;
|
||||||
optional PriorityProto application_priority = 13;
|
optional PriorityProto application_priority = 13;
|
||||||
optional string collector_addr = 14;
|
optional CollectorInfoProto collector_info = 14;
|
||||||
repeated UpdateContainerErrorProto update_errors = 15;
|
repeated UpdateContainerErrorProto update_errors = 15;
|
||||||
repeated UpdatedContainerProto updated_containers = 16;
|
repeated UpdatedContainerProto updated_containers = 16;
|
||||||
}
|
}
|
||||||
|
|
|
@ -325,7 +325,11 @@ extends AMRMClientAsync<T> {
|
||||||
}
|
}
|
||||||
|
|
||||||
AllocateResponse response = (AllocateResponse) object;
|
AllocateResponse response = (AllocateResponse) object;
|
||||||
String collectorAddress = response.getCollectorAddr();
|
String collectorAddress = null;
|
||||||
|
if (response.getCollectorInfo() != null) {
|
||||||
|
collectorAddress = response.getCollectorInfo().getCollectorAddr();
|
||||||
|
}
|
||||||
|
|
||||||
TimelineV2Client timelineClient =
|
TimelineV2Client timelineClient =
|
||||||
client.getRegisteredTimelineV2Client();
|
client.getRegisteredTimelineV2Client();
|
||||||
if (timelineClient != null && collectorAddress != null
|
if (timelineClient != null && collectorAddress != null
|
||||||
|
|
|
@ -0,0 +1,72 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.client;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
|
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
|
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||||
|
import org.junit.After;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test Base for Application Master Service Protocol.
|
||||||
|
*/
|
||||||
|
public abstract class ApplicationMasterServiceProtoTestBase
|
||||||
|
extends ProtocolHATestBase {
|
||||||
|
|
||||||
|
private ApplicationMasterProtocol amClient;
|
||||||
|
private ApplicationAttemptId attemptId;
|
||||||
|
|
||||||
|
protected void startupHAAndSetupClient() throws Exception {
|
||||||
|
attemptId = this.cluster.createFakeApplicationAttemptId();
|
||||||
|
|
||||||
|
Token<AMRMTokenIdentifier> appToken =
|
||||||
|
this.cluster.getResourceManager().getRMContext()
|
||||||
|
.getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId);
|
||||||
|
appToken.setService(ClientRMProxy.getAMRMTokenService(this.conf));
|
||||||
|
UserGroupInformation.setLoginUser(UserGroupInformation
|
||||||
|
.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
|
||||||
|
UserGroupInformation.getCurrentUser().addToken(appToken);
|
||||||
|
syncToken(appToken);
|
||||||
|
amClient = ClientRMProxy
|
||||||
|
.createRMProxy(this.conf, ApplicationMasterProtocol.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void shutDown() {
|
||||||
|
if(this.amClient != null) {
|
||||||
|
RPC.stopProxy(this.amClient);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
protected ApplicationMasterProtocol getAMClient() {
|
||||||
|
return amClient;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void syncToken(Token<AMRMTokenIdentifier> token) throws IOException {
|
||||||
|
for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) {
|
||||||
|
this.cluster.getResourceManager(i).getRMContext()
|
||||||
|
.getAMRMTokenSecretManager().addPersistedPassword(token);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.CollectorInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
@ -804,11 +805,20 @@ public abstract class ProtocolHATestBase extends ClientBaseWithFixes {
|
||||||
}
|
}
|
||||||
|
|
||||||
public AllocateResponse createFakeAllocateResponse() {
|
public AllocateResponse createFakeAllocateResponse() {
|
||||||
return AllocateResponse.newInstance(-1,
|
if (YarnConfiguration.timelineServiceV2Enabled(getConfig())) {
|
||||||
new ArrayList<ContainerStatus>(),
|
return AllocateResponse.newInstance(-1,
|
||||||
new ArrayList<Container>(), new ArrayList<NodeReport>(),
|
new ArrayList<ContainerStatus>(), new ArrayList<Container>(),
|
||||||
Resource.newInstance(1024, 2), null, 1,
|
new ArrayList<NodeReport>(), Resource.newInstance(1024, 2), null, 1,
|
||||||
null, new ArrayList<NMToken>());
|
null, new ArrayList<NMToken>(), CollectorInfo.newInstance(
|
||||||
|
"host:port", Token.newInstance(new byte[] {0}, "TIMELINE",
|
||||||
|
new byte[] {0}, "rm")));
|
||||||
|
} else {
|
||||||
|
return AllocateResponse.newInstance(-1,
|
||||||
|
new ArrayList<ContainerStatus>(),
|
||||||
|
new ArrayList<Container>(), new ArrayList<NodeReport>(),
|
||||||
|
Resource.newInstance(1024, 2), null, 1,
|
||||||
|
null, new ArrayList<NMToken>());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,71 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.client;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.HATestUtil;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineWriterImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineWriter;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests Application Master Protocol with timeline service v2 enabled.
|
||||||
|
*/
|
||||||
|
public class TestApplicationMasterServiceProtocolForTimelineV2
|
||||||
|
extends ApplicationMasterServiceProtoTestBase {
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void initialize() throws Exception {
|
||||||
|
HATestUtil.setRpcAddressForRM(RM1_NODE_ID, RM1_PORT_BASE + 200, conf);
|
||||||
|
HATestUtil.setRpcAddressForRM(RM2_NODE_ID, RM2_PORT_BASE + 200, conf);
|
||||||
|
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
|
||||||
|
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f);
|
||||||
|
conf.setClass(YarnConfiguration.TIMELINE_SERVICE_WRITER_CLASS,
|
||||||
|
FileSystemTimelineWriterImpl.class, TimelineWriter.class);
|
||||||
|
startHACluster(0, false, false, true);
|
||||||
|
super.startupHAAndSetupClient();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 15000)
|
||||||
|
public void testAllocateForTimelineV2OnHA()
|
||||||
|
throws YarnException, IOException {
|
||||||
|
AllocateRequest request = AllocateRequest.newInstance(0, 50f,
|
||||||
|
new ArrayList<ResourceRequest>(),
|
||||||
|
new ArrayList<ContainerId>(),
|
||||||
|
ResourceBlacklistRequest.newInstance(new ArrayList<String>(),
|
||||||
|
new ArrayList<String>()));
|
||||||
|
AllocateResponse response = getAMClient().allocate(request);
|
||||||
|
Assert.assertEquals(response, this.cluster.createFakeAllocateResponse());
|
||||||
|
Assert.assertNotNull(response.getCollectorInfo());
|
||||||
|
Assert.assertEquals("host:port",
|
||||||
|
response.getCollectorInfo().getCollectorAddr());
|
||||||
|
Assert.assertNotNull(response.getCollectorInfo().getCollectorToken());
|
||||||
|
}
|
||||||
|
}
|
|
@ -23,10 +23,6 @@ import java.util.ArrayList;
|
||||||
|
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
|
|
||||||
import org.apache.hadoop.ipc.RPC;
|
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
|
||||||
import org.apache.hadoop.security.token.Token;
|
|
||||||
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
|
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
|
||||||
|
@ -34,45 +30,20 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRespons
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
|
||||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
|
||||||
import org.junit.After;
|
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
|
||||||
public class TestApplicationMasterServiceProtocolOnHA
|
public class TestApplicationMasterServiceProtocolOnHA
|
||||||
extends ProtocolHATestBase {
|
extends ApplicationMasterServiceProtoTestBase {
|
||||||
private ApplicationMasterProtocol amClient;
|
|
||||||
private ApplicationAttemptId attemptId ;
|
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void initialize() throws Exception {
|
public void initialize() throws Exception {
|
||||||
startHACluster(0, false, false, true);
|
startHACluster(0, false, false, true);
|
||||||
attemptId = this.cluster.createFakeApplicationAttemptId();
|
super.startupHAAndSetupClient();
|
||||||
|
|
||||||
Token<AMRMTokenIdentifier> appToken =
|
|
||||||
this.cluster.getResourceManager().getRMContext()
|
|
||||||
.getAMRMTokenSecretManager().createAndGetAMRMToken(attemptId);
|
|
||||||
appToken.setService(ClientRMProxy.getAMRMTokenService(this.conf));
|
|
||||||
UserGroupInformation.setLoginUser(UserGroupInformation
|
|
||||||
.createRemoteUser(UserGroupInformation.getCurrentUser().getUserName()));
|
|
||||||
UserGroupInformation.getCurrentUser().addToken(appToken);
|
|
||||||
syncToken(appToken);
|
|
||||||
|
|
||||||
amClient = ClientRMProxy
|
|
||||||
.createRMProxy(this.conf, ApplicationMasterProtocol.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
@After
|
|
||||||
public void shutDown() {
|
|
||||||
if(this.amClient != null) {
|
|
||||||
RPC.stopProxy(this.amClient);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test(timeout = 15000)
|
@Test(timeout = 15000)
|
||||||
|
@ -81,7 +52,7 @@ public class TestApplicationMasterServiceProtocolOnHA
|
||||||
RegisterApplicationMasterRequest request =
|
RegisterApplicationMasterRequest request =
|
||||||
RegisterApplicationMasterRequest.newInstance("localhost", 0, "");
|
RegisterApplicationMasterRequest.newInstance("localhost", 0, "");
|
||||||
RegisterApplicationMasterResponse response =
|
RegisterApplicationMasterResponse response =
|
||||||
amClient.registerApplicationMaster(request);
|
getAMClient().registerApplicationMaster(request);
|
||||||
Assert.assertEquals(response,
|
Assert.assertEquals(response,
|
||||||
this.cluster.createFakeRegisterApplicationMasterResponse());
|
this.cluster.createFakeRegisterApplicationMasterResponse());
|
||||||
}
|
}
|
||||||
|
@ -93,7 +64,7 @@ public class TestApplicationMasterServiceProtocolOnHA
|
||||||
FinishApplicationMasterRequest.newInstance(
|
FinishApplicationMasterRequest.newInstance(
|
||||||
FinalApplicationStatus.SUCCEEDED, "", "");
|
FinalApplicationStatus.SUCCEEDED, "", "");
|
||||||
FinishApplicationMasterResponse response =
|
FinishApplicationMasterResponse response =
|
||||||
amClient.finishApplicationMaster(request);
|
getAMClient().finishApplicationMaster(request);
|
||||||
Assert.assertEquals(response,
|
Assert.assertEquals(response,
|
||||||
this.cluster.createFakeFinishApplicationMasterResponse());
|
this.cluster.createFakeFinishApplicationMasterResponse());
|
||||||
}
|
}
|
||||||
|
@ -105,14 +76,7 @@ public class TestApplicationMasterServiceProtocolOnHA
|
||||||
new ArrayList<ContainerId>(),
|
new ArrayList<ContainerId>(),
|
||||||
ResourceBlacklistRequest.newInstance(new ArrayList<String>(),
|
ResourceBlacklistRequest.newInstance(new ArrayList<String>(),
|
||||||
new ArrayList<String>()));
|
new ArrayList<String>()));
|
||||||
AllocateResponse response = amClient.allocate(request);
|
AllocateResponse response = getAMClient().allocate(request);
|
||||||
Assert.assertEquals(response, this.cluster.createFakeAllocateResponse());
|
Assert.assertEquals(response, this.cluster.createFakeAllocateResponse());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void syncToken(Token<AMRMTokenIdentifier> token) throws IOException {
|
|
||||||
for (int i = 0; i < this.cluster.getNumOfResourceManager(); i++) {
|
|
||||||
this.cluster.getResourceManager(i).getRMContext()
|
|
||||||
.getAMRMTokenSecretManager().addPersistedPassword(token);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -426,7 +426,7 @@ public class TestAMRMClientAsync {
|
||||||
}
|
}
|
||||||
AllocateResponse response =
|
AllocateResponse response =
|
||||||
AllocateResponse.newInstance(0, completed, allocated,
|
AllocateResponse.newInstance(0, completed, allocated,
|
||||||
new ArrayList<NodeReport>(), null, null, 1, null, nmTokens,
|
new ArrayList<NodeReport>(), null, null, 1, null, nmTokens, null,
|
||||||
updatedContainers);
|
updatedContainers);
|
||||||
return response;
|
return response;
|
||||||
}
|
}
|
||||||
|
|
|
@ -27,6 +27,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
|
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||||
|
import org.apache.hadoop.yarn.api.records.CollectorInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.AMCommand;
|
import org.apache.hadoop.yarn.api.records.AMCommand;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
|
@ -38,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.api.records.Token;
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
|
import org.apache.hadoop.yarn.api.records.UpdateContainerError;
|
||||||
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
import org.apache.hadoop.yarn.api.records.UpdatedContainer;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.CollectorInfoPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.NMTokenPBImpl;
|
||||||
|
@ -48,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.UpdatedContainerPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.UpdatedContainerPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.NodeReportProto;
|
||||||
|
@ -80,6 +83,7 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
||||||
private PreemptionMessage preempt;
|
private PreemptionMessage preempt;
|
||||||
private Token amrmToken = null;
|
private Token amrmToken = null;
|
||||||
private Priority appPriority = null;
|
private Priority appPriority = null;
|
||||||
|
private CollectorInfo collectorInfo = null;
|
||||||
|
|
||||||
public AllocateResponsePBImpl() {
|
public AllocateResponsePBImpl() {
|
||||||
builder = AllocateResponseProto.newBuilder();
|
builder = AllocateResponseProto.newBuilder();
|
||||||
|
@ -162,6 +166,9 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
||||||
if (this.amrmToken != null) {
|
if (this.amrmToken != null) {
|
||||||
builder.setAmRmToken(convertToProtoFormat(this.amrmToken));
|
builder.setAmRmToken(convertToProtoFormat(this.amrmToken));
|
||||||
}
|
}
|
||||||
|
if (this.collectorInfo != null) {
|
||||||
|
builder.setCollectorInfo(convertToProtoFormat(this.collectorInfo));
|
||||||
|
}
|
||||||
if (this.appPriority != null) {
|
if (this.appPriority != null) {
|
||||||
builder.setApplicationPriority(convertToProtoFormat(this.appPriority));
|
builder.setApplicationPriority(convertToProtoFormat(this.appPriority));
|
||||||
}
|
}
|
||||||
|
@ -398,19 +405,25 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized String getCollectorAddr() {
|
public synchronized CollectorInfo getCollectorInfo() {
|
||||||
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
|
AllocateResponseProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
return p.getCollectorAddr();
|
if (this.collectorInfo != null) {
|
||||||
|
return this.collectorInfo;
|
||||||
|
}
|
||||||
|
if (!p.hasCollectorInfo()) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
this.collectorInfo = convertFromProtoFormat(p.getCollectorInfo());
|
||||||
|
return this.collectorInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void setCollectorAddr(String collectorAddr) {
|
public synchronized void setCollectorInfo(CollectorInfo info) {
|
||||||
maybeInitBuilder();
|
maybeInitBuilder();
|
||||||
if (collectorAddr == null) {
|
if (info == null) {
|
||||||
builder.clearCollectorAddr();
|
builder.clearCollectorInfo();
|
||||||
return;
|
|
||||||
}
|
}
|
||||||
builder.setCollectorAddr(collectorAddr);
|
this.collectorInfo = info;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -718,6 +731,16 @@ public class AllocateResponsePBImpl extends AllocateResponse {
|
||||||
return ((NodeReportPBImpl)t).getProto();
|
return ((NodeReportPBImpl)t).getProto();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private synchronized CollectorInfoPBImpl convertFromProtoFormat(
|
||||||
|
CollectorInfoProto p) {
|
||||||
|
return new CollectorInfoPBImpl(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized CollectorInfoProto convertToProtoFormat(
|
||||||
|
CollectorInfo t) {
|
||||||
|
return ((CollectorInfoPBImpl)t).getProto();
|
||||||
|
}
|
||||||
|
|
||||||
private synchronized ContainerPBImpl convertFromProtoFormat(
|
private synchronized ContainerPBImpl convertFromProtoFormat(
|
||||||
ContainerProto p) {
|
ContainerProto p) {
|
||||||
return new ContainerPBImpl(p);
|
return new ContainerPBImpl(p);
|
||||||
|
|
|
@ -0,0 +1,148 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.api.records.impl.pb;
|
||||||
|
|
||||||
|
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
|
||||||
|
import org.apache.hadoop.yarn.api.records.CollectorInfo;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnProtos.CollectorInfoProtoOrBuilder;
|
||||||
|
|
||||||
|
import com.google.protobuf.TextFormat;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Protocol record implementation of {@link CollectorInfo}.
|
||||||
|
*/
|
||||||
|
public class CollectorInfoPBImpl extends CollectorInfo {
|
||||||
|
|
||||||
|
private CollectorInfoProto proto = CollectorInfoProto.getDefaultInstance();
|
||||||
|
|
||||||
|
private CollectorInfoProto.Builder builder = null;
|
||||||
|
private boolean viaProto = false;
|
||||||
|
|
||||||
|
private String collectorAddr = null;
|
||||||
|
private Token collectorToken = null;
|
||||||
|
|
||||||
|
|
||||||
|
public CollectorInfoPBImpl() {
|
||||||
|
builder = CollectorInfoProto.newBuilder();
|
||||||
|
}
|
||||||
|
|
||||||
|
public CollectorInfoPBImpl(CollectorInfoProto proto) {
|
||||||
|
this.proto = proto;
|
||||||
|
viaProto = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
public CollectorInfoProto getProto() {
|
||||||
|
mergeLocalToProto();
|
||||||
|
proto = viaProto ? proto : builder.build();
|
||||||
|
viaProto = true;
|
||||||
|
return proto;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int hashCode() {
|
||||||
|
return getProto().hashCode();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void maybeInitBuilder() {
|
||||||
|
if (viaProto || builder == null) {
|
||||||
|
builder = CollectorInfoProto.newBuilder(proto);
|
||||||
|
}
|
||||||
|
viaProto = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void mergeLocalToProto() {
|
||||||
|
if (viaProto) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
}
|
||||||
|
mergeLocalToBuilder();
|
||||||
|
proto = builder.build();
|
||||||
|
viaProto = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean equals(Object other) {
|
||||||
|
if (other == null) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
if (other.getClass().isAssignableFrom(this.getClass())) {
|
||||||
|
return this.getProto().equals(this.getClass().cast(other).getProto());
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return TextFormat.shortDebugString(getProto());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String getCollectorAddr() {
|
||||||
|
CollectorInfoProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
if (this.collectorAddr == null && p.hasCollectorAddr()) {
|
||||||
|
this.collectorAddr = p.getCollectorAddr();
|
||||||
|
}
|
||||||
|
return this.collectorAddr;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setCollectorAddr(String addr) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
if (collectorAddr == null) {
|
||||||
|
builder.clearCollectorAddr();
|
||||||
|
}
|
||||||
|
this.collectorAddr = addr;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Token getCollectorToken() {
|
||||||
|
CollectorInfoProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
if (this.collectorToken == null && p.hasCollectorToken()) {
|
||||||
|
this.collectorToken = convertFromProtoFormat(p.getCollectorToken());
|
||||||
|
}
|
||||||
|
return this.collectorToken;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setCollectorToken(Token token) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
if (token == null) {
|
||||||
|
builder.clearCollectorToken();
|
||||||
|
}
|
||||||
|
this.collectorToken = token;
|
||||||
|
}
|
||||||
|
|
||||||
|
private TokenPBImpl convertFromProtoFormat(TokenProto p) {
|
||||||
|
return new TokenPBImpl(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
private TokenProto convertToProtoFormat(Token t) {
|
||||||
|
return ((TokenPBImpl) t).getProto();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void mergeLocalToBuilder() {
|
||||||
|
if (this.collectorAddr != null) {
|
||||||
|
builder.setCollectorAddr(this.collectorAddr);
|
||||||
|
}
|
||||||
|
if (this.collectorToken != null) {
|
||||||
|
builder.setCollectorToken(convertToProtoFormat(this.collectorToken));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -101,6 +101,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersRequestP
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersResponsePBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.StopContainersResponsePBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationRequestPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SubmitApplicationResponsePBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.records.CollectorInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
@ -406,6 +407,7 @@ public class TestPBImplRecords extends BasePBImplRecordsTest {
|
||||||
generateByNewInstance(CommitResponse.class);
|
generateByNewInstance(CommitResponse.class);
|
||||||
generateByNewInstance(ApplicationTimeout.class);
|
generateByNewInstance(ApplicationTimeout.class);
|
||||||
generateByNewInstance(QueueConfigurations.class);
|
generateByNewInstance(QueueConfigurations.class);
|
||||||
|
generateByNewInstance(CollectorInfo.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.util.Arrays;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
|
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
|
@ -37,11 +38,11 @@ public abstract class ReportNewCollectorInfoRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
public static ReportNewCollectorInfoRequest newInstance(
|
public static ReportNewCollectorInfoRequest newInstance(
|
||||||
ApplicationId id, String collectorAddr) {
|
ApplicationId id, String collectorAddr, Token token) {
|
||||||
ReportNewCollectorInfoRequest request =
|
ReportNewCollectorInfoRequest request =
|
||||||
Records.newRecord(ReportNewCollectorInfoRequest.class);
|
Records.newRecord(ReportNewCollectorInfoRequest.class);
|
||||||
request.setAppCollectorsList(
|
request.setAppCollectorsList(
|
||||||
Arrays.asList(AppCollectorData.newInstance(id, collectorAddr)));
|
Arrays.asList(AppCollectorData.newInstance(id, collectorAddr, token)));
|
||||||
return request;
|
return request;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,23 +26,26 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
|
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.NodeLabelPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
|
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatRequestProtoOrBuilder;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
|
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
import org.apache.hadoop.yarn.server.api.records.NodeStatus;
|
||||||
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
import org.apache.hadoop.yarn.server.api.records.impl.pb.MasterKeyPBImpl;
|
||||||
|
@ -164,9 +167,13 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
|
||||||
builder.clearRegisteringCollectors();
|
builder.clearRegisteringCollectors();
|
||||||
for (Map.Entry<ApplicationId, AppCollectorData> entry :
|
for (Map.Entry<ApplicationId, AppCollectorData> entry :
|
||||||
registeringCollectors.entrySet()) {
|
registeringCollectors.entrySet()) {
|
||||||
|
AppCollectorData data = entry.getValue();
|
||||||
builder.addRegisteringCollectors(AppCollectorDataProto.newBuilder()
|
builder.addRegisteringCollectors(AppCollectorDataProto.newBuilder()
|
||||||
.setAppId(convertToProtoFormat(entry.getKey()))
|
.setAppId(convertToProtoFormat(entry.getKey()))
|
||||||
.setAppCollectorAddr(entry.getValue().getCollectorAddr()));
|
.setAppCollectorAddr(data.getCollectorAddr())
|
||||||
|
.setAppCollectorToken(convertToProtoFormat(data.getCollectorToken()))
|
||||||
|
.setRmIdentifier(data.getRMIdentifier())
|
||||||
|
.setVersion(data.getVersion()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -267,8 +274,10 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
|
||||||
this.registeringCollectors = new HashMap<>();
|
this.registeringCollectors = new HashMap<>();
|
||||||
for (AppCollectorDataProto c : list) {
|
for (AppCollectorDataProto c : list) {
|
||||||
ApplicationId appId = convertFromProtoFormat(c.getAppId());
|
ApplicationId appId = convertFromProtoFormat(c.getAppId());
|
||||||
|
Token collectorToken = convertFromProtoFormat(c.getAppCollectorToken());
|
||||||
AppCollectorData data = AppCollectorData.newInstance(appId,
|
AppCollectorData data = AppCollectorData.newInstance(appId,
|
||||||
c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion());
|
c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion(),
|
||||||
|
collectorToken);
|
||||||
this.registeringCollectors.put(appId, data);
|
this.registeringCollectors.put(appId, data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -309,6 +318,14 @@ public class NodeHeartbeatRequestPBImpl extends NodeHeartbeatRequest {
|
||||||
return ((MasterKeyPBImpl)t).getProto();
|
return ((MasterKeyPBImpl)t).getProto();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private TokenPBImpl convertFromProtoFormat(TokenProto p) {
|
||||||
|
return new TokenPBImpl(p);
|
||||||
|
}
|
||||||
|
|
||||||
|
private TokenProto convertToProtoFormat(Token t) {
|
||||||
|
return ((TokenPBImpl) t).getProto();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public Set<NodeLabel> getNodeLabels() {
|
public Set<NodeLabel> getNodeLabels() {
|
||||||
initNodeLabels();
|
initNodeLabels();
|
||||||
|
|
|
@ -26,31 +26,34 @@ import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
|
import org.apache.hadoop.security.proto.SecurityProtos.TokenProto;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
|
import org.apache.hadoop.yarn.api.protocolrecords.impl.pb.SignalContainerRequestPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ResourcePBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
|
||||||
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ContainerQueuingLimitProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SignalContainerRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
|
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProtoOrBuilder;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.SystemCredentialsForAppsProto;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
|
|
||||||
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
|
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
|
||||||
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
import org.apache.hadoop.yarn.server.api.records.MasterKey;
|
||||||
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
import org.apache.hadoop.yarn.server.api.records.NodeAction;
|
||||||
|
@ -154,6 +157,8 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
|
||||||
builder.addAppCollectors(AppCollectorDataProto.newBuilder()
|
builder.addAppCollectors(AppCollectorDataProto.newBuilder()
|
||||||
.setAppId(convertToProtoFormat(entry.getKey()))
|
.setAppId(convertToProtoFormat(entry.getKey()))
|
||||||
.setAppCollectorAddr(data.getCollectorAddr())
|
.setAppCollectorAddr(data.getCollectorAddr())
|
||||||
|
.setAppCollectorToken(
|
||||||
|
convertToProtoFormat(entry.getValue().getCollectorToken()))
|
||||||
.setRmIdentifier(data.getRMIdentifier())
|
.setRmIdentifier(data.getRMIdentifier())
|
||||||
.setVersion(data.getVersion()));
|
.setVersion(data.getVersion()));
|
||||||
}
|
}
|
||||||
|
@ -599,8 +604,10 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
|
||||||
this.appCollectorsMap = new HashMap<>();
|
this.appCollectorsMap = new HashMap<>();
|
||||||
for (AppCollectorDataProto c : list) {
|
for (AppCollectorDataProto c : list) {
|
||||||
ApplicationId appId = convertFromProtoFormat(c.getAppId());
|
ApplicationId appId = convertFromProtoFormat(c.getAppId());
|
||||||
|
Token collectorToken = convertFromProtoFormat(c.getAppCollectorToken());
|
||||||
AppCollectorData data = AppCollectorData.newInstance(appId,
|
AppCollectorData data = AppCollectorData.newInstance(appId,
|
||||||
c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion());
|
c.getAppCollectorAddr(), c.getRmIdentifier(), c.getVersion(),
|
||||||
|
collectorToken);
|
||||||
this.appCollectorsMap.put(appId, data);
|
this.appCollectorsMap.put(appId, data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -780,5 +787,13 @@ public class NodeHeartbeatResponsePBImpl extends NodeHeartbeatResponse {
|
||||||
SignalContainerRequest t) {
|
SignalContainerRequest t) {
|
||||||
return ((SignalContainerRequestPBImpl)t).getProto();
|
return ((SignalContainerRequestPBImpl)t).getProto();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private TokenProto convertToProtoFormat(Token t) {
|
||||||
|
return ((TokenPBImpl) t).getProto();
|
||||||
|
}
|
||||||
|
|
||||||
|
private TokenPBImpl convertFromProtoFormat(TokenProto p) {
|
||||||
|
return new TokenPBImpl(p);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -20,12 +20,12 @@ package org.apache.hadoop.yarn.server.api.protocolrecords.impl.pb;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
|
||||||
|
import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorDataPBImpl;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProtoOrBuilder;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.ReportNewCollectorInfoRequestProtoOrBuilder;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.ReportNewCollectorInfoRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
|
|
||||||
import org.apache.hadoop.yarn.server.api.records.impl.pb.AppCollectorDataPBImpl;
|
|
||||||
|
|
||||||
public class ReportNewCollectorInfoRequestPBImpl extends
|
public class ReportNewCollectorInfoRequestPBImpl extends
|
||||||
ReportNewCollectorInfoRequest {
|
ReportNewCollectorInfoRequest {
|
||||||
|
|
|
@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.api.records;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
|
||||||
|
|
||||||
|
@ -31,20 +32,32 @@ public abstract class AppCollectorData {
|
||||||
protected static final long DEFAULT_TIMESTAMP_VALUE = -1;
|
protected static final long DEFAULT_TIMESTAMP_VALUE = -1;
|
||||||
|
|
||||||
public static AppCollectorData newInstance(
|
public static AppCollectorData newInstance(
|
||||||
ApplicationId id, String collectorAddr, long rmIdentifier, long version) {
|
ApplicationId id, String collectorAddr, long rmIdentifier, long version,
|
||||||
|
Token token) {
|
||||||
AppCollectorData appCollectorData =
|
AppCollectorData appCollectorData =
|
||||||
Records.newRecord(AppCollectorData.class);
|
Records.newRecord(AppCollectorData.class);
|
||||||
appCollectorData.setApplicationId(id);
|
appCollectorData.setApplicationId(id);
|
||||||
appCollectorData.setCollectorAddr(collectorAddr);
|
appCollectorData.setCollectorAddr(collectorAddr);
|
||||||
appCollectorData.setRMIdentifier(rmIdentifier);
|
appCollectorData.setRMIdentifier(rmIdentifier);
|
||||||
appCollectorData.setVersion(version);
|
appCollectorData.setVersion(version);
|
||||||
|
appCollectorData.setCollectorToken(token);
|
||||||
return appCollectorData;
|
return appCollectorData;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static AppCollectorData newInstance(
|
||||||
|
ApplicationId id, String collectorAddr, long rmIdentifier, long version) {
|
||||||
|
return newInstance(id, collectorAddr, rmIdentifier, version, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public static AppCollectorData newInstance(ApplicationId id,
|
||||||
|
String collectorAddr, Token token) {
|
||||||
|
return newInstance(id, collectorAddr, DEFAULT_TIMESTAMP_VALUE,
|
||||||
|
DEFAULT_TIMESTAMP_VALUE, token);
|
||||||
|
}
|
||||||
|
|
||||||
public static AppCollectorData newInstance(ApplicationId id,
|
public static AppCollectorData newInstance(ApplicationId id,
|
||||||
String collectorAddr) {
|
String collectorAddr) {
|
||||||
return newInstance(id, collectorAddr, DEFAULT_TIMESTAMP_VALUE,
|
return newInstance(id, collectorAddr, null);
|
||||||
DEFAULT_TIMESTAMP_VALUE);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -101,4 +114,12 @@ public abstract class AppCollectorData {
|
||||||
|
|
||||||
public abstract void setVersion(long version);
|
public abstract void setVersion(long version);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get delegation token for app collector which AM will use to publish
|
||||||
|
* entities.
|
||||||
|
* @return the delegation token for app collector.
|
||||||
|
*/
|
||||||
|
public abstract Token getCollectorToken();
|
||||||
|
|
||||||
|
public abstract void setCollectorToken(Token token);
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,10 +19,11 @@ package org.apache.hadoop.yarn.server.api.records.impl.pb;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
|
||||||
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
|
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
||||||
|
import org.apache.hadoop.yarn.api.records.impl.pb.TokenPBImpl;
|
||||||
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProtoOrBuilder;
|
import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.AppCollectorDataProtoOrBuilder;
|
||||||
|
@ -43,6 +44,7 @@ public class AppCollectorDataPBImpl extends AppCollectorData {
|
||||||
private String collectorAddr = null;
|
private String collectorAddr = null;
|
||||||
private Long rmIdentifier = null;
|
private Long rmIdentifier = null;
|
||||||
private Long version = null;
|
private Long version = null;
|
||||||
|
private Token collectorToken = null;
|
||||||
|
|
||||||
public AppCollectorDataPBImpl() {
|
public AppCollectorDataPBImpl() {
|
||||||
builder = AppCollectorDataProto.newBuilder();
|
builder = AppCollectorDataProto.newBuilder();
|
||||||
|
@ -158,6 +160,24 @@ public class AppCollectorDataPBImpl extends AppCollectorData {
|
||||||
builder.setVersion(version);
|
builder.setVersion(version);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Token getCollectorToken() {
|
||||||
|
AppCollectorDataProtoOrBuilder p = viaProto ? proto : builder;
|
||||||
|
if (this.collectorToken == null && p.hasAppCollectorToken()) {
|
||||||
|
this.collectorToken = new TokenPBImpl(p.getAppCollectorToken());
|
||||||
|
}
|
||||||
|
return this.collectorToken;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void setCollectorToken(Token token) {
|
||||||
|
maybeInitBuilder();
|
||||||
|
if (token == null) {
|
||||||
|
builder.clearAppCollectorToken();
|
||||||
|
}
|
||||||
|
this.collectorToken = token;
|
||||||
|
}
|
||||||
|
|
||||||
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
|
private ApplicationIdPBImpl convertFromProtoFormat(ApplicationIdProto p) {
|
||||||
return new ApplicationIdPBImpl(p);
|
return new ApplicationIdPBImpl(p);
|
||||||
}
|
}
|
||||||
|
@ -195,6 +215,9 @@ public class AppCollectorDataPBImpl extends AppCollectorData {
|
||||||
if (this.version != null) {
|
if (this.version != null) {
|
||||||
builder.setVersion(this.version);
|
builder.setVersion(this.version);
|
||||||
}
|
}
|
||||||
|
if (this.collectorToken != null) {
|
||||||
|
builder.setAppCollectorToken(
|
||||||
|
((TokenPBImpl)this.collectorToken).getProto());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,6 +22,7 @@ option java_generic_services = true;
|
||||||
option java_generate_equals_and_hash = true;
|
option java_generate_equals_and_hash = true;
|
||||||
package hadoop.yarn;
|
package hadoop.yarn;
|
||||||
|
|
||||||
|
import "Security.proto";
|
||||||
import "yarn_protos.proto";
|
import "yarn_protos.proto";
|
||||||
import "yarn_server_common_protos.proto";
|
import "yarn_server_common_protos.proto";
|
||||||
import "yarn_service_protos.proto";
|
import "yarn_service_protos.proto";
|
||||||
|
@ -139,6 +140,7 @@ message AppCollectorDataProto {
|
||||||
optional string app_collector_addr = 2;
|
optional string app_collector_addr = 2;
|
||||||
optional int64 rm_identifier = 3 [default = -1];
|
optional int64 rm_identifier = 3 [default = -1];
|
||||||
optional int64 version = 4 [default = -1];
|
optional int64 version = 4 [default = -1];
|
||||||
|
optional hadoop.common.TokenProto app_collector_token = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
//////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////
|
||||||
|
|
|
@ -30,6 +30,7 @@ import org.apache.hadoop.ipc.RPC;
|
||||||
import org.apache.hadoop.ipc.Server;
|
import org.apache.hadoop.ipc.Server;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
|
||||||
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
import org.apache.hadoop.yarn.api.ContainerManagementProtocol;
|
||||||
import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
|
import org.apache.hadoop.yarn.api.ContainerManagementProtocolPB;
|
||||||
|
@ -72,6 +73,7 @@ import org.apache.hadoop.yarn.ipc.HadoopYarnProtoRPC;
|
||||||
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
import org.apache.hadoop.yarn.ipc.RPCUtil;
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
|
||||||
|
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
|
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
|
||||||
|
@ -93,6 +95,21 @@ public class TestRPC {
|
||||||
"collectors' number in ReportNewCollectorInfoRequest is not ONE.";
|
"collectors' number in ReportNewCollectorInfoRequest is not ONE.";
|
||||||
|
|
||||||
public static final String DEFAULT_COLLECTOR_ADDR = "localhost:0";
|
public static final String DEFAULT_COLLECTOR_ADDR = "localhost:0";
|
||||||
|
private static final Token DEFAULT_COLLECTOR_TOKEN;
|
||||||
|
static {
|
||||||
|
TimelineDelegationTokenIdentifier identifier =
|
||||||
|
new TimelineDelegationTokenIdentifier();
|
||||||
|
identifier.setOwner(new Text("user"));
|
||||||
|
identifier.setRenewer(new Text("user"));
|
||||||
|
identifier.setRealUser(new Text("user"));
|
||||||
|
long now = Time.now();
|
||||||
|
identifier.setIssueDate(now);
|
||||||
|
identifier.setMaxDate(now + 1000L);
|
||||||
|
identifier.setMasterKeyId(500);
|
||||||
|
identifier.setSequenceNumber(5);
|
||||||
|
DEFAULT_COLLECTOR_TOKEN = Token.newInstance(identifier.getBytes(),
|
||||||
|
identifier.getKind().toString(), identifier.getBytes(), "localhost:0");
|
||||||
|
}
|
||||||
|
|
||||||
public static final ApplicationId DEFAULT_APP_ID =
|
public static final ApplicationId DEFAULT_APP_ID =
|
||||||
ApplicationId.newInstance(0, 0);
|
ApplicationId.newInstance(0, 0);
|
||||||
|
@ -173,7 +190,16 @@ public class TestRPC {
|
||||||
try {
|
try {
|
||||||
ReportNewCollectorInfoRequest request =
|
ReportNewCollectorInfoRequest request =
|
||||||
ReportNewCollectorInfoRequest.newInstance(
|
ReportNewCollectorInfoRequest.newInstance(
|
||||||
DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR);
|
DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR, null);
|
||||||
|
proxy.reportNewCollectorInfo(request);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
Assert.fail("RPC call failured is not expected here.");
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
ReportNewCollectorInfoRequest request =
|
||||||
|
ReportNewCollectorInfoRequest.newInstance(
|
||||||
|
DEFAULT_APP_ID, DEFAULT_COLLECTOR_ADDR, DEFAULT_COLLECTOR_TOKEN);
|
||||||
proxy.reportNewCollectorInfo(request);
|
proxy.reportNewCollectorInfo(request);
|
||||||
} catch (YarnException e) {
|
} catch (YarnException e) {
|
||||||
Assert.fail("RPC call failured is not expected here.");
|
Assert.fail("RPC call failured is not expected here.");
|
||||||
|
@ -437,6 +463,8 @@ public class TestRPC {
|
||||||
DEFAULT_APP_ID);
|
DEFAULT_APP_ID);
|
||||||
Assert.assertEquals(appCollector.getCollectorAddr(),
|
Assert.assertEquals(appCollector.getCollectorAddr(),
|
||||||
DEFAULT_COLLECTOR_ADDR);
|
DEFAULT_COLLECTOR_ADDR);
|
||||||
|
Assert.assertTrue(appCollector.getCollectorToken() == null ||
|
||||||
|
appCollector.getCollectorToken().equals(DEFAULT_COLLECTOR_TOKEN));
|
||||||
} else {
|
} else {
|
||||||
throw new YarnException(ILLEGAL_NUMBER_MESSAGE);
|
throw new YarnException(ILLEGAL_NUMBER_MESSAGE);
|
||||||
}
|
}
|
||||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
import org.apache.hadoop.yarn.api.records.NodeLabel;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.api.records.Token;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationAttemptIdPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
|
||||||
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
|
import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
|
||||||
|
@ -351,7 +352,8 @@ public class TestYarnServerApiClasses {
|
||||||
private Map<ApplicationId, AppCollectorData> getCollectors() {
|
private Map<ApplicationId, AppCollectorData> getCollectors() {
|
||||||
ApplicationId appID = ApplicationId.newInstance(1L, 1);
|
ApplicationId appID = ApplicationId.newInstance(1L, 1);
|
||||||
String collectorAddr = "localhost:0";
|
String collectorAddr = "localhost:0";
|
||||||
AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr);
|
AppCollectorData data = AppCollectorData.newInstance(appID, collectorAddr,
|
||||||
|
Token.newInstance(new byte[0], "kind", new byte[0], "s"));
|
||||||
Map<ApplicationId, AppCollectorData> collectorMap =
|
Map<ApplicationId, AppCollectorData> collectorMap =
|
||||||
new HashMap<>();
|
new HashMap<>();
|
||||||
collectorMap.put(appID, data);
|
collectorMap.put(appID, data);
|
||||||
|
|
|
@ -71,7 +71,6 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.UnRegisterNodeManagerRequest;
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
|
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
|
||||||
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
|
import org.apache.hadoop.yarn.server.api.records.ContainerQueuingLimit;
|
||||||
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
|
import org.apache.hadoop.yarn.server.api.records.OpportunisticContainersStatus;
|
||||||
|
|
|
@ -46,7 +46,6 @@ import org.apache.hadoop.yarn.proto.YarnProtos;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.ContainerManagerApplicationProto;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto;
|
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto;
|
||||||
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
|
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
|
||||||
import org.apache.hadoop.yarn.proto.YarnServerNodemanagerRecoveryProtos.FlowContextProto;
|
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
import org.apache.hadoop.yarn.server.nodemanager.Context;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEvent;
|
||||||
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
|
import org.apache.hadoop.yarn.server.nodemanager.containermanager.AuxServicesEventType;
|
||||||
|
|
|
@ -45,8 +45,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
|
|
||||||
|
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterReque
|
||||||
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.api.records.CollectorInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
|
import org.apache.hadoop.yarn.api.records.ContainerUpdateType;
|
||||||
|
@ -54,7 +55,6 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||||
import org.apache.hadoop.yarn.server.api.records.AppCollectorData;
|
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
|
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt
|
||||||
|
@ -294,9 +294,9 @@ final class DefaultAMSProcessor implements ApplicationMasterServiceProcessor {
|
||||||
// add collector address for this application
|
// add collector address for this application
|
||||||
if (YarnConfiguration.timelineServiceV2Enabled(
|
if (YarnConfiguration.timelineServiceV2Enabled(
|
||||||
getRmContext().getYarnConfiguration())) {
|
getRmContext().getYarnConfiguration())) {
|
||||||
AppCollectorData data = app.getCollectorData();
|
CollectorInfo collectorInfo = app.getCollectorInfo();
|
||||||
if (data != null) {
|
if (collectorInfo != null) {
|
||||||
response.setCollectorAddr(data.getCollectorAddr());
|
response.setCollectorInfo(collectorInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||||
|
import org.apache.hadoop.yarn.api.records.CollectorInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
@ -187,13 +188,23 @@ public interface RMApp extends EventHandler<RMAppEvent> {
|
||||||
* only if the timeline service v.2 is enabled.
|
* only if the timeline service v.2 is enabled.
|
||||||
*
|
*
|
||||||
* @return the data for the application's collector, including collector
|
* @return the data for the application's collector, including collector
|
||||||
* address, collector ID. Return null if the timeline service v.2 is not
|
* address, RM ID, version and collector token. Return null if the timeline
|
||||||
* enabled.
|
* service v.2 is not enabled.
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
AppCollectorData getCollectorData();
|
AppCollectorData getCollectorData();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The timeline collector information to be sent to AM. It should be used
|
||||||
|
* only if the timeline service v.2 is enabled.
|
||||||
|
*
|
||||||
|
* @return collector info, including collector address and collector token.
|
||||||
|
* Return null if the timeline service v.2 is not enabled.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@InterfaceStability.Unstable
|
||||||
|
CollectorInfo getCollectorInfo();
|
||||||
/**
|
/**
|
||||||
* The original tracking url for the application master.
|
* The original tracking url for the application master.
|
||||||
* @return the original tracking url for the application master.
|
* @return the original tracking url for the application master.
|
||||||
|
|
|
@ -55,6 +55,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
|
import org.apache.hadoop.yarn.api.records.ApplicationTimeout;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||||
|
import org.apache.hadoop.yarn.api.records.CollectorInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
@ -167,6 +168,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
private int firstAttemptIdInStateStore = 1;
|
private int firstAttemptIdInStateStore = 1;
|
||||||
private int nextAttemptId = 1;
|
private int nextAttemptId = 1;
|
||||||
private AppCollectorData collectorData;
|
private AppCollectorData collectorData;
|
||||||
|
private CollectorInfo collectorInfo;
|
||||||
// This field isn't protected by readlock now.
|
// This field isn't protected by readlock now.
|
||||||
private volatile RMAppAttempt currentAttempt;
|
private volatile RMAppAttempt currentAttempt;
|
||||||
private String queue;
|
private String queue;
|
||||||
|
@ -530,7 +532,7 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
*/
|
*/
|
||||||
public void startTimelineCollector() {
|
public void startTimelineCollector() {
|
||||||
AppLevelTimelineCollector collector =
|
AppLevelTimelineCollector collector =
|
||||||
new AppLevelTimelineCollector(applicationId);
|
new AppLevelTimelineCollector(applicationId, user);
|
||||||
rmContext.getRMTimelineCollectorManager().putIfAbsent(
|
rmContext.getRMTimelineCollectorManager().putIfAbsent(
|
||||||
applicationId, collector);
|
applicationId, collector);
|
||||||
}
|
}
|
||||||
|
@ -618,6 +620,12 @@ public class RMAppImpl implements RMApp, Recoverable {
|
||||||
|
|
||||||
public void setCollectorData(AppCollectorData incomingData) {
|
public void setCollectorData(AppCollectorData incomingData) {
|
||||||
this.collectorData = incomingData;
|
this.collectorData = incomingData;
|
||||||
|
this.collectorInfo = CollectorInfo.newInstance(
|
||||||
|
incomingData.getCollectorAddr(), incomingData.getCollectorToken());
|
||||||
|
}
|
||||||
|
|
||||||
|
public CollectorInfo getCollectorInfo() {
|
||||||
|
return this.collectorInfo;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void removeCollectorData() {
|
public void removeCollectorData() {
|
||||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||||
|
import org.apache.hadoop.yarn.api.records.CollectorInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.Container;
|
import org.apache.hadoop.yarn.api.records.Container;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
|
@ -239,6 +240,11 @@ public abstract class MockAsm extends MockApps {
|
||||||
public boolean isAppInCompletedStates() {
|
public boolean isAppInCompletedStates() {
|
||||||
throw new UnsupportedOperationException("Not supported yet.");
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CollectorInfo getCollectorInfo() {
|
||||||
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static RMApp newApplication(int i) {
|
public static RMApp newApplication(int i) {
|
||||||
|
|
|
@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
import org.apache.hadoop.yarn.api.records.ApplicationReport;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
import org.apache.hadoop.yarn.api.records.ApplicationTimeoutType;
|
||||||
|
import org.apache.hadoop.yarn.api.records.CollectorInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||||
|
@ -325,4 +326,9 @@ public class MockRMApp implements RMApp {
|
||||||
public boolean isAppInCompletedStates() {
|
public boolean isAppInCompletedStates() {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public CollectorInfo getCollectorInfo() {
|
||||||
|
throw new UnsupportedOperationException("Not supported yet.");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -80,7 +80,7 @@ public class TestTimelineServiceClientIntegration {
|
||||||
auxService =
|
auxService =
|
||||||
PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
|
PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
|
||||||
collectorManager, conf);
|
collectorManager, conf);
|
||||||
auxService.addApplication(ApplicationId.newInstance(0, 1));
|
auxService.addApplication(ApplicationId.newInstance(0, 1), "user");
|
||||||
} catch (ExitUtil.ExitException e) {
|
} catch (ExitUtil.ExitException e) {
|
||||||
fail();
|
fail();
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,7 +23,10 @@ import static org.junit.Assert.assertNotNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.eq;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.spy;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.BufferedReader;
|
import java.io.BufferedReader;
|
||||||
|
@ -40,6 +43,7 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
import org.apache.hadoop.fs.FileUtil;
|
import org.apache.hadoop.fs.FileUtil;
|
||||||
import org.apache.hadoop.http.HttpConfig;
|
import org.apache.hadoop.http.HttpConfig;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.minikdc.MiniKdc;
|
import org.apache.hadoop.minikdc.MiniKdc;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
@ -51,10 +55,12 @@ import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
|
||||||
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
|
import org.apache.hadoop.yarn.client.api.TimelineV2Client;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
|
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
|
||||||
import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer;
|
import org.apache.hadoop.yarn.server.timeline.security.TimelineAuthenticationFilterInitializer;
|
||||||
|
import org.apache.hadoop.yarn.server.timelineservice.collector.AppLevelTimelineCollector;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.collector.NodeTimelineCollectorManager;
|
import org.apache.hadoop.yarn.server.timelineservice.collector.NodeTimelineCollectorManager;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
|
import org.apache.hadoop.yarn.server.timelineservice.collector.PerNodeTimelineCollectorsAuxService;
|
||||||
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
|
import org.apache.hadoop.yarn.server.timelineservice.storage.FileSystemTimelineReaderImpl;
|
||||||
|
@ -76,7 +82,6 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
|
|
||||||
private static final String FOO_USER = "foo";
|
private static final String FOO_USER = "foo";
|
||||||
private static final String HTTP_USER = "HTTP";
|
private static final String HTTP_USER = "HTTP";
|
||||||
|
|
||||||
private static final File TEST_ROOT_DIR = new File(
|
private static final File TEST_ROOT_DIR = new File(
|
||||||
System.getProperty("test.build.dir", "target" + File.separator +
|
System.getProperty("test.build.dir", "target" + File.separator +
|
||||||
"test-dir"), UUID.randomUUID().toString());
|
"test-dir"), UUID.randomUUID().toString());
|
||||||
|
@ -88,21 +93,35 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
private static String httpSpnegoPrincipal = KerberosTestUtils.
|
private static String httpSpnegoPrincipal = KerberosTestUtils.
|
||||||
getServerPrincipal();
|
getServerPrincipal();
|
||||||
|
|
||||||
|
// First param indicates whether HTTPS access or HTTP access and second param
|
||||||
|
// indicates whether it is kerberos access or token based access.
|
||||||
@Parameterized.Parameters
|
@Parameterized.Parameters
|
||||||
public static Collection<Object[]> withSsl() {
|
public static Collection<Object[]> params() {
|
||||||
return Arrays.asList(new Object[][] {{false}, {true}});
|
return Arrays.asList(new Object[][] {{false, true}, {false, false},
|
||||||
|
{true, false}, {true, true}});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static MiniKdc testMiniKDC;
|
private static MiniKdc testMiniKDC;
|
||||||
private static String keystoresDir;
|
private static String keystoresDir;
|
||||||
private static String sslConfDir;
|
private static String sslConfDir;
|
||||||
private static Configuration conf;
|
private static Configuration conf;
|
||||||
|
private static UserGroupInformation nonKerberosUser;
|
||||||
|
static {
|
||||||
|
try {
|
||||||
|
nonKerberosUser = UserGroupInformation.getCurrentUser();
|
||||||
|
} catch (IOException e) {}
|
||||||
|
}
|
||||||
|
// Indicates whether HTTPS or HTTP access.
|
||||||
private boolean withSsl;
|
private boolean withSsl;
|
||||||
|
// Indicates whether Kerberos based login is used or token based access is
|
||||||
|
// done.
|
||||||
|
private boolean withKerberosLogin;
|
||||||
private NodeTimelineCollectorManager collectorManager;
|
private NodeTimelineCollectorManager collectorManager;
|
||||||
private PerNodeTimelineCollectorsAuxService auxService;
|
private PerNodeTimelineCollectorsAuxService auxService;
|
||||||
|
public TestTimelineAuthFilterForV2(boolean withSsl,
|
||||||
public TestTimelineAuthFilterForV2(boolean withSsl) {
|
boolean withKerberosLogin) {
|
||||||
this.withSsl = withSsl;
|
this.withSsl = withSsl;
|
||||||
|
this.withKerberosLogin = withKerberosLogin;
|
||||||
}
|
}
|
||||||
|
|
||||||
@BeforeClass
|
@BeforeClass
|
||||||
|
@ -143,8 +162,6 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
conf.set("hadoop.proxyuser.HTTP.hosts", "*");
|
conf.set("hadoop.proxyuser.HTTP.hosts", "*");
|
||||||
conf.set("hadoop.proxyuser.HTTP.users", FOO_USER);
|
conf.set("hadoop.proxyuser.HTTP.users", FOO_USER);
|
||||||
UserGroupInformation.setConfiguration(conf);
|
UserGroupInformation.setConfiguration(conf);
|
||||||
SecurityUtil.login(conf, YarnConfiguration.TIMELINE_SERVICE_KEYTAB,
|
|
||||||
YarnConfiguration.TIMELINE_SERVICE_PRINCIPAL, "localhost");
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
fail("Couldn't setup TimelineServer V2.");
|
fail("Couldn't setup TimelineServer V2.");
|
||||||
}
|
}
|
||||||
|
@ -166,9 +183,27 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY,
|
conf.set(YarnConfiguration.YARN_HTTP_POLICY_KEY,
|
||||||
HttpConfig.Policy.HTTP_ONLY.name());
|
HttpConfig.Policy.HTTP_ONLY.name());
|
||||||
}
|
}
|
||||||
|
UserGroupInformation.setConfiguration(conf);
|
||||||
collectorManager = new DummyNodeTimelineCollectorManager();
|
collectorManager = new DummyNodeTimelineCollectorManager();
|
||||||
auxService = PerNodeTimelineCollectorsAuxService.launchServer(new String[0],
|
auxService = PerNodeTimelineCollectorsAuxService.launchServer(
|
||||||
collectorManager, conf);
|
new String[0], collectorManager, conf);
|
||||||
|
if (withKerberosLogin) {
|
||||||
|
SecurityUtil.login(conf, YarnConfiguration.TIMELINE_SERVICE_KEYTAB,
|
||||||
|
YarnConfiguration.TIMELINE_SERVICE_PRINCIPAL, "localhost");
|
||||||
|
}
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||||
|
auxService.addApplication(
|
||||||
|
appId, UserGroupInformation.getCurrentUser().getUserName());
|
||||||
|
if (!withKerberosLogin) {
|
||||||
|
AppLevelTimelineCollector collector =
|
||||||
|
(AppLevelTimelineCollector)collectorManager.get(appId);
|
||||||
|
org.apache.hadoop.security.token.Token
|
||||||
|
<TimelineDelegationTokenIdentifier> token =
|
||||||
|
collector.getDelegationTokenForApp();
|
||||||
|
token.setService(new Text("localhost" + token.getService().toString().
|
||||||
|
substring(token.getService().toString().indexOf(":"))));
|
||||||
|
UserGroupInformation.getCurrentUser().addToken(token);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private TimelineV2Client createTimelineClientForUGI(ApplicationId appId) {
|
private TimelineV2Client createTimelineClientForUGI(ApplicationId appId) {
|
||||||
|
@ -199,9 +234,14 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
}
|
}
|
||||||
if (withSsl) {
|
if (withSsl) {
|
||||||
KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfDir);
|
KeyStoreTestUtil.cleanupSSLConfig(keystoresDir, sslConfDir);
|
||||||
File base = new File(BASEDIR);
|
FileUtil.fullyDelete(new File(BASEDIR));
|
||||||
FileUtil.fullyDelete(base);
|
|
||||||
}
|
}
|
||||||
|
if (withKerberosLogin) {
|
||||||
|
UserGroupInformation.getCurrentUser().logoutUserFromKeytab();
|
||||||
|
}
|
||||||
|
// Reset the user for next run.
|
||||||
|
UserGroupInformation.setLoginUser(
|
||||||
|
UserGroupInformation.createRemoteUser(nonKerberosUser.getUserName()));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static TimelineEntity createEntity(String id, String type) {
|
private static TimelineEntity createEntity(String id, String type) {
|
||||||
|
@ -241,35 +281,44 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void publishAndVerifyEntity(ApplicationId appId, File entityTypeDir,
|
||||||
|
String entityType) throws Exception {
|
||||||
|
TimelineV2Client client = createTimelineClientForUGI(appId);
|
||||||
|
try {
|
||||||
|
// Sync call. Results available immediately.
|
||||||
|
client.putEntities(createEntity("entity1", entityType));
|
||||||
|
assertEquals(1, entityTypeDir.listFiles().length);
|
||||||
|
verifyEntity(entityTypeDir, "entity1", entityType);
|
||||||
|
// Async call.
|
||||||
|
client.putEntitiesAsync(createEntity("entity2", entityType));
|
||||||
|
} finally {
|
||||||
|
client.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPutTimelineEntities() throws Exception {
|
public void testPutTimelineEntities() throws Exception {
|
||||||
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
|
||||||
auxService.addApplication(appId);
|
|
||||||
final String entityType = "dummy_type";
|
final String entityType = "dummy_type";
|
||||||
|
ApplicationId appId = ApplicationId.newInstance(0, 1);
|
||||||
File entityTypeDir = new File(TEST_ROOT_DIR.getAbsolutePath() +
|
File entityTypeDir = new File(TEST_ROOT_DIR.getAbsolutePath() +
|
||||||
File.separator + "entities" + File.separator +
|
File.separator + "entities" + File.separator +
|
||||||
YarnConfiguration.DEFAULT_RM_CLUSTER_ID + File.separator + "test_user" +
|
YarnConfiguration.DEFAULT_RM_CLUSTER_ID + File.separator +
|
||||||
|
UserGroupInformation.getCurrentUser().getUserName() +
|
||||||
File.separator + "test_flow_name" + File.separator +
|
File.separator + "test_flow_name" + File.separator +
|
||||||
"test_flow_version" + File.separator + "1" + File.separator +
|
"test_flow_version" + File.separator + "1" + File.separator +
|
||||||
appId.toString() + File.separator + entityType);
|
appId.toString() + File.separator + entityType);
|
||||||
try {
|
try {
|
||||||
KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() {
|
if (withKerberosLogin) {
|
||||||
@Override
|
KerberosTestUtils.doAs(HTTP_USER + "/localhost", new Callable<Void>() {
|
||||||
public Void call() throws Exception {
|
@Override
|
||||||
TimelineV2Client client = createTimelineClientForUGI(appId);
|
public Void call() throws Exception {
|
||||||
try {
|
publishAndVerifyEntity(appId, entityTypeDir, entityType);
|
||||||
// Sync call. Results available immediately.
|
|
||||||
client.putEntities(createEntity("entity1", entityType));
|
|
||||||
assertEquals(1, entityTypeDir.listFiles().length);
|
|
||||||
verifyEntity(entityTypeDir, "entity1", entityType);
|
|
||||||
// Async call.
|
|
||||||
client.putEntitiesAsync(createEntity("entity2", entityType));
|
|
||||||
return null;
|
return null;
|
||||||
} finally {
|
|
||||||
client.stop();
|
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
});
|
} else {
|
||||||
|
publishAndVerifyEntity(appId, entityTypeDir, entityType);
|
||||||
|
}
|
||||||
// Wait for async entity to be published.
|
// Wait for async entity to be published.
|
||||||
for (int i = 0; i < 50; i++) {
|
for (int i = 0; i < 50; i++) {
|
||||||
if (entityTypeDir.listFiles().length == 2) {
|
if (entityTypeDir.listFiles().length == 2) {
|
||||||
|
@ -279,6 +328,11 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
}
|
}
|
||||||
assertEquals(2, entityTypeDir.listFiles().length);
|
assertEquals(2, entityTypeDir.listFiles().length);
|
||||||
verifyEntity(entityTypeDir, "entity2", entityType);
|
verifyEntity(entityTypeDir, "entity2", entityType);
|
||||||
|
AppLevelTimelineCollector collector =
|
||||||
|
(AppLevelTimelineCollector)collectorManager.get(appId);
|
||||||
|
auxService.removeApplication(appId);
|
||||||
|
verify(collectorManager.getTokenManagerService()).cancelToken(
|
||||||
|
eq(collector.getDelegationTokenForApp()), any(String.class));
|
||||||
} finally {
|
} finally {
|
||||||
FileUtils.deleteQuietly(entityTypeDir);
|
FileUtils.deleteQuietly(entityTypeDir);
|
||||||
}
|
}
|
||||||
|
@ -290,13 +344,20 @@ public class TestTimelineAuthFilterForV2 {
|
||||||
super();
|
super();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected TimelineV2DelegationTokenSecretManagerService
|
||||||
|
createTokenManagerService() {
|
||||||
|
return spy(new TimelineV2DelegationTokenSecretManagerService());
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected CollectorNodemanagerProtocol getNMCollectorService() {
|
protected CollectorNodemanagerProtocol getNMCollectorService() {
|
||||||
CollectorNodemanagerProtocol protocol =
|
CollectorNodemanagerProtocol protocol =
|
||||||
mock(CollectorNodemanagerProtocol.class);
|
mock(CollectorNodemanagerProtocol.class);
|
||||||
try {
|
try {
|
||||||
GetTimelineCollectorContextResponse response =
|
GetTimelineCollectorContextResponse response =
|
||||||
GetTimelineCollectorContextResponse.newInstance("test_user",
|
GetTimelineCollectorContextResponse.newInstance(
|
||||||
|
UserGroupInformation.getCurrentUser().getUserName(),
|
||||||
"test_flow_name", "test_flow_version", 1L);
|
"test_flow_name", "test_flow_version", 1L);
|
||||||
when(protocol.getTimelineCollectorContext(any(
|
when(protocol.getTimelineCollectorContext(any(
|
||||||
GetTimelineCollectorContextRequest.class))).thenReturn(response);
|
GetTimelineCollectorContextRequest.class))).thenReturn(response);
|
||||||
|
|
|
@ -22,9 +22,12 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -42,13 +45,20 @@ public class AppLevelTimelineCollector extends TimelineCollector {
|
||||||
LoggerFactory.getLogger(TimelineCollector.class);
|
LoggerFactory.getLogger(TimelineCollector.class);
|
||||||
|
|
||||||
private final ApplicationId appId;
|
private final ApplicationId appId;
|
||||||
|
private final String appUser;
|
||||||
private final TimelineCollectorContext context;
|
private final TimelineCollectorContext context;
|
||||||
private UserGroupInformation currentUser;
|
private UserGroupInformation currentUser;
|
||||||
|
private Token<TimelineDelegationTokenIdentifier> delegationTokenForApp;
|
||||||
|
|
||||||
public AppLevelTimelineCollector(ApplicationId appId) {
|
public AppLevelTimelineCollector(ApplicationId appId) {
|
||||||
|
this(appId, null);
|
||||||
|
}
|
||||||
|
|
||||||
|
public AppLevelTimelineCollector(ApplicationId appId, String user) {
|
||||||
super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString());
|
super(AppLevelTimelineCollector.class.getName() + " - " + appId.toString());
|
||||||
Preconditions.checkNotNull(appId, "AppId shouldn't be null");
|
Preconditions.checkNotNull(appId, "AppId shouldn't be null");
|
||||||
this.appId = appId;
|
this.appId = appId;
|
||||||
|
this.appUser = user;
|
||||||
context = new TimelineCollectorContext();
|
context = new TimelineCollectorContext();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -56,6 +66,20 @@ public class AppLevelTimelineCollector extends TimelineCollector {
|
||||||
return currentUser;
|
return currentUser;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public String getAppUser() {
|
||||||
|
return appUser;
|
||||||
|
}
|
||||||
|
|
||||||
|
void setDelegationTokenForApp(
|
||||||
|
Token<TimelineDelegationTokenIdentifier> token) {
|
||||||
|
this.delegationTokenForApp = token;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public Token<TimelineDelegationTokenIdentifier> getDelegationTokenForApp() {
|
||||||
|
return this.delegationTokenForApp;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceInit(Configuration conf) throws Exception {
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
context.setClusterId(conf.get(YarnConfiguration.RM_CLUSTER_ID,
|
context.setClusterId(conf.get(YarnConfiguration.RM_CLUSTER_ID,
|
||||||
|
|
|
@ -56,8 +56,8 @@ public class AppLevelTimelineCollectorWithAgg
|
||||||
private ScheduledThreadPoolExecutor appAggregationExecutor;
|
private ScheduledThreadPoolExecutor appAggregationExecutor;
|
||||||
private AppLevelAggregator appAggregator;
|
private AppLevelAggregator appAggregator;
|
||||||
|
|
||||||
public AppLevelTimelineCollectorWithAgg(ApplicationId appId) {
|
public AppLevelTimelineCollectorWithAgg(ApplicationId appId, String user) {
|
||||||
super(appId);
|
super(appId, user);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static Set<String> initializeSkipSet() {
|
private static Set<String> initializeSkipSet() {
|
||||||
|
|
|
@ -28,14 +28,17 @@ import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.http.HttpServer2;
|
import org.apache.hadoop.http.HttpServer2;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.security.SecurityUtil;
|
import org.apache.hadoop.security.SecurityUtil;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
import org.apache.hadoop.yarn.ipc.YarnRPC;
|
||||||
|
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
|
import org.apache.hadoop.yarn.server.api.CollectorNodemanagerProtocol;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextRequest;
|
||||||
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
|
import org.apache.hadoop.yarn.server.api.protocolrecords.GetTimelineCollectorContextResponse;
|
||||||
|
@ -71,6 +74,8 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
|
||||||
|
|
||||||
private final boolean runningAsAuxService;
|
private final boolean runningAsAuxService;
|
||||||
|
|
||||||
|
private UserGroupInformation loginUGI;
|
||||||
|
|
||||||
static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
|
static final String COLLECTOR_MANAGER_ATTR_KEY = "collector.manager";
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
|
@ -85,25 +90,40 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceInit(Configuration conf) throws Exception {
|
protected void serviceInit(Configuration conf) throws Exception {
|
||||||
tokenMgrService = new TimelineV2DelegationTokenSecretManagerService();
|
tokenMgrService = createTokenManagerService();
|
||||||
addService(tokenMgrService);
|
addService(tokenMgrService);
|
||||||
|
this.loginUGI = UserGroupInformation.getCurrentUser();
|
||||||
super.serviceInit(conf);
|
super.serviceInit(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void serviceStart() throws Exception {
|
protected void serviceStart() throws Exception {
|
||||||
if (UserGroupInformation.isSecurityEnabled() && !runningAsAuxService) {
|
if (UserGroupInformation.isSecurityEnabled()) {
|
||||||
// Do security login for cases where collector is running outside NM.
|
// Do security login for cases where collector is running outside NM.
|
||||||
try {
|
if (!runningAsAuxService) {
|
||||||
doSecureLogin();
|
try {
|
||||||
} catch(IOException ie) {
|
doSecureLogin();
|
||||||
throw new YarnRuntimeException("Failed to login", ie);
|
} catch(IOException ie) {
|
||||||
|
throw new YarnRuntimeException("Failed to login", ie);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
this.loginUGI = UserGroupInformation.getLoginUser();
|
||||||
}
|
}
|
||||||
super.serviceStart();
|
super.serviceStart();
|
||||||
startWebApp();
|
startWebApp();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
protected TimelineV2DelegationTokenSecretManagerService
|
||||||
|
createTokenManagerService() {
|
||||||
|
return new TimelineV2DelegationTokenSecretManagerService();
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public TimelineV2DelegationTokenSecretManagerService
|
||||||
|
getTokenManagerService() {
|
||||||
|
return tokenMgrService;
|
||||||
|
}
|
||||||
|
|
||||||
private void doSecureLogin() throws IOException {
|
private void doSecureLogin() throws IOException {
|
||||||
Configuration conf = getConfig();
|
Configuration conf = getConfig();
|
||||||
InetSocketAddress addr = NetUtils.createSocketAddr(conf.getTrimmed(
|
InetSocketAddress addr = NetUtils.createSocketAddr(conf.getTrimmed(
|
||||||
|
@ -122,13 +142,45 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
|
||||||
super.serviceStop();
|
super.serviceStop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public Token<TimelineDelegationTokenIdentifier> generateTokenForAppCollector(
|
||||||
|
String user) {
|
||||||
|
Token<TimelineDelegationTokenIdentifier> token = tokenMgrService.
|
||||||
|
generateToken(UserGroupInformation.createRemoteUser(user),
|
||||||
|
loginUGI.getShortUserName());
|
||||||
|
token.setService(new Text(timelineRestServerBindAddress));
|
||||||
|
return token;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
public void cancelTokenForAppCollector(
|
||||||
|
AppLevelTimelineCollector appCollector) throws IOException {
|
||||||
|
if (appCollector.getDelegationTokenForApp() != null) {
|
||||||
|
tokenMgrService.cancelToken(appCollector.getDelegationTokenForApp(),
|
||||||
|
appCollector.getAppUser());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
|
protected void doPostPut(ApplicationId appId, TimelineCollector collector) {
|
||||||
try {
|
try {
|
||||||
// Get context info from NM
|
// Get context info from NM
|
||||||
updateTimelineCollectorContext(appId, collector);
|
updateTimelineCollectorContext(appId, collector);
|
||||||
|
// Generate token for app collector.
|
||||||
|
org.apache.hadoop.yarn.api.records.Token token = null;
|
||||||
|
if (UserGroupInformation.isSecurityEnabled() &&
|
||||||
|
collector instanceof AppLevelTimelineCollector) {
|
||||||
|
AppLevelTimelineCollector appCollector =
|
||||||
|
(AppLevelTimelineCollector)collector;
|
||||||
|
Token<TimelineDelegationTokenIdentifier> timelineToken =
|
||||||
|
generateTokenForAppCollector(appCollector.getAppUser());
|
||||||
|
appCollector.setDelegationTokenForApp(timelineToken);
|
||||||
|
token = org.apache.hadoop.yarn.api.records.Token.newInstance(
|
||||||
|
timelineToken.getIdentifier(), timelineToken.getKind().toString(),
|
||||||
|
timelineToken.getPassword(), timelineToken.getService().toString());
|
||||||
|
}
|
||||||
// Report to NM if a new collector is added.
|
// Report to NM if a new collector is added.
|
||||||
reportNewCollectorToNM(appId);
|
reportNewCollectorToNM(appId, token);
|
||||||
} catch (YarnException | IOException e) {
|
} catch (YarnException | IOException e) {
|
||||||
// throw exception here as it cannot be used if failed communicate with NM
|
// throw exception here as it cannot be used if failed communicate with NM
|
||||||
LOG.error("Failed to communicate with NM Collector Service for " + appId);
|
LOG.error("Failed to communicate with NM Collector Service for " + appId);
|
||||||
|
@ -136,6 +188,18 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void postRemove(ApplicationId appId, TimelineCollector collector) {
|
||||||
|
if (collector instanceof AppLevelTimelineCollector) {
|
||||||
|
try {
|
||||||
|
cancelTokenForAppCollector((AppLevelTimelineCollector)collector);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Failed to cancel token for app collector with appId " +
|
||||||
|
appId, e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Launch the REST web server for this collector manager.
|
* Launch the REST web server for this collector manager.
|
||||||
*/
|
*/
|
||||||
|
@ -180,11 +244,12 @@ public class NodeTimelineCollectorManager extends TimelineCollectorManager {
|
||||||
timelineRestServerBindAddress);
|
timelineRestServerBindAddress);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void reportNewCollectorToNM(ApplicationId appId)
|
private void reportNewCollectorToNM(ApplicationId appId,
|
||||||
|
org.apache.hadoop.yarn.api.records.Token token)
|
||||||
throws YarnException, IOException {
|
throws YarnException, IOException {
|
||||||
ReportNewCollectorInfoRequest request =
|
ReportNewCollectorInfoRequest request =
|
||||||
ReportNewCollectorInfoRequest.newInstance(appId,
|
ReportNewCollectorInfoRequest.newInstance(appId,
|
||||||
this.timelineRestServerBindAddress);
|
this.timelineRestServerBindAddress, token);
|
||||||
LOG.info("Report a new collector for application: " + appId +
|
LOG.info("Report a new collector for application: " + appId +
|
||||||
" to the NM Collector Service.");
|
" to the NM Collector Service.");
|
||||||
getNMCollectorService().reportNewCollectorInfo(request);
|
getNMCollectorService().reportNewCollectorInfo(request);
|
||||||
|
|
|
@ -114,11 +114,12 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
|
||||||
* exists, no new service is created.
|
* exists, no new service is created.
|
||||||
*
|
*
|
||||||
* @param appId Application Id to be added.
|
* @param appId Application Id to be added.
|
||||||
|
* @param user Application Master container user.
|
||||||
* @return whether it was added successfully
|
* @return whether it was added successfully
|
||||||
*/
|
*/
|
||||||
public boolean addApplication(ApplicationId appId) {
|
public boolean addApplication(ApplicationId appId, String user) {
|
||||||
AppLevelTimelineCollector collector =
|
AppLevelTimelineCollector collector =
|
||||||
new AppLevelTimelineCollectorWithAgg(appId);
|
new AppLevelTimelineCollectorWithAgg(appId, user);
|
||||||
return (collectorManager.putIfAbsent(appId, collector)
|
return (collectorManager.putIfAbsent(appId, collector)
|
||||||
== collector);
|
== collector);
|
||||||
}
|
}
|
||||||
|
@ -147,7 +148,7 @@ public class PerNodeTimelineCollectorsAuxService extends AuxiliaryService {
|
||||||
if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
|
if (context.getContainerType() == ContainerType.APPLICATION_MASTER) {
|
||||||
ApplicationId appId = context.getContainerId().
|
ApplicationId appId = context.getContainerId().
|
||||||
getApplicationAttemptId().getApplicationId();
|
getApplicationAttemptId().getApplicationId();
|
||||||
addApplication(appId);
|
addApplication(appId, context.getUser());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -18,8 +18,13 @@
|
||||||
|
|
||||||
package org.apache.hadoop.yarn.server.timelineservice.security;
|
package org.apache.hadoop.yarn.server.timelineservice.security;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
import org.apache.hadoop.io.Text;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager;
|
||||||
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
|
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
|
||||||
import org.apache.hadoop.yarn.server.timeline.security.TimelineDelgationTokenSecretManagerService;
|
import org.apache.hadoop.yarn.server.timeline.security.TimelineDelgationTokenSecretManagerService;
|
||||||
|
@ -43,6 +48,17 @@ public class TimelineV2DelegationTokenSecretManagerService extends
|
||||||
tokenMaxLifetime, tokenRenewInterval, tokenRemovalScanInterval);
|
tokenMaxLifetime, tokenRenewInterval, tokenRemovalScanInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Token<TimelineDelegationTokenIdentifier> generateToken(
|
||||||
|
UserGroupInformation ugi, String renewer) {
|
||||||
|
return ((TimelineV2DelegationTokenSecretManager)
|
||||||
|
getTimelineDelegationTokenSecretManager()).generateToken(ugi, renewer);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void cancelToken(Token<TimelineDelegationTokenIdentifier> token,
|
||||||
|
String canceller) throws IOException {
|
||||||
|
getTimelineDelegationTokenSecretManager().cancelToken(token, canceller);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Delegation token secret manager for ATSv2.
|
* Delegation token secret manager for ATSv2.
|
||||||
*/
|
*/
|
||||||
|
@ -70,6 +86,21 @@ public class TimelineV2DelegationTokenSecretManagerService extends
|
||||||
delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
|
delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public Token<TimelineDelegationTokenIdentifier> generateToken(
|
||||||
|
UserGroupInformation ugi, String renewer) {
|
||||||
|
Text realUser = null;
|
||||||
|
if (ugi.getRealUser() != null) {
|
||||||
|
realUser = new Text(ugi.getRealUser().getUserName());
|
||||||
|
}
|
||||||
|
TimelineDelegationTokenIdentifier identifier = createIdentifier();
|
||||||
|
identifier.setOwner(new Text(ugi.getUserName()));
|
||||||
|
identifier.setRenewer(new Text(renewer));
|
||||||
|
identifier.setRealUser(realUser);
|
||||||
|
byte[] password = createPassword(identifier);
|
||||||
|
return new Token<TimelineDelegationTokenIdentifier>(identifier.getBytes(),
|
||||||
|
password, identifier.getKind(), null);
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public TimelineDelegationTokenIdentifier createIdentifier() {
|
public TimelineDelegationTokenIdentifier createIdentifier() {
|
||||||
return new TimelineDelegationTokenIdentifier();
|
return new TimelineDelegationTokenIdentifier();
|
||||||
|
|
|
@ -95,7 +95,7 @@ public class TestNMTimelineCollectorManager {
|
||||||
Callable<Boolean> task = new Callable<Boolean>() {
|
Callable<Boolean> task = new Callable<Boolean>() {
|
||||||
public Boolean call() {
|
public Boolean call() {
|
||||||
AppLevelTimelineCollector collector =
|
AppLevelTimelineCollector collector =
|
||||||
new AppLevelTimelineCollectorWithAgg(appId);
|
new AppLevelTimelineCollectorWithAgg(appId, "user");
|
||||||
return (collectorManager.putIfAbsent(appId, collector) == collector);
|
return (collectorManager.putIfAbsent(appId, collector) == collector);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -126,7 +126,7 @@ public class TestNMTimelineCollectorManager {
|
||||||
Callable<Boolean> task = new Callable<Boolean>() {
|
Callable<Boolean> task = new Callable<Boolean>() {
|
||||||
public Boolean call() {
|
public Boolean call() {
|
||||||
AppLevelTimelineCollector collector =
|
AppLevelTimelineCollector collector =
|
||||||
new AppLevelTimelineCollectorWithAgg(appId);
|
new AppLevelTimelineCollectorWithAgg(appId, "user");
|
||||||
boolean successPut =
|
boolean successPut =
|
||||||
(collectorManager.putIfAbsent(appId, collector) == collector);
|
(collectorManager.putIfAbsent(appId, collector) == collector);
|
||||||
return successPut && collectorManager.remove(appId);
|
return successPut && collectorManager.remove(appId);
|
||||||
|
|
Loading…
Reference in New Issue