YARN-9374. Improve Timeline service resilience when HBase is unavailable.

Contributed by Prabhu Joseph and Szilard Nemeth
This commit is contained in:
Eric Yang 2019-06-24 12:19:14 -04:00
parent 81d2f1b724
commit b220ec6f61
5 changed files with 158 additions and 13 deletions

View File

@ -150,7 +150,14 @@ public void testTimelineReaderRecoversAfterHBaseReturns() throws Exception {
waitForHBaseDown(htr);
util.startMiniHBaseCluster(1, 1);
GenericTestUtils.waitFor(() -> !htr.isHBaseDown(), 1000, 150000);
GenericTestUtils.waitFor(() -> {
try {
htr.getTimelineStorageMonitor().checkStorageIsUp();
return true;
} catch (IOException e) {
return false;
}
}, 1000, 150000);
} finally {
util.shutdownMiniCluster();
}
@ -158,8 +165,15 @@ public void testTimelineReaderRecoversAfterHBaseReturns() throws Exception {
private static void waitForHBaseDown(HBaseTimelineReaderImpl htr) throws
TimeoutException, InterruptedException {
GenericTestUtils.waitFor(() -> htr.isHBaseDown(), 1000, 150000);
try {
GenericTestUtils.waitFor(() -> {
try {
htr.getTimelineStorageMonitor().checkStorageIsUp();
return false;
} catch (IOException e) {
return true;
}
}, 1000, 150000);
checkQuery(htr);
Assert.fail("Query should fail when HBase is down");
} catch (IOException e) {

View File

@ -0,0 +1,117 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.timelineservice.storage;
import java.io.IOException;
import org.junit.Test;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.server.timelineservice.collector.TimelineCollectorContext;
import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntities;
import org.apache.hadoop.yarn.api.records.timelineservice.ApplicationEntity;
import static org.apache.hadoop.yarn.conf.YarnConfiguration.TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS;
import static org.junit.Assert.assertTrue;
/**
* This class tests HbaseTimelineWriter with Hbase Down.
*/
public class TestTimelineWriterHBaseDown {
@Test(timeout=300000)
public void testTimelineWriterHBaseDown() throws Exception {
HBaseTestingUtility util = new HBaseTestingUtility();
HBaseTimelineWriterImpl writer = new HBaseTimelineWriterImpl();
try {
Configuration c1 = util.getConfiguration();
c1.setLong(TIMELINE_SERVICE_READER_STORAGE_MONITOR_INTERVAL_MS, 5000);
writer.init(c1);
writer.start();
util.startMiniCluster();
DataGeneratorForTest.createSchema(util.getConfiguration());
TimelineStorageMonitor storageMonitor = writer.
getTimelineStorageMonitor();
waitForHBaseToUp(storageMonitor);
try {
storageMonitor.checkStorageIsUp();
} catch(IOException e) {
Assert.fail("HBaseStorageMonitor failed to detect HBase Up");
}
util.shutdownMiniHBaseCluster();
waitForHBaseToDown(storageMonitor);
TimelineEntities te = new TimelineEntities();
ApplicationEntity entity = new ApplicationEntity();
String appId = "application_1000178881110_2002";
entity.setId(appId);
Long cTime = 1425016501000L;
entity.setCreatedTime(cTime);
te.addEntity(entity);
boolean exceptionCaught = false;
try{
writer.write(new TimelineCollectorContext("ATS1", "user1", "flow2",
"AB7822C10F1111", 1002345678919L, appId), te,
UserGroupInformation.createRemoteUser("user1"));
} catch (IOException e) {
if (e.getMessage().equals("HBase is down")) {
exceptionCaught = true;
}
}
assertTrue("HBaseStorageMonitor failed to detect HBase Down",
exceptionCaught);
} finally {
writer.stop();
util.shutdownMiniCluster();
}
}
public void waitForHBaseToUp(TimelineStorageMonitor storageMonitor)
throws Exception {
GenericTestUtils.waitFor(() -> {
try {
storageMonitor.checkStorageIsUp();
return true;
} catch (IOException e) {
return false;
}
}, 1000, 150000);
}
public void waitForHBaseToDown(TimelineStorageMonitor storageMonitor)
throws Exception {
GenericTestUtils.waitFor(() -> {
try {
storageMonitor.checkStorageIsUp();
return false;
} catch (IOException e) {
return true;
}
}, 1000, 150000);
}
}

View File

@ -78,10 +78,6 @@ protected void serviceStop() throws Exception {
super.serviceStop();
}
public boolean isHBaseDown() {
return storageMonitor.isStorageDown();
}
@Override
public TimelineEntity getEntity(TimelineReaderContext context,
TimelineDataToRetrieve dataToRetrieve) throws IOException {
@ -113,14 +109,19 @@ public Set<String> getEntityTypes(TimelineReaderContext context)
@Override
public TimelineHealth getHealthStatus() {
if (!this.isHBaseDown()) {
try {
storageMonitor.checkStorageIsUp();
return new TimelineHealth(TimelineHealth.TimelineHealthStatus.RUNNING,
"");
} else {
} catch (IOException e){
return new TimelineHealth(
TimelineHealth.TimelineHealthStatus.READER_CONNECTION_FAILURE,
"HBase connection is down");
}
}
protected TimelineStorageMonitor getTimelineStorageMonitor() {
return storageMonitor;
}
}

View File

@ -100,6 +100,7 @@ public class HBaseTimelineWriterImpl extends AbstractService implements
.getLogger(HBaseTimelineWriterImpl.class);
private Connection conn;
private TimelineStorageMonitor storageMonitor;
private TypedBufferedMutator<EntityTable> entityTable;
private TypedBufferedMutator<AppToFlowTable> appToFlowTable;
private TypedBufferedMutator<ApplicationTable> applicationTable;
@ -150,9 +151,16 @@ protected void serviceInit(Configuration conf) throws Exception {
UserGroupInformation ugi = UserGroupInformation.isSecurityEnabled() ?
UserGroupInformation.getLoginUser() :
UserGroupInformation.getCurrentUser();
storageMonitor = new HBaseStorageMonitor(conf);
LOG.info("Initialized HBaseTimelineWriterImpl UGI to " + ugi);
}
@Override
protected void serviceStart() throws Exception {
super.serviceStart();
storageMonitor.start();
}
/**
* Stores the entire information in TimelineEntities to the timeline store.
*/
@ -160,7 +168,7 @@ protected void serviceInit(Configuration conf) throws Exception {
public TimelineWriteResponse write(TimelineCollectorContext context,
TimelineEntities data, UserGroupInformation callerUgi)
throws IOException {
storageMonitor.checkStorageIsUp();
TimelineWriteResponse putStatus = new TimelineWriteResponse();
String clusterId = context.getClusterId();
@ -242,6 +250,7 @@ public TimelineWriteResponse write(TimelineCollectorContext context,
public TimelineWriteResponse write(TimelineCollectorContext context,
TimelineDomain domain)
throws IOException {
storageMonitor.checkStorageIsUp();
TimelineWriteResponse putStatus = new TimelineWriteResponse();
String clusterId = context.getClusterId();
@ -591,6 +600,7 @@ private <T extends BaseTable<T>> void storeEvents(
@Override
public TimelineWriteResponse aggregate(TimelineEntity data,
TimelineAggregationTrack track) throws IOException {
storageMonitor.checkStorageIsUp();
return null;
}
@ -603,6 +613,7 @@ public TimelineWriteResponse aggregate(TimelineEntity data,
*/
@Override
public void flush() throws IOException {
storageMonitor.checkStorageIsUp();
// flush all buffered mutators
entityTable.flush();
appToFlowTable.flush();
@ -653,6 +664,12 @@ protected void serviceStop() throws Exception {
LOG.info("closing the hbase Connection");
conn.close();
}
storageMonitor.stop();
super.serviceStop();
}
protected TimelineStorageMonitor getTimelineStorageMonitor() {
return storageMonitor;
}
}

View File

@ -81,10 +81,6 @@ public void checkStorageIsUp() throws IOException {
}
}
public boolean isStorageDown() {
return storageDown.get();
}
private class MonitorThread implements Runnable {
@Override
public void run() {