HBASE-27315 Add timeout to JavaRegexEngine
This commit is contained in:
parent
e5620e26a2
commit
d167868598
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase;
|
||||
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* if exception is not meant to be retried
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class DoNotRetryUncheckedIOException extends RuntimeException {
|
||||
private static final long serialVersionUID = 5334859039077080405L;
|
||||
|
||||
/**
|
||||
* @param message the message
|
||||
*/
|
||||
public DoNotRetryUncheckedIOException(String message) {
|
||||
super(message);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a {@link DoNotRetryIOException}.
|
||||
* @return the {@link DoNotRetryIOException}
|
||||
*/
|
||||
public DoNotRetryIOException toDoNotRetryIOException() {
|
||||
return new DoNotRetryIOException(getMessage());
|
||||
}
|
||||
}
|
|
@ -21,8 +21,11 @@ import java.nio.charset.Charset;
|
|||
import java.nio.charset.IllegalCharsetNameException;
|
||||
import java.util.Arrays;
|
||||
import java.util.regex.Pattern;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.exceptions.DeserializationException;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.jcodings.Encoding;
|
||||
import org.jcodings.EncodingDB;
|
||||
|
@ -255,11 +258,20 @@ public class RegexStringComparator extends ByteArrayComparable {
|
|||
* This is the default engine.
|
||||
*/
|
||||
static class JavaRegexEngine implements Engine {
|
||||
private static final Configuration conf = HBaseConfiguration.create();
|
||||
|
||||
private Charset charset = Charset.forName("UTF-8");
|
||||
private Pattern pattern;
|
||||
private final long timeoutMillis;
|
||||
|
||||
public JavaRegexEngine(String regex, int flags) {
|
||||
this.pattern = Pattern.compile(regex, flags);
|
||||
this.timeoutMillis = conf.getLong("hbase.filter.regex.java.timeout", -1);
|
||||
}
|
||||
|
||||
JavaRegexEngine(String regex, int flags, long timeoutMillis) {
|
||||
this.pattern = Pattern.compile(regex, flags);
|
||||
this.timeoutMillis = timeoutMillis;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -294,7 +306,14 @@ public class RegexStringComparator extends ByteArrayComparable {
|
|||
} else {
|
||||
tmp = new String(value, offset, length, charset);
|
||||
}
|
||||
|
||||
if (timeoutMillis == -1) {
|
||||
return pattern.matcher(tmp).find() ? 0 : 1;
|
||||
} else {
|
||||
final TimeoutCharSequence chars =
|
||||
new TimeoutCharSequence(tmp, EnvironmentEdgeManager.currentTime(), timeoutMillis);
|
||||
return pattern.matcher(chars).find() ? 0 : 1;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -0,0 +1,93 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.filter;
|
||||
|
||||
import org.apache.hadoop.hbase.DoNotRetryUncheckedIOException;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
||||
/**
|
||||
* It checks whether the timeout has been exceeded whenever the charAt method is called.
|
||||
*/
|
||||
class TimeoutCharSequence implements CharSequence {
|
||||
static final int DEFAULT_CHECK_POINT = 10_000;
|
||||
|
||||
private final CharSequence value;
|
||||
private final long startMillis;
|
||||
private final long timeoutMillis;
|
||||
private final int checkPoint;
|
||||
private int numberOfCalls;
|
||||
|
||||
/**
|
||||
* Initialize a TimeoutCharSequence.
|
||||
* @param value the original data
|
||||
* @param startMillis time the operation started (ms)
|
||||
* @param timeoutMillis the timeout (ms)
|
||||
*/
|
||||
TimeoutCharSequence(CharSequence value, long startMillis, long timeoutMillis) {
|
||||
this.value = value;
|
||||
this.startMillis = startMillis;
|
||||
this.timeoutMillis = timeoutMillis;
|
||||
this.checkPoint = DEFAULT_CHECK_POINT;
|
||||
this.numberOfCalls = 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize a TimeoutCharSequence.
|
||||
* @param value the original data
|
||||
* @param startMillis time the operation started (ms)
|
||||
* @param checkPoint the check point
|
||||
* @param timeoutMillis the timeout (ms)
|
||||
*/
|
||||
TimeoutCharSequence(CharSequence value, long startMillis, long timeoutMillis, int checkPoint) {
|
||||
this.value = value;
|
||||
this.startMillis = startMillis;
|
||||
this.timeoutMillis = timeoutMillis;
|
||||
this.checkPoint = checkPoint;
|
||||
this.numberOfCalls = 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int length() {
|
||||
return value.length();
|
||||
}
|
||||
|
||||
@Override
|
||||
public char charAt(int index) {
|
||||
numberOfCalls++;
|
||||
if (numberOfCalls % checkPoint == 0) {
|
||||
final long diff = EnvironmentEdgeManager.currentTime() - startMillis;
|
||||
if (diff > timeoutMillis) {
|
||||
throw new DoNotRetryUncheckedIOException(
|
||||
String.format("Operation timed out after %s.", StringUtils.formatTime(diff)));
|
||||
}
|
||||
numberOfCalls = 0;
|
||||
}
|
||||
return value.charAt(index);
|
||||
}
|
||||
|
||||
@Override
|
||||
public CharSequence subSequence(int start, int end) {
|
||||
return new TimeoutCharSequence(value.subSequence(start, end), startMillis, timeoutMillis);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return value.toString();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,47 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.filter;
|
||||
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.regex.Pattern;
|
||||
import org.apache.hadoop.hbase.DoNotRetryUncheckedIOException;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class RegexStringComparatorTest {
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(RegexStringComparatorTest.class);
|
||||
|
||||
@Test(expected = DoNotRetryUncheckedIOException.class)
|
||||
public void testCompareToTimeout() {
|
||||
final RegexStringComparator.JavaRegexEngine regex =
|
||||
new RegexStringComparator.JavaRegexEngine("(0*)*A", Pattern.DOTALL, 0);
|
||||
|
||||
EnvironmentEdgeManager.injectEdge(new IncrementingEnvironmentEdge());
|
||||
final byte[] input = "00000000000000000000000000000".getBytes(StandardCharsets.UTF_8);
|
||||
// It actually takes a few seconds
|
||||
regex.compareTo(input, 0, input.length);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,79 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.filter;
|
||||
|
||||
import static org.apache.hadoop.hbase.filter.TimeoutCharSequence.DEFAULT_CHECK_POINT;
|
||||
|
||||
import org.apache.hadoop.hbase.DoNotRetryUncheckedIOException;
|
||||
import org.apache.hadoop.hbase.HBaseClassTestRule;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.IncrementingEnvironmentEdge;
|
||||
import org.junit.Assert;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TimeoutCharSequenceTest {
|
||||
@ClassRule
|
||||
public static final HBaseClassTestRule CLASS_RULE =
|
||||
HBaseClassTestRule.forClass(TimeoutCharSequenceTest.class);
|
||||
|
||||
@Test(expected = DoNotRetryUncheckedIOException.class)
|
||||
public void testCharAtTimeout() {
|
||||
final IncrementingEnvironmentEdge environmentEdge = new IncrementingEnvironmentEdge(7);
|
||||
EnvironmentEdgeManager.injectEdge(environmentEdge);
|
||||
|
||||
final TimeoutCharSequence chars = new TimeoutCharSequence("test", 0, 8, 1);
|
||||
for (int i = 0; i < 3; i++) {
|
||||
chars.charAt(3);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCheckPoint() {
|
||||
final long initialAmount = 10;
|
||||
final IncrementingEnvironmentEdge edge = new IncrementingEnvironmentEdge(initialAmount);
|
||||
EnvironmentEdgeManager.injectEdge(edge);
|
||||
|
||||
final TimeoutCharSequence chars = new TimeoutCharSequence("test", 0, 8);
|
||||
|
||||
boolean thrownException = false;
|
||||
for (int i = 0; i < DEFAULT_CHECK_POINT; i++) {
|
||||
try {
|
||||
chars.charAt(3);
|
||||
} catch (DoNotRetryUncheckedIOException e) {
|
||||
thrownException = true;
|
||||
}
|
||||
}
|
||||
|
||||
Assert.assertTrue(thrownException);
|
||||
Assert.assertEquals(edge.currentTime(), initialAmount + 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCharAt() {
|
||||
final IncrementingEnvironmentEdge environmentEdge = new IncrementingEnvironmentEdge(1);
|
||||
EnvironmentEdgeManager.injectEdge(environmentEdge); // next currentTime: 1
|
||||
|
||||
final TimeoutCharSequence chars = new TimeoutCharSequence("test", 0, 10);
|
||||
Assert.assertEquals('t', chars.charAt(0));
|
||||
Assert.assertEquals('e', chars.charAt(1));
|
||||
}
|
||||
}
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configuration;
|
|||
import org.apache.hadoop.hbase.CallQueueTooBigException;
|
||||
import org.apache.hadoop.hbase.CellScanner;
|
||||
import org.apache.hadoop.hbase.DoNotRetryIOException;
|
||||
import org.apache.hadoop.hbase.DoNotRetryUncheckedIOException;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||
|
@ -477,6 +478,9 @@ public abstract class RpcServer implements RpcServerInterface, ConfigurationObse
|
|||
|
||||
if (e instanceof LinkageError) throw new DoNotRetryIOException(e);
|
||||
if (e instanceof IOException) throw (IOException) e;
|
||||
if (e instanceof DoNotRetryUncheckedIOException) {
|
||||
throw ((DoNotRetryUncheckedIOException) e).toDoNotRetryIOException();
|
||||
}
|
||||
LOG.error("Unexpected throwable object ", e);
|
||||
throw new IOException(e.getMessage(), e);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue