Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
188 changes: 147 additions & 41 deletions src/cubeb_aaudio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,12 @@ enum class stream_state {
SHUTDOWN,
};

enum class slot_state {
FREE = 0,
IN_USE,
DESTROYING,
};

struct AAudioTimingInfo {
// The timestamp at which the audio engine last called the calback.
uint64_t tstamp;
Expand Down Expand Up @@ -322,11 +328,14 @@ struct cubeb_stream {
cubeb * context{};
void * user_ptr{};

std::atomic<bool> in_use{false};
// slot_state is only written with the stream mutex held, but stream_init
// reads it outside the mutex as an optimization to avoid locking every
// stream when searching for a free slot.
std::atomic<slot_state> slot_state{slot_state::FREE};
std::atomic<int> pending_reinit{0};
std::atomic<bool> latency_metrics_available{false};
std::atomic<int64_t> drain_target{-1};
std::atomic<stream_state> state{stream_state::INIT};
std::atomic<bool> in_data_callback{false};
triple_buffer<AAudioTimingInfo> timing_info;

AAudioStream * ostream{};
Expand Down Expand Up @@ -373,19 +382,11 @@ struct cubeb {
std::atomic<bool> waiting{false};
} state;

// streams[i].in_use signals whether a stream is used
// streams[i].slot_state signals whether a stream slot is free, in use, or
// being destroyed.
struct cubeb_stream streams[MAX_STREAMS];
};

struct AutoInCallback {
AutoInCallback(cubeb_stream * stm) : stm(stm)
{
stm->in_data_callback.store(true);
}
~AutoInCallback() { stm->in_data_callback.store(false); }
cubeb_stream * stm;
};

// Returns when aaudio_stream's state is equal to desired_state.
// poll_frequency_ns is the duration that is slept in between asking for
// state updates and getting the new state.
Expand Down Expand Up @@ -420,6 +421,25 @@ wait_for_state_change(AAudioStream * aaudio_stream,
return CUBEB_OK;
}

// Maps a transitional state to its corresponding stable state.
// Returns AAUDIO_STREAM_STATE_UNINITIALIZED if not transitional.
static aaudio_stream_state_t
get_stable_state(aaudio_stream_state_t state)
{
switch (state) {
case AAUDIO_STREAM_STATE_STARTING:
return AAUDIO_STREAM_STATE_STARTED;
case AAUDIO_STREAM_STATE_PAUSING:
return AAUDIO_STREAM_STATE_PAUSED;
case AAUDIO_STREAM_STATE_STOPPING:
return AAUDIO_STREAM_STATE_STOPPED;
case AAUDIO_STREAM_STATE_FLUSHING:
return AAUDIO_STREAM_STATE_FLUSHED;
default:
return AAUDIO_STREAM_STATE_UNINITIALIZED;
}
}

// Only allowed from state thread, while mutex on stm is locked
static void
shutdown_with_error(cubeb_stream * stm)
Expand Down Expand Up @@ -452,7 +472,6 @@ shutdown_with_error(cubeb_stream * stm)
}
}

