From 8fd587b28cc5ee3562362682555d7f843f4d4a68 Mon Sep 17 00:00:00 2001 From: Clint Wylie Date: Thu, 3 Feb 2022 12:57:34 -0800 Subject: [PATCH] remove duplicate Broker ServerInventoryView, improve HttpServerInventoryView logging (#12209) * changes: * remove SystemSchema duplicate ServerInventoryView in broker * suppress duplicate segment added/removed warnings in HttpServerInventoryView when doing a full sync * fixes --- .../druid/client/HttpServerInventoryView.java | 18 ++++++++++-------- .../druid/sql/calcite/schema/SystemSchema.java | 8 ++++---- .../schema/DruidCalciteSchemaModuleTest.java | 6 +++--- .../sql/calcite/schema/SystemSchemaTest.java | 7 +++---- .../druid/sql/calcite/util/CalciteTests.java | 18 +++++++++++++++--- .../apache/druid/sql/guice/SqlModuleTest.java | 6 +++--- 6 files changed, 38 insertions(+), 25 deletions(-) diff --git a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java index aacd4b74f14..decab1f7cc7 100644 --- a/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java +++ b/server/src/main/java/org/apache/druid/client/HttpServerInventoryView.java @@ -550,7 +550,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer if (request instanceof SegmentChangeRequestLoad) { DataSegment segment = ((SegmentChangeRequestLoad) request).getSegment(); toRemove.remove(segment.getId()); - addSegment(segment); + addSegment(segment, true); } else { log.error( "Server[%s] gave a non-load dataSegmentChangeRequest[%s]., Ignored.", @@ -561,7 +561,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer } for (DataSegment segmentToRemove : toRemove.values()) { - removeSegment(segmentToRemove); + removeSegment(segmentToRemove, true); } } @@ -570,9 +570,9 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer { for (DataSegmentChangeRequest request : changes) { if (request instanceof SegmentChangeRequestLoad) { - addSegment(((SegmentChangeRequestLoad) request).getSegment()); + addSegment(((SegmentChangeRequestLoad) request).getSegment(), false); } else if (request instanceof SegmentChangeRequestDrop) { - removeSegment(((SegmentChangeRequestDrop) request).getSegment()); + removeSegment(((SegmentChangeRequestDrop) request).getSegment(), false); } else { log.error( "Server[%s] gave a non load/drop dataSegmentChangeRequest[%s], Ignored.", @@ -585,7 +585,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer }; } - private void addSegment(DataSegment segment) + private void addSegment(DataSegment segment, boolean fullSync) { if (finalPredicate.apply(Pair.of(druidServer.getMetadata(), segment))) { if (druidServer.getSegment(segment.getId()) == null) { @@ -601,7 +601,8 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer } } ); - } else { + } else if (!fullSync) { + // duplicates can happen when doing a full sync from a 'reset', so only warn for dupes on delta changes log.warn( "Not adding or running callbacks for existing segment[%s] on server[%s]", segment.getId(), @@ -611,7 +612,7 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer } } - private void removeSegment(final DataSegment segment) + private void removeSegment(final DataSegment segment, boolean fullSync) { if (druidServer.removeDataSegment(segment.getId()) != null) { runSegmentCallbacks( @@ -624,7 +625,8 @@ public class HttpServerInventoryView implements ServerInventoryView, FilteredSer } } ); - } else { + } else if (!fullSync) { + // duplicates can happen when doing a full sync from a 'reset', so only warn for dupes on delta changes log.warn( "Not running cleanup or callbacks for non-existing segment[%s] on server[%s]", segment.getId(), diff --git a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java index 527a3d4bf70..f408edd28b2 100644 --- a/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java +++ b/sql/src/main/java/org/apache/druid/sql/calcite/schema/SystemSchema.java @@ -45,8 +45,8 @@ import org.apache.calcite.schema.Table; import org.apache.calcite.schema.impl.AbstractSchema; import org.apache.calcite.schema.impl.AbstractTable; import org.apache.druid.client.DruidServer; +import org.apache.druid.client.FilteredServerInventoryView; import org.apache.druid.client.ImmutableDruidServer; -import org.apache.druid.client.InventoryView; import org.apache.druid.client.JsonParserIterator; import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.coordinator.Coordinator; @@ -206,7 +206,7 @@ public class SystemSchema extends AbstractSchema final DruidSchema druidSchema, final MetadataSegmentView metadataView, final TimelineServerView serverView, - final InventoryView serverInventoryView, + final FilteredServerInventoryView serverInventoryView, final AuthorizerMapper authorizerMapper, final @Coordinator DruidLeaderClient coordinatorDruidLeaderClient, final @IndexingService DruidLeaderClient overlordDruidLeaderClient, @@ -480,13 +480,13 @@ public class SystemSchema extends AbstractSchema private final AuthorizerMapper authorizerMapper; private final DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; - private final InventoryView serverInventoryView; + private final FilteredServerInventoryView serverInventoryView; private final DruidLeaderClient overlordLeaderClient; private final DruidLeaderClient coordinatorLeaderClient; public ServersTable( DruidNodeDiscoveryProvider druidNodeDiscoveryProvider, - InventoryView serverInventoryView, + FilteredServerInventoryView serverInventoryView, AuthorizerMapper authorizerMapper, DruidLeaderClient overlordLeaderClient, DruidLeaderClient coordinatorLeaderClient diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java index 70a5862e3d9..7387fbe2463 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/DruidCalciteSchemaModuleTest.java @@ -28,7 +28,7 @@ import com.google.inject.Key; import com.google.inject.Scopes; import com.google.inject.TypeLiteral; import com.google.inject.name.Names; -import org.apache.druid.client.InventoryView; +import org.apache.druid.client.FilteredServerInventoryView; import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.coordinator.Coordinator; import org.apache.druid.client.indexing.IndexingService; @@ -77,7 +77,7 @@ public class DruidCalciteSchemaModuleTest extends CalciteTestBase @Mock AuthorizerMapper authorizerMapper; @Mock - private InventoryView serverInventoryView; + private FilteredServerInventoryView serverInventoryView; @Mock private DruidLeaderClient coordinatorDruidLeaderClient; @Mock @@ -110,7 +110,7 @@ public class DruidCalciteSchemaModuleTest extends CalciteTestBase binder.bind(ViewManager.class).toInstance(viewManager); binder.bind(Escalator.class).toInstance(escalator); binder.bind(AuthorizerMapper.class).toInstance(authorizerMapper); - binder.bind(InventoryView.class).toInstance(serverInventoryView); + binder.bind(FilteredServerInventoryView.class).toInstance(serverInventoryView); binder.bind(SegmentManager.class).toInstance(segmentManager); binder.bind(DruidLeaderClient.class) .annotatedWith(Coordinator.class) diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java index f07686b7830..8773034b7d6 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/schema/SystemSchemaTest.java @@ -36,10 +36,9 @@ import org.apache.calcite.schema.Table; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.druid.client.BrokerInternalQueryConfig; import org.apache.druid.client.DruidServer; +import org.apache.druid.client.FilteredServerInventoryView; import org.apache.druid.client.ImmutableDruidDataSource; import org.apache.druid.client.ImmutableDruidServer; -import org.apache.druid.client.InventoryView; -import org.apache.druid.client.ServerInventoryView; import org.apache.druid.client.TimelineServerView; import org.apache.druid.common.config.NullHandling; import org.apache.druid.data.input.InputRow; @@ -162,7 +161,7 @@ public class SystemSchemaTest extends CalciteTestBase private static Closer resourceCloser; private MetadataSegmentView metadataView; private DruidNodeDiscoveryProvider druidNodeDiscoveryProvider; - private InventoryView serverInventoryView; + private FilteredServerInventoryView serverInventoryView; @Rule public TemporaryFolder temporaryFolder = new TemporaryFolder(); @@ -260,7 +259,7 @@ public class SystemSchemaTest extends CalciteTestBase druidSchema.awaitInitialization(); metadataView = EasyMock.createMock(MetadataSegmentView.class); druidNodeDiscoveryProvider = EasyMock.createMock(DruidNodeDiscoveryProvider.class); - serverInventoryView = EasyMock.createMock(ServerInventoryView.class); + serverInventoryView = EasyMock.createMock(FilteredServerInventoryView.class); schema = new SystemSchema( druidSchema, metadataView, diff --git a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java index 67901a8e450..2f07a5ed52f 100644 --- a/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java +++ b/sql/src/test/java/org/apache/druid/sql/calcite/util/CalciteTests.java @@ -21,6 +21,7 @@ package org.apache.druid.sql.calcite.util; import com.fasterxml.jackson.databind.InjectableValues; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Predicate; import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -35,7 +36,9 @@ import org.apache.calcite.schema.SchemaPlus; import org.apache.druid.client.BrokerInternalQueryConfig; import org.apache.druid.client.BrokerSegmentWatcherConfig; import org.apache.druid.client.DruidServer; +import org.apache.druid.client.FilteredServerInventoryView; import org.apache.druid.client.ServerInventoryView; +import org.apache.druid.client.ServerView; import org.apache.druid.data.input.InputRow; import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.impl.DimensionSchema; @@ -56,6 +59,7 @@ import org.apache.druid.discovery.NodeRole; import org.apache.druid.guice.ExpressionModule; import org.apache.druid.guice.annotations.Json; import org.apache.druid.java.util.common.DateTimes; +import org.apache.druid.java.util.common.Pair; import org.apache.druid.java.util.emitter.core.NoopEmitter; import org.apache.druid.java.util.emitter.service.ServiceEmitter; import org.apache.druid.java.util.http.client.HttpClient; @@ -100,6 +104,7 @@ import org.apache.druid.server.QueryLifecycleFactory; import org.apache.druid.server.QueryScheduler; import org.apache.druid.server.QueryStackTests; import org.apache.druid.server.SegmentManager; +import org.apache.druid.server.coordination.DruidServerMetadata; import org.apache.druid.server.log.NoopRequestLogger; import org.apache.druid.server.security.Access; import org.apache.druid.server.security.AllowAllAuthenticator; @@ -1347,7 +1352,7 @@ public class CalciteTests /** * A fake {@link ServerInventoryView} for {@link #createMockSystemSchema}. */ - private static class FakeServerInventoryView implements ServerInventoryView + private static class FakeServerInventoryView implements FilteredServerInventoryView { @Nullable @Override @@ -1375,13 +1380,20 @@ public class CalciteTests } @Override - public void registerServerRemovedCallback(Executor exec, ServerRemovedCallback callback) + public void registerSegmentCallback( + Executor exec, + ServerView.SegmentCallback callback, + Predicate> filter + ) { throw new UnsupportedOperationException(); } @Override - public void registerSegmentCallback(Executor exec, SegmentCallback callback) + public void registerServerRemovedCallback( + Executor exec, + ServerView.ServerRemovedCallback callback + ) { throw new UnsupportedOperationException(); } diff --git a/sql/src/test/java/org/apache/druid/sql/guice/SqlModuleTest.java b/sql/src/test/java/org/apache/druid/sql/guice/SqlModuleTest.java index e80ae48cb86..4da433e46e6 100644 --- a/sql/src/test/java/org/apache/druid/sql/guice/SqlModuleTest.java +++ b/sql/src/test/java/org/apache/druid/sql/guice/SqlModuleTest.java @@ -28,7 +28,7 @@ import com.google.inject.Injector; import com.google.inject.Key; import com.google.inject.Module; import com.google.inject.TypeLiteral; -import org.apache.druid.client.InventoryView; +import org.apache.druid.client.FilteredServerInventoryView; import org.apache.druid.client.TimelineServerView; import org.apache.druid.client.coordinator.Coordinator; import org.apache.druid.client.indexing.IndexingService; @@ -85,7 +85,7 @@ public class SqlModuleTest private ServiceEmitter serviceEmitter; @Mock - private InventoryView inventoryView; + private FilteredServerInventoryView inventoryView; @Mock private TimelineServerView timelineServerView; @@ -183,7 +183,7 @@ public class SqlModuleTest binder.bind(ServiceEmitter.class).toInstance(serviceEmitter); binder.bind(RequestLogger.class).toInstance(new NoopRequestLogger()); binder.bind(new TypeLiteral>(){}).toInstance(Suppliers.ofInstance(new DefaultQueryConfig(null))); - binder.bind(InventoryView.class).toInstance(inventoryView); + binder.bind(FilteredServerInventoryView.class).toInstance(inventoryView); binder.bind(TimelineServerView.class).toInstance(timelineServerView); binder.bind(DruidLeaderClient.class).annotatedWith(Coordinator.class).toInstance(druidLeaderClient); binder.bind(DruidLeaderClient.class).annotatedWith(IndexingService.class).toInstance(druidLeaderClient);