"""Flask API around the OpenVoice video pipeline.

The service composes the already lip-synced clip with a background image and
an image slideshow, serves the intermediate artefacts over HTTP, and emails
the final MP4 to the requester.
"""

import logging
import math
import multiprocessing
import os
import smtplib
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
from email import encoders
from email.mime.base import MIMEBase
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import requests
from flask import Flask, flash, request, redirect, url_for, jsonify, Response, send_from_directory
from flask_cors import CORS, cross_origin
from moviepy.editor import (
    VideoFileClip,
    CompositeVideoClip,
    concatenate_videoclips,
    ImageClip,
    AudioFileClip,
    CompositeAudioClip,
    ipython_display,
)

from celery_config import make_celery
from openVoiceGen import OpenVoiceGen
#from tasks import generate_audio_and_send_email as helper_func

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

flask_app = Flask(__name__)

#flask_app.config.update(
#    CELERY_BROKER_URL='redis://localhost:6379/0',
#    CELERY_RESULT_BACKEND='redis://localhost:6379/0'
#)
#celery = make_celery(flask_app)
CORS(flask_app)

# --- Filesystem layout ------------------------------------------------------
WORK_DIR = '/home/edward/OpenVoice'
OUTPUT_DIR = "/home/edward/outputs"
FINAL_VIDEO_PATH = "/home/edward/outputs/result_video_final.mp4"
LIP_SYNC_ASSETS_DIR = "/home/edward/lip-sync-only/LatentSync/assets"
LIP_SYNC_VIDEO_PATH = "/home/edward/lip-sync-only/LatentSync/assets/video_out.mp4"
BACKGROUND_MODEL_PATH = "bgimage"

# --- SMTP settings ----------------------------------------------------------
# NOTE(review): credentials were hard-coded in this file. They can now be
# overridden through the environment; the literals remain only as a fallback so
# the current deployment keeps working. Rotate the app password and drop the
# fallback once the environment variables are in place.
SMTP_SERVER = os.environ.get("SMTP_SERVER", "smtp.gmail.com")
SMTP_PORT = int(os.environ.get("SMTP_PORT", "587"))
SMTP_SENDER_EMAIL = os.environ.get("SMTP_SENDER_EMAIL", "e45578679@gmail.com")
SMTP_SENDER_PASSWORD = os.environ.get("SMTP_SENDER_PASSWORD", "bdiohxbprqafphng")

# --- Request option tables ---------------------------------------------------
DEFAULT_COLOR_KEY = (129, 204, 56)

# Chroma-key colour to remove for each selectable presenter model.
COLOR_KEY_BY_MODEL = {
    "model1": (129, 204, 56),
    "model2": (129, 204, 56),
    "model3": (15, 192, 61),
    "model4": (37, 212, 34),
    "model5": (37, 212, 34),
    "model6": (15, 192, 61),
}

# Background-music choices accepted by /process_pipeline.
BG_MUSIC_MAP = {
    "Chill": "music/Chill.mp3",
    "Cozy": "music/Cozy.mp3",
    "Inspiring": "music/Inspiring.mp3",
    "Upbeat": "music/Upbeat.mp3",
    "Vibes": "music/Vibes.mp3",
    "Vlog": "music/Vlog.mp3",
}


def _ensure_workdir():
    """Switch the process back to WORK_DIR if the current directory differs."""
    if os.getcwd() != WORK_DIR:
        os.chdir(WORK_DIR)


def _json_payload():
    """Return the request's JSON body as a dict, or {} if absent or not an object."""
    payload = request.get_json(silent=True)
    return payload if isinstance(payload, dict) else {}


def send_mp4_email_helper(smtp_server, port, sender_email, sender_password,
                          recipient_email, subject, body, file_path):
    """Send ``file_path`` as an attachment to ``recipient_email`` over SMTP.

    Raises:
        Exception: any error from reading the file or talking to the SMTP
            server is logged and re-raised so callers can report the failure.
    """
    try:
        # Create the email message
        msg = MIMEMultipart()
        msg['From'] = sender_email
        msg['To'] = recipient_email
        msg['Subject'] = subject

        # Attach the email body
        msg.attach(MIMEText(body, 'plain'))

        # Attach the MP4 file
        with open(file_path, 'rb') as attachment:
            part = MIMEBase('application', 'octet-stream')
            part.set_payload(attachment.read())
        encoders.encode_base64(part)
        # Passing filename= lets the email package quote the value correctly.
        part.add_header(
            'Content-Disposition',
            'attachment',
            filename=os.path.basename(file_path),
        )
        msg.attach(part)

        # Send the email
        with smtplib.SMTP(smtp_server, port) as server:
            server.starttls()
            server.login(sender_email, sender_password)
            server.send_message(msg)
        logger.info("Email sent successfully.")
    except Exception:
        # Previously the error was printed and swallowed, which made every
        # caller report success. Log it and let the caller decide.
        logger.exception("Failed to send email")
        raise


