From d1ea584474b86ef3aaf2389df80f42ba6fa0e7ff Mon Sep 17 00:00:00 2001 From: Antek Date: Wed, 10 Dec 2025 14:10:11 +0100 Subject: [PATCH] feat: add input panels, ticket popups, and station info logic --- .../GUI/counter_panel/__init__.py | 0 .../GUI/counter_panel/input_panel.py | 200 +++++++++++ .../GUI/counter_panel/submit_handler.py | 314 ++++++++++++++++++ .../GUI/counter_panel/ticket_popup.py | 134 ++++++++ .../GUI/station_info/__init__.py | 0 .../GUI/station_info/station_info_logic.py | 150 +++++++++ .../GUI/station_info/station_info_popup.py | 148 +++++++++ 7 files changed, 946 insertions(+) create mode 100644 src/rail_network_graph/GUI/counter_panel/__init__.py create mode 100644 src/rail_network_graph/GUI/counter_panel/input_panel.py create mode 100644 src/rail_network_graph/GUI/counter_panel/submit_handler.py create mode 100644 src/rail_network_graph/GUI/counter_panel/ticket_popup.py create mode 100644 src/rail_network_graph/GUI/station_info/__init__.py create mode 100644 src/rail_network_graph/GUI/station_info/station_info_logic.py create mode 100644 src/rail_network_graph/GUI/station_info/station_info_popup.py diff --git a/src/rail_network_graph/GUI/counter_panel/__init__.py b/src/rail_network_graph/GUI/counter_panel/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/rail_network_graph/GUI/counter_panel/input_panel.py b/src/rail_network_graph/GUI/counter_panel/input_panel.py new file mode 100644 index 0000000..f67dea1 --- /dev/null +++ b/src/rail_network_graph/GUI/counter_panel/input_panel.py @@ -0,0 +1,200 @@ +import logging +from collections.abc import Callable + +import customtkinter as ctk +import pandas as pd + +from rail_network_graph import config + +log = logging.getLogger(__name__) + +# Define color constants +ORANGE = "#FFA500" +DARK_BG = "#1a1a1a" +ENTRY_BG = "#2a2a2a" +TEXT_COLOR = "#ffffff" + + +class InputPanel: + """ + User input panel for selecting origin/destination stations and a ticket discount. + + The panel includes: + - two text fields for start/end stations, + - an option menu for discount selection, + - a "Search" button with a user-provided callback. + + :param root: Root CTk window or frame to contain the panel. + :param submit_callback: Function to call when the "Search" button is pressed. + """ + + def __init__(self, root: ctk.CTk | ctk.CTkFrame, submit_callback: Callable[[], None] | None) -> None: + self.root = root + self.submit_callback = submit_callback + + # Stores the currently selected discount percentage (default: 0) + self.selected_discount: int = 0 + # Maps discount name (str) to percentage (int) + self.discount_map: dict[str, int] = {} + # List of discount names for the OptionMenu + self.discount_names: list[str] = [] + + log.info("Initializing InputPanel") + log.debug("InputPanel root=%s, submit_callback=%s", type(root).__name__, bool(submit_callback)) + + # Load available discounts from file + self._load_discounts() + # Create all GUI elements + self._create_widgets() + # Arrange the created elements + self.layout_widgets() + # Ensure the button command is set + self.set_submit_callback(submit_callback) + + def _load_discounts(self) -> None: + """ + Load discounts from file and build ``{discount_name: percent}`` mapping. + """ + log.info("Loading discounts from file: %s", config.DISCOUNTS_PATH) + # Load discounts from a tab-separated file using pandas + discounts_df = pd.read_csv(config.DISCOUNTS_PATH, sep="\t") + # Create a dictionary mapping the 'name' column to the 'percent' column + self.discount_map = dict(zip(discounts_df["name"], discounts_df["percent"], strict=False)) + # Extract just the names for the option menu + self.discount_names = list(self.discount_map.keys()) + self.selected_discount = 0 + log.debug( + "Loaded %d discounts, initial selected_discount=%d", + len(self.discount_map), + self.selected_discount, + ) + + def _create_widgets(self) -> None: + """ + Create GUI components (without layout/placement). + """ + log.debug("Creating InputPanel widgets") + # Outer frame with orange border effect + self.border_frame = ctk.CTkFrame(master=self.root, fg_color=ORANGE, corner_radius=9) + # Inner frame containing all input widgets + self.input_frame = ctk.CTkFrame(master=self.border_frame, fg_color=DARK_BG, corner_radius=9) + + # Entry field for the starting station + self.entry1 = ctk.CTkEntry( + master=self.input_frame, + placeholder_text="Station 1", + fg_color=ENTRY_BG, + text_color=TEXT_COLOR, + placeholder_text_color="#888", + ) + # Entry field for the destination station + self.entry2 = ctk.CTkEntry( + master=self.input_frame, + placeholder_text="Station 2", + fg_color=ENTRY_BG, + text_color=TEXT_COLOR, + placeholder_text_color="#888", + ) + + # Entry field for the desired start time + self.entry_time = ctk.CTkEntry( + master=self.input_frame, + placeholder_text="Start time (HH:MM:SS)", + fg_color=ENTRY_BG, + text_color=TEXT_COLOR, + placeholder_text_color="#888", + ) + + # Set up the discount selection menu + initial_option = self.discount_names[0] if self.discount_names else "No discount" + self.option_var = ctk.StringVar(value=initial_option) + self.option_menu = ctk.CTkOptionMenu( + master=self.input_frame, + # Provide loaded discount names as options + values=self.discount_names if self.discount_names else ["No discount"], + variable=self.option_var, + # Link the menu selection to the update_discount method + command=self.update_discount, + fg_color=ENTRY_BG, + button_color=ORANGE, + button_hover_color="#cc8400", + dropdown_fg_color=DARK_BG, + dropdown_text_color=TEXT_COLOR, + text_color=TEXT_COLOR, + ) + + # Create the search/submit button + self.submit_button = ctk.CTkButton( + master=self.input_frame, + text="Search", + fg_color=ORANGE, + hover_color="#cc8400", + text_color="black", + corner_radius=8, + # Command is initially set to the provided callback (or None) + command=self.submit_callback, + ) + log.debug("Widgets created; initial option_menu value='%s'", initial_option) + + def layout_widgets(self) -> None: + """ + Place components within the window/frame. + """ + log.debug("Laying out InputPanel widgets") + # Place the outer frame (with border) in the top-right corner + self.border_frame.place(relx=1.0, rely=0.0, anchor="ne", x=-10, y=10) + # Pack the inner frame inside the border frame + self.input_frame.pack(padx=2, pady=2) + + # Pack input fields with padding + self.entry1.pack(pady=(12, 6), padx=12, fill="x") + self.entry2.pack(pady=6, padx=12, fill="x") + self.entry_time.pack(pady=6, padx=12, fill="x") + self.option_menu.pack(pady=6, padx=12, fill="x") + # Pack the submit button + self.submit_button.pack(pady=(6, 12), padx=12) + log.info("InputPanel layout complete") + + def update_discount(self, selected_name: str) -> None: + """ + Update the discount percentage when a new option is selected. + + :param selected_name: Selected discount name. + """ + # Retrieve the percentage value from the map, defaulting to 0 if not found + self.selected_discount = int(self.discount_map.get(selected_name, 0)) + log.debug( + "Updated discount selection: name='%s', selected_discount=%d", + selected_name, + self.selected_discount, + ) + + def get_selected_discount(self) -> int: + """ + Return the currently selected discount percentage. + + :return: Discount value (e.g., 25). + """ + log.debug("get_selected_discount -> %d", self.selected_discount) + return self.selected_discount + + def set_submit_callback(self, callback: Callable[[], None] | None) -> None: + """ + Set the function to be called on "Search" button click. + + :param callback: Event handler function. + """ + # Configure the button command, using a no-op lambda if the callback is None + self.submit_button.configure(command=callback or (lambda: None)) + log.debug("Submit callback set: has_callback=%s", bool(callback)) + + def get_start_time_str(self) -> str: + """ + Return the value entered the start-time field. + + :return: Time string in ``HH:MM:SS`` format (defaults to ``"05:00:00"`` if empty). + """ + # Get the text, strip whitespace, and default if empty + value = self.entry_time.get().strip() or config.DEFAULT_START_TIME + log.debug("get_start_time_str -> '%s'", value) + return value diff --git a/src/rail_network_graph/GUI/counter_panel/submit_handler.py b/src/rail_network_graph/GUI/counter_panel/submit_handler.py new file mode 100644 index 0000000..1fa64d6 --- /dev/null +++ b/src/rail_network_graph/GUI/counter_panel/submit_handler.py @@ -0,0 +1,314 @@ +import logging +from datetime import timedelta +from typing import Any + +import customtkinter as ctk +import pandas as pd + +from rail_network_graph import config +from rail_network_graph.data_processing.data_processor import ( + get_parent_station_id_by_name, + get_stop_name_by_id, +) +from rail_network_graph.graphs.stations_graph import StationsGraph +from rail_network_graph.GUI.counter_panel.input_panel import InputPanel +from rail_network_graph.GUI.counter_panel.ticket_popup import TicketPopup +from rail_network_graph.GUI.map.map_canvas import MapCanvas +from rail_network_graph.path_finder.path_finder import PathFinderDijkstra + +log = logging.getLogger(__name__) + + +class SubmitHandler: + """ + Handle the "Search" button action: read user inputs, run routing, + compute price, and update the GUI (popup + map). + + :param input_panel: InputPanel instance providing user inputs. + :param root: Main tkinter root/window. + :param map_canvas: MapCanvas instance used for highlighting paths. + :param merged_data: Preprocessed GTFS-like DataFrame used by the pathfinder. + :param stops_data: GTFS ``stops.txt`` DataFrame used for name lookups. + :param graph: StationsGraph instance for BFS visualization on the map. + """ + + def __init__( + self, + input_panel: InputPanel, + root: ctk.CTk | ctk.CTkFrame, + map_canvas: MapCanvas, + merged_data: pd.DataFrame, + stops_data: pd.DataFrame, + graph: StationsGraph, + ) -> None: + self.input_panel = input_panel + self.root = root + self.map_canvas = map_canvas + self.stops_data = stops_data + self.graph = graph + self.merged_data = merged_data + # Initialize the pathfinding algorithm instance + self.finder = PathFinderDijkstra(merged_data) + log.debug( + "SubmitHandler initialized: merged_data_shape=%s, stops_data_shape=%s", + getattr(self.merged_data, "shape", None), + getattr(self.stops_data, "shape", None), + ) + + def handle_submit( + self, color: str = config.MAP_HIGHLIGHT_COLOR, linewidth: float = config.MAP_HIGHLIGHT_LINEWIDTH + ) -> None: + """ + End-to-end flow: inputs → path search → price calculation → GUI updates. + + :param color: Line color for map highlighting. + :param linewidth: Line width for map highlighting. + """ + log.info("Handling submit action (color=%s, linewidth=%s)", color, linewidth) + # Retrieve user inputs + start_station_name = self.get_start_station() + end_station_name = self.get_end_station() + discount_percent = self.get_selected_discount() + log.debug( + "User inputs: start_station=%s, end_station=%s, discount=%s", + start_station_name, + end_station_name, + discount_percent, + ) + + try: + # Step 1: Find the shortest path in time + station_rows, duration_string, station_ids = self.get_path(start_station_name, end_station_name) + except ValueError as error: + # Display error if stations are not found or no route exists + log.error("Error while computing path: %s", error) + print("Error:", str(error)) + return + + # Step 2: Assemble the full continuous route path for map visualization and distance calculation + full_path_station_ids: list[str] = [] + # Connect the segments (found by Dijkstra) using BFS to get intermediate stations for the map + for index in range(len(station_ids) - 1): + # Find the path segment between two consecutive stations from the Dijkstra result + segment = self.graph.find_path_bfs(station_ids[index], station_ids[index + 1]) + if not segment: + log.warning( + "No BFS segment found between station_ids %s and %s", + station_ids[index], + station_ids[index + 1], + ) + continue + # Append the segment, avoiding duplicate connection points + if full_path_station_ids and full_path_station_ids[-1] == segment[0]: + full_path_station_ids.extend(segment[1:]) + else: + full_path_station_ids.extend(segment) + log.debug("Full path station_ids (for distance/map) length=%d", len(full_path_station_ids)) + + # Convert the continuous path of station IDs back to names + station_names = [get_stop_name_by_id(self.stops_data, int(station_id)) for station_id in full_path_station_ids] + + # Step 3: Calculate distance + from rail_network_graph.utils.distance_calculator import get_total_distance, load_distances + + # Load pre-calculated railway segment lengths + distances = load_distances(config.RAILWAY_DISTANCES_OUTPUT_PATH) + # Sum the distances of all segments in the full path + distance_km = get_total_distance(station_names, distances) + log.info("Computed total distance: %.3f km", distance_km) + + # Step 4: Calculate price + from rail_network_graph.utils.price_calculator import get_price, load_pricing + + # Load base pricing table + pricing_ranges = load_pricing(config.PRICING_TABLE_PATH) + # Determine base price based on distance + base_price = get_price(distance_km, pricing_ranges) + # Apply user's selected discount + final_price = self.calculate_discounted_price(base_price, discount_percent) + log.info( + "Price calculation: base_price=%.2f, discount=%d, final_price=%.2f", + base_price, + discount_percent, + final_price, + ) + + # Format output strings + distance_string = f"{round(distance_km, 1)} km" + price_string = f"{final_price:.2f} zł" + + # Step 5: Update GUI + self.show_ticket_popup( + start_station_name, end_station_name, distance_string, duration_string, price_string, station_rows + ) + # Highlight the final path on the map + self.highlight_path_on_map(station_ids, color, linewidth) + + def get_start_station(self) -> str: + """Return start station name entered in UI.""" + value = self.input_panel.entry1.get() + log.debug("get_start_station -> %s", value) + return value + + def get_end_station(self) -> str: + """Return end station name entered in UI.""" + value = self.input_panel.entry2.get() + log.debug("get_end_station -> %s", value) + return value + + def get_selected_discount(self) -> int: + """Return selected discount percentage.""" + value = self.input_panel.get_selected_discount() + log.debug("get_selected_discount -> %d", value) + return value + + def get_path( + self, start_name: str, end_name: str + ) -> tuple[list[tuple[str | None, str, str, bool, bool]], str, list[str]]: + """ + Compute the route between two station names and format data for UI. + """ + log.info("Computing path between '%s' and '%s'", start_name, end_name) + # Convert station names to GTFS parent station IDs + start_id = get_parent_station_id_by_name(self.stops_data, start_name) + end_id = get_parent_station_id_by_name(self.stops_data, end_name) + log.debug("Resolved station IDs: start_id=%s, end_id=%s", start_id, end_id) + + if not start_id or not end_id: + log.warning("Station name resolution failed: start_id=%s, end_id=%s", start_id, end_id) + raise ValueError("One or both station names were not found.") + + # Get the desired start time from the input panel + start_time_string = self.input_panel.get_start_time_str() + log.debug("Starting path search with start_time=%s", start_time_string) + # Run Dijkstra's algorithm + result = self.finder.find_path(start_id, end_id, start_time=start_time_string) + + if result is None: + log.warning("No route found between '%s' and '%s'", start_name, end_name) + raise ValueError("No route found between the selected stations.") + + # Unpack pathfinding results + station_departures, transfers, duration_minutes, trip_ids = result + log.debug( + "PathFinder result: stations=%d, transfers=%d, duration_minutes=%d, trips=%d", + len(station_departures), + len(transfers), + duration_minutes, + len(trip_ids), + ) + + station_rows = [] + last_trip_id = None + + # Iterate through the sequence of stations with departure times + for _index, (station_id, departure_time) in enumerate(station_departures): + # Find the full GTFS row corresponding to this exact departure + row: Any = self.merged_data[ + (self.merged_data["station_id"] == station_id) & (self.merged_data["departure_time"] == departure_time) + ].iloc[0] + + arrival_time = row["arrival_time"] + trip_id = row["trip_id"] + # Convert station ID back to a display name + name = get_stop_name_by_id(self.stops_data, int(station_id)) + + # Determine if this station is the start of a new trip/segment + is_first_in_trip = trip_id != last_trip_id + if is_first_in_trip: + # Add trip_id to the name for clarity + name = f"{name} ({trip_id})" + + is_transfer = station_id in transfers + # Append formatted row data for the ticket popup + station_rows.append((name, departure_time, arrival_time, is_transfer, is_first_in_trip)) + last_trip_id = trip_id + + # Convert total duration from minutes to 'X h Y min' string + duration_td = timedelta(minutes=duration_minutes) + hours, remainder = divmod(duration_td.seconds, 3600) + minutes = remainder // 60 + duration_string = f"{hours} h {minutes} min" if hours else f"{minutes} min" + log.debug("Computed duration string: %s", duration_string) + + # Extract just the sequence of station IDs from the path + station_ids = [station_id for station_id, _ in station_departures] + log.debug("Extracted station_ids path length=%d", len(station_ids)) + return station_rows, duration_string, station_ids + + @staticmethod + def calculate_discounted_price(base_price: float, discount_percent: int) -> float: + """Return price after applying percentage discount.""" + discounted = base_price * ((100 - discount_percent) / 100) + log.debug( + "calculate_discounted_price: base_price=%.2f, discount=%d, result=%.2f", + base_price, + discount_percent, + discounted, + ) + return discounted + + def show_ticket_popup( + self, + start: str, + end: str, + distance: str, + duration: str, + price: str, + station_rows: list[tuple[str | None, str, str, bool, bool]], + ) -> None: + """Show results popup.""" + log.info( + "Showing ticket popup: %s -> %s, distance=%s, duration=%s, price=%s, rows=%d", + start, + end, + distance, + duration, + price, + len(station_rows), + ) + # Create and display the results window + TicketPopup(self.root).show_popup(start, end, distance, duration, price, station_rows) + + def highlight_path_on_map(self, station_ids: list[str], color: str, linewidth: float) -> None: + """Highlight travel path on map.""" + log.info( + "Highlighting path on map: segments=%d, color=%s, linewidth=%s", + max(len(station_ids) - 1, 0), + color, + linewidth, + ) + # Clear any previously drawn routes + self.map_canvas.map_plotter.clear_highlights() + + full_route: list[str] = [] + + # Re-run BFS to get intermediate nodes for map drawing + for index in range(len(station_ids) - 1): + start_id = station_ids[index] + end_id = station_ids[index + 1] + + # Find the segment of the route through the station graph + segment = self.graph.find_path_bfs(start_id, end_id) + + if not segment: + log.warning("No route segment found between %s and %s (for map highlight)", start_id, end_id) + print(f"No route segment found between {start_id} and {end_id}") + continue + + # Assemble the continuous list of station IDs for map drawing + if full_route and full_route[-1] == segment[0]: + full_route.extend(segment[1:]) + else: + full_route.extend(segment) + + log.debug("Full route for map highlight length=%d", len(full_route)) + + # Draw each connection segment on the map + for index in range(len(full_route) - 1): + self.map_canvas.map_plotter.highlight_connection( + full_route[index], full_route[index + 1], color=color, linewidth=linewidth + ) + + # Set focus back to the map canvas widget + self.map_canvas.canvas.get_tk_widget().focus_set() diff --git a/src/rail_network_graph/GUI/counter_panel/ticket_popup.py b/src/rail_network_graph/GUI/counter_panel/ticket_popup.py new file mode 100644 index 0000000..af872a3 --- /dev/null +++ b/src/rail_network_graph/GUI/counter_panel/ticket_popup.py @@ -0,0 +1,134 @@ +import logging +import re + +import customtkinter as ctk + +from rail_network_graph import config + +log = logging.getLogger(__name__) + +# Define color constants +ORANGE = "#FFA500" +DARK_BG = "#1a1a1a" +ENTRY_BG = "#2a2a2a" +TEXT_COLOR = "#ffffff" +SUBTEXT_COLOR = "#bbbbbb" + + +class TicketPopup: + """ + Popup window with a route summary and a scrollable list of all stations. + + :param root: Parent CTk window or frame. + """ + + def __init__(self, root: ctk.CTk | ctk.CTkFrame) -> None: + self.root = root + log.debug("TicketPopup initialized with root=%s", type(root).__name__) + + def show_popup( + self, + start: str, + end: str, + distance: str, + duration: str, + price: str, + station_rows: list[tuple[str, str, str, bool, bool]], + ) -> None: + """ + Display the popup with ticket/route information. + + :param start: Start station name. + :param end: End station name. + :param distance: Distance text (e.g., "123.4 km"). + :param duration: Duration text (e.g., "2 h 13 min"). + :param price: Price text (e.g., "12.34 zł"). + :param station_rows: List of per-stop tuples: + (name_with_optional_trip, dep_time, arr_time, is_transfer, is_first_in_trip). + """ + log.info( + "Showing ticket popup: start=%s, end=%s, distance=%s, duration=%s, price=%s, rows=%d", + start, + end, + distance, + duration, + price, + len(station_rows), + ) + # Create a new top-level window (popup) + popup_window = ctk.CTkToplevel(self.root) + # Set window icon (after a delay for proper initialization) + popup_window.after(250, lambda: popup_window.iconbitmap(config.APP_ICON_PATH)) # noqa + # Ensure the popup stays on top + popup_window.attributes("-topmost", True) + popup_window.lift() + # Force focus onto the popup + popup_window.focus_force() + popup_window.title("") + popup_window.geometry("380x520") + popup_window.resizable(False, False) + popup_window.configure(fg_color=DARK_BG) + + # Calculate coordinates to center the popup relative to the main window + popup_window.update_idletasks() + window_x = self.root.winfo_x() + (self.root.winfo_width() // 2) - 190 + window_y = self.root.winfo_y() + (self.root.winfo_height() // 2) - 260 + popup_window.geometry(f"+{window_x}+{window_y}") + log.debug("TicketPopup positioned at x=%d, y=%d", window_x, window_y) + + ## Header and Summary Information + + # Title label + ctk.CTkLabel( + popup_window, text="📄 Your route", font=ctk.CTkFont(size=16, weight="bold"), text_color=TEXT_COLOR + ).pack(pady=(16, 6)) + + # Compile route summary text + info_text = f"From: {start}\n" f"To: {end}\n\n" f"Distance: {distance}\n" f"Ticket price: {price}" + + # Label for displaying the summary text + ctk.CTkLabel(popup_window, text=info_text, justify="center", text_color=TEXT_COLOR, wraplength=340).pack( + pady=(0, 10), anchor="center" + ) + + ## Scrollable Station List + + # Outer frame for orange border effect around the list + border_frame = ctk.CTkFrame(popup_window, fg_color=ORANGE, corner_radius=9) + border_frame.pack(padx=12, pady=(0, 14), fill="both", expand=True) + + # Scrollable frame to contain the list of stops + scroll_frame = ctk.CTkScrollableFrame(border_frame, fg_color=ENTRY_BG) + scroll_frame.pack(padx=2, pady=2, fill="both", expand=True) + + # Check if route data is available + if not station_rows: + log.warning("TicketPopup opened with no station_rows") + ctk.CTkLabel(scroll_frame, text="No stations.", text_color=TEXT_COLOR).pack(pady=10) + else: + # Iterate through the stop-by-stop route details + for station_name, dep_time, arr_time, _is_transfer, _is_first_in_trip in station_rows: + # Remove trip ID in parentheses for cleaner display in the list + station_name_clean = re.sub(r"\s*\([^)]+\)", "", station_name).strip() + + # Format the line: arrival time - station name - departure time + line_text = f"arr: {arr_time} — {station_name_clean} — dep: {dep_time}" + + # Create a label for the current stop + ctk.CTkLabel( + scroll_frame, text=line_text, anchor="w", text_color=TEXT_COLOR, wraplength=330, justify="left" + ).pack(anchor="w", padx=10, pady=(4, 0)) + + ## Close Button + + # Button to close the popup window + ctk.CTkButton( + popup_window, + text="Close", + fg_color=ORANGE, + hover_color="#cc8400", + text_color="black", + corner_radius=8, + command=popup_window.destroy, # Action to close the window + ).pack(pady=(0, 12)) + log.debug("TicketPopup UI elements created and displayed") diff --git a/src/rail_network_graph/GUI/station_info/__init__.py b/src/rail_network_graph/GUI/station_info/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/rail_network_graph/GUI/station_info/station_info_logic.py b/src/rail_network_graph/GUI/station_info/station_info_logic.py new file mode 100644 index 0000000..8c16fd4 --- /dev/null +++ b/src/rail_network_graph/GUI/station_info/station_info_logic.py @@ -0,0 +1,150 @@ +import logging +from collections.abc import Iterable +from typing import Any + +import pandas as pd + +log = logging.getLogger(__name__) + + +class StationInfoLogic: + """ + Utilities for querying next-stop connections for a given station + from preprocessed GTFS-like trip/stop data. + """ + + def __init__(self, station_names: dict[str, str], merged_data: pd.DataFrame): + self.station_names: dict[str, str] = station_names + + log.info("Initializing StationInfoLogic") + log.debug("StationInfoLogic station_names size: %d", len(self.station_names)) + log.debug("Merged_data shape for StationInfoLogic: %s", merged_data.shape) + + # Group trips by trip_id and sort stops by stop_sequence + # Pre-process: Create a dictionary {trip_id: [list of stop tuples sorted by sequence]} + self.trips_data: dict[str, list[Any]] = { + str(trip_id): list(group.sort_values("stop_sequence").itertuples(index=False)) + for trip_id, group in merged_data.groupby("trip_id") + } + + # Map each trip to its list of station_ids + # Pre-process: Create a dictionary {trip_id: [list of station_ids]} for quick lookup + self.stops_in_trip: dict[str, list[str]] = merged_data.groupby("trip_id")["station_id"].apply(list).to_dict() + + log.debug("Preprocessed trips_data entries: %d", len(self.trips_data)) + log.debug("Preprocessed stops_in_trip entries: %d", len(self.stops_in_trip)) + log.info("StationInfoLogic initialization complete") + + def get_station_data(self, station_id: str) -> tuple[str, list[tuple[str, Any, Any]]]: + """ + Build a title and a deduplicated, time-sorted list of direct + next-stop connections from the given station. + + :param station_id: Station identifier to inspect. + :return: (title, connections) where each connection is + (destination_name, departure_time, arrival_time). + """ + log.info("Fetching station data for station_id=%s", station_id) + # Get the human-readable station name + station_display_name = self.station_names.get(station_id, "?") + connections: list[tuple[str, Any, Any]] = [] + + # Collect all trips that pass through this station using the pre-built lookup + relevant_trip_ids: list[str] = [ + trip_id for trip_id, station_list in self.stops_in_trip.items() if station_id in station_list + ] + log.debug("Found %d trips containing station_id=%s", len(relevant_trip_ids), station_id) + + # Iterate over all relevant trips + for trip_id in relevant_trip_ids: + stops_in_this_trip = self.trips_data[trip_id] + + # Find the next stop after the given station in this trip + for stop_index in range(len(stops_in_this_trip) - 1): + # Check if the current stop is the station of interest + if stops_in_this_trip[stop_index].station_id == station_id: + current_stop = stops_in_this_trip[stop_index] + next_stop = stops_in_this_trip[stop_index + 1] + + dest_station_id = next_stop.station_id + # Get destination name + destination_name = self.station_names.get(dest_station_id, "?") + + # Store connection: destination, departure time, arrival time + connections.append((destination_name, current_stop.departure_time, next_stop.arrival_time)) + # We only need the *first* next stop on this trip, so break the inner loop + break + + # Remove duplicates (e.g., identical service runs) and sort by departure time + unique_connections = sorted(set(connections), key=lambda row: row[1]) + log.debug( + "Station %s (%s): raw_connections=%d, unique_sorted_connections=%d", + station_id, + station_display_name, + len(connections), + len(unique_connections), + ) + # Format the title for the popup + title = f"{station_display_name}\n(ID: {station_id})" + log.info("Prepared station data for station_id=%s with %d connections", station_id, len(unique_connections)) + return title, list(unique_connections) + + @staticmethod + def quick_sort_connections(connections: Iterable[tuple[str, Any, Any]]) -> list[tuple[str, Any, Any]]: + """ + Deduplicate and in-place quick-sort connections by departure time. + + :param connections: Iterable of (destination_name, departure_time, arrival_time). + :return: List of unique connections sorted by departure_time. + """ + log.info("Sorting connections with quick_sort_connections") + connections_list = list(connections) + log.debug("quick_sort_connections received %d connections", len(connections_list)) + + def quick_sort_core(array: list[tuple[str, Any, Any]], left_index: int, right_index: int) -> None: + """ + In-place quicksort partitioning on departure_time. + + :param array: List of connection tuples. + :param left_index: Left index (inclusive). + :param right_index: Right index (inclusive). + """ + if left_index >= right_index: + return + + # Choose the middle element's departure time as the pivot + pivot_departure_time = array[(left_index + right_index) // 2][1] + i_left = left_index + i_right = right_index + + while i_left <= i_right: + # Find element on the left that is greater than or equal to the pivot + while array[i_left][1] < pivot_departure_time: + i_left += 1 + # Find element on the right that is less than or equal to the pivot + while array[i_right][1] > pivot_departure_time: + i_right -= 1 + if i_left <= i_right: + # Swap the two elements + array[i_left], array[i_right] = array[i_right], array[i_left] + i_left += 1 + i_right -= 1 + + # Recursively sort the sub-arrays + quick_sort_core(array, left_index, i_right) + quick_sort_core(array, i_left, right_index) + + # Deduplicate before sorting + unique_list: list[tuple[str, Any, Any]] = list(set(connections_list)) + log.debug( + "quick_sort_connections deduplicated list: input=%d, unique=%d", + len(connections_list), + len(unique_list), + ) + if unique_list: + # Execute the in-place quicksort if the list is not empty + quick_sort_core(unique_list, 0, len(unique_list) - 1) + log.debug("quick_sort_connections completed; sorted list length=%d", len(unique_list)) + else: + log.debug("quick_sort_connections received an empty list after deduplication; nothing to sort") + return unique_list diff --git a/src/rail_network_graph/GUI/station_info/station_info_popup.py b/src/rail_network_graph/GUI/station_info/station_info_popup.py new file mode 100644 index 0000000..a0b9fdb --- /dev/null +++ b/src/rail_network_graph/GUI/station_info/station_info_popup.py @@ -0,0 +1,148 @@ +import logging + +import customtkinter as ctk + +from rail_network_graph import config + +log = logging.getLogger(__name__) + + +class StationInfoPopup: + """ + Popup window displaying detailed information about a selected station: + + - station name, + - list of next available connections (departure → destination (arrival)). + + :param root: Parent application window (CTk or Tk). + """ + + ORANGE = "#FFA500" + DARK_BG = "#1a1a1a" + ENTRY_BG = "#2a2a2a" + TEXT_COLOR = "#ffffff" + + def __init__(self, root: ctk.CTk | ctk.CTkFrame) -> None: + self.root = root + log.debug("StationInfoPopup initialized with root=%s", type(root).__name__) + + def show_station_info(self, title: str, connections: list[tuple[str, str, str]]) -> None: + """ + Create and display the popup with station information. + + :param title: Popup title (e.g. "Wrocław Główny\n(ID: 1001)"). + :param connections: List of tuples representing upcoming departures: + (destination_name, departure_time, arrival_time). + """ + log.info( + "Showing station info popup: title=%r, connections_count=%d", + title, + len(connections), + ) + popup = self.create_popup(400, 500) + self.center_popup(popup, width=400, height=500) + + self.add_title(popup, title) + self.add_connection_list(popup, connections) + self.add_close_button(popup) + log.debug("Station info popup displayed for title=%r", title) + + def create_popup(self, width: int, height: int) -> ctk.CTkToplevel: + """ + Create and configure a popup window. + + :param width: Popup width in pixels. + :param height: Popup height in pixels. + :return: Configured CTkToplevel window. + """ + log.debug("Creating station info popup window: width=%d, height=%d", width, height) + popup = ctk.CTkToplevel(self.root) + popup.after(250, lambda: popup.iconbitmap(config.APP_ICON_PATH)) # noqa + popup.attributes("-topmost", True) + popup.lift() + popup.focus_force() + popup.title("") + popup.geometry(f"{width}x{height}") + popup.resizable(False, False) + popup.configure(fg_color=self.DARK_BG) + log.debug("Station info popup window created") + return popup + + def center_popup(self, popup: ctk.CTkToplevel, width: int, height: int) -> None: + """ + Center the popup relative to the main application window. + + :param popup: Popup window to position. + :param width: Popup width. + :param height: Popup height. + """ + popup.update_idletasks() + x_pos = self.root.winfo_x() + (self.root.winfo_width() // 2) - width // 2 + y_pos = self.root.winfo_y() + (self.root.winfo_height() // 2) - height // 2 + popup.geometry(f"+{x_pos}+{y_pos}") + log.debug("Centered station info popup at x=%d, y=%d", x_pos, y_pos) + + def add_title(self, popup: ctk.CTkToplevel, title: str) -> None: + """ + Add the station title/header to the popup. + + :param popup: Popup window. + :param title: Station title text. + """ + log.debug("Adding station title to popup: %r", title) + ctk.CTkLabel( + popup, + text=title, + font=ctk.CTkFont(size=16, weight="bold"), + text_color=self.TEXT_COLOR, + justify="center", + ).pack(pady=(16, 4)) + + def add_connection_list(self, popup: ctk.CTkToplevel, connections: list[tuple[str, str, str]]) -> None: + """ + Add a scrollable list of station connections. + + :param popup: Popup window. + :param connections: List of connections (destination, departure, arrival). + """ + log.debug("Adding %d connections to station info popup", len(connections)) + frame_border = ctk.CTkFrame(popup, fg_color=self.ORANGE, corner_radius=9) + frame_border.pack(padx=12, pady=(0, 12), fill="both", expand=True) + + scroll_frame = ctk.CTkScrollableFrame(master=frame_border, fg_color=self.ENTRY_BG) + scroll_frame.pack(padx=2, pady=2, fill="both", expand=True) + + if not connections: + log.info("No connections to display for this station") + ctk.CTkLabel(scroll_frame, text="No connections.", text_color=self.TEXT_COLOR).pack(pady=10) + return + + # Remove duplicates while preserving order + for destination_name, departure_time, arrival_time in list(dict.fromkeys(connections)): + row_text = f"{departure_time} > {destination_name} ({arrival_time})" + ctk.CTkLabel( + scroll_frame, + text=row_text, + anchor="w", + justify="left", + text_color=self.TEXT_COLOR, + wraplength=340, + ).pack(anchor="w", padx=8, pady=2) + log.debug("Finished adding connection rows to popup") + + def add_close_button(self, popup: ctk.CTkToplevel) -> None: + """ + Add a button to close the popup. + + :param popup: Popup window. + """ + log.debug("Adding Close button to station info popup") + ctk.CTkButton( + popup, + text="Close", + fg_color=self.ORANGE, + hover_color="#cc8400", + text_color="black", + corner_radius=8, + command=popup.destroy, + ).pack(pady=(0, 12))