Skip to content
154 changes: 97 additions & 57 deletions src/framework/mpas_stream_manager.F
Original file line number Diff line number Diff line change
Expand Up @@ -1510,9 +1510,14 @@ subroutine MPAS_stream_mgr_reset_alarms(manager, streamID, direction, ierr)!{{{
alarm_cursor => stream % alarmList_out % head
do while (associated(alarm_cursor))
if (mpas_is_alarm_ringing(manager % streamClock, alarm_cursor % name, ierr=local_ierr)) then
call mpas_reset_clock_alarm(manager % streamClock, alarm_cursor % name, ierr=local_ierr)
! Update variable output alarms with new interval based on forecast hour
call update_variable_output_alarm(manager, stream, alarm_cursor % name, ierr=local_ierr)
! For variable output streams, update_variable_output_alarm handles everything
! (removes old alarm, adds new one with correct timing)
! For regular streams, just reset the alarm
if (stream % timelevel_spec % is_parsed) then
call update_variable_output_alarm(manager, stream, alarm_cursor % name, ierr=local_ierr)
else
call mpas_reset_clock_alarm(manager % streamClock, alarm_cursor % name, ierr=local_ierr)
end if
end if
alarm_cursor => alarm_cursor % next
end do
Expand Down Expand Up @@ -1541,13 +1546,24 @@ subroutine MPAS_stream_mgr_reset_alarms(manager, streamID, direction, ierr)!{{{
alarm_cursor => manager % alarms_out % head
do while (associated(alarm_cursor))
if (mpas_is_alarm_ringing(manager % streamClock, alarm_cursor % name, ierr=local_ierr)) then
call mpas_reset_clock_alarm(manager % streamClock, alarm_cursor % name, ierr=local_ierr)
! Update variable output alarms for each stream associated with this alarm
! Check if ANY associated stream uses variable output
! If so, update_variable_output_alarm handles removal/recreation
! Otherwise, use standard reset
resetAlarms = .false. ! Reuse this flag to track if any variable stream handled it
stream_cursor => alarm_cursor % streamList % head
do while (associated(stream_cursor))
call update_variable_output_alarm(manager, stream_cursor, alarm_cursor % name, ierr=local_ierr)
! stream_cursor % xref points to the actual stream
if (stream_cursor % xref % timelevel_spec % is_parsed) then
call update_variable_output_alarm(manager, stream_cursor % xref, alarm_cursor % name, ierr=local_ierr)
resetAlarms = .true. ! A variable stream handled it
end if
stream_cursor => stream_cursor % next
end do
! For non-variable streams, reset the alarm normally
! But ONLY if no variable stream already handled it
if (.not. resetAlarms) then
call mpas_reset_clock_alarm(manager % streamClock, alarm_cursor % name, ierr=local_ierr)
end if
end if
alarm_cursor => alarm_cursor % next
end do
Expand Down Expand Up @@ -6064,9 +6080,9 @@ subroutine parse_all_timelevels(timelevels_str, timelevel_spec, stream_name, ier
return
end if

! Split the string by spaces
! Split the string by spaces (trim leading/trailing whitespaces first)
nullify(segments)
call mpas_split_string(timelevels_str, ' ', segments)
call mpas_split_string(trim(adjustl(timelevels_str)), ' ', segments)
n_segments = size(segments)

! Check for too many segments
Expand Down Expand Up @@ -6230,89 +6246,85 @@ subroutine parse_output_timelevel_spec(spec, start_hour, end_hour, interval_minu
real(kind=RKIND), intent(out) :: interval_minutes
integer, intent(out) :: ierr

character(len=StrKIND) :: spec_local
integer :: i, dash_count, dash_pos(2), len_spec
character(len=StrKIND), pointer, dimension(:) :: parts
integer :: i, n_parts, local_ierr
real(kind=RKIND) :: start_minutes, end_minutes, step_minutes
integer :: local_ierr

ierr = 0
dash_pos(1) = 0
dash_pos(2) = 0
spec_local = trim(adjustl(spec))
len_spec = len_trim(spec_local)

! Guard against empty string
if (len_spec == 0) then
if (len_trim(spec) == 0) then
ierr = 1
return
end if

! Count dashes to determine format
dash_count = 0
do i = 1, len_spec
if (spec_local(i:i) == '-') then
dash_count = dash_count + 1
if (dash_count <= 2) dash_pos(dash_count) = i
! Split by dash delimiter
nullify(parts)
call mpas_split_string(trim(adjustl(spec)), '-', parts)
n_parts = size(parts)

! Validate parts array - check for empty parts (from consecutive dashes or leading/trailing dashes)
do i = 1, n_parts
if (len_trim(parts(i)) == 0) then
ierr = 1
deallocate(parts)
return
end if
end do

! Parse based on number of dashes
if (dash_count == 0) then
! Parse based on number of parts
if (n_parts == 1) then
! Format: single time string (output at that time only)
call parse_time_string(spec_local, start_minutes, local_ierr)
call parse_time_string(parts(1), start_minutes, local_ierr)
if (local_ierr /= 0) then
ierr = 1
deallocate(parts)
return
end if
start_hour = start_minutes / 60.0_RKIND
end_hour = start_hour
interval_minutes = 60.0_RKIND ! Default, but won't matter for single time

else if (dash_count == 1) then
else if (n_parts == 2) then
! Format: start-stop (interval defaults to 1 hour)
if (dash_pos(1) <= 1 .or. dash_pos(1) >= len_spec) then
ierr = 1
return
end if

call parse_time_string(spec_local(1:dash_pos(1)-1), start_minutes, local_ierr)
call parse_time_string(parts(1), start_minutes, local_ierr)
if (local_ierr /= 0) then
ierr = 1
deallocate(parts)
return
end if

call parse_time_string(spec_local(dash_pos(1)+1:len_spec), end_minutes, local_ierr)
call parse_time_string(parts(2), end_minutes, local_ierr)
if (local_ierr /= 0) then
ierr = 1
deallocate(parts)
return
end if

start_hour = start_minutes / 60.0_RKIND
end_hour = end_minutes / 60.0_RKIND
interval_minutes = 60.0_RKIND

else if (dash_count == 2) then
else if (n_parts == 3) then
! Format: start-stop-step
if (dash_pos(1) <= 1 .or. dash_pos(2) <= dash_pos(1) + 1 .or. dash_pos(2) >= len_spec) then
ierr = 1
return
end if

call parse_time_string(spec_local(1:dash_pos(1)-1), start_minutes, local_ierr)
call parse_time_string(parts(1), start_minutes, local_ierr)
if (local_ierr /= 0) then
ierr = 1
deallocate(parts)
return
end if

call parse_time_string(spec_local(dash_pos(1)+1:dash_pos(2)-1), end_minutes, local_ierr)
call parse_time_string(parts(2), end_minutes, local_ierr)
if (local_ierr /= 0) then
ierr = 1
deallocate(parts)
return
end if

call parse_time_string(spec_local(dash_pos(2)+1:len_spec), step_minutes, local_ierr)
call parse_time_string(parts(3), step_minutes, local_ierr)
if (local_ierr /= 0) then
ierr = 1
deallocate(parts)
return
end if

Expand All @@ -6321,11 +6333,20 @@ subroutine parse_output_timelevel_spec(spec, start_hour, end_hour, interval_minu
interval_minutes = step_minutes

else
! More than 2 dashes - invalid format
! Invalid number of parts (0 or > 3 dashes)
ierr = 1
deallocate(parts)
return
end if

deallocate(parts)

! Check for setting mistake "start-step-stop". The correct one should be "start-stop-step"
if ((n_parts == 3) .and. (interval_minutes > end_hour * 60.0_RKIND + 1.0e-4_RKIND)) then
call mpas_log_write('ERROR: output_timelevels segment '''//trim(spec)//''': ' // &
'expected format is start-stop-step, but appears to be start-step-stop', MPAS_LOG_ERR)
end if

end subroutine parse_output_timelevel_spec!}}}


Expand All @@ -6341,25 +6362,27 @@ end subroutine parse_output_timelevel_spec!}}}
!> appropriate output interval in minutes.
!
!-----------------------------------------------------------------------
subroutine get_output_interval_from_timelevels(timelevel_spec, forecast_hour, interval_minutes, ierr)!{{{
subroutine get_output_interval_from_timelevels(timelevel_spec, forecast_hour, interval_minutes, ierr, next_output_hour)!{{{

implicit none

type(MPAS_timelevel_spec_type), intent(in) :: timelevel_spec
real(kind=RKIND), intent(in) :: forecast_hour
real(kind=RKIND), intent(out) :: interval_minutes
integer, intent(out) :: ierr
real(kind=RKIND), intent(out), optional :: next_output_hour

integer :: i
real(kind=RKIND) :: start_hour, end_hour, seg_interval
real(kind=RKIND) :: min_next_time
real(kind=RKIND) :: next_hour
logical :: found, is_single_time

ierr = 0
interval_minutes = 60.0_RKIND ! Default to 1 hour
if (present(next_output_hour)) next_output_hour = -1.0_RKIND
found = .false.
is_single_time = .false.
min_next_time = huge(1.0_RKIND)
next_hour = huge(1.0_RKIND)

! Guard against unparsed spec
if (.not. timelevel_spec % is_parsed .or. timelevel_spec % n_segments == 0) then
Expand Down Expand Up @@ -6395,23 +6418,25 @@ subroutine get_output_interval_from_timelevels(timelevel_spec, forecast_hour, in

! For single times, check if > current
if (abs(start_hour - end_hour) < 1.0e-6_RKIND) then
if (start_hour > forecast_hour + 1.0e-6_RKIND .and. start_hour < min_next_time) then
min_next_time = start_hour
if (start_hour > forecast_hour + 1.0e-6_RKIND .and. start_hour < next_hour) then
next_hour = start_hour
end if
else
! For ranges, check if start is after current
if (start_hour > forecast_hour + 1.0e-6_RKIND .and. start_hour < min_next_time) then
min_next_time = start_hour
if (start_hour > forecast_hour + 1.0e-6_RKIND .and. start_hour < next_hour) then
next_hour = start_hour
end if
end if
end do

! Calculate interval to next time
if (min_next_time < huge(1.0_RKIND)) then
interval_minutes = (min_next_time - forecast_hour) * 60.0_RKIND
if (next_hour < huge(1.0_RKIND)) then
interval_minutes = (next_hour - forecast_hour) * 60.0_RKIND
if (present(next_output_hour)) next_output_hour = next_hour
else
! No more times after this - interval_minutes = 0 signals last output
interval_minutes = 0.0_RKIND
if (present(next_output_hour)) next_output_hour = -1.0_RKIND
end if
return
end if
Expand Down Expand Up @@ -6512,7 +6537,7 @@ subroutine update_variable_output_alarm(manager, stream, alarm_name, ierr)!{{{

type (MPAS_Time_type) :: start_time, current_time, alarmTime_local
type (MPAS_TimeInterval_type) :: time_diff, alarmInterval_local
real (kind=RKIND) :: forecast_hour, next_start_hour
real (kind=RKIND) :: forecast_hour, next_start_hour, next_output_hour
real (kind=RKIND) :: interval_minutes, next_interval_minutes
integer :: local_ierr, ierr_tmp
integer (kind=I8KIND) :: seconds_diff
Expand All @@ -6534,14 +6559,15 @@ subroutine update_variable_output_alarm(manager, stream, alarm_name, ierr)!{{{
forecast_hour = real(seconds_diff, RKIND) / 3600.0_RKIND

! Get next interval using pre-parsed timelevel_spec
call get_output_interval_from_timelevels(stream % timelevel_spec, forecast_hour, interval_minutes, ierr_tmp)
! Also get the absolute next output hour to avoid floating-point drift
call get_output_interval_from_timelevels(stream % timelevel_spec, forecast_hour, interval_minutes, ierr_tmp, next_output_hour)
if (ierr_tmp /= MPAS_STREAM_MGR_NOERR .or. interval_minutes <= 0.0_RKIND) then
! Current hour not in any range - check for future range
call get_next_timelevel_start(stream % timelevel_spec, forecast_hour, next_start_hour, next_interval_minutes, ierr_tmp)
call mpas_remove_clock_alarm(manager % streamClock, alarm_name, ierr=ierr_tmp)

if (next_start_hour > 0.0_RKIND) then
! Schedule alarm for when the next range starts
! Schedule recurring alarm for when the next range starts
call mpas_set_timeInterval(time_diff, dt=next_start_hour * 3600.0_RKIND, ierr=ierr_tmp)
alarmTime_local = start_time + time_diff
call mpas_set_timeInterval(alarmInterval_local, dt=next_interval_minutes * 60.0_RKIND, ierr=ierr_tmp)
Expand All @@ -6561,8 +6587,16 @@ subroutine update_variable_output_alarm(manager, stream, alarm_name, ierr)!{{{
local_ierr = ior(local_ierr, ierr_tmp)

! Add new alarm with updated interval
! Set alarm time to current_time + interval so next ring is in the future
alarmTime_local = current_time + alarmInterval_local
! Use ABSOLUTE time from start_time to avoid floating-point drift
! This ensures discrete times like "0m 15m 1h3m 2 3" hit exactly
if (next_output_hour > 0.0_RKIND) then
! Compute alarm time absolutely: start_time + next_output_hour
call mpas_set_timeInterval(time_diff, dt=next_output_hour * 3600.0_RKIND, ierr=ierr_tmp)
alarmTime_local = start_time + time_diff
else
! Fallback to relative calculation (for ranges)
alarmTime_local = current_time + alarmInterval_local
end if
call mpas_add_clock_alarm(manager % streamClock, alarm_name, alarmTime_local, &
alarmTimeInterval=alarmInterval_local, ierr=ierr_tmp)
local_ierr = ior(local_ierr, ierr_tmp)
Expand Down Expand Up @@ -7171,14 +7205,20 @@ subroutine write_done_marker(filename, dminfo, blockWrite)!{{{
logical, intent(in) :: blockWrite

character(len=1024) :: marker_filename
character(len=8) :: date_str
character(len=10) :: time_str
integer :: unit_num

! For blockWrite mode, each rank writes its own file so each creates marker
! For normal parallel I/O, all ranks write same file so only rank 0 creates marker
if (blockWrite .or. dminfo % my_proc_id == IO_NODE) then
marker_filename = trim(filename) // '.done'
unit_num = 99
call date_and_time(date=date_str, time=time_str)
open(unit=unit_num, file=trim(marker_filename), status='replace', action='write')
! Write timestamp: YYYY-MM-DD HH:MM:SS
write(unit_num, '(A)') date_str(1:4)//'-'//date_str(5:6)//'-'//date_str(7:8)//' '// &
time_str(1:2)//':'//time_str(3:4)//':'//time_str(5:6)
close(unit_num)
end if

Expand Down
Loading