Fix bug in RateLimiter.SimpleRateLimiter causing numeric overflow in StoreStats
Closes #2785
This commit is contained in:
parent
d5da8f22ff
commit
c25eb7defe
|
@ -53,8 +53,8 @@ public class StoreRateLimiting {
|
|||
}
|
||||
}
|
||||
|
||||
private final RateLimiter.SimpleRateLimiter rateLimiter = new RateLimiter.SimpleRateLimiter(0);
|
||||
private volatile RateLimiter.SimpleRateLimiter actualRateLimiter;
|
||||
private final XSimpleRateLimiter rateLimiter = new XSimpleRateLimiter(0);
|
||||
private volatile XSimpleRateLimiter actualRateLimiter;
|
||||
|
||||
private volatile Type type;
|
||||
|
||||
|
|
|
@ -0,0 +1,98 @@
|
|||
/*
|
||||
* Licensed to ElasticSearch and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. ElasticSearch licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.lucene.store;
|
||||
|
||||
import org.apache.lucene.util.ThreadInterruptedException;
|
||||
|
||||
// LUCENE UPGRADE - this is a copy of a RateLimiter.SimpleRateLimiter fixing bug #2785 Lucene 4.3 should fix that
|
||||
public final class XSimpleRateLimiter extends RateLimiter {
|
||||
private volatile double mbPerSec;
|
||||
private volatile double nsPerByte;
|
||||
private volatile long lastNS;
|
||||
|
||||
// TODO: we could also allow eg a sub class to dynamically
|
||||
// determine the allowed rate, eg if an app wants to
|
||||
// change the allowed rate over time or something
|
||||
|
||||
/** mbPerSec is the MB/sec max IO rate */
|
||||
public XSimpleRateLimiter(double mbPerSec) {
|
||||
setMbPerSec(mbPerSec);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets an updated mb per second rate limit.
|
||||
*/
|
||||
@Override
|
||||
public void setMbPerSec(double mbPerSec) {
|
||||
this.mbPerSec = mbPerSec;
|
||||
nsPerByte = 1000000000. / (1024*1024*mbPerSec);
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* The current mb per second rate limit.
|
||||
*/
|
||||
@Override
|
||||
public double getMbPerSec() {
|
||||
return this.mbPerSec;
|
||||
}
|
||||
|
||||
/** Pauses, if necessary, to keep the instantaneous IO
|
||||
* rate at or below the target. NOTE: multiple threads
|
||||
* may safely use this, however the implementation is
|
||||
* not perfectly thread safe but likely in practice this
|
||||
* is harmless (just means in some rare cases the rate
|
||||
* might exceed the target). It's best to call this
|
||||
* with a biggish count, not one byte at a time.
|
||||
* @return the pause time in nano seconds
|
||||
* */
|
||||
@Override
|
||||
public long pause(long bytes) {
|
||||
if (bytes == 1) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
// TODO: this is purely instantaneous rate; maybe we
|
||||
// should also offer decayed recent history one?
|
||||
final long targetNS = lastNS = lastNS + ((long) (bytes * nsPerByte));
|
||||
final long startNs;
|
||||
long curNS = startNs = System.nanoTime();
|
||||
if (lastNS < curNS) {
|
||||
lastNS = curNS;
|
||||
}
|
||||
|
||||
// While loop because Thread.sleep doesn't always sleep
|
||||
// enough:
|
||||
while(true) {
|
||||
final long pauseNS = targetNS - curNS;
|
||||
if (pauseNS > 0) {
|
||||
try {
|
||||
Thread.sleep((int) (pauseNS/1000000), (int) (pauseNS % 1000000));
|
||||
} catch (InterruptedException ie) {
|
||||
throw new ThreadInterruptedException(ie);
|
||||
}
|
||||
curNS = System.nanoTime();
|
||||
continue;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
return curNS - startNs;
|
||||
}
|
||||
}
|
|
@ -21,6 +21,7 @@ package org.elasticsearch.indices.recovery;
|
|||
|
||||
import com.google.common.base.Objects;
|
||||
import org.apache.lucene.store.RateLimiter;
|
||||
import org.apache.lucene.store.XSimpleRateLimiter;
|
||||
import org.elasticsearch.common.component.AbstractComponent;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
@ -53,7 +54,7 @@ public class RecoverySettings extends AbstractComponent {
|
|||
private final ThreadPoolExecutor concurrentStreamPool;
|
||||
|
||||
private volatile ByteSizeValue maxSizePerSec;
|
||||
private volatile RateLimiter.SimpleRateLimiter rateLimiter;
|
||||
private volatile XSimpleRateLimiter rateLimiter;
|
||||
|
||||
@Inject
|
||||
public RecoverySettings(Settings settings, NodeSettingsService nodeSettingsService) {
|
||||
|
@ -71,7 +72,7 @@ public class RecoverySettings extends AbstractComponent {
|
|||
if (maxSizePerSec.bytes() <= 0) {
|
||||
rateLimiter = null;
|
||||
} else {
|
||||
rateLimiter = new RateLimiter.SimpleRateLimiter(maxSizePerSec.mbFrac());
|
||||
rateLimiter = new XSimpleRateLimiter(maxSizePerSec.mbFrac());
|
||||
}
|
||||
|
||||
logger.debug("using max_size_per_sec[{}], concurrent_streams [{}], file_chunk_size [{}], translog_size [{}], translog_ops [{}], and compress [{}]",
|
||||
|
@ -130,7 +131,7 @@ public class RecoverySettings extends AbstractComponent {
|
|||
} else if (rateLimiter != null) {
|
||||
rateLimiter.setMbPerSec(maxSizePerSec.mbFrac());
|
||||
} else {
|
||||
rateLimiter = new RateLimiter.SimpleRateLimiter(maxSizePerSec.mbFrac());
|
||||
rateLimiter = new XSimpleRateLimiter(maxSizePerSec.mbFrac());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,62 @@
|
|||
/*
|
||||
* Licensed to ElasticSearch and Shay Banon under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. ElasticSearch licenses this
|
||||
* file to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.apache.lucene.store;
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.testng.annotations.Test;
|
||||
import org.apache.lucene.store.RateLimiter.SimpleRateLimiter;
|
||||
import org.apache.lucene.util.Version;
|
||||
import org.elasticsearch.common.lucene.Lucene;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
|
||||
public class XSimpleRateLimiterTest {
|
||||
|
||||
@Test
|
||||
public void testPause() {
|
||||
XSimpleRateLimiter limiter = new XSimpleRateLimiter(10); // 10 MB / Sec
|
||||
limiter.pause(2);//init
|
||||
long pause = 0;
|
||||
for (int i = 0; i < 3; i++) {
|
||||
pause += limiter.pause(4 * 1024 * 1024); // fire up 3 * 4 MB
|
||||
}
|
||||
final long convert = TimeUnit.MILLISECONDS.convert(pause, TimeUnit.NANOSECONDS);
|
||||
assertThat(convert, lessThan(2000l)); // more than 2 seconds should be an error here!
|
||||
assertThat(convert, greaterThan(1000l)); // we should sleep at lease 1 sec
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testPauseLucene() {
|
||||
if (Version.LUCENE_42 != Lucene.VERSION) { // once we upgrade test the lucene impl again
|
||||
SimpleRateLimiter limiter = new SimpleRateLimiter(10); // 10 MB / Sec
|
||||
limiter.pause(2);//init
|
||||
long pause = 0;
|
||||
for (int i = 0; i < 3; i++) {
|
||||
pause += limiter.pause(4 * 1024 * 1024); // fire up 3 * 4 MB
|
||||
}
|
||||
final long convert = TimeUnit.MILLISECONDS.convert(pause, TimeUnit.NANOSECONDS);
|
||||
assertThat(convert, lessThan(2000l)); // more than 2 seconds should be an error here!
|
||||
assertThat(convert, greaterThan(1000l)); // we should sleep at lease 1 sec
|
||||
assert false : "Upgrade XSimpleRateLimiter to Lucene SimpleRateLimiter";
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue