From 71a2165b5660c04bdc66ff34b915b65bbbead2a4 Mon Sep 17 00:00:00 2001
From: Jai
Date: Tue, 5 Mar 2024 11:39:50 +0530
Subject: [PATCH] segregation of postgres and mysql connection services

---
 connectors/sources/api_import.py |  54 ++++
 connectors/sources/mysql.py      | 159 ++++++++++++
 connectors/sources/postgres.py   | 157 ++++++++++++
 connectors/utils.py              |  65 +++++
 participant/views.py             | 409 +++++++------------------------
 5 files changed, 518 insertions(+), 326 deletions(-)
 create mode 100644 connectors/sources/api_import.py
 create mode 100644 connectors/sources/mysql.py
 create mode 100644 connectors/sources/postgres.py
 create mode 100644 connectors/utils.py

diff --git a/connectors/sources/api_import.py b/connectors/sources/api_import.py
new file mode 100644
index 00000000..e730deb5
--- /dev/null
+++ b/connectors/sources/api_import.py
@@ -0,0 +1,54 @@
+import json
+import logging
+
+import requests
+from django.http import JsonResponse
+from rest_framework import status
+from rest_framework.response import Response
+
+from connectors.utils import create_dataset_v2_for_data_import
+from core import settings
+from utils import file_operations as file_ops
+
+LOGGER = logging.getLogger(__name__)
+
+
+def import_using_api_endpoint(auth_type, request, url, dataset, source, file_name, dataset_name):
+    if auth_type == 'NO_AUTH':
+        response = requests.get(url)
+    elif auth_type == 'API_KEY':
+        headers = {request.data.get(
+            "api_key_name"): request.data.get("api_key_value")}
+        response = requests.get(url, headers=headers)
+    elif auth_type == 'BEARER':
+        headers = {"Authorization": "Bearer " +
+                   request.data.get("token")}
+        response = requests.get(url, headers=headers)
+    else:
+        return Response({"auth_type": ["Unsupported auth type."]}, status=status.HTTP_400_BAD_REQUEST)
+
+    if response.status_code in [200, 201]:
+        try:
+            data = response.json()
+        except ValueError:
+            data = response.text
+
+        file_path = file_ops.create_directory(
+            settings.DATASET_FILES_URL, [dataset_name, source])
+        with open(file_path + "/" + file_name + ".json", "w") as outfile:
+            json.dump(data, outfile)
+
+        LOGGER.info("Fetched data from the API successfully")
+
+        serializer = create_dataset_v2_for_data_import(
+            dataset=dataset, source=source, dataset_name=dataset_name,
+            file_name=file_name
+        )
+        return JsonResponse(serializer.data, status=status.HTTP_200_OK)
+
+    LOGGER.error("Failed to fetch data from the API")
+    return Response({"message": f"API Response: {response.text}"}, status=status.HTTP_400_BAD_REQUEST)
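+
+# Illustrative request payload for this helper (assumed shape; the exact field
+# names come from the calling view's request, not from a contract defined here):
+#   {"auth_type": "API_KEY", "url": "https://example.com/data",
+#    "api_key_name": "x-api-key", "api_key_value": "<secret>"}
+# With auth_type BEARER, a "token" field is expected instead of the key pair.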
diff --git a/connectors/sources/mysql.py b/connectors/sources/mysql.py
new file mode 100644
index 00000000..b3074fea
--- /dev/null
+++ b/connectors/sources/mysql.py
@@ -0,0 +1,159 @@
+import json
+import logging
+
+import mysql.connector
+import pandas as pd
+from django.http import HttpResponse, JsonResponse
+from rest_framework import status
+from rest_framework.response import Response
+
+from connectors.utils import update_cookies, create_dataset_v2_for_data_import
+from core import settings
+from utils import file_operations as file_ops
+
+LOGGER = logging.getLogger(__name__)
+
+
+def connect_with_mysql(request, cookie_data, config):
+    """Create a MySQL connection object on valid database credentials and return tables."""
+    LOGGER.info("Connecting to MySQL")
+
+    try:
+        # Try to connect to the database using the provided configuration
+        mydb = mysql.connector.connect(**config)
+        mycursor = mydb.cursor()
+        db_name = request.data.get("database")
+        mycursor.execute("use " + db_name + ";")
+        mycursor.execute("show tables;")
+        table_list = mycursor.fetchall()
+        table_list = [
+            element for inner_list in table_list for element in inner_list]
+
+        # send the tables as a list in the response body
+        response = HttpResponse(json.dumps(
+            table_list), status=status.HTTP_200_OK)
+        # set the connection-details cookie on the response
+        response = update_cookies(
+            "conn_details", cookie_data, response)
+        return response
+    except mysql.connector.Error as err:
+        if err.errno == mysql.connector.errorcode.ER_ACCESS_DENIED_ERROR:
+            return Response(
+                {
+                    "username": ["Incorrect username or password"],
+                    "password": ["Incorrect username or password"],
+                },
+                status=status.HTTP_400_BAD_REQUEST,
+            )
+        elif err.errno == mysql.connector.errorcode.ER_NO_SUCH_TABLE:
+            return Response({"table": ["Table does not exist"]},
+                            status=status.HTTP_400_BAD_REQUEST)
+        elif err.errno == mysql.connector.errorcode.ER_BAD_DB_ERROR:
+            # Database name is incorrect
+            return Response({
+                "dbname": ["Invalid database name. Connection failed."]}, status=status.HTTP_400_BAD_REQUEST)
+        # Return an error message if the connection fails
+        return Response({"host": ["Invalid host. Connection failed."]}, status=status.HTTP_400_BAD_REQUEST)
+    except Exception as e:
+        LOGGER.error(e, exc_info=True)
+        return Response(str(e), status=status.HTTP_400_BAD_REQUEST)
+
+
+def connect_and_get_column_using_mysql(config, table_name):
+    """Create a MySQL connection object on valid database credentials and fetch the column names of a table."""
+    LOGGER.info("Connecting to MySQL")
+    try:
+        # Try to connect to the database using the provided configuration
+        mydb = mysql.connector.connect(**config)
+        mycursor = mydb.cursor()
+        db_name = config["database"]
+        mycursor.execute("use " + db_name + ";")
+        mycursor.execute("SHOW COLUMNS FROM " +
+                         db_name + "." + table_name + ";")
+
+        # Fetch columns & return as a response
+        col_list = mycursor.fetchall()
+        cols = [column_details[0] for column_details in col_list]
+        return HttpResponse(json.dumps(cols), status=status.HTTP_200_OK)
+
+    except mysql.connector.Error as err:
+        if err.errno == mysql.connector.errorcode.ER_ACCESS_DENIED_ERROR:
+            return Response(
+                {
+                    "username": ["Incorrect username or password"],
+                    "password": ["Incorrect username or password"],
+                },
+                status=status.HTTP_400_BAD_REQUEST,
+            )
+        elif err.errno == mysql.connector.errorcode.ER_NO_SUCH_TABLE:
+            return Response({"table_name": ["Table does not exist"]}, status=status.HTTP_400_BAD_REQUEST)
+        elif err.errno == mysql.connector.errorcode.ER_KEY_COLUMN_DOES_NOT_EXITS:
+            return Response({"col": ["Column does not exist."]}, status=status.HTTP_400_BAD_REQUEST)
+        # Return an error message if the connection fails
+        return Response({"error": [str(err)]}, status=status.HTTP_400_BAD_REQUEST)
+    except Exception as e:
+        LOGGER.error(e, exc_info=True)
+        return Response(str(e), status=status.HTTP_400_BAD_REQUEST)
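+
+# filter_data, when present, is assumed to be a JSON-encoded list of filters,
+# e.g. '[{"column_name": "age", "operation": ">", "value": "30"}]' (shape
+# inferred from how the export helper below parses it).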
" AND ".join(sub_queries) + + mycursor.execute(query_string) + result = mycursor.fetchall() + + # save the list of files to a temp directory + file_path = file_ops.create_directory( + settings.DATASET_FILES_URL, [dataset_name, source]) + df = pd.read_sql(query_string, mydb) + if df.empty: + return Response({"data": [f"No data was found for the filter applied. Please try again."]}, + status=status.HTTP_400_BAD_REQUEST) + df = df.astype(str) + df.to_excel(file_path + "/" + file_name + ".xls") + serializer = create_dataset_v2_for_data_import( + dataset=dataset, + source=source, + dataset_name=dataset_name, + file_name=file_name + ) + return JsonResponse(serializer.data, status=status.HTTP_200_OK) + # return HttpResponse(json.dumps(result), status=status.HTTP_200_OK) + + except mysql.connector.Error as err: + LOGGER.error(err, exc_info=True) + if err.errno == mysql.connector.errorcode.ER_ACCESS_DENIED_ERROR: + return Response( + { + "username": ["Incorrect username or password"], + "password": ["Incorrect username or password"], + }, + status=status.HTTP_400_BAD_REQUEST, + ) + elif err.errno == mysql.connector.errorcode.ER_NO_SUCH_TABLE: + return Response({"table_name": ["Table does not exist"]}, status=status.HTTP_400_BAD_REQUEST) + # elif err.errno == mysql.connector.errorcode.ER_KEY_COLUMN_DOES_NOT_EXITS: + elif str(err).__contains__("Unknown column"): + return Response({"col": ["Columns does not exist."]}, status=status.HTTP_400_BAD_REQUEST) + # Return an error message if the connection fails + return Response({"": [str(err)]}, status=status.HTTP_400_BAD_REQUEST) diff --git a/connectors/sources/postgres.py b/connectors/sources/postgres.py new file mode 100644 index 00000000..c1f7b38a --- /dev/null +++ b/connectors/sources/postgres.py @@ -0,0 +1,157 @@ +import json +from contextlib import closing +import mysql.connector +import os +import psycopg2 +from django.http import HttpResponse, JsonResponse +from rest_framework import pagination, serializers, status +from rest_framework.decorators import action, api_view, permission_classes +from rest_framework.response import Response +from rest_framework.viewsets import GenericViewSet, ViewSet + +from connectors.utils import update_cookies, create_dataset_v2_for_data_import +from core import settings +from core.constants import Constants, NumericalConstants +import logging +import datetime +from rest_framework.exceptions import ValidationError +import pandas as pd + +from datahub.serializers import DatasetFileV2NewSerializer +from utils import file_operations as file_ops + +LOGGER = logging.getLogger(__name__) + + +def connect_with_postgres(config, cookie_data): + LOGGER.info(f"Connecting to Postgres") + try: + tables = [] + with closing(psycopg2.connect(**config)) as conn: + with closing(conn.cursor()) as cursor: + cursor.execute( + "SELECT table_name FROM information_schema.tables WHERE table_schema='public';") + table_list = cursor.fetchall() + # send the tables as a list in response body & set cookies + tables = [ + table for inner_list in table_list for table in inner_list] + response = HttpResponse(json.dumps( + tables), status=status.HTTP_200_OK) + response = update_cookies( + "conn_details", cookie_data, response) + return response + except psycopg2.Error as err: + print(err) + if "password authentication failed for user" in str(err) or "role" in str(err): + # Incorrect username or password + return Response( + { + "username": ["Incorrect username or password"], + "password": ["Incorrect username or password"], + }, + 
+
+
+def connect_with_postgres(config, cookie_data):
+    """Create a PostgreSQL connection object on valid database credentials and return tables."""
+    LOGGER.info("Connecting to Postgres")
+    try:
+        tables = []
+        with closing(psycopg2.connect(**config)) as conn:
+            with closing(conn.cursor()) as cursor:
+                cursor.execute(
+                    "SELECT table_name FROM information_schema.tables WHERE table_schema='public';")
+                table_list = cursor.fetchall()
+        # send the tables as a list in the response body & set cookies
+        tables = [
+            table for inner_list in table_list for table in inner_list]
+        response = HttpResponse(json.dumps(
+            tables), status=status.HTTP_200_OK)
+        response = update_cookies(
+            "conn_details", cookie_data, response)
+        return response
+    except psycopg2.Error as err:
+        LOGGER.error(err, exc_info=True)
+        if "password authentication failed for user" in str(err) or "role" in str(err):
+            # Incorrect username or password
+            return Response(
+                {
+                    "username": ["Incorrect username or password"],
+                    "password": ["Incorrect username or password"],
+                },
+                status=status.HTTP_400_BAD_REQUEST,
+            )
+        elif "database" in str(err):
+            # Database does not exist
+            return Response({"dbname": ["Database does not exist"]}, status=status.HTTP_400_BAD_REQUEST)
+        elif "could not translate host name" in str(err):
+            # Host name could not be resolved
+            return Response({"host": ["Invalid Host address"]}, status=status.HTTP_400_BAD_REQUEST)
+        elif "Operation timed out" in str(err):
+            # Server is not available
+            return Response({"port": ["Invalid port or DB Server is down"]}, status=status.HTTP_400_BAD_REQUEST)
+
+        # Return an error message if the connection fails
+        return Response({"error": [str(err)]}, status=status.HTTP_400_BAD_REQUEST)
+
+
+def connect_and_get_column_using_postgres(config, table_name):
+    """Create a PostgreSQL connection object on valid database credentials and fetch the column names of a table."""
+    LOGGER.info("Connecting to Postgres")
+    try:
+        col_list = []
+        with closing(psycopg2.connect(**config)) as conn:
+            with closing(conn.cursor()) as cursor:
+                # Fetch columns & return as a response; the table name is
+                # bound as a query parameter.
+                cursor.execute(
+                    "SELECT column_name FROM information_schema.columns WHERE table_name = %s;",
+                    (table_name,),
+                )
+                col_list = cursor.fetchall()
+
+        if len(col_list) <= 0:
+            return Response({"table_name": ["Table does not exist."]}, status=status.HTTP_400_BAD_REQUEST)
+
+        cols = [column_details[0] for column_details in col_list]
+        return HttpResponse(json.dumps(cols), status=status.HTTP_200_OK)
+    except psycopg2.Error as error:
+        LOGGER.error(error, exc_info=True)
+        return Response({"table_name": ["Something went wrong please try again later."]},
+                        status=status.HTTP_400_BAD_REQUEST)
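+
+# col_names is assumed to be a pre-validated, comma-separated column list
+# (e.g. "id, name, email"); it is interpolated into the SELECT statement
+# below, so it must not be taken from untrusted input.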
+
+
+def export_database_data_into_xls_using_postgres(
+    config, col_names, t_name, serializer, dataset_name, source, file_name, dataset
+):
+    """Create a PostgreSQL connection object on valid database credentials and export the filtered table data to XLS."""
+    LOGGER.info("Connecting to Postgres")
+    try:
+        with closing(psycopg2.connect(**config)) as conn:
+            try:
+                query_string = f"SELECT {col_names} FROM {t_name}"
+                sub_queries = []  # List to store individual filter sub-queries
+                params = []  # Filter values, bound as query parameters
+
+                if serializer.data.get("filter_data"):
+                    filter_data = json.loads(serializer.data.get("filter_data")[0])
+                    for query_dict in filter_data:
+                        column_name = query_dict.get('column_name')
+                        operation = query_dict.get('operation')
+                        value = query_dict.get('value')
+                        # Bind the value as a %s placeholder; the column name
+                        # and operator are interpolated directly.
+                        sub_queries.append(f"{column_name} {operation} %s")
+                        params.append(value)
+                    query_string += " WHERE " + " AND ".join(sub_queries)
+                df = pd.read_sql(query_string, conn, params=params or None)
+                if df.empty:
+                    return Response({"data": ["No data was found for the filter applied. Please try again."]},
+                                    status=status.HTTP_400_BAD_REQUEST)
+
+                df = df.astype(str)
+            except pd.errors.DatabaseError as error:
+                LOGGER.error(error, exc_info=True)
+                return Response({"col": ["Column does not exist."]}, status=status.HTTP_400_BAD_REQUEST)
+
+            file_path = file_ops.create_directory(
+                settings.DATASET_FILES_URL, [dataset_name, source])
+            df.to_excel(os.path.join(
+                file_path, file_name + ".xls"))
+            serializer = create_dataset_v2_for_data_import(
+                dataset=dataset,
+                source=source,
+                dataset_name=dataset_name,
+                file_name=file_name,
+                file_extension="xls"
+            )
+            return JsonResponse(serializer.data, status=status.HTTP_200_OK)
+
+    except psycopg2.Error as error:
+        LOGGER.error(error, exc_info=True)
+        return Response(str(error), status=status.HTTP_400_BAD_REQUEST)
diff --git a/connectors/utils.py b/connectors/utils.py
new file mode 100644
index 00000000..cf4b5eb9
--- /dev/null
+++ b/connectors/utils.py
@@ -0,0 +1,65 @@
+# Common utility functions for the connector sources.
+import datetime
+import logging
+import os
+
+from rest_framework import status
+from rest_framework.exceptions import ValidationError
+from rest_framework.response import Response
+
+from core import settings
+from datahub.models import DatasetV2File
+from datahub.serializers import DatasetFileV2NewSerializer
+
+LOGGER = logging.getLogger(__name__)
+
+
+def update_cookies(key, value, response):
+    try:
+        max_age = 1 * 24 * 60 * 60
+        expires = datetime.datetime.strftime(
+            datetime.datetime.utcnow() + datetime.timedelta(seconds=max_age),
+            "%a, %d-%b-%Y %H:%M:%S GMT",
+        )
+        response.set_cookie(
+            key,
+            value,
+            max_age=max_age,
+            expires=expires,
+            domain=os.environ.get("PUBLIC_DOMAIN"),
+            secure=False,
+        )
+        return response
+
+    except ValidationError as e:
+        LOGGER.error(e, exc_info=True)
+        return Response(e.detail, status=status.HTTP_400_BAD_REQUEST)
+
+    except Exception as e:
+        LOGGER.error(e, exc_info=True)
+        return Response(str(e), status=status.HTTP_500_INTERNAL_SERVER_ERROR)
+
+
+def create_dataset_v2_for_data_import(
+    dataset, source, dataset_name, file_name, file_extension="json"
+):
+    relative_path = os.path.join(
+        dataset_name, source, file_name + "." + file_extension)
+    instance = DatasetV2File.objects.create(
+        dataset=dataset,
+        source=source,
+        file=relative_path,
+        file_size=os.path.getsize(
+            os.path.join(settings.DATASET_FILES_URL, relative_path)),
+        standardised_file=relative_path,
+    )
+    serializer = DatasetFileV2NewSerializer(instance)
+    return serializer
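+
+# Usage sketch (illustrative; assumes the caller already wrote
+# <DATASET_FILES_URL>/<dataset_name>/<source>/<file_name>.<file_extension>):
+#   serializer = create_dataset_v2_for_data_import(
+#       dataset=dataset, source=source, dataset_name=dataset_name,
+#       file_name=file_name, file_extension="xls")
+#   return JsonResponse(serializer.data, status=status.HTTP_200_OK)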
diff --git a/participant/views.py b/participant/views.py
index 16f5583e..db43c7a5 100644
--- a/participant/views.py
+++ b/participant/views.py
@@ -36,6 +36,11 @@
 from uritemplate import partial
 
 from accounts.models import User
+from connectors.sources.api_import import import_using_api_endpoint
+from connectors.sources.mysql import connect_with_mysql, connect_and_get_column_using_mysql, \
+    export_database_data_into_xls_using_mysql
+from connectors.sources.postgres import connect_with_postgres, connect_and_get_column_using_postgres, \
+    export_database_data_into_xls_using_postgres
 from core.constants import Constants, NumericalConstants
 from core.utils import (
     CustomPagination,
@@ -1642,30 +1647,30 @@ def destroy(self, request, pk):
         return Response(str(e), status=status.HTTP_500_INTERNAL_SERVER_ERROR)
 
 
-def update_cookies(key, value, response):
-    try:
-        max_age = 1 * 24 * 60 * 60
-        expires = datetime.datetime.strftime(
-            datetime.datetime.utcnow() + datetime.timedelta(seconds=max_age),
-            "%a, %d-%b-%Y %H:%M:%S GMT",
-        )
-        response.set_cookie(
-            key,
-            value,
-            max_age=max_age,
-            expires=expires,
-            domain=os.environ.get("PUBLIC_DOMAIN"),
-            secure=False,
-        )
-        return response
-
-    except ValidationError as e:
-        LOGGER.error(e, exc_info=True)
-        return Response(e.detail, status=status.HTTP_400_BAD_REQUEST)
-
-    except Exception as e:
-        LOGGER.error(e, exc_info=True)
-        return Response(str(e), status=status.HTTP_500_INTERNAL_SERVER_ERROR)
 
 
 class DataBaseViewSet(GenericViewSet):
@@ -1678,10 +1683,6 @@ class DataBaseViewSet(GenericViewSet):
 
     @action(detail=False, methods=["post"])
     def database_config(self, request):
-        """
-        Configure the database connection based on the database type.
-        Return tables retrieved from the database and set database configuration in the cookies.
-        """
         database_type = request.data.get("database_type")
         serializer = self.get_serializer(data=request.data, context={
             "source": database_type})
@@ -1690,91 +1691,17 @@ def database_config(self, request):
         config = serializer.validated_data
         # remove database_type before passing it to db conn
         config.pop("database_type")
-        if database_type == Constants.SOURCE_MYSQL_FILE_TYPE:
-            """Create a MySQL connection object on valid database credentials and return tables"""
-            LOGGER.info(f"Connecting to {database_type}")
-
-            try:
-                # Try to connect to the database using the provided configuration
-                mydb = mysql.connector.connect(**config)
-                mycursor = mydb.cursor()
-                db_name = request.data.get("database")
-                mycursor.execute("use " + db_name + ";")
-                mycursor.execute("show tables;")
-                table_list = mycursor.fetchall()
-                table_list = [
-                    element for innerList in table_list for element in innerList]
-
-                # send the tables as a list in response body
-                response = HttpResponse(json.dumps(
-                    table_list), status=status.HTTP_200_OK)
-                # set the cookies in response
-                response = update_cookies(
-                    "conn_details", cookie_data, response)
-                return response
-            except mysql.connector.Error as err:
-                if err.errno == mysql.connector.errorcode.ER_ACCESS_DENIED_ERROR:
-                    return Response(
-                        {
-                            "username": ["Incorrect username or password"],
-                            "password": ["Incorrect username or password"],
-                        },
-                        status=status.HTTP_400_BAD_REQUEST,
-                    )
-                elif err.errno == mysql.connector.errorcode.ER_NO_SUCH_TABLE:
-                    return Response({"table": ["Table does not exist"]},
-                                    status=status.HTTP_400_BAD_REQUEST)
-                elif err.errno == mysql.connector.errorcode.ER_BAD_DB_ERROR:
-                    # Port is incorrect
-                    return Response({
-                        "dbname": ["Invalid database name. Connection Failed."]}, status=status.HTTP_400_BAD_REQUEST)
-                # Return an error message if the connection fails
-                return Response({"host": ["Invalid host . Connection Failed."]}, status=status.HTTP_400_BAD_REQUEST)
-            except Exception as e:
-                return Response(str(e), status=status.HTTP_400_BAD_REQUEST)
-
-        elif database_type == Constants.SOURCE_POSTGRESQL_FILE_TYPE:
-            """Create a PostgreSQL connection object on valid database credentials"""
-            LOGGER.info(f"Connecting to {database_type}")
-            try:
-                tables = []
-                with closing(psycopg2.connect(**config)) as conn:
-                    with closing(conn.cursor()) as cursor:
-                        cursor.execute(
-                            "SELECT table_name FROM information_schema.tables WHERE table_schema='public';")
-                        table_list = cursor.fetchall()
-                # send the tables as a list in response body & set cookies
-                tables = [
-                    table for inner_list in table_list for table in inner_list]
-                response = HttpResponse(json.dumps(
-                    tables), status=status.HTTP_200_OK)
-                response = update_cookies(
-                    "conn_details", cookie_data, response)
-                return response
-            except psycopg2.Error as err:
-                print(err)
-                if "password authentication failed for user" in str(err) or "role" in str(err):
-                    # Incorrect username or password
-                    return Response(
-                        {
-                            "username": ["Incorrect username or password"],
-                            "password": ["Incorrect username or password"],
-                        },
-                        status=status.HTTP_400_BAD_REQUEST,
-                    )
-                elif "database" in str(err):
-                    # Database does not exist
-                    return Response({"dbname": ["Database does not exist"]}, status=status.HTTP_400_BAD_REQUEST)
-                elif "could not translate host name" in str(err):
-                    # Database does not exist
-                    return Response({"host": ["Invalid Host address"]}, status=status.HTTP_400_BAD_REQUEST)
-
-                elif "Operation timed out" in str(err):
-                    # Server is not available
-                    return Response({"port": ["Invalid port or DB Server is down"]}, status=status.HTTP_400_BAD_REQUEST)
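+        # Each connector helper returns a complete HTTP response (and the
+        # connect_with_* helpers also set the conn_details cookie), so the
+        # view simply forwards whatever the helper returns.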
+        if database_type == Constants.SOURCE_POSTGRESQL_FILE_TYPE:
+            return connect_with_postgres(
+                config=config, cookie_data=cookie_data
+            )
 
-                # Return an error message if the connection fails
-                return Response({"error": [str(err)]}, status=status.HTTP_400_BAD_REQUEST)
+        elif database_type == Constants.SOURCE_MYSQL_FILE_TYPE:
+            return connect_with_mysql(request=request, cookie_data=cookie_data, config=config)
+
+        else:
+            return Response({"database": ["This database type is not supported."]},
+                            status=status.HTTP_400_BAD_REQUEST)
 
     @action(detail=False, methods=["post"])
     def database_col_names(self, request):
@@ -1791,67 +1718,20 @@ def database_col_names(self, request):
             return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
 
         if database_type == Constants.SOURCE_MYSQL_FILE_TYPE:
-            """Create a PostgreSQL connection object on valid database credentials"""
-            LOGGER.info(f"Connecting to {database_type}")
-            try:
-                # Try to connect to the database using the provided configuration
-                connection = mysql.connector.connect(**config)
-                mydb = connection
-                mycursor = mydb.cursor()
-                db_name = config["database"]
-                mycursor.execute("use " + db_name + ";")
-                mycursor.execute("SHOW COLUMNS FROM " +
-                                 db_name + "." + table_name + ";")
-
-                # Fetch columns & return as a response
-                col_list = mycursor.fetchall()
-                cols = [column_details[0] for column_details in col_list]
-                response = HttpResponse(json.dumps(
-                    cols), status=status.HTTP_200_OK)
-                return response
-
-            except mysql.connector.Error as err:
-                if err.errno == mysql.connector.errorcode.ER_ACCESS_DENIED_ERROR:
-                    return Response(
-                        {
-                            "username": ["Incorrect username or password"],
-                            "password": ["Incorrect username or password"],
-                        },
-                        status=status.HTTP_400_BAD_REQUEST,
-                    )
-                elif err.errno == mysql.connector.errorcode.ER_NO_SUCH_TABLE:
-                    return Response({"table_name": ["Table does not exist"]}, status=status.HTTP_400_BAD_REQUEST)
-                elif err.errno == mysql.connector.errorcode.ER_KEY_COLUMN_DOES_NOT_EXITS:
-                    return Response({"col": ["Columns does not exist."]}, status=status.HTTP_400_BAD_REQUEST)
-                # Return an error message if the connection fails
-                return Response({"error": [str(err)]}, status=status.HTTP_400_BAD_REQUEST)
-            except Exception as e:
-                LOGGER.error(e, exc_info=True)
-                return Response(str(e), status=status.HTTP_400_BAD_REQUEST)
+            return connect_and_get_column_using_mysql(
+                config=config,
+                table_name=table_name
+            )
 
         elif database_type == Constants.SOURCE_POSTGRESQL_FILE_TYPE:
-            """Create a PostgreSQL connection object on valid database credentials"""
-            LOGGER.info(f"Connecting to {database_type}")
-            try:
-                col_list = []
-                with closing(psycopg2.connect(**config)) as conn:
-                    with closing(conn.cursor()) as cursor:
-                        cursor = conn.cursor()
-                        # Fetch columns & return as a response
-                        cursor.execute(
-                            "SELECT column_name FROM information_schema.columns WHERE table_name='{0}';".format(
-                                table_name
-                            )
-                        )
-                        col_list = cursor.fetchall()
-
-                if len(col_list) <= 0:
-                    return Response({"table_name": ["Table does not exist."]}, status=status.HTTP_400_BAD_REQUEST)
+            return connect_and_get_column_using_postgres(
+                config=config,
+                table_name=table_name
+            )
 
-                cols = [column_details[0] for column_details in col_list]
-                return HttpResponse(json.dumps(cols), status=status.HTTP_200_OK)
-            except psycopg2.Error as error:
-                LOGGER.error(error, exc_info=True)
+        else:
+            return Response({"database": ["This database type is not supported."]},
+                            status=status.HTTP_400_BAD_REQUEST)
 
     @action(detail=False, methods=["post"])
     def database_xls_file(self, request):
@@ -1884,125 +1764,32 @@ def database_xls_file(self, request):
         config.pop("database_type")
 
         if database_type == Constants.SOURCE_MYSQL_FILE_TYPE:
-            """Create a PostgreSQL connection object on valid database credentials"""
-            LOGGER.info(f"Connecting to {database_type}")
-
-            try:
-                mydb = mysql.connector.connect(**config)
-                mycursor = mydb.cursor()
-                db_name = config["database"]
-                mycursor.execute("use " + db_name + ";")
-
-                query_string = f"SELECT {col_names} FROM {t_name}"
-                sub_queries = []  # List to store individual filter sub-queries
-                if serializer.data.get("filter_data"):
-
-                    filter_data = json.loads(serializer.data.get("filter_data")[0])
-                    for query_dict in filter_data:
-                        query_string = f"SELECT {col_names} FROM {t_name} WHERE "
-                        column_name = query_dict.get('column_name')
-                        operation = query_dict.get('operation')
-                        value = query_dict.get('value')
-                        sub_query = f"{column_name} {operation} '{value}'"  # Using %s as a placeholder for the value
-                        sub_queries.append(sub_query)
-                    query_string += " AND ".join(sub_queries)
-
-                mycursor.execute(query_string)
-                result = mycursor.fetchall()
-
-                # save the list of files to a temp directory
-                file_path = file_ops.create_directory(
-                    settings.DATASET_FILES_URL, [dataset_name, source])
-                df = pd.read_sql(query_string, mydb)
-                if df.empty:
-                    return Response({"data": [f"No data was found for the filter applied. Please try again."]},
-                                    status=status.HTTP_400_BAD_REQUEST)
-                df = df.astype(str)
-                df.to_excel(file_path + "/" + file_name + ".xls")
-                instance = DatasetV2File.objects.create(
-                    dataset=dataset,
-                    source=source,
-                    file=os.path.join(dataset_name, source,
-                                      file_name + ".xls"),
-                    file_size=os.path.getsize(
-                        os.path.join(settings.DATASET_FILES_URL, dataset_name, source, file_name + ".xls")),
-                    standardised_file=os.path.join(
-                        dataset_name, source, file_name + ".xls"),
-                )
-                # result = os.listdir(file_path)
-                serializer = DatasetFileV2NewSerializer(instance)
-                return JsonResponse(serializer.data, status=status.HTTP_200_OK)
-                # return HttpResponse(json.dumps(result), status=status.HTTP_200_OK)
-
-            except mysql.connector.Error as err:
-                LOGGER.error(err, exc_info=True)
-                if err.errno == mysql.connector.errorcode.ER_ACCESS_DENIED_ERROR:
-                    return Response(
-                        {
-                            "username": ["Incorrect username or password"],
-                            "password": ["Incorrect username or password"],
-                        },
-                        status=status.HTTP_400_BAD_REQUEST,
-                    )
-                elif err.errno == mysql.connector.errorcode.ER_NO_SUCH_TABLE:
-                    return Response({"table_name": ["Table does not exist"]}, status=status.HTTP_400_BAD_REQUEST)
-                # elif err.errno == mysql.connector.errorcode.ER_KEY_COLUMN_DOES_NOT_EXITS:
-                elif str(err).__contains__("Unknown column"):
-                    return Response({"col": ["Columns does not exist."]}, status=status.HTTP_400_BAD_REQUEST)
-                # Return an error message if the connection fails
-                return Response({"": [str(err)]}, status=status.HTTP_400_BAD_REQUEST)
+            return export_database_data_into_xls_using_mysql(
+                config=config,
+                col_names=col_names,
+                t_name=t_name,
+                serializer=serializer,
+                dataset_name=dataset_name,
+                source=source,
+                file_name=file_name,
+                dataset=dataset
+            )
 
         elif database_type == Constants.SOURCE_POSTGRESQL_FILE_TYPE:
-            """Create a PostgreSQL connection object on valid database credentials"""
-            LOGGER.info(f"Connecting to {database_type}")
-            try:
-                with closing(psycopg2.connect(**config)) as conn:
-                    try:
-
-                        query_string = f"SELECT {col_names} FROM {t_name}"
-                        sub_queries = []  # List to store individual filter sub-queries
-
-                        if serializer.data.get("filter_data"):
-                            filter_data = json.loads(serializer.data.get("filter_data")[0])
-
-                            for query_dict in filter_data:
-                                query_string = f"SELECT {col_names} FROM {t_name} WHERE "
-                                column_name = query_dict.get('column_name')
-                                operation = query_dict.get('operation')
-                                value = query_dict.get('value')
-                                sub_query = f"{column_name} {operation} '{value}'"  # Using %s as a placeholder for the value
-                                sub_queries.append(sub_query)
-                            query_string += " AND ".join(sub_queries)
-                        df = pd.read_sql(query_string, conn)
-                        if df.empty:
-                            return Response({"data": [f"No data was found for the filter applied. Please try again."]},
-                                            status=status.HTTP_400_BAD_REQUEST)
-
-                        df = df.astype(str)
-                    except pd.errors.DatabaseError as error:
-                        LOGGER.error(error, exc_info=True)
-                        return Response({"col": ["Columns does not exist."]}, status=status.HTTP_400_BAD_REQUEST)
-
-                    file_path = file_ops.create_directory(
-                        settings.DATASET_FILES_URL, [dataset_name, source])
-                    df.to_excel(os.path.join(
-                        file_path, file_name + ".xls"))
-                    instance = DatasetV2File.objects.create(
-                        dataset=dataset,
-                        source=source,
-                        file=os.path.join(dataset_name, source,
-                                          file_name + ".xls"),
-                        file_size=os.path.getsize(
-                            os.path.join(settings.DATASET_FILES_URL, dataset_name, source, file_name + ".xls")),
-                        standardised_file=os.path.join(
-                            dataset_name, source, file_name + ".xls"),
-                    )
-                    # result = os.listdir(file_path)
-                    serializer = DatasetFileV2NewSerializer(instance)
-                    return JsonResponse(serializer.data, status=status.HTTP_200_OK)
+            return export_database_data_into_xls_using_postgres(
+                config=config,
+                col_names=col_names,
+                t_name=t_name,
+                serializer=serializer,
+                dataset_name=dataset_name,
+                source=source,
+                file_name=file_name,
+                dataset=dataset
+            )
 
-            except psycopg2.Error as error:
-                LOGGER.error(error, exc_info=True)
+        else:
+            return Response({"database": ["This database type is not supported."]},
+                            status=status.HTTP_400_BAD_REQUEST)
 
     @action(detail=False, methods=["post"])
     def database_live_api_export(self, request):
@@ -2016,48 +1803,18 @@ def database_live_api_export(self, request):
             source = request.data.get("source")
             file_name = request.data.get("file_name")
 
-            if auth_type == 'NO_AUTH':
-                response = requests.get(url)
-            elif auth_type == 'API_KEY':
-                headers = {request.data.get(
-                    "api_key_name"): request.data.get("api_key_value")}
-                response = requests.get(url, headers=headers)
-            elif auth_type == 'BEARER':
-                headers = {"Authorization": "Bearer " +
-                           request.data.get("token")}
-                response = requests.get(url, headers=headers)
-
-            # response = requests.get(url)
-            if response.status_code in [200, 201]:
-                try:
-                    data = response.json()
-                except ValueError:
-                    data = response.text
-
-                file_path = file_ops.create_directory(
-                    settings.DATASET_FILES_URL, [dataset_name, source])
-                with open(file_path + "/" + file_name + ".json", "w") as outfile:
-                    if type(data) == list:
-                        json.dump(data, outfile)
-                    else:
-                        outfile.write(json.dumps(data))
-
-                # result = os.listdir(file_path)
-                instance = DatasetV2File.objects.create(
-                    dataset=dataset,
-                    source=source,
-                    file=os.path.join(dataset_name, source,
-                                      file_name + ".json"),
-                    file_size=os.path.getsize(
-                        os.path.join(settings.DATASET_FILES_URL, dataset_name, source, file_name + ".json")),
-                    standardised_file=os.path.join(
-                        dataset_name, source, file_name + ".json"),
-                )
-                serializer = DatasetFileV2NewSerializer(instance)
-                return JsonResponse(serializer.data, status=status.HTTP_200_OK)
+            return import_using_api_endpoint(
+                auth_type=auth_type,
+                request=request,
+                url=url,
+                dataset=dataset,
+                source=source,
+                file_name=file_name,
+                dataset_name=dataset_name
+            )
 
-            LOGGER.error("Failed to fetch data from api")
-            return Response({"message": f"API Response: {response.json()}"}, status=status.HTTP_400_BAD_REQUEST)
         except Exception as e:
             LOGGER.error(
                 f"Failed to fetch data from api ERROR: {e} and input fields: {request.data}")