mirror of https://github.com/apache/druid.git
Fix loading lookup extension (#17212)
We introduce the option to iterate over fetched data from the dataFetcher for loadingLookups in the lookups-cached-single extension. Also, added the handling of a use case where the data exists in Druid but not in the actual data fetcher, which is in our use-case JDBC Data fetcher, where the value returned is null. Signed-off-by: TessaIO <ahmedgrati1999@gmail.com>
This commit is contained in:
parent
f80e2c229e
commit
a9f582711e
|
@ -28,8 +28,10 @@ import org.apache.druid.server.lookup.cache.loading.LoadingCache;
|
|||
|
||||
import javax.annotation.Nullable;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
@ -73,15 +75,19 @@ public class LoadingLookup extends LookupExtractor
|
|||
return null;
|
||||
}
|
||||
|
||||
final String presentVal;
|
||||
try {
|
||||
presentVal = loadingCache.get(keyEquivalent, new ApplyCallable(keyEquivalent));
|
||||
final String presentVal = this.loadingCache.getIfPresent(keyEquivalent);
|
||||
if (presentVal != null) {
|
||||
return NullHandling.emptyToNullIfNeeded(presentVal);
|
||||
}
|
||||
catch (ExecutionException e) {
|
||||
LOGGER.debug("value not found for key [%s]", key);
|
||||
|
||||
final String val = this.dataFetcher.fetch(keyEquivalent);
|
||||
if (val == null) {
|
||||
return null;
|
||||
}
|
||||
|
||||
this.loadingCache.putAll(Collections.singletonMap(keyEquivalent, val));
|
||||
|
||||
return NullHandling.emptyToNullIfNeeded(val);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -108,13 +114,16 @@ public class LoadingLookup extends LookupExtractor
|
|||
@Override
|
||||
public boolean supportsAsMap()
|
||||
{
|
||||
return false;
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<String, String> asMap()
|
||||
{
|
||||
throw new UnsupportedOperationException("Cannot get map view");
|
||||
final Map<String, String> map = new HashMap<>();
|
||||
Optional.ofNullable(this.dataFetcher.fetchAll())
|
||||
.ifPresent(data -> data.forEach(entry -> map.put(entry.getKey(), entry.getValue())));
|
||||
return map;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -123,24 +132,6 @@ public class LoadingLookup extends LookupExtractor
|
|||
return LookupExtractionModule.getRandomCacheKey();
|
||||
}
|
||||
|
||||
private class ApplyCallable implements Callable<String>
|
||||
{
|
||||
private final String key;
|
||||
|
||||
public ApplyCallable(String key)
|
||||
{
|
||||
this.key = key;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String call()
|
||||
{
|
||||
// When SQL compatible null handling is disabled,
|
||||
// avoid returning null and return an empty string to cache it.
|
||||
return NullHandling.nullToEmptyIfNeeded(dataFetcher.fetch(key));
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void close()
|
||||
{
|
||||
if (isOpen.getAndSet(false)) {
|
||||
|
|
|
@ -33,13 +33,15 @@ import org.junit.rules.ExpectedException;
|
|||
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
|
||||
public class LoadingLookupTest extends InitializedNullHandlingTest
|
||||
{
|
||||
DataFetcher dataFetcher = EasyMock.createMock(DataFetcher.class);
|
||||
LoadingCache lookupCache = EasyMock.createStrictMock(LoadingCache.class);
|
||||
LoadingCache lookupCache = EasyMock.createMock(LoadingCache.class);
|
||||
LoadingCache reverseLookupCache = EasyMock.createStrictMock(LoadingCache.class);
|
||||
LoadingLookup loadingLookup = new LoadingLookup(dataFetcher, lookupCache, reverseLookupCache);
|
||||
|
||||
|
@ -47,9 +49,9 @@ public class LoadingLookupTest extends InitializedNullHandlingTest
|
|||
public ExpectedException expectedException = ExpectedException.none();
|
||||
|
||||
@Test
|
||||
public void testApplyEmptyOrNull() throws ExecutionException
|
||||
public void testApplyEmptyOrNull()
|
||||
{
|
||||
EasyMock.expect(lookupCache.get(EasyMock.eq(""), EasyMock.anyObject(Callable.class)))
|
||||
EasyMock.expect(lookupCache.getIfPresent(EasyMock.eq("")))
|
||||
.andReturn("empty").atLeastOnce();
|
||||
EasyMock.replay(lookupCache);
|
||||
Assert.assertEquals("empty", loadingLookup.apply(""));
|
||||
|
@ -73,14 +75,40 @@ public class LoadingLookupTest extends InitializedNullHandlingTest
|
|||
}
|
||||
|
||||
@Test
|
||||
public void testApply() throws ExecutionException
|
||||
public void testApply()
|
||||
{
|
||||
EasyMock.expect(lookupCache.get(EasyMock.eq("key"), EasyMock.anyObject(Callable.class))).andReturn("value").once();
|
||||
EasyMock.expect(lookupCache.getIfPresent(EasyMock.eq("key"))).andReturn("value").once();
|
||||
EasyMock.replay(lookupCache);
|
||||
Assert.assertEquals(ImmutableMap.of("key", "value"), loadingLookup.applyAll(ImmutableSet.of("key")));
|
||||
EasyMock.verify(lookupCache);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testApplyWithNullValue()
|
||||
{
|
||||
EasyMock.expect(lookupCache.getIfPresent(EasyMock.eq("key"))).andReturn(null).once();
|
||||
EasyMock.expect(dataFetcher.fetch("key")).andReturn(null).once();
|
||||
EasyMock.replay(lookupCache, dataFetcher);
|
||||
Assert.assertNull(loadingLookup.apply("key"));
|
||||
EasyMock.verify(lookupCache, dataFetcher);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testApplyTriggersCacheMissAndSubsequentCacheHit()
|
||||
{
|
||||
Map<String, String> map = new HashMap<>();
|
||||
map.put("key", "value");
|
||||
EasyMock.expect(lookupCache.getIfPresent(EasyMock.eq("key"))).andReturn(null).once();
|
||||
EasyMock.expect(dataFetcher.fetch("key")).andReturn("value").once();
|
||||
lookupCache.putAll(map);
|
||||
EasyMock.expectLastCall().andVoid();
|
||||
EasyMock.expect(lookupCache.getIfPresent("key")).andReturn("value").once();
|
||||
EasyMock.replay(lookupCache, dataFetcher);
|
||||
Assert.assertEquals(loadingLookup.apply("key"), "value");
|
||||
Assert.assertEquals(loadingLookup.apply("key"), "value");
|
||||
EasyMock.verify(lookupCache, dataFetcher);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnapplyAll() throws ExecutionException
|
||||
{
|
||||
|
@ -105,17 +133,6 @@ public class LoadingLookupTest extends InitializedNullHandlingTest
|
|||
EasyMock.verify(lookupCache, reverseLookupCache);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testApplyWithExecutionError() throws ExecutionException
|
||||
{
|
||||
EasyMock.expect(lookupCache.get(EasyMock.eq("key"), EasyMock.anyObject(Callable.class)))
|
||||
.andThrow(new ExecutionException(null))
|
||||
.once();
|
||||
EasyMock.replay(lookupCache);
|
||||
Assert.assertNull(loadingLookup.apply("key"));
|
||||
EasyMock.verify(lookupCache);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUnApplyWithExecutionError() throws ExecutionException
|
||||
{
|
||||
|
@ -136,13 +153,19 @@ public class LoadingLookupTest extends InitializedNullHandlingTest
|
|||
@Test
|
||||
public void testSupportsAsMap()
|
||||
{
|
||||
Assert.assertFalse(loadingLookup.supportsAsMap());
|
||||
Assert.assertTrue(loadingLookup.supportsAsMap());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAsMap()
|
||||
{
|
||||
expectedException.expect(UnsupportedOperationException.class);
|
||||
loadingLookup.asMap();
|
||||
final Map<String, String> fetchedData = new HashMap<>();
|
||||
fetchedData.put("dummy", "test");
|
||||
fetchedData.put("key", null);
|
||||
fetchedData.put(null, "value");
|
||||
EasyMock.expect(dataFetcher.fetchAll()).andReturn(fetchedData.entrySet());
|
||||
EasyMock.replay(dataFetcher);
|
||||
Assert.assertEquals(loadingLookup.asMap(), fetchedData);
|
||||
EasyMock.verify(dataFetcher);
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue