diff --git a/reddit2telegram/channels/tech_receiver/app.py b/reddit2telegram/channels/tech_receiver/app.py index c835c15f..05ede0c1 100644 --- a/reddit2telegram/channels/tech_receiver/app.py +++ b/reddit2telegram/channels/tech_receiver/app.py @@ -5,46 +5,222 @@ import utils from utils import SupplyResult -from utils.tech import get_dev_channel, short_sleep +from utils.tech import get_dev_channel subreddit = 'all' t_channel = '@r_channels' -SETTING_NAME = 1 +LAST_UPDATE_SETTING = 1 +STATE_SETTING_PREFIX = 'tech_receiver_state:' -def send_post(submission, r2t): +def _load_config(): config_filename = 'configs/prod.yml' with open(config_filename) as config_file: - config = yaml.safe_load(config_file.read()) + return yaml.safe_load(config_file.read()) + + +def _get_settings_collection(config): settings = pymongo.MongoClient(host=config['db']['host'])[config['db']['name']]['settings'] - settings.ensure_index([('setting', pymongo.ASCENDING)]) + settings.create_index([('setting', pymongo.ASCENDING)], unique=True) + return settings + + +def _state_setting_name(user_id): + return '{}{}'.format(STATE_SETTING_PREFIX, user_id) + + +def _load_state(settings, user_id): + state_doc = settings.find_one({'setting': _state_setting_name(user_id)}) + if state_doc is None: + return None + return state_doc.get('data') + + +def _save_state(settings, user_id, data): + settings.update_one( + {'setting': _state_setting_name(user_id)}, + {'$set': {'data': data}}, + upsert=True + ) + + +def _clear_state(settings, user_id): + settings.delete_one({'setting': _state_setting_name(user_id)}) + - last_update_doc = settings.find_one({ - 'setting': SETTING_NAME, - }) +def _send_reply(user_id, text, config): + utils.Reddit2TelegramSender(user_id, config).send_text(text) + +def _help_text(): + return ( + 'Send channel data in one message:\n' + 'subreddit: https://reddit.com/r/example\n' + 'channel: @example_channel\n' + 'tags: #tag1 #tag2 #tag3\n\n' + 'Or send it step by step:\n' + '1. subreddit\n' + '2. channel\n' + '3. tags\n\n' + 'Commands:\n' + '/newchannel to restart\n' + '/cancel to clear the current draft' + ) + + +def _parse_complete_request(text): + lines = [line.strip() for line in text.splitlines() if line.strip()] + if not lines: + return None + + if lines[0].lower() in ('/newchannel', '/start'): + lines = lines[1:] + + parsed = {} + for line in lines: + lowered = line.lower() + if lowered.startswith('subreddit'): + parsed['subreddit'] = utils.channels_stuff.normalize_subreddit_name(line) + elif lowered.startswith('channel'): + parsed['channel'] = utils.channels_stuff.normalize_channel_name(line) + elif lowered.startswith('tags'): + parsed['tags'] = utils.channels_stuff.normalize_tags(line) + + if all(parsed.get(key) for key in ('subreddit', 'channel', 'tags')): + return parsed + + if len(lines) == 3: + subreddit_name = utils.channels_stuff.normalize_subreddit_name(lines[0]) + channel_name = utils.channels_stuff.normalize_channel_name(lines[1]) + tags = utils.channels_stuff.normalize_tags(lines[2]) + if subreddit_name and channel_name and tags: + return { + 'subreddit': subreddit_name, + 'channel': channel_name, + 'tags': tags, + } + return None + + +def process_message(text, user_id, settings, config, send_reply=None): + if send_reply is None: + send_reply = lambda target_user_id, reply_text: _send_reply(target_user_id, reply_text, config) + + stripped_text = text.strip() + lowered = stripped_text.lower() + + if lowered in ('/help', 'help'): + send_reply(user_id, _help_text()) + return + + if lowered in ('/cancel', 'cancel'): + _clear_state(settings, user_id) + send_reply(user_id, 'Draft cleared.') + return + + if lowered in ('/newchannel', '/start'): + _clear_state(settings, user_id) + _save_state(settings, user_id, {'step': 'await_subreddit'}) + send_reply(user_id, 'Send subreddit name or Reddit URL.') + return + + complete_request = _parse_complete_request(stripped_text) + if complete_request is not None: + existing = utils.channels_stuff.get_channel_doc(complete_request['channel']) + utils.channels_stuff.upsert_channel( + complete_request['channel'], + subreddit=complete_request['subreddit'], + tags=complete_request['tags'], + ) + _clear_state(settings, user_id) + action = 'Updated' if existing is not None else 'Saved' + send_reply( + user_id, + '{} channel:\nsubreddit: r/{}\nchannel: @{}\ntags: {}'.format( + action, + complete_request['subreddit'], + complete_request['channel'], + complete_request['tags'], + ) + ) + return + + state = _load_state(settings, user_id) + if state is None: + state = {'step': 'await_subreddit'} + + if state['step'] == 'await_subreddit': + subreddit_name = utils.channels_stuff.normalize_subreddit_name(stripped_text) + if not subreddit_name: + send_reply(user_id, _help_text()) + return + _save_state(settings, user_id, { + 'step': 'await_channel', + 'subreddit': subreddit_name, + }) + send_reply(user_id, 'Saved subreddit: r/{}\nNow send channel (@name or t.me link).'.format(subreddit_name)) + return + + if state['step'] == 'await_channel': + channel_name = utils.channels_stuff.normalize_channel_name(stripped_text) + if not channel_name: + send_reply(user_id, 'Channel is invalid. Send @channel_name or https://t.me/channel_name') + return + state['step'] = 'await_tags' + state['channel'] = channel_name + _save_state(settings, user_id, state) + send_reply(user_id, 'Saved channel: @{}\nNow send tags like #tag1 #tag2 #tag3'.format(channel_name)) + return + + if state['step'] == 'await_tags': + tags = utils.channels_stuff.normalize_tags(stripped_text) + if not tags: + send_reply(user_id, 'Tags are invalid. Send tags like #tag1 #tag2 #tag3') + return + existing = utils.channels_stuff.get_channel_doc(state['channel']) + utils.channels_stuff.upsert_channel( + state['channel'], + subreddit=state['subreddit'], + tags=tags, + ) + _clear_state(settings, user_id) + action = 'Updated' if existing is not None else 'Saved' + send_reply( + user_id, + '{} channel:\nsubreddit: r/{}\nchannel: @{}\ntags: {}'.format( + action, + state['subreddit'], + state['channel'], + tags, + ) + ) + return + + _clear_state(settings, user_id) + send_reply(user_id, _help_text()) + + +def send_post(submission, r2t): + config = _load_config() + settings = _get_settings_collection(config) + + last_update_doc = settings.find_one({'setting': LAST_UPDATE_SETTING}) if last_update_doc is None: - last_update_doc = { - 'last_update': 0 - } + last_update_doc = {'last_update': 0} settings.insert_one({ - 'setting': SETTING_NAME, + 'setting': LAST_UPDATE_SETTING, 'last_update': 0 }) - updates = r2t.get_updates(offset=last_update_doc['last_update']) + last_update = last_update_doc['last_update'] + updates = r2t.get_updates(offset=last_update + 1) - last_update = 0 for update in updates: - # print(update) update = update.to_dict() - # short_sleep() - if 'qwerrty' in str(update): - print(update) - last_update = update['update_id'] + last_update = max(last_update, update['update_id']) if 'message' not in update: continue if 'chat' not in update['message']: @@ -52,35 +228,24 @@ def send_post(submission, r2t): if 'text' not in update['message']: continue - # print(update) - user_id = update['message']['chat']['id'] if not isinstance(user_id, int) or user_id < 0: continue message_id = update['message']['message_id'] r2t.forward_message(chat_id=get_dev_channel(), from_chat_id=user_id, message_id=message_id) - if int(update['message']['chat']['id']) == int(config['telegram']['papa']): - # print('>>>>>>>>>>>>>>>>>^^^^^^^^^^^^^^') - text = update['message']['text'] - lines = text.split('\n') - if 'please' not in lines[0].lower(): - continue - new_channel_name = lines[1].split(': ')[-1] - new_subreddit = lines[2].split('/')[-1] - new_tags = lines[3].split(': ')[-1] - utils.channels_stuff.set_new_channel(new_channel_name, subreddit=new_subreddit, tags=new_tags) + if int(user_id) == int(config['telegram']['papa']): + process_message(update['message']['text'], user_id, settings, config) settings.find_one_and_update( { - 'setting': SETTING_NAME + 'setting': LAST_UPDATE_SETTING }, { - '$set': + '$set': { 'last_update': last_update } } ) - # It's not a proper supply, so just stop. return SupplyResult.STOP_THIS_SUPPLY diff --git a/reddit2telegram/utils/channels_stuff.py b/reddit2telegram/utils/channels_stuff.py index cd916114..7e70b7c9 100644 --- a/reddit2telegram/utils/channels_stuff.py +++ b/reddit2telegram/utils/channels_stuff.py @@ -10,6 +10,8 @@ CHANNELS_COLLECTION = 'channels' _SIMPLE_SEND_RE = re.compile(r'^\s*return\s+r2t\.send_simple\(submission\)\s*$') +_SUBREDDIT_URL_RE = re.compile(r'(?:https?://)?(?:www\.)?reddit\.com/r/([^/?#\s]+)', re.IGNORECASE) +_CHANNEL_URL_RE = re.compile(r'(?:https?://)?(?:t(?:elegram)?\.me/)([^/?#\s]+)', re.IGNORECASE) def get_config(config_filename=None): @@ -69,24 +71,88 @@ def import_submodule(submodule_name): return DefaultChannel(submodule_name) -def set_new_channel(channel, **kwargs): - channel = channel.replace('@', '') - channels = _get_channels_collection() - is_any = channels.find_one({'submodule': channel.lower()}) - if is_any is not None: - return - details = { - 'submodule': channel.lower(), - 'channel': '@' + channel, - 'subreddit': kwargs['subreddit'], - 'tags': kwargs['tags'].lower(), +def normalize_subreddit_name(subreddit): + subreddit = subreddit.strip() + subreddit = re.sub(r'^\s*subreddit\s*:\s*', '', subreddit, flags=re.IGNORECASE) + match = _SUBREDDIT_URL_RE.search(subreddit) + if match: + subreddit = match.group(1) + else: + subreddit = subreddit.strip().strip('/') + if subreddit.lower().startswith('r/'): + subreddit = subreddit[2:] + if (not subreddit) or any(char.isspace() for char in subreddit): + return '' + subreddit = subreddit.strip().strip('/') + return subreddit + + +def normalize_channel_name(channel): + channel = channel.strip() + channel = re.sub(r'^\s*channel\s*:\s*', '', channel, flags=re.IGNORECASE) + match = _CHANNEL_URL_RE.search(channel) + if match: + channel = match.group(1) + channel = channel.strip().strip('/') + if channel.startswith('@'): + channel = channel[1:] + if (not channel) or any(char.isspace() for char in channel): + return '' + return channel + + +def normalize_tags(tags): + tags = tags.strip() + tags = re.sub(r'^\s*tags\s*:\s*', '', tags, flags=re.IGNORECASE) + normalized = [] + seen = set() + for raw_part in re.split(r'[\s,]+', tags): + part = raw_part.strip() + if not part: + continue + if not part.startswith('#'): + part = '#' + part.lstrip('#') + if part not in seen: + normalized.append(part) + seen.add(part) + return ' '.join(normalized) + + +def _build_channel_details(channel, **kwargs): + normalized_channel = normalize_channel_name(channel) + normalized_subreddit = normalize_subreddit_name(kwargs['subreddit']) + normalized_tags = normalize_tags(kwargs['tags']) + return { + 'submodule': normalized_channel.lower(), + 'channel': '@' + normalized_channel, + 'subreddit': normalized_subreddit, + 'tags': normalized_tags, 'min_upvotes_limit': kwargs.get('min_upvotes_limit', None), 'submissions_ranking': kwargs.get('submissions_ranking', 'hot'), 'submissions_limit': kwargs.get('submissions_limit', 100) } + + +def set_new_channel(channel, **kwargs): + channels = _get_channels_collection() + details = _build_channel_details(channel, **kwargs) + is_any = channels.find_one({'submodule': details['submodule']}) + if is_any is not None: + return channels.insert_one(details) +def upsert_channel(channel, **kwargs): + channels = _get_channels_collection() + details = _build_channel_details(channel, **kwargs) + channels.update_one( + {'submodule': details['submodule']}, + {'$set': details}, + upsert=True + ) + return channels.find_one({'submodule': details['submodule']}) + + class DefaultChannel(object): '''docstring for DefaultChannel''' def __init__(self, submodule): diff --git a/tests/test_tech_receiver.py b/tests/test_tech_receiver.py new file mode 100644 index 00000000..53192220 --- /dev/null +++ b/tests/test_tech_receiver.py @@ -0,0 +1,173 @@ +from pathlib import Path +import sys + + +REPO_ROOT = Path(__file__).resolve().parent.parent +APP_DIR = REPO_ROOT / 'reddit2telegram' +sys.path.insert(0, str(APP_DIR)) + +from channels.tech_receiver import app as tech_receiver + + +class FakeSettings: + def __init__(self): + self.docs = {} + + def create_index(self, *args, **kwargs): + return None + + def find_one(self, selector): + doc = self.docs.get(selector['setting']) + if doc is None: + return None + return dict(doc) + + def insert_one(self, doc): + self.docs[doc['setting']] = dict(doc) + + def update_one(self, selector, update, upsert=False): + key = selector['setting'] + doc = self.docs.get(key) + if doc is None: + if not upsert: + return + doc = {'setting': key} + self.docs[key] = doc + doc.update(update.get('$set', {})) + + def delete_one(self, selector): + self.docs.pop(selector['setting'], None) + + def find_one_and_update(self, selector, update): + self.update_one(selector, update, upsert=False) + + +def test_process_message_step_by_step_flow_saves_channel(monkeypatch): + settings = FakeSettings() + replies = [] + upserts = [] + + monkeypatch.setattr( + tech_receiver.utils.channels_stuff, + 'get_channel_doc', + lambda channel: None + ) + monkeypatch.setattr( + tech_receiver.utils.channels_stuff, + 'upsert_channel', + lambda channel, **kwargs: upserts.append((channel, kwargs)) + ) + + send_reply = lambda user_id, text: replies.append((user_id, text)) + + tech_receiver.process_message( + 'https://www.reddit.com/r/space/', + 1882084, + settings, + {'telegram': {'papa': '1882084'}}, + send_reply=send_reply, + ) + tech_receiver.process_message( + 'https://t.me/r_space', + 1882084, + settings, + {'telegram': {'papa': '1882084'}}, + send_reply=send_reply, + ) + tech_receiver.process_message( + '#space #astronomy #nasa', + 1882084, + settings, + {'telegram': {'papa': '1882084'}}, + send_reply=send_reply, + ) + + assert upserts == [ + ( + 'r_space', + { + 'subreddit': 'space', + 'tags': '#space #astronomy #nasa', + } + ) + ] + assert settings.find_one({'setting': tech_receiver._state_setting_name(1882084)}) is None + assert replies[0][1] == 'Saved subreddit: r/space\nNow send channel (@name or t.me link).' + assert replies[1][1] == 'Saved channel: @r_space\nNow send tags like #tag1 #tag2 #tag3' + assert replies[2][1] == ( + 'Saved channel:\n' + 'subreddit: r/space\n' + 'channel: @r_space\n' + 'tags: #space #astronomy #nasa' + ) + + +def test_process_message_single_message_updates_existing_channel(monkeypatch): + settings = FakeSettings() + replies = [] + upserts = [] + + monkeypatch.setattr( + tech_receiver.utils.channels_stuff, + 'get_channel_doc', + lambda channel: {'submodule': channel.lower()} + ) + monkeypatch.setattr( + tech_receiver.utils.channels_stuff, + 'upsert_channel', + lambda channel, **kwargs: upserts.append((channel, kwargs)) + ) + + tech_receiver.process_message( + 'subreddit: https://www.reddit.com/r/space/\n' + 'channel: @r_space\n' + 'tags: #space #astronomy #nasa', + 1882084, + settings, + {'telegram': {'papa': '1882084'}}, + send_reply=lambda user_id, text: replies.append(text), + ) + + assert upserts == [ + ( + 'r_space', + { + 'subreddit': 'space', + 'tags': '#space #astronomy #nasa', + } + ) + ] + assert replies == [ + 'Updated channel:\n' + 'subreddit: r/space\n' + 'channel: @r_space\n' + 'tags: #space #astronomy #nasa' + ] + + +def test_send_post_preserves_last_update_when_there_are_no_updates(monkeypatch): + settings = FakeSettings() + settings.insert_one({'setting': tech_receiver.LAST_UPDATE_SETTING, 'last_update': 42}) + + monkeypatch.setattr( + tech_receiver, + '_load_config', + lambda: { + 'db': {'host': 'localhost', 'name': 'reddit2telegram'}, + 'telegram': {'papa': '1882084'} + } + ) + monkeypatch.setattr(tech_receiver, '_get_settings_collection', lambda config: settings) + + class FakeR2T: + def get_updates(self, **kwargs): + assert kwargs['offset'] == 43 + return [] + + def forward_message(self, **kwargs): + raise AssertionError('No messages should be forwarded when there are no updates.') + + result = tech_receiver.send_post(None, FakeR2T()) + + assert result == tech_receiver.SupplyResult.STOP_THIS_SUPPLY + assert settings.find_one({'setting': tech_receiver.LAST_UPDATE_SETTING})['last_update'] == 42