From 83a51a801e24c4460c71401496c976dd347cb3a3 Mon Sep 17 00:00:00 2001 From: Fangjin Yang Date: Wed, 31 Oct 2012 16:43:47 -0700 Subject: [PATCH] cleanup redirects and reannounce workers on connection loss --- .../merger/coordinator/RemoteTaskRunner.java | 5 +++- .../http/IndexerCoordinatorNode.java | 4 ---- .../worker/WorkerCuratorCoordinator.java | 24 +++++++++++++++++++ .../com/metamx/druid/http/MasterMain.java | 4 ---- .../com/metamx/druid/http/RedirectFilter.java | 24 ++----------------- .../metamx/druid/http/RedirectServlet.java | 4 +++- 6 files changed, 33 insertions(+), 32 deletions(-) diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java index 5e8b31078af..16d0ff2c6d8 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/RemoteTaskRunner.java @@ -207,6 +207,9 @@ public class RemoteTaskRunner implements TaskRunner WorkerWrapper workerWrapper; if ((workerWrapper = findWorkerRunningTask(taskWrapper)) != null) { final Worker worker = workerWrapper.getWorker(); + + log.info("Worker[%s] is already running task{%s].", worker.getHost(), taskWrapper.getTask().getId()); + TaskStatus taskStatus = jsonMapper.readValue( cf.getData().forPath(JOINER.join(config.getStatusPath(), worker.getHost(), taskWrapper.getTask().getId())), TaskStatus.class @@ -454,7 +457,7 @@ public class RemoteTaskRunner implements TaskRunner * Creates a ZK entry under a specific path associated with a worker. The worker is responsible for * removing the task ZK entry and creating a task status ZK entry. * - * @param theWorker The worker the task is assigned to + * @param theWorker The worker the task is assigned to * @param taskWrapper The task to be assigned */ private void announceTask(Worker theWorker, TaskWrapper taskWrapper) diff --git a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java index 8b0d5eff747..30d3b4fd53e 100644 --- a/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java +++ b/merger/src/main/java/com/metamx/druid/merger/coordinator/http/IndexerCoordinatorNode.java @@ -230,10 +230,6 @@ public class IndexerCoordinatorNode root.addFilter( new FilterHolder( new RedirectFilter( - HttpClientInit.createClient( - HttpClientConfig.builder().withNumConnections(1).build(), - new Lifecycle() - ), new ToStringResponseHandler(Charsets.UTF_8), new RedirectInfo() { diff --git a/merger/src/main/java/com/metamx/druid/merger/worker/WorkerCuratorCoordinator.java b/merger/src/main/java/com/metamx/druid/merger/worker/WorkerCuratorCoordinator.java index cdf056a88fa..9375ef81696 100644 --- a/merger/src/main/java/com/metamx/druid/merger/worker/WorkerCuratorCoordinator.java +++ b/merger/src/main/java/com/metamx/druid/merger/worker/WorkerCuratorCoordinator.java @@ -28,6 +28,8 @@ import com.metamx.common.logger.Logger; import com.metamx.druid.merger.common.TaskStatus; import com.metamx.druid.merger.common.config.IndexerZkConfig; import com.netflix.curator.framework.CuratorFramework; +import com.netflix.curator.framework.state.ConnectionState; +import com.netflix.curator.framework.state.ConnectionStateListener; import org.apache.zookeeper.CreateMode; import org.codehaus.jackson.map.ObjectMapper; import org.joda.time.DateTime; @@ -95,6 +97,28 @@ public class WorkerCuratorCoordinator worker ); + curatorFramework.getConnectionStateListenable().addListener( + new ConnectionStateListener() + { + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + try { + if (newState.equals(ConnectionState.RECONNECTED)) { + makePathIfNotExisting( + getAnnouncementsPathForWorker(), + CreateMode.EPHEMERAL, + worker + ); + } + } + catch (Exception e) { + throw Throwables.propagate(e); + } + } + } + ); + started = true; } } diff --git a/server/src/main/java/com/metamx/druid/http/MasterMain.java b/server/src/main/java/com/metamx/druid/http/MasterMain.java index fe65e14b075..d0c77df081a 100644 --- a/server/src/main/java/com/metamx/druid/http/MasterMain.java +++ b/server/src/main/java/com/metamx/druid/http/MasterMain.java @@ -267,10 +267,6 @@ public class MasterMain root.addFilter( new FilterHolder( new RedirectFilter( - HttpClientInit.createClient( - HttpClientConfig.builder().withNumConnections(1).build(), - new Lifecycle() - ), new ToStringResponseHandler(Charsets.UTF_8), redirectInfo ) diff --git a/server/src/main/java/com/metamx/druid/http/RedirectFilter.java b/server/src/main/java/com/metamx/druid/http/RedirectFilter.java index 45fac15d407..28bdb3f5417 100644 --- a/server/src/main/java/com/metamx/druid/http/RedirectFilter.java +++ b/server/src/main/java/com/metamx/druid/http/RedirectFilter.java @@ -43,17 +43,14 @@ public class RedirectFilter implements Filter { private static final Logger log = new Logger(RedirectFilter.class); - private final HttpClient httpClient; private final HttpResponseHandler responseHandler; private final RedirectInfo redirectInfo; public RedirectFilter( - HttpClient httpClient, HttpResponseHandler responseHandler, RedirectInfo redirectInfo ) { - this.httpClient = httpClient; this.responseHandler = responseHandler; this.redirectInfo = redirectInfo; } @@ -82,28 +79,11 @@ public class RedirectFilter implements Filter URL url = redirectInfo.getRedirectURL(request.getQueryString(), request.getRequestURI()); log.info("Forwarding request to [%s]", url); - if (request.getMethod().equals(HttpMethod.POST)) { - try { - forward(request, url); - } - catch (Exception e) { - throw Throwables.propagate(e); - } - } else { - response.sendRedirect(url.toString()); - } + response.setStatus(HttpServletResponse.SC_MOVED_TEMPORARILY); + response.setHeader("Location", url.toString()); } } @Override public void destroy() {} - - private void forward(HttpServletRequest req, URL url) throws Exception - { - byte[] requestQuery = ByteStreams.toByteArray(req.getInputStream()); - httpClient.post(url) - .setContent("application/json", requestQuery) - .go(responseHandler) - .get(); - } } diff --git a/server/src/main/java/com/metamx/druid/http/RedirectServlet.java b/server/src/main/java/com/metamx/druid/http/RedirectServlet.java index f91f03d987d..0f7c030d610 100644 --- a/server/src/main/java/com/metamx/druid/http/RedirectServlet.java +++ b/server/src/main/java/com/metamx/druid/http/RedirectServlet.java @@ -65,7 +65,9 @@ public class RedirectServlet extends DefaultServlet } else { URL url = redirectInfo.getRedirectURL(request.getQueryString(), request.getRequestURI()); log.info("Forwarding request to [%s]", url); - response.sendRedirect(url.toString()); + + response.setStatus(HttpServletResponse.SC_MOVED_TEMPORARILY); + response.setHeader("Location", url.toString()); } } }