# coding: utf-8 ''' The idea is that we record the commands sent to the debugger and reproduce them from this script (so, this works as the client, which spawns the debugger as a separate process and communicates to it as if it was run from the outside) Note that it's a python script but it'll spawn a process to run as jython, ironpython and as python. ''' import time import pytest from pydev_tests_python import debugger_unittest from pydev_tests_python.debugger_unittest import (CMD_SET_PROPERTY_TRACE, REASON_CAUGHT_EXCEPTION, REASON_UNCAUGHT_EXCEPTION, REASON_STOP_ON_BREAKPOINT, REASON_THREAD_SUSPEND, overrides, CMD_THREAD_CREATE, CMD_GET_THREAD_STACK, REASON_STEP_INTO_MY_CODE, CMD_GET_EXCEPTION_DETAILS, IS_IRONPYTHON, IS_JYTHON, IS_CPYTHON, IS_APPVEYOR, wait_for_condition, CMD_GET_FRAME, CMD_GET_BREAKPOINT_EXCEPTION, CMD_THREAD_SUSPEND, CMD_STEP_OVER, REASON_STEP_OVER, CMD_THREAD_SUSPEND_SINGLE_NOTIFICATION, CMD_THREAD_RESUME_SINGLE_NOTIFICATION, IS_PY37_OR_GREATER, IS_PY38_OR_GREATER) from _pydevd_bundle.pydevd_constants import IS_WINDOWS, IS_PY38, GET_FRAME_RETURN_GROUP try: from urllib import unquote except ImportError: from urllib.parse import unquote from pydev_tests_python.debug_constants import * pytest_plugins = [ str('pydev_tests_python.debugger_fixtures'), ] try: xrange except: xrange = range TEST_DJANGO = False TEST_FLASK = False try: import django version = [int(x) for x in django.get_version().split('.')][:2] TEST_DJANGO = version == [1, 7] or version == [2, 2] except: pass try: import flask TEST_FLASK = True except: pass if IS_PY2: builtin_qualifier = "__builtin__" else: builtin_qualifier = "builtins" @pytest.mark.skipif(IS_IRONPYTHON, reason='Test needs gc.get_referrers to really check anything.') def test_case_referrers(case_setup): with case_setup.test_file('_debugger_case1.py') as writer: writer.log.append('writing add breakpoint') writer.write_add_breakpoint(6, 'set_up') writer.log.append('making initial run') writer.write_make_initial_run() writer.log.append('waiting for breakpoint hit') hit = writer.wait_for_breakpoint_hit() thread_id = hit.thread_id frame_id = hit.frame_id writer.log.append('get frame') writer.write_get_frame(thread_id, frame_id) writer.log.append('step over') writer.write_step_over(thread_id) writer.log.append('get frame') writer.write_get_frame(thread_id, frame_id) writer.log.append('run thread') writer.write_run_thread(thread_id) writer.log.append('asserting') try: assert 13 == writer._sequence, 'Expected 13. Had: %s' % writer._sequence except: writer.log.append('assert failed!') raise writer.log.append('asserted') writer.finished_ok = True def test_case_2(case_setup): with case_setup.test_file('_debugger_case2.py') as writer: writer.write_add_breakpoint(3, 'Call4') # seq = 3 writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit() thread_id = hit.thread_id frame_id = hit.frame_id writer.write_get_frame(thread_id, frame_id) # Note: write get frame but not waiting for it to be gotten. writer.write_add_breakpoint(14, 'Call2') writer.write_run_thread(thread_id) hit = writer.wait_for_breakpoint_hit() thread_id = hit.thread_id frame_id = hit.frame_id writer.write_get_frame(thread_id, frame_id) # Note: write get frame but not waiting for it to be gotten. writer.write_run_thread(thread_id) writer.log.append('Checking sequence. Found: %s' % (writer._sequence)) assert 15 == writer._sequence, 'Expected 15. Had: %s' % writer._sequence writer.log.append('Marking finished ok.') writer.finished_ok = True @pytest.mark.parametrize( 'skip_suspend_on_breakpoint_exception, skip_print_breakpoint_exception', ( [['NameError'], []], [['NameError'], ['NameError']], [[], []], # Empty means it'll suspend/print in any exception [[], ['NameError']], [['ValueError'], ['Exception']], [['Exception'], ['ValueError']], # ValueError will also suspend/print since we're dealing with a NameError ) ) def test_case_breakpoint_condition_exc(case_setup, skip_suspend_on_breakpoint_exception, skip_print_breakpoint_exception): msgs_in_stderr = ( 'Error while evaluating expression: i > 5', "NameError: name 'i' is not defined", 'Traceback (most recent call last):', 'File "", line 1, in ', ) def _ignore_stderr_line(line): if original_ignore_stderr_line(line): return True for msg in msgs_in_stderr: if msg in line: return True return False def additional_output_checks(stdout, stderr): original_additional_output_checks(stdout, stderr) if skip_print_breakpoint_exception in ([], ['ValueError']): for msg in msgs_in_stderr: assert msg in stderr else: for msg in msgs_in_stderr: assert msg not in stderr with case_setup.test_file('_debugger_case_breakpoint_condition_exc.py') as writer: original_ignore_stderr_line = writer._ignore_stderr_line writer._ignore_stderr_line = _ignore_stderr_line original_additional_output_checks = writer.additional_output_checks writer.additional_output_checks = additional_output_checks writer.write_suspend_on_breakpoint_exception(skip_suspend_on_breakpoint_exception, skip_print_breakpoint_exception) breakpoint_id = writer.write_add_breakpoint( writer.get_line_index_with_content('break here'), 'Call', condition='i > 5') writer.write_make_initial_run() if skip_suspend_on_breakpoint_exception in ([], ['ValueError']): writer.wait_for_message(CMD_GET_BREAKPOINT_EXCEPTION) hit = writer.wait_for_breakpoint_hit() writer.write_run_thread(hit.thread_id) if IS_JYTHON: # Jython will break twice. if skip_suspend_on_breakpoint_exception in ([], ['ValueError']): writer.wait_for_message(CMD_GET_BREAKPOINT_EXCEPTION) hit = writer.wait_for_breakpoint_hit() writer.write_run_thread(hit.thread_id) hit = writer.wait_for_breakpoint_hit() thread_id = hit.thread_id frame_id = hit.frame_id writer.write_get_frame(thread_id, frame_id) msg = writer.wait_for_message(CMD_GET_FRAME) name_to_value = {} for var in msg.var: name_to_value[var['name']] = var['value'] assert name_to_value == {'i': '6', 'last_i': '6'} writer.write_remove_breakpoint(breakpoint_id) writer.write_run_thread(thread_id) writer.finished_ok = True @pytest.mark.skipif(IS_IRONPYTHON, reason='This test fails once in a while due to timing issues on IronPython, so, skipping it.') def test_case_3(case_setup): with case_setup.test_file('_debugger_case3.py') as writer: writer.write_make_initial_run() time.sleep(.5) breakpoint_id = writer.write_add_breakpoint(4, '') writer.write_add_breakpoint(5, 'FuncNotAvailable') # Check that it doesn't get hit in the global when a function is available hit = writer.wait_for_breakpoint_hit() thread_id = hit.thread_id frame_id = hit.frame_id writer.write_get_frame(thread_id, frame_id) writer.write_run_thread(thread_id) hit = writer.wait_for_breakpoint_hit() thread_id = hit.thread_id frame_id = hit.frame_id writer.write_get_frame(thread_id, frame_id) writer.write_remove_breakpoint(breakpoint_id) writer.write_run_thread(thread_id) assert 17 == writer._sequence, 'Expected 17. Had: %s' % writer._sequence writer.finished_ok = True def test_case_suspend_thread(case_setup): with case_setup.test_file('_debugger_case4.py') as writer: writer.write_make_initial_run() thread_id = writer.wait_for_new_thread() writer.write_suspend_thread(thread_id) while True: hit = writer.wait_for_breakpoint_hit((REASON_THREAD_SUSPEND, REASON_STOP_ON_BREAKPOINT)) if hit.name == 'sleep': break # Ok, broke on 'sleep'. else: # i.e.: if it doesn't hit on 'sleep', release and pause again. writer.write_run_thread(thread_id) time.sleep(.1) writer.write_suspend_thread(thread_id) assert hit.thread_id == thread_id writer.write_evaluate_expression('%s\t%s\t%s' % (hit.thread_id, hit.frame_id, 'LOCAL'), 'exit_while_loop()') writer.wait_for_evaluation([ [ '%0A'.format(builtin_qualifier), '%0A%0A'.format(builtin_qualifier), '%0A%0A', # jython ] ]) writer.write_run_thread(hit.thread_id) assert 17 == writer._sequence, 'Expected 17. Had: %s' % writer._sequence writer.finished_ok = True def test_case_8(case_setup): with case_setup.test_file('_debugger_case89.py') as writer: writer.write_add_breakpoint(10, 'Method3') writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit('111') writer.write_step_return(hit.thread_id) hit = writer.wait_for_breakpoint_hit('109', line=15) writer.write_run_thread(hit.thread_id) assert 9 == writer._sequence, 'Expected 9. Had: %s' % writer._sequence writer.finished_ok = True def test_case_9(case_setup): with case_setup.test_file('_debugger_case89.py') as writer: writer.write_add_breakpoint(10, 'Method3') writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit('111') # Note: no active exception (should not give an error and should return no # exception details as there's no exception). writer.write_get_current_exception(hit.thread_id) msg = writer.wait_for_message(CMD_GET_EXCEPTION_DETAILS) assert msg.thread['id'] == hit.thread_id assert not hasattr(msg.thread, 'frames') # No frames should be found. writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit('108', line=11) writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit('108', line=12) writer.write_run_thread(hit.thread_id) assert 13 == writer._sequence, 'Expected 13. Had: %s' % writer._sequence writer.finished_ok = True def test_case_10(case_setup): with case_setup.test_file('_debugger_case_simple_calls.py') as writer: writer.write_add_breakpoint(2, 'None') # None or Method should make hit. writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit('111') writer.write_step_return(hit.thread_id) hit = writer.wait_for_breakpoint_hit('109', line=11) writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit('108', line=12) writer.write_run_thread(hit.thread_id) assert 11 == writer._sequence, 'Expected 11. Had: %s' % writer._sequence writer.finished_ok = True def test_case_11(case_setup): with case_setup.test_file('_debugger_case_simple_calls.py') as writer: writer.write_add_breakpoint(2, 'Method1') writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit('111', line=2) writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit('108', line=3) writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit('108', line=11) writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit('108', line=12) writer.write_run_thread(hit.thread_id) assert 13 == writer._sequence, 'Expected 13. Had: %s' % writer._sequence writer.finished_ok = True def test_case_12(case_setup): with case_setup.test_file('_debugger_case_simple_calls.py') as writer: writer.write_add_breakpoint(2, '') # Should not be hit: setting empty function (not None) should only hit global. writer.write_add_breakpoint(6, 'Method1a') writer.write_add_breakpoint(11, 'Method2') writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit('111', line=11) writer.write_step_return(hit.thread_id) hit = writer.wait_for_breakpoint_hit('111', line=6) # not a return (it stopped in the other breakpoint) writer.write_run_thread(hit.thread_id) assert 13 == writer._sequence, 'Expected 13. Had: %s' % writer._sequence writer.finished_ok = True @pytest.mark.skipif(IS_IRONPYTHON, reason='Failing on IronPython (needs to be investigated).') def test_case_13(case_setup): with case_setup.test_file('_debugger_case13.py') as writer: def _ignore_stderr_line(line): if original_ignore_stderr_line(line): return True if IS_JYTHON: for expected in ( "RuntimeWarning: Parent module '_pydevd_bundle' not found while handling absolute import", "import __builtin__"): if expected in line: return True return False original_ignore_stderr_line = writer._ignore_stderr_line writer._ignore_stderr_line = _ignore_stderr_line writer.write_add_breakpoint(35, 'main') writer.write("%s\t%s\t%s" % (CMD_SET_PROPERTY_TRACE, writer.next_seq(), "true;false;false;true")) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit('111') writer.write_get_frame(hit.thread_id, hit.frame_id) writer.write_step_in(hit.thread_id) hit = writer.wait_for_breakpoint_hit('107', line=25) # Should go inside setter method writer.write_step_in(hit.thread_id) hit = writer.wait_for_breakpoint_hit('107') writer.write_step_in(hit.thread_id) hit = writer.wait_for_breakpoint_hit('107', line=21) # Should go inside getter method writer.write_step_in(hit.thread_id) hit = writer.wait_for_breakpoint_hit('107') # Disable property tracing writer.write("%s\t%s\t%s" % (CMD_SET_PROPERTY_TRACE, writer.next_seq(), "true;true;true;true")) writer.write_step_in(hit.thread_id) hit = writer.wait_for_breakpoint_hit('107', line=39) # Should Skip step into properties setter # Enable property tracing writer.write("%s\t%s\t%s" % (CMD_SET_PROPERTY_TRACE, writer.next_seq(), "true;false;false;true")) writer.write_step_in(hit.thread_id) hit = writer.wait_for_breakpoint_hit('107', line=8) # Should go inside getter method writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_case_14(case_setup): # Interactive Debug Console with case_setup.test_file('_debugger_case14.py') as writer: writer.write_add_breakpoint(22, 'main') writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit('111') assert hit.thread_id, '%s not valid.' % hit.thread_id assert hit.frame_id, '%s not valid.' % hit.frame_id # Access some variable writer.write_debug_console_expression("%s\t%s\tEVALUATE\tcarObj.color" % (hit.thread_id, hit.frame_id)) writer.wait_for_var(['False', '%27Black%27']) assert 7 == writer._sequence, 'Expected 9. Had: %s' % writer._sequence # Change some variable writer.write_debug_console_expression("%s\t%s\tEVALUATE\tcarObj.color='Red'" % (hit.thread_id, hit.frame_id)) writer.write_debug_console_expression("%s\t%s\tEVALUATE\tcarObj.color" % (hit.thread_id, hit.frame_id)) writer.wait_for_var(['False', '%27Red%27']) assert 11 == writer._sequence, 'Expected 13. Had: %s' % writer._sequence # Iterate some loop writer.write_debug_console_expression("%s\t%s\tEVALUATE\tfor i in range(3):" % (hit.thread_id, hit.frame_id)) writer.wait_for_var(['True']) writer.write_debug_console_expression("%s\t%s\tEVALUATE\t print(i)" % (hit.thread_id, hit.frame_id)) writer.wait_for_var(['True']) writer.write_debug_console_expression("%s\t%s\tEVALUATE\t" % (hit.thread_id, hit.frame_id)) writer.wait_for_var( [ 'False' ] ) assert 17 == writer._sequence, 'Expected 19. Had: %s' % writer._sequence writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_case_15(case_setup): with case_setup.test_file('_debugger_case15.py') as writer: writer.write_add_breakpoint(22, 'main') writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(REASON_STOP_ON_BREAKPOINT) # Access some variable writer.write_custom_operation("%s\t%s\tEXPRESSION\tcarObj.color" % (hit.thread_id, hit.frame_id), "EXEC", "f=lambda x: 'val=%s' % x", "f") writer.wait_for_custom_operation('val=Black') assert 7 == writer._sequence, 'Expected 7. Had: %s' % writer._sequence writer.write_custom_operation("%s\t%s\tEXPRESSION\tcarObj.color" % (hit.thread_id, hit.frame_id), "EXECFILE", debugger_unittest._get_debugger_test_file('_debugger_case15_execfile.py'), "f") writer.wait_for_custom_operation('val=Black') assert 9 == writer._sequence, 'Expected 9. Had: %s' % writer._sequence writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_case_16(case_setup): # numpy.ndarray resolver try: import numpy except ImportError: pytest.skip('numpy not available') with case_setup.test_file('_debugger_case16.py') as writer: writer.write_add_breakpoint(9, 'main') writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(REASON_STOP_ON_BREAKPOINT) # In this test we check that the three arrays of different shapes, sizes and types # are all resolved properly as ndarrays. # First pass check is that we have all three expected variables defined writer.write_get_frame(hit.thread_id, hit.frame_id) writer.wait_for_multiple_vars(( ( '', '' ), ( '', '' ), # Any of the ones below will do. ( '', '' ) )) # For each variable, check each of the resolved (meta data) attributes... writer.write_get_variable(hit.thread_id, hit.frame_id, 'smallarray') writer.wait_for_multiple_vars(( ''.format(builtin_qualifier), ''.format(builtin_qualifier), ''.format(builtin_qualifier), ''.format(builtin_qualifier), ], [ ''.format(builtin_qualifier), ''.format(builtin_qualifier), ''.format(builtin_qualifier), ''.format(builtin_qualifier), ], '%0A'.format(builtin_qualifier,)) writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_case_19(case_setup): # Check evaluate '__' attributes with case_setup.test_file('_debugger_case19.py') as writer: writer.write_add_breakpoint(8, None) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(REASON_STOP_ON_BREAKPOINT, line=8) writer.write_evaluate_expression('%s\t%s\t%s' % (hit.thread_id, hit.frame_id, 'LOCAL'), 'a.__var') writer.wait_for_evaluation([ [ 'Hello' in t.contents assert 'Flask-Jinja-Test' in t.contents writer.finished_ok = True @pytest.mark.skipif(not TEST_DJANGO, reason='No django available') def test_case_django_a(case_setup_django): with case_setup_django.test_file(EXPECTED_RETURNCODE='any') as writer: writer.write_add_breakpoint_django(5, None, 'index.html') writer.write_make_initial_run() t = writer.create_request_thread('my_app') time.sleep(5) # Give django some time to get to startup before requesting the page t.start() hit = writer.wait_for_breakpoint_hit(REASON_STOP_ON_BREAKPOINT, line=5) writer.write_get_variable(hit.thread_id, hit.frame_id, 'entry') writer.wait_for_vars([ '
  • v1:v1
  • v2:v2
  • ' % (contents,)) writer.finished_ok = True @pytest.mark.skipif(not TEST_DJANGO, reason='No django available') def test_case_django_b(case_setup_django): with case_setup_django.test_file(EXPECTED_RETURNCODE='any') as writer: writer.write_add_breakpoint_django(4, None, 'name.html') writer.write_make_initial_run() t = writer.create_request_thread('my_app/name') time.sleep(5) # Give django some time to get to startup before requesting the page t.start() hit = writer.wait_for_breakpoint_hit(REASON_STOP_ON_BREAKPOINT, line=4) writer.write_get_frame(hit.thread_id, hit.frame_id) writer.wait_for_var('' thread_id3 = hit.thread_id # Requesting the stack in an unhandled exception should provide the stack of the exception, # not the current location of the program. writer.write_get_thread_stack(thread_id3) msg = writer.wait_for_message(CMD_GET_THREAD_STACK) assert len(msg.thread.frame) == 0 # In main thread (must have no back frames). assert msg.thread.frame['name'] == '' check(hit, 'IndexError', 'in main') writer.log.append('Marking finished ok.') writer.finished_ok = True @pytest.mark.skipif(IS_JYTHON, reason='Failing on Jython -- needs to be investigated).') def test_unhandled_exceptions_in_top_level(case_setup_unhandled_exceptions): with case_setup_unhandled_exceptions.test_file( '_debugger_case_unhandled_exceptions_on_top_level.py', EXPECTED_RETURNCODE=1, ) as writer: writer.write_add_exception_breakpoint_with_policy('Exception', "0", "1", "0") writer.write_make_initial_run() # Will stop in main thread hit = writer.wait_for_breakpoint_hit(REASON_UNCAUGHT_EXCEPTION) writer.write_run_thread(hit.thread_id) writer.log.append('Marking finished ok.') writer.finished_ok = True @pytest.mark.skipif(IS_JYTHON, reason='Failing on Jython -- needs to be investigated).') def test_unhandled_exceptions_in_top_level2(case_setup_unhandled_exceptions): # Note: expecting unhandled exception to be printed to stderr. def get_environ(writer): env = os.environ.copy() curr_pythonpath = env.get('PYTHONPATH', '') pydevd_dirname = os.path.dirname(writer.get_pydevd_file()) curr_pythonpath = pydevd_dirname + os.pathsep + curr_pythonpath env['PYTHONPATH'] = curr_pythonpath return env def update_command_line_args(writer, args): # Start pydevd with '-m' to see how it deal with being called with # runpy at the start. assert args[0].endswith('pydevd.py') args = ['-m', 'pydevd'] + args[1:] return args with case_setup_unhandled_exceptions.test_file( '_debugger_case_unhandled_exceptions_on_top_level.py', get_environ=get_environ, update_command_line_args=update_command_line_args, EXPECTED_RETURNCODE=(1, 255), # Python 2.6, Jython can give 255 ) as writer: writer.write_add_exception_breakpoint_with_policy('Exception', "0", "1", "0") writer.write_make_initial_run() # Should stop (only once) in the main thread. hit = writer.wait_for_breakpoint_hit(REASON_UNCAUGHT_EXCEPTION) writer.write_run_thread(hit.thread_id) writer.log.append('Marking finished ok.') writer.finished_ok = True @pytest.mark.skipif(IS_JYTHON, reason='Failing on Jython -- needs to be investigated).') def test_unhandled_exceptions_in_top_level3(case_setup_unhandled_exceptions): with case_setup_unhandled_exceptions.test_file( '_debugger_case_unhandled_exceptions_on_top_level.py', EXPECTED_RETURNCODE=1 ) as writer: # Handled and unhandled writer.write_add_exception_breakpoint_with_policy('Exception', "1", "1", "0") writer.write_make_initial_run() # Will stop in main thread twice: once one we find that the exception is being # thrown and another in postmortem mode when we discover it's uncaught. hit = writer.wait_for_breakpoint_hit(REASON_CAUGHT_EXCEPTION) writer.write_run_thread(hit.thread_id) hit = writer.wait_for_breakpoint_hit(REASON_UNCAUGHT_EXCEPTION) writer.write_run_thread(hit.thread_id) writer.log.append('Marking finished ok.') writer.finished_ok = True @pytest.mark.skipif(IS_JYTHON, reason='Failing on Jython -- needs to be investigated).') def test_unhandled_exceptions_in_top_level4(case_setup_unhandled_exceptions): # Note: expecting unhandled exception to be printed to stderr. with case_setup_unhandled_exceptions.test_file( '_debugger_case_unhandled_exceptions_on_top_level2.py', EXPECTED_RETURNCODE=1, ) as writer: # Handled and unhandled writer.write_add_exception_breakpoint_with_policy('Exception', "1", "1", "0") writer.write_make_initial_run() # We have an exception thrown and handled and another which is thrown and is then unhandled. hit = writer.wait_for_breakpoint_hit(REASON_CAUGHT_EXCEPTION) writer.write_run_thread(hit.thread_id) hit = writer.wait_for_breakpoint_hit(REASON_CAUGHT_EXCEPTION) writer.write_run_thread(hit.thread_id) hit = writer.wait_for_breakpoint_hit(REASON_UNCAUGHT_EXCEPTION) writer.write_run_thread(hit.thread_id) writer.log.append('Marking finished ok.') writer.finished_ok = True @pytest.mark.skipif(not IS_CPYTHON, reason='Only for Python.') def test_case_set_next_statement(case_setup): with case_setup.test_file('_debugger_case_set_next_statement.py') as writer: breakpoint_id = writer.write_add_breakpoint(6, None) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(REASON_STOP_ON_BREAKPOINT, line=6) # Stop in line a=3 (before setting it) writer.write_evaluate_expression('%s\t%s\t%s' % (hit.thread_id, hit.frame_id, 'LOCAL'), 'a') writer.wait_for_evaluation('' assert msg.thread.frame['line'] == str(writer.get_line_index_with_content('break line on unhandled exception')) writer.write_run_thread(hit.thread_id) writer.log.append('Marking finished ok.') writer.finished_ok = True @pytest.mark.skipif(not IS_CPYTHON, reason='Only for Python.') def test_case_get_next_statement_targets(case_setup): with case_setup.test_file('_debugger_case_get_next_statement_targets.py') as writer: breakpoint_id = writer.write_add_breakpoint(21, None) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(REASON_STOP_ON_BREAKPOINT, line=21) writer.write_get_next_statement_targets(hit.thread_id, hit.frame_id) targets = writer.wait_for_get_next_statement_targets() expected = set((2, 3, 5, 8, 9, 10, 12, 13, 14, 15, 17, 18, 19, 21)) assert targets == expected, 'Expected targets to be %s, was: %s' % (expected, targets) writer.write_remove_breakpoint(breakpoint_id) writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.skipif(IS_IRONPYTHON or IS_JYTHON, reason='Failing on IronPython and Jython (needs to be investigated).') def test_case_type_ext(case_setup): # Custom type presentation extensions def get_environ(self): env = os.environ.copy() python_path = env.get("PYTHONPATH", "") ext_base = debugger_unittest._get_debugger_test_file('my_extensions') env['PYTHONPATH'] = ext_base + os.pathsep + python_path if python_path else ext_base return env with case_setup.test_file('_debugger_case_type_ext.py', get_environ=get_environ) as writer: writer.get_environ = get_environ writer.write_add_breakpoint(7, None) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit('111') writer.write_get_frame(hit.thread_id, hit.frame_id) assert writer.wait_for_var([ [ r'', r'', r''.format(builtin_qualifier)) writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.skipif(IS_IRONPYTHON or IS_JYTHON, reason='Failing on IronPython and Jython (needs to be investigated).') def test_case_event_ext(case_setup): def get_environ(self): env = os.environ.copy() python_path = env.get("PYTHONPATH", "") ext_base = debugger_unittest._get_debugger_test_file('my_extensions') env['PYTHONPATH'] = ext_base + os.pathsep + python_path if python_path else ext_base env["VERIFY_EVENT_TEST"] = "1" return env # Test initialize event for extensions with case_setup.test_file('_debugger_case_event_ext.py', get_environ=get_environ) as writer: original_additional_output_checks = writer.additional_output_checks @overrides(writer.additional_output_checks) def additional_output_checks(stdout, stderr): original_additional_output_checks(stdout, stderr) if 'INITIALIZE EVENT RECEIVED' not in stdout: raise AssertionError('No initialize event received') writer.additional_output_checks = additional_output_checks writer.write_make_initial_run() writer.finished_ok = True @pytest.mark.skipif(IS_JYTHON, reason='Jython does not seem to be creating thread started inside tracing (investigate).') def test_case_writer_creation_deadlock(case_setup): # check case where there was a deadlock evaluating expressions with case_setup.test_file('_debugger_case_thread_creation_deadlock.py') as writer: writer.write_add_breakpoint(26, None) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit('111') assert hit.line == 26, 'Expected return to be in line 26, was: %s' % (hit.line,) writer.write_evaluate_expression('%s\t%s\t%s' % (hit.thread_id, hit.frame_id, 'LOCAL'), 'create_thread()') writer.wait_for_evaluation('= 3: expected.extend(( 'binary\n', 'ação2\n'.encode(encoding='latin1').decode('utf-8', 'replace'), 'ação3\n', )) new_expected = [(x, 'stdout') for x in expected] new_expected.extend([(x, 'stderr') for x in expected]) writer.write_start_redirect() writer.write_make_initial_run() msgs = [] ignored = [] while len(msgs) < len(new_expected): try: msg = writer.wait_for_output() except AssertionError: for msg in msgs: sys.stderr.write('Found: %s\n' % (msg,)) for msg in new_expected: sys.stderr.write('Expected: %s\n' % (msg,)) for msg in ignored: sys.stderr.write('Ignored: %s\n' % (msg,)) raise if msg not in new_expected: ignored.append(msg) continue msgs.append(msg) if msgs != new_expected: print(msgs) print(new_expected) assert msgs == new_expected writer.finished_ok = True def test_path_translation(case_setup): def get_file_in_client(writer): # Instead of using: test_python/_debugger_case_path_translation.py # we'll set the breakpoints at foo/_debugger_case_path_translation.py file_in_client = os.path.dirname(os.path.dirname(writer.TEST_FILE)) return os.path.join(os.path.dirname(file_in_client), 'foo', '_debugger_case_path_translation.py') def get_environ(writer): import json env = os.environ.copy() env["PYTHONIOENCODING"] = 'utf-8' assert writer.TEST_FILE.endswith('_debugger_case_path_translation.py') env["PATHS_FROM_ECLIPSE_TO_PYTHON"] = json.dumps([ ( os.path.dirname(get_file_in_client(writer)), os.path.dirname(writer.TEST_FILE) ) ]) return env with case_setup.test_file('_debugger_case_path_translation.py', get_environ=get_environ) as writer: from pydev_tests_python.debugger_unittest import CMD_LOAD_SOURCE writer.write_start_redirect() file_in_client = get_file_in_client(writer) assert 'pydev_tests_python' not in file_in_client writer.write_add_breakpoint(2, 'main', filename=file_in_client) writer.write_make_initial_run() xml = writer.wait_for_message(lambda msg:'stop_reason="111"' in msg) assert xml.thread.frame[0]['file'] == file_in_client thread_id = xml.thread['id'] # Request a file that exists files_to_match = [file_in_client] if IS_WINDOWS: files_to_match.append(file_in_client.upper()) for f in files_to_match: writer.write_load_source(f) writer.wait_for_message( lambda msg: '%s\t' % CMD_LOAD_SOURCE in msg and \ "def main():" in msg and \ "print('break here')" in msg and \ "print('TEST SUCEEDED!')" in msg , expect_xml=False) # Request a file that does not exist writer.write_load_source(file_in_client + 'not_existent.py') writer.wait_for_message( lambda msg:'901\t' in msg and ('FileNotFoundError' in msg or 'IOError' in msg), expect_xml=False) writer.write_run_thread(thread_id) writer.finished_ok = True def test_evaluate_errors(case_setup): with case_setup.test_file('_debugger_case7.py') as writer: writer.write_add_breakpoint(4, 'Call') writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit() thread_id = hit.thread_id frame_id = hit.frame_id writer.write_evaluate_expression('%s\t%s\t%s' % (thread_id, frame_id, 'LOCAL'), 'name_error') writer.wait_for_evaluation('' else: assert len(msg.thread.frame) > 1 # Stopped in threading (must have back frames). assert msg.thread.frame[0]['name'] == 'method' writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_case_dump_threads_to_stderr(case_setup): from pydev_tests_python.debugger_unittest import wait_for_condition def additional_output_checks(writer, stdout, stderr): assert is_stderr_ok(stderr), make_error_msg(stderr) def make_error_msg(stderr): return 'Did not find thread dump in stderr. stderr:\n%s' % (stderr,) def is_stderr_ok(stderr): return 'Thread Dump' in stderr and 'Thread pydevd.CommandThread (daemon: True, pydevd thread: True)' in stderr with case_setup.test_file( '_debugger_case_get_thread_stack.py', additional_output_checks=additional_output_checks) as writer: writer.write_add_breakpoint(12, None) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(REASON_STOP_ON_BREAKPOINT) writer.write_dump_threads() wait_for_condition( lambda: is_stderr_ok(writer.get_stderr()), lambda: make_error_msg(writer.get_stderr()) ) writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_stop_on_start_regular(case_setup): with case_setup.test_file('_debugger_case_simple_calls.py') as writer: writer.write_stop_on_start() writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(REASON_STEP_INTO_MY_CODE, file='_debugger_case_simple_calls.py', line=1) writer.write_run_thread(hit.thread_id) writer.finished_ok = True def _get_breakpoint_cases(): if sys.version_info >= (3, 7): # Just check breakpoint() return ('_debugger_case_breakpoint.py',) else: # Check breakpoint() and sys.__breakpointhook__ replacement. return ('_debugger_case_breakpoint.py', '_debugger_case_breakpoint2.py') @pytest.mark.parametrize("filename", _get_breakpoint_cases()) @pytest.mark.skipif(not IS_PY37_OR_GREATER, reason="Supported only in Python 3.7") def test_py_37_breakpoint(case_setup, filename): with case_setup.test_file(filename) as writer: writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(file=filename, line=3) writer.write_run_thread(hit.thread_id) writer.finished_ok = True def _get_generator_cases(): if IS_PY2: return ('_debugger_case_generator_py2.py',) else: # On py3 we should check both versions. return ('_debugger_case_generator_py2.py', '_debugger_case_generator_py3.py') @pytest.mark.parametrize("filename", _get_generator_cases()) def test_generator_cases(case_setup, filename): with case_setup.test_file(filename) as writer: writer.write_add_breakpoint(writer.get_line_index_with_content('break here')) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit() writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_stop_on_start_m_switch(case_setup_m_switch): with case_setup_m_switch.test_file() as writer: writer.write_stop_on_start() writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(REASON_STEP_INTO_MY_CODE, file='_debugger_case_m_switch.py', line=1) writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_stop_on_start_entry_point(case_setup_m_switch_entry_point): with case_setup_m_switch_entry_point.test_file() as writer: writer.write_stop_on_start() writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit(REASON_STEP_INTO_MY_CODE, file='_debugger_case_module_entry_point.py', line=1) writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.skipif(IS_JYTHON, reason='Not working properly on Jython (needs investigation).') def test_debug_zip_files(case_setup, tmpdir): def get_environ(writer): env = os.environ.copy() curr_pythonpath = env.get('PYTHONPATH', '') curr_pythonpath = str(tmpdir.join('myzip.zip')) + os.pathsep + curr_pythonpath curr_pythonpath = str(tmpdir.join('myzip2.egg!')) + os.pathsep + curr_pythonpath env['PYTHONPATH'] = curr_pythonpath env["IDE_PROJECT_ROOTS"] = str(tmpdir.join('myzip.zip')) return env import zipfile zip_file = zipfile.ZipFile( str(tmpdir.join('myzip.zip')), 'w') zip_file.writestr('zipped/__init__.py', '') zip_file.writestr('zipped/zipped_contents.py', 'def call_in_zip():\n return 1') zip_file.close() zip_file = zipfile.ZipFile( str(tmpdir.join('myzip2.egg!')), 'w') zip_file.writestr('zipped2/__init__.py', '') zip_file.writestr('zipped2/zipped_contents2.py', 'def call_in_zip2():\n return 1') zip_file.close() with case_setup.test_file('_debugger_case_zip_files.py', get_environ=get_environ) as writer: writer.write_add_breakpoint( 2, 'None', filename=os.path.join(str(tmpdir.join('myzip.zip')), 'zipped', 'zipped_contents.py') ) writer.write_add_breakpoint( 2, 'None', filename=os.path.join(str(tmpdir.join('myzip2.egg!')), 'zipped2', 'zipped_contents2.py') ) writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit() assert hit.name == 'call_in_zip' writer.write_run_thread(hit.thread_id) hit = writer.wait_for_breakpoint_hit() assert hit.name == 'call_in_zip2' writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.skipif(not IS_CPYTHON, reason='CPython only test.') def test_multiprocessing(case_setup_multiprocessing): import threading from pydev_tests_python.debugger_unittest import AbstractWriterThread with case_setup_multiprocessing.test_file('_debugger_case_multiprocessing.py') as writer: break1_line = writer.get_line_index_with_content('break 1 here') break2_line = writer.get_line_index_with_content('break 2 here') writer.write_add_breakpoint(break1_line) writer.write_add_breakpoint(break2_line) server_socket = writer.server_socket class SecondaryProcessWriterThread(AbstractWriterThread): TEST_FILE = writer.get_main_filename() _sequence = -1 class SecondaryProcessThreadCommunication(threading.Thread): def run(self): from pydev_tests_python.debugger_unittest import ReaderThread server_socket.listen(1) self.server_socket = server_socket new_sock, addr = server_socket.accept() reader_thread = ReaderThread(new_sock) reader_thread.start() writer2 = SecondaryProcessWriterThread() writer2.reader_thread = reader_thread writer2.sock = new_sock writer2.write_version() writer2.write_add_breakpoint(break1_line) writer2.write_add_breakpoint(break2_line) writer2.write_make_initial_run() hit = writer2.wait_for_breakpoint_hit() writer2.write_run_thread(hit.thread_id) secondary_process_thread_communication = SecondaryProcessThreadCommunication() secondary_process_thread_communication.start() writer.write_make_initial_run() hit2 = writer.wait_for_breakpoint_hit() secondary_process_thread_communication.join(10) if secondary_process_thread_communication.is_alive(): raise AssertionError('The SecondaryProcessThreadCommunication did not finish') writer.write_run_thread(hit2.thread_id) writer.finished_ok = True @pytest.mark.skipif(not IS_CPYTHON, reason='CPython only test.') def test_fork_no_attach(case_setup): with case_setup.test_file('_debugger_case_fork.py') as writer: writer.write_add_breakpoint(writer.get_line_index_with_content('break here')) writer.write_make_initial_run() writer.finished_ok = True @pytest.mark.skipif(not IS_CPYTHON, reason='CPython only test.') def test_fork_with_attach_no_breakpoints(case_setup_multiproc): with case_setup_multiproc.test_file('_debugger_case_fork.py') as writer: wait_for_condition(lambda: len(writer.writers) == 1) writer1 = writer.writers[0] writer1.write_make_initial_run() wait_for_condition(lambda: len(writer.writers) == 2) writer2 = writer.writers[1] writer2.write_make_initial_run() for w in writer.writers: w.finished_ok = True writer.finished_ok = True @pytest.mark.skip('PY-44245 - forking with breakpoints available hangs the debugger') @pytest.mark.skipif(not IS_CPYTHON, reason='CPython only test.') def test_fork_with_attach(case_setup_multiproc): with case_setup_multiproc.test_file('_debugger_case_fork.py') as writer: break_line = writer.get_line_index_with_content('break here') writer.write_add_breakpoint(break_line) wait_for_condition(lambda: len(writer.writers) == 1) writer1 = writer.writers[0] writer1.write_make_initial_run() wait_for_condition(lambda: len(writer.writers) == 2) writer2 = writer.writers[1] writer2.write_make_initial_run() hit1 = writer1.wait_for_breakpoint_hit() writer1.write_run_thread(hit1.thread_id) for w in writer.writers: w.finished_ok = True writer.finished_ok = True @pytest.mark.skipif(not IS_CPYTHON, reason='CPython only test.') def test_remote_debugger_basic(case_setup_remote): with case_setup_remote.test_file('_debugger_case_remote.py') as writer: writer.log.append('making initial run') writer.write_make_initial_run() writer.log.append('waiting for breakpoint hit') hit = writer.wait_for_breakpoint_hit() writer.log.append('run thread') writer.write_run_thread(hit.thread_id) writer.log.append('asserting') try: assert 5 == writer._sequence, 'Expected 5. Had: %s' % writer._sequence except: writer.log.append('assert failed!') raise writer.log.append('asserted') writer.finished_ok = True @pytest.mark.skipif(not IS_CPYTHON or not IS_PY37_OR_GREATER, reason='CPython only test.') def test_py_37_breakpoint_remote(case_setup_remote): with case_setup_remote.test_file('_debugger_case_breakpoint_remote.py') as writer: writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit( filename='_debugger_case_breakpoint_remote.py', line=13, ) writer.write_run_thread(hit.thread_id) try: assert 5 == writer._sequence, 'Expected 5. Had: %s' % writer._sequence except: writer.log.append('assert failed!') raise writer.log.append('asserted') writer.finished_ok = True @pytest.mark.skipif(not IS_CPYTHON or not IS_PY37_OR_GREATER, reason='CPython only test.') def test_py_37_breakpoint_remote_no_import(case_setup_remote): def get_environ(writer): env = os.environ.copy() curr_pythonpath = env.get('PYTHONPATH', '') pydevd_dirname = os.path.join( os.path.dirname(writer.get_pydevd_file()), 'pydev_sitecustomize') curr_pythonpath = pydevd_dirname + os.pathsep + curr_pythonpath env['PYTHONPATH'] = curr_pythonpath return env with case_setup_remote.test_file( '_debugger_case_breakpoint_remote_no_import.py', get_environ=get_environ) as writer: writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit( "108", filename='_debugger_case_breakpoint_remote_no_import.py', line=12, ) writer.write_run_thread(hit.thread_id) try: assert 5 == writer._sequence, 'Expected 5. Had: %s' % writer._sequence except: writer.log.append('assert failed!') raise writer.log.append('asserted') writer.finished_ok = True @pytest.mark.skipif(not IS_CPYTHON, reason='CPython only test.') def test_remote_debugger_multi_proc(case_setup_remote): class _SecondaryMultiProcProcessWriterThread(debugger_unittest.AbstractWriterThread): FORCE_KILL_PROCESS_WHEN_FINISHED_OK = True def __init__(self, server_socket): debugger_unittest.AbstractWriterThread.__init__(self) self.server_socket = server_socket def run(self): print('waiting for second process') self.sock, addr = self.server_socket.accept() print('accepted second process') from pydev_tests_python.debugger_unittest import ReaderThread self.reader_thread = ReaderThread(self.sock) self.reader_thread.start() self._sequence = -1 # initial command is always the version self.write_version() self.log.append('start_socket') self.write_make_initial_run() time.sleep(.5) self.finished_ok = True def do_kill(writer): debugger_unittest.AbstractWriterThread.do_kill(writer) if hasattr(writer, 'secondary_multi_proc_process_writer'): writer.secondary_multi_proc_process_writer.do_kill() with case_setup_remote.test_file( '_debugger_case_remote_1.py', do_kill=do_kill, EXPECTED_RETURNCODE='any' ) as writer: # It seems sometimes it becomes flaky on the ci because the process outlives the writer thread... # As we're only interested in knowing if a second connection was received, just kill the related # process. assert hasattr(writer, 'FORCE_KILL_PROCESS_WHEN_FINISHED_OK') writer.FORCE_KILL_PROCESS_WHEN_FINISHED_OK = True writer.log.append('making initial run') writer.write_make_initial_run() writer.log.append('waiting for breakpoint hit') hit = writer.wait_for_breakpoint_hit() writer.secondary_multi_proc_process_writer = secondary_multi_proc_process_writer = \ _SecondaryMultiProcProcessWriterThread(writer.server_socket) secondary_multi_proc_process_writer.start() writer.log.append('run thread') writer.write_run_thread(hit.thread_id) for _i in xrange(400): if secondary_multi_proc_process_writer.finished_ok: break time.sleep(.1) else: writer.log.append('Secondary process not finished ok!') raise AssertionError('Secondary process not finished ok!') writer.log.append('Secondary process finished!') try: assert 5 == writer._sequence, 'Expected 5. Had: %s' % writer._sequence except: writer.log.append('assert failed!') raise writer.log.append('asserted') writer.finished_ok = True @pytest.mark.skipif(not IS_CPYTHON, reason='CPython only test.') def test_remote_unhandled_exceptions(case_setup_remote): def check_test_suceeded_msg(writer, stdout, stderr): return 'TEST SUCEEDED' in ''.join(stderr) def additional_output_checks(writer, stdout, stderr): # Don't call super as we have an expected exception assert 'ValueError: TEST SUCEEDED' in stderr with case_setup_remote.test_file( '_debugger_case_remote_unhandled_exceptions.py', additional_output_checks=additional_output_checks, check_test_suceeded_msg=check_test_suceeded_msg, EXPECTED_RETURNCODE=1) as writer: writer.log.append('making initial run') writer.write_make_initial_run() writer.log.append('waiting for breakpoint hit') hit = writer.wait_for_breakpoint_hit() writer.write_add_exception_breakpoint_with_policy('Exception', '0', '1', '0') writer.log.append('run thread') writer.write_run_thread(hit.thread_id) writer.log.append('waiting for uncaught exception') hit = writer.wait_for_breakpoint_hit(REASON_UNCAUGHT_EXCEPTION) writer.write_run_thread(hit.thread_id) writer.log.append('finished ok') writer.finished_ok = True def test_trace_dispatch_correct(case_setup): def get_environ(writer): env = os.environ.copy() env['PYDEVD_USE_FRAME_EVAL'] = 'NO' # This test checks trace dispatch (so, disable frame eval). return env with case_setup.test_file('_debugger_case_trace_dispatch.py', get_environ=get_environ) as writer: breakpoint_id = writer.write_add_breakpoint(5, 'method') writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit() writer.write_remove_breakpoint(breakpoint_id) writer.write_run_thread(hit.thread_id) writer.finished_ok = True @pytest.mark.skipif(IS_PY26, reason='Failing on Python 2.6 on travis (needs investigation).') def test_case_single_notification_on_step(case_setup): from pydev_tests_python.debugger_unittest import REASON_STEP_INTO with case_setup.test_file('_debugger_case_import_main.py') as writer: writer.write_multi_threads_single_notification(True) writer.write_add_breakpoint(writer.get_line_index_with_content('break here'), '') writer.write_make_initial_run() hit = writer.wait_for_single_notification_as_hit() writer.write_step_in(hit.thread_id) hit = writer.wait_for_single_notification_as_hit(reason=REASON_STEP_INTO) writer.write_step_in(hit.thread_id) hit = writer.wait_for_single_notification_as_hit(reason=REASON_STEP_INTO) writer.write_step_in(hit.thread_id) hit = writer.wait_for_single_notification_as_hit(reason=REASON_STEP_INTO) writer.write_run_thread(hit.thread_id) writer.finished_ok = True def test_return_value(case_setup): with case_setup.test_file('_debugger_case_return_value.py') as writer: writer.write_add_breakpoint(writer.get_line_index_with_content('break here'), '') writer.write_show_return_vars() writer.write_make_initial_run() hit = writer.wait_for_breakpoint_hit() writer.write_step_over(hit.thread_id) hit = writer.wait_for_breakpoint_hit(REASON_STEP_OVER) writer.write_get_frame(hit.thread_id, hit.frame_id) writer.wait_for_vars([ [ '', '', '