assert(!stm->in_data_callback.load());
stm->state_callback(stm, stm->user_ptr, CUBEB_STATE_ERROR);
stm->state.store(stream_state::SHUTDOWN);
}
Expand Down Expand Up @@ -627,16 +646,19 @@ update_state(cubeb_stream * stm)
case stream_state::STOPPING:
// If stream_stop happens while the stream is still starting, we may see
// STARTING/STARTED, ignore these and handle STATE_STOPPED once we reach
// PAUSED.
// PAUSED (for output) or STOPPED (for input, which doesn't support
// pause).
assert(!istate || istate == AAUDIO_STREAM_STATE_STARTING ||
istate == AAUDIO_STREAM_STATE_STARTED ||
istate == AAUDIO_STREAM_STATE_PAUSING ||
istate == AAUDIO_STREAM_STATE_PAUSED);
istate == AAUDIO_STREAM_STATE_STOPPING ||
istate == AAUDIO_STREAM_STATE_STOPPED);
assert(!ostate || ostate == AAUDIO_STREAM_STATE_STARTING ||
ostate == AAUDIO_STREAM_STATE_STARTED ||
ostate == AAUDIO_STREAM_STATE_PAUSING ||
ostate == AAUDIO_STREAM_STATE_PAUSED);
if ((!istate || istate == AAUDIO_STREAM_STATE_PAUSED) &&
// Input streams use requestStop (goes to STOPPED), output uses
// requestPause (goes to PAUSED)
if ((!istate || istate == AAUDIO_STREAM_STATE_STOPPED) &&
(!ostate || ostate == AAUDIO_STREAM_STATE_PAUSED)) {
stm->state_callback(stm, stm->user_ptr, CUBEB_STATE_STOPPED);
new_state = stream_state::STOPPED;
Expand Down Expand Up @@ -749,7 +771,7 @@ aaudio_destroy(cubeb * ctx)
#ifndef NDEBUG
// make sure all streams were destroyed
for (auto & stream : ctx->streams) {
assert(!stream.in_use.load());
assert(stream.slot_state.load() == slot_state::FREE);
}
#endif

Expand Down Expand Up @@ -892,7 +914,6 @@ aaudio_duplex_data_cb(AAudioStream * astream, void * user_data,
void * audio_data, int32_t num_frames)
{
cubeb_stream * stm = (cubeb_stream *)user_data;
AutoInCallback aic(stm);
assert(stm->ostream == astream);
assert(stm->istream);
assert(num_frames >= 0);
Expand Down Expand Up @@ -993,7 +1014,6 @@ aaudio_output_data_cb(AAudioStream * astream, void * user_data,
void * audio_data, int32_t num_frames)
{
cubeb_stream * stm = (cubeb_stream *)user_data;
AutoInCallback aic(stm);
assert(stm->ostream == astream);
assert(!stm->istream);
assert(num_frames >= 0);
Expand Down Expand Up @@ -1047,7 +1067,6 @@ aaudio_input_data_cb(AAudioStream * astream, void * user_data,
void * audio_data, int32_t num_frames)
{
cubeb_stream * stm = (cubeb_stream *)user_data;
AutoInCallback aic(stm);
assert(stm->istream == astream);
assert(!stm->ostream);
assert(num_frames >= 0);
Expand Down Expand Up @@ -1100,8 +1119,27 @@ reinitialize_stream(cubeb_stream * stm)
// thread.
// In this situation, the lock is acquired for the entire duration of the
// function, so that this reinitialization period is atomic.

// Ensure only one reinit is pending at a time.
int expected = 0;
if (!stm->pending_reinit.compare_exchange_strong(expected, 1)) {
LOG("reinitialize_stream: reinit already pending, skipping");
return;
}

std::thread([stm] {
struct PendingReinitGuard {
cubeb_stream * stm;
~PendingReinitGuard() { stm->pending_reinit.store(0); }
} guard{stm};

lock_guard lock(stm->mutex);

if (stm->slot_state.load() != slot_state::IN_USE) {
LOG("reinitialize_stream: stream destroyed, cancelling");
return;
}

stream_state state = stm->state.load();
bool was_playing = state == stream_state::STARTED ||
state == stream_state::STARTING ||
Expand All @@ -1122,8 +1160,6 @@ reinitialize_stream(cubeb_stream * stm)
aaudio_stream_destroy_locked(stm, lock);
err = aaudio_stream_init_impl(stm, lock);

assert(stm->in_use.load());

// Set the new initial position.
stm->pos_estimate.reinit(total_frames);

Expand Down Expand Up @@ -1230,9 +1266,20 @@ realize_stream(AAudioStreamBuilder * sb, const cubeb_stream_params * params,
static void
aaudio_stream_destroy(cubeb_stream * stm)
{
lock_guard lock(stm->mutex);
stm->in_use.store(false);
aaudio_stream_destroy_locked(stm, lock);
{
lock_guard lock(stm->mutex);
aaudio_stream_destroy_locked(stm, lock);
// Two-phase destroy, only mark as free once reinit threads exit.
stm->slot_state.store(slot_state::DESTROYING);
}

// Wait for reinit threads to exit.
while (stm->pending_reinit.load() > 0) {
auto dur = std::chrono::milliseconds(5);
std::this_thread::sleep_for(dur);
}

stm->slot_state.store(slot_state::FREE);
}

static void
Expand Down Expand Up @@ -1504,19 +1551,20 @@ aaudio_stream_init(cubeb * ctx, cubeb_stream ** stream,
unique_lock<mutex> lock;
for (auto & stream : ctx->streams) {
// This check is only an optimization, we don't strictly need it
// since we check again after locking the mutex.
if (stream.in_use.load()) {
// since we check again after locking the mutex. It also skips
// DESTROYING slots, which is important to avoid racing with destroy.
if (stream.slot_state.load() != slot_state::FREE) {
continue;
}

// if this fails, another thread initialized this stream
// between our check of in_use and this.
// between our check of slot_state and this.
lock = unique_lock(stream.mutex, std::try_to_lock);
if (!lock.owns_lock()) {
continue;
}

if (stream.in_use.load()) {
if (stream.slot_state.load() != slot_state::FREE) {
lock = {};
continue;
}
Expand All @@ -1530,7 +1578,7 @@ aaudio_stream_init(cubeb * ctx, cubeb_stream ** stream,
return CUBEB_ERROR;
}

stm->in_use.store(true);
stm->slot_state.store(slot_state::IN_USE);
stm->context = ctx;
stm->user_ptr = user_ptr;
stm->data_callback = data_callback;
Expand Down Expand Up @@ -1586,11 +1634,17 @@ aaudio_stream_start(cubeb_stream * stm)
static int
aaudio_stream_start_locked(cubeb_stream * stm, lock_guard<mutex> & lock)
{
assert(stm && stm->in_use.load());
assert(stm && stm->slot_state.load() == slot_state::IN_USE);
stream_state state = stm->state.load();
int istate = stm->istream ? WRAP(AAudioStream_getState)(stm->istream) : 0;
int ostate = stm->ostream ? WRAP(AAudioStream_getState)(stm->ostream) : 0;
LOGV("STARTING stream %p: %d (%d %d)", (void *)stm, state, istate, ostate);
aaudio_stream_state_t istate = stm->istream
? WRAP(AAudioStream_getState)(stm->istream)
: AAUDIO_STREAM_STATE_UNINITIALIZED;
aaudio_stream_state_t ostate = stm->ostream
? WRAP(AAudioStream_getState)(stm->ostream)
: AAUDIO_STREAM_STATE_UNINITIALIZED;
LOG("STARTING stream %p: %d (in: %s out: %s)", (void *)stm, state,
WRAP(AAudio_convertStreamStateToText)(istate),
WRAP(AAudio_convertStreamStateToText)(ostate));

switch (state) {
case stream_state::STARTED:
Expand All @@ -1611,6 +1665,33 @@ aaudio_stream_start_locked(cubeb_stream * stm, lock_guard<mutex> & lock)

aaudio_result_t res;

// Wait for stream transitions to settle before starting.
int64_t poll_frequency_ns = 10 * NS_PER_S / 1000;
if (stm->ostream) {
ostate = WRAP(AAudioStream_getState)(stm->ostream);
aaudio_stream_state_t target = get_stable_state(ostate);
if (target != AAUDIO_STREAM_STATE_UNINITIALIZED) {
int rv = wait_for_state_change(stm->ostream, &target, poll_frequency_ns);
if (rv != CUBEB_OK) {
LOG("Failure waiting for ostream to reach stable state before start");
stm->state.store(stream_state::ERROR);
return CUBEB_ERROR;
}
}
}
if (stm->istream) {
istate = WRAP(AAudioStream_getState)(stm->istream);
aaudio_stream_state_t target = get_stable_state(istate);
if (target != AAUDIO_STREAM_STATE_UNINITIALIZED) {
int rv = wait_for_state_change(stm->istream, &target, poll_frequency_ns);
if (rv != CUBEB_OK) {
LOG("Failure waiting for istream to reach stable state before start");
stm->state.store(stream_state::ERROR);
return CUBEB_ERROR;
}
}
}

// Important to start istream before ostream.
// As soon as we start ostream, the callbacks might be triggered an we
// might read from istream (on duplex). If istream wasn't started yet
Expand Down Expand Up @@ -1687,15 +1768,15 @@ aaudio_stream_start_locked(cubeb_stream * stm, lock_guard<mutex> & lock)
static int
aaudio_stream_stop(cubeb_stream * stm)
{
assert(stm && stm->in_use.load());
assert(stm && stm->slot_state.load() == slot_state::IN_USE);
lock_guard lock(stm->mutex);
return aaudio_stream_stop_locked(stm, lock);
}

static int
aaudio_stream_stop_locked(cubeb_stream * stm, lock_guard<mutex> & lock)
{
assert(stm && stm->in_use.load());
assert(stm && stm->slot_state.load() == slot_state::IN_USE);

stream_state state = stm->state.load();
aaudio_stream_state_t istate = stm->istream
Expand Down Expand Up @@ -1727,6 +1808,27 @@ aaudio_stream_stop_locked(cubeb_stream * stm, lock_guard<mutex> & lock)

aaudio_result_t res;

// Wait for stream transitions to settle before stopping.
int64_t poll_frequency_ns = 10 * NS_PER_S / 1000; // 10ms
if (stm->ostream && ostate == AAUDIO_STREAM_STATE_STARTING) {
aaudio_stream_state_t target = AAUDIO_STREAM_STATE_STARTED;
int rv = wait_for_state_change(stm->ostream, &target, poll_frequency_ns);
if (rv != CUBEB_OK) {
LOG("Failure waiting for ostream to finish starting before stop");
stm->state.store(stream_state::ERROR);
return CUBEB_ERROR;
}
}
if (stm->istream && istate == AAUDIO_STREAM_STATE_STARTING) {
aaudio_stream_state_t target = AAUDIO_STREAM_STATE_STARTED;
int rv = wait_for_state_change(stm->istream, &target, poll_frequency_ns);
if (rv != CUBEB_OK) {
LOG("Failure waiting for istream to finish starting before stop");
stm->state.store(stream_state::ERROR);
return CUBEB_ERROR;
}
}

// No callbacks are triggered anymore when requestPause returns.
// That is important as we otherwise might read from a closed istream
// for a duplex stream.
Expand All @@ -1744,9 +1846,11 @@ aaudio_stream_stop_locked(cubeb_stream * stm, lock_guard<mutex> & lock)
}

if (stm->istream) {
res = WRAP(AAudioStream_requestPause)(stm->istream);
// AAudio input streams don't support pause - use stop instead.
// The stream will transition through STOPPING to STOPPED.
res = WRAP(AAudioStream_requestStop)(stm->istream);
if (res != AAUDIO_OK) {
LOG("AAudioStream_requestPause (istream): %s",
LOG("AAudioStream_requestStop (istream): %s",
WRAP(AAudio_convertResultToText)(res));
stm->state.store(stream_state::ERROR);
return CUBEB_ERROR;
Expand Down Expand Up @@ -1802,7 +1906,7 @@ aaudio_stream_stop_locked(cubeb_stream * stm, lock_guard<mutex> & lock)
static int
aaudio_stream_get_position(cubeb_stream * stm, uint64_t * position)
{
assert(stm && stm->in_use.load());
assert(stm && stm->slot_state.load() == slot_state::IN_USE);
lock_guard lock(stm->mutex);

stream_state state = stm->state.load();
Expand Down Expand Up @@ -1870,6 +1974,7 @@ aaudio_stream_get_latency(cubeb_stream * stm, uint32_t * latency)

if (!stm->latency_metrics_available) {
LOG("Not timing info yet (output)");
*latency = 0;
return CUBEB_OK;
}

Expand All @@ -1891,6 +1996,7 @@ aaudio_stream_get_input_latency(cubeb_stream * stm, uint32_t * latency)

if (!stm->latency_metrics_available) {
LOG("Not timing info yet (input)");
*latency = 0;
return CUBEB_OK;
}

Expand All @@ -1905,7 +2011,7 @@ aaudio_stream_get_input_latency(cubeb_stream * stm, uint32_t * latency)
static int
aaudio_stream_set_volume(cubeb_stream * stm, float volume)
{
assert(stm && stm->in_use.load() && stm->ostream);
assert(stm && stm->slot_state.load() == slot_state::IN_USE && stm->ostream);
stm->volume.store(volume);
return CUBEB_OK;
}
Expand Down
Loading