mirror of https://github.com/apache/druid.git
Improves on the fix for 8918 (#9387)
* Improves on the fix for 8918 * factorize constants for ITRetryUtil.retryUntil call * increasing retries and sleep in HttpUtil to cope with 401s in testing * adding retries in EventReceiverFirehoseTestClient * adding missing space
This commit is contained in:
parent
f619903403
commit
14accb50ad
|
@ -25,6 +25,7 @@ import com.fasterxml.jackson.jaxrs.smile.SmileMediaTypes;
|
||||||
import org.apache.druid.java.util.common.ISE;
|
import org.apache.druid.java.util.common.ISE;
|
||||||
import org.apache.druid.java.util.common.StringUtils;
|
import org.apache.druid.java.util.common.StringUtils;
|
||||||
import org.apache.druid.java.util.common.jackson.JacksonUtils;
|
import org.apache.druid.java.util.common.jackson.JacksonUtils;
|
||||||
|
import org.apache.druid.java.util.common.logger.Logger;
|
||||||
import org.apache.druid.java.util.http.client.HttpClient;
|
import org.apache.druid.java.util.http.client.HttpClient;
|
||||||
import org.apache.druid.java.util.http.client.Request;
|
import org.apache.druid.java.util.http.client.Request;
|
||||||
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
|
import org.apache.druid.java.util.http.client.response.StatusResponseHandler;
|
||||||
|
@ -41,9 +42,15 @@ import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
public class EventReceiverFirehoseTestClient
|
public class EventReceiverFirehoseTestClient
|
||||||
{
|
{
|
||||||
|
private static final Logger LOG = new Logger(EventReceiverFirehoseTestClient.class);
|
||||||
|
|
||||||
|
static final int NUM_RETRIES = 30;
|
||||||
|
static final long DELAY_FOR_RETRIES_MS = 10000;
|
||||||
|
|
||||||
private final String host;
|
private final String host;
|
||||||
private final ObjectMapper jsonMapper;
|
private final ObjectMapper jsonMapper;
|
||||||
private final HttpClient httpClient;
|
private final HttpClient httpClient;
|
||||||
|
@ -82,31 +89,45 @@ public class EventReceiverFirehoseTestClient
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public int postEvents(Collection<Map<String, Object>> events, ObjectMapper objectMapper, String mediaType)
|
public int postEvents(Collection<Map<String, Object>> events, ObjectMapper objectMapper, String mediaType)
|
||||||
|
throws InterruptedException
|
||||||
{
|
{
|
||||||
try {
|
int retryCount = 0;
|
||||||
StatusResponseHolder response = httpClient.go(
|
while (true) {
|
||||||
new Request(HttpMethod.POST, new URL(getURL()))
|
try {
|
||||||
.setContent(mediaType, objectMapper.writeValueAsBytes(events)),
|
StatusResponseHolder response = httpClient.go(
|
||||||
StatusResponseHandler.getInstance()
|
new Request(HttpMethod.POST, new URL(getURL()))
|
||||||
).get();
|
.setContent(mediaType, objectMapper.writeValueAsBytes(events)),
|
||||||
|
StatusResponseHandler.getInstance()
|
||||||
|
).get();
|
||||||
|
|
||||||
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
|
if (!response.getStatus().equals(HttpResponseStatus.OK)) {
|
||||||
throw new ISE(
|
throw new ISE(
|
||||||
"Error while posting events to url[%s] status[%s] content[%s]",
|
"Error while posting events to url[%s] status[%s] content[%s]",
|
||||||
getURL(),
|
getURL(),
|
||||||
response.getStatus(),
|
response.getStatus(),
|
||||||
response.getContent()
|
response.getContent()
|
||||||
|
);
|
||||||
|
}
|
||||||
|
Map<String, Integer> responseData = objectMapper.readValue(
|
||||||
|
response.getContent(), new TypeReference<Map<String, Integer>>()
|
||||||
|
{
|
||||||
|
}
|
||||||
);
|
);
|
||||||
|
return responseData.get("eventCount");
|
||||||
|
}
|
||||||
|
// adding retries to flaky tests using channels
|
||||||
|
catch (ExecutionException e) {
|
||||||
|
if (retryCount > NUM_RETRIES) {
|
||||||
|
throw new RuntimeException(e); //giving up now
|
||||||
|
} else {
|
||||||
|
LOG.info(e, "received exception, sleeping and retrying");
|
||||||
|
retryCount++;
|
||||||
|
Thread.sleep(DELAY_FOR_RETRIES_MS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
catch (Exception e) {
|
||||||
|
throw new RuntimeException(e);
|
||||||
}
|
}
|
||||||
Map<String, Integer> responseData = objectMapper.readValue(
|
|
||||||
response.getContent(), new TypeReference<Map<String, Integer>>()
|
|
||||||
{
|
|
||||||
}
|
|
||||||
);
|
|
||||||
return responseData.get("eventCount");
|
|
||||||
}
|
|
||||||
catch (Exception e) {
|
|
||||||
throw new RuntimeException(e);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -39,6 +39,9 @@ public class HttpUtil
|
||||||
private static final Logger LOG = new Logger(AbstractQueryResourceTestClient.class);
|
private static final Logger LOG = new Logger(AbstractQueryResourceTestClient.class);
|
||||||
private static final StatusResponseHandler RESPONSE_HANDLER = StatusResponseHandler.getInstance();
|
private static final StatusResponseHandler RESPONSE_HANDLER = StatusResponseHandler.getInstance();
|
||||||
|
|
||||||
|
static final int NUM_RETRIES = 30;
|
||||||
|
static final long DELAY_FOR_RETRIES_MS = 10000;
|
||||||
|
|
||||||
public static StatusResponseHolder makeRequest(HttpClient httpClient, HttpMethod method, String url, byte[] content)
|
public static StatusResponseHolder makeRequest(HttpClient httpClient, HttpMethod method, String url, byte[] content)
|
||||||
{
|
{
|
||||||
return makeRequestWithExpectedStatus(
|
return makeRequestWithExpectedStatus(
|
||||||
|
@ -78,13 +81,13 @@ public class HttpUtil
|
||||||
response.getContent()
|
response.getContent()
|
||||||
);
|
);
|
||||||
// it can take time for the auth config to propagate, so we retry
|
// it can take time for the auth config to propagate, so we retry
|
||||||
if (retryCount > 10) {
|
if (retryCount > NUM_RETRIES) {
|
||||||
throw new ISE(errMsg);
|
throw new ISE(errMsg);
|
||||||
} else {
|
} else {
|
||||||
LOG.error(errMsg);
|
LOG.error(errMsg);
|
||||||
LOG.error("retrying in 3000ms, retryCount: " + retryCount);
|
LOG.error("retrying in 3000ms, retryCount: " + retryCount);
|
||||||
retryCount++;
|
retryCount++;
|
||||||
Thread.sleep(3000);
|
Thread.sleep(DELAY_FOR_RETRIES_MS);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
|
|
|
@ -69,6 +69,9 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTes
|
||||||
DateTime dtLast; // timestamp of last event
|
DateTime dtLast; // timestamp of last event
|
||||||
DateTime dtGroupBy; // timestamp for expected response for groupBy query
|
DateTime dtGroupBy; // timestamp for expected response for groupBy query
|
||||||
|
|
||||||
|
static final int NUM_RETRIES = 60;
|
||||||
|
static final long DELAY_FOR_RETRIES_MS = 10000;
|
||||||
|
|
||||||
@Inject
|
@Inject
|
||||||
ServerDiscoveryFactory factory;
|
ServerDiscoveryFactory factory;
|
||||||
@Inject
|
@Inject
|
||||||
|
@ -106,11 +109,14 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTes
|
||||||
postEvents();
|
postEvents();
|
||||||
|
|
||||||
// wait for a while to let the events be ingested
|
// wait for a while to let the events be ingested
|
||||||
ITRetryUtil.retryUntilTrue(
|
ITRetryUtil.retryUntil(
|
||||||
() -> {
|
() -> {
|
||||||
final int countRows = queryHelper.countRows(fullDatasourceName, Intervals.ETERNITY.toString());
|
final int countRows = queryHelper.countRows(fullDatasourceName, Intervals.ETERNITY.toString());
|
||||||
return countRows == getNumExpectedRowsIngested();
|
return countRows == getNumExpectedRowsIngested();
|
||||||
},
|
},
|
||||||
|
true,
|
||||||
|
DELAY_FOR_RETRIES_MS,
|
||||||
|
NUM_RETRIES,
|
||||||
"Waiting all events are ingested"
|
"Waiting all events are ingested"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -152,8 +158,8 @@ public abstract class AbstractITRealtimeIndexTaskTest extends AbstractIndexerTes
|
||||||
ITRetryUtil.retryUntil(
|
ITRetryUtil.retryUntil(
|
||||||
() -> coordinator.areSegmentsLoaded(fullDatasourceName),
|
() -> coordinator.areSegmentsLoaded(fullDatasourceName),
|
||||||
true,
|
true,
|
||||||
10000,
|
DELAY_FOR_RETRIES_MS,
|
||||||
60,
|
NUM_RETRIES,
|
||||||
"Real-time generated segments loaded"
|
"Real-time generated segments loaded"
|
||||||
);
|
);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue