Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 54 additions & 0 deletions database_functions/migration_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3446,6 +3446,60 @@ def migration_106_optimize_subscription_sync_performance(conn, db_type: str):
cursor.close()


@register_migration("108", "gpodder_subscription_snapshot", "Add subscription snapshot table for delta-based sync upload", requires=["008"])
def migration_108_gpodder_subscription_snapshot(conn, db_type: str):
"""Create GpodderSubscriptionSnapshot table.

Stores, per (user, sync target), the set of local feed URLs at the end of the last sync.
The sync code diffs the current local feeds against this snapshot to compute genuine local
add/remove deltas to push up - instead of re-uploading the full list every sync (which bloats
the server change log) and without a per-change queue.
"""
cursor = conn.cursor()

try:
logger.info("Starting gpodder migration 108: Add subscription snapshot table")

if db_type == 'postgresql':
safe_execute_sql(cursor, '''
CREATE TABLE IF NOT EXISTS "GpodderSubscriptionSnapshot" (
SnapshotID SERIAL PRIMARY KEY,
UserID INT NOT NULL,
SyncTarget TEXT NOT NULL,
FeedURL TEXT NOT NULL,
FOREIGN KEY (UserID) REFERENCES "Users"(UserID) ON DELETE CASCADE,
UNIQUE(UserID, SyncTarget, FeedURL)
)
''', conn=conn)

safe_execute_sql(cursor, '''
CREATE INDEX IF NOT EXISTS idx_gpodder_subsnapshot_user_target ON "GpodderSubscriptionSnapshot"(UserID, SyncTarget)
''', conn=conn)
else: # mysql
safe_execute_sql(cursor, '''
CREATE TABLE IF NOT EXISTS GpodderSubscriptionSnapshot (
SnapshotID INT AUTO_INCREMENT PRIMARY KEY,
UserID INT NOT NULL,
SyncTarget VARCHAR(512) NOT NULL,
FeedURL VARCHAR(2048) NOT NULL,
FOREIGN KEY (UserID) REFERENCES Users(UserID) ON DELETE CASCADE,
UNIQUE(UserID, SyncTarget, FeedURL(512))
)
''', conn=conn)

safe_execute_sql(cursor, '''
CREATE INDEX idx_gpodder_subsnapshot_user_target ON GpodderSubscriptionSnapshot(UserID, SyncTarget)
''', conn=conn)

logger.info("Created GpodderSubscriptionSnapshot table successfully")

except Exception as e:
logger.error(f"Error in gpodder migration 108: {e}")
raise
finally:
cursor.close()


@register_migration("033", "add_http_notification_columns", "Add generic HTTP notification columns to UserNotificationSettings table", requires=["011"])
def migration_033_add_http_notification_columns(conn, db_type: str):
"""Add generic HTTP notification columns for platforms like Telegram"""
Expand Down
27 changes: 22 additions & 5 deletions gpodder-api/internal/api/episode.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,9 +280,10 @@ func getEpisodeActions(database *db.Database) gin.HandlerFunc {
}
}

// ORDER BY DESC (newest first) to prioritize recent actions
// This ensures recent play state is synced first, even if total actions > limit
queryParts = append(queryParts, "ORDER BY e.Timestamp DESC")
// ORDER BY ASC (oldest first) so clients can paginate forward through ALL actions
// using the returned timestamp as the next 'since'. With DESC + a row limit, anything
// beyond the limit (>25k actions) would be silently dropped on the next page.
queryParts = append(queryParts, "ORDER BY e.Timestamp ASC")

