Pass through all exceptions in IndicesLifecycleListeners

This allows a plugin or user that registers a listener to be able to
stop actions like creating an index or starting a shard by throwing an
exception. Previously all exceptions were logged without being rethrown.
This commit is contained in:
Lee Hinman 2015-01-14 18:20:15 +01:00
parent 366ddfc89a
commit 283a467e20
2 changed files with 32 additions and 1 deletions

View File

@ -34,7 +34,9 @@ import org.elasticsearch.index.shard.ShardId;
import java.util.concurrent.CopyOnWriteArrayList;
/**
*
* InternalIndicesLifecycle handles invoking each listener for the Index. All
* exceptions thrown by listeners are logged and then re-thrown to stop further
* index action.
*/
public class InternalIndicesLifecycle extends AbstractComponent implements IndicesLifecycle {
@ -69,6 +71,7 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
listener.beforeIndexCreated(index, indexSettings);
} catch (Throwable t) {
logger.warn("[{}] failed to invoke before index created callback", t, index.name());
throw t;
}
}
}
@ -79,6 +82,7 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
listener.afterIndexCreated(indexService);
} catch (Throwable t) {
logger.warn("[{}] failed to invoke after index created callback", t, indexService.index().name());
throw t;
}
}
}
@ -89,6 +93,7 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
listener.beforeIndexShardCreated(shardId, indexSettings);
} catch (Throwable t) {
logger.warn("{} failed to invoke before shard created callback", t, shardId);
throw t;
}
}
}
@ -99,6 +104,7 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
listener.afterIndexShardCreated(indexShard);
} catch (Throwable t) {
logger.warn("{} failed to invoke after shard created callback", t, indexShard.shardId());
throw t;
}
}
}
@ -109,6 +115,7 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
listener.afterIndexShardPostRecovery(indexShard);
} catch (Throwable t) {
logger.warn("{} failed to invoke after shard post recovery callback", t, indexShard.shardId());
throw t;
}
}
}
@ -119,6 +126,7 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
listener.afterIndexShardStarted(indexShard);
} catch (Throwable t) {
logger.warn("{} failed to invoke after shard started callback", t, indexShard.shardId());
throw t;
}
}
}
@ -129,6 +137,7 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
listener.beforeIndexClosed(indexService);
} catch (Throwable t) {
logger.warn("[{}] failed to invoke before index closed callback", t, indexService.index().name());
throw t;
}
}
}
@ -139,6 +148,7 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
listener.beforeIndexDeleted(indexService);
} catch (Throwable t) {
logger.warn("[{}] failed to invoke before index deleted callback", t, indexService.index().name());
throw t;
}
}
}
@ -149,6 +159,7 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
listener.afterIndexDeleted(index, indexSettings);
} catch (Throwable t) {
logger.warn("[{}] failed to invoke after index deleted callback", t, index.name());
throw t;
}
}
}
@ -159,6 +170,7 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
listener.afterIndexClosed(index, indexSettings);
} catch (Throwable t) {
logger.warn("[{}] failed to invoke after index closed callback", t, index.name());
throw t;
}
}
}
@ -170,6 +182,7 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
listener.beforeIndexShardClosed(shardId, indexShard, indexSettings);
} catch (Throwable t) {
logger.warn("{} failed to invoke before shard closed callback", t, shardId);
throw t;
}
}
}
@ -181,6 +194,7 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
listener.afterIndexShardClosed(shardId, indexShard, indexSettings);
} catch (Throwable t) {
logger.warn("{} failed to invoke after shard closed callback", t, shardId);
throw t;
}
}
}
@ -191,6 +205,7 @@ public class InternalIndicesLifecycle extends AbstractComponent implements Indic
listener.indexShardStateChanged(indexShard, previousState, indexShard.state(), reason);
} catch (Throwable t) {
logger.warn("{} failed to invoke index shard state changed callback", t, indexShard.shardId());
throw t;
}
}
}

View File

@ -20,6 +20,8 @@ package org.elasticsearch.indices;
import com.google.common.base.Predicate;
import com.google.common.collect.Maps;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.settings.ImmutableSettings;
@ -60,6 +62,17 @@ public class IndicesLifecycleListenerTests extends ElasticsearchIntegrationTest
//add a listener that keeps track of the shard state changes
internalCluster().getInstance(IndicesLifecycle.class, node1).addListener(stateChangeListenerNode1);
//create an index that should fail
try {
client().admin().indices().prepareCreate("failed").setSettings(SETTING_NUMBER_OF_SHARDS, 1, "index.fail", true).get();
fail("should have thrown an exception");
} catch (ElasticsearchException e) {
assertTrue(e.getMessage().contains("failing on purpose"));
ClusterStateResponse resp = client().admin().cluster().prepareState().get();
assertFalse(resp.getState().routingTable().indicesRouting().keySet().contains("failed"));
}
//create an index
assertAcked(client().admin().indices().prepareCreate("test")
.setSettings(SETTING_NUMBER_OF_SHARDS, 6, SETTING_NUMBER_OF_REPLICAS, 0));
@ -160,6 +173,9 @@ public class IndicesLifecycleListenerTests extends ElasticsearchIntegrationTest
@Override
public void beforeIndexCreated(Index index, @IndexSettings Settings indexSettings) {
this.creationSettings = indexSettings;
if (indexSettings.getAsBoolean("index.fail", false)) {
throw new ElasticsearchException("failing on purpose");
}
}
@Override