Expose JoinableFactory through Guice Bindings (#9271)

* Make JoinableFactory an extension point

This change makes it so that extensions can register a JoinableFactory that
should be used for a DataSource.

Extensions can provide the factories via DruidBinders#joinableFactoryBinder.
Factories for known DataSources, such as InlineDataSource, are registered in
the JoinableFactoryModule. This module also installs a FactoryWarehouse that
is used to decide which factory should be used to generate the Joinable for
the provided DataSource.

The ExtensionPoint is marked as Beta since it is not yet clear if this
needs to remain available to other extensions or if the best way to
register a factory is by using the datasource class.

* Add module test

* remove useless bindings in test

* remove ExtensionPoint annotation

* Make LifecycleLock not final to help with testing
This commit is contained in:
Suneet Saldanha 2020-01-28 13:59:06 -08:00 committed by Chi Cao Minh
parent 14253c63d6
commit 0ccfe5ca89
11 changed files with 350 additions and 39 deletions

View File

@ -66,7 +66,7 @@ import java.util.concurrent.locks.AbstractQueuedSynchronizer;
* }
* }
*/
public final class LifecycleLock
public class LifecycleLock
{
private static class Sync extends AbstractQueuedSynchronizer
{

View File

@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join;
import com.google.inject.Inject;
import org.apache.druid.query.DataSource;
import java.util.IdentityHashMap;
import java.util.Map;
import java.util.Optional;
/**
 * A {@link JoinableFactory} that chooses which factory to delegate to by looking up the runtime class of the
 * datasource it is asked to build a joinable for.
 *
 * Datasources can register a factory via a DruidBinder.
 */
public class MapDataSourceJoinableFactoryWarehouse implements JoinableFactory
{
  private final Map<Class<? extends DataSource>, JoinableFactory> joinableFactories;

  /**
   * @param joinableFactories factories keyed by the datasource class they handle; the entries are copied into an
   *                          internal map
   */
  @Inject
  MapDataSourceJoinableFactoryWarehouse(Map<Class<? extends DataSource>, JoinableFactory> joinableFactories)
  {
    // Class doesn't override Object.equals(), so keys can be compared by identity.
    // Accesses to IdentityHashMap should be faster than to HashMap or ImmutableMap.
    this.joinableFactories = new IdentityHashMap<>(joinableFactories);
  }

  /**
   * Delegates to the factory registered for the exact class of {@code dataSource}.
   *
   * @return the result of the registered factory, or an empty Optional when no factory is registered for that class
   */
  @Override
  public Optional<Joinable> build(DataSource dataSource, JoinConditionAnalysis condition)
  {
    final JoinableFactory delegate = joinableFactories.get(dataSource.getClass());
    if (delegate != null) {
      return delegate.build(dataSource, condition);
    }
    return Optional.empty();
  }
}

View File

@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.segment.join;
import com.google.common.collect.ImmutableMap;
import org.apache.druid.query.InlineDataSource;
import org.easymock.EasyMock;
import org.easymock.EasyMockRunner;
import org.easymock.Mock;
import org.easymock.MockType;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import java.util.Optional;
/**
 * Tests for {@link MapDataSourceJoinableFactoryWarehouse}. The warehouse under test has a single registration:
 * {@link NoopDataSource} mapped to a mocked {@link JoinableFactory}.
 */
@RunWith(EasyMockRunner.class)
public class MapDataSourceJoinableFactoryWarehouseTest
{
  @Mock
  private InlineDataSource inlineDataSource;
  @Mock(MockType.NICE)
  private JoinableFactory noopJoinableFactory;
  private NoopDataSource noopDataSource;
  @Mock
  private JoinConditionAnalysis condition;
  @Mock
  private Joinable mockJoinable;
  private MapDataSourceJoinableFactoryWarehouse target;

  @Before
  public void setUp()
  {
    noopDataSource = new NoopDataSource();
    target = new MapDataSourceJoinableFactoryWarehouse(ImmutableMap.of(NoopDataSource.class, noopJoinableFactory));
  }

  @Test
  public void testBuildDataSourceNotRegisteredShouldReturnAbsent()
  {
    // Nothing is registered for the class of the inline datasource mock.
    Assert.assertFalse(target.build(inlineDataSource, condition).isPresent());
  }

  @Test
  public void testBuildDataSourceIsRegisteredAndFactoryDoesNotBuildJoinableShouldReturnAbsent()
  {
    EasyMock.expect(noopJoinableFactory.build(noopDataSource, condition)).andReturn(Optional.empty());
    EasyMock.replay(noopJoinableFactory);

    // The registered factory declines to build, so the warehouse passes the empty result through.
    Assert.assertFalse(target.build(noopDataSource, condition).isPresent());
  }

  @Test
  public void testBuildDataSourceIsRegisteredShouldReturnJoinableFromFactory()
  {
    EasyMock.expect(noopJoinableFactory.build(noopDataSource, condition)).andReturn(Optional.of(mockJoinable));
    EasyMock.replay(noopJoinableFactory);

    final Optional<Joinable> built = target.build(noopDataSource, condition);
    Assert.assertEquals(mockJoinable, built.get());
  }
}

View File

@ -19,34 +19,49 @@
package org.apache.druid.segment.join;
import com.google.inject.Inject;
import org.apache.druid.query.DataSource;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;
public class DefaultJoinableFactory implements JoinableFactory
/**
* A datasource that returns nothing. Only used to test un-registered datasources.
*/
public class NoopDataSource implements DataSource
{
private final List<JoinableFactory> factories;
@Inject
public DefaultJoinableFactory(final InlineJoinableFactory inlineJoinableFactory)
@Override
public Set<String> getTableNames()
{
// Just one right now, but we expect there to be more in the future, and maybe even an extension mechanism.
this.factories = Collections.singletonList(inlineJoinableFactory);
return null;
}
@Override
public Optional<Joinable> build(final DataSource dataSource, final JoinConditionAnalysis condition)
public List<DataSource> getChildren()
{
for (JoinableFactory factory : factories) {
final Optional<Joinable> maybeJoinable = factory.build(dataSource, condition);
if (maybeJoinable.isPresent()) {
return maybeJoinable;
}
}
return null;
}
return Optional.empty();
@Override
public DataSource withChildren(List<DataSource> children)
{
return null;
}
@Override
public boolean isCacheable()
{
return false;
}
@Override
public boolean isGlobal()
{
return false;
}
@Override
public boolean isConcrete()
{
return false;
}
}

View File

@ -24,9 +24,11 @@ import com.google.inject.TypeLiteral;
import com.google.inject.multibindings.MapBinder;
import com.google.inject.multibindings.Multibinder;
import org.apache.druid.java.util.metrics.Monitor;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.Query;
import org.apache.druid.query.QueryRunnerFactory;
import org.apache.druid.query.QueryToolChest;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.server.DruidNode;
/**
@ -60,4 +62,13 @@ public class DruidBinders
{
return Multibinder.newSetBinder(binder, new TypeLiteral<Class<? extends Monitor>>(){});
}
/**
 * Creates the {@link MapBinder} through which a {@link JoinableFactory} can be registered for a
 * {@link DataSource} class.
 *
 * @param binder the Guice binder to create the map binder on
 * @return a map binder keyed by datasource class with joinable factories as values
 */
public static MapBinder<Class<? extends DataSource>, JoinableFactory> joinableFactoryBinder(Binder binder)
{
  final TypeLiteral<Class<? extends DataSource>> dataSourceClassType =
      new TypeLiteral<Class<? extends DataSource>>() {};
  final TypeLiteral<JoinableFactory> factoryType = new TypeLiteral<JoinableFactory>() {};
  return MapBinder.newMapBinder(binder, dataSourceClassType, factoryType);
}
}

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.guice;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Scopes;
import com.google.inject.multibindings.MapBinder;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.segment.join.InlineJoinableFactory;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.MapDataSourceJoinableFactoryWarehouse;
import java.util.Map;
/**
 * Guice module that registers the default {@link JoinableFactory} for each known DataSource class and binds
 * {@link JoinableFactory} itself to {@link MapDataSourceJoinableFactoryWarehouse}.
 */
public class JoinableFactoryModule implements Module
{
  /**
   * Datasource classes and the factory classes this module registers for them by default.
   */
  private static final Map<Class<? extends DataSource>, Class<? extends JoinableFactory>> FACTORY_MAPPINGS =
      ImmutableMap.of(InlineDataSource.class, InlineJoinableFactory.class);

