From 785ed3f8ae1ccc4b1729710feea4ec9e7d17c4ec Mon Sep 17 00:00:00 2001 From: Rohith Sharma K S Date: Fri, 6 Jul 2018 15:19:01 -0700 Subject: [PATCH] YARN-8302. ATS v2 should handle HBase connection issue properly. Contributed by Billie Rinaldi. (cherry picked from commit ba683204498c97654be4727ab9e128c433a45498) --- .../hadoop/yarn/conf/YarnConfiguration.java | 7 + .../storage/TestTimelineReaderHBaseDown.java | 220 ++++++++++++++++++ .../storage/HBaseTimelineReaderImpl.java | 93 ++++++++ 3 files changed, 320 insertions(+) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index c963e0c04a4..586fabfd27b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -3604,6 +3604,13 @@ public class YarnConfiguration extends Configuration { DEFAULT_TIMELINE_SERVICE_READER_WEBAPP_HTTPS_ADDRESS = DEFAULT_TIMELINE_SERVICE_WEBAPP_HTTPS_ADDRESS; + @Private + public static final String + TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS = + TIMELINE_SERVICE_READER_PREFIX + "storage-monitor.interval-ms"; + public static final long + DEFAULT_TIMELINE_SERVICE_STORAGE_MONITOR_INTERVAL_MS = 60 * 1000; + /** * Marked collector properties as Private since it run as auxillary service. */ diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java new file mode 100644 index 00000000000..786f529a7aa --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java @@ -0,0 +1,220 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; +import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderServer; +import org.junit.Assert; +import org.junit.Test; + +import java.io.IOException; +import java.util.Set; +import java.util.concurrent.TimeoutException; + +import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS; +import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl.DATA_TO_RETRIEVE; +import static org.apache.hadoop.yarn.server.timelineservice.storage.HBaseTimelineReaderImpl.MONITOR_FILTERS; + +public class TestTimelineReaderHBaseDown { + + @Test(timeout=300000) + public void testTimelineReaderHBaseUp() throws Exception { + HBaseTestingUtility util = new HBaseTestingUtility(); + configure(util); + try { + util.startMiniCluster(); + DataGeneratorForTest.createSchema(util.getConfiguration()); + DataGeneratorForTest.loadApps(util, System.currentTimeMillis()); + + TimelineReaderServer server = getTimelineReaderServer(); + server.init(util.getConfiguration()); + HBaseTimelineReaderImpl htr = getHBaseTimelineReaderImpl(server); + server.start(); + checkQuery(htr); + } finally { + util.shutdownMiniCluster(); + } + } + + @Test(timeout=300000) + public void testTimelineReaderInitWhenHBaseIsDown() throws + TimeoutException, InterruptedException { + HBaseTestingUtility util = new HBaseTestingUtility(); + configure(util); + TimelineReaderServer server = getTimelineReaderServer(); + + // init timeline reader when hbase is not running + server.init(util.getConfiguration()); + HBaseTimelineReaderImpl htr = getHBaseTimelineReaderImpl(server); + server.start(); + waitForHBaseDown(htr); + } + + @Test(timeout=300000) + public void testTimelineReaderDetectsHBaseDown() throws Exception { + HBaseTestingUtility util = new HBaseTestingUtility(); + configure(util); + + try { + // start minicluster + util.startMiniCluster(); + DataGeneratorForTest.createSchema(util.getConfiguration()); + DataGeneratorForTest.loadApps(util, System.currentTimeMillis()); + + // init timeline reader + TimelineReaderServer server = getTimelineReaderServer(); + server.init(util.getConfiguration()); + HBaseTimelineReaderImpl htr = getHBaseTimelineReaderImpl(server); + + // stop hbase after timeline reader init + util.shutdownMiniHBaseCluster(); + + // start server and check that it detects hbase is down + server.start(); + waitForHBaseDown(htr); + } finally { + util.shutdownMiniCluster(); + } + } + + @Test(timeout=300000) + public void testTimelineReaderDetectsZooKeeperDown() throws Exception { + HBaseTestingUtility util = new HBaseTestingUtility(); + configure(util); + + try { + // start minicluster + util.startMiniCluster(); + DataGeneratorForTest.createSchema(util.getConfiguration()); + DataGeneratorForTest.loadApps(util, System.currentTimeMillis()); + + // init timeline reader + TimelineReaderServer server = getTimelineReaderServer(); + server.init(util.getConfiguration()); + HBaseTimelineReaderImpl htr = getHBaseTimelineReaderImpl(server); + + // stop hbase and zookeeper after timeline reader init + util.shutdownMiniCluster(); + + // start server and check that it detects hbase is down + server.start(); + waitForHBaseDown(htr); + } finally { + util.shutdownMiniCluster(); + } + } + + @Test(timeout=300000) + public void testTimelineReaderRecoversAfterHBaseReturns() throws Exception { + HBaseTestingUtility util = new HBaseTestingUtility(); + configure(util); + + try { + // start minicluster + util.startMiniCluster(); + DataGeneratorForTest.createSchema(util.getConfiguration()); + DataGeneratorForTest.loadApps(util, System.currentTimeMillis()); + + // init timeline reader + TimelineReaderServer server = getTimelineReaderServer(); + server.init(util.getConfiguration()); + HBaseTimelineReaderImpl htr = getHBaseTimelineReaderImpl(server); + + // stop hbase after timeline reader init + util.shutdownMiniHBaseCluster(); + + // start server and check that it detects hbase is down + server.start(); + waitForHBaseDown(htr); + + util.startMiniHBaseCluster(1, 1); + GenericTestUtils.waitFor(() -> !htr.isHBaseDown(), 1000, 150000); + } finally { + util.shutdownMiniCluster(); + } + } + + private static void waitForHBaseDown(HBaseTimelineReaderImpl htr) throws + TimeoutException, InterruptedException { + GenericTestUtils.waitFor(() -> htr.isHBaseDown(), 1000, 150000); + try { + checkQuery(htr); + Assert.fail("Query should fail when HBase is down"); + } catch (IOException e) { + Assert.assertEquals("HBase is down", e.getMessage()); + } + } + + private static void checkQuery(HBaseTimelineReaderImpl htr) throws + IOException { + TimelineReaderContext context = + new TimelineReaderContext(YarnConfiguration.DEFAULT_RM_CLUSTER_ID, + null, null, null, null, TimelineEntityType + .YARN_FLOW_ACTIVITY.toString(), null, null); + Set entities = htr.getEntities(context, MONITOR_FILTERS, + DATA_TO_RETRIEVE); + } + + private static void configure(HBaseTestingUtility util) { + Configuration config = util.getConfiguration(); + config.setBoolean(YarnConfiguration.TIMELINE_SERVICE_ENABLED, true); + config.setFloat(YarnConfiguration.TIMELINE_SERVICE_VERSION, 2.0f); + config.set(YarnConfiguration.TIMELINE_SERVICE_READER_WEBAPP_ADDRESS, + "localhost:0"); + config.set(YarnConfiguration.RM_CLUSTER_ID, "cluster1"); + config.set(YarnConfiguration.TIMELINE_SERVICE_READER_CLASS, + "org.apache.hadoop.yarn.server.timelineservice.storage." + + "HBaseTimelineReaderImpl"); + config.setInt("hfile.format.version", 3); + config.setLong(TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS, 5000); + } + + private static TimelineReaderServer getTimelineReaderServer() { + return new TimelineReaderServer() { + @Override + protected void addFilters(Configuration conf) { + // The parent code uses hadoop-common jar from this version of + // Hadoop, but the tests are using hadoop-common jar from + // ${hbase-compatible-hadoop.version}. This version uses Jetty 9 + // while ${hbase-compatible-hadoop.version} uses Jetty 6, and there + // are many differences, including classnames and packages. + // We do nothing here, so that we don't cause a NoSuchMethodError or + // NoClassDefFoundError. + // Once ${hbase-compatible-hadoop.version} is changed to Hadoop 3, + // we should be able to remove this @Override. + } + }; + } + + private static HBaseTimelineReaderImpl getHBaseTimelineReaderImpl( + TimelineReaderServer server) { + for (Service s: server.getServices()) { + if (s instanceof HBaseTimelineReaderImpl) { + return (HBaseTimelineReaderImpl) s; + } + } + throw new IllegalStateException("Couldn't find HBaseTimelineReaderImpl"); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java index 1ebfab29cad..fadfd14edb6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java @@ -20,12 +20,18 @@ package org.apache.hadoop.yarn.server.timelineservice.storage; import java.io.IOException; import java.util.Set; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.client.Connection; import org.apache.hadoop.hbase.client.ConnectionFactory; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineDataToRetrieve; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineEntityFilters; import org.apache.hadoop.yarn.server.timelineservice.reader.TimelineReaderContext; @@ -47,6 +53,12 @@ public class HBaseTimelineReaderImpl private Configuration hbaseConf = null; private Connection conn; + private Configuration monitorHBaseConf = null; + private Connection monitorConn; + private ScheduledExecutorService monitorExecutorService; + private TimelineReaderContext monitorContext; + private long monitorInterval; + private AtomicBoolean hbaseDown = new AtomicBoolean(); public HBaseTimelineReaderImpl() { super(HBaseTimelineReaderImpl.class.getName()); @@ -55,22 +67,72 @@ public class HBaseTimelineReaderImpl @Override public void serviceInit(Configuration conf) throws Exception { super.serviceInit(conf); + + String clusterId = conf.get( + YarnConfiguration.RM_CLUSTER_ID, + YarnConfiguration.DEFAULT_RM_CLUSTER_ID); + monitorContext = + new TimelineReaderContext(clusterId, null, null, null, null, + TimelineEntityType.YARN_FLOW_ACTIVITY.toString(), null, null); + monitorInterval = conf.getLong( + YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS, + YarnConfiguration.DEFAULT_TIMELINE_SERVICE_STORAGE_MONITOR_INTERVAL_MS); + + monitorHBaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf); + monitorHBaseConf.setInt("hbase.client.retries.number", 3); + monitorHBaseConf.setLong("hbase.client.pause", 1000); + monitorHBaseConf.setLong("hbase.rpc.timeout", monitorInterval); + monitorHBaseConf.setLong("hbase.client.scanner.timeout.period", + monitorInterval); + monitorHBaseConf.setInt("zookeeper.recovery.retry", 1); + monitorConn = ConnectionFactory.createConnection(monitorHBaseConf); + + monitorExecutorService = Executors.newScheduledThreadPool(1); + hbaseConf = HBaseTimelineStorageUtils.getTimelineServiceHBaseConf(conf); conn = ConnectionFactory.createConnection(hbaseConf); } + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + LOG.info("Scheduling HBase liveness monitor at interval {}", + monitorInterval); + monitorExecutorService.scheduleAtFixedRate(new HBaseMonitor(), 0, + monitorInterval, TimeUnit.MILLISECONDS); + } + @Override protected void serviceStop() throws Exception { if (conn != null) { LOG.info("closing the hbase Connection"); conn.close(); } + if (monitorExecutorService != null) { + monitorExecutorService.shutdownNow(); + if (!monitorExecutorService.awaitTermination(30, TimeUnit.SECONDS)) { + LOG.warn("failed to stop the monitir task in time. " + + "will still proceed to close the monitor."); + } + } + monitorConn.close(); super.serviceStop(); } + private void checkHBaseDown() throws IOException { + if (hbaseDown.get()) { + throw new IOException("HBase is down"); + } + } + + public boolean isHBaseDown() { + return hbaseDown.get(); + } + @Override public TimelineEntity getEntity(TimelineReaderContext context, TimelineDataToRetrieve dataToRetrieve) throws IOException { + checkHBaseDown(); TimelineEntityReader reader = TimelineEntityReaderFactory.createSingleEntityReader(context, dataToRetrieve); @@ -81,6 +143,7 @@ public class HBaseTimelineReaderImpl public Set getEntities(TimelineReaderContext context, TimelineEntityFilters filters, TimelineDataToRetrieve dataToRetrieve) throws IOException { + checkHBaseDown(); TimelineEntityReader reader = TimelineEntityReaderFactory.createMultipleEntitiesReader(context, filters, dataToRetrieve); @@ -90,7 +153,37 @@ public class HBaseTimelineReaderImpl @Override public Set getEntityTypes(TimelineReaderContext context) throws IOException { + checkHBaseDown(); EntityTypeReader reader = new EntityTypeReader(context); return reader.readEntityTypes(hbaseConf, conn); } + + protected static final TimelineEntityFilters MONITOR_FILTERS = + new TimelineEntityFilters.Builder().entityLimit(1L).build(); + protected static final TimelineDataToRetrieve DATA_TO_RETRIEVE = + new TimelineDataToRetrieve(null, null, null, null, null, null); + + private class HBaseMonitor implements Runnable { + @Override + public void run() { + try { + LOG.info("Running HBase liveness monitor"); + TimelineEntityReader reader = + TimelineEntityReaderFactory.createMultipleEntitiesReader( + monitorContext, MONITOR_FILTERS, DATA_TO_RETRIEVE); + reader.readEntities(monitorHBaseConf, monitorConn); + + // on success, reset hbase down flag + if (hbaseDown.getAndSet(false)) { + if(LOG.isDebugEnabled()) { + LOG.debug("HBase request succeeded, assuming HBase up"); + } + } + } catch (Exception e) { + LOG.warn("Got failure attempting to read from timeline storage, " + + "assuming HBase down", e); + hbaseDown.getAndSet(true); + } + } + } }