def send_mp4_email(recipient_email):
    """Email the final rendered video to ``recipient_email``.

    Raises:
        ValueError: if ``recipient_email`` is empty.
        Exception: propagated from ``send_mp4_email_helper`` on failure.
    """
    if not recipient_email:
        raise ValueError("recipient email is required")
    send_mp4_email_helper(
        smtp_server=SMTP_SERVER,
        port=SMTP_PORT,
        sender_email=SMTP_SENDER_EMAIL,
        sender_password=SMTP_SENDER_PASSWORD,
        recipient_email=recipient_email,
        subject="Your Video is Ready!",
        body="Here is your video attachment.",
        file_path=FINAL_VIDEO_PATH,
    )


# --- Legacy (disabled) Celery task, kept for reference ------------------------
#@celery.task
# def generate_audio_and_send_email(text, person_model, bgm_image_urls, voice_id, email):
#     #logger = logging.getLogger(__name__)
#     open_voice_gen = OpenVoiceGen()
#     FINAL_OUTPUT_PATH = "output/result_video_final.mp4"
#     try:
#         os.remove(FINAL_OUTPUT_PATH)
#         logger.info("Removed existing output file.")
#     except FileNotFoundError:
#         logger.warning("Output file does not exist.")
#     except Exception as e:
#         logger.error(f"Error removing output file: {e}")
#     try:
#         logger.info("Generating audio...")
#         open_voice_gen.process_pipeline(text, person_model, bgm_image_urls, voice_id)
#         logger.info("Audio generation completed.")
#     except Exception as e:
#         logger.error(f"Failed to generate audio: {e}")
#         return {"message": f"Failed to generate audio : {e}"}
#     try:
#         logger.info("Sending email...")
#         send_mp4_email(email)
#         logger.info("Email sent successfully.")
#     except Exception as e:
#         logger.error(f"Failed to send email: {e}")
#         return {"message": "Failed to send email"}
#     return {"message": "Email sent successfully"}


@flask_app.route("/resend", methods=['POST'])
def resent():
    """Re-send the most recently rendered video to the address in the JSON body."""
    email = _json_payload().get('email')
    if not email:
        return jsonify({"message": "email is required"}), 400
    try:
        send_mp4_email(email)
    except Exception as e:
        return jsonify({"message": f"failed to send email: {e}"}), 500
    return jsonify({"message": "email sent"})


# --- Legacy (disabled) endpoints, kept for reference --------------------------
# @flask_app.route('/generate_vid', methods=['POST'])
# def generate_audio_openvoice():
#     text = request.json.get('text')
#     person_model = request.json.get('person_model')
#     template = request.json.get('template')
#     voice_id = request.json.get('voice_id')
#     if not voice_id:
#         voice_id = "Badr Odhiambo"
#     bgm_image_urls = request.json.get('bgm_image_urls')
#     email = request.json.get('email')
#     valid_person_models = ["person1"]
#     valid_templates = ["living_room"]
#     open_voice_gen = OpenVoiceGen()
#     FINAL_OUTPUT_PATH = "output/result_video_final.mp4"
#     #if the file exists, delete it
#     try:
#         os.remove(FINAL_OUTPUT_PATH)
#     except:
#         pass
#     try:
#         print("Generating audio...")
#         open_voice_gen.process_pipeline(text, person_model, bgm_image_urls, voice_id)
#     except Exception as e:
#         print(f"Failed to generate audio: {e}")
#         os.chdir('/home/edward/OpenVoice') if os.getcwd() != '/home/edward/OpenVoice' else None
#         return jsonify({"message": "Failed to generate audio"})
#     print("Audio generated successfully! Sending email...")
#     try:
#         os.chdir('/home/edward/OpenVoice') if os.getcwd() != '/home/edward/OpenVoice' else None
#         send_mp4_email(email)
#     except Exception as e:
#         print(f"Failed to send email: {e}")
#         return jsonify({"message": "Failed to send email"})
#     print("Email sent successfully!")
#     return jsonify({"message": "email sent"})

# @flask_app.route('/generate_vid2', methods=['POST'])
# def generate_audio_openvoice2():
#     text = request.json.get('text')
#     person_model = request.json.get('person_model')
#     bgm_image_urls = request.json.get('bgm_image_urls')
#     voice_id = request.json.get('voice_id') or "Badr Odhiambo"
#     email = request.json.get('email')
#     #generate_audio_and_send_email.apply_async(args=[text, person_model, bgm_image_urls, voice_id, email])
#     #helper_func.delay(text, person_model, bgm_image_urls, voice_id, email)
#     executor = ProcessPoolExecutor()
#     future = executor.submit(generate_audio_and_send_email, text, person_model, bgm_image_urls, voice_id, email)
#     logger.info(f"Task submitted: {future}")
#     return jsonify({"message": "Generating, email will be sent when done"})


