cleanup redirects and reannounce workers on connection loss

This commit is contained in:
Fangjin Yang 2012-10-31 16:43:47 -07:00
parent 9547ea494d
commit 83a51a801e
6 changed files with 33 additions and 32 deletions

View File

@ -207,6 +207,9 @@ public class RemoteTaskRunner implements TaskRunner
WorkerWrapper workerWrapper;
if ((workerWrapper = findWorkerRunningTask(taskWrapper)) != null) {
final Worker worker = workerWrapper.getWorker();
log.info("Worker[%s] is already running task{%s].", worker.getHost(), taskWrapper.getTask().getId());
TaskStatus taskStatus = jsonMapper.readValue(
cf.getData().forPath(JOINER.join(config.getStatusPath(), worker.getHost(), taskWrapper.getTask().getId())),
TaskStatus.class
@ -454,7 +457,7 @@ public class RemoteTaskRunner implements TaskRunner
* Creates a ZK entry under a specific path associated with a worker. The worker is responsible for
* removing the task ZK entry and creating a task status ZK entry.
*
* @param theWorker The worker the task is assigned to
* @param theWorker The worker the task is assigned to
* @param taskWrapper The task to be assigned
*/
private void announceTask(Worker theWorker, TaskWrapper taskWrapper)

View File

@ -230,10 +230,6 @@ public class IndexerCoordinatorNode
root.addFilter(
new FilterHolder(
new RedirectFilter(
HttpClientInit.createClient(
HttpClientConfig.builder().withNumConnections(1).build(),
new Lifecycle()
),
new ToStringResponseHandler(Charsets.UTF_8),
new RedirectInfo()
{

View File

@ -28,6 +28,8 @@ import com.metamx.common.logger.Logger;
import com.metamx.druid.merger.common.TaskStatus;
import com.metamx.druid.merger.common.config.IndexerZkConfig;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.state.ConnectionState;
import com.netflix.curator.framework.state.ConnectionStateListener;
import org.apache.zookeeper.CreateMode;
import org.codehaus.jackson.map.ObjectMapper;
import org.joda.time.DateTime;
@ -95,6 +97,28 @@ public class WorkerCuratorCoordinator
worker
);
curatorFramework.getConnectionStateListenable().addListener(
new ConnectionStateListener()
{
@Override
public void stateChanged(CuratorFramework client, ConnectionState newState)
{
try {
if (newState.equals(ConnectionState.RECONNECTED)) {
makePathIfNotExisting(
getAnnouncementsPathForWorker(),
CreateMode.EPHEMERAL,
worker
);
}
}
catch (Exception e) {
throw Throwables.propagate(e);
}
}
}
);
started = true;
}
}

View File

@ -267,10 +267,6 @@ public class MasterMain
root.addFilter(
new FilterHolder(
new RedirectFilter(
HttpClientInit.createClient(
HttpClientConfig.builder().withNumConnections(1).build(),
new Lifecycle()
),
new ToStringResponseHandler(Charsets.UTF_8),
redirectInfo
)

View File

@ -43,17 +43,14 @@ public class RedirectFilter implements Filter
{
private static final Logger log = new Logger(RedirectFilter.class);
private final HttpClient httpClient;
private final HttpResponseHandler<StringBuilder, String> responseHandler;
private final RedirectInfo redirectInfo;
public RedirectFilter(
HttpClient httpClient,
HttpResponseHandler<StringBuilder, String> responseHandler,
RedirectInfo redirectInfo
)
{
this.httpClient = httpClient;
this.responseHandler = responseHandler;
this.redirectInfo = redirectInfo;
}
@ -82,28 +79,11 @@ public class RedirectFilter implements Filter
URL url = redirectInfo.getRedirectURL(request.getQueryString(), request.getRequestURI());
log.info("Forwarding request to [%s]", url);
if (request.getMethod().equals(HttpMethod.POST)) {
try {
forward(request, url);
}
catch (Exception e) {
throw Throwables.propagate(e);
}
} else {
response.sendRedirect(url.toString());
}
response.setStatus(HttpServletResponse.SC_MOVED_TEMPORARILY);
response.setHeader("Location", url.toString());
}
}
@Override
public void destroy() {}
private void forward(HttpServletRequest req, URL url) throws Exception
{
byte[] requestQuery = ByteStreams.toByteArray(req.getInputStream());
httpClient.post(url)
.setContent("application/json", requestQuery)
.go(responseHandler)
.get();
}
}

View File

@ -65,7 +65,9 @@ public class RedirectServlet extends DefaultServlet
} else {
URL url = redirectInfo.getRedirectURL(request.getQueryString(), request.getRequestURI());
log.info("Forwarding request to [%s]", url);
response.sendRedirect(url.toString());
response.setStatus(HttpServletResponse.SC_MOVED_TEMPORARILY);
response.setHeader("Location", url.toString());
}
}
}