diff --git a/.emmyrc.json b/.emmyrc.json new file mode 100644 index 000000000..7929e98d7 --- /dev/null +++ b/.emmyrc.json @@ -0,0 +1,19 @@ +{ + "format": { + "externalTool": { + "program": "stylua", + "args": ["-", "--stdin-filepath", "${file}"] + } + }, + "runtime": { + "version": "LuaJIT" + }, + "diagnostics": { + "globals": ["ngx"], + "disable": ["unnecessary-if", "invert-if"], + "enables": [ + "incomplete-signature-doc", + "missing-global-doc" + ] + } +} diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index 7de778054..b450810b5 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -1,7 +1,10 @@ name: Lint on: - pull_request: {} + pull_request: + branches: + - main + - master workflow_dispatch: {} push: branches: @@ -41,3 +44,42 @@ jobs: with: additional_args: '--no-default-config --config .luacheckrc' files: ${{ steps.changed-files.outputs.all_changed_files }} + + lint: + runs-on: ubuntu-latest + permissions: + contents: read + issues: read + checks: write + pull-requests: write + if: (github.actor != 'dependabot[bot]') + + steps: + - name: Checkout source code + uses: actions/checkout@v6 + + - name: Download and install emmylua_check + run: | + # Download the specific release tarball + curl -L -o emmylua_check.tar.gz https://github.com/dquixote-jr/emmylua-analyzer-rust/releases/download/0.20.2/emmylua_check-linux-x64.tar.gz + + # Extract the contents + tar -xzf emmylua_check.tar.gz + + # Make the binary executable + chmod +x emmylua_check + + # Move it to the system PATH so it can be run from anywhere + sudo mv emmylua_check /usr/local/bin/ + + - name: Run Lints + run: | + emmy_failed=0 + + # Run emmylua check + emmylua_check -c ./.emmyrc.json --warnings-as-errors -- --path lib/ || emmy_failed=1 + + if [ $emmy_failed -ne 0 ]; then + echo "Linting failed! Check the logs above for details." + exit 1 + fi diff --git a/lib/resty/timerng/array.lua b/lib/resty/timerng/array.lua index 686969256..6141fa8f6 100644 --- a/lib/resty/timerng/array.lua +++ b/lib/resty/timerng/array.lua @@ -11,6 +11,10 @@ local setmetatable = setmetatable local array_pool = {} +---@class resty.timerng.array +---@field elts table +---@field first integer +---@field last integer local _M = {} local meta_table = { @@ -18,6 +22,11 @@ local meta_table = { } +---stateless iterator for the array +---@param array resty.timerng.array +---@param index? integer current index (nil to start) +---@return integer? next_index +---@return any? value function _M.next(array, index) if index == nil then index = 0 @@ -35,17 +44,20 @@ function _M.next(array, index) end +---@return integer function _M:length() return self.last - self.first + 1 end +---@return boolean function _M:is_empty() return self.first > self.last end +---@param value any function _M:push_left (value) local first = self.first - 1 self.first = first @@ -53,6 +65,7 @@ function _M:push_left (value) end +---@param value any function _M:push_right(value) local last = self.last + 1 self.last = last @@ -60,6 +73,7 @@ function _M:push_right(value) end +---@return any value function _M:pop_left () local first = self.first @@ -74,6 +88,7 @@ function _M:pop_left () end +---@return any value function _M:pop_right() local last = self.last @@ -88,6 +103,9 @@ function _M:pop_right() end +---create or reuse an array from the pool +---@param n? integer initial capacity (default 8) +---@return resty.timerng.array function _M.new(n) if n == nil then n = 8 @@ -109,6 +127,7 @@ function _M.new(n) end +---clear and return this array to the pool function _M:release() utils_table_clear(self.elts) self.first = 0 @@ -117,6 +136,9 @@ function _M:release() end +---move all elements from src into dst +---@param dst resty.timerng.array +---@param src? resty.timerng.array function _M.merge(dst, src) if src == nil then return diff --git a/lib/resty/timerng/constants.lua b/lib/resty/timerng/constants.lua index 0965a92d9..47bf3798f 100644 --- a/lib/resty/timerng/constants.lua +++ b/lib/resty/timerng/constants.lua @@ -1,5 +1,19 @@ local assert = assert +---@class resty.timerng.constants +---@field DEFAULT_DEBUG boolean +---@field DEFAULT_MIN_THREADS integer +---@field DEFAULT_MAX_THREADS integer +---@field DEFAULT_AUTO_SCALING_LOAD_THRESHOLD number +---@field DEFAULT_AUTO_SCALING_INERVAL number +---@field DEFAULT_RESTART_THREAD_AFTER_RUNS integer +---@field DEFAULT_FORCE_UPDATE_TIME boolean +---@field DEFAULT_RESOLUTION number +---@field DEFAULT_WHEEL_SETTING resty.timerng.wheel_setting +---@field MIN_RESOLUTION number +---@field TOLERANCE_OF_GRACEFUL_SHUTDOWN number +---@field SCALING_RECORD_INTERVAL number +---@field SCALING_INFO_LOG_INTERVAL number local _M = { -- disable debug mode DEFAULT_DEBUG = false, diff --git a/lib/resty/timerng/init.lua b/lib/resty/timerng/init.lua index 9ec5a9e6f..dfc92456f 100644 --- a/lib/resty/timerng/init.lua +++ b/lib/resty/timerng/init.lua @@ -30,10 +30,56 @@ local select = select local TIMER_ONCE = true local TIMER_REPEATED = false +---@class resty.timerng.wheel_setting +---@field level integer +---@field slots_for_each_level integer[] + +---@class resty.timerng.options +---@field debug? boolean +---@field restart_thread_after_runs? integer +---@field min_threads? integer +---@field max_threads? integer +---@field auto_scaling_load_threshold? number +---@field resolution? number +---@field wheel_setting? resty.timerng.wheel_setting +---@field auto_scaling_interval? number +---@field force_update_time? boolean + +---@class resty.timerng.internal_opt +---@field debug boolean +---@field wheel_setting resty.timerng.wheel_setting +---@field resolution number +---@field restart_thread_after_runs integer +---@field min_threads integer +---@field max_threads integer +---@field auto_scaling_load_threshold number +---@field auto_scaling_interval number +---@field force_update_time boolean + +---@class resty.timerng.sys_stats +---@field running integer +---@field pending integer +---@field waiting integer +---@field total integer +---@field runs integer + +---@class resty.timerng +---@field opt resty.timerng.internal_opt +---@field id_counter integer +---@field max_expire number +---@field enable boolean +---@field thread_group resty.timerng.thread.group +---@field jobs table +---@field is_first_start boolean +---@field sys_stats { running: integer, total: integer, runs: integer } +---@field debug_stats table +---@field wheels resty.timerng.wheel.group local _M = {} +---@param self resty.timerng +---@param job resty.timerng.job local function report_job_expire_callback_inernal(self, job) if not job.debug then return @@ -59,6 +105,8 @@ end +---@param self resty.timerng +---@param job resty.timerng.job local function report_job_cancel_callback_internal(self, job) self.sys_stats.total = self.sys_stats.total - 1 @@ -144,6 +192,9 @@ local function create(self, name, callback, delay, timer_type, argc, argv) end +---create a new timer-ng system +---@param options? resty.timerng.options +---@return resty.timerng function _M.new(options) local timer_sys = {} @@ -345,6 +396,9 @@ function _M.new(options) end +---start (or resume) the timer system +---@return boolean ok +---@return string? err function _M:start() if self.is_first_start then local ok, err = self.thread_group:spawn() @@ -367,16 +421,25 @@ function _M:start() end +---pause the timer system without destroying threads function _M:freeze() self.enable = false end +---kill all threads and shut down the timer system function _M:destroy() self.thread_group:kill() end +---schedule a named one-shot timer +---@param name string|nil timer name +---@param delay number seconds before firing +---@param callback function +---@param ... any extra arguments passed to callback +---@return string|false name_or_false timer name on success, false on failure +---@return string? err function _M:named_at(name, delay, callback, ...) assert(self.enable, "the timer module is not started") assert(type(callback) == "function", "expected `callback` to be a function") @@ -398,6 +461,13 @@ function _M:named_at(name, delay, callback, ...) end +---schedule a named recurring timer +---@param name string|nil timer name +---@param interval number seconds between firings +---@param callback function +---@param ... any extra arguments passed to callback +---@return string|false name_or_false timer name on success, false on failure +---@return string? err function _M:named_every(name, interval, callback, ...) assert(self.enable, "the timer module is not started") assert(type(callback) == "function", "expected `callback` to be a function") @@ -419,16 +489,32 @@ function _M:named_every(name, interval, callback, ...) end +---schedule an anonymous one-shot timer +---@param delay number seconds before firing +---@param callback function +---@param ... any extra arguments passed to callback +---@return string|false name_or_false +---@return string? err function _M:at(delay, callback, ...) return self:named_at(nil, delay, callback, ...) end +---schedule an anonymous recurring timer +---@param interval number seconds between firings +---@param callback function +---@param ... any extra arguments passed to callback +---@return string|false name_or_false +---@return string? err function _M:every(interval, callback, ...) return self:named_every(nil, interval, callback, ...) end +---resume a previously paused timer by name +---@param name string timer name +---@return boolean ok +---@return string? err function _M:resume(name) assert(type(name) == "string", "expected `name` to be a string") @@ -458,6 +544,10 @@ function _M:resume(name) end +---pause a timer by name +---@param name string timer name +---@return boolean ok +---@return string? err function _M:pause(name) assert(type(name) == "string", "expected `name` to be a string") @@ -474,6 +564,10 @@ function _M:pause(name) end +---cancel and remove a timer by name +---@param name string timer name +---@return boolean ok +---@return string? err function _M:cancel(name) assert(type(name) == "string", "expected `name` to be a string") @@ -492,11 +586,26 @@ function _M:cancel(name) end +---check whether a timer with the given name is managed by this system +---@param name string timer name +---@return boolean function _M:is_managed(name) return self.jobs[name] ~= nil end +---@class resty.timerng.stats_options +---@field verbose? boolean include per-timer details +---@field flamegraph? boolean include flamegraph data (requires debug mode) + +---@class resty.timerng.stats_result +---@field sys resty.timerng.sys_stats +---@field timers? table +---@field flamegraph? { running: string, pending: string, elapsed_time: string } + +---return system and (optionally) per-timer statistics +---@param options? resty.timerng.stats_options +---@return resty.timerng.stats_result function _M:stats(options) if not options then options = {} @@ -586,11 +695,13 @@ function _M:set_debug(status) end +---@return integer function _M:_debug_alive_worker_thread_count() return self.thread_group:get_alive_worker_thread_count() end +---@return integer function _M:_debug_expected_alive_worker_thread_count() return self.thread_group:get_expected_alive_worker_thread_count() end diff --git a/lib/resty/timerng/job.lua b/lib/resty/timerng/job.lua index 96802772f..263849ed4 100644 --- a/lib/resty/timerng/job.lua +++ b/lib/resty/timerng/job.lua @@ -36,9 +36,44 @@ local string_len = string.len local NAME_COUNTER = 0 local MAX_CALLSTACK_DEPTH = 128 +---@class resty.timerng.job +---@field _enable boolean +---@field _cancel boolean +---@field _running boolean +---@field _immediate boolean +---@field _once boolean +---@field name string +---@field callback function +---@field delay number +---@field debug boolean +---@field steps integer +---@field next_pointers? table +---@field argc integer +---@field argv table +---@field stats resty.timerng.job.stats +---@field meta resty.timerng.job.meta + +---@class resty.timerng.job.stats +---@field elapsed_time resty.timerng.job.elapsed_time +---@field runs integer +---@field finish integer +---@field last_err_msg string + +---@class resty.timerng.job.elapsed_time +---@field avg number +---@field max number +---@field min number +---@field variance number + +---@class resty.timerng.job.meta +---@field name string +---@field callstack string + local _M = {} +---@param job resty.timerng.job +---@return string local function job_tostring(job) local stats = job.stats local elapsed_time = stats.elapsed_time @@ -80,6 +115,8 @@ local meta_table = { } +---populate debug metadata (callstack info) on a job +---@param job resty.timerng.job local function job_create_meta(job) local meta = job.meta @@ -126,7 +163,9 @@ local function job_create_meta(job) end --- Calculate the position of each pointer when the job expires +---recalculate the wheel pointer positions for when the job expires +---@param job resty.timerng.job +---@param wheel_group resty.timerng.wheel.group local function job_re_cal_next_pointer(job, wheel_group) local lowest_wheel = wheel_group.lowest_wheel @@ -151,26 +190,31 @@ function _M:enable() end +---@return boolean function _M:is_running() return self._running end +---@return boolean function _M:is_enabled() return self._enable end +---@return boolean function _M:is_oneshot() return self._once end +---@return boolean function _M:is_cancelled() return self._cancel end +---@return boolean function _M:is_runnable() return self:is_enabled() and not self:is_cancelled() and @@ -178,31 +222,46 @@ function _M:is_runnable() end +---@return boolean function _M:is_immediate() return self._immediate end +---@return resty.timerng.job.meta function _M:get_metadata() return utils_table_deepcopy(self.meta) end +---@return resty.timerng.job.stats function _M:get_stats() return utils_table_deepcopy(self.stats) end +---@param wheel_id string +---@return integer? pointer function _M:get_next_pointer(wheel_id) return self.next_pointers[wheel_id] end +---@param wheels resty.timerng.wheel.group function _M:re_cal_next_pointer(wheels) job_re_cal_next_pointer(self, wheels) end +---@param wheels resty.timerng.wheel.group +---@param name? string timer name (auto-generated if nil) +---@param callback function +---@param delay number seconds until expiry +---@param once boolean true for one-shot, false for repeated +---@param debug boolean enable debug metadata +---@param argc integer number of extra callback arguments +---@param argv table extra callback arguments +---@return resty.timerng.job function _M.new(wheels, name, callback, delay, once, debug, argc, argv) local self = { _enable = true, @@ -265,6 +324,7 @@ function _M.new(wheels, name, callback, delay, once, debug, argc, argv) end +---run the job's callback and update stats function _M:execute() local stats = self.stats local elapsed_time = stats.elapsed_time diff --git a/lib/resty/timerng/thread/group.lua b/lib/resty/timerng/thread/group.lua index e6e025144..0c5faaf77 100644 --- a/lib/resty/timerng/thread/group.lua +++ b/lib/resty/timerng/thread/group.lua @@ -3,6 +3,9 @@ local worker_thread_module = require("resty.timerng.thread.worker") local setmetatable = setmetatable +---@class resty.timerng.thread.group +---@field super_thread resty.timerng.thread.super +---@field worker_thread resty.timerng.thread.worker local _M = {} local meta_table = { @@ -20,11 +23,13 @@ function _M:wake_up_worker_thread() end +---@return integer function _M:get_expected_alive_worker_thread_count() return self.worker_thread:get_expected_alive_thread_count() end +---@return integer function _M:get_alive_worker_thread_count() return self.worker_thread:get_alive_thread_count() end @@ -60,6 +65,8 @@ function _M:kill() end +---@param timer_sys resty.timerng +---@return resty.timerng.thread.group function _M.new(timer_sys) local super_thread = super_thread_module.new(timer_sys) local worker_thread = worker_thread_module.new(timer_sys, diff --git a/lib/resty/timerng/thread/loop.lua b/lib/resty/timerng/thread/loop.lua index f62288725..da64e85d9 100644 --- a/lib/resty/timerng/thread/loop.lua +++ b/lib/resty/timerng/thread/loop.lua @@ -31,6 +31,21 @@ local NEED_CHECK_WORKER_EIXTING = { } +---@class resty.timerng.thread.loop.phase_config +---@field argc integer +---@field argv table +---@field callback function +---@field need_check_worker_exiting? boolean + +---@class resty.timerng.thread.loop +---@field name string +---@field context table +---@field _kill boolean +---@field init resty.timerng.thread.loop.phase_config +---@field before resty.timerng.thread.loop.phase_config +---@field loop_body resty.timerng.thread.loop.phase_config +---@field after resty.timerng.thread.loop.phase_config +---@field finally resty.timerng.thread.loop.phase_config local _M = { ACTION_CONTINUE = ACTION_CONTINUE, ACTION_ERROR = ACTION_ERROR, @@ -163,6 +178,8 @@ local function do_phase_handler(self, phase) end +---@param premature boolean +---@param self resty.timerng.thread.loop local function loop_wrapper(premature, self) if premature then return @@ -183,6 +200,8 @@ local function loop_wrapper(premature, self) end +---@return boolean ok +---@return string? err function _M:spawn() self._kill = false local ok, err = ngx_timer_at(0, loop_wrapper, self) @@ -203,6 +222,9 @@ function _M:kill() end +---@param name string +---@param options table +---@return resty.timerng.thread.loop function _M.new(name, options) assert(options ~= nil) diff --git a/lib/resty/timerng/thread/super.lua b/lib/resty/timerng/thread/super.lua index f745f550c..356989490 100644 --- a/lib/resty/timerng/thread/super.lua +++ b/lib/resty/timerng/thread/super.lua @@ -24,6 +24,11 @@ local CONSTANTS_SCALING_LOG_INTERVAL = local setmetatable = setmetatable +---@class resty.timerng.thread.super +---@field timer_sys resty.timerng +---@field wake_up_semaphore table +---@field worker_thread resty.timerng.thread.worker +---@field thread resty.timerng.thread.loop local _M = {} local meta_table = { @@ -31,6 +36,7 @@ local meta_table = { } +---@param context table local function scaling_init(context) context.scaling_info = { context_for_scaling = { @@ -52,6 +58,8 @@ local function scaling_init(context) end +---@param self resty.timerng.thread.super +---@param context table local function scaling_record(self, context) local scaling_info = context.scaling_info local context_for_scaling = scaling_info.context_for_scaling @@ -88,6 +96,10 @@ local function scaling_record(self, context) end +---@param self resty.timerng.thread.super +---@param context table +---@return boolean? ok +---@return string? err local function scaling_execute(self, context) local scaling_info = context.scaling_info local context_for_scaling = scaling_info.context_for_scaling @@ -136,6 +148,8 @@ local function scaling_execute(self, context) end +---@param self resty.timerng.thread.super +---@param context table local function scaling_log(self, context) local scaling_info = context.scaling_info local context_for_log = scaling_info.context_for_log @@ -166,6 +180,9 @@ local function scaling_log(self, context) end +---@param context table +---@param self resty.timerng.thread.super +---@return integer action local function thread_init(context, self) local timer_sys = self.timer_sys local wheels = timer_sys.wheels @@ -183,6 +200,9 @@ local function thread_init(context, self) end +---@param _ table +---@param self resty.timerng.thread.super +---@return integer action local function thread_body(_, self) local timer_sys = self.timer_sys local wheels = timer_sys.wheels @@ -200,6 +220,9 @@ local function thread_body(_, self) end +---@param context table +---@param self resty.timerng.thread.super +---@return integer action local function thread_after(context, self) local timer_sys = self.timer_sys local wheels = timer_sys.wheels @@ -226,11 +249,14 @@ local function thread_after(context, self) end +---@param _ table +---@return integer action local function thread_finally(_) return loop.ACTION_CONTINUE end +---@param worker_thread resty.timerng.thread.worker function _M:set_worker_thread_ref(worker_thread) self.worker_thread = worker_thread end @@ -241,6 +267,7 @@ function _M:kill() end +---post to the semaphore to wake the super thread function _M:wake_up() local wake_up_semaphore = self.wake_up_semaphore local count = wake_up_semaphore:count() @@ -251,11 +278,15 @@ function _M:wake_up() end +---@return boolean ok +---@return string? err function _M:spawn() return self.thread:spawn() end +---@param timer_sys resty.timerng +---@return resty.timerng.thread.super function _M.new(timer_sys) local self = { timer_sys = timer_sys, diff --git a/lib/resty/timerng/thread/worker.lua b/lib/resty/timerng/thread/worker.lua index 1eebdd3de..e14244fb5 100644 --- a/lib/resty/timerng/thread/worker.lua +++ b/lib/resty/timerng/thread/worker.lua @@ -24,6 +24,23 @@ local utils_table_clear = utils.table_clear local CONSTANTS_TOLERANCE_OF_GRACEFUL_SHUTDOWN = require("resty.timerng.constants").TOLERANCE_OF_GRACEFUL_SHUTDOWN +---@class resty.timerng.thread.worker +---@field name_counter integer +---@field timer_sys resty.timerng +---@field wake_up_semaphore table +---@field min_threads integer +---@field max_threads integer +---@field spawned_threads_count integer +---@field spawned_threads table +---@field alive_threads_count integer +---@field alive_threads table +---@field expected_alive_thread_count integer +---@field super_thread resty.timerng.thread.super +---@field init resty.timerng.thread.loop.phase_config +---@field before resty.timerng.thread.loop.phase_config +---@field loop_body resty.timerng.thread.loop.phase_config +---@field after resty.timerng.thread.loop.phase_config +---@field finally resty.timerng.thread.loop.phase_config local _M = {} local meta_table = { @@ -31,6 +48,8 @@ local meta_table = { } +---@param self resty.timerng.thread.worker +---@param job resty.timerng.job local function report_before_job_execute(self, job) local timer_sys = self.timer_sys timer_sys.sys_stats.running = timer_sys.sys_stats.running + 1 @@ -59,6 +78,8 @@ local function report_before_job_execute(self, job) end +---@param self resty.timerng.thread.worker +---@param job resty.timerng.job local function report_after_job_execute(self, job) local timer_sys = self.timer_sys timer_sys.sys_stats.running = timer_sys.sys_stats.running - 1 @@ -82,6 +103,8 @@ local function report_after_job_execute(self, job) end +---@param self resty.timerng.thread.worker +---@param thread resty.timerng.thread.loop local function report_alive(self, thread) assert(self.spawned_threads[thread.name] ~= nil) @@ -90,6 +113,8 @@ local function report_alive(self, thread) end +---@param self resty.timerng.thread.worker +---@param thread resty.timerng.thread.loop local function report_exit(self, thread) assert(self.spawned_threads[thread.name] ~= nil) assert(self.alive_threads[thread.name] ~= nil) @@ -102,6 +127,9 @@ local function report_exit(self, thread) end +---@param self resty.timerng.thread.worker +---@return boolean ok +---@return string? err local function start_loop(self) local name = string_format( "worker#%d#%d", @@ -130,6 +158,10 @@ local function start_loop(self) end +---@param context table +---@param report_alive_callback fun(self: resty.timerng.thread.worker, thread: resty.timerng.thread.loop) +---@param self resty.timerng.thread.worker +---@return integer action local function thread_init(context, report_alive_callback, self) report_alive_callback(self, context.self) @@ -140,6 +172,9 @@ local function thread_init(context, report_alive_callback, self) end +---@param context table +---@param self resty.timerng.thread.worker +---@return integer action local function thread_before(context, self) local wake_up_semaphore = self.wake_up_semaphore local ok, err = @@ -153,6 +188,11 @@ local function thread_before(context, self) end +---@param context table +---@param self resty.timerng.thread.worker +---@param report_before_job_execute_callback fun(self: resty.timerng.thread.worker, job: resty.timerng.job) +---@param report_after_job_execute_callback fun(self: resty.timerng.thread.worker, job: resty.timerng.job) +---@return integer action local function thread_body(context, self, report_before_job_execute_callback, report_after_job_execute_callback) @@ -200,6 +240,9 @@ local function thread_body(context, self, end +---@param context table +---@param restart_thread_after_runs integer +---@return integer action local function thread_after(context, restart_thread_after_runs) local counter = context.counter local runs = counter.runs + 1 @@ -214,6 +257,10 @@ local function thread_after(context, restart_thread_after_runs) end +---@param context table +---@param report_exit_callback fun(self: resty.timerng.thread.worker, thread: resty.timerng.thread.loop) +---@param self resty.timerng.thread.worker +---@return integer action local function thread_finally(context, report_exit_callback, self) context.counter.runs = 0 @@ -233,16 +280,19 @@ local function thread_finally(context, report_exit_callback, self) end +---@return integer function _M:get_alive_thread_count() return self.alive_threads_count end +---@return integer function _M:get_expected_alive_thread_count() return self.expected_alive_thread_count end +---@param super_thread resty.timerng.thread.super function _M:set_super_thread_ref(super_thread) self.super_thread = super_thread end @@ -256,6 +306,7 @@ function _M:kill() end +---@param pending_jobs integer number of pending jobs to wake threads for function _M:wake_up(pending_jobs) local wake_up_semaphore = self.wake_up_semaphore local delta = pending_jobs - wake_up_semaphore:count() @@ -263,6 +314,10 @@ function _M:wake_up(pending_jobs) end +---scale the thread pool by a ratio of the range (max - min) +---@param ratio number positive to grow, negative to shrink, 0 for no-op +---@return boolean ok +---@return integer delta_thread_count change in expected thread count function _M:stretch(ratio) if ratio == 0 then return true, 0 @@ -304,6 +359,8 @@ function _M:stretch(ratio) end +---@return boolean ok +---@return string? err function _M:spawn() while self.spawned_threads_count < self.expected_alive_thread_count do local ok, err = start_loop(self) @@ -316,6 +373,10 @@ function _M:spawn() end +---@param timer_sys resty.timerng +---@param min_threads integer +---@param max_threads integer +---@return resty.timerng.thread.worker function _M.new(timer_sys, min_threads, max_threads) local self = { name_counter = 0, diff --git a/lib/resty/timerng/utils.lua b/lib/resty/timerng/utils.lua index e4e2bc5ae..c9412d8f4 100644 --- a/lib/resty/timerng/utils.lua +++ b/lib/resty/timerng/utils.lua @@ -35,21 +35,33 @@ do end +---@class resty.timerng.utils local _M = {} _M.table_new = table_new _M.table_clear = table_clear +---@type fun(tbl: table): table _M.table_deepcopy = table_deepcopy --- get average +---compute running average using recurrence formula +---@param cur_value number current observed value +---@param cur_count number total count including current observation +---@param old_avg number previous running average +---@return number avg updated running average function _M.get_avg(cur_value, cur_count, old_avg) -- recurrence formula return old_avg + ((cur_value - old_avg) / cur_count) end +---compute running variance using recurrence formula +---@param cur_value number current observed value +---@param cur_count number total count including current observation +---@param old_variance number previous running variance +---@param old_avg number previous running average +---@return number variance updated running variance function _M.get_variance(cur_value, cur_count, old_variance, old_avg) -- recurrence formula return (((cur_count - 1) @@ -58,6 +70,10 @@ function _M.get_variance(cur_value, cur_count, old_variance, old_avg) end +---compare two floats with tolerance of 0.01 +---@param left number +---@param right number +---@return integer result -1 if left < right, 1 if left > right, 0 if equal function _M.float_compare(left, right) local delta = left - right if delta < -0.01 then @@ -72,11 +88,19 @@ function _M.float_compare(left, right) end +---convert seconds to discrete wheel steps +---@param second number time in seconds +---@param resolution number seconds per step +---@return integer steps number of wheel steps function _M.convert_second_to_step(second, resolution) return math_floor(_M.round(second / resolution, 3)) end +---round a number to the given number of decimal digits +---@param value number +---@param digits integer number of decimal places +---@return number rounded function _M.round(value, digits) local x = 10 ^ digits return math_floor(value * x + 0.1) / x diff --git a/lib/resty/timerng/wheel/group.lua b/lib/resty/timerng/wheel/group.lua index 5b29bc153..e381976ac 100644 --- a/lib/resty/timerng/wheel/group.lua +++ b/lib/resty/timerng/wheel/group.lua @@ -18,6 +18,17 @@ local setmetatable = setmetatable local CONSTANTS_TOLERANCE_OF_GRACEFUL_SHUTDOWN = require("resty.timerng.constants").TOLERANCE_OF_GRACEFUL_SHUTDOWN +---@class resty.timerng.wheel.group +---@field setting resty.timerng.wheel_setting +---@field resolution number +---@field real_time number +---@field expected_time number +---@field earliest_expiry_time number +---@field pending_jobs resty.timerng.array +---@field wheels resty.timerng.wheel[] +---@field highest_wheel resty.timerng.wheel +---@field lowest_wheel resty.timerng.wheel +---@field report_job_expire_callback fun(job: resty.timerng.job) local _M = {} local meta_table = { @@ -73,8 +84,7 @@ function _M:update_earliest_expiry_time() end --- do the following things --- * add all expired jobs from wheels to `wheels.pending_jobs` +---collect all expired jobs from every wheel into pending_jobs function _M:fetch_all_expired_jobs() for _, _wheel in ipairs(self.wheels) do local expired_jobs = _wheel:fetch_all_expired_jobs() @@ -87,6 +97,7 @@ function _M:fetch_all_expired_jobs() end +---synchronize wall-clock time with the wheel group, spinning as needed function _M:sync_time() local lowest_wheel = self.lowest_wheel local resolution = self.resolution @@ -120,12 +131,19 @@ function _M:sync_time() end --- insert a job into the wheel group +---insert a job into the wheel group starting from the highest wheel +---@param job resty.timerng.job +---@return boolean ok +---@return string? err function _M:insert_job(job) return self.highest_wheel:insert(job) end +---@param wheel_setting resty.timerng.wheel_setting +---@param resolution number +---@param report_job_expire_callback fun(job: resty.timerng.job) +---@return resty.timerng.wheel.group function _M.new(wheel_setting, resolution, report_job_expire_callback) local self = { -- see `constants.DEFAULT_WHEEL_SETTING` diff --git a/lib/resty/timerng/wheel/init.lua b/lib/resty/timerng/wheel/init.lua index 53231261f..99399440f 100644 --- a/lib/resty/timerng/wheel/init.lua +++ b/lib/resty/timerng/wheel/init.lua @@ -7,6 +7,15 @@ local math_floor = math.floor local setmetatable = setmetatable +---@class resty.timerng.wheel +---@field id string +---@field pointer integer +---@field nelts integer +---@field slots resty.timerng.array[] +---@field higher_wheel? resty.timerng.wheel +---@field lower_wheel? resty.timerng.wheel +---@field expired_jobs resty.timerng.array +---@field report_job_expire_callback fun(job: resty.timerng.job) local _M = {} local meta_table = { @@ -14,16 +23,19 @@ local meta_table = { } +---@param wheel resty.timerng.wheel function _M:set_higher_wheel(wheel) self.higher_wheel = wheel end +---@param wheel resty.timerng.wheel function _M:set_lower_wheel(wheel) self.lower_wheel = wheel end +---@return integer function _M:get_cur_pointer() return self.pointer end @@ -156,11 +168,14 @@ function _M:spin_pointer(offset) end +---@return resty.timerng.array function _M:get_jobs() return self.slots[self:get_cur_pointer()] end +---@param pointer integer +---@return resty.timerng.array function _M:get_jobs_by_pointer(pointer) return self.slots[pointer] end @@ -180,10 +195,11 @@ function _M:fetch_all_expired_jobs() return ret end ----new a wheel +---create a new wheel ---@param id string id of this wheel ---@param nelts integer slots of this wheel ----@return table wheel a wheel +---@param report_job_expire_callback fun(job: resty.timerng.job) +---@return resty.timerng.wheel function _M.new(id, nelts, report_job_expire_callback) assert(id ~= nil)