PY-21264 Debugger hanging after creating a new subprocess the other way than forking fixed

PROCESS_CREATED message is sent in all cases to IDE when new process is being created. ClientModeDebuggerTransport connection logic has been changed to fit into waitForConnect() method without scheduling reconnection tasks. Handshake message is used to check for connection with subprocess to be real one, not ephemeral. DebuggerTransport.messageReceived() has been abolished.
This commit is contained in:
Alexander Koshevoy
2016-11-06 17:56:53 +03:00
parent fe051a3dc3
commit 0d77411666
8 changed files with 93 additions and 79 deletions

View File

@@ -319,6 +319,7 @@ def create_execl(original_name):
"""
import os
args = patch_args(args)
send_process_created_message()
return getattr(os, original_name)(path, *args)
return new_execl
@@ -330,6 +331,7 @@ def create_execv(original_name):
os.execvp(file, args)
"""
import os
send_process_created_message()
return getattr(os, original_name)(path, patch_args(args))
return new_execv
@@ -341,6 +343,7 @@ def create_execve(original_name):
"""
def new_execve(path, args, env):
import os
send_process_created_message()
return getattr(os, original_name)(path, patch_args(args), env)
return new_execve
@@ -353,6 +356,7 @@ def create_spawnl(original_name):
"""
import os
args = patch_args(args)
send_process_created_message()
return getattr(os, original_name)(mode, path, *args)
return new_spawnl
@@ -364,6 +368,7 @@ def create_spawnv(original_name):
os.spawnvp(mode, file, args)
"""
import os
send_process_created_message()
return getattr(os, original_name)(mode, path, patch_args(args))
return new_spawnv
@@ -375,6 +380,7 @@ def create_spawnve(original_name):
"""
def new_spawnve(mode, path, args, env):
import os
send_process_created_message()
return getattr(os, original_name)(mode, path, patch_args(args), env)
return new_spawnve
@@ -386,6 +392,7 @@ def create_fork_exec(original_name):
def new_fork_exec(args, *other_args):
import _posixsubprocess # @UnresolvedImport
args = patch_args(args)
send_process_created_message()
return getattr(_posixsubprocess, original_name)(args, *other_args)
return new_fork_exec
@@ -413,6 +420,7 @@ def create_CreateProcess(original_name):
import _subprocess
except ImportError:
import _winapi as _subprocess
send_process_created_message()
return getattr(_subprocess, original_name)(app_name, patch_arg_str_win(cmd_line), *args)
return new_CreateProcess
@@ -468,6 +476,13 @@ def create_fork(original_name):
return new_fork
def send_process_created_message():
from _pydevd_bundle.pydevd_comm import get_global_debugger
debugger = get_global_debugger()
if debugger is not None:
debugger.send_process_created_message()
def patch_new_process_functions():
# os.execl(path, arg0, arg1, ...)
# os.execle(path, arg0, arg1, ..., env)

View File

@@ -71,7 +71,8 @@ public class ClientModeMultiProcessDebugger implements ProcessDebugger {
addDebugger(debugger);
LOG.debug("Connected to subprocess on attempt");
myDebugProcess.init();
debugger.run();
return;
}

View File

