diff --git a/src/main/java/org/elasticsearch/common/unit/SizeValue.java b/src/main/java/org/elasticsearch/common/unit/SizeValue.java index 6e2c638f288..458c855d833 100644 --- a/src/main/java/org/elasticsearch/common/unit/SizeValue.java +++ b/src/main/java/org/elasticsearch/common/unit/SizeValue.java @@ -20,6 +20,7 @@ package org.elasticsearch.common.unit; import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.common.Preconditions; import org.elasticsearch.common.Strings; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; @@ -46,6 +47,7 @@ public class SizeValue implements Serializable, Streamable { } public SizeValue(long size, SizeUnit sizeUnit) { + Preconditions.checkArgument(size >= 0, "size in SizeValue may not be negative"); this.size = size; this.sizeUnit = sizeUnit; } diff --git a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index 6d4dc5c5e06..b509048b533 100644 --- a/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -24,6 +24,7 @@ import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.util.concurrent.MoreExecutors; import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.Version; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; @@ -32,6 +33,8 @@ import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Streamable; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsException; +import org.elasticsearch.common.unit.SizeUnit; import org.elasticsearch.common.unit.SizeValue; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.EsAbortPolicy; @@ -52,6 +55,7 @@ import java.util.concurrent.*; import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder; import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.common.unit.SizeValue.parseSizeValue; import static org.elasticsearch.common.unit.TimeValue.timeValueMinutes; /** @@ -316,11 +320,11 @@ public class ThreadPool extends AbstractComponent { return new ExecutorHolder(executor, new Info(name, type, -1, -1, keepAlive, null)); } else if ("fixed".equals(type)) { int defaultSize = defaultSettings.getAsInt("size", EsExecutors.boundedNumberOfProcessors(settings)); - SizeValue defaultQueueSize = defaultSettings.getAsSize("queue", defaultSettings.getAsSize("queue_size", null)); + SizeValue defaultQueueSize = getAsSizeOrUnbounded(defaultSettings, "queue", getAsSizeOrUnbounded(defaultSettings, "queue_size", null)); if (previousExecutorHolder != null) { if ("fixed".equals(previousInfo.getType())) { - SizeValue updatedQueueSize = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", previousInfo.getQueueSize()))); + SizeValue updatedQueueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", previousInfo.getQueueSize()))); if (Objects.equal(previousInfo.getQueueSize(), updatedQueueSize)) { int updatedSize = settings.getAsInt("size", previousInfo.getMax()); if (previousInfo.getMax() != updatedSize) { @@ -339,7 +343,7 @@ public class ThreadPool extends AbstractComponent { } int size = settings.getAsInt("size", defaultSize); - SizeValue queueSize = settings.getAsSize("capacity", settings.getAsSize("queue", settings.getAsSize("queue_size", defaultQueueSize))); + SizeValue queueSize = getAsSizeOrUnbounded(settings, "capacity", getAsSizeOrUnbounded(settings, "queue", getAsSizeOrUnbounded(settings, "queue_size", defaultQueueSize))); logger.debug("creating thread_pool [{}], type [{}], size [{}], queue_size [{}]", name, type, size, queueSize); Executor executor = EsExecutors.newFixed(size, queueSize == null ? -1 : (int) queueSize.singles(), threadFactory); return new ExecutorHolder(executor, new Info(name, type, size, size, null, queueSize)); @@ -415,6 +419,16 @@ public class ThreadPool extends AbstractComponent { } } + /** + * A thread pool size can also be unbounded and is represented by -1, which is not supported by SizeValue (which only supports positive numbers) + */ + private SizeValue getAsSizeOrUnbounded(Settings settings, String setting, SizeValue defaultValue) throws SettingsException { + if ("-1".equals(settings.get(setting))) { + return null; + } + return parseSizeValue(settings.get(setting), defaultValue); + } + class ExecutorShutdownListener implements EsThreadPoolExecutor.ShutdownListener { private ExecutorHolder holder; @@ -603,7 +617,12 @@ public class ThreadPool extends AbstractComponent { keepAlive = TimeValue.readTimeValue(in); } if (in.readBoolean()) { - queueSize = SizeValue.readSizeValue(in); + if (in.getVersion().after(Version.V_1_2_2)) { + boolean isQueueSizeBounded = in.readBoolean(); + queueSize = isQueueSizeBounded ? SizeValue.readSizeValue(in) : null; + } else { + queueSize = SizeValue.readSizeValue(in); + } } in.readBoolean(); // here to conform with removed waitTime in.readBoolean(); // here to conform with removed rejected setting @@ -626,7 +645,15 @@ public class ThreadPool extends AbstractComponent { out.writeBoolean(false); } else { out.writeBoolean(true); - queueSize.writeTo(out); + if (out.getVersion().onOrAfter(Version.V_1_2_3)) { + boolean isQueueSizeBounded = queueSize.singles() >= 0; + out.writeBoolean(isQueueSizeBounded); + if (isQueueSizeBounded) { + queueSize.writeTo(out); + } + } else { + queueSize.writeTo(out); + } } out.writeBoolean(false); // here to conform with removed waitTime out.writeBoolean(false); // here to conform with removed rejected setting @@ -646,7 +673,9 @@ public class ThreadPool extends AbstractComponent { if (keepAlive != null) { builder.field(Fields.KEEP_ALIVE, keepAlive.toString()); } - if (queueSize != null) { + if (queueSize == null) { + builder.field(Fields.QUEUE_SIZE, -1); + } else { builder.field(Fields.QUEUE_SIZE, queueSize.toString()); } builder.endObject(); diff --git a/src/test/java/org/elasticsearch/common/unit/SizeValueTests.java b/src/test/java/org/elasticsearch/common/unit/SizeValueTests.java new file mode 100644 index 00000000000..e52b9f8ff81 --- /dev/null +++ b/src/test/java/org/elasticsearch/common/unit/SizeValueTests.java @@ -0,0 +1,70 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.common.unit; + +import org.elasticsearch.ElasticsearchIllegalArgumentException; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +import static org.hamcrest.Matchers.is; + +/** + * + */ +public class SizeValueTests extends ElasticsearchTestCase { + + @Test + public void testThatConversionWorks() { + SizeValue sizeValue = new SizeValue(1000); + assertThat(sizeValue.kilo(), is(1l)); + assertThat(sizeValue.toString(), is("1k")); + + sizeValue = new SizeValue(1000, SizeUnit.KILO); + assertThat(sizeValue.singles(), is(1000000L)); + assertThat(sizeValue.toString(), is("1m")); + + sizeValue = new SizeValue(1000, SizeUnit.MEGA); + assertThat(sizeValue.singles(), is(1000000000L)); + assertThat(sizeValue.toString(), is("1g")); + + sizeValue = new SizeValue(1000, SizeUnit.GIGA); + assertThat(sizeValue.singles(), is(1000000000000L)); + assertThat(sizeValue.toString(), is("1t")); + + sizeValue = new SizeValue(1000, SizeUnit.TERA); + assertThat(sizeValue.singles(), is(1000000000000000L)); + assertThat(sizeValue.toString(), is("1p")); + + sizeValue = new SizeValue(1000, SizeUnit.PETA); + assertThat(sizeValue.singles(), is(1000000000000000000L)); + assertThat(sizeValue.toString(), is("1000p")); + } + + @Test + public void testThatParsingWorks() { + assertThat(SizeValue.parseSizeValue("1k").toString(), is(new SizeValue(1000).toString())); + assertThat(SizeValue.parseSizeValue("1p").toString(), is(new SizeValue(1, SizeUnit.PETA).toString())); + assertThat(SizeValue.parseSizeValue("1G").toString(), is(new SizeValue(1, SizeUnit.GIGA).toString())); + } + + @Test(expected = ElasticsearchIllegalArgumentException.class) + public void testThatNegativeValuesThrowException() { + new SizeValue(-1); + } +} diff --git a/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java b/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java new file mode 100644 index 00000000000..534472f523c --- /dev/null +++ b/src/test/java/org/elasticsearch/threadpool/ThreadPoolSerializationTests.java @@ -0,0 +1,100 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.threadpool; + +import org.elasticsearch.Version; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.io.stream.BytesStreamInput; +import org.elasticsearch.common.io.stream.BytesStreamOutput; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.SizeValue; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.test.ElasticsearchTestCase; +import org.junit.Test; + +import java.util.Map; + +import static org.elasticsearch.common.settings.ImmutableSettings.settingsBuilder; +import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; +import static org.hamcrest.Matchers.hasKey; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.nullValue; + +/** + * + */ +public class ThreadPoolSerializationTests extends ElasticsearchTestCase { + + BytesStreamOutput output = new BytesStreamOutput(); + + @Test + public void testThatQueueSizeSerializationWorks() throws Exception { + ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), SizeValue.parseSizeValue("10k")); + output.setVersion(Version.CURRENT); + info.writeTo(output); + + StreamInput input = new BytesStreamInput(output.bytes()); + ThreadPool.Info newInfo = new ThreadPool.Info(); + newInfo.readFrom(input); + + assertThat(newInfo.getQueueSize().singles(), is(10000l)); + } + + @Test + public void testThatNegativeQueueSizesCanBeSerialized() throws Exception { + ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), null); + output.setVersion(Version.CURRENT); + info.writeTo(output); + + StreamInput input = new BytesStreamInput(output.bytes()); + ThreadPool.Info newInfo = new ThreadPool.Info(); + newInfo.readFrom(input); + + assertThat(newInfo.getQueueSize(), is(nullValue())); + } + + @Test + public void testThatToXContentWritesOutUnboundedCorrectly() throws Exception { + ThreadPool.Info info = new ThreadPool.Info("foo", "search", 1, 10, TimeValue.timeValueMillis(3000), null); + XContentBuilder builder = jsonBuilder(); + builder.startObject(); + info.toXContent(builder, ToXContent.EMPTY_PARAMS); + builder.endObject(); + + BytesReference bytesReference = builder.bytes(); + XContentParser parser = XContentFactory.xContent(bytesReference).createParser(bytesReference); + Map map = parser.mapAndClose(); + assertThat(map, hasKey("foo")); + map = (Map) map.get("foo"); + assertThat(map, hasKey("queue_size")); + assertThat(map.get("queue_size").toString(), is("-1")); + } + + @Test + public void testThatNegativeSettingAllowsToStart() { + Settings settings = settingsBuilder().put("name", "index").put("threadpool.index.queue_size", "-1").build(); + ThreadPool threadPool = new ThreadPool(settings, null); + assertThat(threadPool.info("index").getQueueSize(), is(nullValue())); + } +}