mirror of https://github.com/apache/druid.git
This reverts commit a931debf79
.
Fixes #3283
The core issue here is that realtime nodes announce their size as 0, so a coordinator which interns the realtime version of the data segment will not be able to see the new sized announcement when handoff occurs.
This is caused by the `eauals` method on a `DataSegment` only evaluating the identifier. the `eauals` method *should* be correct for object equivalence, and things which need to check equivalence of some sub-portion of the object should do so explicitly.
This commit is contained in:
parent
95a58097e2
commit
188a4bc89a
|
@ -25,10 +25,6 @@ import com.google.common.base.Function;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.base.Predicate;
|
import com.google.common.base.Predicate;
|
||||||
import com.google.common.base.Predicates;
|
import com.google.common.base.Predicates;
|
||||||
import com.google.common.collect.FluentIterable;
|
|
||||||
import com.google.common.collect.ImmutableSet;
|
|
||||||
import com.google.common.collect.Interner;
|
|
||||||
import com.google.common.collect.Interners;
|
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
import com.google.common.collect.MapMaker;
|
import com.google.common.collect.MapMaker;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
|
@ -52,7 +48,6 @@ import java.util.concurrent.Executor;
|
||||||
public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegment>>
|
public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegment>>
|
||||||
implements FilteredServerInventoryView
|
implements FilteredServerInventoryView
|
||||||
{
|
{
|
||||||
private static final Interner<DataSegment> DATA_SEGMENT_INTERNER = Interners.newWeakInterner();
|
|
||||||
private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class);
|
private static final EmittingLogger log = new EmittingLogger(BatchServerInventoryView.class);
|
||||||
|
|
||||||
final private ConcurrentMap<String, Set<DataSegment>> zNodes = new MapMaker().makeMap();
|
final private ConcurrentMap<String, Set<DataSegment>> zNodes = new MapMaker().makeMap();
|
||||||
|
@ -193,17 +188,4 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
|
||||||
{
|
{
|
||||||
segmentPredicates.remove(callback);
|
segmentPredicates.remove(callback);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected Set<DataSegment> internInventory(Set<DataSegment> sample)
|
|
||||||
{
|
|
||||||
return ImmutableSet.copyOf(FluentIterable.from(sample).transform(new Function<DataSegment, DataSegment>()
|
|
||||||
{
|
|
||||||
@Override
|
|
||||||
public DataSegment apply(DataSegment input)
|
|
||||||
{
|
|
||||||
return DATA_SEGMENT_INTERNER.intern(input);
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@
|
||||||
|
|
||||||
package io.druid.client;
|
package io.druid.client;
|
||||||
|
|
||||||
|
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||||
import com.fasterxml.jackson.core.type.TypeReference;
|
import com.fasterxml.jackson.core.type.TypeReference;
|
||||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.base.Charsets;
|
import com.google.common.base.Charsets;
|
||||||
|
@ -100,7 +101,7 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
|
||||||
public InventoryType deserializeInventory(byte[] bytes)
|
public InventoryType deserializeInventory(byte[] bytes)
|
||||||
{
|
{
|
||||||
try {
|
try {
|
||||||
return internInventory(jsonMapper.<InventoryType>readValue(bytes, typeReference));
|
return jsonMapper.readValue(bytes, typeReference);
|
||||||
}
|
}
|
||||||
catch (IOException e) {
|
catch (IOException e) {
|
||||||
CharBuffer.wrap(StringUtils.fromUtf8(bytes).toCharArray());
|
CharBuffer.wrap(StringUtils.fromUtf8(bytes).toCharArray());
|
||||||
|
@ -193,15 +194,6 @@ public abstract class ServerInventoryView<InventoryType> implements ServerView,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Optionally override to allow interning the inventory
|
|
||||||
* @param sample The inventory to intern
|
|
||||||
* @return An interned instance equal to sample
|
|
||||||
*/
|
|
||||||
protected InventoryType internInventory(InventoryType sample) {
|
|
||||||
return sample;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isStarted()
|
public boolean isStarted()
|
||||||
{
|
{
|
||||||
return started.get();
|
return started.get();
|
||||||
|
|
|
@ -24,8 +24,6 @@ import com.fasterxml.jackson.databind.ObjectMapper;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
import com.google.common.base.Predicate;
|
import com.google.common.base.Predicate;
|
||||||
import com.google.common.base.Predicates;
|
import com.google.common.base.Predicates;
|
||||||
import com.google.common.collect.Interner;
|
|
||||||
import com.google.common.collect.Interners;
|
|
||||||
import com.google.common.collect.MapMaker;
|
import com.google.common.collect.MapMaker;
|
||||||
import com.google.inject.Inject;
|
import com.google.inject.Inject;
|
||||||
import com.metamx.common.Pair;
|
import com.metamx.common.Pair;
|
||||||
|
@ -45,7 +43,6 @@ import java.util.concurrent.Executor;
|
||||||
public class SingleServerInventoryView extends ServerInventoryView<DataSegment> implements FilteredServerInventoryView
|
public class SingleServerInventoryView extends ServerInventoryView<DataSegment> implements FilteredServerInventoryView
|
||||||
{
|
{
|
||||||
private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class);
|
private static final EmittingLogger log = new EmittingLogger(SingleServerInventoryView.class);
|
||||||
private static final Interner<DataSegment> DATA_SEGMENT_INTERNER = Interners.newWeakInterner();
|
|
||||||
|
|
||||||
final private ConcurrentMap<SegmentCallback, Predicate<Pair<DruidServerMetadata, DataSegment>>> segmentPredicates = new MapMaker()
|
final private ConcurrentMap<SegmentCallback, Predicate<Pair<DruidServerMetadata, DataSegment>>> segmentPredicates = new MapMaker()
|
||||||
.makeMap();
|
.makeMap();
|
||||||
|
@ -167,9 +164,4 @@ public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
protected DataSegment internInventory(DataSegment sample)
|
|
||||||
{
|
|
||||||
return DATA_SEGMENT_INTERNER.intern(sample);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue