-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathOfflineMessageStore.py
More file actions
75 lines (63 loc) · 2.71 KB
/
OfflineMessageStore.py
File metadata and controls
75 lines (63 loc) · 2.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
# OfflineMessageStore.py
import asyncio
import datetime
from typing import Dict, List, Any
import logging
from pymongo import AsyncMongoClient
import uuid
import config
# MongoDB connection URI taken from the project config; OfflineMessageStore
# passes it to AsyncMongoClient when an instance is constructed.
uri = config.Config.mongo_uri
class OfflineMessageStore:
    """Offline message store backed by the ``IM.offline_messages`` MongoDB collection.

    Messages are saved per ``user_id`` and returned oldest-first by
    ``get_offline_messages``. Database errors (any ``Exception``) are caught
    and logged instead of being raised to the caller.

    Creating an instance does not create the indexes; call
    ``await initialize()`` for that.
    """

    def __init__(self):
        """Create the logger, the MongoDB client and the collection handle."""
        self.logger = logging.getLogger('OfflineMessageStore')
        self.dbclient = AsyncMongoClient(uri)
        # Despite its name, ``db`` is the ``offline_messages`` collection of
        # the ``IM`` database, not a database object.
        self.db = self.dbclient["IM"]["offline_messages"]
        # Index creation is a coroutine and cannot be awaited from __init__;
        # it is exposed through ``initialize()`` instead.

    async def initialize(self):
        """Run the asynchronous part of the setup: create the indexes."""
        await self._create_indexes()

    async def _create_indexes(self):
        """Create the collection indexes; failures are logged, not raised."""
        try:
            # NOTE(review): this single-field index is a prefix of the compound
            # index below and is probably redundant -- confirm before dropping.
            await self.db.create_index("user_id")
            await self.db.create_index([("user_id", 1), ("timestamp", -1)])
            self.logger.debug("离线消息存储索引创建完成")
        except Exception as e:
            # exc_info keeps the traceback that the formatted message loses.
            self.logger.error(f"创建索引失败: {e}", exc_info=True)

    async def add_offline_message(self, user_id: int, message: Dict[str, Any]):
        """Persist one message for an offline user.

        Args:
            user_id: User the message is queued for.
            message: Message payload, stored unchanged under the ``"message"``
                key. ``message["data"]["timestamp"]`` is copied to the record's
                ``"timestamp"`` field; it defaults to ``0`` when ``"data"`` is
                missing, is not a dict, or has no ``"timestamp"``.

        Returns:
            None. Failures are logged, not raised.
        """
        try:
            # "data" may be absent or not a dict (e.g. None). Fall back to 0
            # rather than raising and losing the message.
            data = message.get("data")
            timestamp = data.get("timestamp", 0) if isinstance(data, dict) else 0
            message_record = {
                "message_id": str(uuid.uuid4()),
                "user_id": user_id,
                "message": message,
                "timestamp": timestamp,
                # Timezone-aware UTC: PyMongo stores naive datetimes as if they
                # were UTC, so a naive local time would be saved shifted.
                "created_at": datetime.datetime.now(datetime.timezone.utc),
            }
            result = await self.db.insert_one(message_record)
            self.logger.debug(f"为用户 {user_id} 添加离线消息,ID: {result.inserted_id}")
        except Exception as e:
            self.logger.error(f"添加离线消息失败: {e}", exc_info=True)

    async def get_offline_messages(self, user_id: int) -> List[Dict[str, Any]]:
        """Return the stored messages of ``user_id``, ordered by ascending timestamp.

        Args:
            user_id: User whose queued messages are fetched.

        Returns:
            The original message payloads (the ``"message"`` field of each
            record). An empty list is returned both when nothing is stored and
            when the query fails; the failure is only visible in the log.
        """
        try:
            cursor = self.db.find({"user_id": user_id}).sort("timestamp", 1)
            records = await cursor.to_list(length=None)
            # Strip the storage envelope and hand back the original payloads.
            result = [record["message"] for record in records]
            self.logger.debug(f"获取用户 {user_id} 的离线消息,数量: {len(result)}")
            return result
        except Exception as e:
            self.logger.error(f"获取离线消息失败: {e}", exc_info=True)
            return []

    async def clear_offline_messages(self, user_id: int):
        """Delete every stored message of ``user_id``.

        The filter matches on ``user_id`` only, so records added after the
        caller's last fetch are deleted as well. Failures are logged, not
        raised.
        """
        try:
            result = await self.db.delete_many({"user_id": user_id})
            self.logger.debug(f"清空用户 {user_id} 的离线消息,删除数量: {result.deleted_count}")
        except Exception as e:
            self.logger.error(f"清空离线消息失败: {e}", exc_info=True)