// Add LIMIT for performance - prevents returning massive datasets
// Clients should use the 'since' parameter to paginate through results
Expand Down Expand Up @@ -311,13 +312,15 @@ func getEpisodeActions(database *db.Database) gin.HandlerFunc {

// Build response
actions := make([]models.EpisodeAction, 0)
var maxBatchTimestamp int64
for rows.Next() {
var action models.EpisodeAction
var deviceIDInt sql.NullInt64
var deviceName sql.NullString
var started sql.NullInt64
var position sql.NullInt64
var total sql.NullInt64
var actionTimestamp int64

if err := rows.Scan(
&action.ActionID,
Expand All @@ -326,7 +329,7 @@ func getEpisodeActions(database *db.Database) gin.HandlerFunc {
&action.Podcast,
&action.Episode,
&action.Action,
&action.Timestamp,
&actionTimestamp,
&started,
&position,
&total,
Expand All @@ -336,6 +339,11 @@ func getEpisodeActions(database *db.Database) gin.HandlerFunc {
continue
}

action.Timestamp = actionTimestamp
if actionTimestamp > maxBatchTimestamp {
maxBatchTimestamp = actionTimestamp
}

// Set optional fields if present
if deviceName.Valid {
action.Device = deviceName.String
Expand Down Expand Up @@ -368,10 +376,19 @@ func getEpisodeActions(database *db.Database) gin.HandlerFunc {
totalDuration := time.Since(startTime)
log.Printf("[DEBUG] getEpisodeActions: Returning %d actions, total time: %v", len(actions), totalDuration)

// Determine the timestamp the client should use as 'since' on its next request.
// If we hit the row limit there are more actions to fetch, so return the max timestamp
// in THIS batch (results are ordered ascending) - that lets the client page forward
// through everything. Otherwise return the global latest so the next sync is incremental.
responseTimestamp := latestTimestamp
if len(actions) >= MAX_EPISODE_ACTIONS && maxBatchTimestamp > 0 {
responseTimestamp = maxBatchTimestamp
}

// Return response in gpodder format
c.JSON(http.StatusOK, models.EpisodeActionsResponse{
Actions: actions,
Timestamp: latestTimestamp,
Timestamp: responseTimestamp,
})
}
}
Expand Down
28 changes: 28 additions & 0 deletions gpodder-api/internal/db/migrations.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,5 +534,33 @@ func GetMigrations() []Migration {
CREATE INDEX idx_gpodder_syncstate_userid_deviceid ON GpodderSyncState(UserID, DeviceID);
`,
},
{
Version: 5,
Description: "Add subscription snapshot table for delta-based sync upload",
PostgreSQLSQL: `
CREATE TABLE IF NOT EXISTS "GpodderSubscriptionSnapshot" (
SnapshotID SERIAL PRIMARY KEY,
UserID INT NOT NULL,
SyncTarget TEXT NOT NULL,
FeedURL TEXT NOT NULL,
FOREIGN KEY (UserID) REFERENCES "Users"(UserID) ON DELETE CASCADE,
UNIQUE(UserID, SyncTarget, FeedURL)
);

CREATE INDEX IF NOT EXISTS idx_gpodder_subsnapshot_user_target ON "GpodderSubscriptionSnapshot"(UserID, SyncTarget);
`,
MySQLSQL: `
CREATE TABLE IF NOT EXISTS GpodderSubscriptionSnapshot (
SnapshotID INT AUTO_INCREMENT PRIMARY KEY,
UserID INT NOT NULL,
SyncTarget VARCHAR(512) NOT NULL,
FeedURL VARCHAR(2048) NOT NULL,
FOREIGN KEY (UserID) REFERENCES Users(UserID) ON DELETE CASCADE,
UNIQUE(UserID, SyncTarget, FeedURL(512))
);

CREATE INDEX idx_gpodder_subsnapshot_user_target ON GpodderSubscriptionSnapshot(UserID, SyncTarget);
`,
},
}
}
13 changes: 9 additions & 4 deletions mobile/android/app/src/main/AndroidManifest.xml
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,21 @@
<data android:scheme="http" />
</intent-filter>

<!-- Handle OIDC authentication callbacks -->
<intent-filter android:autoVerify="true">
</activity>

<!-- OIDC authentication callback handler for flutter_web_auth_2.
Captures the pinepods:// redirect from the Chrome Custom Tab. -->
<activity
android:name="com.linusu.flutter_web_auth_2.CallbackActivity"
android:exported="true"
android:taskAffinity="">
<intent-filter android:label="flutter_web_auth_2">
<action android:name="android.intent.action.VIEW" />

<category android:name="android.intent.category.DEFAULT" />
<category android:name="android.intent.category.BROWSABLE" />

<data android:scheme="pinepods" />
<data android:host="auth" />
<data android:pathPrefix="/callback" />
</intent-filter>
</activity>

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,11 @@ public static void registerWith(@NonNull FlutterEngine flutterEngine) {
} catch (Exception e) {
Log.e(TAG, "Error registering plugin flutter_plugin_android_lifecycle, io.flutter.plugins.flutter_plugin_android_lifecycle.FlutterAndroidLifecyclePlugin", e);
}
try {
flutterEngine.getPlugins().add(new com.linusu.flutter_web_auth_2.FlutterWebAuth2Plugin());
} catch (Exception e) {
Log.e(TAG, "Error registering plugin flutter_web_auth_2, com.linusu.flutter_web_auth_2.FlutterWebAuth2Plugin", e);
}
try {
flutterEngine.getPlugins().add(new com.ryanheise.just_audio.JustAudioPlugin());
} catch (Exception e) {
Expand Down Expand Up @@ -100,10 +105,5 @@ public static void registerWith(@NonNull FlutterEngine flutterEngine) {
} catch (Exception e) {
Log.e(TAG, "Error registering plugin url_launcher_android, io.flutter.plugins.urllauncher.UrlLauncherPlugin", e);
}
try {
flutterEngine.getPlugins().add(new io.flutter.plugins.webviewflutter.WebViewFlutterPlugin());
} catch (Exception e) {
Log.e(TAG, "Error registering plugin webview_flutter_android, io.flutter.plugins.webviewflutter.WebViewFlutterPlugin", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import android.app.PendingIntent
import android.content.Intent
import android.media.audiofx.LoudnessEnhancer
import android.net.Uri
import java.io.File
import android.os.Binder
import android.os.Handler
import android.os.IBinder
Expand Down Expand Up @@ -353,7 +354,12 @@ class PinepodsMediaService : MediaLibraryService() {

player?.let { p ->
try {
val uri = Uri.parse(url)
// For local downloads `url` is a raw filesystem path (which may
// contain spaces and has no scheme). Uri.parse() leaves it
// scheme-less and unencoded, so ExoPlayer fails to load it and
// never reports a duration (player shows 00:00 and won't scrub).
// Uri.fromFile() builds a properly-encoded file:// URI.
val uri = if (isLocal) Uri.fromFile(File(url)) else Uri.parse(url)

// Build media metadata
val mediaMetadataBuilder = MediaMetadata.Builder()
Expand Down
14 changes: 7 additions & 7 deletions mobile/ios/Runner/GeneratedPluginRegistrant.m
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@
@import flutter_downloader;
#endif

#if __has_include(<flutter_web_auth_2/FlutterWebAuth2Plugin.h>)
#import <flutter_web_auth_2/FlutterWebAuth2Plugin.h>
#else
@import flutter_web_auth_2;
#endif

#if __has_include(<just_audio/JustAudioPlugin.h>)
#import <just_audio/JustAudioPlugin.h>
#else
Expand Down Expand Up @@ -102,12 +108,6 @@
@import url_launcher_ios;
#endif

#if __has_include(<webview_flutter_wkwebview/WebViewFlutterPlugin.h>)
#import <webview_flutter_wkwebview/WebViewFlutterPlugin.h>
#else
@import webview_flutter_wkwebview;
#endif

@implementation GeneratedPluginRegistrant

+ (void)registerWithRegistry:(NSObject<FlutterPluginRegistry>*)registry {
Expand All @@ -119,6 +119,7 @@ + (void)registerWithRegistry:(NSObject<FlutterPluginRegistry>*)registry {
[FilePickerPlugin registerWithRegistrar:[registry registrarForPlugin:@"FilePickerPlugin"]];
[FlutterCarplayPlugin registerWithRegistrar:[registry registrarForPlugin:@"FlutterCarplayPlugin"]];
[FlutterDownloaderPlugin registerWithRegistrar:[registry registrarForPlugin:@"FlutterDownloaderPlugin"]];
[FlutterWebAuth2Plugin registerWithRegistrar:[registry registrarForPlugin:@"FlutterWebAuth2Plugin"]];
[JustAudioPlugin registerWithRegistrar:[registry registrarForPlugin:@"JustAudioPlugin"]];
[FPPPackageInfoPlusPlugin registerWithRegistrar:[registry registrarForPlugin:@"FPPPackageInfoPlusPlugin"]];
[PathProviderPlugin registerWithRegistrar:[registry registrarForPlugin:@"PathProviderPlugin"]];
Expand All @@ -127,7 +128,6 @@ + (void)registerWithRegistry:(NSObject<FlutterPluginRegistry>*)registry {
[SharedPreferencesPlugin registerWithRegistrar:[registry registrarForPlugin:@"SharedPreferencesPlugin"]];
[SqflitePlugin registerWithRegistrar:[registry registrarForPlugin:@"SqflitePlugin"]];
[URLLauncherPlugin registerWithRegistrar:[registry registrarForPlugin:@"URLLauncherPlugin"]];
[WebViewFlutterPlugin registerWithRegistrar:[registry registrarForPlugin:@"WebViewFlutterPlugin"]];
}

@end
106 changes: 106 additions & 0 deletions mobile/lib/entities/pending_action.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
// lib/entities/pending_action.dart

/// The kind of server interaction a [PendingAction] represents.
///
/// These map 1:1 onto the relevant PinepodsService calls so the offline queue
/// can dispatch each action when connectivity is restored.
enum PendingActionType {
recordPosition,
markCompleted,
markUncompleted,
saveEpisode,
removeSaved,
queue,
addHistory,
}

/// A user interaction (progress, completion, save, queue, history) that could
/// not be — or has not yet been — sent to the server, persisted locally so it
/// can be synced later. This is what lets episodes downloaded for offline
/// listening still record interactions and reconcile once back online.
class PendingAction {
/// Database key (Sembast record id). Null until persisted.
int? id;

final PendingActionType type;
final int episodeId;
final int userId;
final bool isYoutube;

/// Action-specific data, e.g. {'position': 123.0} for [recordPosition].
final Map<String, dynamic> payload;

final DateTime createdAt;

/// Number of failed sync attempts. Used for backoff / surfacing stuck items.
int retryCount;

PendingAction({
this.id,
required this.type,
required this.episodeId,
required this.userId,
this.isYoutube = false,
this.payload = const {},
DateTime? createdAt,
this.retryCount = 0,
}) : createdAt = createdAt ?? DateTime.now();

/// Convenience for the common position payload.
double? get position {
final p = payload['position'];
if (p is num) return p.toDouble();
return null;
}

Map<String, dynamic> toMap() {
return <String, dynamic>{
'type': type.name,
'episodeId': episodeId,
'userId': userId,
'isYoutube': isYoutube,
'payload': payload,
'createdAt': createdAt.millisecondsSinceEpoch,
'retryCount': retryCount,
};
}

static PendingAction fromMap(int? key, Map<String, dynamic> map) {
return PendingAction(
id: key,
type: PendingActionType.values.firstWhere(
(t) => t.name == map['type'],
orElse: () => PendingActionType.recordPosition,
),
episodeId: map['episodeId'] as int? ?? 0,
userId: map['userId'] as int? ?? 0,
isYoutube: map['isYoutube'] as bool? ?? false,
payload: (map['payload'] as Map?)?.cast<String, dynamic>() ?? const {},
createdAt: map['createdAt'] == null
? DateTime.now()
: DateTime.fromMillisecondsSinceEpoch(map['createdAt'] as int),
retryCount: map['retryCount'] as int? ?? 0,
);
}

/// Human-friendly label for the action queue viewer.
String get description {
switch (type) {
case PendingActionType.recordPosition:
final p = position;
return p != null ? 'Save progress (${p.toInt()}s)' : 'Save progress';
case PendingActionType.markCompleted:
return 'Mark completed';
case PendingActionType.markUncompleted:
return 'Mark not completed';
case PendingActionType.saveEpisode:
return 'Save episode';
case PendingActionType.removeSaved:
return 'Remove saved episode';
case PendingActionType.queue:
return 'Add to queue';
case PendingActionType.addHistory:
return 'Add to history';
}
}
}
Loading
Loading