diff --git a/python_files/tests/pytestadapter/helpers.py b/python_files/tests/pytestadapter/helpers.py index 03f1187149df..e10db6118ec9 100644 --- a/python_files/tests/pytestadapter/helpers.py +++ b/python_files/tests/pytestadapter/helpers.py @@ -6,6 +6,7 @@ import json import os import pathlib +import select import socket import subprocess import sys @@ -14,10 +15,6 @@ import uuid from typing import Any, Dict, List, Optional, Tuple -if sys.platform == "win32": - from namedpipe import NPopen - - script_dir = pathlib.Path(__file__).parent.parent.parent script_dir_child = pathlib.Path(__file__).parent.parent sys.path.append(os.fspath(script_dir)) @@ -128,79 +125,95 @@ def parse_rpc_message(data: str) -> Tuple[Dict[str, str], str]: print("json decode error") -def _listen_on_fifo(pipe_name: str, result: List[str], completed: threading.Event): - # Open the FIFO for reading - fifo_path = pathlib.Path(pipe_name) - with fifo_path.open() as fifo: - print("Waiting for data...") - while True: - if completed.is_set(): - break # Exit loop if completed event is set - data = fifo.read() # This will block until data is available - if len(data) == 0: - # If data is empty, assume EOF - break - print(f"Received: {data}") - result.append(data) +if sys.platform == "win32": + from namedpipe import NPopen + @contextlib.contextmanager + def pipe_setup_and_listen(pipe_name: str, result: List[str]): + # For Windows, named pipes have a specific naming convention. + pipe_path = f"\\\\.\\pipe\\{pipe_name}" -def _listen_on_pipe_new(listener, result: List[str], completed: threading.Event): - """Listen on the named pipe or Unix domain socket for JSON data from the server. + with NPopen("r+t", name=pipe_name, bufsize=0) as pipe: + completed = threading.Event() - Created as a separate function for clarity in threading context. - """ - # Windows design - if sys.platform == "win32": - all_data: list = [] - stream = listener.wait() - while True: - # Read data from collection - close = stream.closed - if close: - break - data = stream.readlines() - if not data: - if completed.is_set(): - break # Exit loop if completed event is set - else: - try: - # Attempt to accept another connection if the current one closes unexpectedly - print("attempt another connection") - except socket.timeout: - # On timeout, append all collected data to result and return - # result.append("".join(all_data)) - return - data_decoded = "".join(data) - all_data.append(data_decoded) - # Append all collected data to result array - result.append("".join(all_data)) - else: # Unix design - connection, _ = listener.socket.accept() - listener.socket.settimeout(1) - all_data: list = [] - while True: - # Reading from connection - data: bytes = connection.recv(1024 * 1024) - if not data: - if completed.is_set(): - break # Exit loop if completed event is set - else: - try: - # Attempt to accept another connection if the current one closes unexpectedly - connection, _ = listener.socket.accept() - except socket.timeout: - # On timeout, append all collected data to result and return - result.append("".join(all_data)) - return - all_data.append(data.decode("utf-8")) - # Append all collected data to result array - result.append("".join(all_data)) - - -def _run_test_code(proc_args: List[str], proc_env, proc_cwd: str, completed: threading.Event): - result = subprocess.run(proc_args, env=proc_env, cwd=proc_cwd) - completed.set() - return result + def listen(): + all_data: list = [] + stream = pipe.wait() + while True: + # Read data from collection + close = stream.closed + if close: + break + data = stream.readlines() + if not data: + if completed.is_set(): + break # Exit loop if completed event is set + else: + try: + # Attempt to accept another connection if the current one closes unexpectedly + print("attempt another connection") + except socket.timeout: + # On timeout, append all collected data to result and return + # result.append("".join(all_data)) + return + data_decoded = "".join(data) + all_data.append(data_decoded) + # Append all collected data to result array + result.append("".join(all_data)) + + thread = threading.Thread(target=listen) + thread.start() + try: + yield pipe_path + finally: + completed.set() + thread.join() +else: + + @contextlib.contextmanager + def pipe_setup_and_listen(pipe_name: str, result: List[str]): + # For Unix-like systems, use either the XDG_RUNTIME_DIR or a temporary directory. + xdg_runtime_dir = os.getenv("XDG_RUNTIME_DIR") + pipe_path = pathlib.Path( + xdg_runtime_dir if xdg_runtime_dir else tempfile.gettempdir(), + pipe_name, + ) + os.mkfifo(pipe_path) + + completed = threading.Event() + + def listen(): + # When using blocking IO, open blocks forever if the subprocess compleates but never + # opens the pipe for writing (which may happen if there is an error early in the + # subprocess.) Hence we go to the effort of using non-blocking io so that we can + # break out of this function if that happens. + fd = os.open(pipe_path, os.O_RDONLY | os.O_NONBLOCK) + try: + all_data = bytearray() + while True: + if completed.is_set(): + break + + # Wait till the pipe has data to read, with a timeout. + rlist, _, _ = select.select([fd], [], [], 0.1) + if rlist: + # Data is available, read it. + data = os.read(fd, 1024) + if not data: + # Empty data indicates EOF. + break + all_data.extend(data) + result.append(all_data.decode()) + finally: + os.close(fd) + + thread = threading.Thread(target=listen) + thread.start() + try: + yield pipe_path + finally: + completed.set() + thread.join() def runner(args: List[str]) -> Optional[List[Dict[str, Any]]]: @@ -293,80 +306,20 @@ def runner_with_cwd_env( *args, ] - # Generate pipe name, pipe name specific per OS type. - - # Windows design - if sys.platform == "win32": - with NPopen("r+t", name=pipe_name, bufsize=0) as pipe: - # Update the environment with the pipe name and PYTHONPATH. - env = os.environ.copy() - env.update( - { - "TEST_RUN_PIPE": pipe.path, - "PYTHONPATH": os.fspath(pathlib.Path(__file__).parent.parent.parent), - } - ) - # if additional environment variables are passed, add them to the environment - if env_add: - env.update(env_add) - - completed = threading.Event() - - result = [] # result is a string array to store the data during threading - t1: threading.Thread = threading.Thread( - target=_listen_on_pipe_new, args=(pipe, result, completed) - ) - t1.start() - - t2 = threading.Thread( - target=_run_test_code, - args=(process_args, env, path, completed), - ) - t2.start() - - t1.join() - t2.join() - - return process_data_received(result[0]) if result else None - else: # Unix design - # Update the environment with the pipe name and PYTHONPATH. + result = [] # result is a string array to store the data during threading + with pipe_setup_and_listen(pipe_name, result) as pipe_path: env = os.environ.copy() env.update( { - "TEST_RUN_PIPE": pipe_name, + "TEST_RUN_PIPE": pipe_path, "PYTHONPATH": os.fspath(pathlib.Path(__file__).parent.parent.parent), } ) # if additional environment variables are passed, add them to the environment if env_add: env.update(env_add) - # server = UnixPipeServer(pipe_name) - # server.start() - ################# - # Create the FIFO (named pipe) if it doesn't exist - # if not pathlib.Path.exists(pipe_name): - os.mkfifo(pipe_name) - ################# - - completed = threading.Event() - - result = [] # result is a string array to store the data during threading - t1: threading.Thread = threading.Thread( - target=_listen_on_fifo, args=(pipe_name, result, completed) - ) - t1.start() - - t2: threading.Thread = threading.Thread( - target=_run_test_code, - args=(process_args, env, path, completed), - ) - - t2.start() - - t1.join() - t2.join() - - return process_data_received(result[0]) if result else None + subprocess.run(process_args, env=env, cwd=path) + return process_data_received(result[0]) if result else None def find_test_line_number(test_name: str, test_file_path) -> str: @@ -422,48 +375,4 @@ def generate_random_pipe_name(prefix=""): if not prefix: prefix = "python-ext-rpc" - # For Windows, named pipes have a specific naming convention. - if sys.platform == "win32": - return f"\\\\.\\pipe\\{prefix}-{random_suffix}" - - # For Unix-like systems, use either the XDG_RUNTIME_DIR or a temporary directory. - xdg_runtime_dir = os.getenv("XDG_RUNTIME_DIR") - if xdg_runtime_dir: - return os.path.join(xdg_runtime_dir, f"{prefix}-{random_suffix}") # noqa: PTH118 - else: - return os.path.join(tempfile.gettempdir(), f"{prefix}-{random_suffix}") # noqa: PTH118 - - -class UnixPipeServer: - def __init__(self, name): - self.name = name - self.is_windows = sys.platform == "win32" - if self.is_windows: - raise NotImplementedError( - "This class is only intended for Unix-like systems, not Windows." - ) - else: - # For Unix-like systems, use a Unix domain socket. - self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - # Ensure the socket does not already exist - try: - os.unlink(self.name) # noqa: PTH108 - except OSError: - if os.path.exists(self.name): # noqa: PTH110 - raise - - def start(self): - if self.is_windows: - raise NotImplementedError( - "This class is only intended for Unix-like systems, not Windows." - ) - else: - # Bind the socket to the address and listen for incoming connections. - self.socket.bind(self.name) - self.socket.listen(1) - print(f"Server listening on {self.name}") - - def stop(self): - # Clean up the server socket. - self.socket.close() - print("Server stopped.") + return f"{prefix}-{random_suffix}"