Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion lib/uv/event-loop.kk
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ pub extern clear-timeout( tid : any) : io-noexn ()

// Runs an async action on the default uv event loop, and exceptions at the top level exit the async loop.
pub fun default-async-uv(action: () -> <async,io> a): io a
val result = ref(Error(Exception("Unreachable", ExnInternal("Unreachable"))))
val result = ref(Error(Exception(
"uv event loop returned without fully executing koka code (missing callback or deadlock?)"
, ExnInternal("Unreachable"))))
val _ =
with default-event-loop
with @default-async
Expand Down
38 changes: 36 additions & 2 deletions lib/uv/inline/event-loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,45 @@ void kk_uv_loop_run(kk_context_t* _ctx){
}
}

static char* kk_uv_handle_type_str(uv_handle_t* handle) {
switch (handle->type) {
case UV_UNKNOWN_HANDLE: return "UNKNOWN";
case UV_ASYNC: return "ASYNC";
case UV_CHECK: return "CHECK";
case UV_FS_EVENT: return "FS_EVENT";
case UV_FS_POLL: return "FS_POLL";
case UV_HANDLE: return "HANDLE";
case UV_IDLE: return "IDLE";
case UV_NAMED_PIPE: return "NAMED_PIPE";
case UV_POLL: return "POLL";
case UV_PREPARE: return "PREPARE";
case UV_PROCESS: return "PROCESS";
case UV_STREAM: return "STREAM";
case UV_TCP: return "TCP";
case UV_TIMER: return "TIMER";
case UV_TTY: return "TTY";
case UV_UDP: return "UDP";
case UV_SIGNAL: return "SIGNAL";
case UV_FILE: return "FILE";
default: return "INVALID";
}
}

static void kk_uv_loop_walk_cb(uv_handle_t* handle, void* arg) {
const char* closing_msg = uv_is_closing(handle) ? " [CLOSING]" : "";
const char* active_msg = uv_is_active(handle) ? " [ACTIVE]" : "";
kk_warning_message(" - %s handle%s%s\n", kk_uv_handle_type_str(handle), active_msg, closing_msg);
}

