diff --git a/lib/resty/timerng/array.lua b/lib/resty/timerng/array.lua index 686969256..4fa7ff43d 100644 --- a/lib/resty/timerng/array.lua +++ b/lib/resty/timerng/array.lua @@ -1,4 +1,5 @@ local utils = require("resty.timerng.utils") +local constants = require("resty.timerng.constants") local utils_table_new = utils.table_new local utils_table_clear = utils.table_clear @@ -9,6 +10,12 @@ local table_remove = table.remove local error = error local setmetatable = setmetatable +local CONSTANTS_DEFAULT_MAX_ARRAY_LENGTH = + constants.DEFAULT_MAX_ARRAY_LENGTH + +local CONSTANTS_DEFAULT_INIT_ARRAY_LENGTH = + constants.DEFAULT_INIT_ARRAY_LENGTH + local array_pool = {} local _M = {} @@ -47,6 +54,9 @@ end function _M:push_left (value) + if self.max_length and self:length() >= self.max_length then + return "list is full" + end local first = self.first - 1 self.first = first self.elts[first] = value @@ -54,13 +64,16 @@ end function _M:push_right(value) + if self.max_length and self:length() >= self.max_length then + return "list is full" + end local last = self.last + 1 self.last = last self.elts[last] = value end -function _M:pop_left () +function _M:pop_left() local first = self.first if first > self.last then @@ -88,9 +101,13 @@ function _M:pop_right() end -function _M.new(n) +function _M.new(n, max_length) if n == nil then - n = 8 + n = CONSTANTS_DEFAULT_INIT_ARRAY_LENGTH + end + + if max_length == nil then + max_length = CONSTANTS_DEFAULT_MAX_ARRAY_LENGTH end local self = table_remove(array_pool) @@ -100,6 +117,7 @@ function _M.new(n) end self = { + max_length = max_length, elts = utils_table_new(n, 0), first = 0, last = -1, @@ -123,7 +141,10 @@ function _M.merge(dst, src) end while not src:is_empty() do - dst:push_left(src:pop_left()) + local err = dst:push_left(src:pop_left()) + if err then + return "failed to merge array: " .. err + end end end diff --git a/lib/resty/timerng/constants.lua b/lib/resty/timerng/constants.lua index 0965a92d9..a06cc5cd1 100644 --- a/lib/resty/timerng/constants.lua +++ b/lib/resty/timerng/constants.lua @@ -34,6 +34,9 @@ local _M = { MIN_RESOLUTION = 0.1, + DEFAULT_INIT_ARRAY_LENGTH = 8, + DEFAULT_MAX_ARRAY_LENGTH = 100000, + -- for Nginx's graceful shutdown TOLERANCE_OF_GRACEFUL_SHUTDOWN = 1, @@ -108,4 +111,4 @@ do end end -return _M \ No newline at end of file +return _M diff --git a/lib/resty/timerng/init.lua b/lib/resty/timerng/init.lua index 9ec5a9e6f..c4bf298c1 100644 --- a/lib/resty/timerng/init.lua +++ b/lib/resty/timerng/init.lua @@ -95,12 +95,15 @@ end ---@param argc integer the number of arguments to the callback function ---@param argv table arguments to the callback function ---@return boolean name_or_false the name of the timer if ok, otherwise false ----@return string err error message +---@return string|nil err error message local function create(self, name, callback, delay, timer_type, argc, argv) local wheels = self.wheels local jobs = self.jobs - wheels:sync_time() + local err = wheels:sync_time() + if err then + return false, "failed to sync time: " .. err + end local job = job_module.new(wheels, name, @@ -121,14 +124,18 @@ local function create(self, name, callback, delay, timer_type, argc, argv) self.sys_stats.total = self.sys_stats.total + 1 if job:is_immediate() then - wheels.pending_jobs:push_right(job) + err = wheels.pending_jobs:push_right(job) + if err then + return false, "failed to push job to pending jobs: " .. err + end self.thread_group:wake_up_super_thread() report_job_expire_callback_inernal(self, job) return job.name, nil end - local ok, err = wheels:insert_job(job) + local ok + ok, err = wheels:insert_job(job) local _, need_wake_up = wheels:update_earliest_expiry_time() @@ -263,6 +270,17 @@ function _M.new(options) end end + + if options.max_pending_jobs then + assert(type(options.max_pending_jobs) == "number", + "expected `max_pending_jobs` to be a number") + + assert(options.max_pending_jobs > 0, + "expected `max_pending_jobs` to be greater than 0") + + local _, tmp = math_modf(options.max_pending_jobs) + assert(tmp == 0, "expected `max_pending_jobs` to be an integer") + end end local opt = { @@ -304,6 +322,10 @@ function _M.new(options) force_update_time = options and options.force_update_time or constants.DEFAULT_FORCE_UPDATE_TIME, + + max_pending_jobs = options + and options.max_pending_jobs + or constants.DEFAULT_MAX_ARRAY_LENGTH, } timer_sys.opt = opt @@ -339,7 +361,8 @@ function _M.new(options) timer_sys.wheels = wheel_group.new(opt.wheel_setting, opt.resolution, - report_job_expire_callback) + report_job_expire_callback, + opt.max_pending_jobs) return setmetatable(timer_sys, { __index = _M }) end diff --git a/lib/resty/timerng/thread/group.lua b/lib/resty/timerng/thread/group.lua index e6e025144..4ae2f7f71 100644 --- a/lib/resty/timerng/thread/group.lua +++ b/lib/resty/timerng/thread/group.lua @@ -32,7 +32,7 @@ end ---spawn super_thread, and all worker threads ---@return boolean ok ok? ----@return string err_msg +---@return string|nil err_msg function _M:spawn() local ok, err ok, err = self.super_thread:spawn() diff --git a/lib/resty/timerng/thread/loop.lua b/lib/resty/timerng/thread/loop.lua index f62288725..b2a15f540 100644 --- a/lib/resty/timerng/thread/loop.lua +++ b/lib/resty/timerng/thread/loop.lua @@ -22,7 +22,7 @@ local ACTION_EXIT = 3 -- max number of arguments for a phase handler local MAX_ARGS = 4 -local NEED_CHECK_WORKER_EIXTING = { +local NEED_CHECK_WORKER_EXITING = { init = false, before = true, loop_body = true, @@ -64,7 +64,7 @@ local function nop_finally() end -local PAHSE_HANDLERS = { +local PHASE_HANDLERS = { init = nop_init, before = nop_before, loop_body = nop_loop_body, @@ -98,7 +98,7 @@ end ---@param self table self ---@param phase string init | before | loop_body | after | finally ---@return integer action ----@return string message +---@return string|nil message local function phase_handler_wrapper(self, phase) -- unpack arguments to avoid NYI: return to lower frame -- as `pcall` with varargs might causes NYI when returning from `pcall` @@ -216,11 +216,11 @@ function _M.new(name, options) self.context.self = self - for phase, default_handler in pairs(PAHSE_HANDLERS) do + for phase, default_handler in pairs(PHASE_HANDLERS) do self[phase] = {} self[phase].need_check_worker_exiting - = NEED_CHECK_WORKER_EIXTING[phase] + = NEED_CHECK_WORKER_EXITING[phase] if not options[phase] then self[phase].argc = 0 diff --git a/lib/resty/timerng/thread/super.lua b/lib/resty/timerng/thread/super.lua index f745f550c..6ead51bef 100644 --- a/lib/resty/timerng/thread/super.lua +++ b/lib/resty/timerng/thread/super.lua @@ -189,7 +189,11 @@ local function thread_body(_, self) if timer_sys.enable then -- update the status of the wheel group - wheels:sync_time() + local err = wheels:sync_time() + if err then + ngx_log(ngx_ERR, "[timer-ng] failed to sync time: ", err) + return loop.ACTION_ERROR + end if not wheels.pending_jobs:is_empty() then self.worker_thread:wake_up(wheels.pending_jobs:length()) diff --git a/lib/resty/timerng/thread/worker.lua b/lib/resty/timerng/thread/worker.lua index 1eebdd3de..fe095aec7 100644 --- a/lib/resty/timerng/thread/worker.lua +++ b/lib/resty/timerng/thread/worker.lua @@ -162,7 +162,11 @@ local function thread_body(context, self, while not wheels.pending_jobs:is_empty() and not ngx_worker_exiting() do - local job = wheels.pending_jobs:pop_left() + local job, err = wheels.pending_jobs:pop_left() + if not job then + ngx_log(ngx_ERR, "[timer-ng] failed to pop job: ", err) + return loop.ACTION_EXIT + end if not job:is_runnable() then goto continue @@ -180,9 +184,17 @@ local function thread_body(context, self, end if job:is_runnable() then - wheels:sync_time() + local err2 = wheels:sync_time() + if err then + ngx_log(ngx_ERR, "[timer-ng] failed to sync time: ", err2) + return loop.ACTION_ERROR + end job:re_cal_next_pointer(wheels) - wheels:insert_job(job) + local res, err3 = wheels:insert_job(job) + if not res then + ngx_log(ngx_ERR, "[timer-ng] failed to insert job: ", err3) + return loop.ACTION_ERROR + end local _, need_wake_up = wheels:update_earliest_expiry_time() diff --git a/lib/resty/timerng/wheel/group.lua b/lib/resty/timerng/wheel/group.lua index 5b29bc153..dca759150 100644 --- a/lib/resty/timerng/wheel/group.lua +++ b/lib/resty/timerng/wheel/group.lua @@ -2,6 +2,7 @@ local utils = require("resty.timerng.utils") local wheel = require("resty.timerng.wheel") local array = require("resty.timerng.array") +local array_new = array.new local array_merge = array.merge local utils_float_compare = utils.float_compare @@ -18,6 +19,9 @@ local setmetatable = setmetatable local CONSTANTS_TOLERANCE_OF_GRACEFUL_SHUTDOWN = require("resty.timerng.constants").TOLERANCE_OF_GRACEFUL_SHUTDOWN +local CONSTANTS_DEFAULT_INIT_ARRAY_LENGTH = + require("resty.timerng.constants").DEFAULT_INIT_ARRAY_LENGTH + local _M = {} local meta_table = { @@ -78,7 +82,10 @@ end function _M:fetch_all_expired_jobs() for _, _wheel in ipairs(self.wheels) do local expired_jobs = _wheel:fetch_all_expired_jobs() - array_merge(self.pending_jobs, expired_jobs) + local err = array_merge(self.pending_jobs, expired_jobs) + if err then + return "failed to merge expired jobs: " .. err + end if expired_jobs then expired_jobs:release() @@ -92,7 +99,10 @@ function _M:sync_time() local resolution = self.resolution -- perhaps some jobs have expired but not been fetched - self:fetch_all_expired_jobs() + local err = self:fetch_all_expired_jobs() + if err then + return "failed to fetch all expired jobs: " .. err + end ngx_update_time() self.real_time = ngx_now() @@ -106,9 +116,15 @@ function _M:sync_time() local delta = self.real_time - self.expected_time local steps = utils_convert_second_to_step(delta, resolution) - lowest_wheel:spin_pointer(steps) + err = lowest_wheel:spin_pointer(steps) + if err then + return "failed to spin lowest wheel: " .. err + end - self:fetch_all_expired_jobs() + err = self:fetch_all_expired_jobs() + if err then + return "failed to fetch all expired jobs: " .. err + end -- The floating-point error may cause -- `expected_time` to be larger than `real_time` @@ -126,7 +142,8 @@ function _M:insert_job(job) end -function _M.new(wheel_setting, resolution, report_job_expire_callback) +function _M.new(wheel_setting, resolution, report_job_expire_callback, + max_pending_jobs) local self = { -- see `constants.DEFAULT_WHEEL_SETTING` setting = wheel_setting, @@ -142,7 +159,8 @@ function _M.new(wheel_setting, resolution, report_job_expire_callback) earliest_expiry_time = 0, - pending_jobs = array.new(), + pending_jobs = array_new(CONSTANTS_DEFAULT_INIT_ARRAY_LENGTH, + max_pending_jobs), -- store wheels for each level -- map from wheel_level to wheel @@ -180,4 +198,4 @@ function _M.new(wheel_setting, resolution, report_job_expire_callback) end -return _M \ No newline at end of file +return _M diff --git a/lib/resty/timerng/wheel/init.lua b/lib/resty/timerng/wheel/init.lua index 53231261f..fdd836e32 100644 --- a/lib/resty/timerng/wheel/init.lua +++ b/lib/resty/timerng/wheel/init.lua @@ -91,14 +91,18 @@ end ---inserting a job into this wheel or lower wheel ---@param job table a table that was returned by `job.new()` ---@return boolean ok ok? ----@return string err error message +---@return string|nil err error message function _M:insert(job) assert(self.slots) local next_pointer = job:get_next_pointer(self.id) if next_pointer then - self.slots[next_pointer]:push_right(job) + local err = self.slots[next_pointer]:push_right(job) + if err then + return false, + "failed to insert job into wheel " .. self.id .. ": " .. err + end return true, nil end @@ -109,7 +113,10 @@ function _M:insert(job) end self.report_job_expire_callback(job) - self.expired_jobs:push_right(job) + local err = self.expired_jobs:push_right(job) + if err then + return false, "failed to insert job into expired jobs: " .. err + end return true, nil end @@ -135,7 +142,10 @@ function _M:spin_pointer(offset) if higher_wheel then -- spin the higher wheel to move some jobs to this wheel - higher_wheel:spin_pointer(cycles) + local err = higher_wheel:spin_pointer(cycles) + if err then + return "failed to spin higher wheel: " .. err + end end local jobs = self:get_jobs_by_pointer(final_pointer) @@ -144,10 +154,16 @@ function _M:spin_pointer(offset) local job = jobs:pop_right() if lower_wheel then - lower_wheel:insert(job) + local _, err = lower_wheel:insert(job) + if err then + return "failed to insert job into lower wheel: " .. err + end else self.report_job_expire_callback(job) - expired_jobs:push_right(job) + local err = expired_jobs:push_right(job) + if err then + return "failed to insert job into expired jobs: " .. err + end end end end @@ -167,7 +183,7 @@ end ---return all expired jobs, or return nil. ----@return table jobs_or_nil +---@return table|nil jobs_or_nil function _M:fetch_all_expired_jobs() if self.expired_jobs:is_empty() then return nil @@ -197,7 +213,7 @@ function _M.new(id, nelts, report_job_expire_callback) higher_wheel = nil, lower_wheel = nil, - expired_jobs = array.new(), + expired_jobs = array_new(), report_job_expire_callback = report_job_expire_callback, } @@ -211,4 +227,4 @@ function _M.new(id, nelts, report_job_expire_callback) end -return _M \ No newline at end of file +return _M diff --git a/spec/01-new_spec.lua b/spec/01-new_spec.lua index 1468295ca..737e95554 100644 --- a/spec/01-new_spec.lua +++ b/spec/01-new_spec.lua @@ -312,6 +312,44 @@ describe("new with | ", function () end) -- end it + it("invalid `max_pending_jobs`", function () + assert.has.errors(function () + timer_module.new({ + max_pending_jobs = {}, + }) + end) + + assert.has.errors(function () + timer_module.new({ + max_pending_jobs = true, + }) + end) + + assert.has.errors(function () + timer_module.new({ + max_pending_jobs = "", + }) + end) + + assert.has.errors(function () + timer_module.new({ + max_pending_jobs = -1, + }) + end) + + assert.has.errors(function () + timer_module.new({ + max_pending_jobs = 0, + }) + end) + + assert.has.errors(function () + timer_module.new({ + max_pending_jobs = 1.5, + }) + end) + end) -- end it + end) -- end the second describe end) -- end the top describe \ No newline at end of file diff --git a/spec/09-array_max_legnth_spec.lua b/spec/09-array_max_legnth_spec.lua new file mode 100644 index 000000000..1b24ab548 --- /dev/null +++ b/spec/09-array_max_legnth_spec.lua @@ -0,0 +1,65 @@ +local timer_module = require("resty.timerng") +local array_module = require("resty.timerng.array") + + +describe("array max_length limit", function() + it("push_right > max_length", function() + local arr = array_module.new(2, 3) + assert.is_nil(arr:push_right(1)) + assert.is_nil(arr:push_right(2)) + assert.is_nil(arr:push_right(3)) + local err = arr:push_right(4) + assert.same("list is full", err) + end) + + it("push_left > max_length", function() + local arr = array_module.new(2, 2) + assert.is_nil(arr:push_left(1)) + assert.is_nil(arr:push_left(2)) + local err = arr:push_left(3) + assert.same("list is full", err) + end) +end) + + +describe("max_pending_jobs limit", function() + it("pending_jobs == 3", function() + local max_pending = 3 + local timer = assert(timer_module.new({ + max_pending_jobs = max_pending, + min_threads = 1, + max_threads = 2, + })) + assert(timer:start()) + + for _ = 1, max_pending do + assert.has_no.errors(function() + assert(timer:at(0, function() end)) + end) + end + + local ok, err = timer:at(0, function() end) + assert.is_false(ok) + assert.matches("failed to push job to pending jobs: list is full", err) + end) + + it("pending_jobs == 0", function() + local max_pending = 3 + local timer = assert(timer_module.new({ + min_threads = 1, + max_threads = 2, + })) + assert(timer:start()) + + for _ = 1, max_pending do + assert.has_no.errors(function() + assert(timer:at(0, function() end)) + end) + end + + local ok, err = timer:at(0, function() end) + print("ok, err = ", ok, err) + assert.is_not_false(ok) + assert.is_nil(err) + end) +end) \ No newline at end of file