Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ Session Task Coordinator
| |
|-- acquire OCC semaphore |
| |
|-- CreateReadThenWriteSubscribe ----> |
|-- CreateInternalSubscribe ---------> |
| <------------ subscribe channel -----|
| |
| +-- OCC Loop ------------------+ |
Expand All @@ -141,7 +141,7 @@ Session Task Coordinator
| | if Success: break | |
| +------------------------------+ |
| |
|-- DropReadThenWriteSubscribe ------> |
|-- DropInternalSubscribe -----------> |
| |
```

Expand Down Expand Up @@ -193,7 +193,7 @@ subscribe.
The subscribes created for read-then-write are internal: they do not appear in
`mz_subscriptions` or other introspection tables, and they don't increment the
active subscribes metric. They are created and dropped via dedicated `Command`
variants (`CreateReadThenWriteSubscribe`, `DropReadThenWriteSubscribe`).
variants (`CreateInternalSubscribe`, `DropInternalSubscribe`).

## Correctness

Expand Down
5 changes: 5 additions & 0 deletions misc/python/materialize/mzcompose/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,11 @@ def get_variable_system_parameters(
"true",
["true", "false"],
),
VariableSystemParameter(
"enable_adapter_frontend_occ_read_then_write",
"true",
["true", "false"],
),
VariableSystemParameter(
"enable_cast_elimination",
"true",
Expand Down
10 changes: 7 additions & 3 deletions misc/python/materialize/parallel_workload/action.py
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,8 @@ def errors_to_ignore(self, exe: Executor) -> list[str]:
result.extend(
[
"does not exist",
"subscribe has been terminated because underlying relation",
"subscribe has been terminated because underlying cluster",
"query could not complete because relation",
"query could not complete because cluster",
]
)
return result
Expand Down Expand Up @@ -1585,14 +1585,18 @@ def __init__(
self.flags_with_values["enable_cast_elimination"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["enable_upsert_v2"] = BOOLEAN_FLAG_VALUES
self.flags_with_values["enable_coalesce_case_transform"] = BOOLEAN_FLAG_VALUES

# If you are adding a new config flag in Materialize, consider using it
# here instead of just marking it as uninteresting to silence the
# linter. parallel-workload randomly flips the flags in
# `flags_with_values` while running. If a new flag has interesting
# behavior, you should add it. Feature flags which turn on/off
# externally visible features should not be flipped.
self.uninteresting_flags: list[str] = [
# Read once at environmentd startup; runtime ALTER SYSTEM SET is
# rejected (see sequence_alter_system_set). Flipping it here would
# be a no-op at best and confusing if any future code path forgot
# to consult the cached value.
"enable_adapter_frontend_occ_read_then_write",
"enable_compute_half_join2",
"enable_mz_join_core",
"enable_compute_correction_v2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,10 @@ def print_stats(


def parse_common_args(parser: argparse.ArgumentParser) -> None:
parser.add_argument("--seed", type=str, default=str(int(time.time())))
# TEMP: pinned to the seed from nightly 16238 to deterministically
# exercise the parallel-workload-dml retraction failure on this
# branch. Restore to `str(int(time.time()))` before merging.
parser.add_argument("--seed", type=str, default="1777388214")
parser.add_argument("--runtime", default=600, type=int, help="Runtime in seconds")
parser.add_argument(
"--complexity",
Expand Down
6 changes: 6 additions & 0 deletions misc/python/materialize/parallel_workload/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,19 @@ def run(
try:
self.exe.rollback()
except QueryError as e:
# ROLLBACK can itself be cancelled by
# `pg_cancel_backend`, leaving psycopg in
# `InFailedSqlTransaction`. Force a reconnect rather
# than retry the rollback.
if (
"Please disconnect and re-connect" in e.msg
or "server closed the connection unexpectedly" in e.msg
or "Can't create a connection to host" in e.msg
or "Connection refused" in e.msg
or "the connection is lost" in e.msg
or "connection in transaction status INERROR" in e.msg
or "canceling statement due to user request" in e.msg
or "current transaction is aborted" in e.msg
):
self.exe.reconnect_next = True
self.exe.rollback_next = False
Expand Down
8 changes: 8 additions & 0 deletions src/adapter-types/src/dyncfgs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,13 @@ pub const CONSOLE_OIDC_SCOPES: Config<&'static str> = Config::new(
"Space-separated OIDC scopes requested by the web console.",
);

pub const FRONTEND_READ_THEN_WRITE: Config<bool> = Config::new(
"enable_adapter_frontend_occ_read_then_write",
// WIP: true for testing in ci, Should be false before merging.
true,
"Use frontend sequencing (with optimistic concurrency control) for \
DELETE, UPDATE, and INSERT operations.",
);
/// Adds the full set of all adapter `Config`s.
pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
configs
Expand Down Expand Up @@ -245,4 +252,5 @@ pub fn all_dyncfgs(configs: ConfigSet) -> ConfigSet {
.add(&USER_ID_POOL_BATCH_SIZE)
.add(&CONSOLE_OIDC_CLIENT_ID)
.add(&CONSOLE_OIDC_SCOPES)
.add(&FRONTEND_READ_THEN_WRITE)
}
19 changes: 11 additions & 8 deletions src/adapter/src/active_compute_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use timely::progress::Antichain;
use tokio::sync::{mpsc, oneshot};
use uuid::Uuid;

use crate::coord::peek::PeekResponseUnary;
use crate::coord::peek::{DroppedDependency, PeekResponseUnary};
use crate::{AdapterError, ExecuteContext, ExecuteResponse};

#[derive(Debug)]
Expand Down Expand Up @@ -88,7 +88,7 @@ pub enum ActiveComputeSinkRetireReason {
Canceled,
/// The compute sink was forcibly terminated because an object it depended on
/// was dropped.
DependencyDropped(String),
DependencyDropped(DroppedDependency),
}

/// A description of an active subscribe from coord's perspective
Expand Down Expand Up @@ -116,6 +116,9 @@ pub struct ActiveSubscribe {
pub start_time: EpochMillis,
/// How to present the subscribe's output.
pub output: SubscribeOutput,
/// If true, this is an internal subscribe that should not appear in
/// introspection tables like mz_subscriptions.
pub internal: bool,
}

impl ActiveSubscribe {
Expand Down Expand Up @@ -386,9 +389,9 @@ impl ActiveSubscribe {
let message = match reason {
ActiveComputeSinkRetireReason::Finished => return,
ActiveComputeSinkRetireReason::Canceled => PeekResponseUnary::Canceled,
ActiveComputeSinkRetireReason::DependencyDropped(d) => PeekResponseUnary::Error(
format!("subscribe has been terminated because underlying {d} was dropped"),
),
ActiveComputeSinkRetireReason::DependencyDropped(d) => {
PeekResponseUnary::DependencyDropped(d)
}
};
self.send(message);
}
Expand Down Expand Up @@ -440,9 +443,9 @@ impl ActiveCopyTo {
let message = match reason {
ActiveComputeSinkRetireReason::Finished => return,
ActiveComputeSinkRetireReason::Canceled => Err(AdapterError::Canceled),
ActiveComputeSinkRetireReason::DependencyDropped(d) => Err(AdapterError::Unstructured(
anyhow!("copy has been terminated because underlying {d} was dropped"),
)),
ActiveComputeSinkRetireReason::DependencyDropped(dep) => Err(
AdapterError::Unstructured(anyhow!(dep.copy_terminated_error())),
),
};
let _ = self.tx.send(message);
}
Expand Down
Loading
Loading