YARN-11252. Yarn Federation Router Supports Update / Delete Reservation in MemoryStore. (#4741)

This commit is contained in:
slfan1989 2022-08-19 12:13:43 +08:00 committed by GitHub
parent 7f030250b4
commit f75c58a1ca
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 906 additions and 0 deletions

View File

@ -26,6 +26,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHom
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse;
/**
* FederationReservationHomeSubClusterStore maintains the state of all
@ -86,4 +90,30 @@ public interface FederationReservationHomeSubClusterStore {
GetReservationsHomeSubClusterResponse getReservationsHomeSubCluster(
GetReservationsHomeSubClusterRequest request) throws YarnException;
/**
* Update the home {@code SubClusterId} of a previously submitted
* {@code ReservationId}. Currently response is empty if the operation was
* successful, if not an exception reporting reason for a failure.
*
* @param request the request to update the home sub-cluster of a reservation.
* @return empty on successful update of the Reservation in the StateStore, if
* not an exception reporting reason for a failure
* @throws YarnException if the request is invalid/fails
*/
UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster(
UpdateReservationHomeSubClusterRequest request) throws YarnException;
/**
* Delete the mapping of home {@code SubClusterId} of a previously submitted
* {@code ReservationId}. Currently response is empty if the operation was
* successful, if not an exception reporting reason for a failure.
*
* @param request the request to delete the home sub-cluster of a reservation.
* @return empty on successful update of the Reservation in the StateStore, if
* not an exception reporting reason for a failure
* @throws YarnException if the request is invalid/fails
*/
DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster(
DeleteReservationHomeSubClusterRequest request) throws YarnException;
}

View File

@ -67,6 +67,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHome
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationReservationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
@ -365,4 +369,31 @@ public class MemoryFederationStateStore implements FederationStateStore {
return GetReservationsHomeSubClusterResponse.newInstance(result);
}
@Override
public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster(
UpdateReservationHomeSubClusterRequest request) throws YarnException {
FederationReservationHomeSubClusterStoreInputValidator.validate(request);
ReservationId reservationId = request.getReservationHomeSubCluster().getReservationId();
if (!reservations.containsKey(reservationId)) {
throw new YarnException("Reservation " + reservationId + " does not exist.");
}
SubClusterId subClusterId = request.getReservationHomeSubCluster().getHomeSubCluster();
reservations.put(reservationId, subClusterId);
return UpdateReservationHomeSubClusterResponse.newInstance();
}
@Override
public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster(
DeleteReservationHomeSubClusterRequest request) throws YarnException {
FederationReservationHomeSubClusterStoreInputValidator.validate(request);
ReservationId reservationId = request.getReservationId();
if (!reservations.containsKey(reservationId)) {
throw new YarnException("Reservation " + reservationId + " does not exist");
}
reservations.remove(reservationId);
return DeleteReservationHomeSubClusterResponse.newInstance();
}
}

View File

@ -74,6 +74,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHome
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationApplicationHomeSubClusterStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationMembershipStateStoreInputValidator;
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
@ -1027,4 +1031,16 @@ public class SQLFederationStateStore implements FederationStateStore {
GetReservationsHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");
}
@Override
public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster(
DeleteReservationHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");
}
@Override
public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster(
UpdateReservationHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");
}
}

View File

@ -72,6 +72,10 @@ import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHome
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationsHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterIdPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterInfoPBImpl;
import org.apache.hadoop.yarn.server.federation.store.records.impl.pb.SubClusterPolicyConfigurationPBImpl;
@ -662,4 +666,16 @@ public class ZookeeperFederationStateStore implements FederationStateStore {
GetReservationsHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");
}
@Override
public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster(
DeleteReservationHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");
}
@Override
public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster(
UpdateReservationHomeSubClusterRequest request) throws YarnException {
throw new NotImplementedException("Code is not implemented");
}
}

View File

