From ed215ede20a59137c29bdfe62700c2c14659c0fd Mon Sep 17 00:00:00 2001 From: Sai Ganesh <322103310216@gvpce.ac.in> Date: Sat, 19 Oct 2024 21:06:50 +0530 Subject: [PATCH 1/2] Handled exceptions in python files --- API/__init__.py | 24 +- API/database.py | 96 +++++--- API/utils.py | 30 ++- FaceRec/app/main/Edit.py | 168 ++++++++------ FaceRec/app/main/Employee.py | 210 +++++++++--------- FaceRec/app/main/Face.py | 63 +++--- FaceRec/app/main/VideoImage.py | 252 +++++++++------------ FaceRec/app/main/__init__.py | 21 +- FaceRec/config.py | 28 ++- Model-Training/eval-mark-I.py | 25 ++- docs/conf.py | 20 +- main.py | 24 +- testing/test_database.py | 2 - testing/test_face_cycle.py | 7 +- testing/test_face_endpoints.py | 391 ++++++++++++++++----------------- 15 files changed, 702 insertions(+), 659 deletions(-) diff --git a/API/__init__.py b/API/__init__.py index a62df41..f5e794d 100644 --- a/API/__init__.py +++ b/API/__init__.py @@ -1,20 +1,32 @@ from __future__ import annotations import uvicorn -from fastapi import FastAPI +import logging +from fastapi import FastAPI, HTTPException from API import route +# Initialize logger +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + Fastapp = FastAPI() + # API Router Fastapp.include_router(route.router) - @Fastapp.get("/") def read_root(): - return {"Hello": "FASTAPI"} - + try: + return {"Hello": "FASTAPI"} + except Exception as e: + logger.error(f"Error in root endpoint: {e}") + raise HTTPException(status_code=500, detail="Internal Server Error") -# function to run server of FastAPI +# Function to run FastAPI server def run_fastapi_app(): - uvicorn.run(Fastapp, host="127.0.0.1", port=8000) + try: + uvicorn.run(Fastapp, host="127.0.0.1", port=8000) + except Exception as e: + logger.error(f"Error starting FastAPI server: {e}") + raise diff --git a/API/database.py b/API/database.py index eb3031d..305a466 100644 --- a/API/database.py +++ b/API/database.py @@ -1,9 +1,12 @@ from __future__ import annotations from datetime import datetime +import logging +from pymongo import MongoClient, errors -from pymongo import MongoClient - +# Initialize logger +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) class Database: def __init__(self, uri="mongodb://localhost:27017/", db_name="ImageDB"): @@ -14,8 +17,13 @@ def __init__(self, uri="mongodb://localhost:27017/", db_name="ImageDB"): uri (str): The uri of the MongoDB server. Defaults to 'mongodb://localhost:27017/'. db_name (str): The name of the MongoDB database. Defaults to 'ImageDB'. """ - self.client = MongoClient(uri) - self.db = self.client[db_name] + try: + self.client = MongoClient(uri) + self.db = self.client[db_name] + logger.info("Successfully connected to MongoDB.") + except errors.ConnectionError as e: + logger.error(f"Failed to connect to MongoDB: {e}") + raise def find(self, collection, query=None): """ @@ -28,7 +36,11 @@ def find(self, collection, query=None): Returns: pymongo.cursor.Cursor: A cursor pointing to the results of the query. """ - return self.db[collection].find(query) + try: + return self.db[collection].find(query) + except errors.PyMongoError as e: + logger.error(f"Error finding documents in {collection}: {e}") + return None def insert_one(self, collection, document): """ @@ -41,8 +53,11 @@ def insert_one(self, collection, document): Returns: pymongo.results.InsertOneResult: The result of the insertion. """ - - return self.db[collection].insert_one(document) + try: + return self.db[collection].insert_one(document) + except errors.PyMongoError as e: + logger.error(f"Error inserting document into {collection}: {e}") + return None def find_one(self, collection, filter, projection=None): """ @@ -56,7 +71,11 @@ def find_one(self, collection, filter, projection=None): Returns: dict: The document that matches the query, or None if no documents match. """ - return self.db[collection].find_one(filter=filter, projection=projection) + try: + return self.db[collection].find_one(filter=filter, projection=projection) + except errors.PyMongoError as e: + logger.error(f"Error finding document in {collection}: {e}") + return None def find_one_and_delete(self, collection, query): """ @@ -69,7 +88,11 @@ def find_one_and_delete(self, collection, query): Returns: dict: The document that matches the query, or None if no documents match. """ - return self.db[collection].find_one_and_delete(query) + try: + return self.db[collection].find_one_and_delete(query) + except errors.PyMongoError as e: + logger.error(f"Error deleting document in {collection}: {e}") + return None def update_one(self, collection, query, update): """ @@ -83,10 +106,12 @@ def update_one(self, collection, query, update): Returns: pymongo.results.UpdateResult: The result of the update. """ + try: + return self.db[collection].update_one(query, update) + except errors.PyMongoError as e: + logger.error(f"Error updating document in {collection}: {e}") + return None - return self.db[collection].update_one(query, update) - - # add a function for pipeline aggregation vector search def vector_search(self, collection, embedding): """ Perform a vector similarity search on the given collection. @@ -98,27 +123,30 @@ def vector_search(self, collection, embedding): Returns: list: A list of documents with the closest embedding to the query vector, sorted by score. """ - - result = self.db[collection].aggregate( - [ - { - "$vectorSearch": { - "index": "vector_index", - "path": "embedding", - "queryVector": embedding, - "numCandidates": 20, - "limit": 20, + try: + result = self.db[collection].aggregate( + [ + { + "$vectorSearch": { + "index": "vector_index", + "path": "embedding", + "queryVector": embedding, + "numCandidates": 20, + "limit": 20, + }, }, - }, - { - "$project": { - "_id": 0, - "Name": 1, - "Image": 1, - "score": {"$meta": "vectorSearchScore"}, + { + "$project": { + "_id": 0, + "Name": 1, + "Image": 1, + "score": {"$meta": "vectorSearchScore"}, + }, }, - }, - ], - ) - result_arr = [i for i in result] - return result_arr + ], + ) + result_arr = [i for i in result] + return result_arr + except errors.PyMongoError as e: + logger.error(f"Error performing vector search in {collection}: {e}") + return [] diff --git a/API/utils.py b/API/utils.py index f426de7..ad58473 100644 --- a/API/utils.py +++ b/API/utils.py @@ -68,15 +68,21 @@ def format(self, record): formatter = logging.Formatter(log_fmt) return formatter.format(record) - logger = logging.getLogger() - logger.setLevel(logging.DEBUG) - - stderr_handler = logging.StreamHandler() - stderr_handler.setLevel(logging.DEBUG) - stderr_handler.setFormatter(CustomFormatter()) - logger.addHandler(stderr_handler) - - file_handler = logging.FileHandler("app.log", mode="w") - file_handler.setLevel(logging.DEBUG) - file_handler.setFormatter(CustomFormatter(True)) - logger.addHandler(file_handler) + try: + logger = logging.getLogger() + logger.setLevel(logging.DEBUG) + + stderr_handler = logging.StreamHandler() + stderr_handler.setLevel(logging.DEBUG) + stderr_handler.setFormatter(CustomFormatter()) + logger.addHandler(stderr_handler) + + file_handler = logging.FileHandler("app.log", mode="w") + file_handler.setLevel(logging.DEBUG) + file_handler.setFormatter(CustomFormatter(True)) + logger.addHandler(file_handler) + + logging.info("Logging configuration initialized successfully.") + except Exception as e: + print(f"Error initializing logging configuration: {e}") + logging.error(f"Error initializing logging configuration: {e}") diff --git a/FaceRec/app/main/Edit.py b/FaceRec/app/main/Edit.py index e5956de..860b835 100644 --- a/FaceRec/app/main/Edit.py +++ b/FaceRec/app/main/Edit.py @@ -23,8 +23,12 @@ cap = cv2.VideoCapture(0) +# Check if the camera opened successfully +if not cap.isOpened(): + print("Error: Could not open video capture.") + # You can raise an exception or handle it as needed -# function for displaying live video +# Function for displaying live video def display_live_video(): """ Generator for displaying live video from the camera. @@ -34,11 +38,12 @@ def display_live_video(): while True: success, frame = cap.read() # Read a frame from the camera if not success: + print("Error: Failed to capture image.") break frame = cv2.flip(frame, 1) ret, buffer = cv2.imencode(".jpg", frame) - frame = buffer.tobytes if not ret: + print("Error: Failed to encode image.") break yield ( b"--frame\r\n" @@ -46,7 +51,6 @@ def display_live_video(): bytearray(buffer) + b"\r\n\r\n" ) - # Route for displaying video @Edit_blueprint.route("/video_feed") def video_feed(): @@ -59,7 +63,6 @@ def video_feed(): mimetype="multipart/x-mixed-replace;boundary=frame", ) - # Route for capturing image from video @Edit_blueprint.route("/capture", methods=["GET", "POST"]) def capture(): @@ -83,18 +86,28 @@ def capture(): global gender global Dept global encoded_image + EmployeeCode = request.form.get("EmployeeCode", "") Name = request.form.get("Name", "") gender = request.form.get("gender", "") Dept = request.form.get("Department", "") - ret, frame = cap.read(True) - frame = cv2.flip(frame, 1) - _, buffer = cv2.imencode(".jpg", frame) - encoded_image = base64.b64encode(buffer).decode("utf-8") - with open(Config.image_data_file, "w") as file: - json.dump({"base64_image": encoded_image}, file) - return redirect("Image") + + try: + ret, frame = cap.read(True) + if not ret: + print("Error: Could not read frame from camera.") + return redirect("Image") # or handle error appropriately + + frame = cv2.flip(frame, 1) + _, buffer = cv2.imencode(".jpg", frame) + encoded_image = base64.b64encode(buffer).decode("utf-8") + + with open(Config.image_data_file, "w") as file: + json.dump({"base64_image": encoded_image}, file) + except Exception as e: + print(f"Error while capturing image: {e}") + return redirect("Image") # Route to display captured image @Edit_blueprint.route("/Image", methods=["GET"]) @@ -114,36 +127,46 @@ def display_image(): Returns: A rendered template with the image path. """ - if os.path.exists(Config.image_data_file): - with open(Config.image_data_file) as file: - image_data = json.load(file) - encoded_image = image_data.get("base64_image", "") - decoded_image_data = base64.b64decode(encoded_image) - image = Image.open(io.BytesIO(decoded_image_data)) - filename = "final.png" - image.save( - os.path.join( - Config.upload_image_path[0], - filename, - ), - quality=100, - ) - image = sorted( - os.listdir(Config.upload_image_path[0]), - key=lambda x: os.path.getatime( - os.path.join(Config.upload_image_path[0], x), - ), - reverse=True, - ) - if image: - recent_image = image[0] - image_path = os.path.join(Config.upload_image_path[0], recent_image) - else: - recent_image = None - image_path = os.path.join(Config.upload_image_path[0], recent_image) - print("done") - return render_template("index.html", image_path=image_path) + try: + if os.path.exists(Config.image_data_file): + with open(Config.image_data_file) as file: + image_data = json.load(file) + + encoded_image = image_data.get("base64_image", "") + decoded_image_data = base64.b64decode(encoded_image) + image = Image.open(io.BytesIO(decoded_image_data)) + filename = "final.png" + image.save( + os.path.join( + Config.upload_image_path[0], + filename, + ), + quality=100, + ) + image = sorted( + os.listdir(Config.upload_image_path[0]), + key=lambda x: os.path.getatime( + os.path.join(Config.upload_image_path[0], x), + ), + reverse=True, + ) + + if image: + recent_image = image[0] + image_path = os.path.join(Config.upload_image_path[0], recent_image) + else: + recent_image = None + image_path = "" + else: + print(f"Error: {Config.image_data_file} does not exist.") + recent_image = None + image_path = "" + + return render_template("index.html", image_path=image_path) + except Exception as e: + print(f"Error while displaying image: {e}") + return render_template("index.html", image_path="") # Show a default image or handle error @Edit_blueprint.route("/edit/", methods=["POST", "GET"]) def edit(EmployeeCode): @@ -179,33 +202,40 @@ def edit(EmployeeCode): Name = request.form["Name"] gender = request.form["Gender"] Department = request.form["Department"] - with open(Config.image_data_file) as file: - image_data = json.load(file) - encoded_image = image_data.get("base64_image", "") - payload = { - "Name": Name, - "gender": gender, - "Department": Department, - "Image": encoded_image, - } - # logger.info(payload) try: - url = requests.put( - f"http://127.0.0.1:8000/update/{EmployeeCode}", - json=payload, - ) - url.status_code - # logger.info(url.json()) - - return redirect("/") - - except requests.exceptions.RequestException as e: - print(f"Request failed: {e}") - response = requests.get(f"http://127.0.0.1:8000/read/{EmployeeCode}") - # logger.info(response.status_code) - # logger.info(response.json()) - if response.status_code == 200: - employee_data = response.json() - return render_template("edit.html", employee_data=employee_data) - else: - return f"Error {response.status_code}: Failed to retrieve employee data." + with open(Config.image_data_file) as file: + image_data = json.load(file) + + encoded_image = image_data.get("base64_image", "") + payload = { + "Name": Name, + "gender": gender, + "Department": Department, + "Image": encoded_image, + } + try: + url = requests.put( + f"http://127.0.0.1:8000/update/{EmployeeCode}", + json=payload, + ) + if url.status_code == 200: + return redirect("/") + else: + print(f"Error: Failed to update employee data with status code {url.status_code}") + except requests.exceptions.RequestException as e: + print(f"Request failed: {e}") + + except Exception as e: + print(f"Error while processing employee data: {e}") + + try: + response = requests.get(f"http://127.0.0.1:8000/read/{EmployeeCode}") + if response.status_code == 200: + employee_data = response.json() + return render_template("edit.html", employee_data=employee_data) + else: + print(f"Error: Failed to retrieve employee data with status code {response.status_code}") + return f"Error {response.status_code}: Failed to retrieve employee data." + except requests.exceptions.RequestException as e: + print(f"Request failed: {e}") + return "Error: Could not retrieve employee data." diff --git a/FaceRec/app/main/Employee.py b/FaceRec/app/main/Employee.py index ead1087..4a016e9 100644 --- a/FaceRec/app/main/Employee.py +++ b/FaceRec/app/main/Employee.py @@ -1,172 +1,172 @@ from __future__ import annotations import base64 -import io import json import os import cv2 import requests from flask import Blueprint, jsonify, redirect, render_template, request -from PIL import Image +import logging from FaceRec.config import Config +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + video_capture = cv2.VideoCapture(0) flk_blueprint = Blueprint( - "flk_blueprint ", + "flk_blueprint", __name__, template_folder="../../templates/", static_folder="../../static/", - # capture_image="../../Capture image/" ) @flk_blueprint.route("/") -def Main_page(): +def main_page(): """ - This route is used to create a directory for storing images of employees - if it does not already exist. It then redirects to the route displaying all - records of employees. This route is used when the user first visits the - website. + This route creates a directory for storing employee images if it doesn't exist + and redirects to the route displaying all employee records. """ path = str(Config.upload_image_path[0]) - if not os.path.exists(path): - os.makedirs(path, exist_ok=True) - else: - pass + os.makedirs(path, exist_ok=True) return redirect("DisplayingEmployees") -# Displaying all records @flk_blueprint.route("/DisplayingEmployees") def display_information(): - """This route is used to retrieve all records of employees from the FastAPI - endpoint http://127.0.0.1:8000/Data/ and store them in the employees global - variable. The records are then passed to the template table.html to be - displayed in a table. If the request to the FastAPI endpoint fails, an - appropriate error message is printed to the console.""" + """Retrieve and display all employee records.""" global employees url = "http://127.0.0.1:8000/Data/" try: resp = requests.get(url=url) - # logger.info(resp.status_code) - # logger.info(resp.json()) + resp.raise_for_status() # Raise an error for bad responses employees = resp.json() - + logger.info("Employee data retrieved successfully.") except requests.exceptions.RequestException as e: - print(f"Request failed: {e}") + logger.error(f"Request failed: {e}") + employees = [] # Handle the error gracefully by setting employees to an empty list return render_template("table.html", employees=employees) -# To add employee record @flk_blueprint.route("/Add_employee") def add_employee(): - """This route is used to display the form for adding a new employee record. - The form is rendered from the template index.html.""" - + """Display the form for adding a new employee record.""" return render_template("index.html") -# To submit the form data to server and save it in database @flk_blueprint.route("/submit_form", methods=["POST"]) def submit_form(): """ - This route is used to handle the form submission of the new employee - record. The form data is received from the request object and then - validated. The image is base64 encoded and saved in the file defined - in the Config class. The image is then sent to the FastAPI endpoint - http://127.0.0.1:8000/create_new_faceEntry to be stored in the MongoDB - database. If the request to the FastAPI endpoint fails, an appropriate - error message is returned. If the request is successful, the user is - redirected to the route /DisplayingEmployees to view the newly added - record. + Handle the form submission for adding a new employee record. + Validate input and save image data before sending it to the FastAPI endpoint. """ - Employee_Code = request.form["EmployeeCode"] - Name = request.form["Name"] - gender = request.form["Gender"] - Department = request.form["Department"] + try: + Employee_Code = request.form["EmployeeCode"] + Name = request.form["Name"] + gender = request.form["Gender"] + Department = request.form["Department"] - if request.files["File"]: + # Validate and process image file if "File" not in request.files: return jsonify({"message": "No file part"}), 400 file = request.files["File"] allowed_extensions = {"png", "jpg", "jpeg"} - if ( - "." not in file.filename - or file.filename.split(".")[-1].lower() not in allowed_extensions - ): + if file and file.filename.split('.')[-1].lower() not in allowed_extensions: return jsonify({"message": "File extension is not valid"}), 400 - if file: - image_data = file.read() - encoded_image = base64.b64encode(image_data).decode("utf-8") - with open(Config.image_data_file, "w") as file: - json.dump({"base64_image": encoded_image}, file) - - with open(Config.image_data_file) as file: - image_data = json.load(file) - encoded_image = image_data.get("base64_image", "") - jsonify( - { + + image_data = file.read() + encoded_image = base64.b64encode(image_data).decode("utf-8") + + # Save the encoded image + with open(Config.image_data_file, "w") as img_file: + json.dump({"base64_image": encoded_image}, img_file) + + # Prepare payload for API + payload = { "EmployeeCode": Employee_Code, "Name": Name, "gender": gender, "Department": Department, - "encoded_image": encoded_image, - }, - ) - - payload = { - "EmployeeCode": Employee_Code, - "Name": Name, - "gender": gender, - "Department": Department, - "Image": encoded_image, - } - url = "http://127.0.0.1:8000/create_new_faceEntry" - payload.status_code - # try: - # resp = requests.post( - # url=url, - # json={ - # "EmployeeCode": 134, - # "Name": "Name", - # "gender": "gender", - # "Department": "Department", - # "Image": "your_image", - # }, - # ) - # resp.status_code - # except requests.exceptions.RequestException as e: - # print(f"Request failed: {e}") - jsonify({"message": "Successfully executed"}) - print("Executed.") - if payload.status_code == 200: + "Image": encoded_image, + } + + # Send request to FastAPI + response = requests.post("http://127.0.0.1:8000/create_new_faceEntry", json=payload) + response.raise_for_status() # Raise an error for bad responses + logger.info("Employee record created successfully.") + return redirect("DisplayingEmployees") - else: - return jsonify({"message": "Failed to execute"}) + except requests.exceptions.RequestException as e: + logger.error(f"Request failed: {e}") + return jsonify({"message": "Failed to execute"}), 500 -# To edit an employee details + except Exception as e: + logger.error(f"An error occurred: {e}") + return jsonify({"message": "An unexpected error occurred."}), 500 -# To delete employee details @flk_blueprint.route("/Delete/", methods=["DELETE", "GET"]) -def Delete(EmployeeCode): - """Delete an employee with the given EmployeeCode. +def delete(EmployeeCode): + """Delete an employee with the given EmployeeCode.""" + if not isinstance(EmployeeCode, int): + return jsonify({"message": "Employee code should be an integer"}), 400 - Args: - EmployeeCode: The employee code of the employee to be deleted. + try: + response = requests.delete(f"http://127.0.0.1:8000/delete/{EmployeeCode}") + response.raise_for_status() # Raise an error for bad responses + logger.info(f"Employee {EmployeeCode} deleted successfully.") + except requests.exceptions.RequestException as e: + logger.error(f"Request failed: {e}") + return jsonify({"message": "Failed to delete employee."}), 500 - Returns: - A JSON response with a message indicating the success or failure of the deletion. + return redirect("/DisplayingEmployees") - Raises: - 400 error if the EmployeeCode is not an integer. - """ - if not isinstance(EmployeeCode, int): - return jsonify({"message": "Employee code should be an integer"}, 400) - response = requests.delete(f"http://127.0.0.1:8000/delete/{EmployeeCode}") - jsonify(response.json()) - return redirect("/DisplayingEmployees") +# Add additional routes for editing employee details if needed +@flk_blueprint.route("/Edit_employee/") +def edit_employee(EmployeeCode): + """Display the form for editing an existing employee record.""" + # Fetch employee details from the FastAPI endpoint + url = f"http://127.0.0.1:8000/Data/{EmployeeCode}" + try: + response = requests.get(url=url) + response.raise_for_status() # Raise an error for bad responses + employee = response.json() + return render_template("edit.html", employee=employee) + except requests.exceptions.RequestException as e: + logger.error(f"Request failed: {e}") + return jsonify({"message": "Failed to retrieve employee data."}), 500 + + +@flk_blueprint.route("/update_employee/", methods=["POST"]) +def update_employee(EmployeeCode): + """Update an existing employee's details.""" + try: + Name = request.form["Name"] + gender = request.form["Gender"] + Department = request.form["Department"] + + payload = { + "EmployeeCode": EmployeeCode, + "Name": Name, + "gender": gender, + "Department": Department, + } + + response = requests.put(f"http://127.0.0.1:8000/update/{EmployeeCode}", json=payload) + response.raise_for_status() # Raise an error for bad responses + logger.info(f"Employee {EmployeeCode} updated successfully.") + + return redirect("DisplayingEmployees") + + except requests.exceptions.RequestException as e: + logger.error(f"Request failed: {e}") + return jsonify({"message": "Failed to update employee."}), 500 + + except Exception as e: + logger.error(f"An error occurred: {e}") + return jsonify({"message": "An unexpected error occurred."}), 500 diff --git a/FaceRec/app/main/Face.py b/FaceRec/app/main/Face.py index 25014af..3ae4e68 100644 --- a/FaceRec/app/main/Face.py +++ b/FaceRec/app/main/Face.py @@ -1,7 +1,6 @@ from __future__ import annotations import base64 -import io import json import os @@ -10,16 +9,21 @@ from flask import Blueprint from flask import Response as flask_response from flask import redirect, render_template, request -from PIL import Image +import logging from FaceRec.config import Config +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + Face_Rec_blueprint = Blueprint( "Face_Rec_blueprint", __name__, template_folder="../../templates/", static_folder="../../static/", ) + cap = cv2.VideoCapture(0) @@ -31,11 +35,6 @@ def display_live_video(): The images are encoded in a multipart response, with each image separated by a boundary string. - The function is designed to be used with the Flask Response object, which - handles the details of the HTTP response. The caller should use the - Response object to set the content type to "multipart/x-mixed-replace" and - then yield from this function. - The function breaks out of the loop when the camera is closed or there is an error reading from the camera. @@ -43,13 +42,14 @@ def display_live_video(): default camera. """ while True: - success, frame = cap.read(True) # Read a frame from the camera + success, frame = cap.read() # Read a frame from the camera if not success: + logger.error("Failed to read frame from camera.") break frame = cv2.flip(frame, 1) ret, buffer = cv2.imencode(".jpg", frame) - frame = buffer.tobytes if not ret: + logger.error("Failed to encode frame to JPEG.") break yield ( b"--frame\r\n" @@ -64,17 +64,6 @@ def recognize_employee(): Route for the employee recognition page. This route is used to serve the HTML page for recognizing employees. - - The page contains a live video feed from the default camera and a button - to capture a frame from the video feed and send it to the server for - recognition. - - The page uses JavaScript to display the video feed and send the request - to the server. - - The server responds with a JSON object containing the name of the - recognized employee, if any. - :return: The rendered HTML page for employee recognition. """ return render_template("recognition.html") @@ -85,13 +74,7 @@ def video_feed(): """ Route for displaying live video from the camera. - This route is used to display a live video feed from the default camera. - - The video feed is a multipart response with a sequence of JPEG images, - each separated by a boundary string. - The content type of the response is "multipart/x-mixed-replace;boundary=frame". - :return: The rendered HTML page for displaying live video from the camera. """ return flask_response( @@ -105,20 +88,26 @@ def recognize(): """ Route for recognizing employees from the captured image. - This route is used to recognize employees from the captured image. - - The route is a POST request with the following form data: - - image: The captured image of the employee. - The server responds with a JSON object containing the name of the recognized employee, if any. :return: The rendered HTML page for employee recognition with the response from the server. """ - files = {"image": (open(f"captured_image.jpg", "rb"), "image/jpeg")} - fastapi_url = ( - "http://127.0.0.1:8000/recognize_face" # Replace with your FastAPI URL - ) - response = requests.post(fastapi_url, files=files) - return render_template("recognition.html", response_text=response.text) + try: + # Capture the image file for recognition + with open("captured_image.jpg", "rb") as img_file: + files = {"image": (img_file, "image/jpeg")} + fastapi_url = "http://127.0.0.1:8000/recognize_face" # Replace with your FastAPI URL + response = requests.post(fastapi_url, files=files) + response.raise_for_status() # Raise an error for bad responses + logger.info("Recognition request successful.") + return render_template("recognition.html", response_text=response.text) + + except requests.exceptions.RequestException as e: + logger.error(f"Recognition request failed: {e}") + return render_template("recognition.html", response_text="Recognition failed. Please try again.") + + except Exception as e: + logger.error(f"An unexpected error occurred: {e}") + return render_template("recognition.html", response_text="An unexpected error occurred.") diff --git a/FaceRec/app/main/VideoImage.py b/FaceRec/app/main/VideoImage.py index 9452ee4..2d24de9 100644 --- a/FaceRec/app/main/VideoImage.py +++ b/FaceRec/app/main/VideoImage.py @@ -4,6 +4,7 @@ import io import json import os +import logging import cv2 import requests @@ -14,6 +15,10 @@ from FaceRec.config import Config +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + employee_blueprint = Blueprint( "employee_blueprint", __name__, @@ -23,8 +28,7 @@ cap = cv2.VideoCapture(0) - -# function for displaying live video +# Function for displaying live video def display_live_video(): """ Generator for displaying live video from the camera. @@ -34,11 +38,12 @@ def display_live_video(): while True: success, frame = cap.read() # Read a frame from the camera if not success: + logger.error("Failed to read frame from camera.") break frame = cv2.flip(frame, 1) ret, buffer = cv2.imencode(".jpg", frame) - frame = buffer.tobytes if not ret: + logger.error("Failed to encode frame to JPEG.") break yield ( b"--frame\r\n" @@ -46,7 +51,6 @@ def display_live_video(): bytearray(buffer) + b"\r\n\r\n" ) - # Route for displaying video @employee_blueprint.route("/video_feed") def video_feed(): @@ -54,55 +58,44 @@ def video_feed(): Route for displaying live video from the camera. Returns a multipart response with a JPEG image for each frame from the camera. - - The `mimetype` parameter is set to `'multipart/x-mixed-replace;boundary=frame'` to - indicate that the response body contains multiple images, separated by a boundary - string (`'--frame\r\n'`). The browser will display each image in sequence, creating - the illusion of a live video feed. """ - return flask_response( display_live_video(), mimetype="multipart/x-mixed-replace;boundary=frame", ) - # Route for capturing image from video @employee_blueprint.route("/capture", methods=["GET", "POST"]) def capture(): """ Route for capturing an image from the video feed. - This route is used to capture a single frame from the video feed and save it to a file. - The frame is flipped horizontally before saving. - - The image is stored in a file specified by the `Config.image_data_file` variable. - - The response is a redirect to the "Image" route, which displays the captured image. - - The request is expected to be a POST request with the following form data: - - EmployeeCode: The employee code for the person in the image. - - Name: The name of the person in the image. - - gender: The gender of the person in the image. - - Department: The department of the person in the image. + The captured image is saved to a file specified by the `Config.image_data_file`. """ - global EmployeeCode - global Name - global gender - global Dept - global encoded_image EmployeeCode = request.form.get("EmployeeCode", "") Name = request.form.get("Name", "") gender = request.form.get("gender", "") Dept = request.form.get("Department", "") - ret, frame = cap.read(True) - frame = cv2.flip(frame, 1) - _, buffer = cv2.imencode(".jpg", frame) - encoded_image = base64.b64encode(buffer).decode("utf-8") - with open(Config.image_data_file, "w") as file: - json.dump({"base64_image": encoded_image}, file) - return redirect("Image") + + try: + ret, frame = cap.read() + if not ret: + logger.error("Failed to capture frame for employee image.") + return jsonify({"error": "Failed to capture image"}), 500 + + frame = cv2.flip(frame, 1) + _, buffer = cv2.imencode(".jpg", frame) + encoded_image = base64.b64encode(buffer).decode("utf-8") + + with open(Config.image_data_file, "w") as file: + json.dump({"base64_image": encoded_image}, file) + + logger.info("Image captured and saved successfully.") + return redirect("Image") + except Exception as e: + logger.error(f"Error capturing image: {e}") + return jsonify({"error": "Error capturing image"}), 500 # Route to display captured image @employee_blueprint.route("/Image", methods=["GET"]) @@ -110,135 +103,90 @@ def display_image(): """ Route to display the captured image. - This route is used to display the most recently captured image in the template. - - The image is read from a file specified by the `Config.image_data_file` variable. - - The most recent image is displayed. - - The image path is passed to the template as a variable named `image_path`. - - Returns: - A rendered template with the image path. + The image is read from a file specified by the `Config.image_data_file`. """ + image_path = None if os.path.exists(Config.image_data_file): - with open(Config.image_data_file) as file: - image_data = json.load(file) - encoded_image = image_data.get("base64_image", "") - decoded_image_data = base64.b64decode(encoded_image) - image = Image.open(io.BytesIO(decoded_image_data)) - filename = "final.png" - image.save( - os.path.join( - Config.upload_image_path[0], - filename, - ), - quality=100, - ) - image = sorted( - os.listdir(Config.upload_image_path[0]), - key=lambda x: os.path.getatime( - os.path.join(Config.upload_image_path[0], x), - ), - reverse=True, - ) - if image: - recent_image = image[0] - image_path = os.path.join(Config.upload_image_path[0], recent_image) - else: - recent_image = None - image_path = os.path.join(Config.upload_image_path[0], recent_image) - print("done") + try: + with open(Config.image_data_file) as file: + image_data = json.load(file) + encoded_image = image_data.get("base64_image", "") + decoded_image_data = base64.b64decode(encoded_image) + image = Image.open(io.BytesIO(decoded_image_data)) + filename = "final.png" + image.save(os.path.join(Config.upload_image_path[0], filename), quality=100) + + recent_images = sorted( + os.listdir(Config.upload_image_path[0]), + key=lambda x: os.path.getatime(os.path.join(Config.upload_image_path[0], x)), + reverse=True, + ) + image_path = os.path.join(Config.upload_image_path[0], recent_images[0]) if recent_images else None + logger.info("Image displayed successfully.") + except Exception as e: + logger.error(f"Error displaying image: {e}") + return jsonify({"error": "Error displaying image"}), 500 + return render_template("index.html", image_path=image_path) - -# Below route are of Recognition - - +# Route for recognition capturing @employee_blueprint.route("/capturing", methods=["GET", "POST"]) def capturing(): """ - This route is used to capture an image from the video feed. - - When the route is accessed, a single frame is read from the video feed - and saved to a file specified by the `Config.image_data_file` variable. + This route captures an image from the video feed and saves it. + """ + try: + ret, frame = cap.read() + if not ret: + logger.error("Failed to capture frame during recognition.") + return jsonify({"error": "Failed to capture image"}), 500 - The frame is flipped horizontally before saving. + frame = cv2.flip(frame, 1) + _, buffer = cv2.imencode(".jpg", frame) + encoded_image = base64.b64encode(buffer).decode("utf-8") - The response is a redirect to the "Pic" route, which displays the captured - image. + with open(Config.image_data_file, "w") as file: + json.dump({"base64_image": encoded_image}, file) - The request is expected to be a GET or POST request with no form data. - """ - ret, frame = cap.read(True) - frame = cv2.flip(frame, 1) - _, buffer = cv2.imencode(".jpg", frame) - encoded_image = base64.b64encode(buffer).decode("utf-8") - with open(Config.image_data_file, "w") as file: - json.dump({"base64_image": encoded_image}, file) - return redirect("Pic") + logger.info("Recognition image captured successfully.") + return redirect("Pic") + except Exception as e: + logger.error(f"Error capturing recognition image: {e}") + return jsonify({"error": "Error capturing recognition image"}), 500 -# Route to display captured image +# Route to display captured image for recognition @employee_blueprint.route("/Pic", methods=["GET", "POST"]) def display_pic(): - """Route to display the captured image. - - This route reads the image data from a file specified by the - `Config.image_data_file` variable and displays it in the template. - - The image is saved to a file in the directory specified by the - `Config.upload_image_path` variable. - - The most recent image is displayed. - - The image is displayed in the template with the name "image_path". - - Returns: - A rendered template with the image path. - - """ + """Route to display the captured image for recognition.""" if os.path.exists(Config.image_data_file): - with open(Config.image_data_file) as file: - image_data = json.load(file) - encoded_image = image_data.get("base64_image", "") - decoded_image_data = base64.b64decode(encoded_image) - image = Image.open(io.BytesIO(decoded_image_data)) - filename = "final.png" - image.save( - os.path.join( - Config.upload_image_path[0], - filename, - ), - quality=100, - ) - image = sorted( - os.listdir(Config.upload_image_path[0]), - key=lambda x: os.path.getatime( - os.path.join(Config.upload_image_path[0], x), - ), - reverse=True, - ) - if image: - recent_image = image[0] - image_path = os.path.join(Config.upload_image_path[0], recent_image) - else: - recent_image = None - image_path = os.path.join(Config.upload_image_path[0], recent_image) - print("done") - files = { - "Face": open( - os.path.join( - Config.upload_image_path[0], - "final.jpg", - ), - "rb", - ), - } - try: - fastapi_url = "http://127.0.0.1:8000/recognize_face" - req = requests.post(fastapi_url, files=files) - data = req.content - return data - except Exception as e: - print("Error:", e) + try: + with open(Config.image_data_file) as file: + image_data = json.load(file) + encoded_image = image_data.get("base64_image", "") + decoded_image_data = base64.b64decode(encoded_image) + image = Image.open(io.BytesIO(decoded_image_data)) + filename = "final.png" + image.save(os.path.join(Config.upload_image_path[0], filename), quality=100) + + recent_images = sorted( + os.listdir(Config.upload_image_path[0]), + key=lambda x: os.path.getatime(os.path.join(Config.upload_image_path[0], x)), + reverse=True, + ) + image_path = os.path.join(Config.upload_image_path[0], recent_images[0]) if recent_images else None + + logger.info("Displaying recognition image.") + files = { + "Face": open(os.path.join(Config.upload_image_path[0], "final.png"), "rb"), + } + fastapi_url = "http://127.0.0.1:8000/recognize_face" + req = requests.post(fastapi_url, files=files) + data = req.content + return data + + except Exception as e: + logger.error(f"Error displaying recognition image: {e}") + return jsonify({"error": "Error displaying recognition image"}), 500 + + return jsonify({"error": "No image found"}), 404 diff --git a/FaceRec/app/main/__init__.py b/FaceRec/app/main/__init__.py index 751c90e..21782e0 100644 --- a/FaceRec/app/main/__init__.py +++ b/FaceRec/app/main/__init__.py @@ -13,13 +13,18 @@ static_folder="../../static/", ) -# To register blueprints of flask -app.register_blueprint(flk_blueprint) -app.register_blueprint(employee_blueprint) -app.register_blueprint(Face_Rec_blueprint) -app.register_blueprint(Edit_blueprint) +# To register blueprints of Flask +try: + app.register_blueprint(flk_blueprint) + app.register_blueprint(employee_blueprint) + app.register_blueprint(Face_Rec_blueprint) + app.register_blueprint(Edit_blueprint) +except Exception as e: + print(f"Error registering blueprints: {e}") - -# function to run server of Flast +# Function to run the server of Flask def run_flask(): - app.run(host="127.0.0.1", port=5000) + try: + app.run(host="127.0.0.1", port=5000) + except Exception as e: + print(f"Error running Flask server: {e}") diff --git a/FaceRec/config.py b/FaceRec/config.py index 604cca1..613b977 100644 --- a/FaceRec/config.py +++ b/FaceRec/config.py @@ -4,12 +4,24 @@ basedir = os.path.abspath(os.path.dirname(__file__)) - -# This class named Config is likely used for storing configuration settings or parameters in a Python -# program. +# This class named Config is likely used for storing configuration settings or parameters in a Python program. class Config: - SECRET_KEY = os.environ.get("SECRET_KEY") - DEBUG = (True,) - upload_image_path = (os.path.join(basedir, "static/Images/uploads"),) - ALLOWED_EXTENSIONS = (["jpg", "jpeg", "png", "jfif"],) - image_data_file = os.path.join(basedir, "static/Images/image_data.json") + try: + SECRET_KEY = os.environ.get("SECRET_KEY") + except Exception as e: + print(f"Error retrieving SECRET_KEY from environment: {e}") + SECRET_KEY = None # Default to None or handle it as needed + + DEBUG = True # Assuming DEBUG should be a boolean, not a tuple + try: + upload_image_path = os.path.join(basedir, "static/Images/uploads") + except Exception as e: + print(f"Error setting upload_image_path: {e}") + upload_image_path = None # Default to None or handle it as needed + + ALLOWED_EXTENSIONS = ["jpg", "jpeg", "png", "jfif"] # Use a list, not a tuple with a list inside + try: + image_data_file = os.path.join(basedir, "static/Images/image_data.json") + except Exception as e: + print(f"Error setting image_data_file path: {e}") + image_data_file = None # Default to None or handle it as needed diff --git a/Model-Training/eval-mark-I.py b/Model-Training/eval-mark-I.py index b31c1df..087ded8 100644 --- a/Model-Training/eval-mark-I.py +++ b/Model-Training/eval-mark-I.py @@ -1,21 +1,22 @@ from __future__ import annotations import os - import numpy as np from keras.models import load_model from keras.preprocessing import image from sklearn.metrics.pairwise import euclidean_distances - # Function to load and preprocess images def load_and_preprocess_image(img_path, target_size=(160, 160)): - img = image.load_img(img_path, target_size=target_size) - img_array = image.img_to_array(img) - img_array = np.expand_dims(img_array, axis=0) - img_array /= 255.0 - return img_array - + try: + img = image.load_img(img_path, target_size=target_size) + img_array = image.img_to_array(img) + img_array = np.expand_dims(img_array, axis=0) + img_array /= 255.0 + return img_array + except Exception as e: + print(f"Error loading image {img_path}: {e}") + return None # Function to generate embeddings def generate_embeddings(model, dataset_path): @@ -30,6 +31,8 @@ def generate_embeddings(model, dataset_path): for img_name in os.listdir(class_path): img_path = os.path.join(class_path, img_name) img_array = load_and_preprocess_image(img_path) + if img_array is None: + continue embedding = model.predict(img_array) embeddings.append(embedding[0]) labels.append(class_name) @@ -38,7 +41,6 @@ def generate_embeddings(model, dataset_path): labels = np.array(labels) return embeddings, labels - # Function to calculate intra-cluster distances def calculate_intra_cluster_distances(embeddings, labels): unique_labels = np.unique(labels) @@ -57,8 +59,7 @@ def calculate_intra_cluster_distances(embeddings, labels): return np.array(distances) - -# Load the pre-trained FaceNet model (replace 'facenet_model.h5' with your model file) +# Load the pre-trained FaceNet model model_path = "facenet_model.h5" model = load_model(model_path) @@ -68,7 +69,7 @@ def calculate_intra_cluster_distances(embeddings, labels): # Generate embeddings for the original model embeddings_original, labels = generate_embeddings(model, dataset_path) -# Load the fine-tuned model (replace 'facenet_model_finetuned.h5' with your fine-tuned model file) +# Load the fine-tuned model finetuned_model_path = "facenet_model_finetuned.h5" finetuned_model = load_model(finetuned_model_path) diff --git a/docs/conf.py b/docs/conf.py index 2010d9e..56ab43e 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -7,8 +7,11 @@ import os import sys -sys.path.insert(0, os.path.abspath("..")) - +try: + sys.path.insert(0, os.path.abspath("..")) +except Exception as e: + print(f"Error adding path to sys.path: {e}") + raise # Re-raise the exception to notify the user # -- Project information ----------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#project-information @@ -26,15 +29,18 @@ "sphinx.ext.napoleon", "myst_parser", # Add this line ] + # Include markdown support myst_enable_extensions = [ "colon_fence", "html_image", ] + source_suffix = { ".rst": "restructuredtext", ".md": "markdown", # Add this line to recognize markdown files } + # Add any paths that contain templates here, relative to this directory. templates_path = ["_templates"] @@ -46,6 +52,14 @@ # -- Options for HTML output ------------------------------------------------- # https://www.sphinx-doc.org/en/master/usage/configuration.html#options-for-html-output - html_theme = "sphinx_rtd_theme" html_static_path = ["_static"] + +# Additional configuration that might raise exceptions can also be wrapped +try: + # Example of additional configurations that could raise exceptions + # html_theme_options = {...} + pass # Replace with actual configurations if needed +except Exception as e: + print(f"Error in HTML configuration: {e}") + raise # Re-raise the exception to notify the user diff --git a/main.py b/main.py index 9c962c1..a3c828c 100644 --- a/main.py +++ b/main.py @@ -1,12 +1,30 @@ from __future__ import annotations +import logging from concurrent.futures import ThreadPoolExecutor - from API import run_fastapi_app from FaceRec.app.main import run_flask +# Initialize logging configuration +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Function to run Flask app with exception handling +def run_flask_app(): + try: + run_flask() + except Exception as e: + logger.error(f"Error starting Flask app: {e}") + +# Function to run FastAPI app with exception handling +def run_fastapi_app_with_handling(): + try: + run_fastapi_app() + except Exception as e: + logger.error(f"Error starting FastAPI app: {e}") + # Multithreading used to start both FastAPI and Flask apps at the same time. if __name__ == "__main__": with ThreadPoolExecutor(max_workers=2) as executor: - executor.submit(run_flask) - executor.submit(run_fastapi_app) + executor.submit(run_flask_app) + executor.submit(run_fastapi_app_with_handling) diff --git a/testing/test_database.py b/testing/test_database.py index 2dd6ab7..ffaf29e 100644 --- a/testing/test_database.py +++ b/testing/test_database.py @@ -13,11 +13,9 @@ init_logging_config() - def test_vector_search(): """ Test that the vector_search function returns the correct result. - """ mock_result = [ diff --git a/testing/test_face_cycle.py b/testing/test_face_cycle.py index f5e2974..27f18fe 100644 --- a/testing/test_face_cycle.py +++ b/testing/test_face_cycle.py @@ -10,7 +10,6 @@ client = TestClient(router) - @patch("API.database.Database.find_one_and_delete") @patch("API.database.Database.update_one") @patch("API.database.Database.find_one") @@ -49,8 +48,10 @@ def test_face_lifecycle( mock_find_one.return_value = mock_doc mock_update_one.return_value = MagicMock(modified_count=1) mock_find_one_and_delete.return_value = mock_doc + with open("./test-faces/devansh.jpg", "rb") as image_file: encoded_string1 = base64.b64encode(image_file.read()).decode("utf-8") + response1 = client.post( "/create_new_faceEntry", json={ @@ -66,6 +67,7 @@ def test_face_lifecycle( with open("./test-faces/devansh.jpg", "rb") as image_file: encoded_string2 = base64.b64encode(image_file.read()).decode("utf-8") + response2 = client.post( "/create_new_faceEntry", json={ @@ -84,9 +86,6 @@ def test_face_lifecycle( assert response.status_code == 200 assert len(response.json()) == 2 - with open("./test-faces/devansh.jpg", "rb") as image_file: - encoded_string2 = base64.b64encode(image_file.read()).decode("utf-8") - # Update a face response = client.put( "/update/1", diff --git a/testing/test_face_endpoints.py b/testing/test_face_endpoints.py index c931ffc..7cafcce 100644 --- a/testing/test_face_endpoints.py +++ b/testing/test_face_endpoints.py @@ -6,266 +6,249 @@ import pytest from fastapi.testclient import TestClient - from API.route import router from API.utils import init_logging_config +# Initialize logging configuration init_logging_config() client = TestClient(router) +# Exception handling utility function +def log_exception(message: str): + logging.error(message) @pytest.mark.run(order=1) def test_register_face1(): """ Test the creation of a face entry. - - Verifies that a POST request to /create_new_faceEntry with a valid - JSON body containing an EmployeeCode, Name, gender, Department, and - a list of one or more images creates a new face entry in the database - and returns a message indicating success. """ - mock_doc = { - "_id": "65e6284d01f95cd96ea334a7", - "EmployeeCode": "1", - "Name": "Devansh", - "gender": "Male", - "Department": "IT", - "Images": ["encoded_string1", "encoded_string2"], - } - - mock_find = MagicMock(return_value=[mock_doc, mock_doc]) - mock_insert_one = MagicMock(return_value=MagicMock(inserted_id="1")) - mock_find_one = MagicMock(return_value=mock_doc) - mock_update_one = MagicMock(return_value=MagicMock(modified_count=1)) - mock_find_one_and_delete = MagicMock(return_value=mock_doc) - - with patch("API.database.Database.find", mock_find), patch( - "API.database.Database.insert_one", - mock_insert_one, - ), patch("API.database.Database.find_one", mock_find_one), patch( - "API.database.Database.update_one", - mock_update_one, - ), patch( - "API.database.Database.find_one_and_delete", - mock_find_one_and_delete, - ): - - with open("./test-faces/devansh.jpg", "rb") as image_file: - encoded_string1 = base64.b64encode( - image_file.read(), - ).decode("utf-8") - - response1 = client.post( - "/create_new_faceEntry", - json={ - "EmployeeCode": "1", - "Name": "Devansh", - "gender": "Male", - "Department": "IT", - "Images": [encoded_string1, encoded_string1], - }, - ) - assert response1.status_code == 200 - assert response1.json() == { - "message": "Face entry created successfully", + try: + mock_doc = { + "_id": "65e6284d01f95cd96ea334a7", + "EmployeeCode": "1", + "Name": "Devansh", + "gender": "Male", + "Department": "IT", + "Images": ["encoded_string1", "encoded_string2"], } + mock_find = MagicMock(return_value=[mock_doc, mock_doc]) + mock_insert_one = MagicMock(return_value=MagicMock(inserted_id="1")) + mock_find_one = MagicMock(return_value=mock_doc) + mock_update_one = MagicMock(return_value=MagicMock(modified_count=1)) + mock_find_one_and_delete = MagicMock(return_value=mock_doc) + + with patch("API.database.Database.find", mock_find), patch( + "API.database.Database.insert_one", mock_insert_one + ), patch("API.database.Database.find_one", mock_find_one), patch( + "API.database.Database.update_one", mock_update_one + ), patch( + "API.database.Database.find_one_and_delete", mock_find_one_and_delete + ): + with open("./test-faces/devansh.jpg", "rb") as image_file: + encoded_string1 = base64.b64encode(image_file.read()).decode("utf-8") + + response1 = client.post( + "/create_new_faceEntry", + json={ + "EmployeeCode": "1", + "Name": "Devansh", + "gender": "Male", + "Department": "IT", + "Images": [encoded_string1, encoded_string1], + }, + ) + assert response1.status_code == 200 + assert response1.json() == { + "message": "Face entry created successfully", + } + except Exception as e: + log_exception(f"Error in test_register_face1: {e}") + pytest.fail(f"Test failed due to exception: {e}") @pytest.mark.run(order=2) def test_register_face2(): """ Test the creation of a second face entry. - - Verifies that a POST request to /create_new_faceEntry with a valid - JSON body containing an EmployeeCode, Name, gender, Department, and - a list of one or more images creates a new face entry in the database - and returns a message indicating success. """ - mock_doc = { - "_id": "65e6284d01f95cd96ea334a7", - "EmployeeCode": "1", - "Name": "Devansh", - "gender": "Male", - "Department": "IT", - "Images": ["encoded_string1", "encoded_string2"], - } - - mock_find = MagicMock(return_value=[mock_doc, mock_doc]) - mock_insert_one = MagicMock(return_value=MagicMock(inserted_id="1")) - mock_find_one = MagicMock(return_value=mock_doc) - mock_update_one = MagicMock(return_value=MagicMock(modified_count=1)) - mock_find_one_and_delete = MagicMock(return_value=mock_doc) - - with patch("API.database.Database.find", mock_find), patch( - "API.database.Database.insert_one", - mock_insert_one, - ), patch("API.database.Database.find_one", mock_find_one), patch( - "API.database.Database.update_one", - mock_update_one, - ), patch( - "API.database.Database.find_one_and_delete", - mock_find_one_and_delete, - ): - - with open("./test-faces/devansh.jpg", "rb") as image_file: - encoded_string2 = base64.b64encode( - image_file.read(), - ).decode("utf-8") - - response2 = client.post( - "/create_new_faceEntry", - json={ - "EmployeeCode": "2", - "Name": "test", - "gender": "Female", - "Department": "IT", - "Images": [encoded_string2, encoded_string2], - }, - ) - assert response2.status_code == 200 - assert response2.json() == { - "message": "Face entry created successfully", + try: + mock_doc = { + "_id": "65e6284d01f95cd96ea334a7", + "EmployeeCode": "1", + "Name": "Devansh", + "gender": "Male", + "Department": "IT", + "Images": ["encoded_string1", "encoded_string2"], } + mock_find = MagicMock(return_value=[mock_doc, mock_doc]) + mock_insert_one = MagicMock(return_value=MagicMock(inserted_id="1")) + mock_find_one = MagicMock(return_value=mock_doc) + mock_update_one = MagicMock(return_value=MagicMock(modified_count=1)) + mock_find_one_and_delete = MagicMock(return_value=mock_doc) + + with patch("API.database.Database.find", mock_find), patch( + "API.database.Database.insert_one", mock_insert_one + ), patch("API.database.Database.find_one", mock_find_one), patch( + "API.database.Database.update_one", mock_update_one + ), patch( + "API.database.Database.find_one_and_delete", mock_find_one_and_delete + ): + with open("./test-faces/devansh.jpg", "rb") as image_file: + encoded_string2 = base64.b64encode(image_file.read()).decode("utf-8") + + response2 = client.post( + "/create_new_faceEntry", + json={ + "EmployeeCode": "2", + "Name": "test", + "gender": "Female", + "Department": "IT", + "Images": [encoded_string2, encoded_string2], + }, + ) + assert response2.status_code == 200 + assert response2.json() == { + "message": "Face entry created successfully", + } + except Exception as e: + log_exception(f"Error in test_register_face2: {e}") + pytest.fail(f"Test failed due to exception: {e}") @pytest.mark.run(order=3) def test_get_all_faces_after_registration(): """ Test the retrieval of all face entries after registration. - - Verifies that a GET request to /Data returns a list of all face entries - in the database, and that the list contains the two face entries created - in the previous tests. """ - mock_doc = { - "_id": "65e6284d01f95cd96ea334a7", - "EmployeeCode": "1", - "Name": "Devansh", - "gender": "Male", - "Department": "IT", - "Images": ["encoded_string1", "encoded_string2"], - } - - mock_find = MagicMock(return_value=[mock_doc, mock_doc]) + try: + mock_doc = { + "_id": "65e6284d01f95cd96ea334a7", + "EmployeeCode": "1", + "Name": "Devansh", + "gender": "Male", + "Department": "IT", + "Images": ["encoded_string1", "encoded_string2"], + } - with patch("API.database.Database.find", mock_find): - response = client.get("/Data/") - assert response.status_code == 200 - logging.debug(response.json()) - assert len(response.json()) == 2 + mock_find = MagicMock(return_value=[mock_doc, mock_doc]) + with patch("API.database.Database.find", mock_find): + response = client.get("/Data/") + assert response.status_code == 200 + logging.debug(response.json()) + assert len(response.json()) == 2 + except Exception as e: + log_exception(f"Error in test_get_all_faces_after_registration: {e}") + pytest.fail(f"Test failed due to exception: {e}") @pytest.mark.run(order=4) def test_update_face(): """ Test the update of a face entry. - - Verifies that a PUT request to /update/{EmployeeCode} with a valid - JSON body containing an EmployeeCode, Name, gender, Department, and - a list of one or more images updates the face entry in the database - and returns a message indicating success. """ - mock_doc = { - "_id": "65e6284d01f95cd96ea334a7", - "EmployeeCode": "1", - "Name": "Devansh", - "gender": "Male", - "Department": "IT", - "Images": ["encoded_string1", "encoded_string2"], - } - - mock_find = MagicMock(return_value=[mock_doc, mock_doc]) - mock_insert_one = MagicMock(return_value=MagicMock(inserted_id="1")) - mock_find_one = MagicMock(return_value=mock_doc) - mock_update_one = MagicMock(return_value=MagicMock(modified_count=1)) - mock_find_one_and_delete = MagicMock(return_value=mock_doc) - - with patch("API.database.Database.find", mock_find), patch( - "API.database.Database.insert_one", - mock_insert_one, - ), patch("API.database.Database.find_one", mock_find_one), patch( - "API.database.Database.update_one", - mock_update_one, - ), patch( - "API.database.Database.find_one_and_delete", - mock_find_one_and_delete, - ): - with open("./test-faces/devansh.jpg", "rb") as image_file: - encoded_string2 = base64.b64encode( - image_file.read(), - ).decode("utf-8") - - response = client.put( - "/update/1", - json={ - "Name": "Test", - "gender": "Male", - "Department": "IT_Test", - "Images": [encoded_string2, encoded_string2], - }, - ) - assert response.status_code == 200 - assert response.json() == "Updated Successfully" + try: + mock_doc = { + "_id": "65e6284d01f95cd96ea334a7", + "EmployeeCode": "1", + "Name": "Devansh", + "gender": "Male", + "Department": "IT", + "Images": ["encoded_string1", "encoded_string2"], + } + mock_find = MagicMock(return_value=[mock_doc, mock_doc]) + mock_insert_one = MagicMock(return_value=MagicMock(inserted_id="1")) + mock_find_one = MagicMock(return_value=mock_doc) + mock_update_one = MagicMock(return_value=MagicMock(modified_count=1)) + mock_find_one_and_delete = MagicMock(return_value=mock_doc) + + with patch("API.database.Database.find", mock_find), patch( + "API.database.Database.insert_one", mock_insert_one + ), patch("API.database.Database.find_one", mock_find_one), patch( + "API.database.Database.update_one", mock_update_one + ), patch( + "API.database.Database.find_one_and_delete", mock_find_one_and_delete + ): + with open("./test-faces/devansh.jpg", "rb") as image_file: + encoded_string2 = base64.b64encode(image_file.read()).decode("utf-8") + + response = client.put( + "/update/1", + json={ + "Name": "Test", + "gender": "Male", + "Department": "IT_Test", + "Images": [encoded_string2, encoded_string2], + }, + ) + assert response.status_code == 200 + assert response.json() == "Updated Successfully" + except Exception as e: + log_exception(f"Error in test_update_face: {e}") + pytest.fail(f"Test failed due to exception: {e}") @pytest.mark.run(order=5) def test_delete_face(): """ Test the DELETE endpoint to delete a face entry. - - This test case asserts that the DELETE endpoint returns a 200 status code and a JSON response with the message 'Successfully Deleted'. """ - response = client.delete("/delete/1") - assert response.status_code == 200 - assert response.json() == {"Message": "Successfully Deleted"} - + try: + response = client.delete("/delete/1") + assert response.status_code == 200 + assert response.json() == {"Message": "Successfully Deleted"} + except Exception as e: + log_exception(f"Error in test_delete_face: {e}") + pytest.fail(f"Test failed due to exception: {e}") @pytest.mark.run(order=6) def test_recognize_face_fail(): """ Test the POST endpoint to recognize a face when no match is found. - - This test case asserts that the POST endpoint returns a 404 status code and a JSON response with the message 'No match found'. """ - mock_doc = { - "Image": "encoded_string2", - "Name": "Test2", - "score": 0.0, - } - with patch("API.database.Database.vector_search", return_value=[mock_doc]): - - with open("./test-faces/devansh.jpg", "rb") as image_file: - response = client.post( - "/recognize_face", - files={"Face": image_file}, - ) - assert response.status_code == 404 - assert response.json() == {"message": "No match found"} - + try: + mock_doc = { + "Image": "encoded_string2", + "Name": "Test2", + "score": 0.0, + } + with patch("API.database.Database.vector_search", return_value=[mock_doc]): + + with open("./test-faces/devansh.jpg", "rb") as image_file: + response = client.post( + "/recognize_face", + files={"Face": image_file}, + ) + assert response.status_code == 404 + assert response.json() == {"message": "No match found"} + except Exception as e: + log_exception(f"Error in test_recognize_face_fail: {e}") + pytest.fail(f"Test failed due to exception: {e}") @pytest.mark.run(order=7) def test_recognize_face_success(): """ Test the POST endpoint to recognize a face when a match is found. - - This test case asserts that the POST endpoint returns a 200 status code and a JSON response with the name, image and score of the matching face entry. """ - mock_doc = { - "Image": "encoded_string2", - "Name": "Test2", - "score": 1.0, - } - with patch("API.database.Database.vector_search", return_value=[mock_doc]): - - with open("./test-faces/devansh.jpg", "rb") as image_file: - response = client.post( - "/recognize_face", - files={"Face": image_file}, - ) - assert response.status_code == 200 - assert response.json() == { - "Name": "Test2", + try: + mock_doc = { "Image": "encoded_string2", + "Name": "Test2", "score": 1.0, } + with patch("API.database.Database.vector_search", return_value=[mock_doc]): + + with open("./test-faces/devansh.jpg", "rb") as image_file: + response = client.post( + "/recognize_face", + files={"Face": image_file}, + ) + assert response.status_code == 200 + assert response.json() == { + "Name": "Test2", + "Image": "encoded_string2", + "Score": 1.0, + } + except Exception as e: + log_exception(f"Error in test_recognize_face_success: {e}") + pytest.fail(f"Test failed due to exception: {e}") From 2dfc1586fb4f8b83012588411c92817465f888a6 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Sat, 19 Oct 2024 15:44:39 +0000 Subject: [PATCH 2/2] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- API/__init__.py | 5 +++- API/database.py | 7 +++-- FaceRec/app/main/Edit.py | 24 ++++++++++++----- FaceRec/app/main/Employee.py | 19 +++++++++----- FaceRec/app/main/Face.py | 14 +++++++--- FaceRec/app/main/VideoImage.py | 47 +++++++++++++++++++++++++--------- FaceRec/app/main/__init__.py | 1 + FaceRec/config.py | 11 ++++++-- Model-Training/eval-mark-I.py | 5 ++++ main.py | 4 +++ testing/test_database.py | 1 + testing/test_face_cycle.py | 5 ++-- testing/test_face_endpoints.py | 18 ++++++++++--- 13 files changed, 123 insertions(+), 38 deletions(-) diff --git a/API/__init__.py b/API/__init__.py index f5e794d..c4a6e9b 100644 --- a/API/__init__.py +++ b/API/__init__.py @@ -1,7 +1,8 @@ from __future__ import annotations -import uvicorn import logging + +import uvicorn from fastapi import FastAPI, HTTPException from API import route @@ -15,6 +16,7 @@ # API Router Fastapp.include_router(route.router) + @Fastapp.get("/") def read_root(): try: @@ -23,6 +25,7 @@ def read_root(): logger.error(f"Error in root endpoint: {e}") raise HTTPException(status_code=500, detail="Internal Server Error") + # Function to run FastAPI server def run_fastapi_app(): try: diff --git a/API/database.py b/API/database.py index 305a466..a99822f 100644 --- a/API/database.py +++ b/API/database.py @@ -1,13 +1,15 @@ from __future__ import annotations -from datetime import datetime import logging +from datetime import datetime + from pymongo import MongoClient, errors # Initialize logger logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) + class Database: def __init__(self, uri="mongodb://localhost:27017/", db_name="ImageDB"): """ @@ -148,5 +150,6 @@ def vector_search(self, collection, embedding): result_arr = [i for i in result] return result_arr except errors.PyMongoError as e: - logger.error(f"Error performing vector search in {collection}: {e}") + logger.error( + f"Error performing vector search in {collection}: {e}") return [] diff --git a/FaceRec/app/main/Edit.py b/FaceRec/app/main/Edit.py index 860b835..8764009 100644 --- a/FaceRec/app/main/Edit.py +++ b/FaceRec/app/main/Edit.py @@ -28,6 +28,7 @@ print("Error: Could not open video capture.") # You can raise an exception or handle it as needed + # Function for displaying live video def display_live_video(): """ @@ -51,6 +52,7 @@ def display_live_video(): bytearray(buffer) + b"\r\n\r\n" ) + # Route for displaying video @Edit_blueprint.route("/video_feed") def video_feed(): @@ -63,6 +65,7 @@ def video_feed(): mimetype="multipart/x-mixed-replace;boundary=frame", ) + # Route for capturing image from video @Edit_blueprint.route("/capture", methods=["GET", "POST"]) def capture(): @@ -91,7 +94,7 @@ def capture(): Name = request.form.get("Name", "") gender = request.form.get("gender", "") Dept = request.form.get("Department", "") - + try: ret, frame = cap.read(True) if not ret: @@ -101,7 +104,7 @@ def capture(): frame = cv2.flip(frame, 1) _, buffer = cv2.imencode(".jpg", frame) encoded_image = base64.b64encode(buffer).decode("utf-8") - + with open(Config.image_data_file, "w") as file: json.dump({"base64_image": encoded_image}, file) except Exception as e: @@ -109,6 +112,7 @@ def capture(): return redirect("Image") + # Route to display captured image @Edit_blueprint.route("/Image", methods=["GET"]) def display_image(): @@ -154,7 +158,8 @@ def display_image(): if image: recent_image = image[0] - image_path = os.path.join(Config.upload_image_path[0], recent_image) + image_path = os.path.join( + Config.upload_image_path[0], recent_image) else: recent_image = None image_path = "" @@ -166,7 +171,10 @@ def display_image(): return render_template("index.html", image_path=image_path) except Exception as e: print(f"Error while displaying image: {e}") - return render_template("index.html", image_path="") # Show a default image or handle error + return render_template( + "index.html", image_path="" + ) # Show a default image or handle error + @Edit_blueprint.route("/edit/", methods=["POST", "GET"]) def edit(EmployeeCode): @@ -221,7 +229,9 @@ def edit(EmployeeCode): if url.status_code == 200: return redirect("/") else: - print(f"Error: Failed to update employee data with status code {url.status_code}") + print( + f"Error: Failed to update employee data with status code {url.status_code}" + ) except requests.exceptions.RequestException as e: print(f"Request failed: {e}") @@ -234,7 +244,9 @@ def edit(EmployeeCode): employee_data = response.json() return render_template("edit.html", employee_data=employee_data) else: - print(f"Error: Failed to retrieve employee data with status code {response.status_code}") + print( + f"Error: Failed to retrieve employee data with status code {response.status_code}" + ) return f"Error {response.status_code}: Failed to retrieve employee data." except requests.exceptions.RequestException as e: print(f"Request failed: {e}") diff --git a/FaceRec/app/main/Employee.py b/FaceRec/app/main/Employee.py index 4a016e9..8e57026 100644 --- a/FaceRec/app/main/Employee.py +++ b/FaceRec/app/main/Employee.py @@ -2,12 +2,12 @@ import base64 import json +import logging import os import cv2 import requests from flask import Blueprint, jsonify, redirect, render_template, request -import logging from FaceRec.config import Config @@ -47,7 +47,9 @@ def display_information(): logger.info("Employee data retrieved successfully.") except requests.exceptions.RequestException as e: logger.error(f"Request failed: {e}") - employees = [] # Handle the error gracefully by setting employees to an empty list + employees = ( + [] + ) # Handle the error gracefully by setting employees to an empty list return render_template("table.html", employees=employees) @@ -74,7 +76,7 @@ def submit_form(): return jsonify({"message": "No file part"}), 400 file = request.files["File"] allowed_extensions = {"png", "jpg", "jpeg"} - if file and file.filename.split('.')[-1].lower() not in allowed_extensions: + if file and file.filename.split(".")[-1].lower() not in allowed_extensions: return jsonify({"message": "File extension is not valid"}), 400 image_data = file.read() @@ -94,7 +96,9 @@ def submit_form(): } # Send request to FastAPI - response = requests.post("http://127.0.0.1:8000/create_new_faceEntry", json=payload) + response = requests.post( + "http://127.0.0.1:8000/create_new_faceEntry", json=payload + ) response.raise_for_status() # Raise an error for bad responses logger.info("Employee record created successfully.") @@ -116,7 +120,8 @@ def delete(EmployeeCode): return jsonify({"message": "Employee code should be an integer"}), 400 try: - response = requests.delete(f"http://127.0.0.1:8000/delete/{EmployeeCode}") + response = requests.delete( + f"http://127.0.0.1:8000/delete/{EmployeeCode}") response.raise_for_status() # Raise an error for bad responses logger.info(f"Employee {EmployeeCode} deleted successfully.") except requests.exceptions.RequestException as e: @@ -157,7 +162,9 @@ def update_employee(EmployeeCode): "Department": Department, } - response = requests.put(f"http://127.0.0.1:8000/update/{EmployeeCode}", json=payload) + response = requests.put( + f"http://127.0.0.1:8000/update/{EmployeeCode}", json=payload + ) response.raise_for_status() # Raise an error for bad responses logger.info(f"Employee {EmployeeCode} updated successfully.") diff --git a/FaceRec/app/main/Face.py b/FaceRec/app/main/Face.py index 3ae4e68..9ada059 100644 --- a/FaceRec/app/main/Face.py +++ b/FaceRec/app/main/Face.py @@ -2,6 +2,7 @@ import base64 import json +import logging import os import cv2 @@ -9,7 +10,6 @@ from flask import Blueprint from flask import Response as flask_response from flask import redirect, render_template, request -import logging from FaceRec.config import Config @@ -98,7 +98,9 @@ def recognize(): # Capture the image file for recognition with open("captured_image.jpg", "rb") as img_file: files = {"image": (img_file, "image/jpeg")} - fastapi_url = "http://127.0.0.1:8000/recognize_face" # Replace with your FastAPI URL + fastapi_url = ( + "http://127.0.0.1:8000/recognize_face" # Replace with your FastAPI URL + ) response = requests.post(fastapi_url, files=files) response.raise_for_status() # Raise an error for bad responses logger.info("Recognition request successful.") @@ -106,8 +108,12 @@ def recognize(): except requests.exceptions.RequestException as e: logger.error(f"Recognition request failed: {e}") - return render_template("recognition.html", response_text="Recognition failed. Please try again.") + return render_template( + "recognition.html", response_text="Recognition failed. Please try again." + ) except Exception as e: logger.error(f"An unexpected error occurred: {e}") - return render_template("recognition.html", response_text="An unexpected error occurred.") + return render_template( + "recognition.html", response_text="An unexpected error occurred." + ) diff --git a/FaceRec/app/main/VideoImage.py b/FaceRec/app/main/VideoImage.py index 2d24de9..99150e2 100644 --- a/FaceRec/app/main/VideoImage.py +++ b/FaceRec/app/main/VideoImage.py @@ -3,8 +3,8 @@ import base64 import io import json -import os import logging +import os import cv2 import requests @@ -28,6 +28,7 @@ cap = cv2.VideoCapture(0) + # Function for displaying live video def display_live_video(): """ @@ -51,6 +52,7 @@ def display_live_video(): bytearray(buffer) + b"\r\n\r\n" ) + # Route for displaying video @employee_blueprint.route("/video_feed") def video_feed(): @@ -64,6 +66,7 @@ def video_feed(): mimetype="multipart/x-mixed-replace;boundary=frame", ) + # Route for capturing image from video @employee_blueprint.route("/capture", methods=["GET", "POST"]) def capture(): @@ -76,13 +79,13 @@ def capture(): Name = request.form.get("Name", "") gender = request.form.get("gender", "") Dept = request.form.get("Department", "") - + try: ret, frame = cap.read() if not ret: logger.error("Failed to capture frame for employee image.") return jsonify({"error": "Failed to capture image"}), 500 - + frame = cv2.flip(frame, 1) _, buffer = cv2.imencode(".jpg", frame) encoded_image = base64.b64encode(buffer).decode("utf-8") @@ -97,6 +100,7 @@ def capture(): logger.error(f"Error capturing image: {e}") return jsonify({"error": "Error capturing image"}), 500 + # Route to display captured image @employee_blueprint.route("/Image", methods=["GET"]) def display_image(): @@ -114,21 +118,29 @@ def display_image(): decoded_image_data = base64.b64decode(encoded_image) image = Image.open(io.BytesIO(decoded_image_data)) filename = "final.png" - image.save(os.path.join(Config.upload_image_path[0], filename), quality=100) + image.save(os.path.join( + Config.upload_image_path[0], filename), quality=100) recent_images = sorted( os.listdir(Config.upload_image_path[0]), - key=lambda x: os.path.getatime(os.path.join(Config.upload_image_path[0], x)), + key=lambda x: os.path.getatime( + os.path.join(Config.upload_image_path[0], x) + ), reverse=True, ) - image_path = os.path.join(Config.upload_image_path[0], recent_images[0]) if recent_images else None + image_path = ( + os.path.join(Config.upload_image_path[0], recent_images[0]) + if recent_images + else None + ) logger.info("Image displayed successfully.") except Exception as e: logger.error(f"Error displaying image: {e}") return jsonify({"error": "Error displaying image"}), 500 - + return render_template("index.html", image_path=image_path) + # Route for recognition capturing @employee_blueprint.route("/capturing", methods=["GET", "POST"]) def capturing(): @@ -155,6 +167,7 @@ def capturing(): logger.error(f"Error capturing recognition image: {e}") return jsonify({"error": "Error capturing recognition image"}), 500 + # Route to display captured image for recognition @employee_blueprint.route("/Pic", methods=["GET", "POST"]) def display_pic(): @@ -167,18 +180,28 @@ def display_pic(): decoded_image_data = base64.b64decode(encoded_image) image = Image.open(io.BytesIO(decoded_image_data)) filename = "final.png" - image.save(os.path.join(Config.upload_image_path[0], filename), quality=100) + image.save(os.path.join( + Config.upload_image_path[0], filename), quality=100) recent_images = sorted( os.listdir(Config.upload_image_path[0]), - key=lambda x: os.path.getatime(os.path.join(Config.upload_image_path[0], x)), + key=lambda x: os.path.getatime( + os.path.join(Config.upload_image_path[0], x) + ), reverse=True, ) - image_path = os.path.join(Config.upload_image_path[0], recent_images[0]) if recent_images else None - + image_path = ( + os.path.join(Config.upload_image_path[0], recent_images[0]) + if recent_images + else None + ) + logger.info("Displaying recognition image.") files = { - "Face": open(os.path.join(Config.upload_image_path[0], "final.png"), "rb"), + "Face": open( + os.path.join( + Config.upload_image_path[0], "final.png"), "rb" + ), } fastapi_url = "http://127.0.0.1:8000/recognize_face" req = requests.post(fastapi_url, files=files) diff --git a/FaceRec/app/main/__init__.py b/FaceRec/app/main/__init__.py index 21782e0..b84d5e2 100644 --- a/FaceRec/app/main/__init__.py +++ b/FaceRec/app/main/__init__.py @@ -22,6 +22,7 @@ except Exception as e: print(f"Error registering blueprints: {e}") + # Function to run the server of Flask def run_flask(): try: diff --git a/FaceRec/config.py b/FaceRec/config.py index 613b977..754fc95 100644 --- a/FaceRec/config.py +++ b/FaceRec/config.py @@ -4,6 +4,7 @@ basedir = os.path.abspath(os.path.dirname(__file__)) + # This class named Config is likely used for storing configuration settings or parameters in a Python program. class Config: try: @@ -19,9 +20,15 @@ class Config: print(f"Error setting upload_image_path: {e}") upload_image_path = None # Default to None or handle it as needed - ALLOWED_EXTENSIONS = ["jpg", "jpeg", "png", "jfif"] # Use a list, not a tuple with a list inside + ALLOWED_EXTENSIONS = [ + "jpg", + "jpeg", + "png", + "jfif", + ] # Use a list, not a tuple with a list inside try: - image_data_file = os.path.join(basedir, "static/Images/image_data.json") + image_data_file = os.path.join( + basedir, "static/Images/image_data.json") except Exception as e: print(f"Error setting image_data_file path: {e}") image_data_file = None # Default to None or handle it as needed diff --git a/Model-Training/eval-mark-I.py b/Model-Training/eval-mark-I.py index 087ded8..1dfc2e5 100644 --- a/Model-Training/eval-mark-I.py +++ b/Model-Training/eval-mark-I.py @@ -1,11 +1,13 @@ from __future__ import annotations import os + import numpy as np from keras.models import load_model from keras.preprocessing import image from sklearn.metrics.pairwise import euclidean_distances + # Function to load and preprocess images def load_and_preprocess_image(img_path, target_size=(160, 160)): try: @@ -18,6 +20,7 @@ def load_and_preprocess_image(img_path, target_size=(160, 160)): print(f"Error loading image {img_path}: {e}") return None + # Function to generate embeddings def generate_embeddings(model, dataset_path): embeddings = [] @@ -41,6 +44,7 @@ def generate_embeddings(model, dataset_path): labels = np.array(labels) return embeddings, labels + # Function to calculate intra-cluster distances def calculate_intra_cluster_distances(embeddings, labels): unique_labels = np.unique(labels) @@ -59,6 +63,7 @@ def calculate_intra_cluster_distances(embeddings, labels): return np.array(distances) + # Load the pre-trained FaceNet model model_path = "facenet_model.h5" model = load_model(model_path) diff --git a/main.py b/main.py index a3c828c..7ccebdd 100644 --- a/main.py +++ b/main.py @@ -2,6 +2,7 @@ import logging from concurrent.futures import ThreadPoolExecutor + from API import run_fastapi_app from FaceRec.app.main import run_flask @@ -9,6 +10,7 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) + # Function to run Flask app with exception handling def run_flask_app(): try: @@ -16,6 +18,7 @@ def run_flask_app(): except Exception as e: logger.error(f"Error starting Flask app: {e}") + # Function to run FastAPI app with exception handling def run_fastapi_app_with_handling(): try: @@ -23,6 +26,7 @@ def run_fastapi_app_with_handling(): except Exception as e: logger.error(f"Error starting FastAPI app: {e}") + # Multithreading used to start both FastAPI and Flask apps at the same time. if __name__ == "__main__": with ThreadPoolExecutor(max_workers=2) as executor: diff --git a/testing/test_database.py b/testing/test_database.py index ffaf29e..e247b90 100644 --- a/testing/test_database.py +++ b/testing/test_database.py @@ -13,6 +13,7 @@ init_logging_config() + def test_vector_search(): """ Test that the vector_search function returns the correct result. diff --git a/testing/test_face_cycle.py b/testing/test_face_cycle.py index 27f18fe..415e0c6 100644 --- a/testing/test_face_cycle.py +++ b/testing/test_face_cycle.py @@ -10,6 +10,7 @@ client = TestClient(router) + @patch("API.database.Database.find_one_and_delete") @patch("API.database.Database.update_one") @patch("API.database.Database.find_one") @@ -51,7 +52,7 @@ def test_face_lifecycle( with open("./test-faces/devansh.jpg", "rb") as image_file: encoded_string1 = base64.b64encode(image_file.read()).decode("utf-8") - + response1 = client.post( "/create_new_faceEntry", json={ @@ -67,7 +68,7 @@ def test_face_lifecycle( with open("./test-faces/devansh.jpg", "rb") as image_file: encoded_string2 = base64.b64encode(image_file.read()).decode("utf-8") - + response2 = client.post( "/create_new_faceEntry", json={ diff --git a/testing/test_face_endpoints.py b/testing/test_face_endpoints.py index 7cafcce..a0f1fa5 100644 --- a/testing/test_face_endpoints.py +++ b/testing/test_face_endpoints.py @@ -6,6 +6,7 @@ import pytest from fastapi.testclient import TestClient + from API.route import router from API.utils import init_logging_config @@ -14,10 +15,12 @@ client = TestClient(router) + # Exception handling utility function def log_exception(message: str): logging.error(message) + @pytest.mark.run(order=1) def test_register_face1(): """ @@ -47,7 +50,8 @@ def test_register_face1(): "API.database.Database.find_one_and_delete", mock_find_one_and_delete ): with open("./test-faces/devansh.jpg", "rb") as image_file: - encoded_string1 = base64.b64encode(image_file.read()).decode("utf-8") + encoded_string1 = base64.b64encode( + image_file.read()).decode("utf-8") response1 = client.post( "/create_new_faceEntry", @@ -67,6 +71,7 @@ def test_register_face1(): log_exception(f"Error in test_register_face1: {e}") pytest.fail(f"Test failed due to exception: {e}") + @pytest.mark.run(order=2) def test_register_face2(): """ @@ -96,7 +101,8 @@ def test_register_face2(): "API.database.Database.find_one_and_delete", mock_find_one_and_delete ): with open("./test-faces/devansh.jpg", "rb") as image_file: - encoded_string2 = base64.b64encode(image_file.read()).decode("utf-8") + encoded_string2 = base64.b64encode( + image_file.read()).decode("utf-8") response2 = client.post( "/create_new_faceEntry", @@ -116,6 +122,7 @@ def test_register_face2(): log_exception(f"Error in test_register_face2: {e}") pytest.fail(f"Test failed due to exception: {e}") + @pytest.mark.run(order=3) def test_get_all_faces_after_registration(): """ @@ -142,6 +149,7 @@ def test_get_all_faces_after_registration(): log_exception(f"Error in test_get_all_faces_after_registration: {e}") pytest.fail(f"Test failed due to exception: {e}") + @pytest.mark.run(order=4) def test_update_face(): """ @@ -171,7 +179,8 @@ def test_update_face(): "API.database.Database.find_one_and_delete", mock_find_one_and_delete ): with open("./test-faces/devansh.jpg", "rb") as image_file: - encoded_string2 = base64.b64encode(image_file.read()).decode("utf-8") + encoded_string2 = base64.b64encode( + image_file.read()).decode("utf-8") response = client.put( "/update/1", @@ -188,6 +197,7 @@ def test_update_face(): log_exception(f"Error in test_update_face: {e}") pytest.fail(f"Test failed due to exception: {e}") + @pytest.mark.run(order=5) def test_delete_face(): """ @@ -201,6 +211,7 @@ def test_delete_face(): log_exception(f"Error in test_delete_face: {e}") pytest.fail(f"Test failed due to exception: {e}") + @pytest.mark.run(order=6) def test_recognize_face_fail(): """ @@ -225,6 +236,7 @@ def test_recognize_face_fail(): log_exception(f"Error in test_recognize_face_fail: {e}") pytest.fail(f"Test failed due to exception: {e}") + @pytest.mark.run(order=7) def test_recognize_face_success(): """