mirror of https://github.com/apache/druid.git
Fix NPE while loading lookups from empty JDBC source (#16307)
This commit is contained in:
parent
960a674442
commit
79e48c6b45
|
@ -204,18 +204,20 @@ public final class JdbcCacheGenerator implements CacheGenerator<JdbcExtractionNa
|
||||||
if (tsColumn == null) {
|
if (tsColumn == null) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
final Timestamp update = dbi.withHandle(
|
final String query = StringUtils.format(
|
||||||
handle -> {
|
"SELECT MAX(%s) FROM %s",
|
||||||
final String query = StringUtils.format(
|
tsColumn, table
|
||||||
"SELECT MAX(%s) FROM %s",
|
|
||||||
tsColumn, table
|
|
||||||
);
|
|
||||||
return handle
|
|
||||||
.createQuery(query)
|
|
||||||
.map(TimestampMapper.FIRST)
|
|
||||||
.first();
|
|
||||||
}
|
|
||||||
);
|
);
|
||||||
|
final Timestamp update = dbi.withHandle(
|
||||||
|
handle -> handle
|
||||||
|
.createQuery(query)
|
||||||
|
.map(TimestampMapper.FIRST)
|
||||||
|
.first()
|
||||||
|
);
|
||||||
|
if (update == null) {
|
||||||
|
LOG.info("Lookup table[%s] is empty. No rows returned for the query[%s].", table, query);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
return update.getTime();
|
return update.getTime();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -121,7 +121,7 @@ public class NamespacedExtractorModuleTest
|
||||||
Assert.assertNotNull(version);
|
Assert.assertNotNull(version);
|
||||||
Map<String, String> map = cache.getCache();
|
Map<String, String> map = cache.getCache();
|
||||||
Assert.assertEquals("bar", map.get("foo"));
|
Assert.assertEquals("bar", map.get("foo"));
|
||||||
Assert.assertEquals(null, map.get("baz"));
|
Assert.assertNull(map.get("baz"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
|
|
@ -390,6 +390,36 @@ public class JdbcExtractionNamespaceTest
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60_000L)
|
||||||
|
public void testEmptyTable()
|
||||||
|
throws InterruptedException
|
||||||
|
{
|
||||||
|
// Delete existing rows from table.
|
||||||
|
final Handle handle = derbyConnectorRule.getConnector().getDBI().open();
|
||||||
|
handle.createStatement(
|
||||||
|
StringUtils.format("DELETE FROM %s", TABLE_NAME)
|
||||||
|
).setQueryTimeout(1).execute();
|
||||||
|
|
||||||
|
final JdbcExtractionNamespace extractionNamespace = new JdbcExtractionNamespace(
|
||||||
|
derbyConnectorRule.getMetadataConnectorConfig(),
|
||||||
|
TABLE_NAME,
|
||||||
|
KEY_NAME,
|
||||||
|
VAL_NAME,
|
||||||
|
tsColumn,
|
||||||
|
null,
|
||||||
|
new Period(0),
|
||||||
|
null,
|
||||||
|
0,
|
||||||
|
null,
|
||||||
|
new JdbcAccessSecurityConfig()
|
||||||
|
);
|
||||||
|
try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) {
|
||||||
|
CacheSchedulerTest.waitFor(entry);
|
||||||
|
final Map<String, String> map = entry.getCache();
|
||||||
|
Assert.assertTrue(map.isEmpty());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout = 60_000L)
|
@Test(timeout = 60_000L)
|
||||||
public void testSkipOld()
|
public void testSkipOld()
|
||||||
throws InterruptedException
|
throws InterruptedException
|
||||||
|
|
Loading…
Reference in New Issue