Make sure updating coordinator config is protected against race condition (#11144)

* Make sure changing coordinator config is protected against concurrent updates

* Make sure updating coordinator config is protected against race condition

* add retry

* fix checkstyle

* add tests

* add tests

* add more tests

* add tests

* fix

* fix checkstyle
This commit is contained in:
Maytas Monsereenusorn 2021-05-10 13:58:08 -07:00 committed by GitHub
parent f6662b4893
commit 3a660bc6ee
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 676 additions and 73 deletions

View File

@ -20,15 +20,18 @@
package org.apache.druid.common.config; package org.apache.druid.common.config;
import com.google.common.base.Supplier; import com.google.common.base.Supplier;
import com.google.common.collect.ImmutableList;
import com.google.inject.Inject; import com.google.inject.Inject;
import org.apache.druid.java.util.common.concurrent.ScheduledExecutors; import org.apache.druid.java.util.common.concurrent.ScheduledExecutors;
import org.apache.druid.java.util.common.lifecycle.LifecycleStart; import org.apache.druid.java.util.common.lifecycle.LifecycleStart;
import org.apache.druid.java.util.common.lifecycle.LifecycleStop; import org.apache.druid.java.util.common.lifecycle.LifecycleStop;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.metadata.MetadataCASUpdate;
import org.apache.druid.metadata.MetadataStorageConnector; import org.apache.druid.metadata.MetadataStorageConnector;
import org.apache.druid.metadata.MetadataStorageTablesConfig; import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.joda.time.Duration; import org.joda.time.Duration;
import javax.annotation.Nonnull;
import javax.annotation.Nullable; import javax.annotation.Nullable;
import java.util.Arrays; import java.util.Arrays;
import java.util.Map; import java.util.Map;
@ -168,57 +171,87 @@ public class ConfigManager
return holder.getReference(); return holder.getReference();
} }
public <T> SetResult set(final String key, final ConfigSerde<T> serde, final T obj) public <T> SetResult set(final String key, final ConfigSerde<T> serde, final T obj)
{ {
if (obj == null || !started) { return set(key, serde, null, obj);
if (obj == null) { }
return SetResult.fail(new IllegalAccessException("input obj is null"));
public <T> SetResult set(final String key, final ConfigSerde<T> serde, @Nullable final T oldObject, final T newObject)
{
if (newObject == null || !started) {
if (newObject == null) {
return SetResult.fail(new IllegalAccessException("input obj is null"), false);
} else { } else {
return SetResult.fail(new IllegalStateException("configManager is not started yet")); return SetResult.fail(new IllegalStateException("configManager is not started yet"), false);
} }
} }
final byte[] newBytes = serde.serialize(obj); final byte[] newBytes = serde.serialize(newObject);
try { try {
exec.submit( return exec.submit(
() -> { () -> {
dbConnector.insertOrUpdate(configTable, "name", "payload", key, newBytes); if (oldObject == null) {
dbConnector.insertOrUpdate(configTable, "name", "payload", key, newBytes);
} else {
final byte[] oldBytes = serde.serialize(oldObject);
MetadataCASUpdate metadataCASUpdate = createMetadataCASUpdate(key, oldBytes, newBytes);
boolean success = dbConnector.compareAndSwap(ImmutableList.of(metadataCASUpdate));
if (!success) {
return SetResult.fail(new IllegalStateException("Config value has changed"), true);
}
}
final ConfigHolder configHolder = watchedConfigs.get(key); final ConfigHolder configHolder = watchedConfigs.get(key);
if (configHolder != null) { if (configHolder != null) {
configHolder.swapIfNew(newBytes); configHolder.swapIfNew(newBytes);
} }
return SetResult.ok();
return true;
} }
).get(); ).get();
return SetResult.ok();
} }
catch (Exception e) { catch (Exception e) {
log.warn(e, "Failed to set[%s]", key); log.warn(e, "Failed to set[%s]", key);
return SetResult.fail(e); return SetResult.fail(e, false);
} }
} }
@Nonnull
private MetadataCASUpdate createMetadataCASUpdate(
String keyValue,
byte[] oldValue,
byte[] newValue
)
{
return new MetadataCASUpdate(
configTable,
MetadataStorageConnector.CONFIG_TABLE_KEY_COLUMN,
MetadataStorageConnector.CONFIG_TABLE_VALUE_COLUMN,
keyValue,
oldValue,
newValue
);
}
public static class SetResult public static class SetResult
{ {
private final Exception exception; private final Exception exception;
private final Boolean retryableException;
public static SetResult ok() public static SetResult ok()
{ {
return new SetResult(null); return new SetResult(null, null);
} }
public static SetResult fail(Exception e) public static SetResult fail(Exception e, boolean retryableException)
{ {
return new SetResult(e); return new SetResult(e, retryableException);
} }
private SetResult(@Nullable Exception exception) private SetResult(@Nullable Exception exception, @Nullable Boolean retryableException)
{ {
this.exception = exception; this.exception = exception;
this.retryableException = retryableException;
} }
public boolean isOk() public boolean isOk()
@ -226,6 +259,11 @@ public class ConfigManager
return exception == null; return exception == null;
} }
public boolean isRetryable()
{
return Boolean.TRUE.equals(retryableException);
}
public Exception getException() public Exception getException()
{ {
return exception; return exception;

View File

@ -31,6 +31,7 @@ import org.apache.druid.guice.annotations.Json;
import org.apache.druid.guice.annotations.JsonNonNull; import org.apache.druid.guice.annotations.JsonNonNull;
import org.apache.druid.java.util.common.jackson.JacksonUtils; import org.apache.druid.java.util.common.jackson.JacksonUtils;
import javax.annotation.Nullable;
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
@ -72,13 +73,40 @@ public class JacksonConfigManager
return configManager.watchConfig(key, create(clazz, defaultVal)); return configManager.watchConfig(key, create(clazz, defaultVal));
} }
/**
* Set the config and add audit entry
*
* @param key of the config to set
* @param val new config value to insert
* @param auditInfo metadata regarding the change to config, for audit purposes
*/
public <T> SetResult set(String key, T val, AuditInfo auditInfo) public <T> SetResult set(String key, T val, AuditInfo auditInfo)
{ {
ConfigSerde configSerde = create(val.getClass(), null); return set(key, null, val, auditInfo);
}
/**
* Set the config and add audit entry
*
* @param key of the config to set
* @param oldValue old config value. If not null, then the update will only succeed if the insert
* happens when current database entry is the same as this value. If null, then the insert
* will not consider the current database entry.
* @param newValue new config value to insert
* @param auditInfo metadata regarding the change to config, for audit purposes
*/
public <T> SetResult set(
String key,
@Nullable T oldValue,
T newValue,
AuditInfo auditInfo
)
{
ConfigSerde configSerde = create(newValue.getClass(), null);
// Audit and actual config change are done in separate transactions // Audit and actual config change are done in separate transactions
// there can be phantom audits and reOrdering in audit changes as well. // there can be phantom audits and reOrdering in audit changes as well.
auditManager.doAudit(key, key, auditInfo, val, configSerde); auditManager.doAudit(key, key, auditInfo, newValue, configSerde);
return configManager.set(key, configSerde, val); return configManager.set(key, configSerde, oldValue, newValue);
} }
@VisibleForTesting @VisibleForTesting