@flask_app.route('/upload', methods=['POST'])
def upload_file():
    """Store the multipart ``file`` field under ``<instance_path>/model``."""
    # Ensure the 'model' folder exists within the app context
    model_folder = os.path.join(flask_app.instance_path, 'model')
    os.makedirs(model_folder, exist_ok=True)

    if 'file' not in request.files:
        return jsonify({'error': 'No file part'}), 400
    file = request.files['file']
    if file.filename == '':
        return jsonify({'error': 'No selected file'}), 400

    # The filename is client-controlled: keep only its final component so a
    # name such as "../../app.py" cannot escape the model folder.
    safe_name = os.path.basename(file.filename.replace("\\", "/"))
    if safe_name in ('', '.', '..'):
        return jsonify({'error': 'Invalid file name'}), 400

    file_path = os.path.join(model_folder, safe_name)
    file.save(file_path)
    return jsonify({'message': 'File successfully uploaded', 'file_path': file_path})


@flask_app.route('/reset', methods = ['POST'])
def reset():
    """Reset the process working directory to the OpenVoice checkout."""
    _ensure_workdir()
    return jsonify({'message': 'path reset to /home/edward/OpenVoice'}), 200


@flask_app.route('/chroma_test', methods = ['POST'])
def chroma_test():
    """Run ``OpenVoiceGen.chroma_test`` and report whether it raised."""
    open_voice_gen = OpenVoiceGen()
    try:
        open_voice_gen.chroma_test()
    except Exception as e:
        return jsonify({'message': f'chroma test failed: {e}'}), 500
    return jsonify({'message': 'chroma test done'}), 200


def process_pipeline_simple(image_urls, bg_image, bg_music="music/bgMusic.mp3", color_key = (129, 204, 56)):
    """Composite the lip-synced clip, background and slideshow into the final video.

    Args:
        image_urls: non-empty sequence of image URLs for the slideshow.
        bg_image: path of the background image placed behind the presenter.
        bg_music: path of the background-music track handed to the combiner.
        color_key: RGB colour of the green screen to key out.

    Returns:
        ``None`` on success, or ``{"message": ...}`` when the final combine
        step fails (callers must check the return value).

    Raises:
        ValueError: if ``image_urls`` is empty.
    """
    # Fail before the expensive chroma-key render rather than with a
    # ZeroDivisionError/TypeError when the slideshow timing is computed.
    if not image_urls:
        raise ValueError("image_urls must contain at least one image")

    #the video with lipsync is already at /home/edward/lip-sync-only/LatentSync/assets/video_out.mp4
    set_video_path = LIP_SYNC_VIDEO_PATH
    path_after_background = "/home/edward/outputs/after_background.mp4"
    open_voice_gen = OpenVoiceGen()

    #generate
    print("create chrome key started")
    open_voice_gen.create_chroma_key_video(
        foreground_video_path=set_video_path,
        background_image_path=bg_image,
        output_path=path_after_background,
        color=color_key,
        #adjust the threshold and softness to get the best result
        threshold=50,
        softness=10,
        position="center",
        fps=24,
        codec="libx264"
    )

    print("creating slideshow")
    # The clip is opened only to read its duration; close it straight away so
    # its reader is not leaked on every request.
    green_screen_vid = VideoFileClip(set_video_path)
    try:
        clip_duration = green_screen_vid.duration
    finally:
        green_screen_vid.close()

    # Spread the images evenly across the length of the presenter clip.
    slideshow_video, original_images = open_voice_gen.create_slideshow(
        image_urls,
        duration_per_image=clip_duration/len(image_urls),
    )
    print("slideshow created")

    #debugging steps: save the slideshow video
    #slideshow_video.write_videofile("/home/edward/outputs/slideshow.mp4")

    #debugging steps: check the slideshow duration and number of images
    print(f"Slideshow video duration: {slideshow_video.duration}")
    print(f"Number of images in slideshow: {len(original_images)}")

    #Check if images are loading properly
    for i, image in enumerate(original_images):
        print(f"Image {i}: {image}")
        if image is None:
            print(f"Image {i} is empty!")

    #change directory if needed
    _ensure_workdir()
    print("current working directory is" + os.getcwd())

    #combine the two videos
    try:
        open_voice_gen.combine_video_and_background2(
            lip_synced_video_path = path_after_background,
            green_screen_video_path = set_video_path,
            slideshow_video = slideshow_video,
            original_images=original_images,
            output_video_path=FINAL_VIDEO_PATH,
            output_audio="/home/edward/outputs/audio.wav",
            bgMusic=bg_music,
            color_key=color_key
        )
    except Exception as e:
        print(f"Failed to combine videos: {e}")
        return {"message": f"Failed to combine videos: {e}"}


