This repository was archived by the owner on Oct 8, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmain.py
More file actions
executable file
·206 lines (186 loc) · 8.39 KB
/
main.py
File metadata and controls
executable file
·206 lines (186 loc) · 8.39 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
import asyncio
import functools
import logging
import re
from typing import Dict, Tuple, Optional

import requests

from pkg.plugin.context import register, handler, BasePlugin, APIHost, EventContext
from pkg.plugin.events import *
from pkg.platform.types import *
# from .card_ui import CardGenerator
@register(
    name="LinkAnalysis",
    description="解析哔哩哔哩、GitHub、Gitee等多种链接并展示信息",
    version="0.75",
    author="sheetung"
)
class MyPlugin(BasePlugin):
    """Reply to messages containing Bilibili, GitHub or Gitee links with a summary.

    Each supported platform is described by a list of regular expressions and
    an async handler.  The first platform whose pattern matches the incoming
    message gets to answer; the handler fetches metadata from the platform's
    public HTTP API and sends a formatted text reply to the conversation the
    message came from.
    """

    # Seconds to wait for any upstream HTTP API before giving up.
    REQUEST_TIMEOUT = 10
    # Headers sent with every upstream API request.
    REQUEST_HEADERS = {"User-Agent": "Mozilla/5.0"}
    # Logger used for diagnostics; held on the class so the methods do not
    # depend on a module-level ``logger`` name being defined.
    _log = logging.getLogger(__name__)

    def __init__(self, host: APIHost):
        """Register every supported link type with its patterns and handler."""
        self.link_handlers = {
            "bilibili": {
                "patterns": [
                    r"www\.bilibili\.com/video/(BV\w+)",  # standard link
                    r"b23\.tv/(BV\w+)",  # short link
                    r"www\.bilibili\.com/video/av(\d+)"  # legacy av number
                ],
                "handler": self.handle_bilibili
            },
            "github": {
                # owner/repo; whitespace is excluded so that text following
                # the link in the same message is not captured as repo name
                "patterns": [r"github\.com/([^/\s]+)/([^/?#\s]+)"],
                "handler": self.handle_github
            },
            "gitee": {
                # owner/repo; same whitespace rule as GitHub
                "patterns": [r"gitee\.com/([^/\s]+)/([^/?#\s]+)"],
                "handler": self.handle_gitee
            }
        }
        # Card-image rendering is currently disabled:
        # self.ui_generator = CardGenerator(
        #     wkhtml_path="data/plugins/LinkAnalysis/wkhtmltoimage",  # optional custom path
        #     temp_dir="data/plugins/LinkAnalysis/temp"  # optional custom directory
        # )

    def _format_count(self, count: int) -> str:
        """Format a counter using the K unit only (never M).

        Values below 1000 are returned unchanged.  Exact multiples of 1000
        are shown as an integer (``2000 -> "2K"``); everything else is shown
        with one decimal place (``1500 -> "1.5K"``).
        """
        if count >= 1000:
            if count % 1000 == 0:
                return f"{count//1000}K"
            return f"{count/1000:.1f}K"
        return str(count)

    def _load_access_control(self, ctx: EventContext) -> Optional[Tuple[str, list]]:
        """Return ``(mode, session_list)`` from the access-control settings.

        The settings are read from ``self.ap.pipeline_cfg.data`` when that is
        available and non-empty; otherwise from
        ``ctx.event.query.pipeline_config['trigger']['access-control']``
        (the original code marks this path as a LangBot 4.0 adaptation).

        Returns ``None`` (after logging) when the ``ctx`` path cannot be read.
        """
        # Every attribute hop is optional: the host object may not expose
        # ``pipeline_cfg`` at all on versions that use the ctx-based config.
        app = getattr(self, "ap", None)
        legacy_cfg = getattr(getattr(app, "pipeline_cfg", None), "data", None)
        if legacy_cfg:
            access = legacy_cfg['access-control']
            mode = access['mode']
            return mode, access[mode]

        try:
            access = ctx.event.query.pipeline_config['trigger']['access-control']
            mode = access['mode']
            return mode, access[mode]
        except Exception as e:
            self._log.error(f"无法从 ctx 获取 access-control 设置: {e}")
            return None

    @staticmethod
    def _is_session_allowed(mode: str, sess_list, launcher_type: str, launcher_id: str) -> bool:
        """Decide whether a session may use the plugin.

        A session is "listed" when ``sess_list`` contains either the wildcard
        entry for its type (``group_*`` / ``person_*``) or its exact
        ``{launcher_type}_{launcher_id}`` entry.  In ``'whitelist'`` mode
        listed sessions are allowed; in any other mode listed sessions are
        rejected and all others are allowed.
        """
        wildcard_hit = (
            launcher_type in ('group', 'person')
            and f"{launcher_type}_*" in sess_list
        )
        listed = wildcard_hit or f"{launcher_type}_{launcher_id}" in sess_list
        return listed if mode == 'whitelist' else not listed

    @handler(PersonMessageReceived)
    @handler(GroupMessageReceived)
    async def message_handler(self, ctx: EventContext):
        """Entry point for incoming person and group messages.

        Applies the access-control list, skips messages containing the word
        ``plugin``, then dispatches to the first platform whose pattern
        matches.  After a handler ran, ``ctx.prevent_default()`` and
        ``ctx.prevent_postorder()`` are called.
        """
        msg = str(ctx.event.message_chain).strip()
        launcher_id = str(ctx.event.launcher_id)
        launcher_type = str(ctx.event.launcher_type)

        access = self._load_access_control(ctx)
        if access is None:
            return
        mode, sess_list = access
        if not self._is_session_allowed(mode, sess_list, launcher_type, launcher_id):
            return

        if 'plugin' in msg:
            return

        for platform in self.link_handlers.values():
            match = self._match_link(msg, platform["patterns"])
            if match:
                await platform["handler"](ctx, match)
                ctx.prevent_default()
                ctx.prevent_postorder()
                return  # stop after the first platform that matched

    def _match_link(self, msg: str, patterns: list) -> Optional[re.Match]:
        """Return the first ``re.search`` hit of any pattern in *msg*, else ``None``."""
        for pattern in patterns:
            if match := re.search(pattern, msg):
                return match
        return None

    async def _get_json(self, url: str):
        """GET *url* and return the decoded JSON body.

        ``requests`` is blocking, so the call is run in the event loop's
        default executor to keep other coroutines responsive while waiting.
        Network errors, timeouts and JSON decoding errors propagate to the
        caller.
        """
        loop = asyncio.get_running_loop()
        fetch = functools.partial(
            requests.get,
            url,
            headers=self.REQUEST_HEADERS,
            timeout=self.REQUEST_TIMEOUT,
        )
        resp = await loop.run_in_executor(None, fetch)
        return resp.json()

    # -------------------------- per-platform handlers --------------------------
    async def handle_bilibili(self, ctx: EventContext, match: re.Match):
        """Look up a Bilibili video and reply with cover image and statistics.

        ``match.group(1)`` is the video id: a ``BV...`` id for the standard
        and short link patterns, or the bare digits of a legacy av number.
        On any failure a fixed error text is sent instead.
        """
        video_id = match.group(1)
        if video_id.startswith("BV"):
            api_url = f"https://api.bilibili.com/x/web-interface/view?bvid={video_id}"
            page_url = f"https://www.bilibili.com/video/{video_id}"
        else:
            api_url = f"https://api.bilibili.com/x/web-interface/view?aid={video_id}"
            # The captured av id is digits only; the page URL needs the prefix.
            page_url = f"https://www.bilibili.com/video/av{video_id}"

        try:
            data = await self._get_json(api_url)
            if data["code"] != 0:
                raise ValueError("Bilibili API error")
            video_data = data['data']
            stat_data = video_data['stat']  # like / coin / favorite counters

            lines = [
                f"🎐 标题:{video_data['title']}",
                f"😃 UP主:{video_data['owner']['name']}"
            ]

            # The description line is only shown when there is one; long
            # texts are cut to 97 characters plus an ellipsis.
            raw_desc = video_data.get('desc') or video_data.get('dynamic', '')
            if isinstance(raw_desc, str) and raw_desc:
                if len(raw_desc) > 100:
                    raw_desc = f"{raw_desc[:97]}..."
                lines.append(f"📝 描述:{raw_desc}".replace("\n", ""))

            lines.extend([
                f"💖 点赞:{self._format_count(stat_data.get('like', 0))} ",
                f"🪙 投币:{self._format_count(stat_data.get('coin', 0))} ",
                f"✨ 收藏:{self._format_count(stat_data.get('favorite', 0))}",
                f"🌐 链接:{page_url}"
            ])

            reply = MessageChain([Plain(text="\n".join(lines))])
            reply.insert(0, Image(url=video_data['pic']))
            await ctx.send_message(ctx.event.launcher_type, str(ctx.event.launcher_id), reply)
        except Exception:
            # Best-effort feature: record the cause, tell the user it failed.
            self._log.exception("Bilibili link analysis failed for %s", video_id)
            await ctx.send_message(
                ctx.event.launcher_type,
                str(ctx.event.launcher_id),
                MessageChain([Plain("视频解析失败")])
            )

    async def handle_github(self, ctx: EventContext, match: re.Match):
        """Look up a GitHub repository and reply with its summary."""
        await self._handle_git_repo(
            ctx, match.groups(), "GitHub",
            api_template="https://api.github.com/repos/{owner}/{repo}"
        )

    async def handle_gitee(self, ctx: EventContext, match: re.Match):
        """Look up a Gitee repository and reply with its summary."""
        await self._handle_git_repo(
            ctx, match.groups(), "Gitee",
            api_template="https://gitee.com/api/v5/repos/{owner}/{repo}"
        )

    async def _handle_git_repo(self, ctx: EventContext,
                               groups: Tuple[str, ...],
                               platform: str,
                               api_template: str):
        """Shared lookup-and-reply logic for Git hosting platforms.

        Args:
            ctx: Event context used to send the reply.
            groups: ``(owner, repo)`` as captured by the link pattern.
            platform: Display name placed in the reply text.
            api_template: URL template with ``{owner}`` and ``{repo}`` fields.

        On any failure a fixed error text is sent instead.
        """
        owner, repo = groups
        try:
            data = await self._get_json(api_template.format(owner=owner, repo=repo))
            stars = self._format_count(data.get('stargazers_count', 0))
            forks = self._format_count(data.get('forks_count', 0))
            # ``or`` rather than a .get() default: the key may be present
            # with an explicit null, which would otherwise print "None".
            description = data.get('description') or '暂无'
            lines = [
                "━" * 3,
                f"📦 {platform} 仓库:{data['name']}",
                f"📄 描述:{description}",
                f"⭐ Stars: {stars}",
                f"🍴 Forks: {forks}",
                "━" * 3,
                f"🌐 链接:{data['html_url']}"
            ]
            await ctx.send_message(
                ctx.event.launcher_type,
                str(ctx.event.launcher_id),
                MessageChain([Plain(text="\n".join(lines))])
            )
        except Exception:
            # Best-effort feature: record the cause, tell the user it failed.
            self._log.exception("%s repository lookup failed for %s/%s", platform, owner, repo)
            await ctx.send_message(
                ctx.event.launcher_type,
                str(ctx.event.launcher_id),
                MessageChain([Plain("仓库信息获取失败")])
            )

    def __del__(self):
        """Nothing to release; kept as an explicit no-op hook."""
        pass