View File

@ -0,0 +1,174 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.common.config;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Suppliers;
import org.apache.druid.audit.AuditManager;
import org.apache.druid.metadata.MetadataCASUpdate;
import org.apache.druid.metadata.MetadataStorageConnector;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import java.util.List;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.when;
@RunWith(MockitoJUnitRunner.class)
public class ConfigManagerTest
{
private static final String CONFIG_KEY = "configX";
private static final String TABLE_NAME = "config_table";
private static final TestConfig OLD_CONFIG = new TestConfig("1", "x", 1);
private static final TestConfig NEW_CONFIG = new TestConfig("2", "y", 2);
@Mock
private MetadataStorageConnector mockDbConnector;
@Mock
private MetadataStorageTablesConfig mockMetadataStorageTablesConfig;
@Mock
private AuditManager mockAuditManager;
@Mock
private ConfigManagerConfig mockConfigManagerConfig;
private ConfigSerde<TestConfig> configConfigSerdeFromClass;
private ConfigManager configManager;
private JacksonConfigManager jacksonConfigManager;
@Before
public void setup()
{
when(mockMetadataStorageTablesConfig.getConfigTable()).thenReturn(TABLE_NAME);
when(mockConfigManagerConfig.getPollDuration()).thenReturn(new Period());
configManager = new ConfigManager(mockDbConnector, Suppliers.ofInstance(mockMetadataStorageTablesConfig), Suppliers.ofInstance(mockConfigManagerConfig));
jacksonConfigManager = new JacksonConfigManager(
configManager,
new ObjectMapper(),
new ObjectMapper().setSerializationInclusion(JsonInclude.Include.NON_NULL),
mockAuditManager
);
configConfigSerdeFromClass = jacksonConfigManager.create(TestConfig.class, null);
}
@Test
public void testSetNewObjectIsNull()
{
ConfigManager.SetResult setResult = configManager.set(CONFIG_KEY, configConfigSerdeFromClass, null);
Assert.assertFalse(setResult.isOk());
Assert.assertFalse(setResult.isRetryable());
Assert.assertTrue(setResult.getException() instanceof IllegalAccessException);
}
@Test
public void testSetConfigManagerNotStarted()
{
ConfigManager.SetResult setResult = configManager.set(CONFIG_KEY, configConfigSerdeFromClass, NEW_CONFIG);
Assert.assertFalse(setResult.isOk());
Assert.assertFalse(setResult.isRetryable());
Assert.assertTrue(setResult.getException() instanceof IllegalStateException);
}
@Test
public void testSetOldObjectNullShouldInsertWithoutSwap()
{
configManager.start();
ConfigManager.SetResult setResult = configManager.set(CONFIG_KEY, configConfigSerdeFromClass, null, NEW_CONFIG);
Assert.assertTrue(setResult.isOk());
Mockito.verify(mockDbConnector).insertOrUpdate(
ArgumentMatchers.eq(TABLE_NAME),
ArgumentMatchers.anyString(),
ArgumentMatchers.anyString(),
ArgumentMatchers.eq(CONFIG_KEY),
ArgumentMatchers.any(byte[].class)
);
Mockito.verifyNoMoreInteractions(mockDbConnector);
}
@Test
public void testSetOldObjectNotNullShouldSwap()
{
when(mockDbConnector.compareAndSwap(any(List.class))).thenReturn(true);
final ArgumentCaptor<List<MetadataCASUpdate>> updateCaptor = ArgumentCaptor.forClass(List.class);
configManager.start();
ConfigManager.SetResult setResult = configManager.set(CONFIG_KEY, configConfigSerdeFromClass, OLD_CONFIG, NEW_CONFIG);
Assert.assertTrue(setResult.isOk());
Mockito.verify(mockDbConnector).compareAndSwap(
updateCaptor.capture()
);
Mockito.verifyNoMoreInteractions(mockDbConnector);
Assert.assertEquals(1, updateCaptor.getValue().size());
Assert.assertEquals(TABLE_NAME, updateCaptor.getValue().get(0).getTableName());
Assert.assertEquals(MetadataStorageConnector.CONFIG_TABLE_KEY_COLUMN, updateCaptor.getValue().get(0).getKeyColumn());
Assert.assertEquals(MetadataStorageConnector.CONFIG_TABLE_VALUE_COLUMN, updateCaptor.getValue().get(0).getValueColumn());
Assert.assertEquals(CONFIG_KEY, updateCaptor.getValue().get(0).getKey());
Assert.assertArrayEquals(configConfigSerdeFromClass.serialize(OLD_CONFIG), updateCaptor.getValue().get(0).getOldValue());
Assert.assertArrayEquals(configConfigSerdeFromClass.serialize(NEW_CONFIG), updateCaptor.getValue().get(0).getNewValue());
}
static class TestConfig
{
private final String version;
private final String settingString;
private final int settingInt;
@JsonCreator
public TestConfig(
@JsonProperty("version") String version,
@JsonProperty("settingString") String settingString,
@JsonProperty("settingInt") int settingInt
)
{
this.version = version;
this.settingString = settingString;
this.settingInt = settingInt;
}
public String getVersion()
{
return version;
}
public String getSettingString()
{
return settingString;
}
public int getSettingInt()
{
return settingInt;
}
}
}

