diff --git a/examples/ESRF_ORM_example/measure_ideal_ORM.py b/examples/ESRF_ORM_example/measure_ideal_ORM.py index 4420c1a5..60efa03d 100644 --- a/examples/ESRF_ORM_example/measure_ideal_ORM.py +++ b/examples/ESRF_ORM_example/measure_ideal_ORM.py @@ -5,15 +5,13 @@ from pyaml.tuning_tools.orbit_response_matrix import OrbitResponseMatrix parent_folder = Path(__file__).parent -config_path = parent_folder.parent.parent.joinpath( - "tests", "config", "EBSOrbit.yaml" -).resolve() +config_path = parent_folder.parent.parent.joinpath("tests", "config", "EBSOrbit.yaml").resolve() sr = Accelerator.load(config_path) ebs = sr.design -ebs.orm.measure() -ebs.orm.save(parent_folder / Path("ideal_orm.json")) -ebs.orm.save(parent_folder / Path("ideal_orm.yaml"), with_type="yaml") -ebs.orm.save(parent_folder / Path("ideal_orm.npz"), with_type="npz") +if ebs.orm.measure(): + ebs.orm.save(parent_folder / Path("ideal_orm.json")) + ebs.orm.save(parent_folder / Path("ideal_orm.yaml"), with_type="yaml") + ebs.orm.save(parent_folder / Path("ideal_orm.npz"), with_type="npz") ormdata = ebs.orm.get() diff --git a/pyaml/accelerator.py b/pyaml/accelerator.py index e418e2ba..c69ba0c7 100644 --- a/pyaml/accelerator.py +++ b/pyaml/accelerator.py @@ -33,6 +33,8 @@ class ConfigModel(BaseModel): energy : float Accelerator nominal energy. For ramped machine, this value can be dynamically set + alphac : float, optional + Moment compaction factor. controls : list[ControlSystem], optional List of control system used. An accelerator can access several control systems @@ -53,6 +55,7 @@ class ConfigModel(BaseModel): facility: str machine: str energy: float + alphac: float | None = None controls: list[ControlSystem] = None simulators: list[Simulator] = None data_folder: str @@ -103,6 +106,9 @@ def __init__(self, cfg: ConfigModel): if cfg.energy is not None: self.set_energy(cfg.energy) + if cfg.alphac is not None: + self.set_mcf(cfg.alphac) + self._yellow_pages = YellowPages(self) self.post_init() @@ -118,10 +124,26 @@ def set_energy(self, E: float): """ if self._cfg.simulators is not None: for s in self._cfg.simulators: - s.set_energy(E) + s._set_energy(E) + if self._cfg.controls is not None: + for c in self._cfg.controls: + c._set_energy(E) + + def set_mcf(self, alphac: float): + """ + Set the moment compaction factor for all simulators and control systems. + + Parameters + ---------- + alphac : float + Moment compaction factor + """ + if self._cfg.simulators is not None: + for s in self._cfg.simulators: + s._set_mcf(alphac) if self._cfg.controls is not None: for c in self._cfg.controls: - c.set_energy(E) + c._set_mcf(alphac) def post_init(self): """ diff --git a/pyaml/common/element.py b/pyaml/common/element.py index 4eb8c5d1..bc9008f0 100644 --- a/pyaml/common/element.py +++ b/pyaml/common/element.py @@ -97,6 +97,12 @@ def set_energy(self, E: float): """ pass + def set_mcf(self, alphac: float): + """ + Set the instrument moment compaction factor on this element + """ + pass + def check_peer(self): """ Throws an exception if the element is not attacched diff --git a/pyaml/common/element_holder.py b/pyaml/common/element_holder.py index 2b864e1f..9296f560 100644 --- a/pyaml/common/element_holder.py +++ b/pyaml/common/element_holder.py @@ -386,3 +386,29 @@ def _list_diagnostics(self) -> list[str]: Return all diagnostic identifiers available in this holder. """ return list(self.__DIAG.keys()) + + def _set_energy(self, E: float): + """ + Sets the energy on all elements + + Parameters + ---------- + E : float + Energy in eV + """ + # Needed by energy dependant element (i.e. magnet coil current calculation) + for m in self.get_all_elements(): + m.set_energy(E) + + def _set_mcf(self, alphac: float): + """ + Sets the moment compaction factor on all elements + + Parameters + ---------- + alphac : float + Moment compaction factor + """ + # Needed by some off energy dependant element (i.e. chromaticty tools) + for m in self.get_all_elements(): + m.set_mcf(alphac) diff --git a/pyaml/control/controlsystem.py b/pyaml/control/controlsystem.py index a7888728..d644a9a4 100644 --- a/pyaml/control/controlsystem.py +++ b/pyaml/control/controlsystem.py @@ -162,19 +162,6 @@ def create_bpm_aggregators(self, bpms: list[BPM]) -> list[ScalarAggregator]: else: raise PyAMLException("Indexed BPM and scalar values cannot be mixed in the same array") - def set_energy(self, E: float): - """ - Sets the energy on magnets belonging to this control system - - Parameters - ---------- - E : float - Energy in eV - """ - # Needed by energy dependant element (i.e. magnet coil current calculation) - for m in self.get_all_elements(): - m.set_energy(E) - def fill_device(self, elements: list[Element]): """ Fill device of this control system with Element diff --git a/pyaml/diagnostics/chromaticity_monitor.py b/pyaml/diagnostics/chromaticity_monitor.py index 7e09e9fd..4af00902 100644 --- a/pyaml/diagnostics/chromaticity_monitor.py +++ b/pyaml/diagnostics/chromaticity_monitor.py @@ -2,7 +2,7 @@ from ..common.constants import Action from ..common.element import ElementConfigModel from ..common.exception import PyAMLException -from ..tuning_tools.measurement_tool import MeasurementTool +from ..tuning_tools.measurement_tool import MeasurementTool, MeasurementToolConfigModel try: from typing import Self # Python 3.11+ @@ -20,7 +20,7 @@ PYAMLCLASS = "ChomaticityMonitor" -class ConfigModel(ElementConfigModel): +class ConfigModel(MeasurementToolConfigModel): """ Configuration model for Chromaticity Monitor. @@ -32,11 +32,6 @@ class ConfigModel(ElementConfigModel): Name of main RF frequency plant bpm_array_name : str,optional Name of main BPM array used for dispersion fit - n_step : int, optional - Default number of RF step during chromaticity - measurement, by default 5 - alphac : float or None, optional - Momentum compaction factor, by default None e_delta : float, optional Default variation of relative energy during chromaticity measurement: f0 - f0 * E_delta * alphac < f_RF < f0 + f0 * E_delta * alphac, @@ -44,12 +39,6 @@ class ConfigModel(ElementConfigModel): max_e_delta : float, optional Maximum authorized variation of relative energy during chromaticity measurement, by default 0.004 - n_tune_meas : int, optional - Default number of tune/orbit measurement per RF frequency, by default 1 - sleep_between_meas : float, optional - Default sleep time in [s] between two tune measurements, by default 2.0 - sleep_between_step : float, optional - Default sleep time in [s] after RF frequency variation, by default 5.0 fit_order : int, optional Chomaticity fitting order, by default 1 fit_disp_order : int, optional @@ -63,13 +52,8 @@ class ConfigModel(ElementConfigModel): betatron_tune_name: str rf_plant_name: str bpm_array_name: str | None = None - n_step: int = 5 - alphac: float | None = None e_delta: float = 0.001 max_e_delta: float = 0.004 - n_tune_meas: int = 1 - sleep_between_meas: float = 0.0 - sleep_between_step: float = 0.0 fit_order: int = 1 fit_disp_order: int = 1 fit_dispersion: bool = False @@ -118,6 +102,7 @@ def __init__(self, cfg: ConfigModel): self._cfg = cfg self._chromaticity = RChromaDispArray(self, "chromaticity", "1") self._dipsersion = RChromaDispArray(self, "dispersion", "m") + self._alphac = None @property def chromaticity(self) -> ReadFloatArray: @@ -127,10 +112,13 @@ def chromaticity(self) -> ReadFloatArray: Returns ------- ReadFloatArray - Array of chromaticity values [[q'x, q'y],[q''x, q''y],...] + chromaticity values [q'x, q'y] """ return self._chromaticity + def set_mcf(self, alphac: float): + self._alphac = alphac + @property def dispersion(self) -> ReadFloatArray: """ @@ -149,7 +137,7 @@ def measure( alphac: float = None, e_delta: float = None, max_e_delta: float = None, - n_tune_meas: int = None, + n_avg_meas: int = None, sleep_between_meas: float = None, sleep_between_step: float = None, fit_order: int = None, @@ -175,7 +163,7 @@ def measure( max_e_delta: float Maximum autorized variation of relative energy during chromaticity measurment [default: from config] - n_tune_meas: int + n_avg_meas: int Default number of tune/orbit measurment per RF frequency [default: from config] sleep_between_meas: float Default time sleep between two tune measurment [default: from config] @@ -194,10 +182,10 @@ def measure( If the callback return false, then the process is aborted. """ n_step = n_step if n_step is not None else self._cfg.n_step - alphac = alphac if alphac is not None else self._cfg.alphac + alphac = alphac if alphac is not None else self._alphac e_delta = e_delta if e_delta is not None else self._cfg.e_delta max_e_delta = max_e_delta if max_e_delta is not None else self._cfg.max_e_delta - n_tune_meas = n_tune_meas if n_tune_meas is not None else self._cfg.n_tune_meas + n_avg_meas = n_avg_meas if n_avg_meas is not None else self._cfg.n_avg_meas sleep_between_meas = sleep_between_meas if sleep_between_meas is not None else self._cfg.sleep_between_meas sleep_between_step = sleep_between_step if sleep_between_step is not None else self._cfg.sleep_between_step fit_order = fit_order if fit_order is not None else self._cfg.fit_order @@ -205,11 +193,13 @@ def measure( fit_dispersion = fit_dispersion if fit_dispersion is not None else self._cfg.fit_dispersion if abs(e_delta) > abs(max_e_delta): - logger.warning("e_delta={e_delta} is greater than max_e_delta={max_e_delta}") + logger.warning(f"e_delta={e_delta} is greater than max_e_delta={max_e_delta}") if alphac is None: raise PyAMLException("Moment compaction factor is not defined") + self.register_callback(callback) + # Get devices self.check_peer() tm = self._peer.get_betatron_tune_monitor(self._cfg.betatron_tune_name) @@ -234,55 +224,55 @@ def measure( # ensure that, even if there is an issus, the script will finish by # reseting the RF frequency to its original value err = None - ok = True + aborted = False try: for i, f in enumerate(delta_frec): # TODO : Use set_and_wait once it is implemented ! rf.frequency.set(f0 + f) - - cb_data = {"step": i, "rf": f0 + f} - if not self.send_callback(Action.APPLY, callback, cb_data): - # Abort - rf.frequency.set(f0) - return False + self.send_callback(Action.APPLY, {"step": i, "rf": float(f0 + f)}) sleep(sleep_between_step) # Averaging - for j in range(n_tune_meas): + for j in range(n_avg_meas): tune = tm.tune.get() Q[i] += tune - cb_data = {"step": i, "avg_step": j, "rf": f0 + f, "tune": tune} + cb_data = {"step": i, "avg_step": j, "rf": float(f0 + f), "tune": tune} if bpms is not None: orb = bpms.positions.get() orbit[i] += orb cb_data["orbit"] = orb - if not self.send_callback(Action.MEASURE, callback, cb_data): - # Abort - rf.frequency.set(f0) - return False + self.send_callback(Action.MEASURE, cb_data) - if j < n_tune_meas - 1: + if j < n_avg_meas - 1: sleep(sleep_between_meas) - Q /= float(n_tune_meas) + Q /= float(n_avg_meas) if bpms is not None: - orbit /= float(n_tune_meas) + orbit /= float(n_avg_meas) except Exception as ex: err = ex + except KeyboardInterrupt as ex: + aborted = True finally: - # TODO : Use set_and_wait once it is implemented ! + # Restore rf.frequency.set(f0) - cb_data = {"step": i, "rf": f0} - ok = self.send_callback(Action.RESTORE, callback, cb_data) + self.send_callback(Action.RESTORE, {"step": i, "rf": f0}, raiseException=False) - if err: + if err is not None: raise (err) - self.fit(delta, Q, fit_order, orbit=orbit, fit_disp_order=fit_disp_order, do_plot=do_plot) + if aborted: + logger.warning(f"{self.get_name()} : measurement aborted") + return False + + if fit_dispersion: + self.fit(delta, Q, fit_order, orbit=orbit, fit_disp_order=fit_disp_order, do_plot=do_plot) + else: + self.fit(delta, Q, fit_order, do_plot=do_plot) - return ok + return True def fit(self, deltas, Q, order, orbit=None, fit_disp_order=None, do_plot=False): """ @@ -320,9 +310,15 @@ def fit(self, deltas, Q, order, orbit=None, fit_disp_order=None, do_plot=False): self.latest_measurement["dispersion"] = [dispx[:, 1], dispy[:, 1]] # First order dispersion if do_plot: - fig = plt.figure("Chromaticity_measurement") + if fit_disp_order is None: + fig = plt.figure("Chromaticity measurement") + cols = 1 + else: + fig = plt.figure("Chromaticity/Dispersion measurement") + cols = 2 + for i in range(2): - ax = fig.add_subplot(2, 1, 1 + i) + ax = fig.add_subplot(2, cols, 1 + i) ax.scatter(deltas * 100, Q[:, i]) title = "" for o in range(order, -1, -1): @@ -338,8 +334,17 @@ def fit(self, deltas, Q, order, orbit=None, fit_disp_order=None, do_plot=False): ax.plot(deltas * 100, np.polyval(chroma[i][::-1], deltas)) ax.set_title(title) - ax.set_xlabel("Momentum Shift, dp/p [%]") ax.set_ylabel("%s Tune" % ["Horizontal", "Vertical"][i]) - # ax.legend() + ax.set_xlabel("Momentum Shift, dp/p [%]") + + if fit_disp_order is not None: + ax = fig.add_subplot(2, cols, 3) + ax.plot(dispx[:, 1]) + ax.set_ylabel("Dispersion [m]") + ax = fig.add_subplot(2, cols, 4) + ax.plot(dispy[:, 1]) + ax.set_xlabel("BPM #") + ax.set_ylabel("Dispersion [m]") + fig.tight_layout() plt.show() diff --git a/pyaml/lattice/simulator.py b/pyaml/lattice/simulator.py index d6ffd670..e5d65eac 100644 --- a/pyaml/lattice/simulator.py +++ b/pyaml/lattice/simulator.py @@ -114,12 +114,6 @@ def get_description(self) -> str: """ return self._cfg.description - def set_energy(self, E: float): - self.ring.energy = E - # Needed by energy dependant element (i.e. magnet coil current calculation) - for m in self.get_all_elements(): - m.set_energy(E) - def create_magnet_strength_aggregator(self, magnets: list[Magnet]) -> ScalarAggregator: # No magnet aggregator for simulator return None diff --git a/pyaml/tuning_tools/chromaticity.py b/pyaml/tuning_tools/chromaticity.py index 9239fe1b..31735ca4 100644 --- a/pyaml/tuning_tools/chromaticity.py +++ b/pyaml/tuning_tools/chromaticity.py @@ -3,6 +3,7 @@ from .. import PyAMLException from ..common.element import ElementConfigModel +from ..diagnostics.chromaticity_monitor import ChomaticityMonitor from .response_matrix_data import ResponseMatrixData from .tuning_tool import TuningTool @@ -30,15 +31,16 @@ class ConfigModel(ElementConfigModel): Parameters ---------- - quad_array_name : str - Array name of quad used to adjust the tune - betatron_tune_name : str - Name of the diagnostic pyaml device for measuring the tune - quad_delta : float - Delta strength used to get the response matrix + sextu_array_name : str + Array name of sextu used to adjust the chromaticity + chromaticty_monitor_name : str + Name of the diagnostic pyaml device for measuring the chromaticity + response_matrix : str | ResponseMatrixData + filename or data of the chromaticity response matrix """ sextu_array_name: str + chromaticty_monitor_name: str response_matrix: str | ResponseMatrixData @@ -82,6 +84,11 @@ def load(self, load_path: Path): self._response_matrix = np.array(self._cfg.response_matrix._cfg.matrix) self._correctionmat = np.linalg.pinv(self._response_matrix) + @property + def _cm(self) -> "ChomaticityMonitor": + self.check_peer() + return self._peer.get_chromaticity_monitor(self._cfg.chromaticty_monitor_name) + @property def _sextu(self) -> "MagnetArray": self.check_peer() @@ -93,36 +100,36 @@ def get(self): """ return self._setpoint - # def readback(self): - # """ - # Return the betatron tune measurement - # """ - # self.check_peer() - # return self._tm.tune.get() - - # def set(self, tune: np.array, iter: int = 1, wait_time: float = 0.0): - # """ - # Sets the tune - # - # Parameters - # ---------- - # tune : np.array - # Tune setpoint - # iter_nb : int - # Number of iteration - # wait_time : float - # Time to wait in second between 2 iterations - # """ - # for i in range(iter): - # diff_tune = tune - self.readback() - # if i == iter: - # wait_time = 0 # do not wait on last iteration - # self.add(diff_tune, wait_time) - # self._setpoint = np.array(tune) + def readback(self): + """ + Launch a chromaticty scan and returns the measured chromaticity. + """ + self._cm.measure() + return self._cm.chromaticity.get() + + def set(self, chroma: np.array, iter: int = 1, wait_time: float = 0.0): + """ + Sets the chromaticity + + Parameters + ---------- + chromaticity : np.array + Chromaticity setpoint + iter_nb : int + Number of iteration + wait_time : float + Time to wait in second between 2 iterations + """ + for i in range(iter): + diff_chroma = chroma - self.readback() + if i == iter: + wait_time = 0 # do not wait on last iteration + self.add(diff_chroma, wait_time) + self._setpoint = np.array(chroma) def correct(self, dchroma: np.array) -> np.array: """ - Return delta strengths for tune correction + Return delta strengths for chromaticity correction Parameters ---------- diff --git a/pyaml/tuning_tools/chromaticity_response_matrix.py b/pyaml/tuning_tools/chromaticity_response_matrix.py index 2aedafa5..783dacdf 100644 --- a/pyaml/tuning_tools/chromaticity_response_matrix.py +++ b/pyaml/tuning_tools/chromaticity_response_matrix.py @@ -3,10 +3,11 @@ from typing import Callable, Optional import numpy as np +from pydantic import ConfigDict from ..common.constants import Action from ..common.element import ElementConfigModel -from .measurement_tool import MeasurementTool +from .measurement_tool import MeasurementTool, MeasurementToolConfigModel from .response_matrix_data import ConfigModel as ResponseMatrixDataConfigModel logger = logging.getLogger(__name__) @@ -14,7 +15,7 @@ PYAMLCLASS = "ChromaticityResponseMatrix" -class ConfigModel(ElementConfigModel): +class ConfigModel(MeasurementToolConfigModel): """ Configuration model for Tune response matrix @@ -26,33 +27,20 @@ class ConfigModel(ElementConfigModel): Name of the diagnostic chromaticy monitor sextu_delta : float Delta strength used to get the response matrix - n_step: int, optional - Number of step for fitting the tune [-quad_delta/n_step..quad_delta/n_step] - Default 1 - sleep_between_step: float - Default time sleep after quad exitation - Default: 0 - n_avg_meas : int, optional - Default number of tune measurement per step used for averaging - Default 1 - sleep_between_meas: float - Default time sleep between two tune measurment - Default: 0 """ + model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid") + sextu_array_name: str chromaticity_name: str sextu_delta: float - n_step: Optional[int] = 1 - sleep_between_step: Optional[float] = 0 - n_avg_meas: Optional[int] = 1 - sleep_between_meas: Optional[float] = 0 class ChromaticityResponseMatrix(MeasurementTool): def __init__(self, cfg: ConfigModel): super().__init__(cfg.name) self._cfg = cfg + self.aborted = False def measure( self, @@ -94,6 +82,7 @@ def measure( step:int The current step avg_step:int The current avg step magnet:Magnet The magnet being excited + strength:Magnet strength chroma:np.array The measured chroma (on Action.MEASURE) dchroma:np.array The chroma variation (on Action.RESTORE) @@ -103,11 +92,13 @@ def measure( sextus = self._peer.get_magnets(self._cfg.sextu_array_name) cm = self._peer.get_chromaticity_monitor(self._cfg.chromaticity_name) + self.register_callback(callback) + chromamat = np.zeros((len(sextus), 2)) initial_chroma = None if not cm.measure(callback=callback): - # Abort + # Aborted return False initial_chroma = cm.chromaticity.get() @@ -117,55 +108,71 @@ def measure( sleep_step = sleep_between_step if sleep_between_step is not None else self._cfg.sleep_between_step sleep_meas = sleep_between_meas if sleep_between_meas is not None else self._cfg.sleep_between_meas - for qidx, m in enumerate(sextus): - str = m.strength.get() # Initial strength - deltas = np.linspace(-delta, delta, nb_step) - Qp = np.zeros((nb_step, 2)) - - for step, d in enumerate(deltas): - # apply strength - m.strength.set(str + d) - - if not self.send_callback(Action.APPLY, callback, {"step": qidx, "magnet": m}): - m.strength.set(str) # restore strength - return False - - time.sleep(sleep_step) - - # Chroma averaging - Qp[step] = np.zeros(2) - for avg in range(nb_meas): - if not cm.measure(callback=callback): - aborted = True - m.strength.set(str) # restore strength - return False - chroma = cm.chromaticity.get() - Qp[step] += chroma - - if not self.send_callback( - Action.MEASURE, callback, {"step": qidx, "avg_step": avg, "magnet": m, "chroma": chroma} - ): - m.strength.set(str) # restore strength - return False - - if avg < nb_meas - 1: - time.sleep(sleep_meas) - Qp[step] /= float(nb_meas) - - # Fit and fill matrix with the slopes - if nb_step == 1: - chromamat[qidx] = (Qp - initial_chroma) / deltas[0] - else: - coefs = np.polynomial.polynomial.polyfit(deltas, Qp, 1) - chromamat[qidx] = coefs[1] - - # Restore strength - m.strength.set(str) - if not self.send_callback( - Action.RESTORE, callback, {"step": qidx, "magnet": m, "dchroma": chromamat[qidx]} - ): - aborted = True - break + err = None + aborted = False + try: + for qidx, m in enumerate(sextus): + str = m.strength.get() # Initial strength + deltas = np.linspace(-delta, delta, nb_step) + Qp = np.zeros((nb_step, 2)) + + for step, d in enumerate(deltas): + # apply strength + m.strength.set(str + d) + + self.send_callback(Action.APPLY, {"step": qidx, "magnet": m.get_name(), "strength": float(str + d)}) + + time.sleep(sleep_step) + + # Chroma averaging + Qp[step] = np.zeros(2) + for avg in range(nb_meas): + if not cm.measure(callback=callback): + raise KeyboardInterrupt + chroma = cm.chromaticity.get() + Qp[step] += chroma + + self.send_callback( + Action.MEASURE, {"step": qidx, "avg_step": avg, "magnet": m.get_name(), "chroma": chroma} + ) + + if avg < nb_meas - 1: + time.sleep(sleep_meas) + Qp[step] /= float(nb_meas) + + # Fit and fill matrix with the slopes + if nb_step == 1: + chromamat[qidx] = (Qp - initial_chroma) / deltas[0] + else: + coefs = np.polynomial.polynomial.polyfit(deltas, Qp, 1) + chromamat[qidx] = coefs[1] + + # Restore strength + m.strength.set(str) + self.send_callback( + Action.RESTORE, + {"step": qidx, "magnet": m.get_name(), "strength": float(str), "dchroma": chromamat[qidx]}, + ) + + except Exception as ex: + err = ex + except KeyboardInterrupt as ex: + aborted = True + finally: + # Restore + m.strength.set(str) # restore strength + self.send_callback( + Action.RESTORE, + {"step": qidx, "magnet": m.get_name(), "strength": float(str), "dchroma": chromamat[qidx]}, + raiseException=False, + ) + + if err is not None: + raise (err) + + if aborted: + logger.warning(f"{self.get_name()} : measurement aborted") + return False self.latest_measurement = ResponseMatrixDataConfigModel( matrix=chromamat.T.tolist(), @@ -173,3 +180,5 @@ def measure( observable_names=[cm.get_name() + ".x", cm.get_name() + ".y"], ).model_dump() self.latest_measurement["type"] = "pyaml.tuning_tools.response_matrix_data" + + return True diff --git a/pyaml/tuning_tools/dispersion.py b/pyaml/tuning_tools/dispersion.py index 0e9344d1..ef165b87 100644 --- a/pyaml/tuning_tools/dispersion.py +++ b/pyaml/tuning_tools/dispersion.py @@ -65,19 +65,21 @@ def measure( skip_save=True, ) + self.register_callback(callback) + aborted = False for code, measurement in generator: callback_data = measurement.dispersion_data # to be defined better if code is DispersionCode.AFTER_SET: - if not self.send_callback(Action.APPLY, callback, callback_data): + if not self.send_callback(Action.APPLY, callback_data): if aborted: break elif code is DispersionCode.AFTER_GET: - if not self.send_callback(Action.MEASURE, callback, callback_data): + if not self.send_callback(Action.MEASURE, callback_data): aborted = True break elif code is DispersionCode.AFTER_RESTORE: - if not self.send_callback(Action.RESTORE, callback, callback_data): + if not self.send_callback(Action.RESTORE, callback_data): aborted = True break diff --git a/pyaml/tuning_tools/measurement_tool.py b/pyaml/tuning_tools/measurement_tool.py index 90fc72e2..0b0b6151 100644 --- a/pyaml/tuning_tools/measurement_tool.py +++ b/pyaml/tuning_tools/measurement_tool.py @@ -1,10 +1,12 @@ import logging from abc import ABCMeta, abstractmethod from pathlib import Path -from typing import TYPE_CHECKING, Callable, Self +from typing import TYPE_CHECKING, Callable, Optional, Self + +from pydantic import ConfigDict from ..common.constants import Action -from ..common.element import Element +from ..common.element import Element, ElementConfigModel from ..common.exception import PyAMLException if TYPE_CHECKING: @@ -13,6 +15,34 @@ logger = logging.getLogger(__name__) +class MeasurementToolConfigModel(ElementConfigModel): + """ + Measurement tool configuration model + + Parameters + ---------- + n_step: int, optional + Number of measurement step [-delta/n_step..delta/n_step] + Default 1 + sleep_between_step: float, optional + Default sleep time after an actuator excitation + Default: 0 + n_avg_meas : int, optional + Default number of measurement per step used for averaging + Default 1 + sleep_between_meas: float, optional + Default sleep time between two measurments + Default: 0 + """ + + model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid") + + n_step: Optional[int] = 1 + sleep_between_step: Optional[float] = 0 + n_avg_meas: Optional[int] = 1 + sleep_between_meas: Optional[float] = 0 + + class MeasurementTool(Element, metaclass=ABCMeta): """ Base class for measurement tool such as reponse matrix measurement or other scans. @@ -22,6 +52,7 @@ def __init__(self, name): super().__init__(name) self.latest_measurement: dict = None self._peer: "ElementHolder" = None # Peer: ControlSystem or Simulator + self._callback: Callable = None @abstractmethod def measure(self) -> bool: @@ -86,7 +117,7 @@ def save(self, save_path: Path, with_type: str = "json"): else: raise PyAMLException(f"ERROR: Unknown file type to save as: {with_type}.") - def send_callback(self, action: Action, callback: Callable, cb_data: dict) -> bool: + def send_callback(self, action: Action, cb_data: dict, raiseException: bool = True): """ Send callback from this Measurement tool to the caller. If the callback returns False, the scan is aborted and actuators are restored to their orignal values. @@ -106,17 +137,22 @@ def callback(action: Action, data: dict): action: Action See :py:class:`pyaml.common.constants.Action` - callback: Callable - Callback to be executed - cb_data: dict Callback data """ - if callback is not None: + ok = True + if self._callback is not None: # Add source - cb_data["source"] = self.__class__.__name__ - return callback(action, cb_data) - return True + cb_data["mode"] = f"{self.get_peer()}" + cb_data["source_name"] = f"{self.get_name()}" + ok = self._callback(action, cb_data) + if not ok and raiseException: + # Abort, same as ctrl+C + raise KeyboardInterrupt + return ok + + def register_callback(self, callback: Callable): + self._callback = callback def attach(self, peer: "ElementHolder") -> Self: """ diff --git a/pyaml/tuning_tools/orbit_response_matrix.py b/pyaml/tuning_tools/orbit_response_matrix.py index 2a42b1d5..4bc9d959 100644 --- a/pyaml/tuning_tools/orbit_response_matrix.py +++ b/pyaml/tuning_tools/orbit_response_matrix.py @@ -10,16 +10,15 @@ from ..common.constants import Action from ..common.element import ElementConfigModel from ..external.pySC_interface import pySCInterface -from .measurement_tool import MeasurementTool +from .measurement_tool import MeasurementTool, MeasurementToolConfigModel from .orbit_response_matrix_data import ConfigModel as OrbitResponseMatrixDataConfigModel -from .orbit_response_matrix_data import OrbitResponseMatrixData logger = logging.getLogger(__name__) PYAMLCLASS = "OrbitResponseMatrix" -class ConfigModel(ElementConfigModel): +class ConfigModel(MeasurementToolConfigModel): """ Configuration model for orbit response matrix measurement @@ -56,7 +55,9 @@ def __init__(self, cfg: ConfigModel): def measure( self, corrector_names: Optional[List[str]] = None, - set_wait_time: float = 0, + sleep_between_step: Optional[float] = None, + n_avg_meas: Optional[int] = None, + sleep_between_meas: Optional[float] = None, callback: Optional[Callable] = None, ): """ @@ -64,18 +65,32 @@ def measure( Parameters ---------- + sleep_between_step: float + Default time sleep after quad exitation + Default: from config + n_avg_meas : int, optional + Default number of tune measurement per step used for averaging + Default from config + sleep_between_meas: float + Default time sleep between two tune measurment + Default: from config callback : Callable, optional example: callback(action:int, callback_data: 'Complicated struct') callback is executed after each strength setting and after each orbit reading. If the callback returns false, then the process is aborted. """ + nb_meas = n_avg_meas if n_avg_meas is not None else self._cfg.n_avg_meas + sleep_step = sleep_between_step if sleep_between_step is not None else self._cfg.sleep_between_step + sleep_meas = sleep_between_meas if sleep_between_meas is not None else self._cfg.sleep_between_meas + element_holder = self._peer interface = pySCInterface( element_holder=element_holder, bpm_array_name=self.bpm_array_name, ) - interface.set_wait_time = set_wait_time + # TODO handle sleep_meas + interface.set_wait_time = sleep_step if corrector_names is None: logger.info( @@ -90,29 +105,44 @@ def measure( corrector_names=corrector_names, delta=self.corrector_delta, skip_save=True, + shots_per_orbit=nb_meas, ) pySC.disable_pySC_rich() aborted = False - for code, measurement in generator: - callback_data = measurement.response_data # to be defined better - if code is ResponseCode.AFTER_SET: - if not self.send_callback(Action.APPLY, callback, callback_data): - if aborted: - break - elif code is ResponseCode.AFTER_GET: - if not self.send_callback(Action.MEASURE, callback, callback_data): - aborted = True - break - elif code is ResponseCode.AFTER_RESTORE: - logger.info(f"Measured response of {measurement.last_input}.") - if not self.send_callback(Action.RESTORE, callback, callback_data): - aborted = True - break + err = None + step = 0 + try: + self.register_callback(callback) + for code, measurement in generator: + callback_data = measurement.response_data # to be defined better + if code is ResponseCode.AFTER_SET: + self.send_callback(Action.APPLY, callback_data) + elif code is ResponseCode.AFTER_GET: + self.send_callback(Action.MEASURE, callback_data) + elif code is ResponseCode.AFTER_RESTORE: + logger.info(f"Measured response of {measurement.last_input}.") + self.send_callback(Action.RESTORE, callback_data) + step += 1 + except Exception as ex: + err = ex + except KeyboardInterrupt as ex: + aborted = True + finally: + # Restore steerer strength + # TODO + self.send_callback( + Action.RESTORE, + {"step": step}, + raiseException=False, + ) + + if err is not None: + raise (err) if aborted: - logger.warning("Measurement aborted! Settings have not been restored.") - return + logger.warning(f"{self.get_name()} : measurement aborted (settings not restored)") + return False orm_data = self._pySC_response_data_to_ORMData(measurement.response_data.model_dump()) self.latest_measurement = orm_data.model_dump() diff --git a/pyaml/tuning_tools/tune.py b/pyaml/tuning_tools/tune.py index b6d8ad10..ce4090b7 100644 --- a/pyaml/tuning_tools/tune.py +++ b/pyaml/tuning_tools/tune.py @@ -1,6 +1,15 @@ +import logging from pathlib import Path +from time import sleep from typing import TYPE_CHECKING +import numpy as np + +try: + from typing import Self # Python 3.11+ +except ImportError: + from typing_extensions import Self # Python 3.10 and earlier + from .. import PyAMLException from ..common.element import ElementConfigModel from .response_matrix_data import ResponseMatrixData @@ -10,15 +19,6 @@ from ..arrays.magnet_array import MagnetArray from ..diagnostics.tune_monitor import BetatronTuneMonitor -try: - from typing import Self # Python 3.11+ -except ImportError: - from typing_extensions import Self # Python 3.10 and earlier -import logging -import time - -import numpy as np - logger = logging.getLogger(__name__) # Define the main class name for this module @@ -37,6 +37,7 @@ class ConfigModel(ElementConfigModel): Name of the diagnostic pyaml device for measuring the tune quad_delta : float Delta strength used to get the response matrix + """ quad_array_name: str @@ -154,5 +155,5 @@ def add(self, dtune: np.array, wait_time: float = 0.0): strengths = self._quads.strengths.get() strengths += self.correct(dtune) self._quads.strengths.set(strengths) - time.sleep(wait_time) + sleep(wait_time) self._setpoint += dtune diff --git a/pyaml/tuning_tools/tune_response_matrix.py b/pyaml/tuning_tools/tune_response_matrix.py index c2290542..2c0d57db 100644 --- a/pyaml/tuning_tools/tune_response_matrix.py +++ b/pyaml/tuning_tools/tune_response_matrix.py @@ -1,12 +1,12 @@ import logging -import time +from time import sleep from typing import Callable, Optional import numpy as np +from pydantic import ConfigDict from ..common.constants import Action -from ..common.element import ElementConfigModel -from .measurement_tool import MeasurementTool +from .measurement_tool import MeasurementTool, MeasurementToolConfigModel from .response_matrix_data import ConfigModel as ResponseMatrixDataConfigModel logger = logging.getLogger(__name__) @@ -14,7 +14,7 @@ PYAMLCLASS = "TuneResponseMatrix" -class ConfigModel(ElementConfigModel): +class ConfigModel(MeasurementToolConfigModel): """ Configuration model for Tune response matrix @@ -26,27 +26,13 @@ class ConfigModel(ElementConfigModel): Name of the diagnostic pyaml device for measuring the tune quad_delta : float Delta strength used to get the response matrix - n_step: int, optional - Number of step for fitting the tune [-quad_delta/n_step..quad_delta/n_step] - Default 1 - sleep_between_step: float - Default time sleep after quad exitation - Default: 0 - n_tune_meas : int, optional - Default number of tune measurement per step used for averaging - Default 1 - sleep_between_meas: float - Default time sleep between two tune measurment - Default: 0 """ + model_config = ConfigDict(arbitrary_types_allowed=True, extra="forbid") + quad_array_name: str betatron_tune_name: str quad_delta: float - n_step: Optional[int] = 1 - sleep_between_step: Optional[float] = 0 - n_tune_meas: Optional[int] = 1 - sleep_between_meas: Optional[float] = 0 class TuneResponseMatrix(MeasurementTool): @@ -59,7 +45,7 @@ def measure( quad_delta: Optional[float] = None, n_step: Optional[int] = None, sleep_between_step: Optional[float] = None, - n_tune_meas: Optional[int] = None, + n_avg_meas: Optional[int] = None, sleep_between_meas: Optional[float] = None, callback: Optional[Callable] = None, ): @@ -76,7 +62,7 @@ def measure( sleep_between_step: float Default time sleep after quad exitation Default: from config - n_tune_meas : int, optional + n_avg_meas : int, optional Default number of tune measurement per step used for averaging Default from config sleep_between_meas: float @@ -94,6 +80,7 @@ def measure( step:int The current step avg_step:int The current avg step magnet:Magnet The magnet being excited + strength:float Magnet strength tune:np.array The measured tune (on Action.MEASURE) dtune:np.array The tune variation (on Action.RESTORE) @@ -107,58 +94,72 @@ def measure( initial_tune = tm.tune.get() delta = quad_delta if quad_delta is not None else self._cfg.quad_delta nb_step = n_step if n_step is not None else self._cfg.n_step - nb_meas = n_tune_meas if n_tune_meas is not None else self._cfg.n_tune_meas + nb_meas = n_avg_meas if n_avg_meas is not None else self._cfg.n_avg_meas sleep_step = sleep_between_step if sleep_between_step is not None else self._cfg.sleep_between_step sleep_meas = sleep_between_meas if sleep_between_meas is not None else self._cfg.sleep_between_meas + self.register_callback(callback) aborted = False - - for qidx, m in enumerate(quads): - str = m.strength.get() # Initial strength - deltas = np.linspace(-delta, delta, nb_step) - Q = np.zeros((nb_step, 2)) - - for step, d in enumerate(deltas): - # apply strength - m.strength.set(str + d) - - if not self.send_callback(Action.APPLY, callback, {"step": qidx, "magnet": m}): - aborted = True - m.strength.set(str) # restore strength - break - - time.sleep(sleep_step) - - # Tune averaging - Q[step] = np.zeros(2) - for avg in range(nb_meas): - tune = tm.tune.get() - Q[step] += tune - if not self.send_callback( - Action.MEASURE, callback, {"step": qidx, "avg_step": avg, "magnet": m, "tune": tune} - ): - aborted = True - m.strength.set(str) # restore strength - break - if avg < nb_meas - 1: - time.sleep(sleep_meas) - Q[step] /= float(nb_meas) - - if aborted: - break - - # Fit and fill matrix with the slopes - if nb_step == 1: - tunemat[qidx] = (Q - initial_tune) / deltas[0] - else: - coefs = np.polynomial.polynomial.polyfit(deltas, Q, 1) - tunemat[qidx] = coefs[1] - + err = None + try: + for qidx, m in enumerate(quads): + str = m.strength.get() # Initial strength + deltas = np.linspace(-delta, delta, nb_step) + Q = np.zeros((nb_step, 2)) + + for step, d in enumerate(deltas): + # apply strength + m.strength.set(str + d) + + self.send_callback(Action.APPLY, {"step": qidx, "magnet": m.get_name(), "strength": float(str + d)}) + + sleep(sleep_step) + + # Tune averaging + Q[step] = np.zeros(2) + for avg in range(nb_meas): + tune = tm.tune.get() + Q[step] += tune + self.send_callback( + Action.MEASURE, {"step": qidx, "avg_step": avg, "magnet": m.get_name(), "tune": tune} + ) + if avg < nb_meas - 1: + sleep(sleep_meas) + Q[step] /= float(nb_meas) + + # Fit and fill matrix with the slopes + if nb_step == 1: + tunemat[qidx] = (Q - initial_tune) / deltas[0] + else: + coefs = np.polynomial.polynomial.polyfit(deltas, Q, 1) + tunemat[qidx] = coefs[1] + + # Restore strength + m.strength.set(str) + self.send_callback( + Action.RESTORE, + {"step": qidx, "magnet": m.get_name(), "strength": float(str), "dtune": tunemat[qidx]}, + ) + + except Exception as ex: + err = ex + except KeyboardInterrupt as ex: + aborted = True + finally: # Restore strength m.strength.set(str) - if not self.send_callback(Action.RESTORE, callback, {"step": qidx, "magnet": m, "dtune": tunemat[qidx]}): - aborted = True - break + self.send_callback( + Action.RESTORE, + {"step": qidx, "magnet": m.get_name(), "strength": float(str), "dtune": tunemat[qidx]}, + raiseException=False, + ) + + if err is not None: + raise (err) + + if aborted: + logger.warning(f"{self.get_name()} : measurement aborted") + return False self.latest_measurement = ResponseMatrixDataConfigModel( matrix=tunemat.T.tolist(), @@ -166,3 +167,5 @@ def measure( observable_names=[tm.get_name() + ".x", tm.get_name() + ".y"], ).model_dump() self.latest_measurement["type"] = "pyaml.tuning_tools.response_matrix_data" + + return True diff --git a/pyaml/tuning_tools/tuning_tool.py b/pyaml/tuning_tools/tuning_tool.py index bd2d6753..8f043bc4 100644 --- a/pyaml/tuning_tools/tuning_tool.py +++ b/pyaml/tuning_tools/tuning_tool.py @@ -1,11 +1,8 @@ -import logging from typing import Self from ..common.element import Element from ..common.element_holder import ElementHolder -logger = logging.getLogger(__name__) - class TuningTool(Element): """ diff --git a/tests/config/EBSOrbit.yaml b/tests/config/EBSOrbit.yaml index 4f58a981..39df579a 100644 --- a/tests/config/EBSOrbit.yaml +++ b/tests/config/EBSOrbit.yaml @@ -2,6 +2,7 @@ type: pyaml.accelerator facility: ESRF machine: sr energy: 6e9 +alphac: 8.623614679299252e-05 simulators: - type: pyaml.lattice.simulator lattice: sr/lattices/ebs.mat @@ -49,6 +50,7 @@ devices: betatron_tune_name: BETATRON_TUNE rf_plant_name: RF bpm_array_name: BPM + n_step: 5 - type: pyaml.tuning_tools.chromaticity_response_matrix name: DEFAULT_CHROMATICITY_RESPONSE_MATRIX sextu_array_name: Sext @@ -56,6 +58,7 @@ devices: sextu_delta: 1e-6 - type: pyaml.tuning_tools.chromaticity name: DEFAULT_CHROMATICITY_CORRECTION + chromaticty_monitor_name: CHROMATICITY_MONITOR sextu_array_name: Sext response_matrix: file:ideal_chroma_resp.json - type: pyaml.tuning_tools.orbit diff --git a/tests/config/EBS_chromaticity.yaml b/tests/config/EBS_chromaticity.yaml index dafffda4..d7e45bcf 100644 --- a/tests/config/EBS_chromaticity.yaml +++ b/tests/config/EBS_chromaticity.yaml @@ -2,6 +2,7 @@ type: pyaml.accelerator facility: ESRF machine: sr energy: 6e9 +alphac: 8.623614679299252e-05 simulators: - type: pyaml.lattice.simulator lattice: sr/lattices/ebs.mat @@ -32,6 +33,7 @@ devices: betatron_tune_name: BETATRON_TUNE rf_plant_name: RF bpm_array_name: BPM + n_step: 5 - type: pyaml.rf.rf_plant name: RF masterclock: diff --git a/tests/test_chromaticity_monitor.py b/tests/test_chromaticity_monitor.py index 871250bd..5f296ea9 100644 --- a/tests/test_chromaticity_monitor.py +++ b/tests/test_chromaticity_monitor.py @@ -11,14 +11,10 @@ def callback(action: int, data: dict): def test_simulator_chromaticity_monitor(): sr: Accelerator = Accelerator.load("tests/config/EBS_chromaticity.yaml", ignore_external=True) - sr.design.get_lattice().disable_6d() - mcf = sr.design.get_lattice().get_mcf() sr.design.get_lattice().enable_6d() chromaAT = sr.design.get_lattice().get_chrom()[:-1] chromaticity_monitor = sr.design.get_chromaticity_monitor("CHROMATICITY_MONITOR") - chromaticity_monitor.measure( - alphac=mcf, fit_dispersion=True, sleep_between_meas=0, sleep_between_step=0, callback=callback - ) + chromaticity_monitor.measure(fit_dispersion=True, callback=callback) chroma = chromaticity_monitor.chromaticity.get() assert np.abs(chroma[0] - chromaAT[0]) < 1e-2 assert np.abs(chroma[1] - chromaAT[1]) < 1e-2 diff --git a/tests/test_tuning_tools.py b/tests/test_tuning_tools.py index ef2f6407..72fdc5d3 100644 --- a/tests/test_tuning_tools.py +++ b/tests/test_tuning_tools.py @@ -4,33 +4,15 @@ from pyaml.common.constants import Action -def tune_callback(action: Action, data: dict): - source = data["source"] - if action == Action.APPLY: - # ACTION_APPLY - step = data["step"] - m = data["magnet"] - print(f"{action} {source}: #{step} {m.get_name()} = {m.strength.get()}") - elif action == Action.MEASURE: - # On ACTION_MEASURE, the tune is passed as argument - step = data["step"] - avg_step = data["avg_step"] - m = data["magnet"] - tune = data["tune"] - print(f"{action} {source}: #{step} {avg_step} {m.get_name()} q={tune}") - elif action == Action.RESTORE: - # On ACTION_RESTORE, the delta tune is passed as argument - step = data["step"] - m = data["magnet"] - dtune = data["dtune"] - print(f"{action} {source}: #{step} {m.get_name()} dq/dk={dtune}") +def callback(action: Action, data: dict): + print(f"{action}, data:{data}") return True -def test_tuning_tools(): - sr = Accelerator.load("tests/config/EBSTune.yaml", use_fast_loader=False) +def test_tune_tool(): + sr = Accelerator.load("tests/config/EBSTune.yaml", use_fast_loader=True) sr.design.get_lattice().disable_6d() - sr.design.trm.measure(callback=tune_callback) + sr.design.trm.measure(callback=callback) sr.design.trm.save("tunemat.json") sr.design.tune.load("tunemat.json") sr.design.tune.set([0.17, 0.32], iter=2) @@ -40,7 +22,7 @@ def test_tuning_tools(): def test_tune_add(): - sr = Accelerator.load("tests/config/EBSTune.yaml", use_fast_loader=False) + sr = Accelerator.load("tests/config/EBSTune.yaml", use_fast_loader=True) sr.design.get_lattice().disable_6d() sr.design.tune.load("tunemat.json") tune_initial = sr.design.tune.readback() @@ -50,8 +32,21 @@ def test_tune_add(): np.testing.assert_allclose(tune - tune_initial, dtune, atol=1e-5) +def test_chroma_tool(): + sr: Accelerator = Accelerator.load("tests/config/EBSOrbit.yaml", use_fast_loader=True) + sr.design.get_lattice().enable_6d() + sr.design.chromaticity.set([8.0, 5.0], iter=2) + QpAT = sr.design.get_lattice().get_chrom()[:-1] + Qp = sr.design.chromaticity.readback() + assert np.abs(Qp[0] - 8.0) < 1e-3 + assert np.abs(Qp[1] - 5.0) < 1e-3 + assert np.abs(QpAT[0] - 8.0) < 1e-2 + assert np.abs(QpAT[1] - 5.0) < 1e-2 + + def test_chroma_add(): - sr: Accelerator = Accelerator.load("tests/config/EBSOrbit.yaml") + sr: Accelerator = Accelerator.load("tests/config/EBSOrbit.yaml", use_fast_loader=True) + sr.design.get_lattice().enable_6d() chromaAT = sr.design.get_lattice().get_chrom()[:-1] sr.design.chromaticity.add([0.5, 0.4]) chromaAT2 = sr.design.get_lattice().get_chrom()[:-1]