auto_expand_replicas causing very large amount of cluster state changes when a node joins or leaves the cluster
Closes #3399
This commit is contained in:
parent
0e3c67cbf8
commit
95855c515b
|
@ -36,9 +36,7 @@ import org.elasticsearch.common.settings.Settings;
|
|||
import org.elasticsearch.common.unit.TimeValue;
|
||||
import org.elasticsearch.index.settings.IndexDynamicSettings;
|
||||
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.*;
|
||||
|
||||
import static org.elasticsearch.cluster.ClusterState.newClusterStateBuilder;
|
||||
|
||||
|
@ -68,6 +66,9 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
|
|||
if (!event.state().nodes().localNodeMaster()) {
|
||||
return;
|
||||
}
|
||||
|
||||
Map<Integer, List<String>> nrReplicasChanged = new HashMap<Integer, List<String>>();
|
||||
|
||||
// we need to do this each time in case it was changed by update settings
|
||||
for (final IndexMetaData indexMetaData : event.state().metaData()) {
|
||||
String autoExpandReplicas = indexMetaData.settings().get(IndexMetaData.SETTING_AUTO_EXPAND_REPLICAS);
|
||||
|
@ -101,25 +102,41 @@ public class MetaDataUpdateSettingsService extends AbstractComponent implements
|
|||
}
|
||||
|
||||
if (numberOfReplicas >= min && numberOfReplicas <= max) {
|
||||
final int fNumberOfReplicas = numberOfReplicas;
|
||||
Settings settings = ImmutableSettings.settingsBuilder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, fNumberOfReplicas).build();
|
||||
updateSettings(settings, new String[]{indexMetaData.index()}, TimeValue.timeValueMinutes(10), new Listener() {
|
||||
@Override
|
||||
public void onSuccess() {
|
||||
logger.info("[{}] auto expanded replicas to [{}]", indexMetaData.index(), fNumberOfReplicas);
|
||||
|
||||
if (!nrReplicasChanged.containsKey(numberOfReplicas)) {
|
||||
nrReplicasChanged.put(numberOfReplicas, new ArrayList<String>());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
logger.warn("[{}] fail to auto expand replicas to [{}]", indexMetaData.index(), fNumberOfReplicas);
|
||||
}
|
||||
});
|
||||
nrReplicasChanged.get(numberOfReplicas).add(indexMetaData.index());
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.warn("[{}] failed to parse auto expand replicas", e, indexMetaData.index());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (nrReplicasChanged.size() > 0) {
|
||||
for (final Integer fNumberOfReplicas : nrReplicasChanged.keySet()) {
|
||||
Settings settings = ImmutableSettings.settingsBuilder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, fNumberOfReplicas).build();
|
||||
final List<String> indices = nrReplicasChanged.get(fNumberOfReplicas);
|
||||
|
||||
updateSettings(settings, indices.toArray(new String[indices.size()]), TimeValue.timeValueMinutes(10), new Listener() {
|
||||
@Override
|
||||
public void onSuccess() {
|
||||
for (String index : indices) {
|
||||
logger.info("[{}] auto expanded replicas to [{}]", index, fNumberOfReplicas);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
for (String index : indices) {
|
||||
logger.warn("[{}] fail to auto expand replicas to [{}]", index, fNumberOfReplicas);
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public void updateSettings(final Settings pSettings, final String[] indices, final TimeValue masterTimeout, final Listener listener) {
|
||||
|
|
Loading…
Reference in New Issue