YARN-7390. All reservation related test cases failed when TestYarnClient runs against Fair Scheduler. (Yufei Gu via Haibo Chen)

This commit is contained in:
Haibo Chen 2017-11-16 10:48:24 -08:00
parent 61ace174cd
commit 28d0fcbef4
2 changed files with 551 additions and 425 deletions

View File

@ -33,7 +33,6 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@ -42,7 +41,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import com.google.common.base.Supplier;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.DataInputByteBuffer;
@ -75,14 +73,6 @@ import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsRequest;
import org.apache.hadoop.yarn.api.protocolrecords.GetNodesToLabelsResponse;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest;
import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.protocolrecords.SignalContainerRequest;
import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -96,13 +86,7 @@ import org.apache.hadoop.yarn.api.records.ContainerReport;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.SignalContainerCommand;
import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
@ -119,23 +103,28 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.security.client.TimelineDelegationTokenIdentifier;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.ParameterizedSchedulerTestBase;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.Records;
import org.apache.hadoop.yarn.util.UTCClock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
import org.slf4j.event.Level;
public class TestYarnClient {
/**
* This class is to test class {@link YarnClient) and {@link YarnClientImpl}.
*/
public class TestYarnClient extends ParameterizedSchedulerTestBase {
public TestYarnClient(SchedulerType type) throws IOException {
super(type);
}
protected void configureFairScheduler(YarnConfiguration conf) {}
@Before
public void setup() {
@ -145,7 +134,7 @@ public class TestYarnClient {
@Test
public void testClientStop() {
Configuration conf = new Configuration();
Configuration conf = getConf();
ResourceManager rm = new ResourceManager();
rm.init(conf);
rm.start();
@ -159,7 +148,7 @@ public class TestYarnClient {
@Test
public void testStartWithTimelineV15() throws Exception {
Configuration conf = new Configuration();
Configuration conf = getConf();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 1.5f);
YarnClientImpl client = (YarnClientImpl) YarnClient.createYarnClient();
@ -254,7 +243,7 @@ public class TestYarnClient {
@SuppressWarnings("deprecation")
@Test (timeout = 30000)
public void testSubmitApplication() throws Exception {
Configuration conf = new Configuration();
Configuration conf = getConf();
conf.setLong(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
100); // speed up tests
final YarnClient client = new MockYarnClient();
@ -301,7 +290,7 @@ public class TestYarnClient {
@SuppressWarnings("deprecation")
@Test (timeout = 20000)
public void testSubmitApplicationInterrupted() throws IOException {
Configuration conf = new Configuration();
Configuration conf = getConf();
int pollIntervalMs = 1000;
conf.setLong(YarnConfiguration.YARN_CLIENT_APP_SUBMISSION_POLL_INTERVAL_MS,
pollIntervalMs);
@ -406,10 +395,9 @@ public class TestYarnClient {
rm.start();
RMApp app = rm.submitApp(2000);
Configuration conf = new Configuration();
@SuppressWarnings("resource")
final YarnClient client = new MockYarnClient();
client.init(conf);
client.init(getConf());
client.start();
client.killApplication(app.getApplicationId());
@ -447,9 +435,8 @@ public class TestYarnClient {
@Test (timeout = 10000)
public void testGetApplications() throws YarnException, IOException {
Configuration conf = new Configuration();
final YarnClient client = new MockYarnClient();
client.init(conf);
client.init(getConf());
client.start();
List<ApplicationReport> expectedReports = ((MockYarnClient)client).getReports();
@ -500,9 +487,8 @@ public class TestYarnClient {
@Test(timeout = 10000)
public void testGetApplicationAttempts() throws YarnException, IOException {
Configuration conf = new Configuration();
final YarnClient client = new MockYarnClient();
client.init(conf);
client.init(getConf());
client.start();
ApplicationId applicationId = ApplicationId.newInstance(1234, 5);
@ -539,7 +525,7 @@ public class TestYarnClient {
@Test(timeout = 10000)
public void testGetContainers() throws YarnException, IOException {
Configuration conf = new Configuration();
Configuration conf = getConf();
conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
true);
@ -572,7 +558,7 @@ public class TestYarnClient {
@Test(timeout = 10000)
public void testGetContainerReport() throws YarnException, IOException {
Configuration conf = new Configuration();
Configuration conf = getConf();
conf.setBoolean(YarnConfiguration.APPLICATION_HISTORY_ENABLED,
true);
final YarnClient client = new MockYarnClient();
@ -603,9 +589,8 @@ public class TestYarnClient {
@Test (timeout = 10000)
public void testGetLabelsToNodes() throws YarnException, IOException {
Configuration conf = new Configuration();
final YarnClient client = new MockYarnClient();
client.init(conf);
client.init(getConf());
client.start();
// Get labels to nodes mapping
@ -629,9 +614,8 @@ public class TestYarnClient {
@Test (timeout = 10000)
public void testGetNodesToLabels() throws YarnException, IOException {
Configuration conf = new Configuration();
final YarnClient client = new MockYarnClient();
client.init(conf);
client.init(getConf());
client.start();
// Get labels to nodes mapping
@ -1025,7 +1009,7 @@ public class TestYarnClient {
MiniYARNCluster cluster = new MiniYARNCluster("testMRAMTokens", 1, 1, 1);
YarnClient rmClient = null;
try {
cluster.init(new YarnConfiguration());
cluster.init(getConf());
cluster.start();
final Configuration yarnConf = cluster.getConfig();
rmClient = YarnClient.createYarnClient();
@ -1146,7 +1130,7 @@ public class TestYarnClient {
boolean expectedTimeoutEnforcement) {
YarnClientImpl client = new YarnClientImpl();
try {
Configuration conf = new Configuration();
Configuration conf = getConf();
if (valueForTimeout != null) {
conf.setLong(
YarnConfiguration.YARN_CLIENT_APPLICATION_CLIENT_PROTOCOL_POLL_TIMEOUT_MS,
@ -1165,7 +1149,7 @@ public class TestYarnClient {
@Test
public void testBestEffortTimelineDelegationToken()
throws Exception {
Configuration conf = new YarnConfiguration();
Configuration conf = getConf();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
@ -1199,7 +1183,7 @@ public class TestYarnClient {
@Test
public void testAutomaticTimelineDelegationTokenLoading()
throws Exception {
Configuration conf = new YarnConfiguration();
Configuration conf = getConf();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
SecurityUtil.setAuthenticationMethod(AuthenticationMethod.KERBEROS, conf);
TimelineDelegationTokenIdentifier timelineDT =
@ -1289,7 +1273,7 @@ public class TestYarnClient {
public void testParseTimelineDelegationTokenRenewer() throws Exception {
// Client side
YarnClientImpl client = (YarnClientImpl) YarnClient.createYarnClient();
Configuration conf = new YarnConfiguration();
Configuration conf = getConf();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true);
conf.set(YarnConfiguration.RM_PRINCIPAL, "rm/_HOST@EXAMPLE.COM");
conf.set(
@ -1303,387 +1287,9 @@ public class TestYarnClient {
}
}
private MiniYARNCluster setupMiniYARNCluster() throws Exception {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
ReservationSystemTestUtil.setupQueueConfiguration(conf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
MiniYARNCluster cluster =
new MiniYARNCluster("testReservationAPIs", 2, 1, 1);
cluster.init(conf);
cluster.start();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return cluster.getResourceManager().getRMContext()
.getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ)
.getTotalCapacity().getMemorySize() > 6000;
}
}, 10, 10000);
return cluster;
}
private YarnClient setupYarnClient(MiniYARNCluster cluster) {
final Configuration yarnConf = cluster.getConfig();
YarnClient client = YarnClient.createYarnClient();
client.init(yarnConf);
client.start();
return client;
}
private ReservationSubmissionRequest submitReservationTestHelper(
YarnClient client, long arrival, long deadline, long duration)
throws IOException, YarnException {
ReservationId reservationID = client.createReservation().getReservationId();
ReservationSubmissionRequest sRequest = createSimpleReservationRequest(
reservationID, 4, arrival, deadline, duration);
ReservationSubmissionResponse sResponse =
client.submitReservation(sRequest);
Assert.assertNotNull(sResponse);
Assert.assertNotNull(reservationID);
System.out.println("Submit reservation response: " + reservationID);
return sRequest;
}
@Test
public void testCreateReservation() throws Exception {
MiniYARNCluster cluster = setupMiniYARNCluster();
YarnClient client = setupYarnClient(cluster);
try {
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(client, arrival, deadline, duration);
// Submit the reservation again with the same request and make sure it
// passes.
client.submitReservation(sRequest);
// Submit the reservation with the same reservation id but different
// reservation definition, and ensure YarnException is thrown.
arrival = clock.getTime();
ReservationDefinition rDef = sRequest.getReservationDefinition();
rDef.setArrival(arrival + duration);
sRequest.setReservationDefinition(rDef);
try {
client.submitReservation(sRequest);
Assert.fail("Reservation submission should fail if a duplicate "
+ "reservation id is used, but the reservation definition has been "
+ "updated.");
} catch (Exception e) {
Assert.assertTrue(e instanceof YarnException);
}
} finally {
// clean-up
if (client != null) {
client.stop();
}
cluster.stop();
}
}
@Test
public void testUpdateReservation() throws Exception {
MiniYARNCluster cluster = setupMiniYARNCluster();
YarnClient client = setupYarnClient(cluster);
try {
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(client, arrival, deadline, duration);
ReservationDefinition rDef = sRequest.getReservationDefinition();
ReservationRequest rr =
rDef.getReservationRequests().getReservationResources().get(0);
ReservationId reservationID = sRequest.getReservationId();
rr.setNumContainers(5);
arrival = clock.getTime();
duration = 30000;
deadline = (long) (arrival + 1.05 * duration);
rr.setDuration(duration);
rDef.setArrival(arrival);
rDef.setDeadline(deadline);
ReservationUpdateRequest uRequest =
ReservationUpdateRequest.newInstance(rDef, reservationID);
ReservationUpdateResponse uResponse = client.updateReservation(uRequest);
Assert.assertNotNull(uResponse);
System.out.println("Update reservation response: " + uResponse);
} finally {
// clean-up
if (client != null) {
client.stop();
}
cluster.stop();
}
}
@Test
public void testListReservationsByReservationId() throws Exception{
MiniYARNCluster cluster = setupMiniYARNCluster();
YarnClient client = setupYarnClient(cluster);
try {
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(client, arrival, deadline, duration);
ReservationId reservationID = sRequest.getReservationId();
ReservationListRequest request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, reservationID.toString(), -1,
-1, false);
ReservationListResponse response = client.listReservations(request);
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getResourceAllocationRequests().size(), 0);
} finally {
// clean-up
if (client != null) {
client.stop();
}
cluster.stop();
}
}
@Test
public void testListReservationsByTimeInterval() throws Exception {
MiniYARNCluster cluster = setupMiniYARNCluster();
YarnClient client = setupYarnClient(cluster);
try {
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(client, arrival, deadline, duration);
// List reservations, search by a point in time within the reservation
// range.
arrival = clock.getTime();
ReservationId reservationID = sRequest.getReservationId();
ReservationListRequest request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", arrival + duration / 2,
arrival + duration / 2, true);
ReservationListResponse response = client.listReservations(request);
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
// List reservations, search by time within reservation interval.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 1, Long.MAX_VALUE, true);
response = client.listReservations(request);
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
// Verify that the full resource allocations exist.
Assert.assertTrue(response.getReservationAllocationState().get(0)
.getResourceAllocationRequests().size() > 0);
// Verify that the full RDL is returned.
ReservationRequests reservationRequests =
response.getReservationAllocationState().get(0)
.getReservationDefinition().getReservationRequests();
Assert.assertEquals("R_ALL",
reservationRequests.getInterpreter().toString());
Assert.assertTrue(reservationRequests.getReservationResources().get(0)
.getDuration() == duration);
} finally {
// clean-up
if (client != null) {
client.stop();
}
cluster.stop();
}
}
@Test
public void testListReservationsByInvalidTimeInterval() throws Exception {
MiniYARNCluster cluster = setupMiniYARNCluster();
YarnClient client = setupYarnClient(cluster);
try {
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(client, arrival, deadline, duration);
// List reservations, search by invalid end time == -1.
ReservationListRequest request = ReservationListRequest
.newInstance(ReservationSystemTestUtil.reservationQ, "", 1, -1, true);
ReservationListResponse response = client.listReservations(request);
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), sRequest.getReservationId().getId());
// List reservations, search by invalid end time < -1.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 1, -10, true);
response = client.listReservations(request);
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), sRequest.getReservationId().getId());
} finally {
// clean-up
if (client != null) {
client.stop();
}
cluster.stop();
}
}
@Test
public void testListReservationsByTimeIntervalContainingNoReservations()
throws Exception {
MiniYARNCluster cluster = setupMiniYARNCluster();
YarnClient client = setupYarnClient(cluster);
try {
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(client, arrival, deadline, duration);
// List reservations, search by very large start time.
ReservationListRequest request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", Long.MAX_VALUE, -1,
false);
ReservationListResponse response = client.listReservations(request);
// Ensure all reservations are filtered out.
Assert.assertNotNull(response);
Assert.assertEquals(response.getReservationAllocationState().size(), 0);
duration = 30000;
deadline = sRequest.getReservationDefinition().getDeadline();
// List reservations, search by start time after the reservation
// end time.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", deadline + duration,
deadline + 2 * duration, false);
response = client.listReservations(request);
// Ensure all reservations are filtered out.
Assert.assertNotNull(response);
Assert.assertEquals(response.getReservationAllocationState().size(), 0);
arrival = clock.getTime();
// List reservations, search by end time before the reservation start
// time.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 0, arrival - duration,
false);
response = client.listReservations(request);
// Ensure all reservations are filtered out.
Assert.assertNotNull(response);
Assert.assertEquals(response.getReservationAllocationState().size(), 0);
// List reservations, search by very small end time.
request = ReservationListRequest
.newInstance(ReservationSystemTestUtil.reservationQ, "", 0, 1, false);
response = client.listReservations(request);
// Ensure all reservations are filtered out.
Assert.assertNotNull(response);
Assert.assertEquals(response.getReservationAllocationState().size(), 0);
} finally {
// clean-up
if (client != null) {
client.stop();
}
cluster.stop();
}
}
@Test
public void testReservationDelete() throws Exception {
MiniYARNCluster cluster = setupMiniYARNCluster();
YarnClient client = setupYarnClient(cluster);
try {
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(client, arrival, deadline, duration);
ReservationId reservationID = sRequest.getReservationId();
// Delete the reservation
ReservationDeleteRequest dRequest =
ReservationDeleteRequest.newInstance(reservationID);
ReservationDeleteResponse dResponse = client.deleteReservation(dRequest);
Assert.assertNotNull(dResponse);
System.out.println("Delete reservation response: " + dResponse);
// List reservations, search by non-existent reservationID
ReservationListRequest request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, reservationID.toString(), -1,
-1, false);
ReservationListResponse response = client.listReservations(request);
Assert.assertNotNull(response);
Assert.assertEquals(0, response.getReservationAllocationState().size());
} finally {
// clean-up
if (client != null) {
client.stop();
}
cluster.stop();
}
}
private ReservationSubmissionRequest createSimpleReservationRequest(
ReservationId reservationId, int numContainers, long arrival,
long deadline, long duration) {
// create a request with a single atomic ask
ReservationRequest r =
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
numContainers, 1, duration);
ReservationRequests reqs =
ReservationRequests.newInstance(Collections.singletonList(r),
ReservationRequestInterpreter.R_ALL);
ReservationDefinition rDef =
ReservationDefinition.newInstance(arrival, deadline, reqs,
"testYarnClient#reservation");
ReservationSubmissionRequest request =
ReservationSubmissionRequest.newInstance(rDef,
ReservationSystemTestUtil.reservationQ, reservationId);
return request;
}
@Test(timeout = 30000, expected = ApplicationNotFoundException.class)
public void testShouldNotRetryForeverForNonNetworkExceptions() throws Exception {
YarnConfiguration conf = new YarnConfiguration();
YarnConfiguration conf = getConf();
conf.setInt(YarnConfiguration.RESOURCEMANAGER_CONNECT_MAX_WAIT_MS, -1);
ResourceManager rm = null;
@ -1715,10 +1321,9 @@ public class TestYarnClient {
@Test
public void testSignalContainer() throws Exception {
Configuration conf = new Configuration();
@SuppressWarnings("resource")
final YarnClient client = new MockYarnClient();
client.init(conf);
client.init(getConf());
client.start();
ApplicationId applicationId = ApplicationId.newInstance(1234, 5);
ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
@ -1741,7 +1346,7 @@ public class TestYarnClient {
boolean timelineClientBestEffort,
Throwable mockErr,
CreateTimelineClientErrorVerifier errVerifier) throws Exception {
Configuration conf = new Configuration();
Configuration conf = getConf();
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED,
timelineServiceEnabled);
conf.setBoolean(YarnConfiguration.TIMELINE_SERVICE_CLIENT_BEST_EFFORT,

View File

@ -0,0 +1,521 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.client.api.impl;
import com.google.common.base.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationDeleteResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationListResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationSubmissionResponse;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.ReservationUpdateResponse;
import org.apache.hadoop.yarn.api.records.ReservationDefinition;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.ReservationRequest;
import org.apache.hadoop.yarn.api.records.ReservationRequestInterpreter;
import org.apache.hadoop.yarn.api.records.ReservationRequests;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.MiniYARNCluster;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystemTestUtil;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.UTCClock;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.stream.Collectors;
/**
* This class is to test class {@link YarnClient) and {@link YarnClientImpl}
* with Reservation.
*/
@RunWith(Parameterized.class)
public class TestYarnClientWithReservation {
protected final static String TEST_DIR =
new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath();
protected final static String FS_ALLOC_FILE =
new File(TEST_DIR, "test-fs-queues.xml").getAbsolutePath();
public enum SchedulerType {
CAPACITY, FAIR
}
private SchedulerType schedulerType;
@Parameterized.Parameters(name = "{0}")
public static Collection<Object[]> getParameters() {
return Arrays.stream(SchedulerType.values()).map(
type -> new Object[]{type}).collect(Collectors.toList());
}
public TestYarnClientWithReservation(SchedulerType scheduler) {
this.schedulerType = scheduler;
}
private MiniYARNCluster setupMiniYARNCluster() throws Exception {
MiniYARNCluster cluster =
new MiniYARNCluster("testReservationAPIs", 2, 1, 1);
cluster.init(getConfigurationForReservation());
cluster.start();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
return cluster.getResourceManager().getRMContext()
.getReservationSystem()
.getPlan(ReservationSystemTestUtil.reservationQ)
.getTotalCapacity().getMemorySize() > 6000;
}
}, 10, 10000);
return cluster;
}
private Configuration getConfigurationForReservation() {
Configuration conf = new Configuration();
if (schedulerType == SchedulerType.FAIR) {
conf = configureReservationForFairScheduler();
conf.set(YarnConfiguration.RM_SCHEDULER, FairScheduler.class.getName());
} else if (schedulerType == SchedulerType.CAPACITY) {
conf = configureReservationForCapacityScheduler();
conf.set(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class.getName());
}
conf.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
return conf;
}
private Configuration configureReservationForCapacityScheduler() {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
ReservationSystemTestUtil.setupQueueConfiguration(conf);
return conf;
}
private Configuration configureReservationForFairScheduler() {
Configuration conf = new Configuration();
try {
PrintWriter out = new PrintWriter(new FileWriter(FS_ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queue name=\"root\">");
out.println(" <queue name=\"default\"></queue>");
out.println(" <queue name=\"dedicated\">");
out.println(" <reservation></reservation>");
// set weight to 10 to make sure this queue get enough steady fair share
out.println(" <weight>10</weight>");
out.println(" </queue>");
out.println("</queue>");
out.println("<defaultQueueSchedulingPolicy>drf" +
"</defaultQueueSchedulingPolicy>");
out.println("</allocations>");
out.close();
} catch (IOException e) {
Assert.fail(e.getMessage());
}
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, FS_ALLOC_FILE);
return conf;
}
private YarnClient setupYarnClient(MiniYARNCluster cluster) {
final Configuration yarnConf = cluster.getConfig();
YarnClient client = YarnClient.createYarnClient();
client.init(yarnConf);
client.start();
return client;
}
private ReservationSubmissionRequest submitReservationTestHelper(
YarnClient client, long arrival, long deadline, long duration)
throws IOException, YarnException {
ReservationId reservationID = client.createReservation().getReservationId();
ReservationSubmissionRequest sRequest = createSimpleReservationRequest(
reservationID, 4, arrival, deadline, duration);
ReservationSubmissionResponse sResponse =
client.submitReservation(sRequest);
Assert.assertNotNull(sResponse);
Assert.assertNotNull(reservationID);
System.out.println("Submit reservation response: " + reservationID);
return sRequest;
}
@Before
public void setup() {
QueueMetrics.clearQueueMetrics();
DefaultMetricsSystem.setMiniClusterMode(true);
}
@Test
public void testCreateReservation() throws Exception {
MiniYARNCluster cluster = setupMiniYARNCluster();
YarnClient client = setupYarnClient(cluster);
try {
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(client, arrival, deadline, duration);
// Submit the reservation again with the same request and make sure it
// passes.
client.submitReservation(sRequest);
// Submit the reservation with the same reservation id but different
// reservation definition, and ensure YarnException is thrown.
arrival = clock.getTime();
ReservationDefinition rDef = sRequest.getReservationDefinition();
rDef.setArrival(arrival + duration);
sRequest.setReservationDefinition(rDef);
try {
client.submitReservation(sRequest);
Assert.fail("Reservation submission should fail if a duplicate "
+ "reservation id is used, but the reservation definition has been "
+ "updated.");
} catch (Exception e) {
Assert.assertTrue(e instanceof YarnException);
}
} finally {
// clean-up
if (client != null) {
client.stop();
}
cluster.stop();
}
}
@Test
public void testUpdateReservation() throws Exception {
MiniYARNCluster cluster = setupMiniYARNCluster();
YarnClient client = setupYarnClient(cluster);
try {
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(client, arrival, deadline, duration);
ReservationDefinition rDef = sRequest.getReservationDefinition();
ReservationRequest rr =
rDef.getReservationRequests().getReservationResources().get(0);
ReservationId reservationID = sRequest.getReservationId();
rr.setNumContainers(5);
arrival = clock.getTime();
duration = 30000;
deadline = (long) (arrival + 1.05 * duration);
rr.setDuration(duration);
rDef.setArrival(arrival);
rDef.setDeadline(deadline);
ReservationUpdateRequest uRequest =
ReservationUpdateRequest.newInstance(rDef, reservationID);
ReservationUpdateResponse uResponse = client.updateReservation(uRequest);
Assert.assertNotNull(uResponse);
System.out.println("Update reservation response: " + uResponse);
} finally {
// clean-up
if (client != null) {
client.stop();
}
cluster.stop();
}
}
private ReservationSubmissionRequest createSimpleReservationRequest(
ReservationId reservationId, int numContainers, long arrival,
long deadline, long duration) {
// create a request with a single atomic ask
ReservationRequest r =
ReservationRequest.newInstance(Resource.newInstance(1024, 1),
numContainers, 1, duration);
ReservationRequests reqs =
ReservationRequests.newInstance(Collections.singletonList(r),
ReservationRequestInterpreter.R_ALL);
ReservationDefinition rDef =
ReservationDefinition.newInstance(arrival, deadline, reqs,
"testYarnClient#reservation");
ReservationSubmissionRequest request =
ReservationSubmissionRequest.newInstance(rDef,
ReservationSystemTestUtil.reservationQ, reservationId);
return request;
}
@Test
public void testListReservationsByReservationId() throws Exception{
MiniYARNCluster cluster = setupMiniYARNCluster();
YarnClient client = setupYarnClient(cluster);
try {
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(client, arrival, deadline, duration);
ReservationId reservationID = sRequest.getReservationId();
ReservationListRequest request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, reservationID.toString(), -1,
-1, false);
ReservationListResponse response = client.listReservations(request);
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getResourceAllocationRequests().size(), 0);
} finally {
// clean-up
if (client != null) {
client.stop();
}
cluster.stop();
}
}
@Test
public void testListReservationsByTimeInterval() throws Exception {
MiniYARNCluster cluster = setupMiniYARNCluster();
YarnClient client = setupYarnClient(cluster);
try {
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(client, arrival, deadline, duration);
// List reservations, search by a point in time within the reservation
// range.
arrival = clock.getTime();
ReservationId reservationID = sRequest.getReservationId();
ReservationListRequest request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", arrival + duration / 2,
arrival + duration / 2, true);
ReservationListResponse response = client.listReservations(request);
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
// List reservations, search by time within reservation interval.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 1, Long.MAX_VALUE, true);
response = client.listReservations(request);
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), reservationID.getId());
// Verify that the full resource allocations exist.
Assert.assertTrue(response.getReservationAllocationState().get(0)
.getResourceAllocationRequests().size() > 0);
// Verify that the full RDL is returned.
ReservationRequests reservationRequests =
response.getReservationAllocationState().get(0)
.getReservationDefinition().getReservationRequests();
Assert.assertEquals("R_ALL",
reservationRequests.getInterpreter().toString());
Assert.assertTrue(reservationRequests.getReservationResources().get(0)
.getDuration() == duration);
} finally {
// clean-up
if (client != null) {
client.stop();
}
cluster.stop();
}
}
@Test
public void testListReservationsByInvalidTimeInterval() throws Exception {
MiniYARNCluster cluster = setupMiniYARNCluster();
YarnClient client = setupYarnClient(cluster);
try {
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(client, arrival, deadline, duration);
// List reservations, search by invalid end time == -1.
ReservationListRequest request = ReservationListRequest
.newInstance(ReservationSystemTestUtil.reservationQ, "", 1, -1, true);
ReservationListResponse response = client.listReservations(request);
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), sRequest.getReservationId().getId());
// List reservations, search by invalid end time < -1.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 1, -10, true);
response = client.listReservations(request);
Assert.assertNotNull(response);
Assert.assertEquals(1, response.getReservationAllocationState().size());
Assert.assertEquals(response.getReservationAllocationState().get(0)
.getReservationId().getId(), sRequest.getReservationId().getId());
} finally {
// clean-up
if (client != null) {
client.stop();
}
cluster.stop();
}
}
@Test
public void testListReservationsByTimeIntervalContainingNoReservations()
throws Exception {
MiniYARNCluster cluster = setupMiniYARNCluster();
YarnClient client = setupYarnClient(cluster);
try {
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(client, arrival, deadline, duration);
// List reservations, search by very large start time.
ReservationListRequest request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", Long.MAX_VALUE, -1,
false);
ReservationListResponse response = client.listReservations(request);
// Ensure all reservations are filtered out.
Assert.assertNotNull(response);
Assert.assertEquals(response.getReservationAllocationState().size(), 0);
duration = 30000;
deadline = sRequest.getReservationDefinition().getDeadline();
// List reservations, search by start time after the reservation
// end time.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", deadline + duration,
deadline + 2 * duration, false);
response = client.listReservations(request);
// Ensure all reservations are filtered out.
Assert.assertNotNull(response);
Assert.assertEquals(response.getReservationAllocationState().size(), 0);
arrival = clock.getTime();
// List reservations, search by end time before the reservation start
// time.
request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, "", 0, arrival - duration,
false);
response = client.listReservations(request);
// Ensure all reservations are filtered out.
Assert.assertNotNull(response);
Assert.assertEquals(response.getReservationAllocationState().size(), 0);
// List reservations, search by very small end time.
request = ReservationListRequest
.newInstance(ReservationSystemTestUtil.reservationQ, "", 0, 1, false);
response = client.listReservations(request);
// Ensure all reservations are filtered out.
Assert.assertNotNull(response);
Assert.assertEquals(response.getReservationAllocationState().size(), 0);
} finally {
// clean-up
if (client != null) {
client.stop();
}
cluster.stop();
}
}
@Test
public void testReservationDelete() throws Exception {
MiniYARNCluster cluster = setupMiniYARNCluster();
YarnClient client = setupYarnClient(cluster);
try {
Clock clock = new UTCClock();
long arrival = clock.getTime();
long duration = 60000;
long deadline = (long) (arrival + 1.05 * duration);
ReservationSubmissionRequest sRequest =
submitReservationTestHelper(client, arrival, deadline, duration);
ReservationId reservationID = sRequest.getReservationId();
// Delete the reservation
ReservationDeleteRequest dRequest =
ReservationDeleteRequest.newInstance(reservationID);
ReservationDeleteResponse dResponse = client.deleteReservation(dRequest);
Assert.assertNotNull(dResponse);
System.out.println("Delete reservation response: " + dResponse);
// List reservations, search by non-existent reservationID
ReservationListRequest request = ReservationListRequest.newInstance(
ReservationSystemTestUtil.reservationQ, reservationID.toString(), -1,
-1, false);
ReservationListResponse response = client.listReservations(request);
Assert.assertNotNull(response);
Assert.assertEquals(0, response.getReservationAllocationState().size());
} finally {
// clean-up
if (client != null) {
client.stop();
}
cluster.stop();
}
}
}