diff --git a/cmd/litefs/mount_test.go b/cmd/litefs/mount_test.go
index 50073c0..976b5ed 100644
--- a/cmd/litefs/mount_test.go
+++ b/cmd/litefs/mount_test.go
@@ -1067,6 +1067,7 @@ func TestMultiNode_WALToJournal(t *testing.T) {
 	waitForPrimary(t, cmd0)
 	cmd1 := runMountCommand(t, newMountCommand(t, t.TempDir(), cmd0))
 	db0 := testingutil.OpenSQLDB(t, filepath.Join(cmd0.Config.FUSE.Dir, "db"))
+	waitForInitialDB(t, "db", cmd1)
 	db1 := testingutil.OpenSQLDB(t, filepath.Join(cmd1.Config.FUSE.Dir, "db"))
 
 	// Switch to WAL mode.
@@ -1110,6 +1111,7 @@ func TestMultiNode_JournalToWAL(t *testing.T) {
 	waitForPrimary(t, cmd0)
 	cmd1 := runMountCommand(t, newMountCommand(t, t.TempDir(), cmd0))
 	db0 := testingutil.OpenSQLDB(t, filepath.Join(cmd0.Config.FUSE.Dir, "db"))
+	waitForInitialDB(t, "db", cmd1)
 	db1 := testingutil.OpenSQLDB(t, filepath.Join(cmd1.Config.FUSE.Dir, "db"))
 
 	// Create a simple table.
@@ -1940,6 +1942,7 @@ func TestMultiNode_Halt(t *testing.T) {
 	waitForPrimary(t, cmd0)
 	cmd1 := runMountCommand(t, newMountCommand(t, t.TempDir(), cmd0))
 	db0 := testingutil.OpenSQLDB(t, filepath.Join(cmd0.Config.FUSE.Dir, "db"))
+	waitForInitialDB(t, "db", cmd1)
 	db1 := testingutil.OpenSQLDB(t, filepath.Join(cmd1.Config.FUSE.Dir, "db"))
 
 	// Create a simple table with a single value.
@@ -2126,6 +2129,7 @@ func TestMultiNode_Handoff(t *testing.T) {
 	waitForPrimary(t, cmd0)
 	cmd1 := runMountCommand(t, newMountCommand(t, t.TempDir(), cmd0))
 	db0 := testingutil.OpenSQLDB(t, filepath.Join(cmd0.Config.FUSE.Dir, "db"))
+	waitForInitialDB(t, "db", cmd1)
 	db1 := testingutil.OpenSQLDB(t, filepath.Join(cmd1.Config.FUSE.Dir, "db"))
 
 	// Create a simple table with a single value.
@@ -3173,6 +3177,30 @@ func waitForBackupSync(tb testing.TB, cmd *main.MountCommand) {
 	})
 }
 
+// waitForInitialDB waits for a database to exist, when in WAL mode. It does nothing
+// in journal mode. This is required because opening a non-existent DB in WAL mode
+// will create the DB (to update the header), which will fail with "attempt to write
+// a readonly database" on replicas.
+func waitForInitialDB(tb testing.TB, name string, cmd *main.MountCommand) {
+	tb.Helper()
+
+	if !testingutil.IsWALMode() {
+		return
+	}
+
+	testingutil.RetryUntil(tb, 1*time.Millisecond, 5*time.Second, func() error {
+		tb.Helper()
+
+		if db := cmd.Store.DB(name); db == nil {
+			return fmt.Errorf("no database")
+		}
+
+		tb.Logf("db %q exists", name)
+
+		return nil
+	})
+}
+
 var HTTPClient = &http.Client{
 	Transport: &http2.Transport{
 		AllowHTTP: true,
diff --git a/store.go b/store.go
index 982261f..b8db821 100644
--- a/store.go
+++ b/store.go
@@ -1522,14 +1522,6 @@ func (s *Store) processLTXStreamFrame(ctx context.Context, frame *LTXStreamFrame
 		TraceLog.Printf("[ProcessLTXStreamFrame.End(%s)]: %s", db.name, errorKeyValue(err))
 	}()
 
-	// Acquire lock unless we are waiting for a database position, in which case,
-	// we already have the lock.
-	guardSet, err := db.AcquireWriteLock(ctx, nil)
-	if err != nil {
-		return err
-	}
-	defer guardSet.Unlock()
-
 	// Skip frame if it already occurred on this node. This can happen if the
 	// replica node created the transaction and forwarded it to the primary.
 	if hdr.NodeID == s.ID() {
@@ -1543,24 +1535,12 @@
 		return nil
 	}
 
-	// If we receive an LTX file while holding the remote HALT lock then the
-	// remote lock must have expired or been released so we can clear it locally.
-	//
-	// We also hold the local WRITE lock so a local write cannot be in-progress.
-	if haltLock := db.RemoteHaltLock(); haltLock != nil {
-		TraceLog.Printf("[ProcessLTXStreamFrame.Unhalt(%s)]: replica holds HALT lock but received LTX file, unsetting HALT lock", db.Name())
-		if err := db.UnsetRemoteHaltLock(ctx, haltLock.ID); err != nil {
-			return fmt.Errorf("release remote halt lock: %w", err)
-		}
-	}
-
 	// Verify LTX file pre-apply checksum matches the current database position
-	// unless this is a snapshot, which will overwrite all data.
+	// unless this is a snapshot, which will overwrite all data. This is a preflight
+	// check so we can skip streaming an LTX we can't apply, but we'll need to verify
+	// again after we acquire the write lock.
 	if !hdr.IsSnapshot() {
-		expectedPos := ltx.Pos{
-			TXID:              hdr.MinTXID - 1,
-			PostApplyChecksum: hdr.PreApplyChecksum,
-		}
+		expectedPos := hdr.PreApplyPos()
 		if pos := db.Pos(); pos != expectedPos {
 			return fmt.Errorf("position mismatch on db %q: %s <> %s", db.Name(), pos, expectedPos)
 		}
@@ -1595,6 +1575,35 @@
 	dbLTXCountMetricVec.WithLabelValues(db.Name()).Inc()
 	dbLTXBytesMetricVec.WithLabelValues(db.Name()).Set(float64(n))
 
+	// Acquire lock unless we are waiting for a database position, in which case,
+	// we already have the lock.
+	guardSet, err := db.AcquireWriteLock(ctx, nil)
+	if err != nil {
+		return err
+	}
+	defer guardSet.Unlock()
+
+	// If we receive an LTX file while holding the remote HALT lock, then the remote lock
+	// might have expired or been released, and we could clear it locally. However, we might
+	// have recently acquired the remote lock and are waiting to catch up with the primary.
+	//
+	// In both cases, we hold the local WRITE lock so a local write cannot be in-progress.
+	// We'll perform recovery so the LTX can be applied, but not clear the remote lock.
+	if haltLock := db.RemoteHaltLock(); haltLock != nil {
+		TraceLog.Printf("[ProcessLTXStreamFrame.Recover(%s)]: replica holds HALT lock but received LTX file, performing recovery", db.Name())
+		if err := db.recover(ctx); err != nil {
+			return fmt.Errorf("recover: %w", err)
+		}
+	}
+
+	// Verify the database position again, now we're holding the write lock.
+	if !hdr.IsSnapshot() {
+		expectedPos := hdr.PreApplyPos()
+		if pos := db.Pos(); pos != expectedPos {
+			return fmt.Errorf("position mismatch on db %q: %s <> %s", db.Name(), pos, expectedPos)
+		}
+	}
+
 	// Remove other LTX files after a snapshot.
 	if hdr.IsSnapshot() {
 		dir, file := filepath.Split(path)
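
Note on helpers referenced above (not part of the patch itself):

The store.go hunks replace the hand-built pre-apply position (ltx.Pos{TXID: hdr.MinTXID - 1, PostApplyChecksum: hdr.PreApplyChecksum}) with hdr.PreApplyPos(). Based only on the removed lines, the helper presumably reduces to a sketch like the following; the actual definition lives in the ltx package and may differ in receiver or formatting:

	// PreApplyPos returns the position a database must be at before this LTX
	// file can be applied: the TXID immediately preceding MinTXID, paired with
	// the file's pre-apply checksum.
	func (h Header) PreApplyPos() Pos {
		return Pos{
			TXID:              h.MinTXID - 1,
			PostApplyChecksum: h.PreApplyChecksum,
		}
	}

The new waitForInitialDB test helper polls with testingutil.RetryUntil(tb, interval, timeout, fn). That helper's implementation is not shown in this diff; from the call site it presumably behaves roughly like the sketch below (the failure handling is an assumption):

	// RetryUntil calls fn every interval until it returns nil, failing the test
	// if it still returns an error once timeout has elapsed.
	func RetryUntil(tb testing.TB, interval, timeout time.Duration, fn func() error) {
		tb.Helper()
		deadline := time.Now().Add(timeout)
		for {
			err := fn()
			if err == nil {
				return
			}
			if time.Now().After(deadline) {
				tb.Fatalf("retry timed out: %s", err)
			}
			time.Sleep(interval)
		}
	}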