mirror of https://github.com/apache/druid.git
filter out segment callbacks not related to given filter
This commit is contained in:
parent
d98a10a7d8
commit
f30d58ad31
|
@ -135,7 +135,7 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
|
|||
|
||||
@Override
|
||||
public void registerSegmentCallback(
|
||||
Executor exec, final SegmentCallback callback, Predicate<DataSegment> filter
|
||||
final Executor exec, final SegmentCallback callback, final Predicate<DataSegment> filter
|
||||
)
|
||||
{
|
||||
segmentPredicates.put(callback, filter);
|
||||
|
@ -147,9 +147,14 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
|
|||
DruidServerMetadata server, DataSegment segment
|
||||
)
|
||||
{
|
||||
final CallbackAction action = callback.segmentAdded(server, segment);
|
||||
if (action.equals(CallbackAction.UNREGISTER)) {
|
||||
segmentPredicates.remove(callback);
|
||||
final CallbackAction action;
|
||||
if(filter.apply(segment)) {
|
||||
action = callback.segmentAdded(server, segment);
|
||||
if (action.equals(CallbackAction.UNREGISTER)) {
|
||||
segmentPredicates.remove(callback);
|
||||
}
|
||||
} else {
|
||||
action = CallbackAction.CONTINUE;
|
||||
}
|
||||
return action;
|
||||
}
|
||||
|
@ -159,9 +164,14 @@ public class BatchServerInventoryView extends ServerInventoryView<Set<DataSegmen
|
|||
DruidServerMetadata server, DataSegment segment
|
||||
)
|
||||
{
|
||||
final CallbackAction action = callback.segmentRemoved(server, segment);
|
||||
if (action.equals(CallbackAction.UNREGISTER)) {
|
||||
segmentPredicates.remove(callback);
|
||||
final CallbackAction action;
|
||||
if(filter.apply(segment)) {
|
||||
action = callback.segmentRemoved(server, segment);
|
||||
if (action.equals(CallbackAction.UNREGISTER)) {
|
||||
segmentPredicates.remove(callback);
|
||||
}
|
||||
} else {
|
||||
action = CallbackAction.CONTINUE;
|
||||
}
|
||||
return action;
|
||||
}
|
||||
|
|
|
@ -99,7 +99,7 @@ public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
|
|||
|
||||
@Override
|
||||
public void registerSegmentCallback(
|
||||
Executor exec, final SegmentCallback callback, Predicate<DataSegment> filter
|
||||
final Executor exec, final SegmentCallback callback, final Predicate<DataSegment> filter
|
||||
)
|
||||
{
|
||||
segmentPredicates.put(callback, filter);
|
||||
|
@ -111,9 +111,14 @@ public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
|
|||
DruidServerMetadata server, DataSegment segment
|
||||
)
|
||||
{
|
||||
final CallbackAction action = callback.segmentAdded(server, segment);
|
||||
if (action.equals(CallbackAction.UNREGISTER)) {
|
||||
segmentPredicates.remove(callback);
|
||||
final CallbackAction action;
|
||||
if(filter.apply(segment)) {
|
||||
action = callback.segmentAdded(server, segment);
|
||||
if (action.equals(CallbackAction.UNREGISTER)) {
|
||||
segmentPredicates.remove(callback);
|
||||
}
|
||||
} else {
|
||||
action = CallbackAction.CONTINUE;
|
||||
}
|
||||
return action;
|
||||
}
|
||||
|
@ -123,11 +128,18 @@ public class SingleServerInventoryView extends ServerInventoryView<DataSegment>
|
|||
DruidServerMetadata server, DataSegment segment
|
||||
)
|
||||
{
|
||||
final CallbackAction action = callback.segmentRemoved(server, segment);
|
||||
if (action.equals(CallbackAction.UNREGISTER)) {
|
||||
segmentPredicates.remove(callback);
|
||||
{
|
||||
final CallbackAction action;
|
||||
if(filter.apply(segment)) {
|
||||
action = callback.segmentRemoved(server, segment);
|
||||
if (action.equals(CallbackAction.UNREGISTER)) {
|
||||
segmentPredicates.remove(callback);
|
||||
}
|
||||
} else {
|
||||
action = CallbackAction.CONTINUE;
|
||||
}
|
||||
return action;
|
||||
}
|
||||
return action;
|
||||
}
|
||||
}
|
||||
);
|
||||
|
|
Loading…
Reference in New Issue