Added AWS SES email

This commit is contained in:
2026-04-26 01:49:14 -04:00
parent 86e897c54d
commit fe19511d11
3 changed files with 137 additions and 608 deletions
+5 -316
View File
@@ -1,29 +1,17 @@
from flask import session, jsonify
from app.api.schemas.user_update import UserUpdateSchema from app.api.schemas.user_update import UserUpdateSchema
# from app.models.loan import Loan
from app.utils.logger import logger from app.utils.logger import logger
from app.api.services.base_service import BaseService from app.api.services.base_service import BaseService
# from app.api.schemas.eligibility_check import EligibilityCheckSchema
from marshmallow import ValidationError from marshmallow import ValidationError
from app.api.enums import TransactionType
# from app.api.integrations import SimbrellaIntegration
from app.extensions import db from app.extensions import db
from app.models import Members, MembersActions, MembersProfile, Payments, MembersProducts, ProvisionActions from app.models import Members, MembersActions, MembersProfile, Payments, MembersProducts, ProvisionActions
# from app.api.services.offer_analysis import OfferAnalysis
from app.api.helpers.response_helper import ResponseHelper from app.api.helpers.response_helper import ResponseHelper
from werkzeug.security import generate_password_hash, check_password_hash
from app.api.schemas.user import UserSchema from app.api.schemas.user import UserSchema
from app.api.schemas.start_profile import StartProfileSchema from app.api.schemas.start_profile import StartProfileSchema
from app.api.schemas.profile_links import ProfileLinksSchema from app.api.schemas.profile_links import ProfileLinksSchema
from flask_mail import Mail, Message
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
from sendgrid import SendGridAPIClient # from sendgrid import SendGridAPIClient
from sendgrid.helpers.mail import Mail # from sendgrid.helpers.mail import Mail
import datetime import datetime
import jwt import jwt
@@ -31,10 +19,7 @@ import random
from app.config import Config from app.config import Config
import smtplib, ssl from app.notifications.mail_factory import send_email_factory
from email.message import EmailMessage
from app.notifications.aws.aws_mailer import send_email_ses
class AccountService(BaseService): class AccountService(BaseService):
@@ -583,39 +568,8 @@ class AccountService(BaseService):
for key in url_list.keys(): for key in url_list.keys():
url_list_value = url_list[key] url_list_value = url_list[key]
logger.info(f"URL LIST : {key} : {url_list_value}") logger.info(f"URL LIST : {key} : {url_list_value}")
#
# firstname = str(validated_data.get('firstname'))
# lastname = str(validated_data.get('lastname'))
# account_name = str(validated_data.get('account_name'))
# phone = str(validated_data.get('phone'))
# full_address = str(validated_data.get('full_address'))
# # email = str(validated_data.get('email'))
# profile_data = {
# "firstname": firstname,
# "lastname": lastname,
# "account_name": account_name,
# "phone": phone,
# "full_address": full_address,
# # "email": email,
# }
# Members.set_member_update_profile(uid, profile_data)
# # Read the saved data
# member_data = Members.get_member_by_uid(uid)
# personal_data = {
# "firstname": member_data.firstname,
# "lastname": member_data.lastname,
# "account_name": member_data.account_name,
# "email": member_data.email,
# "phone": member_data.phone,
# "full_address": member_data.full_address,
# "country": member_data.country,
# }
external_links = { external_links = {
"facebook_url": "https://facebook.com", "facebook_url": "https://facebook.com",
"twitter_url": "twitter.com", "twitter_url": "twitter.com",
@@ -651,276 +605,11 @@ class AccountService(BaseService):
@staticmethod @staticmethod
def process_test_email(data): def process_test_email(data):
logger.info(f"Email Test Enter 001", exc_info=True) logger.info(f"Email Test Enter 001", exc_info=True)
send_email_ses( send_email_factory(
to_email="ameye@chiefsoft.com", to_email="ameye@chiefsoft.com",
subject="Test from Merms", subject="Test from Merms",
html_content="<strong>Hello from AWS SES</strong>" html_content="<strong>Hello from AWS SES</strong>"
) )
# AccountService.test_new_mailer()
# AccountService.send_register_mail('ameye@chiefsoft.com', 'pend-uid---', 100, 'olutest', 'Ameyetest')
logger.info(f"Email Test", exc_info=True) logger.info(f"Email Test", exc_info=True)
@staticmethod
def test_new_mailer_old():
logger.info("test_new_mailer 000 ")
# --- Email Configuration ---
sender_email = "support@mermsemr.com" # Enter your email address
receiver_email = "ameye@chiefsoft.com" # Enter the recipient's address
# Use the App Password generated in Step 1
app_password = "flhf cjjx bguv fycg"
app_password = "flhfcjjxbguvfycg"
# --- Create the Email Message ---
# msg = EmailMessage()
# msg.set_content("This is the body of the email sent from Python.")
msg = MIMEText("This is the body of my email.")
msg['Subject'] = "Subject Line from Python"
msg['From'] = sender_email
msg['To'] = receiver_email
# --- Connect to Gmail's SMTP Server and Send the Email ---
smtp_server = "smtp.gmail.com"
port = 587 # STARTTLS
context = ssl.create_default_context()
logger.info("test_new_mailer 003 ")
try:
logger.info("test_new_mailer 000 44")
with smtplib.SMTP(smtp_server, port) as server:
logger.info("test_new_mailer 000 44-1")
server.ehlo()
logger.info("test_new_mailer 000 44-2")
server.starttls(context=context)
server.ehlo()
logger.info("test_new_mailer 000 55")
server.login(sender_email, app_password)
logger.info("test_new_mailer 000 66")
server.send_message(msg)
logger.info("test_new_mailer 000 77")
logger.info("Email sent successfully!")
except smtplib.SMTPAuthenticationError:
logger.error("Authentication error: Check your username and password.")
except smtplib.SMTPConnectError as e:
logger.error(f"Connection error: {e}")
except smtplib.SMTPException as e:
logger.error(f"An unexpected SMTP error occurred: {e}")
except Exception as e:
logger.error(f"An unrelated error occurred: {e}")
@staticmethod
def test_new_mailer():
# Replace these with your actual details
SENDGRID_API_KEY = "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"
FROM_EMAIL = 'support@mermsemr.com' # Must be a verified sender in SendGrid
TO_EMAIL = 'works@chiefsoft.com'
message = Mail(
from_email=FROM_EMAIL,
to_emails=TO_EMAIL,
subject='Sending with Twilio SendGrid is Fun',
html_content='<strong>and easy to do anywhere, even with Python</strong>'
)
try:
# Initialize the SendGrid client
sg = SendGridAPIClient(SENDGRID_API_KEY)
# Send the email
response = sg.send(message)
# Print status codes: 202 means the request was accepted for delivery
print(f"Status Code: {response.status_code}")
print(f"Body: {response.body}")
print(f"Headers: {response.headers}")
except Exception as e:
logger.error(f"An unrelated error occurred: {e}")
# logger.info("test_new_mailer 000 ")
# # --- Email Configuration ---
# sender_email = "support@mermsemr.com" # Enter your email address
# receiver_email = "ameye@chiefsoft.com" # Enter the recipient's address
# # Use the App Password generated in Step 1
# app_password = "flhf cjjx bguv fycg"
# app_password = "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"
#
# # --- Create the Email Message ---
# # msg = EmailMessage()
# # msg.set_content("This is the body of the email sent from Python.")
# msg = MIMEText("This is the body of my email.")
# msg['Subject'] = "Subject Line from Python"
# msg['From'] = sender_email
# msg['To'] = receiver_email
#
# # --- Connect to Gmail's SMTP Server and Send the Email ---
# smtp_server = "smtp.sendgrid.net"
# port = 587 # STARTTLS
#
# context = ssl.create_default_context()
# logger.info("test_new_mailer 003 ")
# try:
# logger.info("test_new_mailer 000 44")
# with smtplib.SMTP(smtp_server, port) as server:
# logger.info("test_new_mailer 000 44-1")
# server.ehlo()
# logger.info("test_new_mailer 000 44-2")
# server.starttls(context=context)
# server.ehlo()
# logger.info("test_new_mailer 000 55")
# server.login(sender_email, app_password)
# logger.info("test_new_mailer 000 66")
# server.send_message(msg)
# logger.info("test_new_mailer 000 77")
# logger.info("Email sent successfully!")
#
# except smtplib.SMTPAuthenticationError:
# logger.error("Authentication error: Check your username and password.")
#
# except smtplib.SMTPConnectError as e:
# logger.error(f"Connection error: {e}")
# except smtplib.SMTPException as e:
# logger.error(f"An unexpected SMTP error occurred: {e}")
# except Exception as e:
# logger.error(f"An unrelated error occurred: {e}")
def send_register_mail(signup_email, pending_uid, pending_id, firstname, lastname):
pending_member = {
"email": signup_email,
"pending_uid": pending_uid,
"first_name": firstname,
"last_name": lastname,
"pending_id": pending_id,
}
jwt_part = jwt.encode(
{"user": pending_member, 'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=3330)},
AccountService.JWT_SECRET_KEY, algorithm='HS256'
)
panel_url = panel_url = AccountService.THIS_SITE_URL # "https://qa-panel.mermsemr.com"
link_url = str(panel_url) + '/csignup/' + jwt_part
msg_body = f"""
Hello {firstname},
You received this message for account verification
Follow the link:{link_url}
For any Support
Reach Out
support@mermsemr.com
"""
sender_email = AccountService.SEND_EMAIL_FROM
sender_password = AccountService.SEND_EMAIL_PASS
receiver_email = signup_email
subject = "Verify your MERMS(AI) Account Setup"
body = msg_body
html_body = f"""\
<html>
<head></head>
<body style="font-size:14px;line-height:1.5;">
<table width="550px" border="0" cellpadding="3" cellspacing="3" background-color="#F0F8FF" style="font-size:16px">
<tr>
<td style="text-align:center">
<img style="width:150px; height:auto;" src="mermsemr.com/images/logo-pink.png" />
</td>
</tr>
<tr>
<td>
Hello <b>{firstname}!</b>
</td>
</tr>
<tr>
<td>
You have received this message for your account verification<br>
</td>
</tr>
<tr>
<td>
Follow the link: <a href="{link_url}">link</a> to complete the verification process.<br>
</td>
</tr>
</table>
</body>
</html>
"""
msg = MIMEMultipart()
msg['Subject'] = subject
msg['From'] = sender_email
msg['To'] = receiver_email
msg.attach(MIMEText(html_body, 'html')) # or 'html' for HTML content
try:
# For Gmail, use 'smtp.gmail.com' and port 587 (TLS) or 465 (SSL)
# For other providers, consult their documentation for SMTP server and port
server = smtplib.SMTP('smtp.gmail.com', 587)
# server.starttls() # Enable TLS encryption
server.login(sender_email, sender_password)
server.sendmail(sender_email, receiver_email, msg.as_string())
print("Email sent successfully!")
except Exception as e:
print(f"Error sending email: {e}")
logger.error(f"Error sending email: {e}")
finally:
server.quit() # Close the connection
# @staticmethod
# def get_profile_data(data):
#
# try:
# with db.session.begin():
#
# validated_data = AccountService.validate_data(data, UserSchema())
# user_token = validated_data.get('token')
# uid = str(validated_data.get('uid'))
# member_data = Members.get_member_by_uid(uid)
#
# personal_data = {
# "firstname": member_data.firstname,
# "lastname": member_data.lastname,
# "account_name": member_data.account_name,
# "phone": "911 111 1111",
# "full_address": "100 White House, Washington, DC 00000",
# "country": member_data.country,
# }
#
# external_links = {
# "facebook_url": "facebook.com",
# "twitter_url": "twitter.com",
# "blogger_url": "blogger.com",
# "google_url": "google.com",
# "linked_url": "linkedin.com",
# "website_url": "www.mysite.com",
# }
#
# profile_data = {
# "personal_data": personal_data,
# "external_links": external_links,
# }
#
# return ResponseHelper.success(data=profile_data)
#
# except ValidationError as err:
#
# logger.error(f"Validation Error: {getattr(err, 'messages', str(err))}")
# db.session.rollback()
# return ResponseHelper.unprocessable_entity(result_description="Validation exception")
#
# except ValueError as err:
# logger.error(f"{getattr(err, 'messages', str(err))}")
# db.session.rollback()
# return ResponseHelper.error(result_description=str(err))
#
# except Exception as e:
# logger.error(f"An error occurred: {str(e)}", exc_info=True)
# db.session.rollback()
# return ResponseHelper.internal_server_error()
+85 -292
View File
@@ -1,6 +1,3 @@
# from app.models import Customer, Account, Transaction
from app.api.enums import TransactionType
from flask import jsonify
from marshmallow import ValidationError from marshmallow import ValidationError
import logging import logging
from app.api.integrations import KafkaIntegration from app.api.integrations import KafkaIntegration
@@ -9,16 +6,12 @@ from app.models import MembersWebfiles
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
from app.api.integrations import StripeIntegration from app.api.integrations import StripeIntegration
from flask_mail import Mail, Message
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import datetime import datetime
import jwt import jwt
import requests import requests
import redis import redis
import json import json
from app.notifications.aws.aws_mailer import send_email_ses from app.notifications.mail_factory import send_email_factory
class BaseService: class BaseService:
@@ -60,85 +53,63 @@ class BaseService:
@staticmethod @staticmethod
def send_completepass_mail(signup_email, pending_uid, pending_id, firstname, lastname): def send_completepass_mail(signup_email, pending_uid, pending_id, firstname, lastname):
msg_body = f"""
Hello {firstname},
Password Reset Completed # html_body = f"""\
# <html>
# <head></head>
For any Support # <body style="font-size:14px;line-height:1.5;">
Reach Out # <table width="550px" border="0" cellpadding="3" cellspacing="3" background-color="#F0F8FF" style="font-size:16px">
support@mermsemr.com # <tr>
""" # <td style="text-align:center">
# <img style="width:150px; height:auto;" src="mermsemr.com/images/logo-pink.png" />
html_body = f"""\ # </td>
<html> # </tr>
<head></head> # <tr>
<body style="font-size:14px;line-height:1.5;"> # <td>
# Hello <b>{firstname}!</b>
# </td>
# </tr>
# <tr>
# <td>
# Password Reset Completed<br>
# </td>
# </tr>
#
# <tr>
# <td>
# For any support<br>
# Reach Out<br>
# support@mermsemr.com<br>
# https://www.mermsemr.com/
# </td>
# </tr>
# </table>
# </body>
# </html>
# """
html_body = f"""
<table width="550px" border="0" cellpadding="3" cellspacing="3" background-color="#F0F8FF" style="font-size:16px"> <table width="550px" border="0" cellpadding="3" cellspacing="3" background-color="#F0F8FF" style="font-size:16px">
<tr> <tr>
<td style="text-align:center"> <td>
<img style="width:150px; height:auto;" src="mermsemr.com/images/logo-pink.png" /> Hello <b>{firstname}!</b>
</td> </td>
</tr> </tr>
<tr> <tr>
<td> <td>
Hello <b>{firstname}!</b> Password Reset Completed<br>
</td> </td>
</tr>
<tr>
<td>
Password Reset Completed<br>
</td>
</tr>
<tr>
<td>
For any support<br>
Reach Out<br>
support@mermsemr.com<br>
https://www.mermsemr.com/
</td>
</tr> </tr>
</table> </table>
</body>
</html>
""" """
sender_email = BaseService.SEND_EMAIL_FROM
sender_password = BaseService.SEND_EMAIL_PASS
receiver_email = signup_email receiver_email = signup_email
subject = "Reset Password Completed" subject = "Reset Password Completed"
send_email_ses( send_email_factory(
to_email=receiver_email, to_email=receiver_email,
subject=subject, subject=subject,
html_content=html_body html_content=html_body
) )
#
# body = msg_body
#
# msg = MIMEMultipart()
# msg['Subject'] = subject
# msg['From'] = sender_email
# msg['To'] = receiver_email
# msg.attach(MIMEText(html_body, 'html')) # or 'html' for HTML content
#
# try:
# # For Gmail, use 'smtp.gmail.com' and port 587 (TLS) or 465 (SSL)
# # For other providers, consult their documentation for SMTP server and port
# server = smtplib.SMTP('smtp.gmail.com', 587)
# # server.starttls() # Enable TLS encryption
# server.login(sender_email, sender_password)
# server.sendmail(sender_email, receiver_email, msg.as_string())
# print("Email sent successfully!")
# except Exception as e:
# print(f"Error sending email: {e}")
# logger.error(f"Error sending email: {e}")
# finally:
# server.quit() # Close the connection
@staticmethod @staticmethod
def send_resetpass_mail(signup_email, pending_uid, pending_id, firstname, lastname): def send_resetpass_mail(signup_email, pending_uid, pending_id, firstname, lastname):
@@ -159,29 +130,47 @@ class BaseService:
panel_url = BaseService.THIS_SITE_URL # "https://qa-panel.mermsemr.com" panel_url = BaseService.THIS_SITE_URL # "https://qa-panel.mermsemr.com"
link_url = str(panel_url) + '/accreset/' + jwt_part link_url = str(panel_url) + '/accreset/' + jwt_part
msg_body = f""" # html_body = f"""\
Hello {firstname}, # <html>
# <head></head>
You received this message for account reset password # <body style="font-size:14px;line-height:1.5;">
# <table width="550px" border="0" cellpadding="3" cellspacing="3" background-color="#F0F8FF" style="font-size:16px">
Follow the link:{link_url} # <tr>
# <td style="text-align:center">
For any Support # <img style="width:150px; height:auto;" src="mermsemr.com/images/logo-pink.png" />
Reach Out # </td>
support@mermsemr.com # </tr>
""" # <tr>
# <td>
# Hello <b>{firstname}!</b>
# </td>
# </tr>
# <tr>
# <td>
# You received this message for account reset password<br>
# </td>
# </tr>
# <tr>
# <td>
# Follow the link: <a href="{link_url}">link</a> to reset your password.<br>
# </td>
# </tr>
# <tr>
# <td>
# For any support<br>
# Reach Out<br>
# support@mermsemr.com<br>
# https://www.mermsemr.com/
# </td>
# </tr>
# </table>
# </body>
# </html>
# """
html_body = f"""\ html_body = f"""\
<html>
<head></head>
<body style="font-size:14px;line-height:1.5;">
<table width="550px" border="0" cellpadding="3" cellspacing="3" background-color="#F0F8FF" style="font-size:16px"> <table width="550px" border="0" cellpadding="3" cellspacing="3" background-color="#F0F8FF" style="font-size:16px">
<tr> <tr>
<td style="text-align:center">
<img style="width:150px; height:auto;" src="mermsemr.com/images/logo-pink.png" />
</td>
</tr>
<tr>
<td> <td>
Hello <b>{firstname}!</b> Hello <b>{firstname}!</b>
</td> </td>
@@ -196,51 +185,17 @@ class BaseService:
Follow the link: <a href="{link_url}">link</a> to reset your password.<br> Follow the link: <a href="{link_url}">link</a> to reset your password.<br>
</td> </td>
</tr> </tr>
<tr>
<td>
For any support<br>
Reach Out<br>
support@mermsemr.com<br>
https://www.mermsemr.com/
</td>
</tr>
</table> </table>
</body>
</html>
""" """
sender_email = BaseService.SEND_EMAIL_FROM
sender_password = BaseService.SEND_EMAIL_PASS
receiver_email = signup_email receiver_email = signup_email
subject = "Reset Password Email" subject = "Reset Password Email"
body = msg_body
send_email_ses( send_email_factory(
to_email=receiver_email, to_email=receiver_email,
subject=subject, subject=subject,
html_content=html_body html_content=html_body
) )
# msg = MIMEMultipart()
# msg['Subject'] = subject
# msg['From'] = sender_email
# msg['To'] = receiver_email
# msg.attach(MIMEText(html_body, 'html')) # or 'html' for HTML content
#
# try:
# # For Gmail, use 'smtp.gmail.com' and port 587 (TLS) or 465 (SSL)
# # For other providers, consult their documentation for SMTP server and port
# server = smtplib.SMTP('smtp.gmail.com', 587)
# # server.starttls() # Enable TLS encryption
# server.login(sender_email, sender_password)
# server.sendmail(sender_email, receiver_email, msg.as_string())
# print("Email sent successfully!")
# except Exception as e:
# print(f"Error sending email: {e}")
# logger.error(f"Error sending email: {e}")
# finally:
# server.quit() # Close the connection
def send_verify_signup_mail(signup_email, pending_uid, pending_id, firstname, lastname): def send_verify_signup_mail(signup_email, pending_uid, pending_id, firstname, lastname):
pending_member = { pending_member = {
@@ -254,39 +209,17 @@ class BaseService:
{"user": pending_member, 'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=3330)}, {"user": pending_member, 'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=3330)},
BaseService.JWT_SECRET_KEY, algorithm='HS256' BaseService.JWT_SECRET_KEY, algorithm='HS256'
) )
panel_url = panel_url = BaseService.THIS_SITE_URL # "https://qa-panel.mermsemr.com"
panel_url = BaseService.THIS_SITE_URL # "https://qa-panel.mermsemr.com"
link_url = str(panel_url) + '/csignup/' + jwt_part link_url = str(panel_url) + '/csignup/' + jwt_part
msg_body = f"""
Hello {firstname},
You received this message for account verification
Follow the link:{link_url}
For any Support
Reach Out
support@mermsemr.com
"""
sender_email = BaseService.SEND_EMAIL_FROM
sender_password = BaseService.SEND_EMAIL_PASS
receiver_email = signup_email receiver_email = signup_email
subject = "Verify your MERMS(AI) Account Setup" subject = "Verify your MERMS(AI) Account Setup"
body = msg_body
html_body = f"""\ html_body = f"""\
<html>
<head></head>
<body style="font-size:14px;line-height:1.5;">
<table width="550px" border="0" cellpadding="3" cellspacing="3" background-color="#F0F8FF" style="font-size:16px"> <table width="550px" border="0" cellpadding="3" cellspacing="3" background-color="#F0F8FF" style="font-size:16px">
<tr> <tr>
<td style="text-align:center">
<img style="width:150px; height:auto;" src="mermsemr.com/images/logo-pink.png" />
</td>
</tr>
<tr>
<td> <td>
Hello <b>{firstname}!</b> Hello <b>{firstname}!</b>
</td> </td>
@@ -301,44 +234,17 @@ class BaseService:
Follow the link: <a href="{link_url}">link</a> to complete the verification process.<br> Follow the link: <a href="{link_url}">link</a> to complete the verification process.<br>
</td> </td>
</tr> </tr>
<tr>
<td>
For any support<br>
Reach Out<br>
support@mermsemr.com<br>
https://www.mermsemr.com/
</td>
</tr>
</table> </table>
</body>
</html>
""" """
send_email_ses( send_email_factory(
to_email=receiver_email, to_email=receiver_email,
subject=subject, subject=subject,
html_content=html_body html_content=html_body
) )
# msg = MIMEMultipart()
# msg['Subject'] = subject
# msg['From'] = sender_email
# msg['To'] = receiver_email
# msg.attach(MIMEText(html_body, 'html')) # or 'html' for HTML content
#
# try:
# # For Gmail, use 'smtp.gmail.com' and port 587 (TLS) or 465 (SSL)
# # For other providers, consult their documentation for SMTP server and port
# server = smtplib.SMTP('smtp.gmail.com', 587)
# # server.starttls() # Enable TLS encryption
# server.login(sender_email, sender_password)
# server.sendmail(sender_email, receiver_email, msg.as_string())
# print("Email sent successfully!")
# except Exception as e:
# print(f"Error sending email: {e}")
# logger.error(f"Error sending email: {e}")
# finally:
# server.quit() # Close the connection
@classmethod @classmethod
def validate_data(cls, data, schema): def validate_data(cls, data, schema):
@@ -348,45 +254,6 @@ class BaseService:
logger.info(f"Processing {cls.TRANSACTION_TYPE} request") logger.info(f"Processing {cls.TRANSACTION_TYPE} request")
return schema.load(data) return schema.load(data)
# @classmethod
# def get_or_create_customer(cls, validated_data):
#
# """
# Check if a customer exists; if not, create one.
# """
#
# customer = Customer.query.filter_by(id=validated_data.get("customerId")).first()
# if not customer:
# customer = Customer.create_customer(
# id=validated_data.get("customerId"),
# msisdn=validated_data.get("msisdn"),
# country_code=validated_data.get("countryCode"),
# account_id=validated_data.get("accountId"),
# )
# return customer
# @classmethod
# def validate_account_ownership(cls, account_id, customer_id):
# """
# Check if the provided account belongs to the customer.
# """
# is_valid = Account.is_valid_account(account_id, customer_id)
# return is_valid
# @classmethod
# def log_transaction(cls, validated_data):
# """
# Create a new transaction.
# """
# channel = "USSD" if validated_data.get("channel") is None else validated_data.get("channel")
#
# return Transaction.create_transaction(
# transaction_id = validated_data.get("transactionId"),
# customer_id = validated_data.get('customerId', None),
# account_id = validated_data.get("accountId", None),
# type = cls.TRANSACTION_TYPE,
# channel = channel,
# )
@classmethod @classmethod
def async_send_settings_refresh_to_kafka(cls, settings_data, subscription_uid, topic): def async_send_settings_refresh_to_kafka(cls, settings_data, subscription_uid, topic):
@@ -461,77 +328,3 @@ class BaseService:
except Exception as e: except Exception as e:
logger.error(f"An error occurred while write_cache_data data: {str(e)}", exc_info=True) logger.error(f"An error occurred while write_cache_data data: {str(e)}", exc_info=True)
return None return None
#
# @classmethod
# def calculate_charges(cls, offer, amount):
# """
# Calculates and returns the charges for the given offer and amount.
#
# Args:
# offer (Offer): The offer object that contains the charges.
# amount (float): The requested loan amount.
#
# Returns:
# dict: A dictionary containing the calculated charges.
# """
# if not offer or not offer.charges:
# logger.error(f"No charges found for offer ID {offer.id}")
# return {"error": "No charges found for the offer"}
#
# loan_charges = offer.charges
# tenor = offer.schedule # offer.tenor // 30 # Convert to months
# interest = cls.get_charge_detail(rates = offer.interest_rate, charges = loan_charges, code = "INTEREST", amount = amount)
# management = cls.get_charge_detail(rates = offer.management_rate, charges = loan_charges, code = "MGTFEE", amount = amount)
# insurance = cls.get_charge_detail(rates = offer.insurance_rate, charges = loan_charges, code = "INSURANCE", amount = amount)
# vat = cls.get_charge_detail(rates = offer.vat_rate, charges = loan_charges, code = "VAT", amount = amount, management_fee = management["fee"])
#
# # Separate fees into upfront and postpaid
# upfront_fees = [
# fee["fee"]
# for fee in [interest, management, insurance, vat]
# if fee["due_days"] == 0
# ]
#
# postpaid_fees = [
# fee["fee"]
# for fee in [interest, management, insurance, vat]
# if fee["due_days"] != 0
# ]
# vat_test = vat["fee"]
# logger.info(f"VAT fee == *************** : {vat_test}")
#
# # Up-front payment: (only those fees due immediately i.e due_days == 0)
# # upfront_payment = sum(upfront_fees)
# if offer.schedule == 1:
# upfront_payment = vat["fee"] + management["fee"] + insurance["fee"] + interest["fee"]
# interest_amount = interest["fee"]
# repayment_amount = amount
# else:
# upfront_payment = vat["fee"] + insurance["fee"]+management["fee"]
# interest_amount = interest["fee"]*offer.schedule
# repayment_amount = amount + interest_amount
#
#
# # Repayment amount: (principal + only those fees not due immediately i.e due_days != 0)
# # repayment_amount = amount + (sum(postpaid_fees) * tenor)
#
# # Total amount: (upfront_payment + repayment_amount)
# total_amount = upfront_payment + repayment_amount
#
# # Calculate the installment amount
# installment_amount = repayment_amount / offer.schedule
#
# return {
# "interest": interest,
# "interest_amount": interest_amount,
# "management": management,
# "insurance": insurance,
# "vat": vat,
# "upfront_payment": round(upfront_payment, 2),
# "repayment_amount": round(repayment_amount, 2),
# "installment_amount": round(installment_amount, 2),
# "total_amount": round(total_amount, 2)
# }
#
#
+47
View File
@@ -0,0 +1,47 @@
from app.notifications.aws.aws_mailer import send_email_ses
from app.utils.logger import logger
def send_email_factory(to_email, subject, html_content, text_content=None):
try:
send_email_ses(
to_email=to_email,
subject=subject,
html_content=body_template(html_content)
)
except Exception as e:
logger.error(f"Send_Email_Factory failed: {e}")
raise
def body_template(html_content):
html_body = f"""\
<html>
<head></head>
<body style="font-size:14px;line-height:1.5;">
<table width="550px" border="0" cellpadding="3" cellspacing="3" background-color="#F0F8FF" style="font-size:16px">
<tr>
<td style="text-align:center">
<img style="width:150px; height:auto;" src="https://www.mermsemr.com/images/logo-pink.png" />
</td>
</tr>
<tr>
<td>
{html_content}
</td>
</tr>
<tr>
<td>
For any support<br>
Reach Out<br>
support@mermsemr.com<br>
https://www.mermsemr.com/
</td>
</tr>
</table>
</body>
</html>
"""
return html_body