mirror of https://github.com/apache/druid.git
Fix the build for #15013.: Lookup jitter upstream build fix (#15103)
Fix the build for #15013.
This commit is contained in:
parent
b5a87fd89b
commit
c7d0615af3
|
@ -352,6 +352,7 @@ The JDBC lookups will poll a database to populate its local cache. If the `tsCol
|
|||
|`filter`|The filter to use when selecting lookups, this is used to create a where clause on lookup population|No|No Filter|
|
||||
|`tsColumn`| The column in `table` which contains when the key was updated|No|Not used|
|
||||
|`pollPeriod`|How often to poll the DB|No|0 (only once)|
|
||||
|`jitterSeconds`| How much jitter to add (in seconds) up to maximum as a delay (actual value will be used as random from 0 to `jitterSeconds`), used to distribute db load more evenly|No|0|
|
||||
|`maxHeapPercentage`|The maximum percentage of heap size that the lookup should consume. If the lookup grows beyond this size, warning messages will be logged in the respective service logs.|No|10% of JVM heap size|
|
||||
|
||||
```json
|
||||
|
@ -367,6 +368,7 @@ The JDBC lookups will poll a database to populate its local cache. If the `tsCol
|
|||
"valueColumn":"the_new_dim_value",
|
||||
"tsColumn":"timestamp_column",
|
||||
"pollPeriod":600000,
|
||||
"jitterSeconds": 120,
|
||||
"maxHeapPercentage": 10
|
||||
}
|
||||
```
|
||||
|
|
|
@ -41,4 +41,12 @@ public interface ExtractionNamespace
|
|||
{
|
||||
return -1L;
|
||||
}
|
||||
|
||||
// For larger clusters, when they all startup at the same time and have lookups in the db,
|
||||
// it overwhelms the database, this allows implementations to introduce a jitter, which
|
||||
// should spread out the load.
|
||||
default long getJitterMills()
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -34,6 +34,7 @@ import javax.annotation.Nullable;
|
|||
import javax.validation.constraints.Min;
|
||||
import javax.validation.constraints.NotNull;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
/**
|
||||
*
|
||||
|
@ -61,6 +62,8 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
|
|||
private final Period pollPeriod;
|
||||
@JsonProperty
|
||||
private final long maxHeapPercentage;
|
||||
@JsonProperty
|
||||
private final int jitterSeconds;
|
||||
|
||||
@JsonCreator
|
||||
public JdbcExtractionNamespace(
|
||||
|
@ -73,6 +76,7 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
|
|||
@JsonProperty(value = "filter") @Nullable final String filter,
|
||||
@Min(0) @JsonProperty(value = "pollPeriod") @Nullable final Period pollPeriod,
|
||||
@JsonProperty(value = "maxHeapPercentage") @Nullable final Long maxHeapPercentage,
|
||||
@JsonProperty(value = "jitterSeconds") @Nullable Integer jitterSeconds,
|
||||
@JacksonInject JdbcAccessSecurityConfig securityConfig
|
||||
)
|
||||
{
|
||||
|
@ -95,6 +99,7 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
|
|||
} else {
|
||||
this.pollPeriod = pollPeriod;
|
||||
}
|
||||
this.jitterSeconds = jitterSeconds == null ? 0 : jitterSeconds;
|
||||
this.maxHeapPercentage = maxHeapPercentage == null ? DEFAULT_MAX_HEAP_PERCENTAGE : maxHeapPercentage;
|
||||
}
|
||||
|
||||
|
@ -162,6 +167,15 @@ public class JdbcExtractionNamespace implements ExtractionNamespace
|
|||
return maxHeapPercentage;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getJitterMills()
|
||||
{
|
||||
if (jitterSeconds == 0) {
|
||||
return jitterSeconds;
|
||||
}
|
||||
return 1000L * ThreadLocalRandom.current().nextInt(jitterSeconds + 1);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
|
|
|
@ -180,9 +180,9 @@ public final class CacheScheduler
|
|||
final long updateMs = namespace.getPollMs();
|
||||
Runnable command = this::updateCache;
|
||||
if (updateMs > 0) {
|
||||
return cacheManager.scheduledExecutorService().scheduleAtFixedRate(command, 0, updateMs, TimeUnit.MILLISECONDS);
|
||||
return cacheManager.scheduledExecutorService().scheduleAtFixedRate(command, namespace.getJitterMills(), updateMs, TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
return cacheManager.scheduledExecutorService().schedule(command, 0, TimeUnit.MILLISECONDS);
|
||||
return cacheManager.scheduledExecutorService().schedule(command, namespace.getJitterMills(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -63,7 +63,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
|
|||
"some filter",
|
||||
new Period(10),
|
||||
null,
|
||||
new JdbcAccessSecurityConfig()
|
||||
0, new JdbcAccessSecurityConfig()
|
||||
{
|
||||
@Override
|
||||
public Set<String> getAllowedProperties()
|
||||
|
@ -101,6 +101,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
|
|||
"some filter",
|
||||
new Period(10),
|
||||
null,
|
||||
0,
|
||||
new JdbcAccessSecurityConfig()
|
||||
{
|
||||
@Override
|
||||
|
@ -137,6 +138,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
|
|||
"some filter",
|
||||
new Period(10),
|
||||
null,
|
||||
0,
|
||||
new JdbcAccessSecurityConfig()
|
||||
{
|
||||
@Override
|
||||
|
@ -175,6 +177,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
|
|||
"some filter",
|
||||
new Period(10),
|
||||
null,
|
||||
0,
|
||||
new JdbcAccessSecurityConfig()
|
||||
{
|
||||
@Override
|
||||
|
@ -217,6 +220,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
|
|||
"some filter",
|
||||
new Period(10),
|
||||
null,
|
||||
0,
|
||||
new JdbcAccessSecurityConfig()
|
||||
{
|
||||
@Override
|
||||
|
@ -255,6 +259,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
|
|||
"some filter",
|
||||
new Period(10),
|
||||
10L,
|
||||
0,
|
||||
new JdbcAccessSecurityConfig()
|
||||
{
|
||||
@Override
|
||||
|
@ -291,7 +296,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
|
|||
"some filter",
|
||||
new Period(10),
|
||||
null,
|
||||
new JdbcAccessSecurityConfig()
|
||||
0, new JdbcAccessSecurityConfig()
|
||||
{
|
||||
@Override
|
||||
public Set<String> getAllowedProperties()
|
||||
|
@ -329,6 +334,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
|
|||
"some filter",
|
||||
new Period(10),
|
||||
null,
|
||||
0,
|
||||
new JdbcAccessSecurityConfig()
|
||||
{
|
||||
@Override
|
||||
|
@ -373,6 +379,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
|
|||
"some filter",
|
||||
new Period(10),
|
||||
null,
|
||||
0,
|
||||
new JdbcAccessSecurityConfig()
|
||||
{
|
||||
@Override
|
||||
|
@ -415,6 +422,7 @@ public class JdbcExtractionNamespaceUrlCheckTest
|
|||
"some filter",
|
||||
new Period(10),
|
||||
null,
|
||||
0,
|
||||
new JdbcAccessSecurityConfig()
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -137,6 +137,7 @@ public class JdbcCacheGeneratorTest
|
|||
"filter",
|
||||
Period.ZERO,
|
||||
null,
|
||||
0,
|
||||
new JdbcAccessSecurityConfig()
|
||||
);
|
||||
}
|
||||
|
|
|
@ -458,6 +458,7 @@ public class CacheSchedulerTest
|
|||
"some filter",
|
||||
new Period(10_000),
|
||||
null,
|
||||
0,
|
||||
new JdbcAccessSecurityConfig()
|
||||
{
|
||||
@Override
|
||||
|
|
|
@ -328,6 +328,7 @@ public class JdbcExtractionNamespaceTest
|
|||
null,
|
||||
new Period(0),
|
||||
null,
|
||||
0,
|
||||
new JdbcAccessSecurityConfig()
|
||||
);
|
||||
try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) {
|
||||
|
@ -361,6 +362,7 @@ public class JdbcExtractionNamespaceTest
|
|||
FILTER_COLUMN + "='1'",
|
||||
new Period(0),
|
||||
null,
|
||||
0,
|
||||
new JdbcAccessSecurityConfig()
|
||||
);
|
||||
try (CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace)) {
|
||||
|
@ -399,6 +401,45 @@ public class JdbcExtractionNamespaceTest
|
|||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRandomJitter()
|
||||
{
|
||||
JdbcExtractionNamespace extractionNamespace = new JdbcExtractionNamespace(
|
||||
derbyConnectorRule.getMetadataConnectorConfig(),
|
||||
TABLE_NAME,
|
||||
KEY_NAME,
|
||||
VAL_NAME,
|
||||
tsColumn,
|
||||
FILTER_COLUMN + "='1'",
|
||||
new Period(0),
|
||||
null,
|
||||
120,
|
||||
new JdbcAccessSecurityConfig()
|
||||
);
|
||||
long jitter = extractionNamespace.getJitterMills();
|
||||
// jitter will be a random value between 0 and 120 seconds.
|
||||
Assert.assertTrue(jitter >= 0 && jitter <= 120000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRandomJitterNotSpecified()
|
||||
{
|
||||
JdbcExtractionNamespace extractionNamespace = new JdbcExtractionNamespace(
|
||||
derbyConnectorRule.getMetadataConnectorConfig(),
|
||||
TABLE_NAME,
|
||||
KEY_NAME,
|
||||
VAL_NAME,
|
||||
tsColumn,
|
||||
FILTER_COLUMN + "='1'",
|
||||
new Period(0),
|
||||
null,
|
||||
null,
|
||||
new JdbcAccessSecurityConfig()
|
||||
);
|
||||
// jitter will be a random value between 0 and 120 seconds.
|
||||
Assert.assertEquals(0, extractionNamespace.getJitterMills());
|
||||
}
|
||||
|
||||
@Test(timeout = 60_000L)
|
||||
public void testFindNew()
|
||||
throws InterruptedException
|
||||
|
@ -436,6 +477,7 @@ public class JdbcExtractionNamespaceTest
|
|||
"some filter",
|
||||
new Period(10),
|
||||
null,
|
||||
0,
|
||||
securityConfig
|
||||
);
|
||||
final ObjectMapper mapper = new DefaultObjectMapper();
|
||||
|
@ -461,6 +503,7 @@ public class JdbcExtractionNamespaceTest
|
|||
null,
|
||||
new Period(10),
|
||||
null,
|
||||
0,
|
||||
new JdbcAccessSecurityConfig()
|
||||
);
|
||||
CacheScheduler.Entry entry = scheduler.schedule(extractionNamespace);
|
||||
|
|
Loading…
Reference in New Issue