Improve error handling of ClassCastException in terms aggregations.

What is the problem we are trying to solve ?
===========================================

When we are doing aggregations against a field name as shown in
https://github.com/HarishAtGitHub/elasticsearch-tester/blob/master/12135.py#L37-L46

search = {
           "aggs": {
             "NAME": {
               "terms": {
                 "field": "ip_str",
                 "size": 10
               }
             }
           }
         }
and when the field "ip_str" has values of different types in different indices
. say one is of type StringTerms type and other is of IP(LongTerms type) then
the aggregation fails as the types do not match(incompatible).
The failure throws a class cast exception as follows:
{
   "error": {
      "root_cause": [],
      "type": "reduce_search_phase_exception",
      "reason": "[reduce] ",
      "phase": "query",
      "grouped": true,
      "failed_shards": [],
      "caused_by": {
         "type": "class_cast_exception",
         "reason": "org.elasticsearch.search.aggregations.bucket.terms.LongTerms$Bucket cannot be cast to org.elasticsearch.search.aggregations.bucket.terms.StringTerms$Bucket"
      }
   },
   "status": 503
}

which is hard to understand . User cannot infer anything about the cause of the problem and what he should do from seeing the
class cast exception.

What can be the possible solution ?
===================================

Make the exception more readable by showing him the root cause of the problem so that he can
understand which area actually caused the problem, so that he can take necessary steps further.

Code Analysis
=============

Debugging code shows that:
the query /{indices}/_search?search_type=count involves two phases
1) search phase
***************
     searchService.sendExecuteQuery(...) [Ref: TransportSearchCountAction]

     what happens here ?
        the phase 1, which is the search phase goes without error.
        In this phase the shards for the given indexes are collected and the search is done on all asynchronously
        and finally collected in the variable "firstResults" and given to meger phase.

        [Flow: .... -> TransportSearchTypeAction -> method performFirstPhase]

2) merge phase
**************
     searchPhaseController.merge(...firstResults...) [Ref: TransportSearchCountAction]

     what happens here ?
        the "firstresults" QuerySearchResults are now to be aggregated and combined.

        [Flow: SearchPhaseController.merge(...) -> ..... -> InternalTerms.doReduce(...)]

the phase 1, which is the search phase goes without error.
The problem comes in phase 2, which is merge phase.
Now the individual term buckets are available.
As per the test case , there are two indices cast and cast2, so by default 10 shards.
cast has ip_str of type StringTerms
cast2 has ip_str of type ip which is actually LongTerms

so here two types of Buckets exist. StringTerms_Bucket and LongTerms_Bucket.
Now the aggregation is to be put inside the BucketPriorityQueue(size 2: as out of 10, 2 has hits) finally.
(docs of PriorityQueue: https://lucene.apache.org/core/4_4_0/core/org/apache/lucene/util/PriorityQueue.html#insertWithOverflow(T))

Now first the LongTerms$Bucket is put inside.
then the StringTerms$Bucket is to be put in.
This is the area where exception is thrown. What happens is when adding the StringTerms$Bucket now it has to
goes through the code "lessThan(element, heap[1])"
which finally calls

---------------------------------------------------------------------------------------------
|      StringTerms$Bucket.compareTerms(other)  <---------------- Area of exception          |
|                                                                                           |
--------------------------------------------------------------------------------------------

where when comparing one to other a type cast is done and it fails as StringTerms$Bucket and LongTerms$Bucket are
incompatible.

Approach to solve:
==================

The best way is to make user understand that the problem is when reducing/merging/aggregating the buckets which came as a result of
querying different shards, so that this will make them infer that the problem is because the values of the fields are of different types.
The message is also user friendly and much better than the indecipherable classcastexception.

The only place to infer correctly that the aggregation has failed is in the place where aggregations take place.
so

at InternalTerms.java -> (BucketPriorityQueue)ordered.insertWithOverflow(b);

so here I can throw AggregationExecutionException saying it is because the buckets are of different
types.

But when can I infer at this point that the failure is due to mismatch of types of buckets ???
it can be possible only if at this point it is informed that the problem which occurred deep inside
is due to buckets that were incomparable.
so from just a classCastException we cannot make such a pointed exact inference, because
as class cast exception can be due to a number of scenarios and at a number of places.

so unless we inform the exact problem to InternalTerms it will not be able to infer properly.
so infer the classCastException at the compareTerms function itself that it is a IncomparableTermBucktesTypeException.
This is the best place to infer classCastException as this the place which generated the exception.
Best inference of exceptions can be done only at the source/origin of the exception.

so IncomparableTermBucktesTypeException to InternalTerms-> will make it infer and conclude on why
aggregation failed and give best information to user.

Close #12821
This commit is contained in:
Harish Kayarohanam 2015-08-12 13:27:00 +05:30 committed by Adrien Grand
parent 532806af1a
commit 3976854ada
1 changed files with 18 additions and 0 deletions

View File

@ -24,6 +24,7 @@ import com.google.common.collect.Multimap;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.search.aggregations.AggregationExecutionException;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.InternalAggregation;
import org.elasticsearch.search.aggregations.InternalAggregations;
@ -170,8 +171,25 @@ public abstract class InternalTerms<A extends InternalTerms, B extends InternalT
Multimap<Object, InternalTerms.Bucket> buckets = ArrayListMultimap.create();
long sumDocCountError = 0;
long otherDocCount = 0;
InternalTerms<A, B> referenceTerms = null;
for (InternalAggregation aggregation : aggregations) {
InternalTerms<A, B> terms = (InternalTerms<A, B>) aggregation;
if (referenceTerms == null && !terms.getClass().equals(UnmappedTerms.class)) {
referenceTerms = (InternalTerms<A, B>) aggregation;
}
if (referenceTerms != null &&
!referenceTerms.getClass().equals(terms.getClass()) &&
!terms.getClass().equals(UnmappedTerms.class)) {
// control gets into this loop when the same field name against which the query is executed
// is of different types in different indices.
throw new AggregationExecutionException("Merging/Reducing the aggregations failed " +
"when computing the aggregation [ Name: " +
referenceTerms.getName() + ", Type: " +
referenceTerms.type() + " ]" + " because: " +
"the field you gave in the aggregation query " +
"existed as two different types " +
"in two different indices");
}
otherDocCount += terms.getSumOfOtherDocCounts();
final long thisAggDocCountError;
if (terms.buckets.size() < this.shardSize || this.order == InternalOrder.TERM_ASC || this.order == InternalOrder.TERM_DESC) {