Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 25 additions & 4 deletions lib/resty/timerng/array.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
local utils = require("resty.timerng.utils")
local constants = require("resty.timerng.constants")

local utils_table_new = utils.table_new
local utils_table_clear = utils.table_clear
Expand All @@ -9,6 +10,12 @@ local table_remove = table.remove
local error = error
local setmetatable = setmetatable

local CONSTANTS_DEFAULT_MAX_ARRAY_LENGTH =
constants.DEFAULT_MAX_ARRAY_LENGTH

local CONSTANTS_DEFAULT_INIT_ARRAY_LENGTH =
constants.DEFAULT_INIT_ARRAY_LENGTH

local array_pool = {}

local _M = {}
Expand Down Expand Up @@ -47,20 +54,26 @@ end


function _M:push_left (value)
if self.max_length and self:length() >= self.max_length then
return "list is full"
end
local first = self.first - 1
self.first = first
self.elts[first] = value
end


function _M:push_right(value)
if self.max_length and self:length() >= self.max_length then
return "list is full"
end
local last = self.last + 1
self.last = last
self.elts[last] = value
end


function _M:pop_left ()
function _M:pop_left()
local first = self.first

if first > self.last then
Expand Down Expand Up @@ -88,9 +101,13 @@ function _M:pop_right()
end


function _M.new(n)
function _M.new(n, max_length)
if n == nil then
n = 8
n = CONSTANTS_DEFAULT_INIT_ARRAY_LENGTH
end

if max_length == nil then
max_length = CONSTANTS_DEFAULT_MAX_ARRAY_LENGTH
end

local self = table_remove(array_pool)
Expand All @@ -100,6 +117,7 @@ function _M.new(n)
end

