-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathlogging.py
More file actions
204 lines (169 loc) · 7.9 KB
/
logging.py
File metadata and controls
204 lines (169 loc) · 7.9 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
"""
Shared structured logging + Flow Doctor integration.
Replaces near-identical copies of ``log_config.py`` in alpha-engine-data
and alpha-engine/executor. Consumers call :func:`setup_logging` once at
process startup; subsequent call sites retrieve the Flow Doctor instance
via :func:`get_flow_doctor`.
Modes:
- Text (default): human-readable single-line log format.
- JSON: activated by ``ALPHA_ENGINE_JSON_LOGS=1``. Emits one JSON object
per log record, including tracebacks for errors.
Flow Doctor activates only when ``FLOW_DOCTOR_ENABLED=1`` and a
``flow_doctor_yaml`` path is provided. ERROR-level records (including
``logger.exception``) fire the FlowDoctorHandler, which dispatches per
the yaml config (email + GitHub issue with dedup + rate limits).
Requires the ``flow_doctor`` optional extra when FLOW_DOCTOR_ENABLED=1
(``alpha-engine-lib[flow_doctor]``).
"""
from __future__ import annotations
import json
import logging
import os
import re
from datetime import datetime, timezone
from typing import Optional
# ``${VAR}`` interpolation tokens in a flow-doctor.yaml. flow-doctor
# resolves these from ``os.environ`` eagerly at ``flow_doctor.init()``
# time — before any lazy ``get_secret()`` consumer-site call runs — so
# the seed below must populate them first.
_FD_VAR_RE = re.compile(r"\$\{([A-Z][A-Z0-9_]*)\}")
# Singleton populated by setup_logging() when FLOW_DOCTOR_ENABLED=1.
# ``Optional[object]`` typing avoids forcing a flow_doctor import here.
_fd_instance: Optional[object] = None
class JSONFormatter(logging.Formatter):
"""Emit log records as single-line JSON objects."""
def format(self, record: logging.LogRecord) -> str:
entry = {
"ts": datetime.fromtimestamp(record.created, tz=timezone.utc).isoformat(),
"level": record.levelname,
"module": record.module,
"func": record.funcName,
"msg": record.getMessage(),
}
if record.exc_info and record.exc_info[0] is not None:
entry["exc"] = self.formatException(record.exc_info)
if hasattr(record, "ctx"):
entry["ctx"] = record.ctx
return json.dumps(entry, default=str)
def get_flow_doctor():
"""Return the shared flow-doctor instance, or None if not initialized."""
return _fd_instance
def _seed_flow_doctor_secrets(yaml_path: str) -> None:
"""Populate the flow-doctor ``${VAR}`` secrets into ``os.environ``.
flow-doctor resolves every ``${VAR}`` in its yaml from ``os.environ``
eagerly inside ``flow_doctor.init()``, before any consumer-site
:func:`alpha_engine_lib.secrets.get_secret` call has had a chance to
run. With the legacy ``ssm_secrets.load_secrets()`` bulk-load shim
retired (PR 9g), systemd/Step-Functions-launched entrypoints have no
``.env`` source, so those ``${VAR}`` refs would resolve to nothing
and flow-doctor's email + GitHub dispatch would silently misfire.
This is the single chokepoint every repo reaches flow-doctor
through, so seeding here closes the gap system-wide with no
per-repo code. The var set is derived from the yaml itself rather
than hardcoded — each repo's flow-doctor.yaml carries a different
``${VAR}`` set, and a yaml-added secret must not silently re-open
the gap.
Invariants (mirroring the retired shim):
- A var already present in ``os.environ`` wins — never overwritten.
- A genuinely unresolvable secret is left **unset**, so
flow-doctor's own ``ConfigError`` fires loudly rather than being
masked with ``""`` (see ``feedback_no_silent_fails``).
- A secrets-backend hiccup never blocks logging setup; it is logged
at WARNING and the var is left unset (same loud-failure path).
"""
try:
with open(yaml_path, "r", encoding="utf-8") as fh:
yaml_text = fh.read()
except OSError:
# Missing/unreadable yaml is reported by _attach_flow_doctor's
# own os.path.exists guard with a clearer message.
return
from alpha_engine_lib.secrets import get_secret
for var in sorted(set(_FD_VAR_RE.findall(yaml_text))):
if os.environ.get(var):
continue
try:
value = get_secret(var, required=False)
except Exception as exc: # noqa: BLE001 - backend hiccup is non-fatal
logging.getLogger(__name__).warning(
"flow-doctor secret seed: get_secret(%s) raised %r; "
"leaving unset so flow-doctor fails loudly", var, exc,
)
continue
if value:
os.environ[var] = value
def _attach_flow_doctor(
yaml_path: str,
exclude_patterns: list[str] | None = None,
) -> None:
"""Initialize the shared flow-doctor instance and attach a log handler.
``exclude_patterns`` is a list of regex strings forwarded to
``FlowDoctorHandler(exclude_patterns=...)``. Log records whose
rendered message matches any pattern are dropped before entering
the flow-doctor dispatch pipeline (email / GitHub issue). Use for
benign ERROR-level noise that would otherwise dedup-spam on-call.
"""
global _fd_instance
try:
import flow_doctor
except ImportError as exc:
raise RuntimeError(
"FLOW_DOCTOR_ENABLED=1 but flow-doctor is not installed. Install "
"via alpha-engine-lib[flow_doctor] or add flow-doctor[diagnosis] "
f"to requirements: {exc}"
) from exc
if not os.path.exists(yaml_path):
raise RuntimeError(
f"FLOW_DOCTOR_ENABLED=1 but flow-doctor config not found at {yaml_path}"
)
_seed_flow_doctor_secrets(yaml_path)
_fd_instance = flow_doctor.init(config_path=yaml_path)
handler_kwargs: dict = {"level": logging.ERROR}
if exclude_patterns:
handler_kwargs["exclude_patterns"] = exclude_patterns
handler = flow_doctor.FlowDoctorHandler(_fd_instance, **handler_kwargs)
logging.getLogger().addHandler(handler)
def setup_logging(
name: str,
flow_doctor_yaml: str | None = None,
exclude_patterns: list[str] | None = None,
) -> None:
"""Configure the root logger for an Alpha Engine entrypoint.
:param name: Logger name shown in the text-mode prefix
(``"%(asctime)s %(levelname)s [{name}] %(message)s"``). Typically
the module name (``"data-collector"``, ``"executor"``, etc.).
:param flow_doctor_yaml: Absolute or CWD-relative path to the
flow-doctor yaml config. Required if ``FLOW_DOCTOR_ENABLED=1``;
ignored otherwise.
:param exclude_patterns: Optional list of regex strings. When
``FLOW_DOCTOR_ENABLED=1``, these are forwarded to
``FlowDoctorHandler`` so matching ERROR-level records are
dropped before the flow-doctor dispatch pipeline. Use sparingly
— this silences *alerts*, not logs. The records still appear in
stdout / JSON logs; only flow-doctor's email + GitHub issue
routing is suppressed. Example: the executor passes
``[r"Error 10197"]`` to suppress benign IB Gateway noise when
the iOS app steals the live-data session.
Env vars consulted:
- ``ALPHA_ENGINE_JSON_LOGS`` — ``"1"`` enables JSON formatter.
- ``FLOW_DOCTOR_ENABLED`` — ``"1"`` attaches FlowDoctorHandler.
"""
json_mode = os.environ.get("ALPHA_ENGINE_JSON_LOGS", "0") == "1"
handler = logging.StreamHandler()
if json_mode:
handler.setFormatter(JSONFormatter())
else:
handler.setFormatter(logging.Formatter(
f"%(asctime)s %(levelname)s [{name}] %(message)s"
))
root = logging.getLogger()
root.handlers.clear()
root.addHandler(handler)
root.setLevel(logging.INFO)
if os.environ.get("FLOW_DOCTOR_ENABLED", "0") == "1":
if not flow_doctor_yaml:
raise RuntimeError(
"FLOW_DOCTOR_ENABLED=1 but setup_logging() was not given a "
"flow_doctor_yaml path"
)
_attach_flow_doctor(flow_doctor_yaml, exclude_patterns=exclude_patterns)