@flask_app.route("/show_bgimages", methods=["GET"])
def list_files():
    """List the regular files in the background-image folder with their view URLs."""
    try:
        files = os.listdir(BACKGROUND_MODEL_PATH)
        file_list = [
            {"name": file, "view": f"/files/{file}"}
            for file in files
            if os.path.isfile(os.path.join(BACKGROUND_MODEL_PATH, file))
        ]
        return jsonify(file_list)
    except Exception as e:
        return jsonify({"error": str(e)}), 500


# The rule must declare <filename>; without it Flask calls get_file() with no
# argument and every request to the URLs produced by list_files() fails.
@flask_app.route("/files/<filename>", methods=["GET"])
def get_file(filename):
    """Serve one background image by name."""
    try:
        return send_from_directory(BACKGROUND_MODEL_PATH , filename)
    except Exception as e:
        return jsonify({"error": str(e)}), 404


@flask_app.route("/get_original_audio", methods=["GET"])
def get_original_audio():
    """Download the audio track written by the last pipeline run."""
    try:
        return send_from_directory(OUTPUT_DIR, "audio.wav", as_attachment=True)
    except Exception as e:
        return jsonify({"error": str(e)}), 404


@flask_app.route("/get_lip_synced_video", methods=["GET"])
def get_lip_synced_video():
    """Serve the lip-synced source clip."""
    try:
        return send_from_directory(LIP_SYNC_ASSETS_DIR, "video_out.mp4")
    except Exception as e:
        return jsonify({"error": str(e)}), 404


@flask_app.route("/slideshow", methods=["GET"])
def get_slideshow():
    """Serve ``slideshow.mp4`` from the output folder, if it exists."""
    try:
        return send_from_directory(OUTPUT_DIR, "slideshow.mp4")
    except Exception as e:
        return jsonify({"error": str(e)}), 404


@flask_app.route('/process_pipeline', methods = ['POST'])
def api_process_pipeline():
    """Render the final video from the JSON request and email it.

    JSON fields: ``image_urls`` (required, non-empty list), ``email``,
    ``bg_music`` (a key of ``BG_MUSIC_MAP``), ``bg_image`` (file stem inside
    ``bgimage/``) and ``person_model`` (a key of ``COLOR_KEY_BY_MODEL``,
    defaults to ``"model1"``).
    """
    payload = _json_payload()

    image_urls = payload.get('image_urls')
    if not isinstance(image_urls, list) or not image_urls:
        return jsonify({'message': 'image_urls must be a non-empty list'}), 400

    email = payload.get('email')

    # Unknown or missing music keys resolve to None, which is forwarded as-is.
    bg_music_key = payload.get('bg_music')
    bg_music_val = BG_MUSIC_MAP.get(bg_music_key) if isinstance(bg_music_key, str) else None

    person_model = payload.get('person_model') or "model1"
    color_key = DEFAULT_COLOR_KEY
    if isinstance(person_model, str):
        color_key = COLOR_KEY_BY_MODEL.get(person_model, DEFAULT_COLOR_KEY)

    bg_image_path = "bgimage/bgImage1.jpg"
    bg_image = payload.get('bg_image')
    if bg_image:
        bg_name = str(bg_image)
        # Client-supplied name: refuse anything that is not a plain file stem
        # so the path cannot leave the bgimage folder.
        if os.path.basename(bg_name) != bg_name or "\\" in bg_name or bg_name in ('.', '..'):
            return jsonify({'message': 'invalid bg_image'}), 400
        bg_image_path = f"bgimage/{bg_name}.jpg"

    try:
        #delete existing output file
        if os.path.exists(FINAL_VIDEO_PATH):
            os.remove(FINAL_VIDEO_PATH)
        print("Removed existing output file.")
        result = process_pipeline_simple(image_urls, bg_image_path, color_key=color_key, bg_music=bg_music_val)
    except Exception as e:
        return jsonify({'message': f'pipeline failed: {e}'}), 500

    # process_pipeline_simple reports a failed combine step through its return
    # value rather than an exception; do not email or claim success then.
    if result is not None:
        return jsonify(result), 500

    # Without an address there is nobody to mail; the render itself succeeded.
    if email:
        try:
            send_mp4_email(email)
        except Exception as e:
            return jsonify({'message': f'pipeline done, but email failed: {e}'}), 500
    return jsonify({'message': 'pipeline done'}), 200


if __name__ == '__main__':
    #multiprocessing.set_start_method('spawn')
    # NOTE(review): debug=True together with host 0.0.0.0 exposes the Werkzeug
    # interactive debugger to the network; disable debug outside development.
    flask_app.run(host = "0.0.0.0",port=3334, debug = True, threaded = True)