diff --git a/final_task/README.md b/final_task/README.md index 7af281f..0850de7 100644 --- a/final_task/README.md +++ b/final_task/README.md @@ -1,3 +1,48 @@ -# Your readme here -Some text. -Checkout how to write this file using *markdown*. + +#### This program receives RSS URL and prints results in human-readable format. + +- positional arguments: ++ source RSS URL + +- optional arguments: ++ -h, --help show this help message and exit ++ --version Print version info ++ --json Print result as JSON in stdout ++ --verbose Outputs verbose status messages ++ --limit LIMIT Limit news topics if this parameter provided ++ --date DATE to search in cache for news by date in the format in YYYYmmdd ++ --to-html PATH the conversion of news in html file ++ --to-pdf PATH the conversion of news in pdf file ++ --colorize print news in multi colored format ++ --clear Clears news story + + + +- Installation recommendation rss-reader: +1. Open terminal +2. Enter "pip install setuptools" or "pip3 install setuptools" +3. Go to the folder final_task +4. Enter "python3 setup.py install" +5. Application installed +6. To run the utility, type in the terminal "rss-reader" then a space and url on news +- Example : rss-reader https://news.yahoo.com/rss + +- News caching: ++ In order to see the history you must enter an additional parameter --date ++ Example: rss-reader https://news.tut.by/rss/ --limit 2 --date 20191122 ++ Searching by date and source or only by date + +- Format converter: +1. Use --to-pdf to save news in pdf format +2. Use --to-html to save news in html format +3. If no internet connection, get a file without images +4. Enter the full path to the file +5. If you enter path to directory,news successfully saved to file "your path+News.(pdf or html)" + +- If you enter --colorize,that will print the result of the utility in colorized mode. +- If you enter --colorize with --json,that will print the result of the utility in json in colorized mode. + +- If you enter --clear this will delete all cached news + + + diff --git a/final_task/config.txt b/final_task/config.txt new file mode 100644 index 0000000..d5e77b9 --- /dev/null +++ b/final_task/config.txt @@ -0,0 +1,5 @@ +database postgres +user postgres +password 1 +host localhost +port 5432 \ No newline at end of file diff --git a/final_task/rss_reader/News.py b/final_task/rss_reader/News.py new file mode 100644 index 0000000..f2a8bff --- /dev/null +++ b/final_task/rss_reader/News.py @@ -0,0 +1,53 @@ +from dataclasses import dataclass +import datetime +import logging + +MODULE_LOGGER = logging.getLogger("rss_reader.News") + + +@dataclass +class News: + feed: str + title: str + date: datetime.datetime + link: str + info_about_image: str + briefly_about_news: str + links_from_news: list + + def get_json(self): + """ + returns news in json format + """ + logger = logging.getLogger("rss_reader.News.get_json") + logger.info("return news in json format") + data = { + "Feed": self.feed, + "Title": self.title, + "Date": str(self.date), + "Link": self.link, + "Info about image": self.info_about_image, + "Briefly about news": self.briefly_about_news, + "Links": self.links_from_news + + } + return data + + def __str__(self): + """ + Return a string representation of the news for print in stdout. + """ + logger = logging.getLogger("rss_reader.News.__str__") + logger.info("return str") + links = "" + for index, link in enumerate(self.links_from_news or []): + if link: + links += "[" + str(index) + "] " + link + "\n" + + return f"Feed: {self.feed}\n" \ + f"Title: {self.title} \n" \ + f"Date: {self.date} \n" \ + f"Link: {self.link}\n" \ + f"Info about image: {self.info_about_image}\n" \ + f"Briefly about news: {self.briefly_about_news}\n" \ + f"Links: \n{links}" diff --git a/final_task/rss_reader/__init__.py b/final_task/rss_reader/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/final_task/rss_reader/converter.py b/final_task/rss_reader/converter.py new file mode 100644 index 0000000..888ef3c --- /dev/null +++ b/final_task/rss_reader/converter.py @@ -0,0 +1,209 @@ +import fnmatch +import logging +import os +import textwrap +from io import BytesIO + +import dominate +import requests +from PIL import Image +from dominate import tags +from reportlab.lib.pagesizes import A4 +from reportlab.pdfbase import pdfmetrics +from reportlab.pdfbase.ttfonts import TTFont +from reportlab.pdfgen.canvas import Canvas + +MODULE_LOGGER = logging.getLogger("rss_reader.converter") + + +def get_path(path: str, expansion_file: str) -> str: + """ + Checks the correctness of the entered path + if received path to directory check her on exist + if directory exist add News and expansion file + if received path to file check his on exist and check correctness expansion file + :param path: + :param expansion_file: + :return: + """ + logger = logging.getLogger("rss_reader.converter.get_path") + logger.info("check path") + if os.path.isdir(path): + logger.info("path specified to directory") + result = path + '/News' + expansion_file + else: + if not fnmatch.fnmatch(path, '*%s' % expansion_file): + logger.error("Invalid expansion ") + raise FileNotFoundError(f"Invalid expansion {path}") + if not os.path.isdir(path[:path.rfind("/") + 1]): + logger.error("File or directory not found") + raise FileNotFoundError(f"File or directory not found {path}") + result = path + return result + + +def get_html(list_of_news: list): + """ + Forms html content + :param list_of_news: + :return: + """ + logger = logging.getLogger("rss_reader.converter.get_html") + logger.info("getting html content") + doc = dominate.document(title='RSS READER') + for news in list_of_news: + with doc.head: + tags.link(rel='stylesheet', href='style.css') + tags.script(type='text/javascript', src='script.js') + tags.style("""\ + body { + background-color: #F9F8F1; + color: #2C232A; + font-family: sans-serif; + font-size: 2.6em; + margin: 3em 1em; + } + + """) + + with doc: + with tags.div(id='header'): + tags.p("Feed: ", news.feed) + tags.p("Title: ", news.title) + tags.p("Date ", str(news.date)) + tags.p("Link: ", tags.a(news.link.title(), href=news.link, target="_blank")) + tags.p("Info about image: ", news.info_about_image) + tags.p("Briefly about news: ", news.briefly_about_news) + tags.p("Links: ", ) + for reference in news.links_from_news: + if reference: + tags.li(tags.a(reference.title(), href=reference, target="_blank")) + if news.links_from_news[1]: + tags.a(tags.img( + src=news.links_from_news[1], + width="200", height="200", alt=news.info_about_image), + href=news.links_from_news[1], target="_blank") + logger.info("html content received") + return doc + + +def conversion_of_news_in_html(path, list_of_news): + logger = logging.getLogger("rss_reader.converter.conversion_of_news_in_html") + logger.info("conversion of news in html") + correct_path = get_path(path, ".html") + html_content = get_html(list_of_news) + save_html(correct_path, html_content) + logger.info("conversion of news in html successful completed") + + +def save_html(path, html_content): + """ + Save news in file + :param path: + :param html_content: + :return: + """ + logger = logging.getLogger("rss_reader.converter.save_html") + try: + with open(path, 'w') as file: + file.write(html_content.render()) + print("news successfully saved to file ", path) + logger.info("news successfully saved to file ") + except MemoryError: + logger.error("not enough memory to save html file") + print("You do not have enough memory to save html file") + + +def get_img(image_name, reference): + """ + Download image in file + :param image_name: + :param reference: + :return: True if image successfully downloaded + """ + logger = logging.getLogger("rss_reader.converter.get_img") + logger.info("return img") + is_picture = False + try: + response = requests.get(reference) + img = Image.open(BytesIO(response.content)) + img = img.resize((100, 100)) + img = img.convert('RGB') + img.save(image_name, 'JPEG') + is_picture = True + except requests.exceptions.ConnectionError: + logger = logging.getLogger("rss_reader.converter.get_img") + logger.error("You do not have an internet connection\n" + "your news will be saved in pdf without pictures") + except requests.exceptions.MissingSchema: + logger = logging.getLogger("rss_reader.converter.get_img") + logger.error("Invalid url picture \n") + except OSError: + logger = logging.getLogger("rss_reader.converter.get_img") + logger.error("cannot identify image\n") + return is_picture + + +def text_separator(text: str, break_long_words: bool) -> list: + """ + Breaks text into lines of 50 characters + :param text: + :param break_long_words: + :return: + """ + logger = logging.getLogger("rss_reader.converter.text_separator") + format_text = textwrap.fill(text, width=50, break_long_words=break_long_words) + ls = format_text.split('\n') + logger.info("text successfully broken") + return ls + + +def print_text_in_pdf(canvas, text, x, y): + logger = logging.getLogger("rss_reader.converter.print_list_in_pdf") + logger.info("print list in pdf") + ls = text_separator(text, False) + for lines in ls: + if y < 45: + canvas.showPage() + canvas.setFont('FreeSans', 19) + y = 800 + y -= 25 + canvas.drawString(x, y, lines) + return y - 25 + + +def conversion_of_news_in_pdf(path, list_of_news): + logger = logging.getLogger("rss_reader.converter.conversion_of_news_in_pdf") + correct_path = get_path(path, ".pdf") + canvas = Canvas(correct_path, pagesize=A4) + pdfmetrics.registerFont(TTFont('FreeSans', 'FreeSans.ttf')) + canvas.setFont('FreeSans', 19) + canvas.setTitle("RSS READER") + x = 10 + y = 800 + pdfmetrics.registerFont(TTFont('FreeSans', 'FreeSans.ttf')) + name_buffer_picture_file = "tmp1" + for index, news in enumerate(list_of_news): + name_buffer_picture_file = name_buffer_picture_file[:-1] + str(index) + canvas.setFont('FreeSans', 19) + if get_img(name_buffer_picture_file + '.jpg', news.links_from_news[1]): + y -= 170 + if y < 45: + canvas.showPage() + canvas.setFont('FreeSans', 19) + y = 680 + canvas.drawImage(name_buffer_picture_file + ".jpg", x, y, 150, 150) + os.remove(name_buffer_picture_file + '.jpg') + y -= 40 + y = print_text_in_pdf(canvas, news.feed, x, y) + y = print_text_in_pdf(canvas, news.title, x, y) + y = print_text_in_pdf(canvas, str(news.date), x, y) + y = print_text_in_pdf(canvas, news.link, x, y) + y = print_text_in_pdf(canvas, news.info_about_image, x, y) + y = print_text_in_pdf(canvas, news.briefly_about_news, x, y) + if y < 45: + canvas.showPage() + y = 800 + logger.info("save news in pdf") + canvas.save() + print("news successfully saved to file ", correct_path) diff --git a/final_task/rss_reader/database.py b/final_task/rss_reader/database.py new file mode 100644 index 0000000..95b157f --- /dev/null +++ b/final_task/rss_reader/database.py @@ -0,0 +1,155 @@ +import datetime +import logging +import sqlite3 + +import News +from exceptions import DataBaseEmpty + +MODULE_LOGGER = logging.getLogger("rss_reader.database") + + +def connect_to_database(name_database: str): + logger = logging.getLogger("rss_reader.database.connect_to_database") + logger.info("connecting to database") + con = sqlite3.connect(f"{name_database}") + logger.info("connected to database") + return con + + +def is_table(connect, table_name: str, name_database: str) -> bool: + """ + Checks table existence + :param connect: + :param name_database: + :param table_name: + :return: True or False + """ + logger = logging.getLogger("rss_reader.database.is_table") + logger.info("check exist table") + flag_is_table = True + + cursor = connect.cursor() + + try: + cursor.execute(f"SELECT * FROM {table_name}") + logger.info(" table exist") + except sqlite3.OperationalError: + flag_is_table = False + logger.error("table does not exist") + + return flag_is_table + + +def create_table(con, cursor, name_database_str): + """ + Creates a table NEWS + """ + logger = logging.getLogger("rss_reader.database.create_table") + logger.info("creating table") + if not is_table(con, "NEWS", name_database_str): + cursor.execute('''CREATE TABLE NEWS + (FEED TEXT , + SOURCE_LINK TEXT, + TITLE_OF_NEWS TEXT, + DATA timestamptz, + LINK TEXT , + INFO TEXT, + BRIEFLY TEXT, + LINKS TEXT);''') + con.commit() + logger.info("created table") + + +def write_to(list_news: list, source_link: str, cursor): + """ + Writes news to database + :param list_news: + :param source_link: + :param cursor: + :return: + """ + try: + logger = logging.getLogger("rss_reader.database.write_to") + logger.info("write news") + for news in list_news: + cursor.execute(f"SELECT * FROM NEWS WHERE LINK = ?", (news.link,)) + if not cursor.fetchall(): + # links_in_str = "" + # for link in news.links_from_news: + links_in_str = "\n".join(news.links_from_news) + cursor.execute( + "INSERT INTO NEWS (FEED,SOURCE_LINK,TITLE_OF_NEWS,DATA,LINK,INFO,BRIEFLY,LINKS) " + "VALUES (?,?, ?,?, ?, ?, ?,?)", (news.feed, + source_link, + news.title, + news.date, + news.link, + news.info_about_image, + news.briefly_about_news, + links_in_str,) + + ) + logger = logging.getLogger("rss_reader.database.write_to") + logger.info("end news recording") + except MemoryError: + logger = logging.getLogger("rss_reader.database.write_to") + logger.error("not enough memory") + raise MemoryError("You do not have enough memory to cache") + + +def read_news(list_of_news: list, limit: int, source_link, date_of_news: datetime, cursor): + """ + Read news from database + :param list_of_news: + :param limit: + :param source_link: + :param date_of_news: + :param cursor: + :return: + """ + logger = logging.getLogger("rss_reader.database.read_news") + # the user enter "source_link" + if limit and source_link: + logger.info("reading new from cache with limit") + cursor.execute( + "SELECT * FROM NEWS WHERE date(DATA) = DATE(?) AND SOURCE_LINK = ? LIMIT ?", + (date_of_news, source_link, limit,)) + elif not limit and source_link: + logger.info("reading new from cache without limit") + cursor.execute("SELECT * FROM NEWS WHERE date(DATA) = DATE(?) AND SOURCE_LINK = ?", + (date_of_news, source_link,)) + # the user did not enter "source_link" + if limit and not source_link: + logger.info("reading all news from cache with limit ") + cursor.execute( + "SELECT * FROM NEWS WHERE date(DATA) = DATE(?) LIMIT ?", + (date_of_news, limit,)) + elif not limit and not source_link: + logger.info("reading all news from cache without limit") + cursor.execute("SELECT * FROM NEWS WHERE date(DATA) = DATE(?)", + (date_of_news,)) + + for row in cursor: + links = row[7].split("\n") + news = News.News(feed=row[0], + title=row[2], + date=row[3], + link=row[4], + info_about_image=row[5], + briefly_about_news=row[6], + links_from_news=links) + list_of_news.append(news) + if not list_of_news: + logger.error("story on is empty") + raise DataBaseEmpty(Exception("Your news story on is empty ")) + logger.error("news read successfully") + + +def clear_the_history(connect, name_database, name_table): + logger = logging.getLogger("rss_reader.database.clear_the_history") + if is_table(connect, name_table, name_database): + cursor = connect.cursor() + cursor.execute(f'DELETE FROM {name_table}') + connect.commit() + print('The story is cleared') + logger.info('The story is cleared') diff --git a/final_task/rss_reader/exceptions.py b/final_task/rss_reader/exceptions.py new file mode 100644 index 0000000..761eeb0 --- /dev/null +++ b/final_task/rss_reader/exceptions.py @@ -0,0 +1,7 @@ +class TimeOutExeption(Exception): + pass + + +class DataBaseEmpty(Exception): + pass + diff --git a/final_task/rss_reader/pars_args.py b/final_task/rss_reader/pars_args.py new file mode 100644 index 0000000..77e03f2 --- /dev/null +++ b/final_task/rss_reader/pars_args.py @@ -0,0 +1,56 @@ +import argparse +import sys +import logging + +MODULE_LOGGER = logging.getLogger("rss_reader.pars_args") + + +def create_parser(): + """ function to parse the command line """ + logger = logging.getLogger("rss_reader.create_parser") + logger.info("parse the command line ") + parser = argparse.ArgumentParser( + prog='rss_reader', + description=''' This program receives RSS URL + and prints results in human-readable format.''', + epilog='''Thank you for using this program''' + + ) + + # add information about the expected parameters + # using the add_argument method one call for each parameter). + + parser.add_argument('source', type=str, nargs='?', default="", help='RSS URL') + + parser.add_argument('--version', action='version', help='Print version info', version=f'{5.0}') + + parser.add_argument('--json', action='store_const', const=True, default=False, + help='Print result as JSON in stdout') + + parser.add_argument('--verbose', action='store_const', const=True, default=False, + help='Outputs verbose status messages') + + parser.add_argument('--limit', type=int, metavar='LIMIT', default=None, + help='Limit news topics if this parameter provided') + parser.add_argument('--date', type=str, metavar='DATE', + help='to search in cache for news by date in the format in YYYYmmdd') + parser.add_argument('--to-html', type=str, metavar='PATH', default=None, + help='the conversion of news in html file') + parser.add_argument('--to-pdf', type=str, metavar='PATH', default=None, + help='the conversion of news in pdf file') + parser.add_argument('--colorize', action='store_const', const=True, default=False, + help='print news in multi colored format') + parser.add_argument('--clear', action='store_const', const=True, default=False, + help='Clears news story') + return parser + + +def get_args(): + """ + returns command line arguments + """ + logger = logging.getLogger("rss_reader.get_args") + logger.info("return args command line") + parser = create_parser() + args = parser.parse_args(sys.argv[1:]) + return args diff --git a/final_task/rss_reader/parser_rss.py b/final_task/rss_reader/parser_rss.py new file mode 100644 index 0000000..4a81f24 --- /dev/null +++ b/final_task/rss_reader/parser_rss.py @@ -0,0 +1,145 @@ +import datetime +import html +import logging +import re +import signal +from contextlib import contextmanager +from urllib.error import URLError + +import feedparser +from dateutil import parser + +import News +from exceptions import TimeOutExeption + +MODULE_LOGGER = logging.getLogger("rss_reader.parser_rss") + + +@contextmanager +def timeout_sec(seconds): + """ + Contextmanager to check the expectation of a response + and if the response does not come for a long time, an error + """ + + def signal_handler(signum, frame): + raise TimeOutExeption(Exception('Time out')) + + signal.signal(signal.SIGALRM, signal_handler) + signal.alarm(seconds) + try: + yield + finally: + signal.alarm(0) + + +def valid_date(date_text: str) -> datetime.datetime: + """ + Checks the entered date and and throws an exception + if the date does not match the format + :param date_text: + :return: + """ + try: + date = datetime.datetime.strptime(date_text, '%Y%m%d') + except ValueError: + raise ValueError("Incorrect data format, should be YYYYMMDD") + return date + + +def get_link_image(summary: str) -> str: + """ + Selects a photo link from html + :param summary: + :return: + """ + tag = 'img src=' + begin_position_link_img = summary.find(tag) + len(tag) + 1 + end_position_link_img = summary.find('"', begin_position_link_img) + link = summary[begin_position_link_img:end_position_link_img + 1] + return link + + +def clear_text(text: str) -> str: + """ + Cleans text from problems that occurred when decoding formats + """ + logger = logging.getLogger("rss_reader.parser_rss.clear_text") + logger.info("clear text from news") + return html.unescape(text) + + +def get_info_about_image(summary: str) -> str: + """ + Selects a info about image from html + :param summary: + :return: + """ + logger = logging.getLogger("rss_reader.parser_rss.get_info_about_image") + logger.info("return info about image") + tag = 'alt=' + begin_position_info_about_image = summary.find(tag) + len(tag) + 1 + end_position_info_about_image = summary.find('"', begin_position_info_about_image) + info_about_image = summary[begin_position_info_about_image:end_position_info_about_image] + return clear_text(info_about_image) + + +def get_briefly_about_news(summary: str) -> str: + """ + Selects a info about news from html + :param summary: + :return: + """ + logger = logging.getLogger("rss_reader.parser_rss.get_briefly_about_news") + logger.info("return briefly info about news") + result = re.compile(r'<.*?>') + text = result.sub('', summary) + return clear_text(text) + + +def get_news_feed(sourse_url: str) -> feedparser.parse: + logger = logging.getLogger("rss_reader.parser_rss.get_news_feed") + + with timeout_sec(10): + news_feed = feedparser.parse(sourse_url) + if news_feed['bozo'] != 0: + logger.error(news_feed['bozo_exception'].args[0]) + raise URLError(news_feed['bozo_exception'].args[0]) + logger.info("return news Feed") + return news_feed + + +def init_list_of_news(list_of_news: list, news_feed: feedparser.parse, limit: int): + """ + Fills the list with news + """ + logger = logging.getLogger("rss_reader.parser_rss.init_list_of_news") + logger.info("Fills the list with news") + feed_title = news_feed['feed'].get('title', 'NO TITLE') + feed_title = clear_text(feed_title) + for index, entry in enumerate(news_feed['entries']): + if index == limit: + break + title = entry.get('title', '(NO TITLE') + title = clear_text(title) + summary = entry.get('summary', '(NO SUMMARY)') + date = parser.parse(entry['published']) + link = entry['link'] + info_about_image = get_info_about_image(summary) + briefly_about_news = get_briefly_about_news(summary) + try: + link_on_image = entry.get("media_content")[0]["url"] + except TypeError: + link_on_image = "link not found" + info_about_image = "info about image not found" + news = News.News(feed=feed_title, + title=title, + date=date, + link=link, + info_about_image=info_about_image, + briefly_about_news=briefly_about_news, + links_from_news=[link, link_on_image] + ) + + list_of_news.append(news) + logger.info("list completed successfully") diff --git a/final_task/rss_reader/print_functions.py b/final_task/rss_reader/print_functions.py new file mode 100644 index 0000000..830cb02 --- /dev/null +++ b/final_task/rss_reader/print_functions.py @@ -0,0 +1,135 @@ +import json +import logging +from urllib.error import URLError +import parser_rss +from pars_args import get_args +import colorama +from colorama import Fore, Back, Style +import converter + +MODULE_LOGGER = logging.getLogger("rss_reader.print_functions") + + +def print_news_in_json(list_of_news: list): + """ + Print news in the console in json format + :param list_of_news: + :return: + """ + logger = logging.getLogger("rss_reader.print_functions.print_news_in_json") + logger.info("print news in the console in json format") + list_of_news_in_json = [] + for news in list_of_news: + list_of_news_in_json.append(news.get_json()) + print(json.dumps(list_of_news_in_json, indent=4, ensure_ascii=False)) + + +def print_news_without_cashing(): + """ + If you have problems with the database + user can use the program without caching + :return: + """ + try: + logger = logging.getLogger("rss_reader.print_functions.print_news_without_cashing") + logger.info("print news without cashing") + args = get_args() + list_of_news = [] + news_feed = parser_rss.get_news_feed(args.source) + parser_rss.init_list_of_news(list_of_news, news_feed, args.limit) + if args.json: + if args.colorize: + print_news_in_json_in_multi_colored_format(list_of_news) + else: + print_news_in_json(list_of_news) + else: + if args.colorize: + print_news_in_multi_colored_format(list_of_news) + else: + print_news(list_of_news) + if args.to_html: + converter.conversion_of_news_in_html(args.to_html, list_of_news) + if args.to_pdf: + converter.conversion_of_news_in_pdf(args.to_pdf, list_of_news) + logger.info("print news without cashing completed successfully") + except URLError as er: + logger = logging.getLogger("rss_reader.print_functions.print_news_without_cashing") + logger.error(er) + print(er) + except Exception as e: + logger = logging.getLogger("rss_reader.print_functions.print_news_without_cashing") + logger.error(e) + print(e) + + +def print_news(list_of_news: list): + """ + Print news in the console + :param feed_title: + :param list_of_news: + :return: + """ + logger = logging.getLogger("rss_reader.parser_rss.print_news") + logger.info("print news in the console") + for number, news in enumerate(list_of_news): + print(number + 1) # because number starts at 0 + print(news) + print('-' * 100) + + +def print_news_in_multi_colored_format(list_of_news: list): + """ + Print news in the console in colorized mode + :param list_of_news: + :return: + """ + logger = logging.getLogger("rss_reader.parser_rss.print_news_in_multi_colored_format") + colorama.init() + for number, news in enumerate(list_of_news): + links = "" + for index, link in enumerate(news.links_from_news or []): + links += "[" + str(index) + "] " + link + "\n" + print('\033[1m\033[32m\033[4m' + str(number + 1) + ":") + print(Style.RESET_ALL + Fore.BLUE + f'Feed: {news.feed}') + print(Style.RESET_ALL + Fore.GREEN + f'Title: {news.title}') + print(Style.RESET_ALL + Fore.YELLOW + f'Date: {news.date}') + print(Style.RESET_ALL + Fore.CYAN + f'Link: {news.link}') + print(Style.RESET_ALL + Fore.YELLOW + f'Info about image: {news.info_about_image}') + print(Style.RESET_ALL + Fore.GREEN + f'Briefly about news: {news.briefly_about_news}') + print(Style.RESET_ALL + Fore.CYAN + f'Links: \n{links}') + logger.info("print completed successfully") + + +def print_news_in_json_in_multi_colored_format(list_of_news: list): + """ + Print news in json format in the console in colorized mode + + :param list_of_news: + :return: + """ + logger = logging.getLogger("rss_reader.parser_rss.print_news_in_json_in_multi_colored_format") + result = "\033[1m\033[35m[\033[0m\n" + for number, news in enumerate(list_of_news): + + result += " \033[1m\033[31m{\033[0m\n" + result += f''' \033[1m\033[34m"Feed": "{news.feed}",\033[0m\n''' + result += f''' \033[32m"Title": "{news.title}",\033[0m\n''' + result += f''' \033[33m"Date": "{news.date}",\033[0m\n''' + result += f''' \033[36m"Link": "{news.link}",\033[0m\n''' + result += f''' \033[33m"Info about image": "{news.info_about_image}",\033[0m\n''' + result += f''' \033[32m"Briefly about news": "{news.briefly_about_news}",\033[0m\n''' + result += f''' \033[36m"Links": [\n''' + for index_link, link in enumerate(news.links_from_news): + result += f''' "{link}"''' + if index_link != len(news.links_from_news) - 1: + result += ',\n' + result += '\n' + result += " ]\033[0m\n" + result += " \033[1m\033[31m}\033[0m" + if len(list_of_news) - 1 != number: + result += ',' + result += '\n' + + result += "\033[1m\033[35m]\033[0m" + print(result) + logger.info("print completed successfully") diff --git a/final_task/rss_reader/requirements.txt b/final_task/rss_reader/requirements.txt index e69de29..c9150e5 100644 --- a/final_task/rss_reader/requirements.txt +++ b/final_task/rss_reader/requirements.txt @@ -0,0 +1,8 @@ +feedparser == 5.2.1 +python-dateutil == 2.8.1 +psycopg2-binary == 2.8.4 +dominate == 2.4.0 +Pillow == 6.2.1 +requests == 2.22.0 +reportlab == 3.5.32 +colorama == 0.4.1 \ No newline at end of file diff --git a/final_task/rss_reader/rss_reader.py b/final_task/rss_reader/rss_reader.py index e69de29..c292f92 100644 --- a/final_task/rss_reader/rss_reader.py +++ b/final_task/rss_reader/rss_reader.py @@ -0,0 +1,84 @@ +import logging +import sqlite3 +import sys +from contextlib import closing + +import converter +import database +import pars_args +import parser_rss +import print_functions + + +def main(): + """ + The main entry point of the application + """ + try: + args = pars_args.get_args() + logger = logging.getLogger("rss_reader") + logger.setLevel(logging.INFO) + # create the logging file handler + if not args.verbose: + fh = logging.FileHandler("new_snake.log") + formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') + fh.setFormatter(formatter) + logger.addHandler(fh) + else: + fh = logging.basicConfig(stream=sys.stdout, + filemode='a', + format='%(asctime)s,%(msecs)d %(name)s %(levelname)s %(message)s', + datefmt='%H:%M:%S', + level=logging.DEBUG) + + # add handler to logger object + + logger.info("Program started") + + with closing(database.connect_to_database('database.db')) as con: + if args.clear: + database.clear_the_history(con, 'database.db', 'NEWS') + else: + cursor = con.cursor() + database.create_table(con, cursor, 'database.db') + args = pars_args.get_args() + list_of_news = [] + if args.date: + date = parser_rss.valid_date(args.date) + database.read_news(list_of_news, args.limit, args.source, date, cursor) + else: + news_feed = parser_rss.get_news_feed(args.source) + parser_rss.init_list_of_news(list_of_news, news_feed, args.limit) + database.write_to(list_of_news, args.source, cursor) + if args.json: + if args.colorize: + print_functions.print_news_in_json_in_multi_colored_format(list_of_news) + else: + print_functions.print_news_in_json(list_of_news) + else: + if args.colorize: + print_functions.print_news_in_multi_colored_format(list_of_news) + else: + print_functions.print_news(list_of_news) + if args.to_html: + converter.conversion_of_news_in_html(args.to_html, list_of_news) + if args.to_pdf: + converter.conversion_of_news_in_pdf(args.to_pdf, list_of_news) + con.commit() + except (sqlite3.OperationalError, MemoryError)as er: + print_functions.print_news_without_cashing() + print("Check your database," + "news is not saved " + "you cannot use --date\n" + ) + print(er) + except parser_rss.TimeOutExeption as e: + print(e) + except Exception as e: + print(e) + except KeyboardInterrupt as key_error: + print("The program is interrupted " + str(key_error)) + + +if __name__ == '__main__': + main() diff --git a/final_task/setup.py b/final_task/setup.py index e69de29..d210cd9 100644 --- a/final_task/setup.py +++ b/final_task/setup.py @@ -0,0 +1,26 @@ +from setuptools import find_namespace_packages, setup + +setup( + name='rss_reader', + version='5.0', + description='RSS reader', + author='Matyushenok Sergey', + author_email='matyushenoksergei@yandex.by', + package_dir={'rss_reader': 'rss_reader'}, + scripts=['rss_reader/News.py', + 'rss_reader/pars_args.py', + 'rss_reader/parser_rss.py', + 'rss_reader/exceptions.py', + 'rss_reader/database.py', + 'rss_reader/converter.py', + 'rss_reader/print_functions.py', + 'rss_reader/rss_reader.py'], + entry_points={ + 'console_scripts': ['rss-reader=rss_reader:main'], + }, + packages=find_namespace_packages(), + install_requires=['feedparser', 'python-dateutil', 'psycopg2-binary', 'dominate', 'Pillow', + 'requests','reportlab','colorama'], + license="none", + platforms="Linux, Windows (not tested)", +) diff --git a/final_task/tests/README.md b/final_task/tests/README.md new file mode 100644 index 0000000..5c74b84 --- /dev/null +++ b/final_task/tests/README.md @@ -0,0 +1,3 @@ +Tests were run using: coverage run -m unittest discover -s final_task/tests/ or + coverage run -m unittest discover + diff --git a/final_task/tests/__init__.py b/final_task/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/final_task/tests/news_feed_for_test.xml b/final_task/tests/news_feed_for_test.xml new file mode 100644 index 0000000..d410183 --- /dev/null +++ b/final_task/tests/news_feed_for_test.xml @@ -0,0 +1,26 @@ + + + + CHANNEL TITLE + + + CHANNEL DESCRIPTION + + CHANNEL LINK + + + ITEM1 TITLE + ITEM1 LINK + 2003-12-31 + + + + ITEM2 TITLE + ITEM2 LINK + 2003-12-31 + + + + + \ No newline at end of file diff --git a/final_task/tests/test_News.py b/final_task/tests/test_News.py new file mode 100644 index 0000000..f76b545 --- /dev/null +++ b/final_task/tests/test_News.py @@ -0,0 +1,44 @@ +import sys + +sys.path.insert(1, 'final_task/rss_reader') +from News import News +import unittest +from dateutil import parser + + +class TestNews(unittest.TestCase): + + def setUp(self): + self.item = News(feed="feed", + title="title", + date=parser.parse("2019-11-17 10:44:20-05:00"), + link="link", + info_about_image="info_about_image", + briefly_about_news="briefly_about_news", + links_from_news=["link", "link_on_image"] + ) + + def test_str(self): + self.assertTrue(str(self.item) == "Feed: feed\n" + "Title: title \n" + "Date: 2019-11-17 10:44:20-05:00 \n" + "Link: link\n" + "Info about image: info_about_image\n" + "Briefly about news: briefly_about_news\n" + "Links: \n" + "[0] link\n" + "[1] link_on_image\n") + + def test_get_json(self): + data = self.item.get_json() + self.assertEqual(data['Feed'], 'feed') + self.assertEqual(data['Title'], 'title') + self.assertEqual(data['Date'], '2019-11-17 10:44:20-05:00') + self.assertEqual(data['Link'], 'link') + self.assertEqual(data['Info about image'], 'info_about_image') + self.assertEqual(data['Briefly about news'], 'briefly_about_news') + self.assertEqual(data['Links'], ['link', 'link_on_image']) + + +if __name__ == '__main__': + unittest.main() diff --git a/final_task/tests/test_converter.py b/final_task/tests/test_converter.py new file mode 100644 index 0000000..e57ede4 --- /dev/null +++ b/final_task/tests/test_converter.py @@ -0,0 +1,96 @@ +from dateutil import parser +import sys + +from News import News + +sys.path.insert(1, 'final_task/rss_reader') +import converter +import unittest + + +class TestConverters(unittest.TestCase): + def test_get_path(self): + with self.assertRaises(FileNotFoundError) as error: + converter.get_path('path_not_exist.txt', '.pdf') + self.assertEqual(str(error.exception), 'Invalid expansion path_not_exist.txt') + with self.assertRaises(FileNotFoundError) as error: + converter.get_path('path_not_exist.pdf', '.pdf') + self.assertEqual(str(error.exception), 'File or directory not found path_not_exist.pdf') + with self.assertRaises(FileNotFoundError) as error: + converter.get_path('path_not_exist.txt', '.html') + self.assertEqual(str(error.exception), 'Invalid expansion path_not_exist.txt') + with self.assertRaises(FileNotFoundError) as error: + converter.get_path('path_not_exist.html', '.html') + self.assertEqual(str(error.exception), 'File or directory not found path_not_exist.html') + + def test_get_img(self): + self.assertEqual(converter.get_img("name", "not link"), False) + self.assertEqual(converter.get_img('name', "https://news.tut.by/rss"), False) + + def test_text_separator(self): + text = "A Utah woman charged with a crime after her stepchildren saw her topless in her own home is " + result = ["A Utah woman charged with a crime after her", + "stepchildren saw her topless in her own home is"] + self.assertEqual(converter.text_separator(text, False), result) + + def test_get_html(self): + dat = parser.parse("2019-11-12 18:21:00+03:00") + link12 = "link" + link_on_image = "link on image" + + links1 = [link12, link_on_image] + news = News(feed="TUT.BY: Новости ТУТ", + title="wcds", + date=dat, + link="link", + info_about_image="uhinjвв", + briefly_about_news="Полпред России в контактной группе Борис Грызлов сообщил", + links_from_news=links1) + + list_of_new = [news, ] + verifiable_info = "\n" \ + "\n" \ + " \n" \ + " RSS READER\n" \ + """ \n""" \ + """ \n""" \ + """ \n""" \ + """ \n""" \ + """ \n""" \ + """ \n""" \ + """ \n""" \ + """""" + + document_of_html = converter.get_html(list_of_new) + self.assertEqual(document_of_html.render(), verifiable_info) + + +if __name__ == '__main__': + unittest.main() diff --git a/final_task/tests/test_database.py b/final_task/tests/test_database.py new file mode 100644 index 0000000..f066e03 --- /dev/null +++ b/final_task/tests/test_database.py @@ -0,0 +1,63 @@ +import sys +from contextlib import closing +from dateutil import parser +import os + +sys.path.insert(1, 'final_task/rss_reader') +import database +import unittest +import exceptions +from News import News + + +class TestNews(unittest.TestCase): + @staticmethod + def delete_database(database_name): + if os.path.isfile(database_name): + os.remove(database_name) + + def setUp(self): + self.item = News(feed="feed", + title="title", + date=parser.parse("2019-11-17 10:44:20-05:00"), + link="link", + info_about_image="info_about_image", + briefly_about_news="briefly_about_news", + links_from_news=["link", "link_on_image"] + ) + + def test_is_table(self): + with closing(database.connect_to_database('database_fail.db'))as con: + self.assertEqual(database.is_table(con, "NEWSFA", 'database_test.db'), False) + if os.path.isfile('database_test.db'): + os.remove('database_test.db') + + def test_read_news(self): + with closing(database.connect_to_database('database_fail.db'))as con: + cursor = con.cursor() + database.create_table(con, cursor, 'database_fail.db') + date = parser.parse("10011001") + list_of_news = [] + with self.assertRaises(exceptions.DataBaseEmpty) as error: + database.read_news(list_of_news, 2, "not link", date, cursor) + self.assertEqual(str(error.exception), 'Your news story on is empty ') + self.delete_database('database_fail.db') + + def test_create_table(self): + with closing(database.connect_to_database('databasefail.db'))as con: + database.create_table(con, con.cursor(), 'databasefail.db') + self.assertTrue(database.is_table(con, "NEWS", 'databasefail.db')) + self.delete_database('databasefail.db') + + def test_write_to(self): + with closing(database.connect_to_database('databasefail_wr.db'))as con: + database.create_table(con, con.cursor(), 'databasefail_wr.db') + database.write_to([self.item, ], self.item.link, con.cursor()) + list_of_news = [] + database.read_news(list_of_news, 1, self.item.link, self.item.date, con.cursor()) + self.assertEqual(len(list_of_news), 1) + self.delete_database('databasefail_wr.db') + + +if __name__ == '__main__': + unittest.main() diff --git a/final_task/tests/test_pars_args.py b/final_task/tests/test_pars_args.py new file mode 100644 index 0000000..e84b89a --- /dev/null +++ b/final_task/tests/test_pars_args.py @@ -0,0 +1,29 @@ +import sys + +sys.path.insert(1, 'final_task/rss_reader') +from pars_args import * +import unittest +import argparse +from unittest import mock # python 3.3+ + + +class TestParsArgs(unittest.TestCase): + @mock.patch('argparse.ArgumentParser.parse_args', + return_value=argparse.Namespace(source='https://news.tut.by/rss/', + version='2.0', + json=False, + verbose=False, + limit=2, + date="20191212")) + def test_command(self, mock_args): + data = get_args() + self.assertEqual(data.source, "https://news.tut.by/rss/") + self.assertEqual(data.version, "2.0") + self.assertEqual(data.json, False) + self.assertEqual(data.verbose, False) + self.assertEqual(data.limit, 2) + self.assertEqual(data.date, "20191212") + + +if __name__ == '__main__': + unittest.main() diff --git a/final_task/tests/test_parser_rss.py b/final_task/tests/test_parser_rss.py new file mode 100644 index 0000000..9ed3ca5 --- /dev/null +++ b/final_task/tests/test_parser_rss.py @@ -0,0 +1,86 @@ +import os +import sys + +sys.path.insert(1, 'final_task/rss_reader') +from parser_rss import * +from News import News +import unittest +import feedparser + + +class TestParserRss(unittest.TestCase): + + def setUp(self): + self.summary = '''