  @Override
  public void configure(Binder binder)
  {
    final MapBinder<Class<? extends DataSource>, JoinableFactory> factoriesByDataSource =
        DruidBinders.joinableFactoryBinder(binder);

    // Register every default mapping and scope each factory class as a lazy singleton.
    for (Map.Entry<Class<? extends DataSource>, Class<? extends JoinableFactory>> mapping
        : FACTORY_MAPPINGS.entrySet()) {
      final Class<? extends JoinableFactory> factoryClass = mapping.getValue();
      factoriesByDataSource.addBinding(mapping.getKey()).to(factoryClass);
      binder.bind(factoryClass).in(LazySingleton.class);
    }

    // The warehouse is the JoinableFactory that callers get injected.
    binder.bind(JoinableFactory.class)
          .to(MapDataSourceJoinableFactoryWarehouse.class)
          .in(Scopes.SINGLETON);
  }
}

View File

@ -20,10 +20,8 @@
package org.apache.druid.guice;
import com.google.common.collect.ImmutableList;
import com.google.inject.Binder;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Module;
import org.apache.druid.js.JavaScriptConfig;
import org.junit.Assert;
import org.junit.Test;
@ -55,15 +53,10 @@ public class JavaScriptModuleTest
return Guice.createInjector(
ImmutableList.of(
new DruidGuiceExtensions(),
new Module()
{
@Override
public void configure(Binder binder)
{
binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
binder.bind(JsonConfigurator.class).in(LazySingleton.class);
binder.bind(Properties.class).toInstance(props);
}
binder -> {
binder.bind(Validator.class).toInstance(Validation.buildDefaultValidatorFactory().getValidator());
binder.bind(JsonConfigurator.class).in(LazySingleton.class);
binder.bind(Properties.class).toInstance(props);
},
new JavaScriptModule()
)

View File

@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.guice;
import com.google.common.collect.ImmutableList;
import com.google.inject.Guice;
import com.google.inject.Injector;
import com.google.inject.Key;
import com.google.inject.Module;
import com.google.inject.Scopes;
import com.google.inject.TypeLiteral;
import org.apache.druid.query.DataSource;
import org.apache.druid.query.InlineDataSource;
import org.apache.druid.segment.join.InlineJoinableFactory;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.join.MapDataSourceJoinableFactoryWarehouse;
import org.apache.druid.segment.join.NoopDataSource;
import org.apache.druid.segment.join.NoopJoinableFactory;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.Map;
public class JoinableFactoryModuleTest
{
private Injector injector;
@Before
public void setUp()
{
injector = makeInjectorWithProperties();
}
@Test
public void testInjectJoinableFactoryIsSingleton()
{
JoinableFactory factory = injector.getInstance(JoinableFactory.class);
Assert.assertEquals(MapDataSourceJoinableFactoryWarehouse.class, factory.getClass());
JoinableFactory otherFactory = injector.getInstance(JoinableFactory.class);
Assert.assertSame(factory, otherFactory);
}
@Test
public void testInjectDefaultBindingsShouldBeInjected()
{
Map<Class<? extends DataSource>, JoinableFactory> joinableFactories =
injector.getInstance(Key.get(new TypeLiteral<Map<Class<? extends DataSource>, JoinableFactory>>() {}));
Assert.assertEquals(1, joinableFactories.size());
Assert.assertEquals(InlineJoinableFactory.class, joinableFactories.get(InlineDataSource.class).getClass());
}
@Test
public void testJoinableFactoryCanBind()
{
injector = makeInjectorWithProperties(
binder -> DruidBinders
.joinableFactoryBinder(binder).addBinding(NoopDataSource.class).toInstance(NoopJoinableFactory.INSTANCE));
Map<Class<? extends DataSource>, JoinableFactory> joinableFactories =
injector.getInstance(Key.get(new TypeLiteral<Map<Class<? extends DataSource>, JoinableFactory>>() {}));
Assert.assertEquals(2, joinableFactories.size());
Assert.assertEquals(NoopJoinableFactory.INSTANCE, joinableFactories.get(NoopDataSource.class));
}
private Injector makeInjectorWithProperties(Module... otherModules)
{
ImmutableList.Builder<Module> modulesBuilder =
ImmutableList.<Module>builder()
.add(new JoinableFactoryModule())
.add(binder -> {
binder.bindScope(LazySingleton.class, Scopes.SINGLETON);
});
for (Module otherModule : otherModules) {
modulesBuilder.add(otherModule);
}
return Guice.createInjector(
modulesBuilder.build()
);
}
}

View File

@ -31,6 +31,7 @@ import org.apache.druid.discovery.NodeRole;
import org.apache.druid.guice.CacheModule;
import org.apache.druid.guice.DruidProcessingModule;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JoinableFactoryModule;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
@ -41,8 +42,6 @@ import org.apache.druid.guice.ServerTypeConfig;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.lookup.LookupModule;
import org.apache.druid.segment.join.DefaultJoinableFactory;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.server.QueryResource;
import org.apache.druid.server.SegmentManager;
import org.apache.druid.server.coordination.ServerManager;
@ -78,6 +77,7 @@ public class CliHistorical extends ServerRunnable
new DruidProcessingModule(),
new QueryableModule(),
new QueryRunnerFactoryModule(),
new JoinableFactoryModule(),
binder -> {
binder.bindConstant().annotatedWith(Names.named("serviceName")).to("druid/historical");
binder.bindConstant().annotatedWith(Names.named("servicePort")).to(8083);
@ -90,7 +90,6 @@ public class CliHistorical extends ServerRunnable
binder.bind(SegmentManager.class).in(LazySingleton.class);
binder.bind(ZkCoordinator.class).in(ManageLifecycle.class);
binder.bind(QuerySegmentWalker.class).to(ServerManager.class).in(LazySingleton.class);
binder.bind(JoinableFactory.class).to(DefaultJoinableFactory.class).in(LazySingleton.class);
binder.bind(ServerTypeConfig.class).toInstance(new ServerTypeConfig(ServerType.HISTORICAL));
binder.bind(JettyServerInitializer.class).to(QueryJettyServerInitializer.class).in(LazySingleton.class);

View File

@ -39,6 +39,7 @@ import org.apache.druid.guice.IndexingServiceModuleHelper;
import org.apache.druid.guice.IndexingServiceTaskLogsModule;
import org.apache.druid.guice.IndexingServiceTuningConfigModule;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JoinableFactoryModule;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
@ -59,8 +60,6 @@ import org.apache.druid.indexing.worker.http.ShuffleResource;
import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.lookup.LookupModule;
import org.apache.druid.segment.join.DefaultJoinableFactory;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.realtime.CliIndexerDataSegmentServerAnnouncerLifecycleHandler;
import org.apache.druid.segment.realtime.appenderator.AppenderatorsManager;
import org.apache.druid.segment.realtime.appenderator.UnifiedIndexerAppenderatorsManager;
@ -101,6 +100,7 @@ public class CliIndexer extends ServerRunnable
new DruidProcessingModule(),
new QueryableModule(),
new QueryRunnerFactoryModule(),
new JoinableFactoryModule(),
new Module()
{
@Override
@ -121,7 +121,6 @@ public class CliIndexer extends ServerRunnable
binder.bind(TaskRunner.class).to(ThreadingTaskRunner.class);
binder.bind(QuerySegmentWalker.class).to(ThreadingTaskRunner.class);
binder.bind(JoinableFactory.class).to(DefaultJoinableFactory.class).in(LazySingleton.class);
binder.bind(ThreadingTaskRunner.class).in(LazySingleton.class);
CliPeon.bindRowIngestionMeters(binder);

View File

@ -48,6 +48,7 @@ import org.apache.druid.guice.IndexingServiceFirehoseModule;
import org.apache.druid.guice.IndexingServiceInputSourceModule;
import org.apache.druid.guice.IndexingServiceTuningConfigModule;
import org.apache.druid.guice.Jerseys;
import org.apache.druid.guice.JoinableFactoryModule;
import org.apache.druid.guice.JsonConfigProvider;
import org.apache.druid.guice.LazySingleton;
import org.apache.druid.guice.LifecycleModule;
@ -91,8 +92,6 @@ import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.IndexerSQLMetadataStorageCoordinator;
import org.apache.druid.query.QuerySegmentWalker;
import org.apache.druid.query.lookup.LookupModule;
import org.apache.druid.segment.join.DefaultJoinableFactory;
import org.apache.druid.segment.join.JoinableFactory;
import org.apache.druid.segment.loading.DataSegmentArchiver;
import org.apache.druid.segment.loading.DataSegmentKiller;
import org.apache.druid.segment.loading.DataSegmentMover;
@ -170,6 +169,7 @@ public class CliPeon extends GuiceRunnable
new DruidProcessingModule(),
new QueryableModule(),
new QueryRunnerFactoryModule(),
new JoinableFactoryModule(),
new Module()
{
@Override
@ -209,7 +209,6 @@ public class CliPeon extends GuiceRunnable
binder.bind(TaskRunner.class).to(SingleTaskBackgroundRunner.class);
binder.bind(QuerySegmentWalker.class).to(SingleTaskBackgroundRunner.class);
binder.bind(JoinableFactory.class).to(DefaultJoinableFactory.class).in(LazySingleton.class);
binder.bind(SingleTaskBackgroundRunner.class).in(ManageLifecycle.class);
bindRealtimeCache(binder);