@@ -57,7 +57,7 @@ public class RemoteDebugger implements ProcessDebugger {
public RemoteDebugger(@NotNull IPyDebugProcess debugProcess, @NotNull String host, int port) {
myDebugProcess = debugProcess;
myDebuggerTransport = new ClientModeDebuggerTransport(debugProcess, this, host, port);
myDebuggerTransport = new ClientModeDebuggerTransport(this, host, port);
}
public RemoteDebugger(@NotNull IPyDebugProcess debugProcess, @NotNull ServerSocket socket, int timeout) {
@@ -438,8 +438,6 @@ public class RemoteDebugger implements ProcessDebugger {
final ProtocolFrame frame = new ProtocolFrame(line);
logFrame(frame, false);
myDebuggerTransport.messageReceived(frame);
if (AbstractThreadCommand.isThreadCommand(frame.getCommand())) {
processThreadEvent(frame);
}

View File

@@ -11,6 +11,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
/**
@@ -22,6 +23,8 @@ public abstract class BaseDebuggerReader extends BaseOutputReader {
@NotNull private final RemoteDebugger myDebugger;
@NotNull private StringBuilder myTextBuilder = new StringBuilder();
private final CountDownLatch myReadyToReadLatch = new CountDownLatch(1);
public BaseDebuggerReader(@NotNull InputStream inputStream, @NotNull Charset charset, @NotNull RemoteDebugger debugger) {
super(inputStream, charset);
myDebugger = debugger;
@@ -33,6 +36,7 @@ public abstract class BaseDebuggerReader extends BaseOutputReader {
}
protected void doRun() {
myReadyToReadLatch.countDown();
try {
while (true) {
boolean read = readAvailableBlocking();
@@ -99,4 +103,8 @@ public abstract class BaseDebuggerReader extends BaseOutputReader {
}
}
}
public void awaitReadyToRead() throws InterruptedException {
myReadyToReadLatch.await();
}
}

View File

@@ -34,7 +34,7 @@ public abstract class BaseDebuggerTransport implements DebuggerTransport {
myDebugger.fireCommunicationError();
}
catch (IOException e) {
LOG.error(e);
LOG.debug(e);
}
return false;
}

View File

@@ -1,12 +1,11 @@
package com.jetbrains.python.debugger.pydev.transport;
import com.intellij.openapi.application.ApplicationManager;
import com.intellij.openapi.diagnostic.Logger;
import com.intellij.openapi.vfs.CharsetToolkit;
import com.jetbrains.python.debugger.IPyDebugProcess;
import com.jetbrains.python.debugger.PyDebuggerException;
import com.jetbrains.python.debugger.pydev.AbstractCommand;
import com.jetbrains.python.debugger.pydev.ClientModeMultiProcessDebugger;
import com.jetbrains.python.debugger.pydev.ProtocolFrame;
import com.jetbrains.python.debugger.pydev.RemoteDebugger;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
@@ -17,11 +16,8 @@ import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
/**
* {@link DebuggerTransport} implementation that expects a debugging script to behave as a server. The main process of the debugging script
@@ -51,21 +47,10 @@ import java.util.concurrent.atomic.AtomicInteger;
public class ClientModeDebuggerTransport extends BaseDebuggerTransport {
private static final Logger LOG = Logger.getInstance(ClientModeDebuggerTransport.class);
private static final ScheduledExecutorService myScheduledExecutor = new ScheduledThreadPoolExecutor(1, new ThreadFactory() {
private AtomicInteger num = new AtomicInteger(1);
@Override
public Thread newThread(@NotNull Runnable r) {
return new Thread(r, "Python Debug Script Connection " + num.getAndIncrement());
}
});
private static final int MAX_CONNECTION_TRIES = 10;
private static final int MAX_CONNECTION_TRIES = 20;
private static final long CHECK_CONNECTION_APPROVED_DELAY = 1000L;
private static final long SLEEP_TIME_BETWEEN_CONNECTION_TRIES = 150L;
@NotNull private final IPyDebugProcess myDebugProcess;
@NotNull private final String myHost;
private final int myPort;
@@ -74,25 +59,16 @@ public class ClientModeDebuggerTransport extends BaseDebuggerTransport {
@Nullable private Socket mySocket;
@Nullable private volatile DebuggerReader myDebuggerReader;
public ClientModeDebuggerTransport(@NotNull IPyDebugProcess debugProcess,
@NotNull RemoteDebugger debugger,
public ClientModeDebuggerTransport(@NotNull RemoteDebugger debugger,
@NotNull String host,
int port) {
super(debugger);
myDebugProcess = debugProcess;
myHost = host;
myPort = port;
}
@Override
public void waitForConnect() throws IOException {
try {
Thread.sleep(500L);
}
catch (InterruptedException e) {
throw new IOException(e);
}
if (myState != State.INIT) {
throw new IllegalStateException(
"Inappropriate state of Python debugger for connecting to Python debugger: " + myState + "; " + State.INIT + " is expected");
@@ -120,23 +96,77 @@ public class ClientModeDebuggerTransport extends BaseDebuggerTransport {
boolean connected = false;
while (!connected && i < MAX_CONNECTION_TRIES) {
i++;
int attempt = i;
LOG.debug(String.format("[%d] Trying to connect: #%d attempt", hashCode(), attempt));
try {
Socket clientSocket = new Socket();
clientSocket.setSoTimeout(0);
clientSocket.connect(new InetSocketAddress(myHost, myPort));
synchronized (mySocketObject) {
mySocket = clientSocket;
myState = State.CONNECTED;
}
try {
myDebuggerReader = new DebuggerReader(myDebugger, clientSocket.getInputStream());
InputStream stream;
synchronized (mySocketObject) {
stream = mySocket.getInputStream();
}
myDebuggerReader = new DebuggerReader(myDebugger, stream);
}
catch (IOException e) {
LOG.debug("Failed to create debugger reader", e);
throw e;
}
synchronized (mySocketObject) {
mySocket = clientSocket;
AtomicBoolean success = new AtomicBoolean(false);
CountDownLatch beforeHandshake = new CountDownLatch(1);
Future<Void> future = ApplicationManager.getApplication().executeOnPooledThread(() -> {
try {
myDebuggerReader.awaitReadyToRead();
}
finally {
beforeHandshake.countDown();
}
try {
myDebugger.handshake();
success.set(true);
}
catch (PyDebuggerException e) {
LOG.debug(String.format("[%d] Handshake failed: #%d attempt", hashCode(), attempt));
}
return null;
});
try {
beforeHandshake.await();
future.get(CHECK_CONNECTION_APPROVED_DELAY, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
LOG.debug(String.format("[%d] Waiting for handshake interrupted: #%d attempt", hashCode(), attempt), e);
myDebuggerReader.close();
throw new IOException("Waiting for subprocess interrupted", e);
}
catch (ExecutionException e) {
LOG.debug(String.format("[%d] Execution exception occurred: #%d attempt", hashCode(), attempt), e);
}
catch (TimeoutException e) {
LOG.debug(String.format("[%d] Timeout: #%d attempt", hashCode(), attempt), e);
future.cancel(true);
}
connected = success.get();
if (!connected) {
myDebuggerReader.close();
try {
Thread.sleep(SLEEP_TIME_BETWEEN_CONNECTION_TRIES);
}
catch (InterruptedException e) {
throw new IOException(e);
}
}
connected = true;
}
catch (ConnectException e) {
if (i < MAX_CONNECTION_TRIES) {
@@ -155,37 +185,14 @@ public class ClientModeDebuggerTransport extends BaseDebuggerTransport {
throw new IOException("Failed to connect to debugging script");
}
myState = State.CONNECTED;
LOG.debug("Connected to Python debugger script on #" + i + " attempt");
try {
myDebugProcess.init();
myDebugger.run();
}
catch (PyDebuggerException e) {
myState = State.DISCONNECTED;
throw new IOException("Failed to send run command", e);
}
myScheduledExecutor.schedule(() -> {
if (myState == State.CONNECTED) {
try {
LOG.debug("Reconnecting...");
doConnect();
}
catch (IOException e) {
LOG.debug(e);
myDebugger.fireCommunicationError();
}
}
}, CHECK_CONNECTION_APPROVED_DELAY, TimeUnit.MILLISECONDS);
myState = State.APPROVED;
LOG.debug(String.format("[%d] Connected to Python debugger script on #%d attempt", hashCode(), i));
}
@Override
protected boolean sendMessageImpl(byte[] packed) throws IOException {
synchronized (mySocketObject) {
if (mySocket == null) {
if (mySocket == null || mySocket.isClosed()) {
return false;
}
final OutputStream os = mySocket.getOutputStream();
@@ -226,13 +233,6 @@ public class ClientModeDebuggerTransport extends BaseDebuggerTransport {
// TODO disconnect?
}
@Override
public void messageReceived(@NotNull ProtocolFrame frame) {
if (myState == State.CONNECTED) {
myState = State.APPROVED;
}
}
private enum State {
/**
* Before calling {@link #waitForConnect()}

View File

@@ -18,6 +18,4 @@ public interface DebuggerTransport {
boolean isConnected();
void disconnect();
void messageReceived(@NotNull ProtocolFrame frame);
}

View File

@@ -2,7 +2,6 @@ package com.jetbrains.python.debugger.pydev.transport;
import com.intellij.openapi.diagnostic.Logger;
import com.intellij.openapi.vfs.CharsetToolkit;
import com.jetbrains.python.debugger.pydev.ProtocolFrame;
import com.jetbrains.python.debugger.pydev.RemoteDebugger;
import org.jetbrains.annotations.NotNull;
@@ -96,11 +95,6 @@ public class ServerModeDebuggerTransport extends BaseDebuggerTransport {
}
}
@Override
public void messageReceived(@NotNull ProtocolFrame frame) {
// do nothing
}
@Override
protected boolean sendMessageImpl(byte[] packed) throws IOException {
synchronized (mySocketObject) {