@ -0,0 +1,65 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.util.Records;
/**
* The request to <code>Federation state store</code> to delete the mapping of
* home subcluster of a submitted reservation.
*/
@Private
@Unstable
public abstract class DeleteReservationHomeSubClusterRequest {
@Private
@Unstable
public static DeleteReservationHomeSubClusterRequest newInstance(
ReservationId reservationId) {
DeleteReservationHomeSubClusterRequest deleteReservationRequest =
Records.newRecord(DeleteReservationHomeSubClusterRequest.class);
deleteReservationRequest.setReservationId(reservationId);
return deleteReservationRequest;
}
/**
* Get the identifier of the {@link ReservationId} to be removed from
* <code>Federation state store</code> .
*
* @return the identifier of the Reservation to be removed from Federation
* State Store.
*/
@Public
@Unstable
public abstract ReservationId getReservationId();
/**
* Set the identifier of the {@link ReservationId} to be removed from
* <code>Federation state store</code> .
*
* @param reservationId the identifier of the Reservation to be removed from
* Federation State Store.
*/
@Private
@Unstable
public abstract void setReservationId(ReservationId reservationId);
}

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
/**
* DeleteReservationHomeSubClusterResponse contains the answer from the {@code
* FederationReservationHomeSubClusterStore} to a request to delete the mapping
* of home subcluster of a submitted reservation. Currently, response is empty if
* the operation was successful, if not an exception reporting reason for a
* failure.
*/
@Private
@Unstable
public abstract class DeleteReservationHomeSubClusterResponse {
@Private
@Unstable
public static DeleteReservationHomeSubClusterResponse newInstance() {
DeleteReservationHomeSubClusterResponse response =
Records.newRecord(DeleteReservationHomeSubClusterResponse.class);
return response;
}
}

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
/**
* <p>
* The request sent by the <code>Router</code> to
* <code>Federation state store</code> to update the home subcluster of a newly
* submitted reservation.
*
* <p>
* The request includes the mapping details, i.e.:
* <ul>
* <li>{@code ReservationId}</li>
* <li>{@code SubClusterId}</li>
* </ul>
*/
@Private
@Unstable
public abstract class UpdateReservationHomeSubClusterRequest {
@Private
@Unstable
public static UpdateReservationHomeSubClusterRequest newInstance(
ReservationHomeSubCluster reservationHomeSubCluster) {
UpdateReservationHomeSubClusterRequest updateReservationRequest =
Records.newRecord(UpdateReservationHomeSubClusterRequest.class);
updateReservationRequest
.setReservationHomeSubCluster(reservationHomeSubCluster);
return updateReservationRequest;
}
/**
* Get the {@link ReservationHomeSubCluster} representing the mapping of the
* reservation to it's home sub-cluster.
*
* @return the mapping of the reservation to it's home sub-cluster.
*/
@Public
@Unstable
public abstract ReservationHomeSubCluster getReservationHomeSubCluster();
/**
* Set the {@link ReservationHomeSubCluster} representing the mapping of the
* reservation to it's home sub-cluster.
*
* @param reservationHomeSubCluster the mapping of the reservation to it's
* home sub-cluster.
*/
@Private
@Unstable
public abstract void setReservationHomeSubCluster(
ReservationHomeSubCluster reservationHomeSubCluster);
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.records;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.util.Records;
/**
* UpdateReservationHomeSubClusterResponse contains the answer from the
* {@code FederationReservationHomeSubClusterStore} to a request to register the
* home subcluster of a submitted reservation. Currently response is empty if
* the operation was successful, if not an exception reporting reason for a
* failure.
*/
@Private
@Unstable
public abstract class UpdateReservationHomeSubClusterResponse {
@Private
@Unstable
public static UpdateReservationHomeSubClusterResponse newInstance() {
UpdateReservationHomeSubClusterResponse response =
Records.newRecord(UpdateReservationHomeSubClusterResponse.class);
return response;
}
}

View File

@ -0,0 +1,129 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.records.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.impl.pb.ReservationIdPBImpl;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.DeleteReservationHomeSubClusterRequestProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.DeleteReservationHomeSubClusterRequestProtoOrBuilder;
import org.apache.hadoop.yarn.proto.YarnProtos.ReservationIdProto;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.thirdparty.protobuf.TextFormat;
/**
* Protocol buffer based implementation of
* {@link DeleteReservationHomeSubClusterRequest}.
*/
@Private
@Unstable
public class DeleteReservationHomeSubClusterRequestPBImpl
extends DeleteReservationHomeSubClusterRequest {
private DeleteReservationHomeSubClusterRequestProto proto =
DeleteReservationHomeSubClusterRequestProto.getDefaultInstance();
private DeleteReservationHomeSubClusterRequestProto.Builder builder = null;
private boolean viaProto = false;
public DeleteReservationHomeSubClusterRequestPBImpl() {
builder = DeleteReservationHomeSubClusterRequestProto.newBuilder();
}
public DeleteReservationHomeSubClusterRequestPBImpl(
DeleteReservationHomeSubClusterRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public DeleteReservationHomeSubClusterRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = DeleteReservationHomeSubClusterRequestProto.newBuilder(proto);
}
viaProto = false;
}
private void mergeLocalToBuilder() {
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
@Override
public ReservationId getReservationId() {
DeleteReservationHomeSubClusterRequestProtoOrBuilder p =
viaProto ? proto : builder;
if (!p.hasReservationId()) {
return null;
}
return convertFromProtoFormat(p.getReservationId());
}
@Override
public void setReservationId(ReservationId reservationId) {
maybeInitBuilder();
if (reservationId == null) {
builder.clearReservationId();
return;
}
builder.setReservationId(convertToProtoFormat(reservationId));
}
private ReservationId convertFromProtoFormat(ReservationIdProto appId) {
return new ReservationIdPBImpl(appId);
}
private ReservationIdProto convertToProtoFormat(ReservationId appId) {
return ((ReservationIdPBImpl) appId).getProto();
}
}

View File

@ -0,0 +1,76 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.records.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.DeleteReservationHomeSubClusterResponseProto;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse;
import org.apache.hadoop.thirdparty.protobuf.TextFormat;
/**
* Protocol buffer based implementation of
* {@link DeleteReservationHomeSubClusterResponse}.
*/
@Private
@Unstable
public class DeleteReservationHomeSubClusterResponsePBImpl
extends DeleteReservationHomeSubClusterResponse {
private DeleteReservationHomeSubClusterResponseProto proto =
DeleteReservationHomeSubClusterResponseProto.getDefaultInstance();
private DeleteReservationHomeSubClusterResponseProto.Builder builder = null;
private boolean viaProto = false;
public DeleteReservationHomeSubClusterResponsePBImpl() {
builder = DeleteReservationHomeSubClusterResponseProto.newBuilder();
}
public DeleteReservationHomeSubClusterResponsePBImpl(
DeleteReservationHomeSubClusterResponseProto proto) {
this.proto = proto;
viaProto = true;
}
public DeleteReservationHomeSubClusterResponseProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
}

View File

@ -0,0 +1,131 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.records.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.ReservationHomeSubClusterProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.UpdateReservationHomeSubClusterRequestProto;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.UpdateReservationHomeSubClusterRequestProtoOrBuilder;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.thirdparty.protobuf.TextFormat;
/**
* Protocol buffer based implementation of
* {@link UpdateReservationHomeSubClusterRequest} .
*/
@Private
@Unstable
public class UpdateReservationHomeSubClusterRequestPBImpl
extends UpdateReservationHomeSubClusterRequest {
private UpdateReservationHomeSubClusterRequestProto proto =
UpdateReservationHomeSubClusterRequestProto.getDefaultInstance();
private UpdateReservationHomeSubClusterRequestProto.Builder builder = null;
private boolean viaProto = false;
public UpdateReservationHomeSubClusterRequestPBImpl() {
builder = UpdateReservationHomeSubClusterRequestProto.newBuilder();
}
public UpdateReservationHomeSubClusterRequestPBImpl(
UpdateReservationHomeSubClusterRequestProto proto) {
this.proto = proto;
viaProto = true;
}
public UpdateReservationHomeSubClusterRequestProto getProto() {
mergeLocalToProto();
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
private void mergeLocalToProto() {
if (viaProto) {
maybeInitBuilder();
}
mergeLocalToBuilder();
proto = builder.build();
viaProto = true;
}
private void maybeInitBuilder() {
if (viaProto || builder == null) {
builder = UpdateReservationHomeSubClusterRequestProto.newBuilder(proto);
}
viaProto = false;
}
private void mergeLocalToBuilder() {
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
@Override
public ReservationHomeSubCluster getReservationHomeSubCluster() {
UpdateReservationHomeSubClusterRequestProtoOrBuilder p =
viaProto ? proto : builder;
if (!p.hasAppSubclusterMap()) {
return null;
}
return convertFromProtoFormat(p.getAppSubclusterMap());
}
@Override
public void setReservationHomeSubCluster(
ReservationHomeSubCluster reservationInfo) {
maybeInitBuilder();
if (reservationInfo == null) {
builder.clearAppSubclusterMap();
return;
}
builder.setAppSubclusterMap(convertToProtoFormat(reservationInfo));
}
private ReservationHomeSubCluster convertFromProtoFormat(
ReservationHomeSubClusterProto sc) {
return new ReservationHomeSubClusterPBImpl(sc);
}
private ReservationHomeSubClusterProto convertToProtoFormat(
ReservationHomeSubCluster sc) {
return ((ReservationHomeSubClusterPBImpl) sc).getProto();
}
}

View File

@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package org.apache.hadoop.yarn.server.federation.store.records.impl.pb;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.federation.proto.YarnServerFederationProtos.UpdateReservationHomeSubClusterResponseProto;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.thirdparty.protobuf.TextFormat;
/**
* Protocol buffer based implementation of
* {@link UpdateReservationHomeSubClusterResponse}.
*/
@Private
@Unstable
public class UpdateReservationHomeSubClusterResponsePBImpl
extends UpdateReservationHomeSubClusterResponse {
private UpdateReservationHomeSubClusterResponseProto proto =
UpdateReservationHomeSubClusterResponseProto.getDefaultInstance();
private UpdateReservationHomeSubClusterResponseProto.Builder builder = null;
private boolean viaProto = false;
public UpdateReservationHomeSubClusterResponsePBImpl() {
builder = UpdateReservationHomeSubClusterResponseProto.newBuilder();
}
public UpdateReservationHomeSubClusterResponsePBImpl(
UpdateReservationHomeSubClusterResponseProto proto) {
this.proto = proto;
viaProto = true;
}
public UpdateReservationHomeSubClusterResponseProto getProto() {
proto = viaProto ? proto : builder.build();
viaProto = true;
return proto;
}
@Override
public int hashCode() {
return getProto().hashCode();
}
@Override
public boolean equals(Object other) {
if (other == null) {
return false;
}
if (other.getClass().isAssignableFrom(this.getClass())) {
return this.getProto().equals(this.getClass().cast(other).getProto());
}
return false;
}
@Override
public String toString() {
return TextFormat.shortDebugString(getProto());
}
}

View File

@ -23,6 +23,8 @@ import org.apache.hadoop.yarn.server.federation.store.exception.FederationStateS
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.ReservationHomeSubCluster;
import org.apache.hadoop.yarn.server.federation.store.records.GetReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -122,4 +124,48 @@ public final class FederationReservationHomeSubClusterStoreInputValidator {
throw new FederationStateStoreInvalidInputException(message);
}
}
/**
* Quick validation on the input to check some obvious fail conditions (fail
* fast). Check if the provided {@link UpdateReservationHomeSubClusterRequest}
* for updating an reservation is valid or not.
*
* @param request the {@link UpdateReservationHomeSubClusterRequest} to
* validate against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
public static void validate(UpdateReservationHomeSubClusterRequest request)
throws FederationStateStoreInvalidInputException {
if (request == null) {
String message = "Missing UpdateReservationHomeSubCluster Request." +
" Please try again by specifying an ReservationHomeSubCluster information.";
LOG.warn(message);
throw new FederationStateStoreInvalidInputException(message);
}
// validate ReservationHomeSubCluster info
checkReservationHomeSubCluster(request.getReservationHomeSubCluster());
}
/**
* Quick validation on the input to check some obvious fail conditions (fail
* fast). Check if the provided {@link DeleteReservationHomeSubClusterRequest}
* for deleting an Reservation is valid or not.
*
* @param request the {@link DeleteReservationHomeSubClusterRequest} to
* validate against
* @throws FederationStateStoreInvalidInputException if the request is invalid
*/
public static void validate(DeleteReservationHomeSubClusterRequest request)
throws FederationStateStoreInvalidInputException {
if (request == null) {
String message = "Missing DeleteReservationHomeSubCluster Request." +
" Please try again by specifying an ReservationHomeSubCluster information.";
LOG.warn(message);
throw new FederationStateStoreInvalidInputException(message);
}
// validate Reservation Id
checkReservationId(request.getReservationId());
}
}

View File

@ -72,6 +72,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterId;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterInfo;
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterPolicyConfiguration;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -446,6 +448,34 @@ public final class FederationStateStoreFacade {
return response.getReservationHomeSubCluster().getHomeSubCluster();
}
/**
* Updates the home {@link SubClusterId} for the specified
* {@link ReservationId}.
*
* @param appHomeSubCluster the mapping of the reservation to it's home
* sub-cluster
* @throws YarnException if the call to the state store is unsuccessful
*/
public void updateReservationHomeSubCluster(ReservationHomeSubCluster appHomeSubCluster)
throws YarnException {
UpdateReservationHomeSubClusterRequest request =
UpdateReservationHomeSubClusterRequest.newInstance(appHomeSubCluster);
stateStore.updateReservationHomeSubCluster(request);
}
/**
* Delete the home {@link SubClusterId} for the specified
* {@link ReservationId}.
*
* @param reservationId the identifier of the reservation
* @throws YarnException if the call to the state store is unsuccessful
*/
public void deleteReservationHomeSubCluster(ReservationId reservationId) throws YarnException {
DeleteReservationHomeSubClusterRequest request =
DeleteReservationHomeSubClusterRequest.newInstance(reservationId);
stateStore.deleteReservationHomeSubCluster(request);
}
/**
* Helper method to create instances of Object using the class name defined in
* the configuration object. The instances creates {@link RetryProxy} using

View File

@ -194,4 +194,76 @@ public abstract class BaseRouterPoliciesTest
Assert.assertEquals(chosen, chosen2);
}
@Test
public void testUpdateReservation() throws YarnException {
long now = Time.now();
ReservationSubmissionRequest resReq = getReservationSubmissionRequest();
when(resReq.getQueue()).thenReturn("queue1");
when(resReq.getReservationId()).thenReturn(ReservationId.newInstance(now, 1));
// first we invoke a reservation placement
FederationRouterPolicy routerPolicy = (FederationRouterPolicy) getPolicy();
SubClusterId chosen = routerPolicy.getReservationHomeSubcluster(resReq);
// add this to the store
FederationStateStoreFacade facade =
getFederationPolicyContext().getFederationStateStoreFacade();
ReservationHomeSubCluster subCluster =
ReservationHomeSubCluster.newInstance(resReq.getReservationId(), chosen);
facade.addReservationHomeSubCluster(subCluster);
// get all activeSubClusters
Map<SubClusterId, SubClusterInfo> activeSubClusters = getActiveSubclusters();
// Update ReservationHomeSubCluster
List<SubClusterId> subClusterIds = new ArrayList<>(activeSubClusters.keySet());
SubClusterId chosen2 = subClusterIds.get(this.getRand().nextInt(subClusterIds.size()));
ReservationHomeSubCluster subCluster2 =
ReservationHomeSubCluster.newInstance(resReq.getReservationId(), chosen2);
facade.updateReservationHomeSubCluster(subCluster2);
// route an application that uses this app
ApplicationSubmissionContext applicationSubmissionContext =
ApplicationSubmissionContext.newInstance(
ApplicationId.newInstance(now, 1), "app1", "queue1", Priority.newInstance(1),
null, false, false, 1, null, null, false);
applicationSubmissionContext.setReservationID(resReq.getReservationId());
SubClusterId chosen3 = routerPolicy.getHomeSubcluster(
applicationSubmissionContext, new ArrayList<>());
Assert.assertEquals(chosen2, chosen3);
}
@Test
public void testDeleteReservation() throws Exception {
long now = Time.now();
ReservationSubmissionRequest resReq = getReservationSubmissionRequest();
when(resReq.getQueue()).thenReturn("queue1");
when(resReq.getReservationId()).thenReturn(ReservationId.newInstance(now, 1));
// first we invoke a reservation placement
FederationRouterPolicy routerPolicy = (FederationRouterPolicy) getPolicy();
SubClusterId chosen = routerPolicy.getReservationHomeSubcluster(resReq);
// add this to the store
FederationStateStoreFacade facade =
getFederationPolicyContext().getFederationStateStoreFacade();
ReservationHomeSubCluster subCluster =
ReservationHomeSubCluster.newInstance(resReq.getReservationId(), chosen);
facade.addReservationHomeSubCluster(subCluster);
// delete this to the store
facade.deleteReservationHomeSubCluster(resReq.getReservationId());
ApplicationSubmissionContext applicationSubmissionContext =
ApplicationSubmissionContext.newInstance(
ApplicationId.newInstance(now, 1), "app1", "queue1", Priority.newInstance(1),
null, false, false, 1, null, null, false);
applicationSubmissionContext.setReservationID(resReq.getReservationId());
LambdaTestUtils.intercept(YarnException.class,
"Reservation " + resReq.getReservationId() + " does not exist",
() -> routerPolicy.getHomeSubcluster(applicationSubmissionContext, new ArrayList<>()));
}
}

View File

@ -63,4 +63,16 @@ public class TestRejectRouterPolicy extends BaseRouterPoliciesTest {
public void testFollowReservation() throws YarnException {
super.testFollowReservation();
}
@Override
@Test(expected = FederationPolicyException.class)
public void testUpdateReservation() throws YarnException {
super.testUpdateReservation();
}
@Override
@Test(expected = FederationPolicyException.class)
public void testDeleteReservation() throws Exception {
super.testDeleteReservation();
}
}

View File

@ -37,6 +37,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHome
import org.apache.hadoop.yarn.server.federation.store.records.AddReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.DeleteReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.GetApplicationsHomeSubClusterRequest;
@ -66,6 +68,8 @@ import org.apache.hadoop.yarn.server.federation.store.records.SubClusterRegister
import org.apache.hadoop.yarn.server.federation.store.records.SubClusterState;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateApplicationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterRequest;
import org.apache.hadoop.yarn.server.federation.store.records.UpdateReservationHomeSubClusterResponse;
import org.apache.hadoop.yarn.server.federation.utils.FederationStateStoreFacade;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@ -325,4 +329,16 @@ public class FederationStateStoreService extends AbstractService
GetReservationsHomeSubClusterRequest request) throws YarnException {
return stateStoreClient.getReservationsHomeSubCluster(request);
}
@Override
public UpdateReservationHomeSubClusterResponse updateReservationHomeSubCluster(
UpdateReservationHomeSubClusterRequest request) throws YarnException {
return stateStoreClient.updateReservationHomeSubCluster(request);
}
@Override
public DeleteReservationHomeSubClusterResponse deleteReservationHomeSubCluster(
DeleteReservationHomeSubClusterRequest request) throws YarnException {
return stateStoreClient.deleteReservationHomeSubCluster(request);
}
}