diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java index e738d3971d9..1148b80d196 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineReaderHBaseDown.java @@ -150,7 +150,14 @@ public class TestTimelineReaderHBaseDown { waitForHBaseDown(htr); util.startMiniHBaseCluster(1, 1); - GenericTestUtils.waitFor(() -> !htr.isHBaseDown(), 1000, 150000); + GenericTestUtils.waitFor(() -> { + try { + htr.getTimelineStorageMonitor().checkStorageIsUp(); + return true; + } catch (IOException e) { + return false; + } + }, 1000, 150000); } finally { util.shutdownMiniCluster(); } @@ -158,8 +165,15 @@ public class TestTimelineReaderHBaseDown { private static void waitForHBaseDown(HBaseTimelineReaderImpl htr) throws TimeoutException, InterruptedException { - GenericTestUtils.waitFor(() -> htr.isHBaseDown(), 1000, 150000); try { + GenericTestUtils.waitFor(() -> { + try { + htr.getTimelineStorageMonitor().checkStorageIsUp(); + return false; + } catch (IOException e) { + return true; + } + }, 1000, 150000); checkQuery(htr); Assert.fail("Query should fail when HBase is down"); } catch (IOException e) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterHBaseDown.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterHBaseDown.java new file mode 100644 index 00000000000..cb89ba4223e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase-tests/src/test/java/org/apache/hadoop/yarn/server/timelineservice/storage/TestTimelineWriterHBaseDown.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.timelineservice.storage; + +import java.io.IOException; + +import org.junit.Test; +import org.junit.Assert; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext; +import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities; +import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity; + +import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS; +import static org.junit.Assert.assertTrue; + +/** + * This class tests HbaseTimelineWriter with Hbase Down. + */ +public class TestTimelineWriterHBaseDown { + + @Test(timeout=300000) + public void testTimelineWriterHBaseDown() throws Exception { + HBaseTestingUtility util = new HBaseTestingUtility(); + HBaseTimelineWriterImpl writer = new HBaseTimelineWriterImpl(); + try { + Configuration c1 = util.getConfiguration(); + c1.setLong(TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS, 5000); + writer.init(c1); + writer.start(); + + util.startMiniCluster(); + DataGeneratorForTest.createSchema(util.getConfiguration()); + + TimelineStorageMonitor storageMonitor = writer. + getTimelineStorageMonitor(); + waitForHBaseToUp(storageMonitor); + + try { + storageMonitor.checkStorageIsUp(); + } catch(IOException e) { + Assert.fail("HBaseStorageMonitor failed to detect HBase Up"); + } + + util.shutdownMiniHBaseCluster(); + waitForHBaseToDown(storageMonitor); + + TimelineEntities te = new TimelineEntities(); + ApplicationEntity entity = new ApplicationEntity(); + String appId = "application_1000178881110_2002"; + entity.setId(appId); + Long cTime = 1425016501000L; + entity.setCreatedTime(cTime); + te.addEntity(entity); + + boolean exceptionCaught = false; + try{ + writer.write(new TimelineCollectorContext("ATS1", "user1", "flow2", + "AB7822C10F1111", 1002345678919L, appId), te, + UserGroupInformation.createRemoteUser("user1")); + } catch (IOException e) { + if (e.getMessage().equals("HBase is down")) { + exceptionCaught = true; + } + } + assertTrue("HBaseStorageMonitor failed to detect HBase Down", + exceptionCaught); + } finally { + writer.stop(); + util.shutdownMiniCluster(); + } + } + + public void waitForHBaseToUp(TimelineStorageMonitor storageMonitor) + throws Exception { + GenericTestUtils.waitFor(() -> { + try { + storageMonitor.checkStorageIsUp(); + return true; + } catch (IOException e) { + return false; + } + }, 1000, 150000); + } + + public void waitForHBaseToDown(TimelineStorageMonitor storageMonitor) + throws Exception { + GenericTestUtils.waitFor(() -> { + try { + storageMonitor.checkStorageIsUp(); + return false; + } catch (IOException e) { + return true; + } + }, 1000, 150000); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java index 4c71fd6b49e..f3592d29240 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineReaderImpl.java @@ -78,10 +78,6 @@ public class HBaseTimelineReaderImpl super.serviceStop(); } - public boolean isHBaseDown() { - return storageMonitor.isStorageDown(); - } - @Override public TimelineEntity getEntity(TimelineReaderContext context, TimelineDataToRetrieve dataToRetrieve) throws IOException { @@ -113,14 +109,19 @@ public class HBaseTimelineReaderImpl @Override public TimelineHealth getHealthStatus() { - if (!this.isHBaseDown()) { + try { + storageMonitor.checkStorageIsUp(); return new TimelineHealth(TimelineHealth.TimelineHealthStatus.RUNNING, ""); - } else { + } catch (IOException e){ return new TimelineHealth( TimelineHealth.TimelineHealthStatus.READER_CONNECTION_FAILURE, "HBase connection is down"); } } + protected TimelineStorageMonitor getTimelineStorageMonitor() { + return storageMonitor; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java index 3414a56ef35..a398febccc7 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice-hbase/hadoop-yarn-server-timelineservice-hbase-client/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/HBaseTimelineWriterImpl.java @@ -100,6 +100,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements .getLogger(HBaseTimelineWriterImpl.class); private Connection conn; + private TimelineStorageMonitor storageMonitor; private TypedBufferedMutator entityTable; private TypedBufferedMutator appToFlowTable; private TypedBufferedMutator applicationTable; @@ -150,9 +151,16 @@ public class HBaseTimelineWriterImpl extends AbstractService implements UserGroupInformation ugi = UserGroupInformation.isSecurityEnabled() ? UserGroupInformation.getLoginUser() : UserGroupInformation.getCurrentUser(); + storageMonitor = new HBaseStorageMonitor(conf); LOG.info("Initialized HBaseTimelineWriterImpl UGI to " + ugi); } + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + storageMonitor.start(); + } + /** * Stores the entire information in TimelineEntities to the timeline store. */ @@ -160,7 +168,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements public TimelineWriteResponse write(TimelineCollectorContext context, TimelineEntities data, UserGroupInformation callerUgi) throws IOException { - + storageMonitor.checkStorageIsUp(); TimelineWriteResponse putStatus = new TimelineWriteResponse(); String clusterId = context.getClusterId(); @@ -242,6 +250,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements public TimelineWriteResponse write(TimelineCollectorContext context, TimelineDomain domain) throws IOException { + storageMonitor.checkStorageIsUp(); TimelineWriteResponse putStatus = new TimelineWriteResponse(); String clusterId = context.getClusterId(); @@ -591,6 +600,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements @Override public TimelineWriteResponse aggregate(TimelineEntity data, TimelineAggregationTrack track) throws IOException { + storageMonitor.checkStorageIsUp(); return null; } @@ -603,6 +613,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements */ @Override public void flush() throws IOException { + storageMonitor.checkStorageIsUp(); // flush all buffered mutators entityTable.flush(); appToFlowTable.flush(); @@ -653,6 +664,12 @@ public class HBaseTimelineWriterImpl extends AbstractService implements LOG.info("closing the hbase Connection"); conn.close(); } + storageMonitor.stop(); super.serviceStop(); } + + protected TimelineStorageMonitor getTimelineStorageMonitor() { + return storageMonitor; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineStorageMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineStorageMonitor.java index fc96f19d75e..dce6b8d6f91 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineStorageMonitor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-timelineservice/src/main/java/org/apache/hadoop/yarn/server/timelineservice/storage/TimelineStorageMonitor.java @@ -81,10 +81,6 @@ public abstract class TimelineStorageMonitor { } } - public boolean isStorageDown() { - return storageDown.get(); - } - private class MonitorThread implements Runnable { @Override public void run() {