247 lines
8.6 KiB
Python
247 lines
8.6 KiB
Python
from app.models import Customer, Account, Transaction
|
|
from app.api.enums import TransactionType
|
|
from flask import jsonify
|
|
from marshmallow import ValidationError
|
|
import logging
|
|
from app.api.integrations import KafkaIntegration
|
|
from app.config import Config
|
|
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
|
|
|
from flask_mail import Mail, Message
|
|
import smtplib
|
|
from email.mime.text import MIMEText
|
|
from email.mime.multipart import MIMEMultipart
|
|
import datetime
|
|
import jwt
|
|
|
|
class BaseService:
    """Shared helpers used by the API service classes.

    Groups request validation, customer lookup/creation, transaction
    logging, Kafka publishing, loan-charge arithmetic and the
    password-reset e-mail.

    ``TRANSACTION_TYPE`` is ``None`` here and is read through ``cls`` by
    ``validate_data`` and ``log_transaction``; presumably each concrete
    service overrides it (confirm against the subclasses).
    """

    TRANSACTION_TYPE = None
    JWT_SECRET_KEY = Config.JWT_SECRET_KEY

    SEND_EMAIL_FROM = Config.SEND_EMAIL_FROM
    SEND_EMAIL_PASS = Config.SEND_EMAIL_PASS
    THIS_SITE_URL = Config.THIS_SITE_URL

    # Outgoing mail settings (previously hard-coded inside the method).
    SMTP_HOST = "smtp.gmail.com"
    SMTP_PORT = 587  # STARTTLS submission port
    SMTP_TIMEOUT_SECONDS = 30
    # Lifetime of the reset link embedded in the e-mail.
    RESET_TOKEN_TTL_MINUTES = 3330

    @staticmethod
    def send_resetpass_mail(signup_email, pending_uid, pending_id, firstname, lastname):
        """Send a password-reset e-mail containing a signed, expiring link.

        A JWT (HS256, signed with ``JWT_SECRET_KEY``) carrying the member
        details and an ``exp`` claim is appended to
        ``THIS_SITE_URL + '/accreset/'`` and mailed to ``signup_email``.

        Sending is best-effort: any failure while talking to the SMTP
        server is logged and swallowed, so the method always returns
        ``None``.

        Args:
            signup_email: Recipient address; also stored in the token.
            pending_uid: Stored in the token under ``pending_uid``.
            pending_id: Stored in the token under ``pending_id``.
            firstname: Used in the greeting and stored in the token.
            lastname: Stored in the token.
        """
        # Local import: the file's import block does not provide ``ssl``.
        import ssl

        pending_member = {
            "email": signup_email,
            "pending_uid": pending_uid,
            "first_name": firstname,
            "last_name": lastname,
            "pending_id": pending_id,
        }

        # NOTE(review): this writes the member's name and e-mail to the log.
        logger.info(f"Send Email DATA ***** {pending_member}")

        # Timezone-aware "now"; datetime.utcnow() is deprecated.
        expires_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
            minutes=BaseService.RESET_TOKEN_TTL_MINUTES
        )
        jwt_part = jwt.encode(
            {"user": pending_member, "exp": expires_at},
            BaseService.JWT_SECRET_KEY,
            algorithm="HS256",
        )
        # PyJWT < 2.0 returns bytes, which cannot be concatenated to a str.
        if isinstance(jwt_part, bytes):
            jwt_part = jwt_part.decode("utf-8")

        # e.g. "https://qa-panel.mermsemr.com"; strip a trailing slash so the
        # link never contains "//accreset/".
        panel_url = str(BaseService.THIS_SITE_URL).rstrip("/")
        link_url = panel_url + "/accreset/" + jwt_part

        msg_body = "\n".join(
            [
                "",
                f"Hello {firstname},",
                "",
                "You received this message for account reset password",
                "",
                f"Follow the link:{link_url}",
                "",
                "For any Support",
                "Reach Out",
                "support@mermsemr.com",
                "",
            ]
        )

        sender_email = BaseService.SEND_EMAIL_FROM
        sender_password = BaseService.SEND_EMAIL_PASS

        msg = MIMEMultipart()
        msg["Subject"] = "Reset Password Email"
        msg["From"] = sender_email
        msg["To"] = signup_email
        msg.attach(MIMEText(msg_body, "plain"))  # or 'html' for HTML content

        try:
            # The context manager issues QUIT and closes the socket even when
            # an error occurs, and nothing is referenced if connecting fails.
            with smtplib.SMTP(
                BaseService.SMTP_HOST,
                BaseService.SMTP_PORT,
                timeout=BaseService.SMTP_TIMEOUT_SECONDS,
            ) as server:
                # Upgrade to TLS before authenticating so the credentials are
                # never sent in clear text; verify the server certificate.
                server.starttls(context=ssl.create_default_context())
                server.login(sender_email, sender_password)
                server.sendmail(sender_email, signup_email, msg.as_string())
            logger.info("Email sent successfully!")
        except Exception as e:
            # Deliberately best-effort: report the failure, do not propagate.
            logger.exception(f"Error sending email: {e}")

    @classmethod
    def validate_data(cls, data, schema):
        """Validate input data based on the provided schema.

        Args:
            data: Raw request payload.
            schema: Object exposing ``load(data)`` (the file imports
                marshmallow's ``ValidationError``, so presumably a
                marshmallow schema).

        Returns:
            Whatever ``schema.load(data)`` returns; exceptions raised by
            ``load`` propagate to the caller.
        """
        logger.info(f"Processing {cls.TRANSACTION_TYPE} request")
        return schema.load(data)

    @classmethod
    def get_or_create_customer(cls, validated_data):
        """Check if a customer exists; if not, create one.

        The lookup is by ``validated_data["customerId"]``. When no row is
        found a customer is created from ``customerId``, ``msisdn``,
        ``countryCode`` and ``accountId``.

        Returns:
            The existing or newly created ``Customer``.
        """
        customer = Customer.query.filter_by(id=validated_data.get("customerId")).first()
        if not customer:
            customer = Customer.create_customer(
                id=validated_data.get("customerId"),
                msisdn=validated_data.get("msisdn"),
                country_code=validated_data.get("countryCode"),
                account_id=validated_data.get("accountId"),
            )
        return customer

    @classmethod
    def validate_account_ownership(cls, account_id, customer_id):
        """Check if the provided account belongs to the customer.

        Returns:
            The result of ``Account.is_valid_account(account_id, customer_id)``.
        """
        return Account.is_valid_account(account_id, customer_id)

    @classmethod
    def log_transaction(cls, validated_data):
        """Create a new transaction record for this service's type.

        The channel defaults to ``"USSD"`` only when it is missing or
        ``None``; any other value (including an empty string) is kept.

        Returns:
            The result of ``Transaction.create_transaction``.
        """
        channel = validated_data.get("channel")
        if channel is None:
            channel = "USSD"

        return Transaction.create_transaction(
            transaction_id=validated_data.get("transactionId"),
            customer_id=validated_data.get("customerId", None),
            account_id=validated_data.get("accountId", None),
            type=cls.TRANSACTION_TYPE,
            channel=channel,
        )

    @classmethod
    def async_send_to_kafka(cls, loan_data, request_id, topic):
        """Hand a loan request to ``KafkaIntegration`` and then flush it.

        Args:
            loan_data: Payload forwarded as ``loan_data``.
            request_id: Forwarded as ``request_id``.
            topic: Forwarded as ``topic``.
        """
        KafkaIntegration.send_loan_request(loan_data=loan_data, request_id=request_id, topic=topic)
        KafkaIntegration.flush()

    @classmethod
    def calculate_charges(cls, offer, amount):
        """Calculate the charges for the given offer and amount.

        Interest, management fee and insurance are percentages of
        ``amount``; VAT is computed from the management fee (see
        ``get_charge_detail``).

        * ``offer.schedule == 1``: every fee, interest included, is paid
          up front and only the principal is repaid.
        * otherwise: VAT, insurance and management fee are paid up front;
          interest is charged once per installment and repaid together
          with the principal.

        Args:
            offer (Offer): The offer object that contains the charges.
            amount (float): The requested loan amount.

        Returns:
            dict: The per-charge detail dicts (``interest``,
            ``management``, ``insurance``, ``vat``) plus
            ``interest_amount``, ``upfront_payment``,
            ``repayment_amount``, ``installment_amount`` and
            ``total_amount``, all rounded to 2 decimals. On invalid input
            a dict with a single ``"error"`` key is returned instead.
        """
        if not offer or not offer.charges:
            # ``offer`` may be None on this path, so its id is read defensively.
            logger.error(f"No charges found for offer ID {getattr(offer, 'id', None)}")
            return {"error": "No charges found for the offer"}

        schedule = offer.schedule  # number of installments
        if schedule is None or schedule <= 0:
            # Guards the division below; reported like the other invalid-offer case.
            logger.error(f"Invalid schedule {schedule!r} for offer ID {getattr(offer, 'id', None)}")
            return {"error": "Offer schedule must be a positive number of installments"}

        loan_charges = offer.charges
        interest = cls.get_charge_detail(
            rates=offer.interest_rate, charges=loan_charges, code="INTEREST", amount=amount
        )
        management = cls.get_charge_detail(
            rates=offer.management_rate, charges=loan_charges, code="MGTFEE", amount=amount
        )
        insurance = cls.get_charge_detail(
            rates=offer.insurance_rate, charges=loan_charges, code="INSURANCE", amount=amount
        )
        vat = cls.get_charge_detail(
            rates=offer.vat_rate,
            charges=loan_charges,
            code="VAT",
            amount=amount,
            management_fee=management["fee"],
        )

        logger.info(f"VAT fee == *************** : {vat['fee']}")

        # The up-front / postpaid split is decided by the schedule, not by
        # each charge's ``due_days``.
        if schedule == 1:
            upfront_payment = vat["fee"] + management["fee"] + insurance["fee"] + interest["fee"]
            interest_amount = interest["fee"]
            repayment_amount = amount
        else:
            upfront_payment = vat["fee"] + insurance["fee"] + management["fee"]
            interest_amount = interest["fee"] * schedule
            repayment_amount = amount + interest_amount

        # Total amount: (upfront_payment + repayment_amount)
        total_amount = upfront_payment + repayment_amount
        installment_amount = repayment_amount / schedule

        return {
            "interest": interest,
            "interest_amount": round(interest_amount, 2),
            "management": management,
            "insurance": insurance,
            "vat": vat,
            "upfront_payment": round(upfront_payment, 2),
            "repayment_amount": round(repayment_amount, 2),
            "installment_amount": round(installment_amount, 2),
            "total_amount": round(total_amount, 2),
        }

    @classmethod
    def get_charge_detail(cls, rates, charges, code, amount, management_fee=0.0):
        """Build the detail dict for one charge code.

        The fee is ``rates`` percent of ``amount``, except for ``"VAT"``
        with a positive ``management_fee``, where it is ``rates`` percent
        of that management fee.

        Args:
            rates: Percentage rate for this charge.
            charges: The offer's charge list. Currently unused.
            code: Charge code, e.g. ``"INTEREST"``, ``"MGTFEE"``,
                ``"INSURANCE"`` or ``"VAT"``.
            amount: Principal the percentage is applied to.
            management_fee: Base for the VAT computation.

        Returns:
            dict: ``rate``, ``fee`` (rounded to 2 decimals), ``due_days``,
            ``code`` and ``description``.
        """
        # NOTE(review): when code == "VAT" and management_fee is 0 the VAT is
        # computed on the principal rather than being 0 -- confirm intended.
        if code == "VAT" and management_fee > 0:
            base = management_fee
        else:
            base = amount
        fee = base * rates / 100

        # TODO: look the charge up in ``charges`` by ``code`` so that
        # ``due_days`` and ``description`` come from the charge itself (and
        # zero rate/fee/due_days are returned when the code is absent),
        # instead of the fixed placeholder values below.
        return {
            "rate": rates,
            "fee": round(fee, 2),
            "due_days": 30,
            "code": code,
            "description": "have no idea how to get this yet",
        }
|
|
|
|
|