From 1e6a82416ae821a1cb7eebe3274e1484284411aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Mon, 22 Jun 2026 18:39:26 +0200 Subject: [PATCH 1/2] Add c-ares async DNS resolve for non-blocking connect MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When c-ares is enabled, the async connect path defers DNS until the event adapter is attached. The libevent adapter initiates async DNS on attach, with c-ares fds and timers driven by the event loop. On DNS completion, socket() + connect() proceed as normal and the fd is registered with the event loop for connect completion. Adapters without c-ares hooks fall back to blocking DNS. Signed-off-by: Björn Svensson --- .github/wordlist.txt | 2 + CMakeLists.txt | 2 +- README.md | 2 +- docs/standalone.md | 19 ++ include/valkey/adapters/ae.h | 2 + include/valkey/adapters/glib.h | 1 + include/valkey/adapters/ivykis.h | 2 + include/valkey/adapters/libev.h | 2 + include/valkey/adapters/libevent.h | 131 ++++++++++- include/valkey/adapters/libhv.h | 2 + include/valkey/adapters/libsdevent.h | 2 + include/valkey/adapters/libuv.h | 2 + include/valkey/adapters/macosx.h | 2 + include/valkey/adapters/poll.h | 2 + include/valkey/adapters/qt.h | 1 + include/valkey/adapters/valkeymoduleapi.h | 2 + include/valkey/async.h | 33 +++ include/valkey/net.h | 4 + include/valkey/valkey.h | 3 + src/async.c | 13 +- src/async_private.h | 28 ++- src/dns.c | 260 ++++++++++++++++++++++ src/dns.h | 17 ++ src/net.c | 83 +++++++ tests/CMakeLists.txt | 6 + tests/ct_async_dns.c | 132 +++++++++++ 26 files changed, 736 insertions(+), 19 deletions(-) create mode 100644 tests/ct_async_dns.c diff --git a/.github/wordlist.txt b/.github/wordlist.txt index 058ad828..47a3f601 100644 --- a/.github/wordlist.txt +++ b/.github/wordlist.txt @@ -6,6 +6,7 @@ allocators antirez api APIs +async ASYNC asynchronous atomicity @@ -36,6 +37,7 @@ DNS ElastiCache extensibility failover +fds FPM FreeBSD getaddrinfo diff --git a/CMakeLists.txt b/CMakeLists.txt index f6e6b6a8..2fe75d08 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -369,7 +369,7 @@ if(NOT DISABLE_TESTS) target_compile_definitions(valkey_unittest PRIVATE ${valkey_compile_definitions}) endif() if(ENABLE_CARES AND NOT WIN32) - target_compile_definitions(valkey_unittest PRIVATE VALKEY_USE_CARES) + target_compile_definitions(valkey_unittest PUBLIC VALKEY_USE_CARES) endif() if(valkey_link_libraries) target_link_libraries(valkey_unittest PUBLIC ${valkey_link_libraries}) diff --git a/README.md b/README.md index 2705d4b5..a4aacc9c 100644 --- a/README.md +++ b/README.md @@ -21,7 +21,7 @@ Libvalkey is the official C client for the [Valkey](https://valkey.io) database. - Supports both `RESP2` and `RESP3` protocol versions. - Supports both synchronous and asynchronous operation. - Optional support for `MPTCP`, `TLS` and `RDMA` connections. -- Optional timeout-bounded DNS resolution via [c-ares](https://github.com/c-ares/c-ares). +- Optional non-blocking DNS resolution via [c-ares](https://github.com/c-ares/c-ares) with timeout support. - Asynchronous API with several event libraries to choose from. - Supports both standalone and cluster mode operation. - Can be compiled with either `make` or `CMake`. diff --git a/docs/standalone.md b/docs/standalone.md index 82b666e0..fadc4aae 100644 --- a/docs/standalone.md +++ b/docs/standalone.md @@ -49,6 +49,10 @@ When a hostname resolves to multiple addresses, libvalkey will try each address When built with c-ares support (`USE_CARES=1` / `-DENABLE_CARES=1`), DNS resolution uses c-ares instead of `getaddrinfo()`. This provides timeout-bounded DNS resolution using `connect_timeout` (defaulting to 5 seconds if unset), preventing indefinite hangs when DNS servers are slow or unreachable. +For the synchronous API, DNS resolution blocks with a `poll()` loop bounded by the timeout. +For the asynchronous API with the libevent adapter, DNS is fully non-blocking; c-ares fds are registered with the event loop. +Other adapters use a blocking DNS fallback that is still timeout-bounded. + c-ares support is available on Linux, macOS, and FreeBSD (not Windows). ```c @@ -330,6 +334,21 @@ The asynchronous context _should_ hold a connect callback function that is calle It _can_ also hold a disconnect callback function that is called when the connection is disconnected (either because of an error or per user request). The context object is always freed after the disconnect callback fired. +#### Custom adapters and c-ares + +When built with c-ares, the async connect defers DNS until the adapter is attached. +All bundled adapters handle this automatically. +If you implement a custom adapter, either add `VALKEY_DNS_BLOCKING_FALLBACK(ac)` for a simple blocking fallback, or implement the c-ares hooks (`addCaresSocket`, `delCaresSocket`, `scheduleCaresTimer`) for fully non-blocking DNS; see the libevent adapter for a reference implementation. + +```c +static int myAdapterAttach(valkeyAsyncContext *ac, ...) { + if (ac->ev.data != NULL) + return VALKEY_ERR; + VALKEY_DNS_BLOCKING_FALLBACK(ac); + // ... use ac->c.fd normally ... +} +``` + ### Executing commands Executing commands in an asynchronous context work similarly to the synchronous context, except that you can pass a callback that will be invoked when the reply is received. diff --git a/include/valkey/adapters/ae.h b/include/valkey/adapters/ae.h index 7c3df00c..2ec1b388 100644 --- a/include/valkey/adapters/ae.h +++ b/include/valkey/adapters/ae.h @@ -113,6 +113,8 @@ static int valkeyAeAttach(aeEventLoop *loop, valkeyAsyncContext *ac) { if (ac->ev.data != NULL) return VALKEY_ERR; + VALKEY_DNS_BLOCKING_FALLBACK(ac); + /* Create container for context and r/w events */ e = (valkeyAeEvents *)vk_malloc(sizeof(*e)); if (e == NULL) diff --git a/include/valkey/adapters/glib.h b/include/valkey/adapters/glib.h index b471b65b..27db3278 100644 --- a/include/valkey/adapters/glib.h +++ b/include/valkey/adapters/glib.h @@ -146,6 +146,7 @@ valkey_source_new(valkeyAsyncContext *ac) { /* Internal adapter function with correct function signature. */ static int valkeyGlibAttachAdapter(valkeyAsyncContext *ac, void *context) { + VALKEY_DNS_BLOCKING_FALLBACK(ac); if (g_source_attach(valkey_source_new(ac), (GMainContext *)context) > 0) { return VALKEY_OK; } diff --git a/include/valkey/adapters/ivykis.h b/include/valkey/adapters/ivykis.h index a0bf0b82..3c14d5f0 100644 --- a/include/valkey/adapters/ivykis.h +++ b/include/valkey/adapters/ivykis.h @@ -56,6 +56,8 @@ static int valkeyIvykisAttach(valkeyAsyncContext *ac) { if (ac->ev.data != NULL) return VALKEY_ERR; + VALKEY_DNS_BLOCKING_FALLBACK(ac); + /* Create container for context and r/w events */ e = (valkeyIvykisEvents *)vk_malloc(sizeof(*e)); if (e == NULL) diff --git a/include/valkey/adapters/libev.h b/include/valkey/adapters/libev.h index cc96d8b6..11309b8b 100644 --- a/include/valkey/adapters/libev.h +++ b/include/valkey/adapters/libev.h @@ -158,6 +158,8 @@ static int valkeyLibevAttach(EV_P_ valkeyAsyncContext *ac) { if (ac->ev.data != NULL) return VALKEY_ERR; + VALKEY_DNS_BLOCKING_FALLBACK(ac); + /* Create container for context and r/w events */ e = (valkeyLibevEvents *)vk_calloc(1, sizeof(*e)); if (e == NULL) diff --git a/include/valkey/adapters/libevent.h b/include/valkey/adapters/libevent.h index c063486c..eb97d5cf 100644 --- a/include/valkey/adapters/libevent.h +++ b/include/valkey/adapters/libevent.h @@ -39,6 +39,16 @@ #define VALKEY_LIBEVENT_DELETED 0x01 #define VALKEY_LIBEVENT_ENTERED 0x02 +#ifdef VALKEY_USE_CARES + +#define VALKEY_LIBEVENT_MAX_CARES_FDS 16 + +typedef struct valkeyLibeventCaresEvent { + int fd; + struct event *ev; +} valkeyLibeventCaresEvent; +#endif + typedef struct valkeyLibeventEvents { valkeyAsyncContext *context; struct event *ev; @@ -46,6 +56,11 @@ typedef struct valkeyLibeventEvents { struct timeval tv; short flags; short state; +#ifdef VALKEY_USE_CARES + valkeyLibeventCaresEvent cares_fds[VALKEY_LIBEVENT_MAX_CARES_FDS]; + int cares_nfds; + struct event *cares_timer; +#endif } valkeyLibeventEvents; static void valkeyLibeventDestroy(valkeyLibeventEvents *e) { @@ -100,7 +115,11 @@ static void valkeyLibeventUpdate(void *privdata, short flag, int isRemove) { } } - event_del(e->ev); + if (e->ev) { + event_del(e->ev); + } else { + e->ev = event_new(e->base, e->context->c.fd, 0, valkeyLibeventHandler, privdata); + } event_assign(e->ev, e->base, e->context->c.fd, e->flags | EV_PERSIST, valkeyLibeventHandler, privdata); event_add(e->ev, tv); @@ -127,9 +146,22 @@ static void valkeyLibeventCleanup(void *privdata) { if (!e) { return; } - event_del(e->ev); - event_free(e->ev); - e->ev = NULL; + if (e->ev) { + event_del(e->ev); + event_free(e->ev); + e->ev = NULL; + } +#ifdef VALKEY_USE_CARES + for (int i = 0; i < e->cares_nfds; i++) { + if (e->cares_fds[i].ev) + event_free(e->cares_fds[i].ev); + } + e->cares_nfds = 0; + if (e->cares_timer) { + event_free(e->cares_timer); + e->cares_timer = NULL; + } +#endif if (e->state & VALKEY_LIBEVENT_ENTERED) { e->state |= VALKEY_LIBEVENT_DELETED; @@ -146,6 +178,76 @@ static void valkeyLibeventSetTimeout(void *privdata, struct timeval tv) { valkeyLibeventUpdate(e, flags, 0); } +#ifdef VALKEY_USE_CARES +static void valkeyLibeventCaresHandler(evutil_socket_t fd, short event, void *arg) { + valkeyLibeventEvents *e = (valkeyLibeventEvents *)arg; + int readable = (event & EV_READ) != 0; + int writable = (event & EV_WRITE) != 0; + valkeyResolveAsyncHandleEvent(e->context, (int)fd, readable, writable); +} + +static void valkeyLibeventCaresTimerHandler(evutil_socket_t fd, short event, void *arg) { + (void)fd; + (void)event; + valkeyLibeventEvents *e = (valkeyLibeventEvents *)arg; + valkeyResolveAsyncHandleTimer(e->context); +} + +static void valkeyLibeventAddCaresSocket(void *privdata, int fd, int readable, int writable) { + valkeyLibeventEvents *e = (valkeyLibeventEvents *)privdata; + int i; + + /* Find existing or add new. */ + for (i = 0; i < e->cares_nfds; i++) { + if (e->cares_fds[i].fd == fd) + break; + } + if (i == e->cares_nfds) { + if (e->cares_nfds >= VALKEY_LIBEVENT_MAX_CARES_FDS) + return; + e->cares_fds[i].fd = fd; + e->cares_fds[i].ev = NULL; + e->cares_nfds++; + } + + short flags = EV_PERSIST; + if (readable) + flags |= EV_READ; + if (writable) + flags |= EV_WRITE; + + if (e->cares_fds[i].ev) + event_free(e->cares_fds[i].ev); + e->cares_fds[i].ev = event_new(e->base, fd, flags, valkeyLibeventCaresHandler, e); + event_add(e->cares_fds[i].ev, NULL); +} + +static void valkeyLibeventDelCaresSocket(void *privdata, int fd) { + valkeyLibeventEvents *e = (valkeyLibeventEvents *)privdata; + + for (int i = 0; i < e->cares_nfds; i++) { + if (e->cares_fds[i].fd == fd) { + if (e->cares_fds[i].ev) { + event_free(e->cares_fds[i].ev); + e->cares_fds[i].ev = NULL; + } + e->cares_fds[i] = e->cares_fds[e->cares_nfds - 1]; + e->cares_nfds--; + break; + } + } +} + +static void valkeyLibeventScheduleCaresTimer(void *privdata, struct timeval tv) { + valkeyLibeventEvents *e = (valkeyLibeventEvents *)privdata; + + if (e->cares_timer == NULL) + e->cares_timer = evtimer_new(e->base, valkeyLibeventCaresTimerHandler, e); + evtimer_del(e->cares_timer); + evtimer_add(e->cares_timer, &tv); +} +#endif /* VALKEY_USE_CARES */ + static int valkeyLibeventAttach(valkeyAsyncContext *ac, struct event_base *base) { valkeyContext *c = &(ac->c); valkeyLibeventEvents *e; @@ -168,11 +270,30 @@ static int valkeyLibeventAttach(valkeyAsyncContext *ac, struct event_base *base) ac->ev.delWrite = valkeyLibeventDelWrite; ac->ev.cleanup = valkeyLibeventCleanup; ac->ev.scheduleTimer = valkeyLibeventSetTimeout; +#ifdef VALKEY_USE_CARES + ac->ev.addCaresSocket = valkeyLibeventAddCaresSocket; + ac->ev.delCaresSocket = valkeyLibeventDelCaresSocket; + ac->ev.scheduleCaresTimer = valkeyLibeventScheduleCaresTimer; +#endif ac->ev.data = e; /* Initialize and install read/write events */ - e->ev = event_new(base, c->fd, EV_READ | EV_WRITE, valkeyLibeventHandler, e); e->base = base; +#ifdef VALKEY_USE_CARES + if (c->flags & VALKEY_DNS_PENDING) { + /* DNS is deferred — no fd yet. Initiate async DNS now that + * the adapter is attached and can drive c-ares fds. */ + e->ev = NULL; + if (valkeyResolveAsyncStart(ac, c->tcp.host, c->tcp.port) != VALKEY_OK) { + vk_free(e); + ac->ev.data = NULL; + return VALKEY_ERR; + } + } else +#endif + { + e->ev = event_new(base, c->fd, EV_READ | EV_WRITE, valkeyLibeventHandler, e); + } return VALKEY_OK; } diff --git a/include/valkey/adapters/libhv.h b/include/valkey/adapters/libhv.h index d73ec1a2..dd243242 100644 --- a/include/valkey/adapters/libhv.h +++ b/include/valkey/adapters/libhv.h @@ -95,6 +95,8 @@ static int valkeyLibhvAttach(valkeyAsyncContext *ac, hloop_t *loop) { return VALKEY_ERR; } + VALKEY_DNS_BLOCKING_FALLBACK(ac); + /* Create container struct to keep track of our io and any timer */ events = (valkeyLibhvEvents *)vk_malloc(sizeof(*events)); if (events == NULL) { diff --git a/include/valkey/adapters/libsdevent.h b/include/valkey/adapters/libsdevent.h index 51151861..fcadb599 100644 --- a/include/valkey/adapters/libsdevent.h +++ b/include/valkey/adapters/libsdevent.h @@ -155,6 +155,8 @@ static int valkeyLibsdeventAttach(valkeyAsyncContext *ac, struct sd_event *event if (ac->ev.data != NULL) return VALKEY_ERR; + VALKEY_DNS_BLOCKING_FALLBACK(ac); + /* Create container for context and r/w events */ e = (valkeyLibsdeventEvents *)vk_calloc(1, sizeof(*e)); if (e == NULL) diff --git a/include/valkey/adapters/libuv.h b/include/valkey/adapters/libuv.h index b9ad0f91..a4a986e4 100644 --- a/include/valkey/adapters/libuv.h +++ b/include/valkey/adapters/libuv.h @@ -172,6 +172,8 @@ static int valkeyLibuvAttach(valkeyAsyncContext *ac, uv_loop_t *loop) { return VALKEY_ERR; } + VALKEY_DNS_BLOCKING_FALLBACK(ac); + ac->ev.addRead = valkeyLibuvAddRead; ac->ev.delRead = valkeyLibuvDelRead; ac->ev.addWrite = valkeyLibuvAddWrite; diff --git a/include/valkey/adapters/macosx.h b/include/valkey/adapters/macosx.h index 78164413..5caf7944 100644 --- a/include/valkey/adapters/macosx.h +++ b/include/valkey/adapters/macosx.h @@ -110,6 +110,8 @@ static int valkeyMacOSAttach(valkeyAsyncContext *valkeyAsyncCtx, CFRunLoopRef ru if (valkeyAsyncCtx->ev.data != NULL) return VALKEY_ERR; + VALKEY_DNS_BLOCKING_FALLBACK(valkeyAsyncCtx); + ValkeyRunLoop *valkeyRunLoop = (ValkeyRunLoop *)vk_calloc(1, sizeof(ValkeyRunLoop)); if (valkeyRunLoop == NULL) return VALKEY_ERR; diff --git a/include/valkey/adapters/poll.h b/include/valkey/adapters/poll.h index 97bbd605..3435c525 100644 --- a/include/valkey/adapters/poll.h +++ b/include/valkey/adapters/poll.h @@ -172,6 +172,8 @@ static int valkeyPollAttach(valkeyAsyncContext *ac) { if (ac->ev.data != NULL) return VALKEY_ERR; + VALKEY_DNS_BLOCKING_FALLBACK(ac); + /* Create container for context and r/w events */ e = (valkeyPollEvents *)vk_malloc(sizeof(*e)); if (e == NULL) diff --git a/include/valkey/adapters/qt.h b/include/valkey/adapters/qt.h index 0bef4150..c01cb6ac 100644 --- a/include/valkey/adapters/qt.h +++ b/include/valkey/adapters/qt.h @@ -78,6 +78,7 @@ class ValkeyQtAdapter : public QObject { if (ac->ev.data != NULL) { return VALKEY_ERR; } + VALKEY_DNS_BLOCKING_FALLBACK(ac); m_ctx = ac; m_ctx->ev.data = this; m_ctx->ev.addRead = ValkeyQtAddRead; diff --git a/include/valkey/adapters/valkeymoduleapi.h b/include/valkey/adapters/valkeymoduleapi.h index 89906fcb..d329464f 100644 --- a/include/valkey/adapters/valkeymoduleapi.h +++ b/include/valkey/adapters/valkeymoduleapi.h @@ -117,6 +117,8 @@ static inline int valkeyModuleAttach(valkeyAsyncContext *ac, ValkeyModuleCtx *mo if (ac->ev.data != NULL) return VALKEY_ERR; + VALKEY_DNS_BLOCKING_FALLBACK(ac); + /* Create container for context and r/w events */ e = (valkeyModuleEvents *)vk_malloc(sizeof(*e)); if (e == NULL) diff --git a/include/valkey/async.h b/include/valkey/async.h index 8c7d07a3..f4dee90e 100644 --- a/include/valkey/async.h +++ b/include/valkey/async.h @@ -94,6 +94,11 @@ typedef struct valkeyAsyncContext { void (*delWrite)(void *privdata); void (*cleanup)(void *privdata); void (*scheduleTimer)(void *privdata, struct timeval tv); + /* c-ares async DNS hooks. If addCaresSocket is NULL, the blocking + * fallback (valkeyResolveSync with timeout) is used instead. */ + void (*addCaresSocket)(void *privdata, int fd, int readable, int writable); + void (*delCaresSocket)(void *privdata, int fd); + void (*scheduleCaresTimer)(void *privdata, struct timeval tv); } ev; /* Called when either the connection is terminated due to an error or per @@ -121,6 +126,9 @@ typedef struct valkeyAsyncContext { /* Any configured RESP3 PUSH handler */ valkeyAsyncPushFn *push_cb; + + /* Async DNS resolution state (used when c-ares is enabled). */ + void *dns_state; } valkeyAsyncContext; LIBVALKEY_API valkeyAsyncContext *valkeyAsyncConnectWithOptions(const valkeyOptions *options); @@ -151,6 +159,31 @@ LIBVALKEY_API int valkeyAsyncCommand(valkeyAsyncContext *ac, valkeyCallbackFn *f LIBVALKEY_API int valkeyAsyncCommandArgv(valkeyAsyncContext *ac, valkeyCallbackFn *fn, void *privdata, int argc, const char **argv, const size_t *argvlen); LIBVALKEY_API int valkeyAsyncFormattedCommand(valkeyAsyncContext *ac, valkeyCallbackFn *fn, void *privdata, const char *cmd, size_t len); +#ifdef VALKEY_USE_CARES +/* Async DNS resolution (c-ares integration). Used by event-loop adapters. */ +LIBVALKEY_API int valkeyResolveAsyncStart(valkeyAsyncContext *ac, const char *host, int port); +LIBVALKEY_API void valkeyResolveAsyncHandleEvent(valkeyAsyncContext *ac, int fd, int readable, int writable); +LIBVALKEY_API void valkeyResolveAsyncHandleTimer(valkeyAsyncContext *ac); +LIBVALKEY_API void valkeyResolveAsyncFree(valkeyAsyncContext *ac); + +/* Blocking DNS fallback for adapters that don't implement c-ares hooks. + * Resolves DNS synchronously, performs connect(), clears DNS_PENDING, + * and sets ac->c.fd. Returns VALKEY_OK or VALKEY_ERR. */ +LIBVALKEY_API int valkeyResolveDnsPending(valkeyAsyncContext *ac); + +/* Convenience macro for adapters that don't support async DNS. + * Place after the "already attached" check in the attach function. */ +#define VALKEY_DNS_BLOCKING_FALLBACK(ac) \ + do { \ + if ((ac)->c.flags & VALKEY_DNS_PENDING) { \ + if (valkeyResolveDnsPending(ac) != VALKEY_OK) \ + return VALKEY_ERR; \ + } \ + } while (0) +#else +#define VALKEY_DNS_BLOCKING_FALLBACK(ac) ((void)0) +#endif + #ifdef __cplusplus } #endif diff --git a/include/valkey/net.h b/include/valkey/net.h index 7e844cb8..1570170a 100644 --- a/include/valkey/net.h +++ b/include/valkey/net.h @@ -38,12 +38,16 @@ #include "valkey.h" #include "visibility.h" +#include +struct addrinfo; + LIBVALKEY_API void valkeyNetClose(valkeyContext *c); LIBVALKEY_API int valkeyHasMptcp(void); LIBVALKEY_API int valkeyCheckSocketError(valkeyContext *c); LIBVALKEY_API int valkeyTcpSetTimeout(valkeyContext *c, const struct timeval tv); LIBVALKEY_API int valkeyContextConnectTcp(valkeyContext *c, const valkeyOptions *options); +LIBVALKEY_API int valkeyTcpConnectNonBlock(valkeyContext *c, struct addrinfo *servinfo); LIBVALKEY_API int valkeyKeepAlive(valkeyContext *c, int interval); LIBVALKEY_API int valkeyCheckConnectDone(valkeyContext *c, int *completed); diff --git a/include/valkey/valkey.h b/include/valkey/valkey.h index 586cdea8..e4b32af4 100644 --- a/include/valkey/valkey.h +++ b/include/valkey/valkey.h @@ -103,6 +103,9 @@ typedef SSIZE_T ssize_t; /* Flag specific to use Multipath TCP (MPTCP) */ #define VALKEY_MPTCP 0x2000 +/* Flag indicating async DNS is pending (connection deferred until adapter attached). */ +#define VALKEY_DNS_PENDING 0x4000 + #define VALKEY_KEEPALIVE_INTERVAL 15 /* seconds */ /* number of times we retry to connect in the case of EADDRNOTAVAIL and diff --git a/src/async.c b/src/async.c index 3f4d56bc..5ebe8af0 100644 --- a/src/async.c +++ b/src/async.c @@ -45,6 +45,7 @@ #include "async.h" #include "async_private.h" #include "dict.h" +#include "dns.h" #include "net.h" #include "valkey_private.h" #include "vkutil.h" @@ -152,6 +153,9 @@ static valkeyAsyncContext *valkeyAsyncInitialize(valkeyContext *c) { ac->sub.schannels = schannels; ac->sub.pending_unsubs = 0; + ac->push_cb = NULL; + ac->dns_state = NULL; + return ac; oom: dictRelease(channels); @@ -162,7 +166,7 @@ static valkeyAsyncContext *valkeyAsyncInitialize(valkeyContext *c) { /* We want the error field to be accessible directly instead of requiring * an indirection to the valkeyContext struct. */ -static void valkeyAsyncCopyError(valkeyAsyncContext *ac) { +void valkeyAsyncCopyError(valkeyAsyncContext *ac) { if (!ac) return; @@ -375,6 +379,11 @@ static void valkeyAsyncFreeInternal(valkeyAsyncContext *ac) { dictRelease(ac->sub.schannels); } + /* Free any in-flight async DNS state before tearing down the event loop. */ +#ifdef VALKEY_USE_CARES + valkeyResolveAsyncFree(ac); +#endif + /* Signal event lib to clean up */ _EL_CLEANUP(ac); @@ -659,7 +668,7 @@ void valkeyProcessCallbacks(valkeyAsyncContext *ac) { valkeyAsyncDisconnectInternal(ac); } -static void valkeyAsyncHandleConnectFailure(valkeyAsyncContext *ac) { +void valkeyAsyncHandleConnectFailure(valkeyAsyncContext *ac) { valkeyRunConnectCallback(ac, VALKEY_ERR); valkeyAsyncDisconnectInternal(ac); } diff --git a/src/async_private.h b/src/async_private.h index 30f062d9..10c93787 100644 --- a/src/async_private.h +++ b/src/async_private.h @@ -33,22 +33,26 @@ #define VALKEY_ASYNC_PRIVATE_H #include "visibility.h" -#define _EL_ADD_READ(ctx) \ - do { \ - refreshTimeout(ctx); \ - if ((ctx)->ev.addRead) \ - (ctx)->ev.addRead((ctx)->ev.data); \ +#define _EL_ADD_READ(ctx) \ + do { \ + if ((ctx)->c.flags & VALKEY_DNS_PENDING) \ + break; \ + refreshTimeout(ctx); \ + if ((ctx)->ev.addRead) \ + (ctx)->ev.addRead((ctx)->ev.data); \ } while (0) #define _EL_DEL_READ(ctx) \ do { \ if ((ctx)->ev.delRead) \ (ctx)->ev.delRead((ctx)->ev.data); \ } while (0) -#define _EL_ADD_WRITE(ctx) \ - do { \ - refreshTimeout(ctx); \ - if ((ctx)->ev.addWrite) \ - (ctx)->ev.addWrite((ctx)->ev.data); \ +#define _EL_ADD_WRITE(ctx) \ + do { \ + if ((ctx)->c.flags & VALKEY_DNS_PENDING) \ + break; \ + refreshTimeout(ctx); \ + if ((ctx)->ev.addWrite) \ + (ctx)->ev.addWrite((ctx)->ev.data); \ } while (0) #define _EL_DEL_WRITE(ctx) \ do { \ @@ -82,4 +86,8 @@ static inline void refreshTimeout(valkeyAsyncContext *ctx) { LIBVALKEY_API void valkeyAsyncDisconnectInternal(valkeyAsyncContext *ac); LIBVALKEY_API void valkeyProcessCallbacks(valkeyAsyncContext *ac); +/* Visible although private since required by dns.c (c-ares async connect). */ +void valkeyAsyncCopyError(valkeyAsyncContext *ac); +void valkeyAsyncHandleConnectFailure(valkeyAsyncContext *ac); + #endif /* VALKEY_ASYNC_PRIVATE_H */ diff --git a/src/dns.c b/src/dns.c index 72b5aad6..487c2a43 100644 --- a/src/dns.c +++ b/src/dns.c @@ -37,6 +37,10 @@ #ifdef VALKEY_USE_CARES #include "alloc.h" +#include "async.h" +#include "async_private.h" +#include "net.h" +#include "valkey_private.h" #include @@ -317,6 +321,262 @@ static int valkeyResolveCares(const char *host, int port, int flags, ares_destroy(channel); return rv; } + +/* --- Async DNS resolution --- */ + +typedef struct valkeyAsyncDns { + ares_channel_t *channel; + struct valkeyAsyncContext *ac; + char *host; + int port; + int flags; + int ai_family; /* Current family being tried. */ + int done; /* Set when DNS callback has fired and resolution is complete. */ + int failed; /* Set when DNS or connect failed (error is on ac->c). */ +} valkeyAsyncDns; + +static void caresAsyncSockStateCb(void *data, ares_socket_t fd, int readable, int writable) { + valkeyAsyncDns *dns = (valkeyAsyncDns *)data; + valkeyAsyncContext *ac = dns->ac; + + if (!readable && !writable) { + ac->ev.delCaresSocket(ac->ev.data, (int)fd); + } else { + ac->ev.addCaresSocket(ac->ev.data, (int)fd, readable, writable); + } +} + +static void caresAsyncScheduleTimer(valkeyAsyncDns *dns) { + valkeyAsyncContext *ac = dns->ac; + struct timeval tv, maxtv; + + maxtv.tv_sec = 1; + maxtv.tv_usec = 0; + struct timeval *tvp = ares_timeout(dns->channel, &maxtv, &tv); + ac->ev.scheduleCaresTimer(ac->ev.data, *tvp); +} + +/* Forward declaration. */ +static void caresAsyncCallback(void *arg, int status, int timeouts, struct ares_addrinfo *res); + +/* Mark DNS resolution as complete and record failure. */ +static void caresAsyncFail(valkeyAsyncDns *dns) { + valkeyAsyncContext *ac = dns->ac; + valkeyAsyncCopyError(ac); + dns->done = 1; + dns->failed = 1; +} + +static void caresAsyncConnectWithResult(valkeyAsyncDns *dns, struct ares_addrinfo *ai) { + valkeyAsyncContext *ac = dns->ac; + valkeyContext *c = &ac->c; + struct addrinfo *servinfo = NULL; + + if (caresAddrInfoToAddrInfo(ai, &servinfo) != 0) { + valkeySetError(c, VALKEY_ERR_OOM, "Out of memory"); + ares_freeaddrinfo(ai); + caresAsyncFail(dns); + return; + } + ares_freeaddrinfo(ai); + + if (valkeyTcpConnectNonBlock(c, servinfo) != VALKEY_OK) { + valkeyFreeAddrInfo(servinfo); + caresAsyncFail(dns); + return; + } + + c->flags &= ~VALKEY_DNS_PENDING; + valkeyFreeAddrInfo(servinfo); + dns->done = 1; + _EL_ADD_WRITE(ac); +} + +static void caresAsyncCallback(void *arg, int status, int timeouts, struct ares_addrinfo *res) { + (void)timeouts; + valkeyAsyncDns *dns = (valkeyAsyncDns *)arg; + valkeyAsyncContext *ac = dns->ac; + valkeyContext *c = &ac->c; + + /* Channel is being destroyed (e.g. during valkeyAsyncFree). */ + if (status == ARES_EDESTRUCTION) { + if (res) + ares_freeaddrinfo(res); + return; + } + + if (status == ARES_SUCCESS && res) { + caresAsyncConnectWithResult(dns, res); + return; + } + + /* Retry with other family if applicable. */ + if ((status == ARES_ENOTFOUND || status == ARES_ENODATA) && + dns->ai_family != AF_UNSPEC) { + if (res) + ares_freeaddrinfo(res); + + dns->ai_family = (dns->ai_family == AF_INET) ? AF_INET6 : AF_INET; + struct ares_addrinfo_hints hints = {0}; + hints.ai_socktype = SOCK_STREAM; + hints.ai_family = dns->ai_family; + + char portstr[6]; + snprintf(portstr, sizeof(portstr), "%d", dns->port); + ares_getaddrinfo(dns->channel, dns->host, portstr, &hints, caresAsyncCallback, dns); + caresAsyncScheduleTimer(dns); + return; + } + + /* DNS failed. */ + if (res) + ares_freeaddrinfo(res); + + int eai = caresStatusToEai(status); + if (eai == EAI_MEMORY) + valkeySetError(c, VALKEY_ERR_OOM, "Out of memory"); + else + valkeySetError(c, VALKEY_ERR_OTHER, gai_strerror(eai)); + caresAsyncFail(dns); +} + +int valkeyResolveAsyncStart(struct valkeyAsyncContext *ac, const char *host, int port) { + valkeyContext *c = &ac->c; + valkeyAsyncDns *dns; + + pthread_once(&cares_init_once, valkeyCaresLibraryInit); + + dns = vk_calloc(1, sizeof(*dns)); + if (dns == NULL) + return VALKEY_ERR; + + dns->ac = ac; + dns->host = vk_strdup(host); + if (dns->host == NULL) { + vk_free(dns); + return VALKEY_ERR; + } + dns->port = port; + dns->flags = c->flags; + + /* Determine family. */ + if ((c->flags & VALKEY_PREFER_IPV6) && (c->flags & VALKEY_PREFER_IPV4)) + dns->ai_family = AF_UNSPEC; + else if (c->flags & VALKEY_PREFER_IPV6) + dns->ai_family = AF_INET6; + else if (strchr(host, ':') != NULL) + dns->ai_family = AF_INET6; /* IPv6 literal */ + else + dns->ai_family = AF_INET; + + struct ares_options opts = {0}; + opts.sock_state_cb = caresAsyncSockStateCb; + opts.sock_state_cb_data = dns; + int optmask = ARES_OPT_SOCK_STATE_CB; + + int rv = ares_init_options(&dns->channel, &opts, optmask); + if (rv != ARES_SUCCESS) { + vk_free(dns->host); + vk_free(dns); + return VALKEY_ERR; + } + + /* Store dns state in the context for later access. */ + ac->dns_state = dns; + + struct ares_addrinfo_hints hints = {0}; + hints.ai_socktype = SOCK_STREAM; + hints.ai_family = dns->ai_family; + + char portstr[6]; + snprintf(portstr, sizeof(portstr), "%d", port); + + ares_getaddrinfo(dns->channel, host, portstr, &hints, caresAsyncCallback, dns); + + /* If c-ares resolved synchronously (e.g. IP literals), the callback + * has already fired and set dns->done. Clean up now. */ + if (dns->done) { + int failed = dns->failed; + valkeyResolveAsyncFree(ac); + if (failed) + return VALKEY_ERR; + } else { + caresAsyncScheduleTimer(dns); + } + + return VALKEY_OK; +} + +void valkeyResolveAsyncHandleEvent(struct valkeyAsyncContext *ac, int fd, int readable, int writable) { + valkeyAsyncDns *dns = (valkeyAsyncDns *)ac->dns_state; + if (dns == NULL || dns->channel == NULL) + return; + ares_socket_t rfd = readable ? (ares_socket_t)fd : ARES_SOCKET_BAD; + ares_socket_t wfd = writable ? (ares_socket_t)fd : ARES_SOCKET_BAD; + ares_process_fd(dns->channel, rfd, wfd); + if (dns->done) { + int failed = dns->failed; + valkeyResolveAsyncFree(ac); + if (failed) + valkeyAsyncHandleConnectFailure(ac); + } else { + caresAsyncScheduleTimer(dns); + } +} + +void valkeyResolveAsyncHandleTimer(struct valkeyAsyncContext *ac) { + valkeyAsyncDns *dns = (valkeyAsyncDns *)ac->dns_state; + if (dns == NULL || dns->channel == NULL) + return; + ares_process_fd(dns->channel, ARES_SOCKET_BAD, ARES_SOCKET_BAD); + if (dns->done) { + int failed = dns->failed; + valkeyResolveAsyncFree(ac); + if (failed) + valkeyAsyncHandleConnectFailure(ac); + } else + caresAsyncScheduleTimer(dns); +} + +void valkeyResolveAsyncFree(struct valkeyAsyncContext *ac) { + valkeyAsyncDns *dns = (valkeyAsyncDns *)ac->dns_state; + if (dns == NULL) + return; + if (dns->channel) { + ares_destroy(dns->channel); + dns->channel = NULL; + } + vk_free(dns->host); + vk_free(dns); + ac->dns_state = NULL; +} + +int valkeyResolveDnsPending(struct valkeyAsyncContext *ac) { + valkeyContext *c = &ac->c; + long timeout_ms = 0; + valkeyConnectTimeoutMsec(c, &timeout_ms); + + struct addrinfo *servinfo = NULL; + int rv = valkeyResolveSync(c->tcp.host, c->tcp.port, c->flags, + timeout_ms, &servinfo); + if (rv != 0) { + if (rv == EAI_MEMORY) + valkeySetError(c, VALKEY_ERR_OOM, "Out of memory"); + else + valkeySetError(c, VALKEY_ERR_OTHER, gai_strerror(rv)); + return VALKEY_ERR; + } + + if (valkeyTcpConnectNonBlock(c, servinfo) != VALKEY_OK) { + valkeyFreeAddrInfo(servinfo); + return VALKEY_ERR; + } + + c->flags &= ~VALKEY_DNS_PENDING; + valkeyFreeAddrInfo(servinfo); + return VALKEY_OK; +} + #endif /* VALKEY_USE_CARES */ int valkeyResolveSync(const char *host, int port, int flags, diff --git a/src/dns.h b/src/dns.h index 8cb7b933..2314f5fd 100644 --- a/src/dns.h +++ b/src/dns.h @@ -46,6 +46,23 @@ int valkeyResolveSync(const char *host, int port, int flags, * freeaddrinfo-compatible or c-ares-allocated results. */ #ifdef VALKEY_USE_CARES void valkeyFreeAddrInfo(struct addrinfo *ai); + +struct valkeyAsyncContext; + +/* Initiate async DNS resolution. Returns VALKEY_OK if the query was started + * (completion is via the event loop), or VALKEY_ERR on immediate failure. + * When DNS completes, the callback is invoked to continue the connect flow. */ +int valkeyResolveAsyncStart(struct valkeyAsyncContext *ac, const char *host, int port); + +/* Called by the adapter when a c-ares fd is readable/writable. */ +void valkeyResolveAsyncHandleEvent(struct valkeyAsyncContext *ac, int fd, int readable, int writable); + +/* Called by the adapter when the c-ares timer fires. */ +void valkeyResolveAsyncHandleTimer(struct valkeyAsyncContext *ac); + +/* Free async DNS state. Called during context cleanup. */ +void valkeyResolveAsyncFree(struct valkeyAsyncContext *ac); + #else #define valkeyFreeAddrInfo(ai) freeaddrinfo(ai) #endif diff --git a/src/net.c b/src/net.c index e99f2a3d..43aba962 100644 --- a/src/net.c +++ b/src/net.c @@ -392,6 +392,80 @@ static int valkeyTcpGetProtocol(int is_mptcp_enabled) { } #endif /* IPPROTO_MPTCP */ +/* Try each address in servinfo, create a non-blocking socket, bind source_addr + * if set, and initiate connect(). Returns VALKEY_OK on first successful + * connect (or EINPROGRESS), VALKEY_ERR if all addresses fail. Sets c->fd. */ +int valkeyTcpConnectNonBlock(valkeyContext *c, struct addrinfo *servinfo) { + struct addrinfo *p, *bservinfo, *b; + valkeyFD s; + int rv, n; + int reuseaddr = (c->flags & VALKEY_REUSEADDR); + + for (p = servinfo; p != NULL; p = p->ai_next) { + s = socket(p->ai_family, p->ai_socktype, valkeyTcpGetProtocol(c->flags & VALKEY_MPTCP)); + if (s == VALKEY_INVALID_FD) + continue; + + c->fd = s; + if (valkeySetBlocking(c, 0) != VALKEY_OK) { + valkeyNetClose(c); + continue; + } + + if (c->tcp.source_addr) { + int bound = 0; + struct addrinfo hints = {0}; + hints.ai_family = p->ai_family; + hints.ai_socktype = p->ai_socktype; + if ((rv = getaddrinfo(c->tcp.source_addr, NULL, &hints, &bservinfo)) != 0) { + char buf[128]; + snprintf(buf, sizeof(buf), "Can't get addr: %s", gai_strerror(rv)); + valkeySetError(c, VALKEY_ERR_OTHER, buf); + valkeyNetClose(c); + return VALKEY_ERR; + } + if (reuseaddr) { + n = 1; + if (setsockopt(s, SOL_SOCKET, SO_REUSEADDR, (char *)&n, sizeof(n)) < 0) { + freeaddrinfo(bservinfo); + valkeyNetClose(c); + continue; + } + } + for (b = bservinfo; b != NULL; b = b->ai_next) { + if (bind(s, b->ai_addr, b->ai_addrlen) != -1) { + bound = 1; + break; + } + } + freeaddrinfo(bservinfo); + if (!bound) { + valkeyNetClose(c); + continue; + } + } + + vk_free(c->saddr); + c->saddr = vk_malloc(p->ai_addrlen); + if (c->saddr == NULL) { + valkeyNetClose(c); + valkeySetError(c, VALKEY_ERR_OOM, "Out of memory"); + return VALKEY_ERR; + } + memcpy(c->saddr, p->ai_addr, p->ai_addrlen); + c->addrlen = p->ai_addrlen; + + if (connect(s, p->ai_addr, p->ai_addrlen) == 0 || errno == EINPROGRESS) { + return VALKEY_OK; + } + + valkeyNetClose(c); + } + + valkeySetErrorFromErrno(c, VALKEY_ERR_IO, NULL); + return VALKEY_ERR; +} + int valkeyContextConnectTcp(valkeyContext *c, const valkeyOptions *options) { const struct timeval *timeout = options->connect_timeout; const char *addr = options->endpoint.tcp.ip; @@ -444,6 +518,15 @@ int valkeyContextConnectTcp(valkeyContext *c, const valkeyOptions *options) { c->tcp.source_addr = vk_strdup(source_addr); } +#ifdef VALKEY_USE_CARES + /* In async mode, defer DNS to after the adapter is attached so it can + * be driven by the event loop (fully non-blocking). */ + if (!(c->flags & VALKEY_BLOCK)) { + c->flags |= VALKEY_DNS_PENDING; + return VALKEY_OK; + } +#endif + /* DNS lookup */ /* TODO: Decide if DNS + TCP connect should share a single connect_timeout * budget rather than each getting the full timeout independently. */ diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index ad7c2e20..f734c329 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -143,6 +143,12 @@ if (LIBEVENT_LIBRARY) target_link_libraries(ct_specific_nodes valkey ${TLS_LIBRARY} ${LIBEVENT_LIBRARY}) add_test(NAME ct_specific_nodes COMMAND "$") + if(ENABLE_CARES AND NOT WIN32) + add_executable(ct_async_dns ct_async_dns.c test_utils.c) + target_link_libraries(ct_async_dns valkey ${TLS_LIBRARY} ${LIBEVENT_LIBRARY}) + add_test(NAME ct_async_dns COMMAND "$") + endif() + # Tests using simulated Valkey node add_executable(clusterclient clusterclient.c) target_link_libraries(clusterclient valkey ${TLS_LIBRARY}) diff --git a/tests/ct_async_dns.c b/tests/ct_async_dns.c new file mode 100644 index 00000000..f4969645 --- /dev/null +++ b/tests/ct_async_dns.c @@ -0,0 +1,132 @@ +/* + * Test for async DNS resolution with c-ares and libevent adapter. + * + * Uses "localhost" (a real hostname) to exercise the async c-ares path + * where DNS resolves via the event loop rather than synchronously. + */ + +#include "adapters/libevent.h" +#include "cluster.h" +#include "test_utils.h" +#include "valkey.h" + +#include +#include +#include + +#define CLUSTER_NODE "127.0.0.1:7000" + +/* --- Test: async cluster connect using hostname "localhost" --- */ + +static int connect_cb_called; +static int disconnect_cb_called; + +static void connectCb(valkeyAsyncContext *ac, int status) { + (void)ac; + if (status == VALKEY_OK) + connect_cb_called++; +} + +static void disconnectCb(const valkeyAsyncContext *ac, int status) { + (void)ac; + (void)status; + disconnect_cb_called++; +} + +static void setCallback(valkeyClusterAsyncContext *cc, void *r, void *privdata) { + (void)privdata; + valkeyReply *reply = (valkeyReply *)r; + ASSERT_MSG(reply != NULL, cc->errstr); + assert(reply->type == VALKEY_REPLY_STATUS); + assert(strcmp(reply->str, "OK") == 0); + valkeyClusterAsyncDisconnect(cc); +} + +/* Test a basic async command using the cluster API with libevent. + * The libevent adapter handles DNS_PENDING via async c-ares resolution. */ +static void test_async_cluster_command(void) { + struct event_base *base = event_base_new(); + + valkeyClusterOptions options = {0}; + options.initial_nodes = CLUSTER_NODE; + options.options = VALKEY_OPT_BLOCKING_INITIAL_UPDATE; + options.async_connect_callback = connectCb; + options.async_disconnect_callback = disconnectCb; + valkeyClusterOptionsUseLibevent(&options, base); + + connect_cb_called = 0; + disconnect_cb_called = 0; + + valkeyClusterAsyncContext *acc = valkeyClusterAsyncConnectWithOptions(&options); + ASSERT_MSG(acc && acc->err == 0, acc ? acc->errstr : "OOM"); + + int ret = valkeyClusterAsyncCommand(acc, setCallback, NULL, "SET dns-test hello"); + assert(ret == VALKEY_OK); + + event_base_dispatch(base); + + assert(connect_cb_called > 0); + assert(disconnect_cb_called > 0); + + valkeyClusterAsyncFree(acc); + event_base_free(base); + printf(" PASS: test_async_cluster_command\n"); +} + +/* --- Test: standalone async connect to localhost (hostname, not IP) --- */ + +static int standalone_connected; + +static void standaloneConnectCb(valkeyAsyncContext *ac, int status) { + (void)ac; + standalone_connected = (status == VALKEY_OK) ? 1 : -1; +} + +static void standaloneGetCb(valkeyAsyncContext *ac, void *r, void *privdata) { + (void)privdata; + (void)r; + valkeyAsyncDisconnect(ac); +} + +static void test_async_standalone_localhost(void) { + struct event_base *base = event_base_new(); + standalone_connected = 0; + + valkeyOptions opts = {0}; + VALKEY_OPTIONS_SET_TCP(&opts, "localhost", 7000); + + valkeyAsyncContext *ac = valkeyAsyncConnectWithOptions(&opts); + assert(ac != NULL); + if (ac->err) { + /* If localhost doesn't resolve to a server, skip gracefully. */ + printf(" SKIP: test_async_standalone_localhost (connect error: %s)\n", ac->errstr); + valkeyAsyncFree(ac); + event_base_free(base); + return; + } + + valkeyLibeventAttach(ac, base); + valkeyAsyncSetConnectCallback(ac, standaloneConnectCb); + + /* Send a PING to verify the connection works. */ + valkeyAsyncCommand(ac, standaloneGetCb, NULL, "PING"); + + event_base_dispatch(base); + + if (standalone_connected == 1) { + printf(" PASS: test_async_standalone_localhost\n"); + } else { + /* Server not running on localhost:7000. */ + printf(" SKIP: test_async_standalone_localhost (server not available)\n"); + } + + event_base_free(base); +} + +int main(void) { + printf("Testing async DNS with c-ares + libevent:\n"); + test_async_cluster_command(); + test_async_standalone_localhost(); + printf("All async DNS tests passed.\n"); + return 0; +} From 9a158239514e72e26de361268d429b7949a952ce Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Svensson?= Date: Wed, 24 Jun 2026 10:13:05 +0200 Subject: [PATCH 2/2] fixup: move duplicated code into function caresHintsFamily() MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Björn Svensson --- src/dns.c | 31 +++++++++++++------------------ 1 file changed, 13 insertions(+), 18 deletions(-) diff --git a/src/dns.c b/src/dns.c index 487c2a43..f219242f 100644 --- a/src/dns.c +++ b/src/dns.c @@ -66,6 +66,17 @@ static void valkeyCaresLibraryInit(void) { ares_library_init(ARES_LIB_INIT_NONE); } +/* Determine address family from context flags and hostname. */ +static int caresHintsFamily(const char *host, int flags) { + if ((flags & VALKEY_PREFER_IPV6) && (flags & VALKEY_PREFER_IPV4)) + return AF_UNSPEC; + if (flags & VALKEY_PREFER_IPV6) + return AF_INET6; + if (strchr(host, ':') != NULL) + return AF_INET6; /* IPv6 literal */ + return AF_INET; +} + static long valkeyDnsPollMillis(void) { struct timespec now; clock_gettime(CLOCK_MONOTONIC, &now); @@ -261,15 +272,7 @@ static int valkeyResolveCares(const char *host, int port, int flags, memset(&hints, 0, sizeof(hints)); hints.ai_socktype = SOCK_STREAM; - - if ((flags & VALKEY_PREFER_IPV6) && (flags & VALKEY_PREFER_IPV4)) - hints.ai_family = AF_UNSPEC; - else if (flags & VALKEY_PREFER_IPV6) - hints.ai_family = AF_INET6; - else if (strchr(host, ':') != NULL) - hints.ai_family = AF_INET6; /* IPv6 literal */ - else - hints.ai_family = AF_INET; + hints.ai_family = caresHintsFamily(host, flags); char portstr[6]; snprintf(portstr, sizeof(portstr), "%d", port); @@ -459,15 +462,7 @@ int valkeyResolveAsyncStart(struct valkeyAsyncContext *ac, const char *host, int dns->port = port; dns->flags = c->flags; - /* Determine family. */ - if ((c->flags & VALKEY_PREFER_IPV6) && (c->flags & VALKEY_PREFER_IPV4)) - dns->ai_family = AF_UNSPEC; - else if (c->flags & VALKEY_PREFER_IPV6) - dns->ai_family = AF_INET6; - else if (strchr(host, ':') != NULL) - dns->ai_family = AF_INET6; /* IPv6 literal */ - else - dns->ai_family = AF_INET; + dns->ai_family = caresHintsFamily(host, c->flags); struct ares_options opts = {0}; opts.sock_state_cb = caresAsyncSockStateCb;