self = {
max_length = max_length,
elts = utils_table_new(n, 0),
first = 0,
last = -1,
Expand All @@ -123,7 +141,10 @@ function _M.merge(dst, src)
end

while not src:is_empty() do
dst:push_left(src:pop_left())
local err = dst:push_left(src:pop_left())
if err then
return "failed to merge array: " .. err
end
end
end

Expand Down
5 changes: 4 additions & 1 deletion lib/resty/timerng/constants.lua
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ local _M = {

MIN_RESOLUTION = 0.1,

DEFAULT_INIT_ARRAY_LENGTH = 8,
DEFAULT_MAX_ARRAY_LENGTH = 100000,

-- for Nginx's graceful shutdown
TOLERANCE_OF_GRACEFUL_SHUTDOWN = 1,

Expand Down Expand Up @@ -108,4 +111,4 @@ do
end
end

return _M
return _M
33 changes: 28 additions & 5 deletions lib/resty/timerng/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -95,12 +95,15 @@ end
---@param argc integer the number of arguments to the callback function
---@param argv table arguments to the callback function
---@return boolean name_or_false the name of the timer if ok, otherwise false
---@return string err error message
---@return string|nil err error message
local function create(self, name, callback, delay, timer_type, argc, argv)
local wheels = self.wheels
local jobs = self.jobs

wheels:sync_time()
local err = wheels:sync_time()
if err then
return false, "failed to sync time: " .. err
end

local job = job_module.new(wheels,
name,
Expand All @@ -121,14 +124,18 @@ local function create(self, name, callback, delay, timer_type, argc, argv)
self.sys_stats.total = self.sys_stats.total + 1

if job:is_immediate() then
wheels.pending_jobs:push_right(job)
err = wheels.pending_jobs:push_right(job)
if err then
return false, "failed to push job to pending jobs: " .. err
end
self.thread_group:wake_up_super_thread()
report_job_expire_callback_inernal(self, job)

return job.name, nil
end

local ok, err = wheels:insert_job(job)
local ok
ok, err = wheels:insert_job(job)

local _, need_wake_up = wheels:update_earliest_expiry_time()

Expand Down Expand Up @@ -263,6 +270,17 @@ function _M.new(options)
end

end

if options.max_pending_jobs then
assert(type(options.max_pending_jobs) == "number",
"expected `max_pending_jobs` to be a number")

assert(options.max_pending_jobs > 0,
"expected `max_pending_jobs` to be greater than 0")

local _, tmp = math_modf(options.max_pending_jobs)
assert(tmp == 0, "expected `max_pending_jobs` to be an integer")
end
end

local opt = {
Expand Down Expand Up @@ -304,6 +322,10 @@ function _M.new(options)
force_update_time = options
and options.force_update_time
or constants.DEFAULT_FORCE_UPDATE_TIME,

max_pending_jobs = options
and options.max_pending_jobs
or constants.DEFAULT_MAX_ARRAY_LENGTH,
}

timer_sys.opt = opt
Expand Down Expand Up @@ -339,7 +361,8 @@ function _M.new(options)

timer_sys.wheels = wheel_group.new(opt.wheel_setting,
opt.resolution,
report_job_expire_callback)
report_job_expire_callback,
opt.max_pending_jobs)

return setmetatable(timer_sys, { __index = _M })
end
Expand Down
2 changes: 1 addition & 1 deletion lib/resty/timerng/thread/group.lua
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ end

---spawn super_thread, and all worker threads
---@return boolean ok ok?
---@return string err_msg
---@return string|nil err_msg
function _M:spawn()
local ok, err
ok, err = self.super_thread:spawn()
Expand Down
10 changes: 5 additions & 5 deletions lib/resty/timerng/thread/loop.lua
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ local ACTION_EXIT = 3
-- max number of arguments for a phase handler
local MAX_ARGS = 4

local NEED_CHECK_WORKER_EIXTING = {
local NEED_CHECK_WORKER_EXITING = {
init = false,
before = true,
loop_body = true,
Expand Down Expand Up @@ -64,7 +64,7 @@ local function nop_finally()
end


local PAHSE_HANDLERS = {
local PHASE_HANDLERS = {
init = nop_init,
before = nop_before,
loop_body = nop_loop_body,
Expand Down Expand Up @@ -98,7 +98,7 @@ end
---@param self table self
---@param phase string init | before | loop_body | after | finally
---@return integer action
---@return string message
---@return string|nil message
local function phase_handler_wrapper(self, phase)
-- unpack arguments to avoid NYI: return to lower frame
-- as `pcall` with varargs might causes NYI when returning from `pcall`
Expand Down Expand Up @@ -216,11 +216,11 @@ function _M.new(name, options)

self.context.self = self

for phase, default_handler in pairs(PAHSE_HANDLERS) do
for phase, default_handler in pairs(PHASE_HANDLERS) do
self[phase] = {}

self[phase].need_check_worker_exiting
= NEED_CHECK_WORKER_EIXTING[phase]
= NEED_CHECK_WORKER_EXITING[phase]

if not options[phase] then
self[phase].argc = 0
Expand Down
6 changes: 5 additions & 1 deletion lib/resty/timerng/thread/super.lua
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,11 @@ local function thread_body(_, self)

if timer_sys.enable then
-- update the status of the wheel group
wheels:sync_time()
local err = wheels:sync_time()
if err then
ngx_log(ngx_ERR, "[timer-ng] failed to sync time: ", err)
return loop.ACTION_ERROR
end

if not wheels.pending_jobs:is_empty() then
self.worker_thread:wake_up(wheels.pending_jobs:length())
Expand Down
18 changes: 15 additions & 3 deletions lib/resty/timerng/thread/worker.lua
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,11 @@ local function thread_body(context, self,
while not wheels.pending_jobs:is_empty() and
not ngx_worker_exiting()
do
local job = wheels.pending_jobs:pop_left()
local job, err = wheels.pending_jobs:pop_left()
if not job then
ngx_log(ngx_ERR, "[timer-ng] failed to pop job: ", err)
return loop.ACTION_EXIT
end

if not job:is_runnable() then
goto continue
Expand All @@ -180,9 +184,17 @@ local function thread_body(context, self,
end

if job:is_runnable() then
wheels:sync_time()
local err2 = wheels:sync_time()
if err then
ngx_log(ngx_ERR, "[timer-ng] failed to sync time: ", err2)
return loop.ACTION_ERROR
end
job:re_cal_next_pointer(wheels)
wheels:insert_job(job)
local res, err3 = wheels:insert_job(job)
if not res then
ngx_log(ngx_ERR, "[timer-ng] failed to insert job: ", err3)
return loop.ACTION_ERROR
end

local _, need_wake_up = wheels:update_earliest_expiry_time()

Expand Down
32 changes: 25 additions & 7 deletions lib/resty/timerng/wheel/group.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ local utils = require("resty.timerng.utils")
local wheel = require("resty.timerng.wheel")
local array = require("resty.timerng.array")

local array_new = array.new
local array_merge = array.merge

local utils_float_compare = utils.float_compare
Expand All @@ -18,6 +19,9 @@ local setmetatable = setmetatable
local CONSTANTS_TOLERANCE_OF_GRACEFUL_SHUTDOWN =
require("resty.timerng.constants").TOLERANCE_OF_GRACEFUL_SHUTDOWN

local CONSTANTS_DEFAULT_INIT_ARRAY_LENGTH =
require("resty.timerng.constants").DEFAULT_INIT_ARRAY_LENGTH

local _M = {}

local meta_table = {
Expand Down Expand Up @@ -78,7 +82,10 @@ end
function _M:fetch_all_expired_jobs()
for _, _wheel in ipairs(self.wheels) do
local expired_jobs = _wheel:fetch_all_expired_jobs()
array_merge(self.pending_jobs, expired_jobs)
local err = array_merge(self.pending_jobs, expired_jobs)
if err then
return "failed to merge expired jobs: " .. err
end

if expired_jobs then
expired_jobs:release()
Expand All @@ -92,7 +99,10 @@ function _M:sync_time()
local resolution = self.resolution

-- perhaps some jobs have expired but not been fetched
self:fetch_all_expired_jobs()
local err = self:fetch_all_expired_jobs()
if err then
return "failed to fetch all expired jobs: " .. err
end

ngx_update_time()
self.real_time = ngx_now()
Expand All @@ -106,9 +116,15 @@ function _M:sync_time()
local delta = self.real_time - self.expected_time
local steps = utils_convert_second_to_step(delta, resolution)

lowest_wheel:spin_pointer(steps)
err = lowest_wheel:spin_pointer(steps)
if err then
return "failed to spin lowest wheel: " .. err
end

self:fetch_all_expired_jobs()
err = self:fetch_all_expired_jobs()
if err then
return "failed to fetch all expired jobs: " .. err
end

-- The floating-point error may cause
-- `expected_time` to be larger than `real_time`
Expand All @@ -126,7 +142,8 @@ function _M:insert_job(job)
end


function _M.new(wheel_setting, resolution, report_job_expire_callback)
function _M.new(wheel_setting, resolution, report_job_expire_callback,
max_pending_jobs)
local self = {
-- see `constants.DEFAULT_WHEEL_SETTING`
setting = wheel_setting,
Expand All @@ -142,7 +159,8 @@ function _M.new(wheel_setting, resolution, report_job_expire_callback)

earliest_expiry_time = 0,

pending_jobs = array.new(),
pending_jobs = array_new(CONSTANTS_DEFAULT_INIT_ARRAY_LENGTH,
max_pending_jobs),

-- store wheels for each level
-- map from wheel_level to wheel
Expand Down Expand Up @@ -180,4 +198,4 @@ function _M.new(wheel_setting, resolution, report_job_expire_callback)
end


return _M
return _M
Loading
Loading