Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
Upcoming (TBD)
==============

Features
---------
* Allow styling prompts with HTML-like tags.


Internal
---------
* Remove unused fixture data.
Expand Down
11 changes: 6 additions & 5 deletions mycli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@
from mycli.main_modes.execute import main_execute_from_cli
from mycli.main_modes.list_dsn import main_list_dsn
from mycli.main_modes.list_ssh_config import main_list_ssh_config
from mycli.main_modes.repl import get_prompt, main_repl, set_all_external_titles
from mycli.main_modes.repl import main_repl, render_prompt_string, set_all_external_titles
from mycli.packages import special
from mycli.packages.cli_utils import filtered_sys_argv, is_valid_connection_scheme
from mycli.packages.filepaths import dir_path_exists, guess_socket_location
Expand Down Expand Up @@ -268,8 +268,8 @@ def __init__(
self.min_completion_trigger = c["main"].as_int("min_completion_trigger")
# a hack, pending a better way to handle settings and state
repl_package.MIN_COMPLETION_TRIGGER = self.min_completion_trigger
self.last_prompt_message = ANSI('')
self.last_custom_toolbar_message = ANSI('')
self.last_prompt_message = to_formatted_text('')
self.last_custom_toolbar_message = to_formatted_text('')

# Register custom special commands.
self.register_special_commands()
Expand Down Expand Up @@ -907,8 +907,9 @@ def get_output_margin(self, status: str | None = None) -> int:
render_counter = self.prompt_session.app.render_counter
else:
render_counter = 0
# todo: this jump back to get_prompt() in repl.py is a sign that separation is incomplete
self.prompt_lines = get_prompt(self, self.prompt_format, render_counter).count('\n') + 1
# todo: this jump back to render_prompt_string() in repl.py is a sign that separation is incomplete
prompt_string = render_prompt_string(self, self.prompt_format, render_counter)
self.prompt_lines = to_plain_text(prompt_string).count('\n') + 1
margin = self.get_reserved_space() + self.prompt_lines
if special.is_timing_enabled():
margin += 1
Expand Down
183 changes: 124 additions & 59 deletions mycli/main_modes/repl.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from datetime import datetime
import functools
from functools import partial
import html
from importlib import resources
import os
import random
Expand All @@ -13,6 +14,7 @@
import time
import traceback
from typing import TYPE_CHECKING, Any, Generator
from xml.parsers.expat import ExpatError

import click
import prompt_toolkit
Expand All @@ -23,6 +25,10 @@
from prompt_toolkit.filters import Condition, has_focus, is_done
from prompt_toolkit.formatted_text import (
ANSI,
HTML,
FormattedText,
to_formatted_text,
to_plain_text,
)
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.layout.processors import ConditionalProcessor, HighlightMatchingBracketProcessor
Expand Down Expand Up @@ -162,7 +168,13 @@ def set_external_terminal_tab_title(mycli: 'MyCli') -> None:
return
if not sys.stderr.isatty():
return
title = sanitize_terminal_title(get_prompt(mycli, mycli.terminal_tab_title_format, mycli.prompt_session.app.render_counter))
title = sanitize_terminal_title(
render_prompt_string(
mycli,
mycli.terminal_tab_title_format,
mycli.prompt_session.app.render_counter,
)
)
print(f'\x1b]1;{title}\a', file=sys.stderr, end='')
sys.stderr.flush()

Expand All @@ -174,7 +186,13 @@ def set_external_terminal_window_title(mycli: 'MyCli') -> None:
return
if not sys.stderr.isatty():
return
title = sanitize_terminal_title(get_prompt(mycli, mycli.terminal_window_title_format, mycli.prompt_session.app.render_counter))
title = sanitize_terminal_title(
render_prompt_string(
mycli,
mycli.terminal_window_title_format,
mycli.prompt_session.app.render_counter,
)
)
print(f'\x1b]2;{title}\a', file=sys.stderr, end='')
sys.stderr.flush()

Expand All @@ -186,7 +204,13 @@ def set_external_multiplex_window_title(mycli: 'MyCli') -> None:
return
if not mycli.prompt_session:
return
title = sanitize_terminal_title(get_prompt(mycli, mycli.multiplex_window_title_format, mycli.prompt_session.app.render_counter))
title = sanitize_terminal_title(
render_prompt_string(
mycli,
mycli.multiplex_window_title_format,
mycli.prompt_session.app.render_counter,
)
)
try:
subprocess.run(
['tmux', 'rename-window', title],
Expand All @@ -208,33 +232,47 @@ def set_external_multiplex_pane_title(mycli: 'MyCli') -> None:
return
if not sys.stderr.isatty():
return
title = sanitize_terminal_title(get_prompt(mycli, mycli.multiplex_pane_title_format, mycli.prompt_session.app.render_counter))
title = sanitize_terminal_title(
render_prompt_string(
mycli,
mycli.multiplex_pane_title_format,
mycli.prompt_session.app.render_counter,
)
)
print(f'\x1b]2;{title}\x1b\\', file=sys.stderr, end='')
sys.stderr.flush()


def get_custom_toolbar(
mycli: 'MyCli',
toolbar_format: str,
) -> ANSI:
) -> FormattedText:
if not mycli.prompt_session:
return ANSI('')
return to_formatted_text('')
if not mycli.prompt_session.app:
return ANSI('')
return to_formatted_text('')
if mycli.prompt_session.app.current_buffer.text:
return mycli.last_custom_toolbar_message
toolbar = get_prompt(mycli, toolbar_format, mycli.prompt_session.app.render_counter)
toolbar = toolbar.replace('\\x1b', '\x1b')
mycli.last_custom_toolbar_message = ANSI(toolbar)
mycli.last_custom_toolbar_message = render_prompt_string(
mycli,
toolbar_format,
mycli.prompt_session.app.render_counter,
)
return mycli.last_custom_toolbar_message


def maybe_html_escape(string: str, is_html: bool) -> str:
if is_html:
return html.escape(string, quote=False)
return string


@functools.lru_cache(maxsize=256)
def get_prompt(
def render_prompt_string(
mycli: 'MyCli',
string: str,
_render_counter: int,
) -> str:
) -> FormattedText:
sqlexecute = mycli.sqlexecute
assert sqlexecute is not None
if mycli.login_path and mycli.login_path_as_host:
Expand All @@ -247,79 +285,106 @@ def get_prompt(
if re.match(r'^[\d\.]+$', short_prompt_host):
short_prompt_host = prompt_host
now = datetime.now()
backslash_placeholder = '\ufffc_backslash'
string = string.replace('\\\\', backslash_placeholder)
string = string.replace('\\u', sqlexecute.user or '(none)')
string = string.replace('\\h', prompt_host or '(none)')
string = string.replace('\\H', short_prompt_host or '(none)')
string = string.replace('\\d', sqlexecute.dbname or '(none)')
species_name = sqlexecute.server_info.species.name if sqlexecute.server_info and sqlexecute.server_info.species else 'MySQL'
string = string.replace('\\t', species_name)
string = string.replace('\\n', '\n')
string = string.replace('\\D', now.strftime('%a %b %d %H:%M:%S %Y'))
string = string.replace('\\m', now.strftime('%M'))
string = string.replace('\\P', now.strftime('%p'))
string = string.replace('\\R', now.strftime('%H'))
string = string.replace('\\r', now.strftime('%I'))
string = string.replace('\\s', now.strftime('%S'))
string = string.replace('\\p', str(sqlexecute.port))
string = string.replace('\\j', os.path.basename(sqlexecute.socket or '(none)'))
string = string.replace('\\J', sqlexecute.socket or '(none)')
string = string.replace('\\k', os.path.basename(sqlexecute.socket or str(sqlexecute.port)))
string = string.replace('\\K', sqlexecute.socket or str(sqlexecute.port))
string = string.replace('\\A', mycli.dsn_alias or '(none)')
string = string.replace('\\_', ' ')
string = string.replace(backslash_placeholder, '\\')

strings = string.split('\\\\')
is_html = strings[0].startswith('\\<html>')
strings = [x.replace('\\u', maybe_html_escape(sqlexecute.user or '(none)', is_html)) for x in strings]
strings = [x.replace('\\h', maybe_html_escape(prompt_host or '(none)', is_html)) for x in strings]
strings = [x.replace('\\H', maybe_html_escape(short_prompt_host or '(none)', is_html)) for x in strings]
strings = [x.replace('\\d', maybe_html_escape(sqlexecute.dbname or '(none)', is_html)) for x in strings]
strings = [x.replace('\\t', maybe_html_escape(species_name, is_html)) for x in strings]
strings = [x.replace('\\n', '\n') for x in strings]
strings = [x.replace('\\D', maybe_html_escape(now.strftime('%a %b %d %H:%M:%S %Y'), is_html)) for x in strings]
strings = [x.replace('\\m', maybe_html_escape(now.strftime('%M'), is_html)) for x in strings]
strings = [x.replace('\\P', maybe_html_escape(now.strftime('%p'), is_html)) for x in strings]
strings = [x.replace('\\R', maybe_html_escape(now.strftime('%H'), is_html)) for x in strings]
strings = [x.replace('\\r', maybe_html_escape(now.strftime('%I'), is_html)) for x in strings]
strings = [x.replace('\\s', maybe_html_escape(now.strftime('%S'), is_html)) for x in strings]
strings = [x.replace('\\p', maybe_html_escape(str(sqlexecute.port), is_html)) for x in strings]
strings = [
x.replace('\\j', maybe_html_escape(os.path.basename(sqlexecute.socket or '(none)').replace('\\', '/'), is_html)) for x in strings
]
strings = [x.replace('\\J', maybe_html_escape((sqlexecute.socket or '(none)').replace('\\', '/'), is_html)) for x in strings]
strings = [
x.replace('\\k', maybe_html_escape(os.path.basename(sqlexecute.socket or str(sqlexecute.port)).replace('\\', '/'), is_html))
for x in strings
]
strings = [
x.replace('\\K', maybe_html_escape((sqlexecute.socket or str(sqlexecute.port)).replace('\\', '/'), is_html)) for x in strings
]
strings = [x.replace('\\A', maybe_html_escape(mycli.dsn_alias or '(none)', is_html)) for x in strings]
strings = [x.replace('\\_', ' ') for x in strings]

checker_string = ' '.join(strings)
if hasattr(sqlexecute, 'conn') and sqlexecute.conn is not None:
if '\\y' in string:
if '\\y' in checker_string:
with sqlexecute.conn.cursor() as cur:
string = string.replace('\\y', str(get_uptime(cur)) or '(none)')
if '\\Y' in string:
strings = [x.replace('\\y', maybe_html_escape(str(get_uptime(cur)) or '(none)', is_html)) for x in strings]
if '\\Y' in checker_string:
with sqlexecute.conn.cursor() as cur:
string = string.replace('\\Y', format_uptime(str(get_uptime(cur))) or '(none)')
strings = [x.replace('\\Y', maybe_html_escape(format_uptime(str(get_uptime(cur))) or '(none)', is_html)) for x in strings]
else:
string = string.replace('\\y', '(none)')
string = string.replace('\\Y', '(none)')
strings = [x.replace('\\y', '(none)') for x in strings]
strings = [x.replace('\\Y', '(none)') for x in strings]

if hasattr(sqlexecute, 'conn') and sqlexecute.conn is not None:
if '\\T' in string:
if '\\T' in checker_string:
with sqlexecute.conn.cursor() as cur:
string = string.replace('\\T', get_ssl_version(cur) or '(none)')
strings = [x.replace('\\T', maybe_html_escape(get_ssl_version(cur) or '(none)', is_html)) for x in strings]
else:
string = string.replace('\\T', '(none)')
strings = [x.replace('\\T', '(none)') for x in strings]

if hasattr(sqlexecute, 'conn') and sqlexecute.conn is not None:
if '\\w' in string:
if '\\w' in checker_string:
with sqlexecute.conn.cursor() as cur:
string = string.replace('\\w', str(get_warning_count(cur) or '(none)'))
strings = [x.replace('\\w', maybe_html_escape(str(get_warning_count(cur) or '(none)'), is_html)) for x in strings]
else:
string = string.replace('\\w', '(none)')
strings = [x.replace('\\w', '(none)') for x in strings]
if hasattr(sqlexecute, 'conn') and sqlexecute.conn is not None:
if '\\W' in string:
if '\\W' in checker_string:
with sqlexecute.conn.cursor() as cur:
string = string.replace('\\W', str(get_warning_count(cur) or ''))
strings = [x.replace('\\W', maybe_html_escape(str(get_warning_count(cur) or ''), is_html)) for x in strings]
else:
string = string.replace('\\W', '')
strings = [x.replace('\\W', '') for x in strings]

return string
if is_html:
strings[0] = strings[0].removeprefix('\\<html>')
strings[-1] = strings[-1].removesuffix('\\</html>')
elif '\\x1b' in checker_string:
strings = [x.replace('\\x1b', '\x1b') for x in strings]

strings = [re.sub(r'\\(.)', r'(unknown prompt format string: \\\1)', x) for x in strings]

string = '\\'.join(strings)

if is_html:
try:
formatted_string = to_formatted_text(HTML(string))
except (ExpatError, ValueError):
formatted_string = to_formatted_text(HTML('(cannot parse HTML prompt string)'))
else:
formatted_string = to_formatted_text(ANSI(string))

return formatted_string


def _get_prompt_message(
mycli: 'MyCli',
app: prompt_toolkit.application.application.Application,
) -> ANSI:
) -> FormattedText:
if app.current_buffer.text:
return mycli.last_prompt_message

prompt = get_prompt(mycli, mycli.prompt_format, app.render_counter)
if mycli.prompt_format == mycli.default_prompt and len(prompt) > mycli.max_len_prompt:
prompt = get_prompt(mycli, mycli.default_prompt_splitln, app.render_counter)
mycli.prompt_lines = prompt.count('\n') + 1
prompt = prompt.replace('\\x1b', '\x1b')
prompt = render_prompt_string(mycli, mycli.prompt_format, app.render_counter)
prompt_plain = to_plain_text(prompt)
if mycli.prompt_format == mycli.default_prompt and len(prompt_plain) > mycli.max_len_prompt:
prompt = render_prompt_string(mycli, mycli.default_prompt_splitln, app.render_counter)
prompt_plain = to_plain_text(prompt)
mycli.prompt_lines = prompt_plain.count('\n') + 1
if not mycli.prompt_lines:
mycli.prompt_lines = prompt.count('\n') + 1
mycli.last_prompt_message = ANSI(prompt)
mycli.prompt_lines = prompt_plain.count('\n') + 1

mycli.last_prompt_message = prompt
return mycli.last_prompt_message


Expand Down
5 changes: 5 additions & 0 deletions mycli/myclirc
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ wider_completion_menu = False
# * \_ - a space
# * \\ - a literal backslash
# * \x1b[...m - an ANSI escape sequence (can style with color)
# * \<html> - a leading sequence indicating that the rest of the prompt be styled like HTML.
# See https://python-prompt-toolkit.readthedocs.io/en/stable/pages/printing_text.html#html .
# Characters such as "&" or literal "<" and ">" must be HTML-escaped.
# HTML styles cannot be combined with ANSI sequences. HTML takes precedence.
# HTML color example: prompt = '\<html><red><u>root</u></red>@localhost:\d&gt; '
prompt = '\t \u@\h:\d> '
prompt_continuation = '->'

Expand Down
9 changes: 7 additions & 2 deletions mycli/packages/string_utils.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import re

from cli_helpers.utils import strip_ansi
from prompt_toolkit.formatted_text import (
FormattedText,
to_plain_text,
)


def sanitize_terminal_title(title: str) -> str:
sanitized = strip_ansi(title)
def sanitize_terminal_title(title: FormattedText) -> str:
sanitized = to_plain_text(title)
sanitized = strip_ansi(sanitized)
sanitized = sanitized.replace('\n', ' ')
sanitized = re.sub('[\x00-\x1f\x7f]', '', sanitized)
return sanitized
9 changes: 7 additions & 2 deletions test/myclirc
Original file line number Diff line number Diff line change
Expand Up @@ -125,16 +125,21 @@ wider_completion_menu = False
# * \K - full connection socket path OR the port
# * \T - connection SSL/TLS version
# * \t - database vendor (Percona, MySQL, MariaDB, TiDB)
# * \u - username
# * \w - number of warnings, or "(none)" (requires frequent trips to the server)
# * \W - number of warnings, or the empty string (requires frequent trips to the server)
# * \W - number of warnings, or the empty string (requires frequent trips to the server)
# * \y - uptime in seconds (requires frequent trips to the server)
# * \Y - uptime in words (requires frequent trips to the server)
# * \u - username
# * \A - DSN alias
# * \n - a newline
# * \_ - a space
# * \\ - a literal backslash
# * \x1b[...m - an ANSI escape sequence (can style with color)
# * \<html> - a leading sequence indicating that the rest of the prompt be styled like HTML.
# See https://python-prompt-toolkit.readthedocs.io/en/stable/pages/printing_text.html#html .
# Characters such as "&" or literal "<" and ">" must be HTML-escaped.
# HTML styles cannot be combined with ANSI sequences. HTML takes precedence.
# HTML color example: prompt = '\<html><red><u>root</u></red>@localhost:\d&gt; '
prompt = "\t \u@\h:\d> "
prompt_continuation = ->

Expand Down
Loading
Loading