From def9d94a40e1ff71a0dc5a4db1f042e2704cb84d Mon Sep 17 00:00:00 2001 From: Xiao Chen Date: Mon, 9 Jul 2018 12:00:32 -0700 Subject: [PATCH 1/8] HADOOP-15591. KMSClientProvider should log KMS DT acquisition at INFO level. Contributed by Kitti Nanasi. --- .../org/apache/hadoop/crypto/key/kms/KMSClientProvider.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java index 7b4607507b9..11815da9d21 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/crypto/key/kms/KMSClientProvider.java @@ -1036,13 +1036,13 @@ public class KMSClientProvider extends KeyProvider implements CryptoExtension, public Token run() throws Exception { // Not using the cached token here.. Creating a new token here // everytime. - LOG.debug("Getting new token from {}, renewer:{}", url, renewer); + LOG.info("Getting new token from {}, renewer:{}", url, renewer); return authUrl.getDelegationToken(url, new DelegationTokenAuthenticatedURL.Token(), renewer, doAsUser); } }); if (token != null) { - LOG.debug("New token received: ({})", token); + LOG.info("New token received: ({})", token); credentials.addToken(token.getService(), token); tokens = new Token[] { token }; } else { From 895845e9b0d7ac49da36b5cf773c6330afe4f3e0 Mon Sep 17 00:00:00 2001 From: Xiao Chen Date: Mon, 9 Jul 2018 12:06:25 -0700 Subject: [PATCH 2/8] HADOOP-15581. Set default jetty log level to INFO in KMS. Contributed by Kitti Nanasi. --- .../hadoop-kms/src/main/conf/kms-log4j.properties | 4 +++- .../hadoop-kms/src/test/resources/log4j.properties | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/hadoop-common-project/hadoop-kms/src/main/conf/kms-log4j.properties b/hadoop-common-project/hadoop-kms/src/main/conf/kms-log4j.properties index 04a3cf3566d..e2afd41be08 100644 --- a/hadoop-common-project/hadoop-kms/src/main/conf/kms-log4j.properties +++ b/hadoop-common-project/hadoop-kms/src/main/conf/kms-log4j.properties @@ -37,4 +37,6 @@ log4j.logger.org.apache.hadoop=INFO log4j.logger.com.sun.jersey.server.wadl.generators.WadlGeneratorJAXBGrammarGenerator=OFF # make zookeeper log level an explicit config, and not changing with rootLogger. log4j.logger.org.apache.zookeeper=INFO -log4j.logger.org.apache.curator=INFO \ No newline at end of file +log4j.logger.org.apache.curator=INFO +# make jetty log level an explicit config, and not changing with rootLogger. +log4j.logger.org.eclipse.jetty=INFO \ No newline at end of file diff --git a/hadoop-common-project/hadoop-kms/src/test/resources/log4j.properties b/hadoop-common-project/hadoop-kms/src/test/resources/log4j.properties index e319af66569..b8e6353b393 100644 --- a/hadoop-common-project/hadoop-kms/src/test/resources/log4j.properties +++ b/hadoop-common-project/hadoop-kms/src/test/resources/log4j.properties @@ -31,4 +31,6 @@ log4j.logger.org.apache.directory.server.core=OFF log4j.logger.org.apache.hadoop.util.NativeCodeLoader=OFF # make zookeeper log level an explicit config, and not changing with rootLogger. log4j.logger.org.apache.zookeeper=INFO -log4j.logger.org.apache.curator=INFO \ No newline at end of file +log4j.logger.org.apache.curator=INFO +# make jetty log level an explicit config, and not changing with rootLogger. +log4j.logger.org.eclipse.jetty=INFO \ No newline at end of file From e12d93bfc1a0efd007bc84758e60b5149c3aa663 Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Mon, 9 Jul 2018 12:02:20 -0700 Subject: [PATCH 3/8] HDDS-224. Create metrics for Event Watcher. Contributed by Elek, Marton. --- hadoop-hdds/framework/pom.xml | 5 + .../hdds/server/events/EventWatcher.java | 43 ++++++- .../server/events/EventWatcherMetrics.java | 79 +++++++++++++ .../hdds/server/events/TestEventWatcher.java | 111 +++++++++++++++--- 4 files changed, 222 insertions(+), 16 deletions(-) create mode 100644 hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcherMetrics.java diff --git a/hadoop-hdds/framework/pom.xml b/hadoop-hdds/framework/pom.xml index a497133f9dd..6e1927d2d93 100644 --- a/hadoop-hdds/framework/pom.xml +++ b/hadoop-hdds/framework/pom.xml @@ -39,6 +39,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd"> hadoop-hdds-common provided + + org.mockito + mockito-all + test + diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java index 19fddde9b4d..8c5605a29be 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventWatcher.java @@ -26,12 +26,17 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.function.Predicate; import java.util.stream.Collectors; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.ozone.lease.Lease; import org.apache.hadoop.ozone.lease.LeaseAlreadyExistException; import org.apache.hadoop.ozone.lease.LeaseExpiredException; import org.apache.hadoop.ozone.lease.LeaseManager; import org.apache.hadoop.ozone.lease.LeaseNotFoundException; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; +import org.apache.commons.collections.map.HashedMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -58,18 +63,39 @@ public abstract class EventWatcher leaseManager; + private final EventWatcherMetrics metrics; + + private final String name; + protected final Map trackedEventsByUUID = new ConcurrentHashMap<>(); protected final Set trackedEvents = new HashSet<>(); - public EventWatcher(Event startEvent, + private final Map startTrackingTimes = new HashedMap(); + + public EventWatcher(String name, Event startEvent, Event completionEvent, LeaseManager leaseManager) { this.startEvent = startEvent; this.completionEvent = completionEvent; this.leaseManager = leaseManager; + this.metrics = new EventWatcherMetrics(); + Preconditions.checkNotNull(name); + if (name.equals("")) { + name = getClass().getSimpleName(); + } + if (name.equals("")) { + //for anonymous inner classes + name = getClass().getName(); + } + this.name = name; + } + public EventWatcher(Event startEvent, + Event completionEvent, + LeaseManager leaseManager) { + this("", startEvent, completionEvent, leaseManager); } public void start(EventQueue queue) { @@ -87,11 +113,16 @@ public abstract class EventWatcher + * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.server.events; + +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableRate; + +import com.google.common.annotations.VisibleForTesting; + +/** + * Metrics for any event watcher. + */ +public class EventWatcherMetrics { + + @Metric() + private MutableCounterLong trackedEvents; + + @Metric() + private MutableCounterLong timedOutEvents; + + @Metric() + private MutableCounterLong completedEvents; + + @Metric() + private MutableRate completionTime; + + public void incrementTrackedEvents() { + trackedEvents.incr(); + } + + public void incrementTimedOutEvents() { + timedOutEvents.incr(); + } + + public void incrementCompletedEvents() { + completedEvents.incr(); + } + + @VisibleForTesting + public void updateFinishingTime(long duration) { + completionTime.add(duration); + } + + @VisibleForTesting + public MutableCounterLong getTrackedEvents() { + return trackedEvents; + } + + @VisibleForTesting + public MutableCounterLong getTimedOutEvents() { + return timedOutEvents; + } + + @VisibleForTesting + public MutableCounterLong getCompletedEvents() { + return completedEvents; + } + + @VisibleForTesting + public MutableRate getCompletionTime() { + return completionTime; + } +} diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java index 1731350cfe5..38e15547a84 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventWatcher.java @@ -21,8 +21,13 @@ import java.util.List; import java.util.Objects; import java.util.UUID; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.ozone.lease.LeaseManager; +import org.apache.hadoop.test.MetricsAsserts; +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -46,6 +51,7 @@ public class TestEventWatcher { @Before public void startLeaseManager() { + DefaultMetricsSystem.instance(); leaseManager = new LeaseManager<>(2000l); leaseManager.start(); } @@ -53,12 +59,12 @@ public class TestEventWatcher { @After public void stopLeaseManager() { leaseManager.shutdown(); + DefaultMetricsSystem.shutdown(); } @Test public void testEventHandling() throws InterruptedException { - EventQueue queue = new EventQueue(); EventWatcher @@ -139,26 +145,101 @@ public class TestEventWatcher { Assert.assertEquals(0, c1todo.size()); Assert.assertFalse(replicationWatcher.contains(event1)); + } + @Test + public void testMetrics() throws InterruptedException { + + DefaultMetricsSystem.initialize("test"); + + EventQueue queue = new EventQueue(); + + EventWatcher + replicationWatcher = createEventWatcher(); + + EventHandlerStub underReplicatedEvents = + new EventHandlerStub<>(); + + queue.addHandler(UNDER_REPLICATED, underReplicatedEvents); + + replicationWatcher.start(queue); + + //send 3 event to track 3 in-progress activity + UnderreplicatedEvent event1 = + new UnderreplicatedEvent(UUID.randomUUID(), "C1"); + + UnderreplicatedEvent event2 = + new UnderreplicatedEvent(UUID.randomUUID(), "C2"); + + UnderreplicatedEvent event3 = + new UnderreplicatedEvent(UUID.randomUUID(), "C1"); + + queue.fireEvent(WATCH_UNDER_REPLICATED, event1); + + queue.fireEvent(WATCH_UNDER_REPLICATED, event2); + + queue.fireEvent(WATCH_UNDER_REPLICATED, event3); + + //1st event is completed, don't need to track any more + ReplicationCompletedEvent event1Completed = + new ReplicationCompletedEvent(event1.UUID, "C1", "D1"); + + queue.fireEvent(REPLICATION_COMPLETED, event1Completed); + + + Thread.sleep(2200l); + + //until now: 3 in-progress activities are tracked with three + // UnderreplicatedEvents. The first one is completed, the remaining two + // are timed out (as the timeout -- defined in the leasmanager -- is 2000ms. + + EventWatcherMetrics metrics = replicationWatcher.getMetrics(); + + //3 events are received + Assert.assertEquals(3, metrics.getTrackedEvents().value()); + + //one is finished. doesn't need to be resent + Assert.assertEquals(1, metrics.getCompletedEvents().value()); + + //Other two are timed out and resent + Assert.assertEquals(2, metrics.getTimedOutEvents().value()); + + DefaultMetricsSystem.shutdown(); } private EventWatcher createEventWatcher() { - return new EventWatcher( - WATCH_UNDER_REPLICATED, REPLICATION_COMPLETED, leaseManager) { - - @Override - void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) { - publisher.fireEvent(UNDER_REPLICATED, payload); - } - - @Override - void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) { - //Good job. We did it. - } - }; + return new CommandWatcherExample(WATCH_UNDER_REPLICATED, + REPLICATION_COMPLETED, leaseManager); } + private class CommandWatcherExample + extends EventWatcher { + + public CommandWatcherExample(Event startEvent, + Event completionEvent, + LeaseManager leaseManager) { + super("TestCommandWatcher", startEvent, completionEvent, leaseManager); + } + + @Override + void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) { + publisher.fireEvent(UNDER_REPLICATED, payload); + } + + @Override + void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) { + //Good job. We did it. + } + + @Override + public EventWatcherMetrics getMetrics() { + return super.getMetrics(); + } + } + + ; + private static class ReplicationCompletedEvent implements IdentifiableEventPayload { @@ -217,4 +298,4 @@ public class TestEventWatcher { } } -} \ No newline at end of file +} From ea9b608237e7f2cf9b1e36b0f78c9674ec84096f Mon Sep 17 00:00:00 2001 From: Giovanni Matteo Fumarola Date: Mon, 9 Jul 2018 12:27:36 -0700 Subject: [PATCH 4/8] YARN-7899. [AMRMProxy] Stateful FederationInterceptor for pending requests. Contributed by Botong Huang. --- .../hadoop/yarn/client/AMRMClientUtils.java | 91 ----------- .../hadoop/yarn/server/AMRMClientRelayer.java | 9 +- .../server/uam/UnmanagedAMPoolManager.java | 16 ++ .../uam/UnmanagedApplicationManager.java | 40 +++-- .../server/MockResourceManagerFacade.java | 13 +- .../amrmproxy/FederationInterceptor.java | 146 +++++++++++++++--- .../amrmproxy/BaseAMRMProxyTest.java | 2 + .../amrmproxy/TestFederationInterceptor.java | 17 ++ 8 files changed, 192 insertions(+), 142 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java index 387e399bc1f..5d4ab4a6814 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/client/AMRMClientUtils.java @@ -36,19 +36,9 @@ import org.apache.hadoop.security.SaslRpcServer; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; -import org.apache.hadoop.yarn.api.ApplicationMasterProtocol; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest; -import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; -import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse; -import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest; -import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; -import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; -import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; -import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,87 +57,6 @@ public final class AMRMClientUtils { private AMRMClientUtils() { } - /** - * Handle ApplicationNotRegistered exception and re-register. - * - * @param appId application Id - * @param rmProxy RM proxy instance - * @param registerRequest the AM re-register request - * @throws YarnException if re-register fails - */ - public static void handleNotRegisteredExceptionAndReRegister( - ApplicationId appId, ApplicationMasterProtocol rmProxy, - RegisterApplicationMasterRequest registerRequest) throws YarnException { - LOG.info("App attempt {} not registered, most likely due to RM failover. " - + " Trying to re-register.", appId); - try { - rmProxy.registerApplicationMaster(registerRequest); - } catch (Exception e) { - if (e instanceof InvalidApplicationMasterRequestException - && e.getMessage().contains(APP_ALREADY_REGISTERED_MESSAGE)) { - LOG.info("Concurrent thread successfully registered, moving on."); - } else { - LOG.error("Error trying to re-register AM", e); - throw new YarnException(e); - } - } - } - - /** - * Helper method for client calling ApplicationMasterProtocol.allocate that - * handles re-register if RM fails over. - * - * @param request allocate request - * @param rmProxy RM proxy - * @param registerRequest the register request for re-register - * @param appId application id - * @return allocate response - * @throws YarnException if RM call fails - * @throws IOException if RM call fails - */ - public static AllocateResponse allocateWithReRegister(AllocateRequest request, - ApplicationMasterProtocol rmProxy, - RegisterApplicationMasterRequest registerRequest, ApplicationId appId) - throws YarnException, IOException { - try { - return rmProxy.allocate(request); - } catch (ApplicationMasterNotRegisteredException e) { - handleNotRegisteredExceptionAndReRegister(appId, rmProxy, - registerRequest); - // reset responseId after re-register - request.setResponseId(0); - // retry allocate - return allocateWithReRegister(request, rmProxy, registerRequest, appId); - } - } - - /** - * Helper method for client calling - * ApplicationMasterProtocol.finishApplicationMaster that handles re-register - * if RM fails over. - * - * @param request finishApplicationMaster request - * @param rmProxy RM proxy - * @param registerRequest the register request for re-register - * @param appId application id - * @return finishApplicationMaster response - * @throws YarnException if RM call fails - * @throws IOException if RM call fails - */ - public static FinishApplicationMasterResponse finishAMWithReRegister( - FinishApplicationMasterRequest request, ApplicationMasterProtocol rmProxy, - RegisterApplicationMasterRequest registerRequest, ApplicationId appId) - throws YarnException, IOException { - try { - return rmProxy.finishApplicationMaster(request); - } catch (ApplicationMasterNotRegisteredException ex) { - handleNotRegisteredExceptionAndReRegister(appId, rmProxy, - registerRequest); - // retry finishAM after re-register - return finishAMWithReRegister(request, rmProxy, registerRequest, appId); - } - } - /** * Create a proxy for the specified protocol. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java index e8a7f64a144..0d1a27e289b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/AMRMClientRelayer.java @@ -147,6 +147,11 @@ public class AMRMClientRelayer extends AbstractService super.serviceStop(); } + public void setAMRegistrationRequest( + RegisterApplicationMasterRequest registerRequest) { + this.amRegistrationRequest = registerRequest; + } + @Override public RegisterApplicationMasterResponse registerApplicationMaster( RegisterApplicationMasterRequest request) @@ -259,8 +264,10 @@ public class AMRMClientRelayer extends AbstractService } } - // re register with RM, then retry allocate recursively + // re-register with RM, then retry allocate recursively registerApplicationMaster(this.amRegistrationRequest); + // Reset responseId after re-register + allocateRequest.setResponseId(0); return allocate(allocateRequest); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java index 02eef29dff7..5f9d81b881a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedAMPoolManager.java @@ -50,6 +50,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.client.AMRMClientUtils; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.AMRMClientRelayer; import org.apache.hadoop.yarn.util.AsyncCallback; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -385,4 +386,19 @@ public class UnmanagedAMPoolManager extends AbstractService { return this.unmanagedAppMasterMap.containsKey(uamId); } + /** + * Return the rmProxy relayer of an UAM. + * + * @param uamId uam Id + * @return the rmProxy relayer + * @throws YarnException if fails + */ + public AMRMClientRelayer getAMRMClientRelayer(String uamId) + throws YarnException { + if (!this.unmanagedAppMasterMap.containsKey(uamId)) { + throw new YarnException("UAM " + uamId + " does not exist"); + } + return this.unmanagedAppMasterMap.get(uamId).getAMRMClientRelayer(); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java index 73795dcbc77..856a8182dc9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/uam/UnmanagedApplicationManager.java @@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.AMRMClientRelayer; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.YarnServerSecurityUtils; import org.apache.hadoop.yarn.util.AsyncCallback; @@ -90,7 +91,7 @@ public class UnmanagedApplicationManager { private BlockingQueue requestQueue; private AMRequestHandlerThread handlerThread; - private ApplicationMasterProtocol rmProxy; + private AMRMClientRelayer rmProxyRelayer; private ApplicationId applicationId; private String submitter; private String appNameSuffix; @@ -138,7 +139,7 @@ public class UnmanagedApplicationManager { this.appNameSuffix = appNameSuffix; this.handlerThread = new AMRequestHandlerThread(); this.requestQueue = new LinkedBlockingQueue<>(); - this.rmProxy = null; + this.rmProxyRelayer = null; this.connectionInitiated = false; this.registerRequest = null; this.recordFactory = RecordFactoryProvider.getRecordFactory(conf); @@ -190,8 +191,9 @@ public class UnmanagedApplicationManager { throws IOException { this.userUgi = UserGroupInformation.createProxyUser( this.applicationId.toString(), UserGroupInformation.getCurrentUser()); - this.rmProxy = createRMProxy(ApplicationMasterProtocol.class, this.conf, - this.userUgi, amrmToken); + this.rmProxyRelayer = + new AMRMClientRelayer(createRMProxy(ApplicationMasterProtocol.class, + this.conf, this.userUgi, amrmToken)); } /** @@ -209,19 +211,18 @@ public class UnmanagedApplicationManager { // Save the register request for re-register later this.registerRequest = request; - // Since we have setKeepContainersAcrossApplicationAttempts = true for UAM. - // We do not expect application already registered exception here LOG.info("Registering the Unmanaged application master {}", this.applicationId); RegisterApplicationMasterResponse response = - this.rmProxy.registerApplicationMaster(this.registerRequest); + this.rmProxyRelayer.registerApplicationMaster(this.registerRequest); + this.lastResponseId = 0; for (Container container : response.getContainersFromPreviousAttempts()) { - LOG.info("RegisterUAM returned existing running container " + LOG.debug("RegisterUAM returned existing running container " + container.getId()); } for (NMToken nmToken : response.getNMTokensFromPreviousAttempts()) { - LOG.info("RegisterUAM returned existing NM token for node " + LOG.debug("RegisterUAM returned existing NM token for node " + nmToken.getNodeId()); } @@ -249,7 +250,7 @@ public class UnmanagedApplicationManager { this.handlerThread.shutdown(); - if (this.rmProxy == null) { + if (this.rmProxyRelayer == null) { if (this.connectionInitiated) { // This is possible if the async launchUAM is still // blocked and retrying. Return a dummy response in this case. @@ -261,8 +262,7 @@ public class UnmanagedApplicationManager { + "be called before createAndRegister"); } } - return AMRMClientUtils.finishAMWithReRegister(request, this.rmProxy, - this.registerRequest, this.applicationId); + return this.rmProxyRelayer.finishApplicationMaster(request); } /** @@ -308,7 +308,7 @@ public class UnmanagedApplicationManager { // // In case 2, we have already save the allocate request above, so if the // registration succeed later, no request is lost. - if (this.rmProxy == null) { + if (this.rmProxyRelayer == null) { if (this.connectionInitiated) { LOG.info("Unmanaged AM still not successfully launched/registered yet." + " Saving the allocate request and send later."); @@ -328,6 +328,15 @@ public class UnmanagedApplicationManager { return this.applicationId; } + /** + * Returns the rmProxy relayer of this UAM. + * + * @return rmProxy relayer of the UAM + */ + public AMRMClientRelayer getAMRMClientRelayer() { + return this.rmProxyRelayer; + } + /** * Returns RM proxy for the specified protocol type. Unit test cases can * override this method and return mock proxy instances. @@ -592,10 +601,7 @@ public class UnmanagedApplicationManager { } request.setResponseId(lastResponseId); - - AllocateResponse response = AMRMClientUtils.allocateWithReRegister( - request, rmProxy, registerRequest, applicationId); - + AllocateResponse response = rmProxyRelayer.allocate(request); if (response == null) { throw new YarnException("Null allocateResponse from allocate"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java index 23cd3e21734..9b4d91d3dc5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/server/MockResourceManagerFacade.java @@ -251,8 +251,6 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, ApplicationAttemptId attemptId = getAppIdentifier(); LOG.info("Registering application attempt: " + attemptId); - shouldReRegisterNext = false; - List containersFromPreviousAttempt = null; synchronized (applicationContainerIdMap) { @@ -266,7 +264,7 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, containersFromPreviousAttempt.add(Container.newInstance(containerId, null, null, null, null, null)); } - } else { + } else if (!shouldReRegisterNext) { throw new InvalidApplicationMasterRequestException( AMRMClientUtils.APP_ALREADY_REGISTERED_MESSAGE); } @@ -276,6 +274,8 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, } } + shouldReRegisterNext = false; + // Make sure we wait for certain test cases last in the method synchronized (syncObj) { syncObj.notifyAll(); @@ -339,13 +339,6 @@ public class MockResourceManagerFacade implements ApplicationClientProtocol, validateRunning(); - if (request.getAskList() != null && request.getAskList().size() > 0 - && request.getReleaseList() != null - && request.getReleaseList().size() > 0) { - Assert.fail("The mock RM implementation does not support receiving " - + "askList and releaseList in the same heartbeat"); - } - ApplicationAttemptId attemptId = getAppIdentifier(); LOG.info("Allocate from application attempt: " + attemptId); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java index 57407490c4c..645e47e5af0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/FederationInterceptor.java @@ -62,14 +62,15 @@ import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.StrictPreemptionContract; import org.apache.hadoop.yarn.api.records.UpdateContainerRequest; -import org.apache.hadoop.yarn.client.AMRMClientUtils; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterRequestProto; import org.apache.hadoop.yarn.proto.YarnServiceProtos.RegisterApplicationMasterResponseProto; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; +import org.apache.hadoop.yarn.server.AMRMClientRelayer; import org.apache.hadoop.yarn.server.federation.failover.FederationProxyProviderUtil; import org.apache.hadoop.yarn.server.federation.policies.FederationPolicyUtils; import org.apache.hadoop.yarn.server.federation.policies.amrmproxy.FederationAMRMProxyPolicy; @@ -106,9 +107,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor { public static final String NMSS_REG_RESPONSE_KEY = NMSS_CLASS_PREFIX + "registerResponse"; - /* + /** * When AMRMProxy HA is enabled, secondary AMRMTokens will be stored in Yarn - * Registry. Otherwise if NM recovery is enabled, the UAM token are store in + * Registry. Otherwise if NM recovery is enabled, the UAM token are stored in * local NMSS instead under this directory name. */ public static final String NMSS_SECONDARY_SC_PREFIX = @@ -119,8 +120,23 @@ public class FederationInterceptor extends AbstractRequestInterceptor { * The home sub-cluster is the sub-cluster where the AM container is running * in. */ - private ApplicationMasterProtocol homeRM; + private AMRMClientRelayer homeRMRelayer; private SubClusterId homeSubClusterId; + private volatile int lastHomeResponseId; + + /** + * A flag for work preserving NM restart. If we just recovered, we need to + * generate an {@link ApplicationMasterNotRegisteredException} exception back + * to AM (similar to what RM will do after its restart/fail-over) in its next + * allocate to trigger AM re-register (which we will shield from RM and just + * return our saved register response) and a full pending requests re-send, so + * that all the {@link AMRMClientRelayer} will be re-populated with all + * pending requests. + * + * TODO: When split-merge is not idempotent, this can lead to some + * over-allocation without a full cancel to RM. + */ + private volatile boolean justRecovered; /** * UAM pool for secondary sub-clusters (ones other than home sub-cluster), @@ -134,6 +150,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor { */ private UnmanagedAMPoolManager uamPool; + /** + * The rmProxy relayers for secondary sub-clusters that keep track of all + * pending requests. + */ + private Map secondaryRelayers; + /** Thread pool used for asynchronous operations. */ private ExecutorService threadpool; @@ -186,8 +208,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.asyncResponseSink = new ConcurrentHashMap<>(); this.threadpool = Executors.newCachedThreadPool(); this.uamPool = createUnmanagedAMPoolManager(this.threadpool); + this.secondaryRelayers = new ConcurrentHashMap<>(); this.amRegistrationRequest = null; this.amRegistrationResponse = null; + this.lastHomeResponseId = Integer.MAX_VALUE; + this.justRecovered = false; } /** @@ -224,8 +249,8 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.homeSubClusterId = SubClusterId.newInstance(YarnConfiguration.getClusterId(conf)); - this.homeRM = createHomeRMProxy(appContext, ApplicationMasterProtocol.class, - this.appOwner); + this.homeRMRelayer = new AMRMClientRelayer(createHomeRMProxy(appContext, + ApplicationMasterProtocol.class, this.appOwner)); this.federationFacade = FederationStateStoreFacade.getInstance(); this.subClusterResolver = this.federationFacade.getSubClusterResolver(); @@ -240,13 +265,12 @@ public class FederationInterceptor extends AbstractRequestInterceptor { @Override public void recover(Map recoveredDataMap) { super.recover(recoveredDataMap); - LOG.info("Recovering data for FederationInterceptor"); + ApplicationAttemptId attemptId = + getApplicationContext().getApplicationAttemptId(); + LOG.info("Recovering data for FederationInterceptor for {}", attemptId); if (recoveredDataMap == null) { return; } - - ApplicationAttemptId attemptId = - getApplicationContext().getApplicationAttemptId(); try { if (recoveredDataMap.containsKey(NMSS_REG_REQUEST_KEY)) { RegisterApplicationMasterRequestProto pb = @@ -255,6 +279,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.amRegistrationRequest = new RegisterApplicationMasterRequestPBImpl(pb); LOG.info("amRegistrationRequest recovered for {}", attemptId); + + // Give the register request to homeRMRelayer for future re-registration + this.homeRMRelayer.setAMRegistrationRequest(this.amRegistrationRequest); } if (recoveredDataMap.containsKey(NMSS_REG_RESPONSE_KEY)) { RegisterApplicationMasterResponseProto pb = @@ -263,6 +290,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor { this.amRegistrationResponse = new RegisterApplicationMasterResponsePBImpl(pb); LOG.info("amRegistrationResponse recovered for {}", attemptId); + // Trigger re-register and full pending re-send only if we have a + // saved register response. This should always be true though. + this.justRecovered = true; } // Recover UAM amrmTokens from registry or NMSS @@ -309,6 +339,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor { getApplicationContext().getUser(), this.homeSubClusterId.getId(), entry.getValue()); + this.secondaryRelayers.put(subClusterId.getId(), + this.uamPool.getAMRMClientRelayer(subClusterId.getId())); + RegisterApplicationMasterResponse response = this.uamPool.registerApplicationMaster(subClusterId.getId(), this.amRegistrationRequest); @@ -436,7 +469,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { * the other sub-cluster RM will be done lazily as needed later. */ this.amRegistrationResponse = - this.homeRM.registerApplicationMaster(request); + this.homeRMRelayer.registerApplicationMaster(request); if (this.amRegistrationResponse .getContainersFromPreviousAttempts() != null) { cacheAllocatedContainers( @@ -495,6 +528,34 @@ public class FederationInterceptor extends AbstractRequestInterceptor { Preconditions.checkArgument(this.policyInterpreter != null, "Allocate should be called after registerApplicationMaster"); + if (this.justRecovered && this.lastHomeResponseId == Integer.MAX_VALUE) { + // Save the responseId home RM is expecting + this.lastHomeResponseId = request.getResponseId(); + + throw new ApplicationMasterNotRegisteredException( + "AMRMProxy just restarted and recovered for " + + getApplicationContext().getApplicationAttemptId() + + ". AM should re-register and full re-send pending requests."); + } + + // Override responseId in the request in two cases: + // + // 1. After we just recovered after an NM restart and AM's responseId is + // reset due to the exception we generate. We need to override the + // responseId to the one homeRM expects. + // + // 2. After homeRM fail-over, the allocate response with reseted responseId + // might not be returned successfully back to AM because of RPC connection + // timeout between AM and AMRMProxy. In this case, we remember and reset the + // responseId for AM. + if (this.justRecovered + || request.getResponseId() > this.lastHomeResponseId) { + LOG.warn("Setting allocate responseId for {} from {} to {}", + getApplicationContext().getApplicationAttemptId(), + request.getResponseId(), this.lastHomeResponseId); + request.setResponseId(this.lastHomeResponseId); + } + try { // Split the heart beat request into multiple requests, one for each // sub-cluster RM that is used by this application. @@ -509,10 +570,18 @@ public class FederationInterceptor extends AbstractRequestInterceptor { sendRequestsToSecondaryResourceManagers(requests); // Send the request to the home RM and get the response - AllocateResponse homeResponse = AMRMClientUtils.allocateWithReRegister( - requests.get(this.homeSubClusterId), this.homeRM, - this.amRegistrationRequest, - getApplicationContext().getApplicationAttemptId().getApplicationId()); + AllocateRequest homeRequest = requests.get(this.homeSubClusterId); + LOG.info("{} heartbeating to home RM with responseId {}", + getApplicationContext().getApplicationAttemptId(), + homeRequest.getResponseId()); + + AllocateResponse homeResponse = this.homeRMRelayer.allocate(homeRequest); + + // Reset the flag after the first successful homeRM allocate response, + // otherwise keep overriding the responseId of new allocate request + if (this.justRecovered) { + this.justRecovered = false; + } // Notify policy of home response try { @@ -540,6 +609,22 @@ public class FederationInterceptor extends AbstractRequestInterceptor { newRegistrations.getSuccessfulRegistrations()); } + LOG.info("{} heartbeat response from home RM with responseId {}", + getApplicationContext().getApplicationAttemptId(), + homeResponse.getResponseId()); + + // Update lastHomeResponseId in three cases: + // 1. The normal responseId increments + // 2. homeResponse.getResponseId() == 1. This happens when homeRM fails + // over, AMRMClientRelayer auto re-register and full re-send for homeRM. + // 3. lastHomeResponseId == MAX_INT. This is the initial case or + // responseId about to overflow and wrap around + if (homeResponse.getResponseId() == this.lastHomeResponseId + 1 + || homeResponse.getResponseId() == 1 + || this.lastHomeResponseId == Integer.MAX_VALUE) { + this.lastHomeResponseId = homeResponse.getResponseId(); + } + // return the final response to the application master. return homeResponse; } catch (IOException ex) { @@ -584,6 +669,16 @@ public class FederationInterceptor extends AbstractRequestInterceptor { try { uamResponse = uamPool.finishApplicationMaster(subClusterId, finishRequest); + + if (uamResponse.getIsUnregistered()) { + secondaryRelayers.remove(subClusterId); + + if (getNMStateStore() != null) { + getNMStateStore().removeAMRMProxyAppContextEntry( + getApplicationContext().getApplicationAttemptId(), + NMSS_SECONDARY_SC_PREFIX + subClusterId); + } + } } catch (Throwable e) { LOG.warn("Failed to finish unmanaged application master: " + "RM address: " + subClusterId + " ApplicationId: " @@ -600,9 +695,7 @@ public class FederationInterceptor extends AbstractRequestInterceptor { // asynchronously by other sub-cluster resource managers, send the same // request to the home resource manager on this thread. FinishApplicationMasterResponse homeResponse = - AMRMClientUtils.finishAMWithReRegister(request, this.homeRM, - this.amRegistrationRequest, getApplicationContext() - .getApplicationAttemptId().getApplicationId()); + this.homeRMRelayer.finishApplicationMaster(request); if (subClusterIds.size() > 0) { // Wait for other sub-cluster resource managers to return the @@ -621,10 +714,6 @@ public class FederationInterceptor extends AbstractRequestInterceptor { if (uamResponse.getResponse() == null || !uamResponse.getResponse().getIsUnregistered()) { failedToUnRegister = true; - } else if (getNMStateStore() != null) { - getNMStateStore().removeAMRMProxyAppContextEntry( - getApplicationContext().getApplicationAttemptId(), - NMSS_SECONDARY_SC_PREFIX + uamResponse.getSubClusterId()); } } catch (Throwable e) { failedToUnRegister = true; @@ -689,6 +778,11 @@ public class FederationInterceptor extends AbstractRequestInterceptor { return this.registryClient; } + @VisibleForTesting + protected int getLastHomeResponseId() { + return this.lastHomeResponseId; + } + /** * Create the UAM pool manager for secondary sub-clsuters. For unit test to * override. @@ -800,6 +894,9 @@ public class FederationInterceptor extends AbstractRequestInterceptor { getApplicationContext().getUser(), homeSubClusterId.getId(), amrmToken); + secondaryRelayers.put(subClusterId.getId(), + uamPool.getAMRMClientRelayer(subClusterId.getId())); + response = uamPool.registerApplicationMaster( subClusterId.getId(), amRegistrationRequest); @@ -1098,7 +1195,10 @@ public class FederationInterceptor extends AbstractRequestInterceptor { token = uamPool.launchUAM(subClusterId, config, appContext.getApplicationAttemptId().getApplicationId(), amRegistrationResponse.getQueue(), appContext.getUser(), - homeSubClusterId.toString(), registryClient != null); + homeSubClusterId.toString(), true); + + secondaryRelayers.put(subClusterId, + uamPool.getAMRMClientRelayer(subClusterId)); uamResponse = uamPool.registerApplicationMaster(subClusterId, registerRequest); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java index 677732d634c..2794857c6e7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java @@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; +import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; @@ -536,6 +537,7 @@ public abstract class BaseAMRMProxyTest { capability.setMemorySize(memory); capability.setVirtualCores(vCores); req.setCapability(capability); + req.setExecutionTypeRequest(ExecutionTypeRequest.newInstance()); if (labelExpression != null) { req.setNodeLabelExpression(labelExpression); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java index eefaba1a8bd..a837eed14ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/TestFederationInterceptor.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.UpdateContainerError; import org.apache.hadoop.yarn.api.records.UpdatedContainer; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.ApplicationMasterNotRegisteredException; import org.apache.hadoop.yarn.exceptions.InvalidApplicationMasterRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.MockResourceManagerFacade; @@ -516,6 +517,22 @@ public class TestFederationInterceptor extends BaseAMRMProxyTest { interceptor.recover(recoveredDataMap); Assert.assertEquals(1, interceptor.getUnmanagedAMPoolSize()); + Assert.assertEquals(Integer.MAX_VALUE, + interceptor.getLastHomeResponseId()); + + // The first allocate call expects a fail-over exception and re-register + int responseId = 10; + AllocateRequest allocateRequest = + Records.newRecord(AllocateRequest.class); + allocateRequest.setResponseId(responseId); + try { + interceptor.allocate(allocateRequest); + Assert.fail("Expecting an ApplicationMasterNotRegisteredException " + + " after FederationInterceptor restarts and recovers"); + } catch (ApplicationMasterNotRegisteredException e) { + } + interceptor.registerApplicationMaster(registerReq); + Assert.assertEquals(responseId, interceptor.getLastHomeResponseId()); // Release all containers releaseContainersAndAssert(containers); From 4a08ddfa68a405bfd97ffd96fafc1e3d48d20d7e Mon Sep 17 00:00:00 2001 From: Akira Ajisaka Date: Mon, 9 Jul 2018 15:43:38 -0400 Subject: [PATCH 5/8] HADOOP-15568. fix some typos in the .sh comments. Contributed by Steve Loughran. --- .../hadoop-common/src/main/conf/hadoop-env.sh | 6 +++--- .../hadoop-common/src/main/conf/hadoop-metrics2.properties | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/conf/hadoop-env.sh b/hadoop-common-project/hadoop-common/src/main/conf/hadoop-env.sh index 3826f67a5ea..6db085a3261 100644 --- a/hadoop-common-project/hadoop-common/src/main/conf/hadoop-env.sh +++ b/hadoop-common-project/hadoop-common/src/main/conf/hadoop-env.sh @@ -88,7 +88,7 @@ # Extra Java runtime options for all Hadoop commands. We don't support # IPv6 yet/still, so by default the preference is set to IPv4. # export HADOOP_OPTS="-Djava.net.preferIPv4Stack=true" -# For Kerberos debugging, an extended option set logs more invormation +# For Kerberos debugging, an extended option set logs more information # export HADOOP_OPTS="-Djava.net.preferIPv4Stack=true -Dsun.security.krb5.debug=true -Dsun.security.spnego.debug" # Some parts of the shell code may do special things dependent upon @@ -120,9 +120,9 @@ esac # # By default, Apache Hadoop overrides Java's CLASSPATH # environment variable. It is configured such -# that it sarts out blank with new entries added after passing +# that it starts out blank with new entries added after passing # a series of checks (file/dir exists, not already listed aka -# de-deduplication). During de-depulication, wildcards and/or +# de-deduplication). During de-deduplication, wildcards and/or # directories are *NOT* expanded to keep it simple. Therefore, # if the computed classpath has two specific mentions of # awesome-methods-1.0.jar, only the first one added will be seen. diff --git a/hadoop-common-project/hadoop-common/src/main/conf/hadoop-metrics2.properties b/hadoop-common-project/hadoop-common/src/main/conf/hadoop-metrics2.properties index 16fdcf05629..f061313cb4d 100644 --- a/hadoop-common-project/hadoop-common/src/main/conf/hadoop-metrics2.properties +++ b/hadoop-common-project/hadoop-common/src/main/conf/hadoop-metrics2.properties @@ -47,7 +47,7 @@ #*.sink.ganglia.dmax=jvm.metrics.threadsBlocked=70,jvm.metrics.memHeapUsedM=40 # Tag values to use for the ganglia prefix. If not defined no tags are used. -# If '*' all tags are used. If specifiying multiple tags separate them with +# If '*' all tags are used. If specifying multiple tags separate them with # commas. Note that the last segment of the property name is the context name. # # A typical use of tags is separating the metrics by the HDFS rpc port From cb5e225868a069d6d16244b462ebada44465dce8 Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Mon, 9 Jul 2018 12:52:39 -0700 Subject: [PATCH 6/8] HDDS-224. Create metrics for Event Watcher. Contributed b Elek, Marton. --- .../hadoop/hdds/server/events/EventQueue.java | 118 ++++++++++-------- .../server/events/SingleThreadExecutor.java | 35 ++++-- .../hdds/server/events/TestEventQueue.java | 35 +----- 3 files changed, 96 insertions(+), 92 deletions(-) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java index 44d85f5ffc0..7e29223461f 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java @@ -18,7 +18,11 @@ package org.apache.hadoop.hdds.server.events; import com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; + +import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +46,8 @@ public class EventQueue implements EventPublisher, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(EventQueue.class); + private static final String EXECUTOR_NAME_SEPARATOR = "For"; + private final Map>> executors = new HashMap<>(); @@ -51,38 +57,74 @@ public class EventQueue implements EventPublisher, AutoCloseable { public > void addHandler( EVENT_TYPE event, EventHandler handler) { - - this.addHandler(event, new SingleThreadExecutor<>( - event.getName()), handler); - } - - public > void addHandler( - EVENT_TYPE event, - EventExecutor executor, - EventHandler handler) { - - executors.putIfAbsent(event, new HashMap<>()); - executors.get(event).putIfAbsent(executor, new ArrayList<>()); - - executors.get(event) - .get(executor) - .add(handler); + this.addHandler(event, handler, generateHandlerName(handler)); } /** - * Creates one executor with multiple event handlers. + * Add new handler to the event queue. + *

+ * By default a separated single thread executor will be dedicated to + * deliver the events to the registered event handler. + * + * @param event Triggering event. + * @param handler Handler of event (will be called from a separated + * thread) + * @param handlerName The name of handler (should be unique together with + * the event name) + * @param The type of the event payload. + * @param The type of the event identifier. */ - public void addHandlerGroup(String name, HandlerForEvent... - eventsAndHandlers) { - SingleThreadExecutor sharedExecutor = - new SingleThreadExecutor(name); - for (HandlerForEvent handlerForEvent : eventsAndHandlers) { - addHandler(handlerForEvent.event, sharedExecutor, - handlerForEvent.handler); - } + public > void addHandler( + EVENT_TYPE event, EventHandler handler, String handlerName) { + validateEvent(event); + Preconditions.checkNotNull(handler, "Handler name should not be null."); + String executorName = + StringUtils.camelize(event.getName()) + EXECUTOR_NAME_SEPARATOR + + handlerName; + this.addHandler(event, new SingleThreadExecutor<>(executorName), handler); + } + + private > void validateEvent(EVENT_TYPE event) { + Preconditions + .checkArgument(!event.getName().contains(EXECUTOR_NAME_SEPARATOR), + "Event name should not contain " + EXECUTOR_NAME_SEPARATOR + + " string."); } + private String generateHandlerName(EventHandler handler) { + if (!"".equals(handler.getClass().getSimpleName())) { + return handler.getClass().getSimpleName(); + } else { + return handler.getClass().getName(); + } + } + + /** + * Add event handler with custom executor. + * + * @param event Triggering event. + * @param executor The executor imlementation to deliver events from a + * separated threads. Please keep in your mind that + * registering metrics is the responsibility of the + * caller. + * @param handler Handler of event (will be called from a separated + * thread) + * @param The type of the event payload. + * @param The type of the event identifier. + */ + public > void addHandler( + EVENT_TYPE event, EventExecutor executor, + EventHandler handler) { + validateEvent(event); + executors.putIfAbsent(event, new HashMap<>()); + executors.get(event).putIfAbsent(executor, new ArrayList<>()); + + executors.get(event).get(executor).add(handler); + } + + + /** * Route an event with payload to the right listener(s). * @@ -183,31 +225,5 @@ public class EventQueue implements EventPublisher, AutoCloseable { }); } - /** - * Event identifier together with the handler. - * - * @param - */ - public static class HandlerForEvent { - - private final Event event; - - private final EventHandler handler; - - public HandlerForEvent( - Event event, - EventHandler handler) { - this.event = event; - this.handler = handler; - } - - public Event getEvent() { - return event; - } - - public EventHandler getHandler() { - return handler; - } - } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java index a64e3d761dd..3253f2d5db2 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java @@ -23,13 +23,18 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; /** * Simple EventExecutor to call all the event handler one-by-one. * * @param */ +@Metrics(context = "EventQueue") public class SingleThreadExecutor implements EventExecutor { public static final String THREAD_NAME_PREFIX = "EventQueue"; @@ -41,14 +46,24 @@ public class SingleThreadExecutor implements EventExecutor { private final ThreadPoolExecutor executor; - private final AtomicLong queuedCount = new AtomicLong(0); + @Metric + private MutableCounterLong queued; - private final AtomicLong successfulCount = new AtomicLong(0); + @Metric + private MutableCounterLong done; - private final AtomicLong failedCount = new AtomicLong(0); + @Metric + private MutableCounterLong failed; + /** + * Create SingleThreadExecutor. + * + * @param name Unique name used in monitoring and metrics. + */ public SingleThreadExecutor(String name) { this.name = name; + DefaultMetricsSystem.instance() + .register("EventQueue" + name, "Event Executor metrics ", this); LinkedBlockingQueue workQueue = new LinkedBlockingQueue<>(); executor = @@ -64,31 +79,31 @@ public class SingleThreadExecutor implements EventExecutor { @Override public void onMessage(EventHandler handler, T message, EventPublisher publisher) { - queuedCount.incrementAndGet(); + queued.incr(); executor.execute(() -> { try { handler.onMessage(message, publisher); - successfulCount.incrementAndGet(); + done.incr(); } catch (Exception ex) { LOG.error("Error on execution message {}", message, ex); - failedCount.incrementAndGet(); + failed.incr(); } }); } @Override public long failedEvents() { - return failedCount.get(); + return failed.value(); } @Override public long successfulEvents() { - return successfulCount.get(); + return done.value(); } @Override public long queuedEvents() { - return queuedCount.get(); + return queued.value(); } @Override diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java index 39444097fed..2bdf705cfee 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java @@ -25,6 +25,8 @@ import org.junit.Test; import java.util.Set; import java.util.stream.Collectors; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; + /** * Testing the basic functionality of the event queue. */ @@ -44,11 +46,13 @@ public class TestEventQueue { @Before public void startEventQueue() { + DefaultMetricsSystem.initialize(getClass().getSimpleName()); queue = new EventQueue(); } @After public void stopEventQueue() { + DefaultMetricsSystem.shutdown(); queue.close(); } @@ -79,35 +83,4 @@ public class TestEventQueue { } - @Test - public void handlerGroup() { - final long[] result = new long[2]; - queue.addHandlerGroup( - "group", - new EventQueue.HandlerForEvent<>(EVENT3, (payload, publisher) -> - result[0] = payload), - new EventQueue.HandlerForEvent<>(EVENT4, (payload, publisher) -> - result[1] = payload) - ); - - queue.fireEvent(EVENT3, 23L); - queue.fireEvent(EVENT4, 42L); - - queue.processAll(1000); - - Assert.assertEquals(23, result[0]); - Assert.assertEquals(42, result[1]); - - Set eventQueueThreadNames = - Thread.getAllStackTraces().keySet() - .stream() - .filter(t -> t.getName().startsWith(SingleThreadExecutor - .THREAD_NAME_PREFIX)) - .map(Thread::getName) - .collect(Collectors.toSet()); - System.out.println(eventQueueThreadNames); - Assert.assertEquals(1, eventQueueThreadNames.size()); - - } - } \ No newline at end of file From 3c0a66abe632277e89fccd8dced9e71ca5d87df0 Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Mon, 9 Jul 2018 13:03:57 -0700 Subject: [PATCH 7/8] Revert "HDDS-224. Create metrics for Event Watcher." This reverts commit cb5e225868a069d6d16244b462ebada44465dce8. The JIRA number is wrong, reverting to fix it. --- .../hadoop/hdds/server/events/EventQueue.java | 110 ++++++++---------- .../server/events/SingleThreadExecutor.java | 35 ++---- .../hdds/server/events/TestEventQueue.java | 35 +++++- 3 files changed, 88 insertions(+), 92 deletions(-) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java index 7e29223461f..44d85f5ffc0 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java @@ -18,11 +18,7 @@ package org.apache.hadoop.hdds.server.events; import com.google.common.annotations.VisibleForTesting; - -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; - -import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -46,8 +42,6 @@ public class EventQueue implements EventPublisher, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(EventQueue.class); - private static final String EXECUTOR_NAME_SEPARATOR = "For"; - private final Map>> executors = new HashMap<>(); @@ -57,73 +51,37 @@ public class EventQueue implements EventPublisher, AutoCloseable { public > void addHandler( EVENT_TYPE event, EventHandler handler) { - this.addHandler(event, handler, generateHandlerName(handler)); + + this.addHandler(event, new SingleThreadExecutor<>( + event.getName()), handler); } - /** - * Add new handler to the event queue. - *

- * By default a separated single thread executor will be dedicated to - * deliver the events to the registered event handler. - * - * @param event Triggering event. - * @param handler Handler of event (will be called from a separated - * thread) - * @param handlerName The name of handler (should be unique together with - * the event name) - * @param The type of the event payload. - * @param The type of the event identifier. - */ public > void addHandler( - EVENT_TYPE event, EventHandler handler, String handlerName) { - validateEvent(event); - Preconditions.checkNotNull(handler, "Handler name should not be null."); - String executorName = - StringUtils.camelize(event.getName()) + EXECUTOR_NAME_SEPARATOR - + handlerName; - this.addHandler(event, new SingleThreadExecutor<>(executorName), handler); - } - - private > void validateEvent(EVENT_TYPE event) { - Preconditions - .checkArgument(!event.getName().contains(EXECUTOR_NAME_SEPARATOR), - "Event name should not contain " + EXECUTOR_NAME_SEPARATOR - + " string."); - - } - - private String generateHandlerName(EventHandler handler) { - if (!"".equals(handler.getClass().getSimpleName())) { - return handler.getClass().getSimpleName(); - } else { - return handler.getClass().getName(); - } - } - - /** - * Add event handler with custom executor. - * - * @param event Triggering event. - * @param executor The executor imlementation to deliver events from a - * separated threads. Please keep in your mind that - * registering metrics is the responsibility of the - * caller. - * @param handler Handler of event (will be called from a separated - * thread) - * @param The type of the event payload. - * @param The type of the event identifier. - */ - public > void addHandler( - EVENT_TYPE event, EventExecutor executor, + EVENT_TYPE event, + EventExecutor executor, EventHandler handler) { - validateEvent(event); + executors.putIfAbsent(event, new HashMap<>()); executors.get(event).putIfAbsent(executor, new ArrayList<>()); - executors.get(event).get(executor).add(handler); + executors.get(event) + .get(executor) + .add(handler); } + /** + * Creates one executor with multiple event handlers. + */ + public void addHandlerGroup(String name, HandlerForEvent... + eventsAndHandlers) { + SingleThreadExecutor sharedExecutor = + new SingleThreadExecutor(name); + for (HandlerForEvent handlerForEvent : eventsAndHandlers) { + addHandler(handlerForEvent.event, sharedExecutor, + handlerForEvent.handler); + } + } /** * Route an event with payload to the right listener(s). @@ -225,5 +183,31 @@ public class EventQueue implements EventPublisher, AutoCloseable { }); } + /** + * Event identifier together with the handler. + * + * @param + */ + public static class HandlerForEvent { + + private final Event event; + + private final EventHandler handler; + + public HandlerForEvent( + Event event, + EventHandler handler) { + this.event = event; + this.handler = handler; + } + + public Event getEvent() { + return event; + } + + public EventHandler getHandler() { + return handler; + } + } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java index 3253f2d5db2..a64e3d761dd 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java @@ -23,18 +23,13 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.metrics2.annotation.Metric; -import org.apache.hadoop.metrics2.annotation.Metrics; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; -import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import java.util.concurrent.atomic.AtomicLong; /** * Simple EventExecutor to call all the event handler one-by-one. * * @param */ -@Metrics(context = "EventQueue") public class SingleThreadExecutor implements EventExecutor { public static final String THREAD_NAME_PREFIX = "EventQueue"; @@ -46,24 +41,14 @@ public class SingleThreadExecutor implements EventExecutor { private final ThreadPoolExecutor executor; - @Metric - private MutableCounterLong queued; + private final AtomicLong queuedCount = new AtomicLong(0); - @Metric - private MutableCounterLong done; + private final AtomicLong successfulCount = new AtomicLong(0); - @Metric - private MutableCounterLong failed; + private final AtomicLong failedCount = new AtomicLong(0); - /** - * Create SingleThreadExecutor. - * - * @param name Unique name used in monitoring and metrics. - */ public SingleThreadExecutor(String name) { this.name = name; - DefaultMetricsSystem.instance() - .register("EventQueue" + name, "Event Executor metrics ", this); LinkedBlockingQueue workQueue = new LinkedBlockingQueue<>(); executor = @@ -79,31 +64,31 @@ public class SingleThreadExecutor implements EventExecutor { @Override public void onMessage(EventHandler handler, T message, EventPublisher publisher) { - queued.incr(); + queuedCount.incrementAndGet(); executor.execute(() -> { try { handler.onMessage(message, publisher); - done.incr(); + successfulCount.incrementAndGet(); } catch (Exception ex) { LOG.error("Error on execution message {}", message, ex); - failed.incr(); + failedCount.incrementAndGet(); } }); } @Override public long failedEvents() { - return failed.value(); + return failedCount.get(); } @Override public long successfulEvents() { - return done.value(); + return successfulCount.get(); } @Override public long queuedEvents() { - return queued.value(); + return queuedCount.get(); } @Override diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java index 2bdf705cfee..39444097fed 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java @@ -25,8 +25,6 @@ import org.junit.Test; import java.util.Set; import java.util.stream.Collectors; -import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; - /** * Testing the basic functionality of the event queue. */ @@ -46,13 +44,11 @@ public class TestEventQueue { @Before public void startEventQueue() { - DefaultMetricsSystem.initialize(getClass().getSimpleName()); queue = new EventQueue(); } @After public void stopEventQueue() { - DefaultMetricsSystem.shutdown(); queue.close(); } @@ -83,4 +79,35 @@ public class TestEventQueue { } + @Test + public void handlerGroup() { + final long[] result = new long[2]; + queue.addHandlerGroup( + "group", + new EventQueue.HandlerForEvent<>(EVENT3, (payload, publisher) -> + result[0] = payload), + new EventQueue.HandlerForEvent<>(EVENT4, (payload, publisher) -> + result[1] = payload) + ); + + queue.fireEvent(EVENT3, 23L); + queue.fireEvent(EVENT4, 42L); + + queue.processAll(1000); + + Assert.assertEquals(23, result[0]); + Assert.assertEquals(42, result[1]); + + Set eventQueueThreadNames = + Thread.getAllStackTraces().keySet() + .stream() + .filter(t -> t.getName().startsWith(SingleThreadExecutor + .THREAD_NAME_PREFIX)) + .map(Thread::getName) + .collect(Collectors.toSet()); + System.out.println(eventQueueThreadNames); + Assert.assertEquals(1, eventQueueThreadNames.size()); + + } + } \ No newline at end of file From 2403231c8c3685ba08cd6bdf715d281cae611e45 Mon Sep 17 00:00:00 2001 From: Anu Engineer Date: Mon, 9 Jul 2018 13:04:44 -0700 Subject: [PATCH 8/8] HDDS-240. Implement metrics for EventQueue. Contributed by Elek, Marton. --- .../hadoop/hdds/server/events/EventQueue.java | 118 ++++++++++-------- .../server/events/SingleThreadExecutor.java | 35 ++++-- .../hdds/server/events/TestEventQueue.java | 35 +----- 3 files changed, 96 insertions(+), 92 deletions(-) diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java index 44d85f5ffc0..7e29223461f 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java @@ -18,7 +18,11 @@ package org.apache.hadoop.hdds.server.events; import com.google.common.annotations.VisibleForTesting; + +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; + +import com.google.common.base.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,6 +46,8 @@ public class EventQueue implements EventPublisher, AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(EventQueue.class); + private static final String EXECUTOR_NAME_SEPARATOR = "For"; + private final Map>> executors = new HashMap<>(); @@ -51,38 +57,74 @@ public class EventQueue implements EventPublisher, AutoCloseable { public > void addHandler( EVENT_TYPE event, EventHandler handler) { - - this.addHandler(event, new SingleThreadExecutor<>( - event.getName()), handler); - } - - public > void addHandler( - EVENT_TYPE event, - EventExecutor executor, - EventHandler handler) { - - executors.putIfAbsent(event, new HashMap<>()); - executors.get(event).putIfAbsent(executor, new ArrayList<>()); - - executors.get(event) - .get(executor) - .add(handler); + this.addHandler(event, handler, generateHandlerName(handler)); } /** - * Creates one executor with multiple event handlers. + * Add new handler to the event queue. + *

+ * By default a separated single thread executor will be dedicated to + * deliver the events to the registered event handler. + * + * @param event Triggering event. + * @param handler Handler of event (will be called from a separated + * thread) + * @param handlerName The name of handler (should be unique together with + * the event name) + * @param The type of the event payload. + * @param The type of the event identifier. */ - public void addHandlerGroup(String name, HandlerForEvent... - eventsAndHandlers) { - SingleThreadExecutor sharedExecutor = - new SingleThreadExecutor(name); - for (HandlerForEvent handlerForEvent : eventsAndHandlers) { - addHandler(handlerForEvent.event, sharedExecutor, - handlerForEvent.handler); - } + public > void addHandler( + EVENT_TYPE event, EventHandler handler, String handlerName) { + validateEvent(event); + Preconditions.checkNotNull(handler, "Handler name should not be null."); + String executorName = + StringUtils.camelize(event.getName()) + EXECUTOR_NAME_SEPARATOR + + handlerName; + this.addHandler(event, new SingleThreadExecutor<>(executorName), handler); + } + + private > void validateEvent(EVENT_TYPE event) { + Preconditions + .checkArgument(!event.getName().contains(EXECUTOR_NAME_SEPARATOR), + "Event name should not contain " + EXECUTOR_NAME_SEPARATOR + + " string."); } + private String generateHandlerName(EventHandler handler) { + if (!"".equals(handler.getClass().getSimpleName())) { + return handler.getClass().getSimpleName(); + } else { + return handler.getClass().getName(); + } + } + + /** + * Add event handler with custom executor. + * + * @param event Triggering event. + * @param executor The executor imlementation to deliver events from a + * separated threads. Please keep in your mind that + * registering metrics is the responsibility of the + * caller. + * @param handler Handler of event (will be called from a separated + * thread) + * @param The type of the event payload. + * @param The type of the event identifier. + */ + public > void addHandler( + EVENT_TYPE event, EventExecutor executor, + EventHandler handler) { + validateEvent(event); + executors.putIfAbsent(event, new HashMap<>()); + executors.get(event).putIfAbsent(executor, new ArrayList<>()); + + executors.get(event).get(executor).add(handler); + } + + + /** * Route an event with payload to the right listener(s). * @@ -183,31 +225,5 @@ public class EventQueue implements EventPublisher, AutoCloseable { }); } - /** - * Event identifier together with the handler. - * - * @param - */ - public static class HandlerForEvent { - - private final Event event; - - private final EventHandler handler; - - public HandlerForEvent( - Event event, - EventHandler handler) { - this.event = event; - this.handler = handler; - } - - public Event getEvent() { - return event; - } - - public EventHandler getHandler() { - return handler; - } - } } diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java index a64e3d761dd..3253f2d5db2 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/SingleThreadExecutor.java @@ -23,13 +23,18 @@ import org.slf4j.LoggerFactory; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.metrics2.annotation.Metric; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; /** * Simple EventExecutor to call all the event handler one-by-one. * * @param */ +@Metrics(context = "EventQueue") public class SingleThreadExecutor implements EventExecutor { public static final String THREAD_NAME_PREFIX = "EventQueue"; @@ -41,14 +46,24 @@ public class SingleThreadExecutor implements EventExecutor { private final ThreadPoolExecutor executor; - private final AtomicLong queuedCount = new AtomicLong(0); + @Metric + private MutableCounterLong queued; - private final AtomicLong successfulCount = new AtomicLong(0); + @Metric + private MutableCounterLong done; - private final AtomicLong failedCount = new AtomicLong(0); + @Metric + private MutableCounterLong failed; + /** + * Create SingleThreadExecutor. + * + * @param name Unique name used in monitoring and metrics. + */ public SingleThreadExecutor(String name) { this.name = name; + DefaultMetricsSystem.instance() + .register("EventQueue" + name, "Event Executor metrics ", this); LinkedBlockingQueue workQueue = new LinkedBlockingQueue<>(); executor = @@ -64,31 +79,31 @@ public class SingleThreadExecutor implements EventExecutor { @Override public void onMessage(EventHandler handler, T message, EventPublisher publisher) { - queuedCount.incrementAndGet(); + queued.incr(); executor.execute(() -> { try { handler.onMessage(message, publisher); - successfulCount.incrementAndGet(); + done.incr(); } catch (Exception ex) { LOG.error("Error on execution message {}", message, ex); - failedCount.incrementAndGet(); + failed.incr(); } }); } @Override public long failedEvents() { - return failedCount.get(); + return failed.value(); } @Override public long successfulEvents() { - return successfulCount.get(); + return done.value(); } @Override public long queuedEvents() { - return queuedCount.get(); + return queued.value(); } @Override diff --git a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java index 39444097fed..2bdf705cfee 100644 --- a/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java +++ b/hadoop-hdds/framework/src/test/java/org/apache/hadoop/hdds/server/events/TestEventQueue.java @@ -25,6 +25,8 @@ import org.junit.Test; import java.util.Set; import java.util.stream.Collectors; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; + /** * Testing the basic functionality of the event queue. */ @@ -44,11 +46,13 @@ public class TestEventQueue { @Before public void startEventQueue() { + DefaultMetricsSystem.initialize(getClass().getSimpleName()); queue = new EventQueue(); } @After public void stopEventQueue() { + DefaultMetricsSystem.shutdown(); queue.close(); } @@ -79,35 +83,4 @@ public class TestEventQueue { } - @Test - public void handlerGroup() { - final long[] result = new long[2]; - queue.addHandlerGroup( - "group", - new EventQueue.HandlerForEvent<>(EVENT3, (payload, publisher) -> - result[0] = payload), - new EventQueue.HandlerForEvent<>(EVENT4, (payload, publisher) -> - result[1] = payload) - ); - - queue.fireEvent(EVENT3, 23L); - queue.fireEvent(EVENT4, 42L); - - queue.processAll(1000); - - Assert.assertEquals(23, result[0]); - Assert.assertEquals(42, result[1]); - - Set eventQueueThreadNames = - Thread.getAllStackTraces().keySet() - .stream() - .filter(t -> t.getName().startsWith(SingleThreadExecutor - .THREAD_NAME_PREFIX)) - .map(Thread::getName) - .collect(Collectors.toSet()); - System.out.println(eventQueueThreadNames); - Assert.assertEquals(1, eventQueueThreadNames.size()); - - } - } \ No newline at end of file