diff --git a/src/main/java/org/apache/lucene/store/StoreRateLimiting.java b/src/main/java/org/apache/lucene/store/StoreRateLimiting.java index 9be2209a076..96840a94f07 100644 --- a/src/main/java/org/apache/lucene/store/StoreRateLimiting.java +++ b/src/main/java/org/apache/lucene/store/StoreRateLimiting.java @@ -53,8 +53,8 @@ public class StoreRateLimiting { } } - private final RateLimiter.SimpleRateLimiter rateLimiter = new RateLimiter.SimpleRateLimiter(0); - private volatile RateLimiter.SimpleRateLimiter actualRateLimiter; + private final XSimpleRateLimiter rateLimiter = new XSimpleRateLimiter(0); + private volatile XSimpleRateLimiter actualRateLimiter; private volatile Type type; diff --git a/src/main/java/org/apache/lucene/store/XSimpleRateLimiter.java b/src/main/java/org/apache/lucene/store/XSimpleRateLimiter.java new file mode 100644 index 00000000000..20a375fe17c --- /dev/null +++ b/src/main/java/org/apache/lucene/store/XSimpleRateLimiter.java @@ -0,0 +1,98 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lucene.store; + +import org.apache.lucene.util.ThreadInterruptedException; + +// LUCENE UPGRADE - this is a copy of a RateLimiter.SimpleRateLimiter fixing bug #2785 Lucene 4.3 should fix that +public final class XSimpleRateLimiter extends RateLimiter { + private volatile double mbPerSec; + private volatile double nsPerByte; + private volatile long lastNS; + + // TODO: we could also allow eg a sub class to dynamically + // determine the allowed rate, eg if an app wants to + // change the allowed rate over time or something + + /** mbPerSec is the MB/sec max IO rate */ + public XSimpleRateLimiter(double mbPerSec) { + setMbPerSec(mbPerSec); + } + + /** + * Sets an updated mb per second rate limit. + */ + @Override + public void setMbPerSec(double mbPerSec) { + this.mbPerSec = mbPerSec; + nsPerByte = 1000000000. / (1024*1024*mbPerSec); + + } + + /** + * The current mb per second rate limit. + */ + @Override + public double getMbPerSec() { + return this.mbPerSec; + } + + /** Pauses, if necessary, to keep the instantaneous IO + * rate at or below the target. NOTE: multiple threads + * may safely use this, however the implementation is + * not perfectly thread safe but likely in practice this + * is harmless (just means in some rare cases the rate + * might exceed the target). It's best to call this + * with a biggish count, not one byte at a time. + * @return the pause time in nano seconds + * */ + @Override + public long pause(long bytes) { + if (bytes == 1) { + return 0; + } + + // TODO: this is purely instantaneous rate; maybe we + // should also offer decayed recent history one? + final long targetNS = lastNS = lastNS + ((long) (bytes * nsPerByte)); + final long startNs; + long curNS = startNs = System.nanoTime(); + if (lastNS < curNS) { + lastNS = curNS; + } + + // While loop because Thread.sleep doesn't always sleep + // enough: + while(true) { + final long pauseNS = targetNS - curNS; + if (pauseNS > 0) { + try { + Thread.sleep((int) (pauseNS/1000000), (int) (pauseNS % 1000000)); + } catch (InterruptedException ie) { + throw new ThreadInterruptedException(ie); + } + curNS = System.nanoTime(); + continue; + } + break; + } + + return curNS - startNs; + } + } \ No newline at end of file diff --git a/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java b/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java index 11047c6f38d..5ff49d0e072 100644 --- a/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java +++ b/src/main/java/org/elasticsearch/indices/recovery/RecoverySettings.java @@ -21,6 +21,7 @@ package org.elasticsearch.indices.recovery; import com.google.common.base.Objects; import org.apache.lucene.store.RateLimiter; +import org.apache.lucene.store.XSimpleRateLimiter; import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.inject.Inject; import org.elasticsearch.common.settings.Settings; @@ -53,7 +54,7 @@ public class RecoverySettings extends AbstractComponent { private final ThreadPoolExecutor concurrentStreamPool; private volatile ByteSizeValue maxSizePerSec; - private volatile RateLimiter.SimpleRateLimiter rateLimiter; + private volatile XSimpleRateLimiter rateLimiter; @Inject public RecoverySettings(Settings settings, NodeSettingsService nodeSettingsService) { @@ -71,7 +72,7 @@ public class RecoverySettings extends AbstractComponent { if (maxSizePerSec.bytes() <= 0) { rateLimiter = null; } else { - rateLimiter = new RateLimiter.SimpleRateLimiter(maxSizePerSec.mbFrac()); + rateLimiter = new XSimpleRateLimiter(maxSizePerSec.mbFrac()); } logger.debug("using max_size_per_sec[{}], concurrent_streams [{}], file_chunk_size [{}], translog_size [{}], translog_ops [{}], and compress [{}]", @@ -130,7 +131,7 @@ public class RecoverySettings extends AbstractComponent { } else if (rateLimiter != null) { rateLimiter.setMbPerSec(maxSizePerSec.mbFrac()); } else { - rateLimiter = new RateLimiter.SimpleRateLimiter(maxSizePerSec.mbFrac()); + rateLimiter = new XSimpleRateLimiter(maxSizePerSec.mbFrac()); } } diff --git a/src/test/java/org/apache/lucene/store/XSimpleRateLimiterTest.java b/src/test/java/org/apache/lucene/store/XSimpleRateLimiterTest.java new file mode 100644 index 00000000000..e53ed73398d --- /dev/null +++ b/src/test/java/org/apache/lucene/store/XSimpleRateLimiterTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to ElasticSearch and Shay Banon under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. ElasticSearch licenses this + * file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lucene.store; + +import java.util.concurrent.TimeUnit; + +import org.testng.annotations.Test; +import org.apache.lucene.store.RateLimiter.SimpleRateLimiter; +import org.apache.lucene.util.Version; +import org.elasticsearch.common.lucene.Lucene; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.lessThan; +import static org.hamcrest.Matchers.greaterThan; + +public class XSimpleRateLimiterTest { + + @Test + public void testPause() { + XSimpleRateLimiter limiter = new XSimpleRateLimiter(10); // 10 MB / Sec + limiter.pause(2);//init + long pause = 0; + for (int i = 0; i < 3; i++) { + pause += limiter.pause(4 * 1024 * 1024); // fire up 3 * 4 MB + } + final long convert = TimeUnit.MILLISECONDS.convert(pause, TimeUnit.NANOSECONDS); + assertThat(convert, lessThan(2000l)); // more than 2 seconds should be an error here! + assertThat(convert, greaterThan(1000l)); // we should sleep at lease 1 sec + } + + @Test + public void testPauseLucene() { + if (Version.LUCENE_42 != Lucene.VERSION) { // once we upgrade test the lucene impl again + SimpleRateLimiter limiter = new SimpleRateLimiter(10); // 10 MB / Sec + limiter.pause(2);//init + long pause = 0; + for (int i = 0; i < 3; i++) { + pause += limiter.pause(4 * 1024 * 1024); // fire up 3 * 4 MB + } + final long convert = TimeUnit.MILLISECONDS.convert(pause, TimeUnit.NANOSECONDS); + assertThat(convert, lessThan(2000l)); // more than 2 seconds should be an error here! + assertThat(convert, greaterThan(1000l)); // we should sleep at lease 1 sec + assert false : "Upgrade XSimpleRateLimiter to Lucene SimpleRateLimiter"; + } + } +}