Threadpool Info: Allow to serialize negative thread pool sizes

As a SizeValue is used for serializing the thread pool size, a negative number
resulted in throwing an exception when deserializing (using -ea an assertionerror
was thrown).

This fixes a check for changing the serialization logic, so that negative numbers are read correctly, by adding an internal UNBOUNDED value.

Closes #6325
Closes #5357
This commit is contained in:
Alexander Reelsen 2014-07-16 14:39:39 +02:00
parent f9a9348508
commit b0c0ff8ac0
4 changed files with 207 additions and 6 deletions

View File

@ -20,6 +20,7 @@
package org.elasticsearch.common.unit;
import org.elasticsearch.ElasticsearchParseException;
import org.elasticsearch.common.Preconditions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@ -46,6 +47,7 @@ public class SizeValue implements Serializable, Streamable {
}
public SizeValue(long size, SizeUnit sizeUnit) {
Preconditions.checkArgument(size >= 0, "size in SizeValue may not be negative");
this.size = size;
this.sizeUnit = sizeUnit;
}

View File

@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.MoreExecutors;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.Version;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.inject.Inject;
@ -32,6 +33,8 @@ import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.SettingsException;
import org.elasticsearch.common.unit.SizeUnit;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.EsAbortPolicy;
@ -52,6 +55,7 @@ import java.util.concurrent.*;
import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.common.unit.SizeValue.parseSizeValue;
import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes;
/**
@ -316,11 +320,11 @@ public class ThreadPool extends AbstractComponent {
return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null));
} else if ("fixed".equals(type)) {
int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings));
SizeValue defaultQueueSize = defaultSettings.getAsSize("queue", defaultSettings.getAsSize("queue_size", null));
SizeValue defaultQueueSize = getAsSizeOrUnbounded(defaultSettings, "queue", getAsSizeOrUnbounded(defaultSettings, "queue_size", null));
if (previousExecutorHolder != null) {
if ("fixed".equals(previousInfo.getType())) {
SizeValue updatedQueueSize = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", previousInfo.getQueueSize())));
SizeValue updatedQueueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", previousInfo.getQueueSize())));
if (Objects.equal(previousInfo.getQueueSize(), updatedQueueSize)) {
int updatedSize = settings.getAsInt("size", previousInfo.getMax());
if (previousInfo.getMax() != updatedSize) {
@ -339,7 +343,7 @@ public class ThreadPool extends AbstractComponent {
}
int size = settings.getAsInt("size", defaultSize);
SizeValue queueSize = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultQueueSize)));
SizeValue queueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", defaultQueueSize)));
logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize);
Executor executor = EsExecutors.newFixed(size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory);
return new ExecutorHolder(executor, new Info(name, type, size, size, null, queueSize));
@ -415,6 +419,16 @@ public class ThreadPool extends AbstractComponent {
}
}
/**
* A thread pool size can also be unbounded and is represented by -1, which is not supported by SizeValue (which only supports positive numbers)
*/
private SizeValue getAsSizeOrUnbounded(Settings settings, String setting, SizeValue defaultValue) throws SettingsException {
if ("-1".equals(settings.get(setting))) {
return null;
}
return parseSizeValue(settings.get(setting), defaultValue);
}
class ExecutorShutdownListener implements EsThreadPoolExecutor.ShutdownListener {
private ExecutorHolder holder;
@ -603,8 +617,13 @@ public class ThreadPool extends AbstractComponent {
keepAlive = TimeValue.readTimeValue(in);
}
if (in.readBoolean()) {
if (in.getVersion().after(Version.V_1_2_2)) {
boolean isQueueSizeBounded = in.readBoolean();
queueSize = isQueueSizeBounded ? SizeValue.readSizeValue(in) : null;
} else {
queueSize = SizeValue.readSizeValue(in);
}
}
in.readBoolean(); // here to conform with removed waitTime
in.readBoolean(); // here to conform with removed rejected setting
in.readBoolean(); // here to conform with queue type
@ -626,8 +645,16 @@ public class ThreadPool extends AbstractComponent {
out.writeBoolean(false);
} else {
out.writeBoolean(true);
if (out.getVersion().onOrAfter(Version.V_1_2_3)) {
boolean isQueueSizeBounded = queueSize.singles() >= 0;
out.writeBoolean(isQueueSizeBounded);
if (isQueueSizeBounded) {
queueSize.writeTo(out);
}
} else {
queueSize.writeTo(out);
}
}
out.writeBoolean(false); // here to conform with removed waitTime
out.writeBoolean(false); // here to conform with removed rejected setting
out.writeBoolean(false); // here to conform with queue type
@ -646,7 +673,9 @@ public class ThreadPool extends AbstractComponent {
if (keepAlive != null) {
builder.field(Fields.KEEP_ALIVE, keepAlive.toString());
}
if (queueSize != null) {
if (queueSize == null) {
builder.field(Fields.QUEUE_SIZE, -1);
} else {
builder.field(Fields.QUEUE_SIZE, queueSize.toString());
}
builder.endObject();

View File

@ -0,0 +1,70 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.unit;
import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import static org.hamcrest.Matchers.is;
/**
*
*/
public class SizeValueTests extends ElasticsearchTestCase {
@Test
public void testThatConversionWorks() {
SizeValue sizeValue = new SizeValue(1000);
assertThat(sizeValue.kilo(), is(1l));
assertThat(sizeValue.toString(), is("1k"));
sizeValue = new SizeValue(1000, SizeUnit.KILO);
assertThat(sizeValue.singles(), is(1000000L));
assertThat(sizeValue.toString(), is("1m"));
sizeValue = new SizeValue(1000, SizeUnit.MEGA);
assertThat(sizeValue.singles(), is(1000000000L));
assertThat(sizeValue.toString(), is("1g"));
sizeValue = new SizeValue(1000, SizeUnit.GIGA);
assertThat(sizeValue.singles(), is(1000000000000L));
assertThat(sizeValue.toString(), is("1t"));
sizeValue = new SizeValue(1000, SizeUnit.TERA);
assertThat(sizeValue.singles(), is(1000000000000000L));
assertThat(sizeValue.toString(), is("1p"));
sizeValue = new SizeValue(1000, SizeUnit.PETA);
assertThat(sizeValue.singles(), is(1000000000000000000L));
assertThat(sizeValue.toString(), is("1000p"));
}
@Test
public void testThatParsingWorks() {
assertThat(SizeValue.parseSizeValue("1k").toString(), is(new SizeValue(1000).toString()));
assertThat(SizeValue.parseSizeValue("1p").toString(), is(new SizeValue(1, SizeUnit.PETA).toString()));
assertThat(SizeValue.parseSizeValue("1G").toString(), is(new SizeValue(1, SizeUnit.GIGA).toString()));
}
@Test(expected = ElasticsearchIllegalArgumentException.class)
public void testThatNegativeValuesThrowException() {
new SizeValue(-1);
}
}

View File

@ -0,0 +1,100 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.threadpool;
import org.elasticsearch.Version;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.stream.BytesStreamInput;
import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.SizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import java.util.Map;
import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.hasKey;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
/**
*
*/
public class ThreadPoolSerializationTests extends ElasticsearchTestCase {
BytesStreamOutput output = new BytesStreamOutput();
@Test
public void testThatQueueSizeSerializationWorks() throws Exception {
ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), SizeValue.parseSizeValue("10k"));
output.setVersion(Version.CURRENT);
info.writeTo(output);
StreamInput input = new BytesStreamInput(output.bytes());
ThreadPool.Info newInfo = new ThreadPool.Info();
newInfo.readFrom(input);
assertThat(newInfo.getQueueSize().singles(), is(10000l));
}
@Test
public void testThatNegativeQueueSizesCanBeSerialized() throws Exception {
ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), null);
output.setVersion(Version.CURRENT);
info.writeTo(output);
StreamInput input = new BytesStreamInput(output.bytes());
ThreadPool.Info newInfo = new ThreadPool.Info();
newInfo.readFrom(input);
assertThat(newInfo.getQueueSize(), is(nullValue()));
}
@Test
public void testThatToXContentWritesOutUnboundedCorrectly() throws Exception {
ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), null);
XContentBuilder builder = jsonBuilder();
builder.startObject();
info.toXContent(builder, ToXContent.EMPTY_PARAMS);
builder.endObject();
BytesReference bytesReference = builder.bytes();
XContentParser parser = XContentFactory.xContent(bytesReference).createParser(bytesReference);
Map<String, Object> map = parser.mapAndClose();
assertThat(map, hasKey("foo"));
map = (Map<String, Object>) map.get("foo");
assertThat(map, hasKey("queue_size"));
assertThat(map.get("queue_size").toString(), is("-1"));
}
@Test
public void testThatNegativeSettingAllowsToStart() {
Settings settings = settingsBuilder().put("name", "index").put("threadpool.index.queue_size", "-1").build();
ThreadPool threadPool = new ThreadPool(settings, null);
assertThat(threadPool.info("index").getQueueSize(), is(nullValue()));
}
}