-
Notifications
You must be signed in to change notification settings - Fork 119
store: Don't hold write lock while receiving LTX #450
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: saleem/incremental-chksum
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1522,14 +1522,6 @@ func (s *Store) processLTXStreamFrame(ctx context.Context, frame *LTXStreamFrame | |
| TraceLog.Printf("[ProcessLTXStreamFrame.End(%s)]: %s", db.name, errorKeyValue(err)) | ||
| }() | ||
|
|
||
| // Acquire lock unless we are waiting for a database position, in which case, | ||
| // we already have the lock. | ||
| guardSet, err := db.AcquireWriteLock(ctx, nil) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| defer guardSet.Unlock() | ||
|
|
||
| // Skip frame if it already occurred on this node. This can happen if the | ||
| // replica node created the transaction and forwarded it to the primary. | ||
| if hdr.NodeID == s.ID() { | ||
|
|
@@ -1543,24 +1535,12 @@ func (s *Store) processLTXStreamFrame(ctx context.Context, frame *LTXStreamFrame | |
| return nil | ||
| } | ||
|
|
||
| // If we receive an LTX file while holding the remote HALT lock then the | ||
| // remote lock must have expired or been released so we can clear it locally. | ||
| // | ||
| // We also hold the local WRITE lock so a local write cannot be in-progress. | ||
| if haltLock := db.RemoteHaltLock(); haltLock != nil { | ||
| TraceLog.Printf("[ProcessLTXStreamFrame.Unhalt(%s)]: replica holds HALT lock but received LTX file, unsetting HALT lock", db.Name()) | ||
| if err := db.UnsetRemoteHaltLock(ctx, haltLock.ID); err != nil { | ||
| return fmt.Errorf("release remote halt lock: %w", err) | ||
| } | ||
| } | ||
|
|
||
| // Verify LTX file pre-apply checksum matches the current database position | ||
| // unless this is a snapshot, which will overwrite all data. | ||
| // unless this is a snapshot, which will overwrite all data. This is a preflight | ||
| // check so we can skip streaming an LTX we can't apply, but we'll need to verify | ||
| // again after we acquire the write lock. | ||
| if !hdr.IsSnapshot() { | ||
| expectedPos := ltx.Pos{ | ||
| TXID: hdr.MinTXID - 1, | ||
| PostApplyChecksum: hdr.PreApplyChecksum, | ||
| } | ||
| expectedPos := hdr.PreApplyPos() | ||
| if pos := db.Pos(); pos != expectedPos { | ||
| return fmt.Errorf("position mismatch on db %q: %s <> %s", db.Name(), pos, expectedPos) | ||
| } | ||
|
|
@@ -1595,6 +1575,35 @@ func (s *Store) processLTXStreamFrame(ctx context.Context, frame *LTXStreamFrame | |
| dbLTXCountMetricVec.WithLabelValues(db.Name()).Inc() | ||
| dbLTXBytesMetricVec.WithLabelValues(db.Name()).Set(float64(n)) | ||
|
|
||
| // Acquire lock unless we are waiting for a database position, in which case, | ||
|
benbjohnson marked this conversation as resolved.
|
||
| // we already have the lock. | ||
| guardSet, err := db.AcquireWriteLock(ctx, nil) | ||
| if err != nil { | ||
| return err | ||
| } | ||
| defer guardSet.Unlock() | ||
|
|
||
| // If we receive an LTX file while holding the remote HALT lock, then the remote lock | ||
| // might have expired or been released, and we could clear it locally. However, we might | ||
| // have recently acquired the remote lock and are waiting to catch up with the primary. | ||
| // | ||
| // In both cases, we hold the local WRITE lock so a local write cannot be in-progress. | ||
| // We'll perform recovery so the LTX can be applied, but not clear the remote lock. | ||
| if haltLock := db.RemoteHaltLock(); haltLock != nil { | ||
| TraceLog.Printf("[ProcessLTXStreamFrame.Recover(%s)]: replica holds HALT lock but received LTX file, performing recovery", db.Name()) | ||
| if err := db.recover(ctx); err != nil { | ||
|
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. @benbjohnson Since we haven't called Also: we probably shouldn't set |
||
| return fmt.Errorf("recover: %w", err) | ||
| } | ||
| } | ||
|
|
||
| // Verify the database position again, now we're holding the write lock. | ||
| if !hdr.IsSnapshot() { | ||
| expectedPos := hdr.PreApplyPos() | ||
| if pos := db.Pos(); pos != expectedPos { | ||
| return fmt.Errorf("position mismatch on db %q: %s <> %s", db.Name(), pos, expectedPos) | ||
| } | ||
| } | ||
|
|
||
| // Remove other LTX files after a snapshot. | ||
| if hdr.IsSnapshot() { | ||
| dir, file := filepath.Split(path) | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There's a behavior change here now that we're not acquiring the write lock: if a transaction is being applied concurrently, we might now observe the previous `db.Pos()` and return a position mismatch error here. In contrast, the existing code is serialized by the write lock, and would observe the post-apply `db.Pos()` of any transaction being applied concurrently.

I don't think this is an issue, but it is a behavior change, so I wanted to call it out.