static void kk_uv_loop_close(kk_context_t* _ctx) {
int ret = uv_loop_close(uvloop());
if (ret != 0) {
kk_warning_message("Event loop closed %s\n", uv_err_name(ret));
if (ret == UV_EBUSY) {
kk_warning_message("Event loop closed with open child handles:\n");
uv_walk(uvloop(), kk_uv_loop_walk_cb, NULL);
} else {
kk_warning_message("Event loop close returned error: %s\n", uv_err_name(ret));
}
}
kk_free(uvloop(), _ctx);
}
Expand Down Expand Up @@ -118,6 +153,5 @@ kk_box_t kk_set_timeout(kk_function_t cb, int64_t time, kk_context_t* _ctx) {
kk_unit_t kk_clear_timeout(kk_box_t boxed_timer, kk_context_t* _ctx) {
kk_uv_timer__timer timer = kk_uv_timer__timer_unbox(boxed_timer, KK_OWNED, _ctx);
kk_uv_timer_stop(timer, _ctx);
kk_uv_timer_release_callback(timer, _ctx);
return kk_Unit;
}
139 changes: 41 additions & 98 deletions lib/uv/inline/timer.c
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,13 @@ void kk_handle_free(void *p, kk_block_t *block, kk_context_t *_ctx) {
kk_free(block, kk_context()); // Free the block memory
}

#define kk_tm_to_uv(hnd) kk_owned_handle_to_uv_handle(wasm_timer, hnd)
#define kk_tm_borrow_internal(hnd) kk_borrow_internal_as(wasm_timer, hnd)

EMSCRIPTEN_KEEPALIVE void wasm_timer_callback(kk_wasm_timer_t* timer_info){
kk_context_t* _ctx = kk_get_context();
kk_function_t callback = timer_info->callback;
if (timer_info->repeat_ms == 0) {
timer_info->callback = kk_function_null(kk_context());
kk_unit_t res = kk_unit_callback(callback, kk_context());
return;
} else {
Expand Down Expand Up @@ -47,30 +48,31 @@ EM_JS(void, stop_timer, (int timer, bool repeating), {

kk_uv_timer__timer kk_wasm_timer_init(kk_context_t* _ctx) {
kk_wasm_timer_t* timer_info = kk_malloc(sizeof(kk_wasm_timer_t), kk_context());
kk_uv_timer__timer t = uv_handle_to_owned_kk_handle(timer_info, kk_handle_free, timer, Timer);
kk_box_t timer_box = kk_cptr_raw_box(&kk_handle_free, (void*)timer_info, kk_context());
kk_uv_timer__timer t = kk_uv_timer__new_Timer(timer_box, kk_context());
timer_info->callback = kk_function_null(kk_context());
return t;
}

kk_unit_t kk_wasm_timer_finish(kk_uv_timer__timer timer, kk_context_t* _ctx) {
kk_wasm_timer_t* timer_info = kk_tm_to_uv(timer);
if (kk_likely(!kk_function_is_null(timer_info->callback, kk_context()))) {
kk_function_drop(timer_info->callback, kk_context());
}
kk_uv_timer__timer_drop(timer, kk_context());
return kk_Unit;
}

kk_unit_t kk_wasm_timer_stop(kk_uv_timer__timer timer, kk_context_t* _ctx) {
kk_wasm_timer_t* timer_info = kk_tm_to_uv(timer);
kk_wasm_timer_t* timer_info = kk_tm_borrow_internal(timer);
if (kk_likely(timer_info->timer != 0)) {
stop_timer(timer_info->timer, timer_info->repeat_ms != 0);
}
if (kk_likely(!kk_function_is_null(timer_info->callback, kk_context()))) {
kk_function_drop(timer_info->callback, kk_context());
timer_info->callback = kk_function_null(kk_context());
}
return kk_Unit;
}

kk_std_core_exn__error kk_wasm_timer_start(kk_uv_timer__timer timer, int64_t timeout, int64_t repeat, kk_function_t callback, kk_context_t* _ctx) {
kk_wasm_timer_t* timer_info = kk_tm_to_uv(timer);
kk_wasm_timer_t* timer_info = kk_tm_borrow_internal(timer);
if (kk_unlikely(!kk_function_is_null(timer_info->callback, kk_context()))) {
// If there's already a callback, the timer is still busy on a previous request
kk_function_drop(callback, kk_context());
return kk_uv_error_from_errno(UV_EBUSY, kk_context());
}
timer_info->callback = callback;
timer_info->repeat_ms = repeat;
timer_info->timer = start_timer(timer_info, timeout, repeat);
Expand All @@ -79,33 +81,26 @@ kk_std_core_exn__error kk_wasm_timer_start(kk_uv_timer__timer timer, int64_t tim

#else

#define kk_tm_to_uv(hnd) kk_owned_handle_to_uv_handle(timer, hnd)
#define kk_tm_borrow_internal(hnd) kk_borrow_internal_as(timer, hnd)

// Initialize the timer handle
kk_uv_timer__timer kk_libuv_timer_init(kk_context_t* _ctx) {
kk_timer_t* handle = kk_malloc(sizeof(kk_timer_t), kk_context());
handle->callback = kk_function_null(kk_context());
// Wrap the uv / kk struct in a reference counted box value type
kk_uv_timer__timer t = uv_handle_to_owned_kk_handle(handle, kk_timer_free, timer, Timer);
uv_timer_init(uvloop(), (uv_timer_t*)handle); // Timer initialization never fails
kk_uv_timer__timer t = kk_uv_timer__new_Timer(kk_timer_box(handle, _ctx), _ctx);
uv_timer_init(uvloop(), &handle->uv); // Timer initialization never fails
return t;
}

// Stop / pause the timer (doesn't clean up) - the timer can be restarted with the same callback with kk_libuv_timer_again
// Stop timer and remove callback - the timer can be reused with kk_libuv_timer_start
kk_unit_t kk_libuv_timer_stop(kk_uv_timer__timer timer, kk_context_t* _ctx) {
uv_timer_t* uv_timer = (uv_timer_t*)kk_tm_to_uv(timer);
uv_timer_stop(uv_timer);
return kk_Unit;
}

// Actually clean up the timer
// This drops the callback first in case it is holding onto the timer - as it does for uv/timer/timer()
kk_unit_t kk_libuv_timer_finish(kk_uv_timer__timer timer, kk_context_t* _ctx) {
kk_timer_t* kk_timer = kk_tm_to_uv(timer);
kk_timer_t* kk_timer = kk_tm_borrow_internal(timer);
if (kk_likely(!kk_function_is_null(kk_timer->callback, kk_context()))) {
kk_function_drop(kk_timer->callback, kk_context());
kk_timer->callback = kk_function_null(kk_context());
}
kk_uv_timer__timer_drop(timer, kk_context());
uv_timer_stop(&kk_timer->uv);
return kk_Unit;
}

Expand All @@ -114,47 +109,35 @@ void kk_uv_timer_unit_callback(uv_timer_t* uv_timer) {
kk_context_t* _ctx = kk_get_context();
kk_timer_t* kk_timer = (kk_timer_t*)uv_timer;
kk_function_t callback = kk_timer->callback; // Get the callback
if (uv_timer_get_repeat(uv_timer) == 0) { // If this is a one-shot timer, just call the callback
kk_unit_callback(callback, kk_context());
return;
if (uv_timer_get_repeat(uv_timer) == 0) { // If this is a one-shot timer, remove it
kk_timer->callback = kk_function_null(kk_context());
} else { // Otherwise, we need to dup the callback, as it will be called again
callback = kk_function_dup(callback, kk_context());
kk_unit_callback(callback, kk_context());
return;
}
kk_unit_callback(callback, kk_context());
}

kk_std_core_exn__error kk_libuv_timer_start(kk_uv_timer__timer timer, int64_t timeout, int64_t repeat, kk_function_t callback, kk_context_t* _ctx) {
kk_timer_t* uv_timer = kk_tm_to_uv(timer);
// TODO: Drop previous callback if any?
uv_timer->callback = callback;
int status = uv_timer_start((uv_timer_t*)uv_timer, kk_uv_timer_unit_callback, timeout, repeat);
// On error, report, and drop callback
kk_uv_check_err_drops(status, {
uv_timer->callback = kk_function_null(kk_context());
kk_function_drop(callback, kk_context());
})
}

kk_std_core_exn__error kk_libuv_timer_again(kk_uv_timer__timer timer, kk_context_t* _ctx) {
int status = uv_timer_again((uv_timer_t*)kk_tm_to_uv(timer));
kk_uv_check(status)
}
kk_timer_t* uv_timer = kk_tm_borrow_internal(timer);
int status = UV_OK;

kk_unit_t kk_libuv_timer_set_repeat(kk_uv_timer__timer timer, int64_t repeat, kk_context_t* _ctx) {
uv_timer_set_repeat((uv_timer_t*)kk_tm_to_uv(timer), repeat);
return kk_Unit;
}
if (kk_unlikely(!kk_function_is_null(uv_timer->callback, kk_context()))) {
// If there's already a callback, the timer is still busy with a previous request
status = UV_EBUSY;
} else {
uv_timer->callback = callback;
status = uv_timer_start((uv_timer_t*)uv_timer, kk_uv_timer_unit_callback, timeout, repeat);
}

int64_t kk_libuv_timer_get_repeat(kk_uv_timer__timer timer, kk_context_t* _ctx) {
uint64_t repeat = uv_timer_get_repeat((uv_timer_t*)kk_tm_to_uv(timer));
return repeat;
if (status != UV_OK) {
uv_timer->callback = kk_function_null(kk_context());
kk_function_drop(callback, kk_context());
return kk_uv_error_from_errno(status, kk_context());
} else {
return kk_std_core_exn__new_Ok(kk_unit_box(kk_Unit), kk_context());
}
}

int64_t kk_libuv_timer_get_due_in(kk_uv_timer__timer timer, kk_context_t* _ctx) {
uint64_t due_in = uv_timer_get_due_in((uv_timer_t*)kk_tm_to_uv(timer));
return due_in;
}
#endif

//////////////////////////////////////////////////////
Expand All @@ -177,50 +160,10 @@ kk_unit_t kk_timer_stop(kk_uv_timer__timer timer, kk_context_t* _ctx) {
#endif
}

kk_unit_t kk_timer_finish(kk_uv_timer__timer timer, kk_context_t* _ctx) {
#ifdef __EMSCRIPTEN__
return kk_wasm_timer_finish(timer, kk_context());
#else
return kk_libuv_timer_finish(timer, kk_context());
#endif
}

kk_std_core_exn__error kk_timer_start(kk_uv_timer__timer timer, int64_t timeout, int64_t repeat, kk_function_t callback, kk_context_t* _ctx) {
#ifdef __EMSCRIPTEN__
return kk_wasm_timer_start(timer, timeout, repeat, callback, kk_context());
#else
return kk_libuv_timer_start(timer, timeout, repeat, callback, kk_context());
#endif
}

kk_std_core_exn__error kk_timer_again(kk_uv_timer__timer timer, kk_context_t* _ctx) {
#ifdef __EMSCRIPTEN__
return kk_std_core_exn__new_Ok(kk_unit_box(kk_Unit), kk_context());
#else
return kk_libuv_timer_again(timer, kk_context());
#endif
}

kk_unit_t kk_timer_set_repeat(kk_uv_timer__timer timer, int64_t repeat, kk_context_t* _ctx) {
#ifdef __EMSCRIPTEN__
return kk_Unit;
#else
return kk_libuv_timer_set_repeat(timer, repeat, kk_context());
#endif
}

int64_t kk_timer_get_repeat(kk_uv_timer__timer timer, kk_context_t* _ctx) {
#ifdef __EMSCRIPTEN__
return -1;
#else
return kk_libuv_timer_get_repeat(timer, kk_context());
#endif
}

int64_t kk_timer_get_due_in(kk_uv_timer__timer timer, kk_context_t* _ctx) {
#ifdef __EMSCRIPTEN__
return -1;
#else
return kk_libuv_timer_get_due_in(timer, kk_context());
#endif
}
2 changes: 1 addition & 1 deletion lib/uv/inline/utils.c
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ static kk_uv_utils__uv_status_code kk_uv_status_to_status_code(int32_t status, k
}

// Map a libuv status code to a Koka Error value with uv status code enum
kk_std_core_exn__error kk_uv_async_error_from_errno( int err, kk_context_t* _ctx ) {
kk_std_core_exn__error kk_uv_error_from_errno( int err, kk_context_t* _ctx ) {
kk_uv_utils__uv_status_code code = kk_uv_status_to_status_code(err, _ctx);
kk_string_t msg = kk_uv_utils_message(code, _ctx);
return kk_std_core_exn__new_Error( kk_std_core_exn__new_Exception( msg, kk_uv_utils__new_AsyncExn(kk_reuse_null, 0, code, _ctx), _ctx), _ctx );
Expand Down
Loading