cleanup: use standard Executor API

This commit is contained in:
Eugene Zhuravlev
2012-04-16 16:54:38 +02:00
parent e9b4d85855
commit bc00139039
10 changed files with 35 additions and 70 deletions

View File

@@ -76,10 +76,7 @@ import java.io.FileOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@@ -95,22 +92,17 @@ public class CompileServerManager implements ApplicationComponent{
private final File mySystemDirectory;
@Nullable
private volatile CompileServerClient myClient;
private final SequentialTaskExecutor myTaskExecutor = new SequentialTaskExecutor(new AsyncTaskExecutor() {
@Override
public void submit(Runnable runnable) {
ApplicationManager.getApplication().executeOnPooledThread(runnable);
}
});
private final ProjectManager myProjectManager;
private static final int MAKE_TRIGGER_DELAY = 5 * 1000 /*5 seconds*/;
private final Map<RequestFuture, Project> myAutomakeFutures = new HashMap<RequestFuture, Project>();
private final CompileServerClasspathManager myClasspathManager = new CompileServerClasspathManager();
private final AsyncTaskExecutor myAsyncExec = new AsyncTaskExecutor() {
private final Executor myPooledThreadExecutor = new Executor() {
@Override
public void submit(Runnable runnable) {
ApplicationManager.getApplication().executeOnPooledThread(runnable);
public void execute(Runnable command) {
ApplicationManager.getApplication().executeOnPooledThread(command);
}
};
private final SequentialTaskExecutor myTaskExecutor = new SequentialTaskExecutor(myPooledThreadExecutor);
public CompileServerManager(final ProjectManager projectManager) {
myProjectManager = projectManager;
@@ -468,7 +460,7 @@ public class CompileServerManager implements ApplicationComponent{
throw new Exception("Server startup failed: " + startupMsg);
}
CompileServerClient client = new CompileServerClient(serverPingInterval, myAsyncExec);
CompileServerClient client = new CompileServerClient(serverPingInterval, myPooledThreadExecutor);
boolean connected = false;
try {
connected = client.connect(NetUtils.getLocalHostString(), port);

View File

@@ -1,16 +0,0 @@
package org.jetbrains.jps.api;
/**
* @author Eugene Zhuravlev
* Date: 2/28/12
*/
public interface AsyncTaskExecutor {
AsyncTaskExecutor DEFAULT = new AsyncTaskExecutor() {
@Override
public void submit(Runnable runnable) {
new Thread(runnable).start();
}
};
void submit(Runnable runnable);
}

View File

@@ -1,6 +1,7 @@
package org.jetbrains.jps.api;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RunnableFuture;
@@ -11,7 +12,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
* Date: 9/24/11
*/
public class SequentialTaskExecutor {
private final AsyncTaskExecutor myExecutor;
private final Executor myExecutor;
private final Queue<FutureTask> myTaskQueue = new LinkedBlockingQueue<FutureTask>();
private final AtomicBoolean myInProgress = new AtomicBoolean(false);
private final Runnable USER_TASK_RUNNER = new Runnable() {
@@ -31,7 +32,7 @@ public class SequentialTaskExecutor {
}
};
public SequentialTaskExecutor(AsyncTaskExecutor executor) {
public SequentialTaskExecutor(Executor executor) {
myExecutor = executor;
}
@@ -48,7 +49,7 @@ public class SequentialTaskExecutor {
private void processQueue() {
if (!myInProgress.getAndSet(true)) {
myExecutor.submit(USER_TASK_RUNNER);
myExecutor.execute(USER_TASK_RUNNER);
}
}

View File

@@ -9,10 +9,4 @@ import java.util.concurrent.Executors;
*/
public class SharedThreadPool {
public static final ExecutorService INSTANCE = Executors.newCachedThreadPool();
public static final AsyncTaskExecutor ASYNC_EXEC = new AsyncTaskExecutor() {
@Override
public void submit(Runnable runnable) {
INSTANCE.submit(runnable);
}
};
}

View File

@@ -6,7 +6,11 @@ import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.jps.api.*;
import java.util.*;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -20,7 +24,7 @@ public class CompileServerClient extends SimpleProtobufClient<JpsServerResponseH
private volatile ScheduledFuture<?> myPingFuture;
private final long myServerPingInterval;
public CompileServerClient(long serverPingInterval, final AsyncTaskExecutor asyncExec) {
public CompileServerClient(long serverPingInterval, final Executor asyncExec) {
super(JpsRemoteProto.Message.getDefaultInstance(), asyncExec, new UUIDGetter() {
@NotNull
public UUID getSessionUUID(@NotNull MessageEvent e) {

View File

@@ -6,12 +6,12 @@ import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.jps.api.AsyncTaskExecutor;
import org.jetbrains.jps.api.RequestFuture;
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
/**
* @author Eugene Zhuravlev
@@ -22,9 +22,9 @@ final class ProtobufClientMessageHandler<T extends ProtobufResponseHandler> exte
@NotNull
private final UUIDGetter myUuidGetter;
private final SimpleProtobufClient myClient;
private final AsyncTaskExecutor myAsyncExec;
private final Executor myAsyncExec;
public ProtobufClientMessageHandler(@NotNull UUIDGetter uuidGetter, SimpleProtobufClient client, AsyncTaskExecutor asyncExec) {
public ProtobufClientMessageHandler(@NotNull UUIDGetter uuidGetter, SimpleProtobufClient client, Executor asyncExec) {
myUuidGetter = uuidGetter;
myClient = client;
myAsyncExec = asyncExec;
@@ -92,7 +92,7 @@ final class ProtobufClientMessageHandler<T extends ProtobufResponseHandler> exte
}
finally {
// make sure the client is in disconnected state
myAsyncExec.submit(new Runnable() {
myAsyncExec.execute(new Runnable() {
@Override
public void run() {
myClient.disconnect();

View File

@@ -10,13 +10,11 @@ import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
import org.jboss.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.jboss.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.jps.api.AsyncTaskExecutor;
import org.jetbrains.jps.api.RequestFuture;
import java.net.InetSocketAddress;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
/**
@@ -25,7 +23,6 @@ import java.util.concurrent.atomic.AtomicReference;
*/
public class SimpleProtobufClient<T extends ProtobufResponseHandler> {
private static final Logger LOG = Logger.getInstance("#org.jetbrains.jps.client.SimpleProtobufClient");
private final ExecutorService ourExecutor = Executors.newCachedThreadPool();
private static enum State {
DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING
@@ -37,9 +34,9 @@ public class SimpleProtobufClient<T extends ProtobufResponseHandler> {
protected volatile ChannelFuture myConnectFuture;
private final ProtobufClientMessageHandler<T> myMessageHandler;
public SimpleProtobufClient(final MessageLite msgDefaultInstance, final AsyncTaskExecutor asyncExec, final UUIDGetter uuidGetter) {
public SimpleProtobufClient(final MessageLite msgDefaultInstance, final Executor asyncExec, final UUIDGetter uuidGetter) {
myMessageHandler = new ProtobufClientMessageHandler<T>(uuidGetter, this, asyncExec);
myChannelFactory = new NioClientSocketChannelFactory(ourExecutor, ourExecutor, 1);
myChannelFactory = new NioClientSocketChannelFactory(asyncExec, asyncExec, 1);
myPipelineFactory = new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
@@ -117,13 +114,8 @@ public class SimpleProtobufClient<T extends ProtobufResponseHandler> {
catch (Throwable e) {
LOG.error(e);
}
try {
final ChannelFuture closeFuture = future.getChannel().close();
closeFuture.awaitUninterruptibly();
}
finally {
myChannelFactory.releaseExternalResources();
}
final ChannelFuture closeFuture = future.getChannel().close();
closeFuture.awaitUninterruptibly();
}
}
finally {

View File

@@ -17,7 +17,7 @@ import java.util.*;
public class JavacServerClient extends SimpleProtobufClient<JavacServerResponseHandler>{
public JavacServerClient() {
super(JavacRemoteProto.Message.getDefaultInstance(), SharedThreadPool.ASYNC_EXEC, new UUIDGetter() {
super(JavacRemoteProto.Message.getDefaultInstance(), SharedThreadPool.INSTANCE, new UUIDGetter() {
@NotNull
public UUID getSessionUUID(@NotNull MessageEvent e) {
final JavacRemoteProto.Message message = (JavacRemoteProto.Message)e.getMessage();

View File

@@ -1,6 +1,7 @@
package org.jetbrains.jps.server;
//import com.intellij.openapi.diagnostic.Logger;
import com.intellij.openapi.diagnostic.Logger;
import com.intellij.util.ConcurrencyUtil;
import org.apache.log4j.Level;
@@ -17,17 +18,13 @@ import org.jboss.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import org.jboss.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import org.jetbrains.annotations.NonNls;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.jps.api.AsyncTaskExecutor;
import org.jetbrains.jps.api.GlobalOptions;
import org.jetbrains.jps.api.JpsRemoteProto;
import org.jetbrains.jps.incremental.Utils;
import java.io.File;
import java.net.InetSocketAddress;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.*;
/**
* @author Eugene Zhuravlev
@@ -77,14 +74,14 @@ public class Server {
myBuildsExecutor = Executors.newFixedThreadPool(MAX_SIMULTANEOUS_BUILD_SESSIONS);
myChannelFactory = new NioServerSocketChannelFactory(threadPool, threadPool, 1);
final ChannelRegistrar channelRegistrar = new ChannelRegistrar();
myMessageHandler = new ServerMessageHandler(this, new AsyncTaskExecutor() {
myMessageHandler = new ServerMessageHandler(this, new Executor() {
@Override
public void submit(final Runnable runnable) {
public void execute(final Runnable command) {
myBuildsExecutor.submit(new Runnable() {
@Override
public void run() {
try {
runnable.run();
command.run();
}
finally {
Thread.interrupted(); // clear interrupted status before returning to pull

View File

@@ -15,6 +15,7 @@ import java.io.File;
import java.io.PrintStream;
import java.util.*;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.RunnableFuture;
/**
@@ -27,9 +28,9 @@ class ServerMessageHandler extends SimpleChannelHandler {
private final Map<String, SequentialTaskExecutor> myTaskExecutors = new HashMap<String, SequentialTaskExecutor>();
private final List<Pair<RunnableFuture, CompilationTask>> myBuildsInProgress = Collections.synchronizedList(new LinkedList<Pair<RunnableFuture, CompilationTask>>());
private final Server myServer;
private final AsyncTaskExecutor myAsyncExecutor;
private final Executor myAsyncExecutor;
public ServerMessageHandler(Server server, final AsyncTaskExecutor asyncExecutor) {
public ServerMessageHandler(Server server, final Executor asyncExecutor) {
myServer = server;
myAsyncExecutor = asyncExecutor;
}
@@ -87,7 +88,7 @@ class ServerMessageHandler extends SimpleChannelHandler {
break;
case SHUTDOWN_COMMAND :
myAsyncExecutor.submit(new Runnable() {
myAsyncExecutor.execute(new Runnable() {
public void run() {
try {
cancelAllBuildsAndClearState();