Revert "bulk api: use `System.nanoTime()` instead of `System.currentTimeMillis()` to compute the estimated took time"
This reverts commit 82567f1bdf
.
`System.nanoTime()` was being called from multiple threads causing the timing to be incorrect.
This commit is contained in:
parent
942eb70956
commit
27ea143c3f
|
@ -61,7 +61,6 @@ import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeUnit;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -196,15 +195,16 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||||
* @see #doExecute(BulkRequest, org.elasticsearch.action.ActionListener)
|
* @see #doExecute(BulkRequest, org.elasticsearch.action.ActionListener)
|
||||||
*/
|
*/
|
||||||
public void executeBulk(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {
|
public void executeBulk(final BulkRequest bulkRequest, final ActionListener<BulkResponse> listener) {
|
||||||
final long startTimeNanos = System.nanoTime();
|
final long startTime = System.currentTimeMillis();
|
||||||
executeBulk(bulkRequest, startTimeNanos, listener, new AtomicArray<>(bulkRequest.requests.size()));
|
executeBulk(bulkRequest, startTime, listener, new AtomicArray<>(bulkRequest.requests.size()));
|
||||||
}
|
}
|
||||||
|
|
||||||
private long buildTookInMillis(long startTimeNanos) {
|
private long buildTookInMillis(long startTime) {
|
||||||
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTimeNanos);
|
// protect ourselves against time going backwards
|
||||||
|
return Math.max(1, System.currentTimeMillis() - startTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void executeBulk(final BulkRequest bulkRequest, final long startTimeNanos, final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses ) {
|
private void executeBulk(final BulkRequest bulkRequest, final long startTime, final ActionListener<BulkResponse> listener, final AtomicArray<BulkItemResponse> responses ) {
|
||||||
final ClusterState clusterState = clusterService.state();
|
final ClusterState clusterState = clusterService.state();
|
||||||
// TODO use timeout to wait here if its blocked...
|
// TODO use timeout to wait here if its blocked...
|
||||||
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE);
|
clusterState.blocks().globalBlockedRaiseException(ClusterBlockLevel.WRITE);
|
||||||
|
@ -302,7 +302,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||||
}
|
}
|
||||||
|
|
||||||
if (requestsByShard.isEmpty()) {
|
if (requestsByShard.isEmpty()) {
|
||||||
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
|
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTime)));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -352,7 +352,7 @@ public class TransportBulkAction extends HandledTransportAction<BulkRequest, Bul
|
||||||
}
|
}
|
||||||
|
|
||||||
private void finishHim() {
|
private void finishHim() {
|
||||||
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTimeNanos)));
|
listener.onResponse(new BulkResponse(responses.toArray(new BulkItemResponse[responses.length()]), buildTookInMillis(startTime)));
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue