Cluster State Update APIs (master node) to respect master_timeout better
also respect the timeout when trying to obtain the md lock relates #3365
This commit is contained in:
parent
235a68c3bd
commit
c766f6bd97
|
@ -68,6 +68,8 @@ import java.io.IOException;
|
|||
import java.io.InputStreamReader;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
@ -125,9 +127,12 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
|
||||
// we lock here, and not within the cluster service callback since we don't want to
|
||||
// block the whole cluster state handling
|
||||
MetaDataService.MdLock mdLock = metaDataService.indexMetaDataLock(request.index);
|
||||
Semaphore mdLock = metaDataService.indexMetaDataLock(request.index);
|
||||
try {
|
||||
mdLock.lock();
|
||||
if (!mdLock.tryAcquire(request.masterTimeout.nanos(), TimeUnit.NANOSECONDS)) {
|
||||
userListener.onFailure(new ProcessClusterEventTimeoutException(request.masterTimeout, "acquire index lock"));
|
||||
return;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
userListener.onFailure(e);
|
||||
return;
|
||||
|
@ -370,16 +375,12 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
class CreateIndexListener implements Listener {
|
||||
|
||||
private final AtomicBoolean notified = new AtomicBoolean();
|
||||
|
||||
private final MetaDataService.MdLock mdLock;
|
||||
|
||||
private final Semaphore mdLock;
|
||||
private final Request request;
|
||||
|
||||
private final Listener listener;
|
||||
|
||||
volatile ScheduledFuture future;
|
||||
|
||||
private CreateIndexListener(MetaDataService.MdLock mdLock, Request request, Listener listener) {
|
||||
private CreateIndexListener(Semaphore mdLock, Request request, Listener listener) {
|
||||
this.mdLock = mdLock;
|
||||
this.request = request;
|
||||
this.listener = listener;
|
||||
|
@ -388,7 +389,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
@Override
|
||||
public void onResponse(final Response response) {
|
||||
if (notified.compareAndSet(false, true)) {
|
||||
mdLock.unlock();
|
||||
mdLock.release();
|
||||
if (future != null) {
|
||||
future.cancel(false);
|
||||
}
|
||||
|
@ -399,7 +400,7 @@ public class MetaDataCreateIndexService extends AbstractComponent {
|
|||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
if (notified.compareAndSet(false, true)) {
|
||||
mdLock.unlock();
|
||||
mdLock.release();
|
||||
if (future != null) {
|
||||
future.cancel(false);
|
||||
}
|
||||
|
|
|
@ -38,6 +38,8 @@ import org.elasticsearch.indices.IndexMissingException;
|
|||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
@ -73,9 +75,12 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
|
|||
public void deleteIndex(final Request request, final Listener userListener) {
|
||||
// we lock here, and not within the cluster service callback since we don't want to
|
||||
// block the whole cluster state handling
|
||||
MetaDataService.MdLock mdLock = metaDataService.indexMetaDataLock(request.index);
|
||||
Semaphore mdLock = metaDataService.indexMetaDataLock(request.index);
|
||||
try {
|
||||
mdLock.lock();
|
||||
if (!mdLock.tryAcquire(request.masterTimeout.nanos(), TimeUnit.NANOSECONDS)) {
|
||||
userListener.onFailure(new ProcessClusterEventTimeoutException(request.masterTimeout, "acquire index lock"));
|
||||
return;
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
userListener.onFailure(e);
|
||||
return;
|
||||
|
@ -156,16 +161,12 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
|
|||
class DeleteIndexListener implements Listener {
|
||||
|
||||
private final AtomicBoolean notified = new AtomicBoolean();
|
||||
|
||||
private final MetaDataService.MdLock mdLock;
|
||||
|
||||
private final Semaphore mdLock;
|
||||
private final Request request;
|
||||
|
||||
private final Listener listener;
|
||||
|
||||
volatile ScheduledFuture future;
|
||||
|
||||
private DeleteIndexListener(MetaDataService.MdLock mdLock, Request request, Listener listener) {
|
||||
private DeleteIndexListener(Semaphore mdLock, Request request, Listener listener) {
|
||||
this.mdLock = mdLock;
|
||||
this.request = request;
|
||||
this.listener = listener;
|
||||
|
@ -174,7 +175,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
|
|||
@Override
|
||||
public void onResponse(final Response response) {
|
||||
if (notified.compareAndSet(false, true)) {
|
||||
mdLock.unlock();
|
||||
mdLock.release();
|
||||
if (future != null) {
|
||||
future.cancel(false);
|
||||
}
|
||||
|
@ -185,7 +186,7 @@ public class MetaDataDeleteIndexService extends AbstractComponent {
|
|||
@Override
|
||||
public void onFailure(Throwable t) {
|
||||
if (notified.compareAndSet(false, true)) {
|
||||
mdLock.unlock();
|
||||
mdLock.release();
|
||||
if (future != null) {
|
||||
future.cancel(false);
|
||||
}
|
||||
|
|
|
@ -24,39 +24,24 @@ import org.elasticsearch.common.component.AbstractComponent;
|
|||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
|
||||
import java.util.concurrent.Semaphore;
|
||||
|
||||
/**
|
||||
*/
|
||||
public class MetaDataService extends AbstractComponent {
|
||||
|
||||
private final MdLock[] indexMdLocks;
|
||||
private final Semaphore[] indexMdLocks;
|
||||
|
||||
@Inject
|
||||
public MetaDataService(Settings settings) {
|
||||
super(settings);
|
||||
indexMdLocks = new MdLock[500];
|
||||
indexMdLocks = new Semaphore[1];
|
||||
for (int i = 0; i < indexMdLocks.length; i++) {
|
||||
indexMdLocks[i] = new MdLock();
|
||||
indexMdLocks[i] = new Semaphore(1);
|
||||
}
|
||||
}
|
||||
|
||||
public MdLock indexMetaDataLock(String index) {
|
||||
public Semaphore indexMetaDataLock(String index) {
|
||||
return indexMdLocks[Math.abs(DjbHashFunction.DJB_HASH(index) % indexMdLocks.length)];
|
||||
}
|
||||
|
||||
public class MdLock {
|
||||
|
||||
private boolean isLocked = false;
|
||||
|
||||
public synchronized void lock() throws InterruptedException {
|
||||
while (isLocked) {
|
||||
wait();
|
||||
}
|
||||
isLocked = true;
|
||||
}
|
||||
|
||||
public synchronized void unlock() {
|
||||
isLocked = false;
|
||||
notifyAll();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue