diff --git a/mfd_connect/sol.py b/mfd_connect/sol.py index fdfe070..91682a5 100644 --- a/mfd_connect/sol.py +++ b/mfd_connect/sol.py @@ -1,4 +1,4 @@ -# Copyright (C) 2025 Intel Corporation +# Copyright (C) 2025-2026 Intel Corporation # SPDX-License-Identifier: MIT """It's a Connection for Serial over LAN protocol.""" @@ -19,7 +19,16 @@ from mfd_typing.os_values import OSName, OSType, OSBitness from mfd_common_libs import add_logging_level, log_levels, log_func_info +from mfd_connect.util import ansiterm + from .util.decorators import conditional_cache +from .util.console_utils import ( + ANSITERM_COLS_SIZE, + ANSITERM_ROWS_SIZE, + BLACK_BACKGROUND_COLOR, + BLUE_BACKGROUND_COLOR, + GREY_BACKGROUND_COLOR, +) from .util.serial_utils import SerialKeyCode from .base import Connection, ConnectionCompletedProcess from .exceptions import SolException, ConnectionCalledProcessError, OsNotSupported @@ -312,7 +321,7 @@ def _establish_connection(self, retry_count: int = 0) -> "pexpect.spawn": if index_of_found_string != 0: if retry_count: logger.log(level=log_levels.MODULE_DEBUG, msg="Fatal Error while establishing SoL session! Retrying...") - self._establish_connection(retry_count - 1) + return self._establish_connection(retry_count - 1) else: raise SolException(f"SoL sessions activation failure: {sol_connection_process.before}") logger.log(level=log_levels.MODULE_DEBUG, msg="...Done!") @@ -497,39 +506,89 @@ def _parse_selection(legacy: bool, output_to_parse: str) -> str: :param legacy: enable flow for legacy bios mode :param output_to_parse - string from console, from handle.before - :return - string with pre-cleaning, lines separated by new line character + :return: string with pre-cleaning, lines separated by new line character """ + term = ansiterm.Ansiterm(ANSITERM_ROWS_SIZE, ANSITERM_COLS_SIZE) + term.feed(output_to_parse) + + # Windows Boot Manager often places selected entry marker at row end. + for row in range(ANSITERM_ROWS_SIZE): + row_tiles = term.get_tiles(row * ANSITERM_COLS_SIZE, row * ANSITERM_COLS_SIZE + ANSITERM_COLS_SIZE) + marker_index = next((idx for idx, tile in enumerate(row_tiles) if tile.glyph == ">"), -1) + if marker_index < int(ANSITERM_COLS_SIZE * 0.75): + continue + selected_line = "".join(tile.glyph for tile in row_tiles).replace(">", " ").strip() + if selected_line: + logger.log(level=log_levels.MODULE_DEBUG, msg=f"Selected option from marker: {selected_line}") + return selected_line + + output_lines = [] + selected_backgrounds = {BLUE_BACKGROUND_COLOR, GREY_BACKGROUND_COLOR, BLACK_BACKGROUND_COLOR} + for row in range(ANSITERM_ROWS_SIZE): + row_tiles = term.get_tiles(row * ANSITERM_COLS_SIZE, row * ANSITERM_COLS_SIZE + ANSITERM_COLS_SIZE) + selected_chars = [] + started = False + for tile in row_tiles: + color = tile.color + is_selected = color.get("reverse") or color.get("bg") in selected_backgrounds + if is_selected and (tile.glyph != " " or started): + started = True + selected_chars.append(tile.glyph) + elif started: + break + + if not selected_chars: + continue + + line = "".join(selected_chars).rstrip() + if not line: + continue + if legacy and "*" not in line and output_lines: + output_lines[-1] += " " + line + else: + output_lines.append(line) + + if output_lines: + selected_output = "\n".join(output_lines) + logger.log(level=log_levels.MODULE_DEBUG, msg=f"Selected option from colors: {selected_output}") + return selected_output + + logger.log(level=log_levels.MODULE_DEBUG, msg="Selected option not found via Ansiterm, trying regex fallback") + return SolConnection._parse_selection_regex_fallback(legacy, output_to_parse) + + @staticmethod + def _parse_selection_regex_fallback(legacy: bool, output_to_parse: str) -> str: + """Fallback parser for selected option for compatibility with old SOL flows.""" output_lines = [] entries = ("".join(output_to_parse)).split("\x1b") - selected_pattern = r"\[44m(.+)" # color of selected option, blue background + + # color of selected option, blue background + selected_pattern = rf"\[{BLUE_BACKGROUND_COLOR}m(.+)" pattern = re.compile(selected_pattern) for elem in entries: matched_pattern = pattern.match(elem) - if matched_pattern: # BootMenu + if matched_pattern: line = matched_pattern.groups()[0] if legacy: - # Legacy - if "*" not in line: + if "*" not in line and output_lines: output_lines[-1] += " " + line else: output_lines.append(line) else: - # EFI output_lines.append(line) if output_lines: return "\n".join(output_lines) - logger.log(level=log_levels.MODULE_DEBUG, msg="Selected option not found, trying method for GRUB2") - selected_pattern = r"\[47m" # color of selected option, grey background + # color of selected option, grey background + selected_pattern = rf"\[{GREY_BACKGROUND_COLOR}m" pattern = re.compile(selected_pattern) for i, elem in enumerate(entries): matched_pattern = pattern.match(elem) - if matched_pattern: - # next line could be selected option - if i + 1 < len(entries): - if "*" in entries[i + 1]: - output_lines.append(entries[i + 1]) + if matched_pattern and i + 1 < len(entries): + if "*" in entries[i + 1]: + output_lines.append(entries[i + 1]) + return "\n".join(output_lines) def _check_if_unix(self) -> bool: diff --git a/mfd_connect/util/console_utils.py b/mfd_connect/util/console_utils.py index 4b05168..a53f106 100644 --- a/mfd_connect/util/console_utils.py +++ b/mfd_connect/util/console_utils.py @@ -1,7 +1,9 @@ -# Copyright (C) 2025 Intel Corporation +# Copyright (C) 2025-2026 Intel Corporation # SPDX-License-Identifier: MIT """Console utils.""" BLACK_BACKGROUND_COLOR = 40 +BLUE_BACKGROUND_COLOR = 44 +GREY_BACKGROUND_COLOR = 47 ANSITERM_ROWS_SIZE = 30 ANSITERM_COLS_SIZE = 80 diff --git a/tests/unit/test_mfd_connect/test_sol.py b/tests/unit/test_mfd_connect/test_sol.py index 3a80267..fadf2d4 100644 --- a/tests/unit/test_mfd_connect/test_sol.py +++ b/tests/unit/test_mfd_connect/test_sol.py @@ -1,4 +1,4 @@ -# Copyright (C) 2025 Intel Corporation +# Copyright (C) 2025-2026 Intel Corporation # SPDX-License-Identifier: MIT import sys from subprocess import CalledProcessError @@ -11,6 +11,7 @@ from mfd_connect import SolConnection from mfd_connect.base import ConnectionCompletedProcess from mfd_connect.exceptions import OsNotSupported, SolException +from mfd_connect.util.serial_utils import SerialKeyCode class TestSolConnection: @@ -91,6 +92,80 @@ def test_execute_command_not_raise_custom_exception(self, sol, mocker): "cmd arg1 arg2", discard_stdout=True, expected_return_codes=[0], custom_exception=self.CustomTestException ) + def test_clear_buffer_with_pending_output(self, sol, mocker): + sol._connection_handle = mocker.Mock(before=b"pending output") + + sol._clear_buffer() + + sol._connection_handle.expect.assert_called_once() + + def test_wait_for_string_success(self, sol, mocker): + sol._connection_handle = mocker.Mock() + sol._connection_handle.expect.return_value = 0 + + assert sol.wait_for_string(["ready"], expect_timeout=True, timeout=1) == 0 + sol._connection_handle.expect.assert_called_once() + + def test_get_output_after_user_action(self, sol, mocker): + sol._connection_handle = mocker.Mock(before=b"raw console output") + sol.wait_for_string = mocker.Mock() + parse_output = mocker.patch.object(sol, "_parse_output", return_value="parsed output") + + assert sol.get_output_after_user_action(selected_option=True) == "parsed output" + parse_output.assert_called_once_with("raw console output", True) + + def test_send_key_sends_requested_times(self, sol, mocker): + sol._connection_handle = mocker.Mock() + sleep = mocker.patch("mfd_connect.sol.time.sleep") + + sol.send_key(SerialKeyCode.enter, count=2, sleeptime=0) + + assert sol._connection_handle.send.call_count == 2 + sleep.assert_called() + + def test_init_raises_on_windows(self, mocker): + mocker.patch("mfd_connect.sol.platform.system", return_value="Windows") + + with pytest.raises(SolException, match="Windows is not supported as test controller, yet"): + SolConnection(username="admin", password="secret", ip="10.10.10.10") + + def test_init_raises_when_ipmiutil_missing(self, mocker): + mocker.patch("mfd_connect.sol.platform.system", return_value="Linux") + mocker.patch("mfd_connect.sol.pexpect.popen_spawn.PopenSpawn", side_effect=FileNotFoundError("ipmiutil")) + + with pytest.raises(SolException): + SolConnection(username="admin", password="secret", ip="10.10.10.10") + + def test_establish_connection_retries_once(self, sol, mocker): + first_child = mocker.Mock() + first_child.expect.return_value = 1 + second_child = mocker.Mock() + second_child.expect.return_value = 0 + + sol._ipmi_tool_name = "ipmiutil" + sol._ipmi_parameters = "-F lan2 -U admin -P secret -N 10.10.10.10 -V 4" + mocker.patch.object(sol, "_deactivate_sol_session") + spawn = mocker.patch("mfd_connect.sol.pexpect.spawn", create=True, side_effect=[first_child, second_child]) + + result = sol._establish_connection(retry_count=1) + + assert spawn.call_count == 2 + assert result is second_child + + def test__parse_selection_regex_fallback_blue_background(self): + output = "\x1b[44mSelected Boot Option" + + selected = SolConnection._parse_selection_regex_fallback(False, output) + + assert selected == "Selected Boot Option" + + def test__parse_selection_regex_fallback_grey_background_legacy(self): + output = "\x1b[47m\x1b*Legacy Boot Option" + + selected = SolConnection._parse_selection_regex_fallback(True, output) + + assert selected == "*Legacy Boot Option" + def test__parse_output(self, sol): output = ( ", use '~.' to end, '~?' for help.]\r\r\n" @@ -111,6 +186,24 @@ def test__parse_output(self, sol): ) assert sol._parse_output(output) == expected_output + def test__parse_selection_windows_boot_manager(self, sol): + output = ( + "\x1b[7;2H \x1b[7;6HWin2022_wRelease30.1.2RefDrv" + "\x1b[6;2H \x1b[0m\x1b[30;47m \x1b[6;6HWindows2022_Release30.2_Reference" + "\x1b[6;72H>\x1b[0m\x1b[37;40m" + ) + + selected = sol._parse_output(output, selection=True) + + assert "Windows2022_Release30.2_Reference" in selected + + def test__parse_selection_legacy_highlight(self, sol): + output = "\x1b[44m*Legacy Boot Option\x1b[0m" + + selected = sol._parse_output(output, selection=True, legacy=True) + + assert "*Legacy Boot Option" in selected + def test__check_if_unix(self, sol, mocker): sol.execute_command = mocker.create_autospec( sol.execute_command,