Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 74 additions & 15 deletions mfd_connect/sol.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (C) 2025 Intel Corporation
# Copyright (C) 2025-2026 Intel Corporation
# SPDX-License-Identifier: MIT
"""It's a Connection for Serial over LAN protocol."""

Expand All @@ -19,7 +19,16 @@
from mfd_typing.os_values import OSName, OSType, OSBitness
from mfd_common_libs import add_logging_level, log_levels, log_func_info

from mfd_connect.util import ansiterm

from .util.decorators import conditional_cache
from .util.console_utils import (
ANSITERM_COLS_SIZE,
ANSITERM_ROWS_SIZE,
BLACK_BACKGROUND_COLOR,
BLUE_BACKGROUND_COLOR,
GREY_BACKGROUND_COLOR,
)
from .util.serial_utils import SerialKeyCode
from .base import Connection, ConnectionCompletedProcess
from .exceptions import SolException, ConnectionCalledProcessError, OsNotSupported
Expand Down Expand Up @@ -312,7 +321,7 @@ def _establish_connection(self, retry_count: int = 0) -> "pexpect.spawn":
if index_of_found_string != 0:
if retry_count:
logger.log(level=log_levels.MODULE_DEBUG, msg="Fatal Error while establishing SoL session! Retrying...")
self._establish_connection(retry_count - 1)
return self._establish_connection(retry_count - 1)
else:
raise SolException(f"SoL sessions activation failure: {sol_connection_process.before}")
logger.log(level=log_levels.MODULE_DEBUG, msg="...Done!")
Expand Down Expand Up @@ -497,39 +506,89 @@ def _parse_selection(legacy: bool, output_to_parse: str) -> str:

:param legacy: enable flow for legacy bios mode
:param output_to_parse - string from console, from handle.before
:return - string with pre-cleaning, lines separated by new line character
:return: string with pre-cleaning, lines separated by new line character
"""
term = ansiterm.Ansiterm(ANSITERM_ROWS_SIZE, ANSITERM_COLS_SIZE)
term.feed(output_to_parse)

# Windows Boot Manager often places selected entry marker at row end.
for row in range(ANSITERM_ROWS_SIZE):
row_tiles = term.get_tiles(row * ANSITERM_COLS_SIZE, row * ANSITERM_COLS_SIZE + ANSITERM_COLS_SIZE)
marker_index = next((idx for idx, tile in enumerate(row_tiles) if tile.glyph == ">"), -1)
if marker_index < int(ANSITERM_COLS_SIZE * 0.75):
continue
selected_line = "".join(tile.glyph for tile in row_tiles).replace(">", " ").strip()
if selected_line:
logger.log(level=log_levels.MODULE_DEBUG, msg=f"Selected option from marker: {selected_line}")
return selected_line

output_lines = []
selected_backgrounds = {BLUE_BACKGROUND_COLOR, GREY_BACKGROUND_COLOR, BLACK_BACKGROUND_COLOR}
for row in range(ANSITERM_ROWS_SIZE):
row_tiles = term.get_tiles(row * ANSITERM_COLS_SIZE, row * ANSITERM_COLS_SIZE + ANSITERM_COLS_SIZE)
selected_chars = []
started = False
for tile in row_tiles:
color = tile.color
is_selected = color.get("reverse") or color.get("bg") in selected_backgrounds
if is_selected and (tile.glyph != " " or started):
Comment thread
adrianlasota marked this conversation as resolved.
started = True
selected_chars.append(tile.glyph)
elif started:
break

if not selected_chars:
continue

line = "".join(selected_chars).rstrip()
if not line:
continue
if legacy and "*" not in line and output_lines:
output_lines[-1] += " " + line
else:
output_lines.append(line)

if output_lines:
selected_output = "\n".join(output_lines)
logger.log(level=log_levels.MODULE_DEBUG, msg=f"Selected option from colors: {selected_output}")
return selected_output

logger.log(level=log_levels.MODULE_DEBUG, msg="Selected option not found via Ansiterm, trying regex fallback")
return SolConnection._parse_selection_regex_fallback(legacy, output_to_parse)

@staticmethod
def _parse_selection_regex_fallback(legacy: bool, output_to_parse: str) -> str:
"""Fallback parser for selected option for compatibility with old SOL flows."""
output_lines = []
entries = ("".join(output_to_parse)).split("\x1b")
selected_pattern = r"\[44m(.+)" # color of selected option, blue background

# color of selected option, blue background
selected_pattern = rf"\[{BLUE_BACKGROUND_COLOR}m(.+)"
pattern = re.compile(selected_pattern)
for elem in entries:
matched_pattern = pattern.match(elem)
if matched_pattern: # BootMenu
if matched_pattern:
line = matched_pattern.groups()[0]
if legacy:
# Legacy
if "*" not in line:
if "*" not in line and output_lines:
output_lines[-1] += " " + line
else:
output_lines.append(line)
else:
# EFI
output_lines.append(line)

if output_lines:
return "\n".join(output_lines)

logger.log(level=log_levels.MODULE_DEBUG, msg="Selected option not found, trying method for GRUB2")
selected_pattern = r"\[47m" # color of selected option, grey background
# color of selected option, grey background
selected_pattern = rf"\[{GREY_BACKGROUND_COLOR}m"
pattern = re.compile(selected_pattern)
for i, elem in enumerate(entries):
matched_pattern = pattern.match(elem)
if matched_pattern:
# next line could be selected option
if i + 1 < len(entries):
if "*" in entries[i + 1]:
output_lines.append(entries[i + 1])
if matched_pattern and i + 1 < len(entries):
if "*" in entries[i + 1]:
output_lines.append(entries[i + 1])

return "\n".join(output_lines)

def _check_if_unix(self) -> bool:
Expand Down
4 changes: 3 additions & 1 deletion mfd_connect/util/console_utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
# Copyright (C) 2025 Intel Corporation
# Copyright (C) 2025-2026 Intel Corporation
# SPDX-License-Identifier: MIT
"""Console utils."""

BLACK_BACKGROUND_COLOR = 40
BLUE_BACKGROUND_COLOR = 44
GREY_BACKGROUND_COLOR = 47
ANSITERM_ROWS_SIZE = 30
ANSITERM_COLS_SIZE = 80
95 changes: 94 additions & 1 deletion tests/unit/test_mfd_connect/test_sol.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (C) 2025 Intel Corporation
# Copyright (C) 2025-2026 Intel Corporation
# SPDX-License-Identifier: MIT
import sys
from subprocess import CalledProcessError
Expand All @@ -11,6 +11,7 @@
from mfd_connect import SolConnection
from mfd_connect.base import ConnectionCompletedProcess
from mfd_connect.exceptions import OsNotSupported, SolException
from mfd_connect.util.serial_utils import SerialKeyCode


class TestSolConnection:
Expand Down Expand Up @@ -91,6 +92,80 @@ def test_execute_command_not_raise_custom_exception(self, sol, mocker):
"cmd arg1 arg2", discard_stdout=True, expected_return_codes=[0], custom_exception=self.CustomTestException
)

def test_clear_buffer_with_pending_output(self, sol, mocker):
sol._connection_handle = mocker.Mock(before=b"pending output")

sol._clear_buffer()

sol._connection_handle.expect.assert_called_once()

def test_wait_for_string_success(self, sol, mocker):
sol._connection_handle = mocker.Mock()
sol._connection_handle.expect.return_value = 0

assert sol.wait_for_string(["ready"], expect_timeout=True, timeout=1) == 0
sol._connection_handle.expect.assert_called_once()

def test_get_output_after_user_action(self, sol, mocker):
sol._connection_handle = mocker.Mock(before=b"raw console output")
sol.wait_for_string = mocker.Mock()
parse_output = mocker.patch.object(sol, "_parse_output", return_value="parsed output")

assert sol.get_output_after_user_action(selected_option=True) == "parsed output"
parse_output.assert_called_once_with("raw console output", True)

def test_send_key_sends_requested_times(self, sol, mocker):
sol._connection_handle = mocker.Mock()
sleep = mocker.patch("mfd_connect.sol.time.sleep")

sol.send_key(SerialKeyCode.enter, count=2, sleeptime=0)

assert sol._connection_handle.send.call_count == 2
sleep.assert_called()

def test_init_raises_on_windows(self, mocker):
mocker.patch("mfd_connect.sol.platform.system", return_value="Windows")

with pytest.raises(SolException, match="Windows is not supported as test controller, yet"):
SolConnection(username="admin", password="secret", ip="10.10.10.10")

def test_init_raises_when_ipmiutil_missing(self, mocker):
mocker.patch("mfd_connect.sol.platform.system", return_value="Linux")
mocker.patch("mfd_connect.sol.pexpect.popen_spawn.PopenSpawn", side_effect=FileNotFoundError("ipmiutil"))

with pytest.raises(SolException):
SolConnection(username="admin", password="secret", ip="10.10.10.10")

def test_establish_connection_retries_once(self, sol, mocker):
first_child = mocker.Mock()
first_child.expect.return_value = 1
second_child = mocker.Mock()
second_child.expect.return_value = 0

sol._ipmi_tool_name = "ipmiutil"
sol._ipmi_parameters = "-F lan2 -U admin -P secret -N 10.10.10.10 -V 4"
mocker.patch.object(sol, "_deactivate_sol_session")
spawn = mocker.patch("mfd_connect.sol.pexpect.spawn", create=True, side_effect=[first_child, second_child])

result = sol._establish_connection(retry_count=1)

assert spawn.call_count == 2
assert result is second_child

def test__parse_selection_regex_fallback_blue_background(self):
output = "\x1b[44mSelected Boot Option"

selected = SolConnection._parse_selection_regex_fallback(False, output)

assert selected == "Selected Boot Option"

def test__parse_selection_regex_fallback_grey_background_legacy(self):
output = "\x1b[47m\x1b*Legacy Boot Option"

selected = SolConnection._parse_selection_regex_fallback(True, output)

assert selected == "*Legacy Boot Option"

def test__parse_output(self, sol):
output = (
", use '~.' to end, '~?' for help.]\r\r\n"
Expand All @@ -111,6 +186,24 @@ def test__parse_output(self, sol):
)
assert sol._parse_output(output) == expected_output

def test__parse_selection_windows_boot_manager(self, sol):
output = (
"\x1b[7;2H \x1b[7;6HWin2022_wRelease30.1.2RefDrv"
"\x1b[6;2H \x1b[0m\x1b[30;47m \x1b[6;6HWindows2022_Release30.2_Reference"
"\x1b[6;72H>\x1b[0m\x1b[37;40m"
)

selected = sol._parse_output(output, selection=True)

assert "Windows2022_Release30.2_Reference" in selected

def test__parse_selection_legacy_highlight(self, sol):
output = "\x1b[44m*Legacy Boot Option\x1b[0m"

selected = sol._parse_output(output, selection=True, legacy=True)

assert "*Legacy Boot Option" in selected

def test__check_if_unix(self, sol, mocker):
sol.execute_command = mocker.create_autospec(
sol.execute_command,
Expand Down
Loading