View File

@ -19,6 +19,7 @@
package org.apache.druid.server.http; package org.apache.druid.server.http;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject; import com.google.inject.Inject;
@ -46,6 +47,9 @@ import javax.ws.rs.core.Context;
import javax.ws.rs.core.MediaType; import javax.ws.rs.core.MediaType;
import javax.ws.rs.core.Response; import javax.ws.rs.core.Response;
import java.util.Map; import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Function; import java.util.function.Function;
import java.util.stream.Collectors; import java.util.stream.Collectors;
@ -53,6 +57,9 @@ import java.util.stream.Collectors;
@ResourceFilters(ConfigResourceFilter.class) @ResourceFilters(ConfigResourceFilter.class)
public class CoordinatorCompactionConfigsResource public class CoordinatorCompactionConfigsResource
{ {
private static final long UPDATE_RETRY_DELAY = 1000;
static final int UPDATE_NUM_RETRY = 5;
private final JacksonConfigManager manager; private final JacksonConfigManager manager;
@Inject @Inject
@ -79,27 +86,24 @@ public class CoordinatorCompactionConfigsResource
@Context HttpServletRequest req @Context HttpServletRequest req
) )
{ {
final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.current(manager); Callable<SetResult> callable = () -> {
final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.current(manager);
final CoordinatorCompactionConfig newCompactionConfig = CoordinatorCompactionConfig.from( final CoordinatorCompactionConfig newCompactionConfig = CoordinatorCompactionConfig.from(
current, current,
compactionTaskSlotRatio, compactionTaskSlotRatio,
maxCompactionTaskSlots maxCompactionTaskSlots
); );
final SetResult setResult = manager.set( return manager.set(
CoordinatorCompactionConfig.CONFIG_KEY, CoordinatorCompactionConfig.CONFIG_KEY,
newCompactionConfig, // Do database insert without swap if the current config is empty as this means the config may be null in the database
new AuditInfo(author, comment, req.getRemoteAddr()) CoordinatorCompactionConfig.empty().equals(current) ? null : current,
); newCompactionConfig,
new AuditInfo(author, comment, req.getRemoteAddr())
if (setResult.isOk()) { );
return Response.ok().build(); };
} else { return updateConfigHelper(callable);
return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.of("error", setResult.getException()))
.build();
}
} }
@POST @POST
@ -111,26 +115,25 @@ public class CoordinatorCompactionConfigsResource
@Context HttpServletRequest req @Context HttpServletRequest req
) )
{ {
final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.current(manager); Callable<SetResult> callable = () -> {
final CoordinatorCompactionConfig newCompactionConfig; final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.current(manager);
final Map<String, DataSourceCompactionConfig> newConfigs = current final CoordinatorCompactionConfig newCompactionConfig;
.getCompactionConfigs() final Map<String, DataSourceCompactionConfig> newConfigs = current
.stream() .getCompactionConfigs()
.collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity())); .stream()
newConfigs.put(newConfig.getDataSource(), newConfig); .collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
newCompactionConfig = CoordinatorCompactionConfig.from(current, ImmutableList.copyOf(newConfigs.values())); newConfigs.put(newConfig.getDataSource(), newConfig);
newCompactionConfig = CoordinatorCompactionConfig.from(current, ImmutableList.copyOf(newConfigs.values()));
final SetResult setResult = manager.set( return manager.set(
CoordinatorCompactionConfig.CONFIG_KEY, CoordinatorCompactionConfig.CONFIG_KEY,
newCompactionConfig, // Do database insert without swap if the current config is empty as this means the config may be null in the database
new AuditInfo(author, comment, req.getRemoteAddr()) CoordinatorCompactionConfig.empty().equals(current) ? null : current,
); newCompactionConfig,
new AuditInfo(author, comment, req.getRemoteAddr())
if (setResult.isOk()) { );
return Response.ok().build(); };
} else { return updateConfigHelper(callable);
return Response.status(Response.Status.BAD_REQUEST).build();
}
} }
@GET @GET
@ -162,27 +165,68 @@ public class CoordinatorCompactionConfigsResource
@Context HttpServletRequest req @Context HttpServletRequest req
) )
{ {
final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.current(manager); Callable<SetResult> callable = () -> {
final Map<String, DataSourceCompactionConfig> configs = current final CoordinatorCompactionConfig current = CoordinatorCompactionConfig.current(manager);
.getCompactionConfigs() final Map<String, DataSourceCompactionConfig> configs = current
.stream() .getCompactionConfigs()
.collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity())); .stream()
.collect(Collectors.toMap(DataSourceCompactionConfig::getDataSource, Function.identity()));
final DataSourceCompactionConfig config = configs.remove(dataSource); final DataSourceCompactionConfig config = configs.remove(dataSource);
if (config == null) { if (config == null) {
return Response.status(Response.Status.NOT_FOUND).build(); return SetResult.fail(new NoSuchElementException("datasource not found"), false);
}
return manager.set(
CoordinatorCompactionConfig.CONFIG_KEY,
// Do database insert without swap if the current config is empty as this means the config may be null in the database
CoordinatorCompactionConfig.empty().equals(current) ? null : current,
CoordinatorCompactionConfig.from(current, ImmutableList.copyOf(configs.values())),
new AuditInfo(author, comment, req.getRemoteAddr())
);
};
return updateConfigHelper(callable);
}
@VisibleForTesting
Response updateConfigHelper(Callable<SetResult> updateMethod)
{
int attemps = 0;
SetResult setResult = null;
try {
while (attemps < UPDATE_NUM_RETRY) {
setResult = updateMethod.call();
if (setResult.isOk() || !setResult.isRetryable()) {
break;
}
attemps++;
updateRetryDelay();
}
}
catch (Exception e) {
return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
.entity(ImmutableMap.of("error", e))
.build();
} }
final SetResult setResult = manager.set(
CoordinatorCompactionConfig.CONFIG_KEY,
CoordinatorCompactionConfig.from(current, ImmutableList.copyOf(configs.values())),
new AuditInfo(author, comment, req.getRemoteAddr())
);
if (setResult.isOk()) { if (setResult.isOk()) {
return Response.ok().build(); return Response.ok().build();
} else if (setResult.getException() instanceof NoSuchElementException) {
return Response.status(Response.Status.NOT_FOUND).build();
} else { } else {
return Response.status(Response.Status.BAD_REQUEST).build(); return Response.status(Response.Status.BAD_REQUEST)
.entity(ImmutableMap.of("error", setResult.getException()))
.build();
}
}
private void updateRetryDelay()
{
try {
Thread.sleep(ThreadLocalRandom.current().nextLong(UPDATE_RETRY_DELAY));
}
catch (InterruptedException ie) {
throw new RuntimeException(ie);
} }
} }
} }

