YARN-4210. HBase reader throws NPE if Get returns no rows (Varun Saxena via vrushali)
parent da2b7bd08e
commit 708fa8b1ae
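Summary of the patch, as recovered from the hunks below: the single-entity
read path in TimelineEntityReader now checks for a null or empty HBase Result
before parsing (the NPE fix itself); the multi-entity getResults() methods
return a ResultScanner that is closed in a finally block; FlowRunEntityReader
reads its start and end times through Number instead of a hard Long cast; and
new web-service tests cover the not-found cases.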
ApplicationEntityReader.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntityType;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
@@ -85,24 +86,10 @@ class ApplicationEntityReader extends GenericEntityReader {
   }

   @Override
-  protected Iterable<Result> getResults(Configuration hbaseConf,
+  protected ResultScanner getResults(Configuration hbaseConf,
       Connection conn) throws IOException {
-    // If getEntities() is called for an application, there can be at most
-    // one entity. If the entity passes the filter, it is returned. Otherwise,
-    // an empty set is returned.
-    byte[] rowKey = ApplicationRowKey.getRowKey(clusterId, userId, flowId,
-        flowRunId, appId);
-    Get get = new Get(rowKey);
-    get.setMaxVersions(Integer.MAX_VALUE);
-    Result result = table.getResult(hbaseConf, conn, get);
-    TimelineEntity entity = parseEntity(result);
-    Set<Result> set;
-    if (entity != null) {
-      set = Collections.singleton(result);
-    } else {
-      set = Collections.emptySet();
-    }
-    return set;
+    throw new UnsupportedOperationException(
+        "we don't support multiple apps query");
   }

   @Override
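Note on the hunk above: as the removed comment says, getEntities() for an
application can return at most one entity, so the old code emulated a
multi-entity read with a single Get. With getResults() now typed as
ResultScanner that emulation no longer fits, and application reads are
expected to go through the single-entity getResult() path instead.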
FlowActivityEntityReader.java
@@ -27,6 +27,7 @@ import java.util.TreeSet;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.filter.PageFilter;
 import org.apache.hadoop.yarn.api.records.timelineservice.FlowActivityEntity;
@@ -88,18 +89,22 @@ class FlowActivityEntityReader extends TimelineEntityReader {
     augmentParams(hbaseConf, conn);

     NavigableSet<TimelineEntity> entities = new TreeSet<>();
-    Iterable<Result> results = getResults(hbaseConf, conn);
-    for (Result result : results) {
-      TimelineEntity entity = parseEntity(result);
-      if (entity == null) {
-        continue;
-      }
-      entities.add(entity);
-      if (entities.size() == limit) {
-        break;
-      }
-    }
-    return entities;
+    ResultScanner results = getResults(hbaseConf, conn);
+    try {
+      for (Result result : results) {
+        TimelineEntity entity = parseEntity(result);
+        if (entity == null) {
+          continue;
+        }
+        entities.add(entity);
+        if (entities.size() == limit) {
+          break;
+        }
+      }
+      return entities;
+    } finally {
+      results.close();
+    }
   }

   @Override
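The try/finally above works because HBase's ResultScanner extends both
Iterable<Result> and Closeable: the for-each loop iterates it, and close()
releases the server-side scanner. On Java 7+ the same cleanup can be written
with try-with-resources; a minimal sketch of that equivalent (not what this
patch does, which keeps an explicit finally block):

    // Hypothetical try-with-resources variant; ResultScanner extends
    // Closeable, so it is closed automatically when the block exits.
    try (ResultScanner results = getResults(hbaseConf, conn)) {
      for (Result result : results) {
        TimelineEntity entity = parseEntity(result);
        if (entity == null) {
          continue;        // skip rows that did not parse into an entity
        }
        entities.add(entity);
        if (entities.size() == limit) {
          break;           // stop once the requested limit is reached
        }
      }
      return entities;
    }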
@@ -123,7 +128,7 @@ class FlowActivityEntityReader extends TimelineEntityReader {
   }

   @Override
-  protected Iterable<Result> getResults(Configuration hbaseConf,
+  protected ResultScanner getResults(Configuration hbaseConf,
       Connection conn) throws IOException {
     Scan scan = new Scan();
     scan.setRowPrefixFilter(FlowActivityRowKey.getRowKeyPrefix(clusterId));
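getResults() is the flow-activity reader's only data access: the
setRowPrefixFilter() call confines the scan to rows whose key starts with the
cluster's prefix, and the caller (the loop in the previous hunk) closes the
returned scanner.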
FlowRunEntityReader.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.yarn.api.records.timelineservice.FlowRunEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
@@ -96,7 +97,7 @@ class FlowRunEntityReader extends TimelineEntityReader {
   }

   @Override
-  protected Iterable<Result> getResults(Configuration hbaseConf,
+  protected ResultScanner getResults(Configuration hbaseConf,
       Connection conn) throws IOException {
     throw new UnsupportedOperationException(
         "multiple entity query is not supported");
@@ -110,14 +111,14 @@ class FlowRunEntityReader extends TimelineEntityReader {
     flowRun.setRunId(flowRunId);

     // read the start time
-    Long startTime = (Long)FlowRunColumn.MIN_START_TIME.readResult(result);
+    Number startTime = (Number)FlowRunColumn.MIN_START_TIME.readResult(result);
     if (startTime != null) {
-      flowRun.setStartTime(startTime);
+      flowRun.setStartTime(startTime.longValue());
     }
     // read the end time if available
-    Long endTime = (Long)FlowRunColumn.MAX_END_TIME.readResult(result);
+    Number endTime = (Number)FlowRunColumn.MAX_END_TIME.readResult(result);
     if (endTime != null) {
-      flowRun.setMaxEndTime(endTime);
+      flowRun.setMaxEndTime(endTime.longValue());
     }

     // read the flow version
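The Long-to-Number change in this hunk fixes a ClassCastException rather than
an NPE: readResult() hands back a plain Object, and if the stored value
deserializes as, say, an Integer, a blind (Long) cast fails, while
Number.longValue() widens any numeric type safely. A standalone illustration
in plain Java (variable names made up for the example):

    Object value = Integer.valueOf(42);        // numeric column read back as Integer
    // Long bad = (Long) value;                // would throw ClassCastException
    long good = ((Number) value).longValue();  // safe widening via Number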
GenericEntityReader.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
@@ -176,7 +177,7 @@ class GenericEntityReader extends TimelineEntityReader {
   }

   @Override
-  protected Iterable<Result> getResults(Configuration hbaseConf,
+  protected ResultScanner getResults(Configuration hbaseConf,
       Connection conn) throws IOException {
     // Scan through part of the table to find the entities belong to one app
     // and one type
TimelineEntityReader.java
@@ -25,9 +25,12 @@ import java.util.NavigableSet;
 import java.util.Set;
 import java.util.TreeSet;

+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineEntity;
 import org.apache.hadoop.yarn.api.records.timelineservice.TimelineMetric;
 import org.apache.hadoop.yarn.server.timelineservice.storage.TimelineReader.Field;
@@ -40,6 +43,7 @@ import org.apache.hadoop.yarn.server.timelineservice.storage.common.ColumnPrefix
  * entities that are being requested.
  */
 abstract class TimelineEntityReader {
+  private static final Log LOG = LogFactory.getLog(TimelineEntityReader.class);
   protected final boolean singleEntityRead;

   protected String userId;
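The commons-logging LOG field added here backs the info-level message
introduced in the next hunk.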
@@ -131,6 +135,11 @@ abstract class TimelineEntityReader {
     augmentParams(hbaseConf, conn);

     Result result = getResult(hbaseConf, conn);
+    if (result == null || result.isEmpty()) {
+      // Could not find a matching row.
+      LOG.info("Cannot find matching entity of type " + entityType);
+      return null;
+    }
     return parseEntity(result);
   }

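This is the core NPE fix. HBase's Table.get(Get) returns a non-null but empty
Result when no row matches, so a bare null check is not enough; the guard must
also test isEmpty() before handing the Result to parseEntity(). A minimal
sketch of the failure mode (assumes an HBase Table handle named table; this is
not code from the patch):

    // For a missing row HBase returns an empty Result rather than null:
    Result result = table.get(new Get(Bytes.toBytes("no-such-row")));
    // result != null, but result.isEmpty() is true and result.getRow()
    // yields null -- parsing code that dereferences the row key NPEs.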
@@ -145,18 +154,22 @@ abstract class TimelineEntityReader {
     augmentParams(hbaseConf, conn);

     NavigableSet<TimelineEntity> entities = new TreeSet<>();
-    Iterable<Result> results = getResults(hbaseConf, conn);
-    for (Result result : results) {
-      TimelineEntity entity = parseEntity(result);
-      if (entity == null) {
-        continue;
-      }
-      entities.add(entity);
-      if (entities.size() > limit) {
-        entities.pollLast();
-      }
-    }
-    return entities;
+    ResultScanner results = getResults(hbaseConf, conn);
+    try {
+      for (Result result : results) {
+        TimelineEntity entity = parseEntity(result);
+        if (entity == null) {
+          continue;
+        }
+        entities.add(entity);
+        if (entities.size() > limit) {
+          entities.pollLast();
+        }
+      }
+      return entities;
+    } finally {
+      results.close();
+    }
   }

   /**
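Unlike FlowActivityEntityReader, which breaks out of the loop once the limit
is reached, this base-class version adds each parsed entity and then trims
with pollLast(), evicting the greatest element so the TreeSet always keeps at
most "limit" entities in its natural ordering.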
@@ -184,9 +197,9 @@ abstract class TimelineEntityReader {
       throws IOException;

   /**
-   * Fetches an iterator for {@link Result} instances for a multi-entity read.
+   * Fetches a {@link ResultScanner} for a multi-entity read.
    */
-  protected abstract Iterable<Result> getResults(Configuration hbaseConf,
+  protected abstract ResultScanner getResults(Configuration hbaseConf,
       Connection conn) throws IOException;

   /**
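Tightening the abstract signature from Iterable<Result> to ResultScanner is
what lets every caller close the scanner deterministically; a plain Iterable
exposes no close() method.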
TestTimelineReaderWebServicesFlowRun.java
@@ -59,6 +59,7 @@ import org.junit.Test;
 import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.GenericType;
+import com.sun.jersey.api.client.ClientResponse.Status;
 import com.sun.jersey.api.client.config.ClientConfig;
 import com.sun.jersey.api.client.config.DefaultClientConfig;
 import com.sun.jersey.client.urlconnection.HttpURLConnectionFactory;
@@ -281,6 +282,16 @@ public class TestTimelineReaderWebServicesFlowRun {
     return false;
   }

+  private static void verifyHttpResponse(Client client, URI uri,
+      Status status) {
+    ClientResponse resp =
+        client.resource(uri).accept(MediaType.APPLICATION_JSON)
+        .type(MediaType.APPLICATION_JSON).get(ClientResponse.class);
+    assertNotNull(resp);
+    assertTrue("Response from server should have been " + status,
+        resp.getClientResponseStatus().equals(status));
+  }
+
   @Test
   public void testGetFlowRun() throws Exception {
     Client client = createClient();
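verifyHttpResponse() is used by the new not-found test in the next hunk to
assert on the HTTP status code instead of deserializing a response body.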
@@ -354,6 +365,35 @@ public class TestTimelineReaderWebServicesFlowRun {
     }
   }

+  @Test
+  public void testGetFlowRunNotPresent() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flowrun/cluster1/flow_name/1002345678929?userid=user1");
+      verifyHttpResponse(client, uri, Status.NOT_FOUND);
+    } finally {
+      client.destroy();
+    }
+  }
+
+  @Test
+  public void testGetFlowsNotPresent() throws Exception {
+    Client client = createClient();
+    try {
+      URI uri = URI.create("http://localhost:" + serverPort + "/ws/v2/" +
+          "timeline/flows/cluster2");
+      ClientResponse resp = getResponse(client, uri);
+      Set<FlowActivityEntity> entities =
+          resp.getEntity(new GenericType<Set<FlowActivityEntity>>(){});
+      assertEquals(MediaType.APPLICATION_JSON_TYPE, resp.getType());
+      assertNotNull(entities);
+      assertEquals(0, entities.size());
+    } finally {
+      client.destroy();
+    }
+  }
+
   @After
   public void stop() throws Exception {
     if (server != null) {
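The two new tests exercise the fix end to end: a flow run that does not exist
now surfaces as HTTP 404 NOT_FOUND (getEntity() returns null instead of
throwing), and a flows query against an unknown cluster returns an empty JSON
set rather than an error.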
TestHBaseTimelineStorage.java
@@ -249,11 +249,7 @@ public class TestHBaseTimelineStorage {
     TimelineEntity e1 = hbr.getEntity(user, cluster, flow, runid, appId,
         entity.getType(), entity.getId(),
         EnumSet.of(TimelineReader.Field.ALL));
-    Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
-        appId, entity.getType(), null, null, null, null, null, null, null,
-        null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
     assertNotNull(e1);
-    assertEquals(1, es1.size());

     // verify attributes
     assertEquals(appId, e1.getId());
@@ -610,18 +606,9 @@ public class TestHBaseTimelineStorage {
     TimelineEntity e2 = hbr.getEntity(user, cluster, null, null, appName,
         entity.getType(), entity.getId(),
         EnumSet.of(TimelineReader.Field.ALL));
-    Set<TimelineEntity> es1 = hbr.getEntities(user, cluster, flow, runid,
-        appName, entity.getType(), null, null, null, null, null, null, null,
-        null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
-    Set<TimelineEntity> es2 = hbr.getEntities(user, cluster, null, null,
-        appName, entity.getType(), null, null, null, null, null, null, null,
-        null, null, null, null, EnumSet.of(TimelineReader.Field.ALL));
     assertNotNull(e1);
     assertNotNull(e2);
     assertEquals(e1, e2);
-    assertEquals(1, es1.size());
-    assertEquals(1, es2.size());
-    assertEquals(es1, es2);

     // check the events
     NavigableSet<TimelineEvent> events = e1.getEvents();
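The getEntities() assertions removed in this and the previous hunk targeted
the multi-entity read path for applications, which this patch turns into an
UnsupportedOperationException, so the tests drop those calls.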