\ +NATO ally expels undercover Russian spy In a rare move,NATO ally Bulgaria has expelled an undercover spy affiliated with \ +the Russian military intelligence service, according to a Western intelligence source.


''' + self.item = News(feed="feed", + title="title", + date=parser.parse("2019-11-17 10:44:20-05:00"), + link="link", + info_about_image="info_about_image", + briefly_about_news="briefly_about_news", + links_from_news=["link", "link_on_image"] + ) + self.result = "1\n" + self.result += "Feed: feed\n" + self.result += "Title: title \n" + self.result += "Date: 2019-11-17 10:44:20-05:00 \n" + self.result += "Link: link\n" + self.result += "Info about image: info_about_image\n" + self.result += "Briefly about news: briefly_about_news\n" + self.result += "Links: \n" + self.result += "[0] link\n" + self.result += "[1] link_on_image\n" + self.result += '\n' + self.result += '-' * 100 + + if os.path.isfile('final_task/tests/news_feed_for_test.xml'): + self.url = 'final_task/tests/news_feed_for_test.xml' + else: + self.url = 'news_feed_for_test.xml' + + self.news_feed = feedparser.parse(self.url) + + def test_clear_text(self): + self.assertEqual(clear_text("'"), "'") + + def test_get_info_about_image(self): + self.assertEqual(get_info_about_image(self.summary), '''NATO ally expels undercover Russian spy ''') + + def test_get_briefly_about_news(self): + self.assertEqual(get_briefly_about_news(self.summary), + '''In a rare move,NATO ally Bulgaria has expelled an undercover ''' + '''spy affiliated with the Russian military intelligence''' + ''' service, according to a Western intelligence source.''') + + def test_valid_date(self): + self.assertEqual(str(valid_date("20191211")), "2019-12-11 00:00:00") + with self.assertRaises(ValueError) as error: + valid_date("dfgh") + self.assertEqual(str(error.exception), 'Incorrect data format, should be YYYYMMDD') + with self.assertRaises(ValueError) as error: + valid_date("20102111") + self.assertEqual(str(error.exception), 'Incorrect data format, should be YYYYMMDD') + + def test_get_news_feed(self): + with self.assertRaises(URLError) as error: + get_news_feed("wcxqa") + self.assertEqual(str(error.exception), '') + with self.assertRaises(URLError) as error: + get_news_feed(" https://news.tut.by/") + self.assertEqual(str(error.exception), '') + self.assertEqual(get_news_feed(self.url), self.news_feed) + + def test_init_list_of_news(self): + + list_of_news = [] + init_list_of_news(list_of_news, self.news_feed, 2) + self.assertEqual(len(list_of_news), 2) + + +if __name__ == '__main__': + unittest.main() diff --git a/final_task/tests/test_print_functions.py b/final_task/tests/test_print_functions.py new file mode 100644 index 0000000..2174cbe --- /dev/null +++ b/final_task/tests/test_print_functions.py @@ -0,0 +1,106 @@ +import sys +import os + +sys.path.insert(1, 'final_task/rss_reader') +from parser_rss import * +from News import News + +from io import StringIO +from unittest.mock import patch +import feedparser +import print_functions +import unittest + + +class TestPrintFunctios(unittest.TestCase): + + def setUp(self): + self.item = News(feed="feed", + title="title", + date=parser.parse("2019-11-17 10:44:20-05:00"), + link="link", + info_about_image="info_about_image", + briefly_about_news="briefly_about_news", + links_from_news=["link", "link_on_image"] + ) + self.result = "1\n" + self.result += "Feed: feed\n" + self.result += "Title: title \n" + self.result += "Date: 2019-11-17 10:44:20-05:00 \n" + self.result += "Link: link\n" + self.result += "Info about image: info_about_image\n" + self.result += "Briefly about news: briefly_about_news\n" + self.result += "Links: \n" + self.result += "[0] link\n" + self.result += "[1] link_on_image\n" + self.result += '\n' + self.result += '-' * 100 + + if os.path.isfile('final_task/tests/news_feed_for_test.xml'): + self.url = 'final_task/tests/news_feed_for_test.xml' + else: + self.url = 'news_feed_for_test.xml' + + self.news_feed = feedparser.parse(self.url) + + def test_print_news(self): + with patch('sys.stdout', new=StringIO()) as fake_out_put: + print_functions.print_news([self.item, ]) + self.assertEqual(fake_out_put.getvalue().strip(), self.result) + + def test_print_news_in_json(self): + self.result = "[\n" + self.result += " {\n" + self.result += ''' "Feed": "feed",\n''' + self.result += ''' "Title": "title",\n''' + self.result += ''' "Date": "2019-11-17 10:44:20-05:00",\n''' + self.result += ''' "Link": "link",\n''' + self.result += ''' "Info about image": "info_about_image",\n''' + self.result += ''' "Briefly about news": "briefly_about_news",\n''' + self.result += ''' "Links": [\n''' + self.result += ''' "link",\n''' + self.result += ''' "link_on_image"\n''' + self.result += " ]\n" + self.result += " }\n" + self.result += "]" + with patch('sys.stdout', new=StringIO()) as fake_out_put: + print_functions.print_news_in_json([self.item, ]) + self.assertEqual(fake_out_put.getvalue().strip(), self.result) + + def test_print_news_in_json_in_multi_colored_format(self): + self.result = "\033[1m\033[35m[\033[0m\n" + self.result += " \033[1m\033[31m{\033[0m\n" + self.result += ''' \033[1m\033[34m"Feed": "feed",\033[0m\n''' + self.result += ''' \033[32m"Title": "title",\033[0m\n''' + self.result += ''' \033[33m"Date": "2019-11-17 10:44:20-05:00",\033[0m\n''' + self.result += ''' \033[36m"Link": "link",\033[0m\n''' + self.result += ''' \033[33m"Info about image": "info_about_image",\033[0m\n''' + self.result += ''' \033[32m"Briefly about news": "briefly_about_news",\033[0m\n''' + self.result += ''' \033[36m"Links": [\n''' + self.result += ''' "link",\n''' + self.result += ''' "link_on_image"\n''' + self.result += " ]\033[0m\n" + self.result += " \033[1m\033[31m}\033[0m\n" + self.result += "\033[1m\033[35m]\033[0m" + with patch('sys.stdout', new=StringIO()) as fake_out_put: + print_functions.print_news_in_json_in_multi_colored_format([self.item, ]) + self.assertEqual(fake_out_put.getvalue().strip(), self.result) + + def test_print_news_in_multi_colored_format(self): + self.result = "1:\n" + self.result += "Feed: feed\n" + self.result += "Title: title\n" + self.result += "Date: 2019-11-17 10:44:20-05:00\n" + self.result += "Link: link\n" + self.result += "Info about image: info_about_image\n" + self.result += "Briefly about news: briefly_about_news\n" + self.result += "Links: \n" + self.result += "[0] link\n" + self.result += "[1] link_on_image" + with patch('sys.stdout', new=StringIO()) as fake_out_put: + print_functions.print_news_in_multi_colored_format([self.item, ]) + self.assertEqual(fake_out_put.getvalue().strip(), self.result) + + +if __name__ == '__main__': + unittest.main()