View File

@ -0,0 +1,319 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.server.http;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.druid.common.config.ConfigManager;
import org.apache.druid.common.config.JacksonConfigManager;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.server.coordinator.CoordinatorCompactionConfig;
import org.apache.druid.server.coordinator.DataSourceCompactionConfig;
import org.apache.druid.server.coordinator.UserCompactionTaskGranularityConfig;
import org.joda.time.Period;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import javax.servlet.http.HttpServletRequest;
import javax.ws.rs.core.Response;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicReference;
@RunWith(MockitoJUnitRunner.class)
public class CoordinatorCompactionConfigsResourceTest
{
private static final DataSourceCompactionConfig OLD_CONFIG = new DataSourceCompactionConfig(
"oldDataSource",
null,
500L,
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
null,
ImmutableMap.of("key", "val")
);
private static final CoordinatorCompactionConfig ORIGINAL_CONFIG = CoordinatorCompactionConfig.from(ImmutableList.of(OLD_CONFIG));
@Mock
private JacksonConfigManager mockJacksonConfigManager;
@Mock
private HttpServletRequest mockHttpServletRequest;
private CoordinatorCompactionConfigsResource coordinatorCompactionConfigsResource;
@Before
public void setup()
{
Mockito.when(mockJacksonConfigManager.watch(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
).thenReturn(new AtomicReference<>(ORIGINAL_CONFIG));
coordinatorCompactionConfigsResource = new CoordinatorCompactionConfigsResource(mockJacksonConfigManager);
Mockito.when(mockHttpServletRequest.getRemoteAddr()).thenReturn("123");
}
@Test
public void testSetCompactionTaskLimitWithExistingConfig()
{
final ArgumentCaptor<CoordinatorCompactionConfig> oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
oldConfigCaptor.capture(),
newConfigCaptor.capture(),
ArgumentMatchers.any())
).thenReturn(ConfigManager.SetResult.ok());
double compactionTaskSlotRatio = 0.5;
int maxCompactionTaskSlots = 9;
String author = "maytas";
String comment = "hello";
Response result = coordinatorCompactionConfigsResource.setCompactionTaskLimit(
compactionTaskSlotRatio,
maxCompactionTaskSlots,
author,
comment,
mockHttpServletRequest
);
Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
Assert.assertNotNull(oldConfigCaptor.getValue());
Assert.assertEquals(oldConfigCaptor.getValue(), ORIGINAL_CONFIG);
Assert.assertNotNull(newConfigCaptor.getValue());
Assert.assertEquals(newConfigCaptor.getValue().getMaxCompactionTaskSlots(), maxCompactionTaskSlots);
Assert.assertEquals(compactionTaskSlotRatio, newConfigCaptor.getValue().getCompactionTaskSlotRatio(), 0);
}
@Test
public void testAddOrUpdateCompactionConfigWithExistingConfig()
{
final ArgumentCaptor<CoordinatorCompactionConfig> oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
oldConfigCaptor.capture(),
newConfigCaptor.capture(),
ArgumentMatchers.any())
).thenReturn(ConfigManager.SetResult.ok());
final DataSourceCompactionConfig newConfig = new DataSourceCompactionConfig(
"dataSource",
null,
500L,
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
null,
ImmutableMap.of("key", "val")
);
String author = "maytas";
String comment = "hello";
Response result = coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig(
newConfig,
author,
comment,
mockHttpServletRequest
);
Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
Assert.assertNotNull(oldConfigCaptor.getValue());
Assert.assertEquals(oldConfigCaptor.getValue(), ORIGINAL_CONFIG);
Assert.assertNotNull(newConfigCaptor.getValue());
Assert.assertEquals(2, newConfigCaptor.getValue().getCompactionConfigs().size());
Assert.assertEquals(OLD_CONFIG, newConfigCaptor.getValue().getCompactionConfigs().get(0));
Assert.assertEquals(newConfig, newConfigCaptor.getValue().getCompactionConfigs().get(1));
}
@Test
public void testDeleteCompactionConfigWithExistingConfig()
{
final ArgumentCaptor<CoordinatorCompactionConfig> oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
oldConfigCaptor.capture(),
newConfigCaptor.capture(),
ArgumentMatchers.any())
).thenReturn(ConfigManager.SetResult.ok());
final String datasourceName = "dataSource";
final DataSourceCompactionConfig toDelete = new DataSourceCompactionConfig(
datasourceName,
null,
500L,
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
null,
ImmutableMap.of("key", "val")
);
final CoordinatorCompactionConfig originalConfig = CoordinatorCompactionConfig.from(ImmutableList.of(toDelete));
Mockito.when(mockJacksonConfigManager.watch(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
).thenReturn(new AtomicReference<>(originalConfig));
String author = "maytas";
String comment = "hello";
Response result = coordinatorCompactionConfigsResource.deleteCompactionConfig(
datasourceName,
author,
comment,
mockHttpServletRequest
);
Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
Assert.assertNotNull(oldConfigCaptor.getValue());
Assert.assertEquals(oldConfigCaptor.getValue(), originalConfig);
Assert.assertNotNull(newConfigCaptor.getValue());
Assert.assertEquals(0, newConfigCaptor.getValue().getCompactionConfigs().size());
}
@Test
public void testUpdateConfigHelperShouldRetryIfRetryableException()
{
MutableInt nunCalled = new MutableInt(0);
Callable<ConfigManager.SetResult> callable = () -> {
nunCalled.increment();
return ConfigManager.SetResult.fail(new Exception(), true);
};
coordinatorCompactionConfigsResource.updateConfigHelper(callable);
Assert.assertEquals(CoordinatorCompactionConfigsResource.UPDATE_NUM_RETRY, (int) nunCalled.getValue());
}
@Test
public void testUpdateConfigHelperShouldNotRetryIfNotRetryableException()
{
MutableInt nunCalled = new MutableInt(0);
Callable<ConfigManager.SetResult> callable = () -> {
nunCalled.increment();
return ConfigManager.SetResult.fail(new Exception(), false);
};
coordinatorCompactionConfigsResource.updateConfigHelper(callable);
Assert.assertEquals(1, (int) nunCalled.getValue());
}
@Test
public void testSetCompactionTaskLimitWithoutExistingConfig()
{
Mockito.when(mockJacksonConfigManager.watch(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
).thenReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty()));
final ArgumentCaptor<CoordinatorCompactionConfig> oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
oldConfigCaptor.capture(),
newConfigCaptor.capture(),
ArgumentMatchers.any())
).thenReturn(ConfigManager.SetResult.ok());
double compactionTaskSlotRatio = 0.5;
int maxCompactionTaskSlots = 9;
String author = "maytas";
String comment = "hello";
Response result = coordinatorCompactionConfigsResource.setCompactionTaskLimit(
compactionTaskSlotRatio,
maxCompactionTaskSlots,
author,
comment,
mockHttpServletRequest
);
Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
Assert.assertNull(oldConfigCaptor.getValue());
Assert.assertNotNull(newConfigCaptor.getValue());
Assert.assertEquals(newConfigCaptor.getValue().getMaxCompactionTaskSlots(), maxCompactionTaskSlots);
Assert.assertEquals(compactionTaskSlotRatio, newConfigCaptor.getValue().getCompactionTaskSlotRatio(), 0);
}
@Test
public void testAddOrUpdateCompactionConfigWithoutExistingConfig()
{
Mockito.when(mockJacksonConfigManager.watch(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
).thenReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty()));
final ArgumentCaptor<CoordinatorCompactionConfig> oldConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
final ArgumentCaptor<CoordinatorCompactionConfig> newConfigCaptor = ArgumentCaptor.forClass(CoordinatorCompactionConfig.class);
Mockito.when(mockJacksonConfigManager.set(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
oldConfigCaptor.capture(),
newConfigCaptor.capture(),
ArgumentMatchers.any())
).thenReturn(ConfigManager.SetResult.ok());
final DataSourceCompactionConfig newConfig = new DataSourceCompactionConfig(
"dataSource",
null,
500L,
null,
new Period(3600),
null,
new UserCompactionTaskGranularityConfig(Granularities.HOUR, null),
null,
ImmutableMap.of("key", "val")
);
String author = "maytas";
String comment = "hello";
Response result = coordinatorCompactionConfigsResource.addOrUpdateCompactionConfig(
newConfig,
author,
comment,
mockHttpServletRequest
);
Assert.assertEquals(Response.Status.OK.getStatusCode(), result.getStatus());
Assert.assertNull(oldConfigCaptor.getValue());
Assert.assertNotNull(newConfigCaptor.getValue());
Assert.assertEquals(1, newConfigCaptor.getValue().getCompactionConfigs().size());
Assert.assertEquals(newConfig, newConfigCaptor.getValue().getCompactionConfigs().get(0));
}
@Test
public void testDeleteCompactionConfigWithoutExistingConfigShouldFailAsDatasourceNotExist()
{
Mockito.when(mockJacksonConfigManager.watch(
ArgumentMatchers.eq(CoordinatorCompactionConfig.CONFIG_KEY),
ArgumentMatchers.eq(CoordinatorCompactionConfig.class),
ArgumentMatchers.eq(CoordinatorCompactionConfig.empty()))
).thenReturn(new AtomicReference<>(CoordinatorCompactionConfig.empty()));
String author = "maytas";
String comment = "hello";
Response result = coordinatorCompactionConfigsResource.deleteCompactionConfig(
"notExist",
author,
comment,
mockHttpServletRequest
);
Assert.assertEquals(Response.Status.NOT_FOUND.getStatusCode(), result.getStatus());
}
}