Files
MermsCoreFlask/app/api/services/base_service.py
T
CHIEFSOFT\ameye a8f94b9017 file_url
2026-02-14 14:19:51 -05:00

516 lines
18 KiB
Python

# from app.models import Customer, Account, Transaction
from app.api.enums import TransactionType
from flask import jsonify
from marshmallow import ValidationError
import logging
from app.api.integrations import KafkaIntegration
from app.config import Config
from app.models import MembersWebfiles
logger = logging.getLogger(__name__)
from app.api.integrations import StripeIntegration
from flask_mail import Mail, Message
import smtplib
from email.mime.text import MIMEText
from email.mime.multipart import MIMEMultipart
import datetime
import jwt
import requests
import redis
import json
class BaseService:
TRANSACTION_TYPE = None
JWT_SECRET_KEY = Config.JWT_SECRET_KEY
SEND_EMAIL_FROM = Config.SEND_EMAIL_FROM
SEND_EMAIL_PASS = Config.SEND_EMAIL_PASS
THIS_SITE_URL = Config.THIS_SITE_URL
CACHE_SERVER = Config.CACHE_SERVER
CACHE_PORT = Config.CACHE_PORT
CACHE_PASSWORD = Config.CACHE_PASSWORD
CACHE_DEFAULT_EXPIRE = Config.CACHE_DEFAULT_EXPIRE
MEDIA_SERVER = Config.MEDIA_SERVER
@staticmethod
def get_profile_picture_url(profile_uid):
file_url = ''
if profile_uid is None or profile_uid == '':
return file_url
selectedFile = MembersWebfiles.get_member_webfiles_by_file_uid(profile_uid)
if selectedFile:
file_url = (
BaseService.MEDIA_SERVER + "/" + selectedFile.file_group + "/" + str(
selectedFile.uid) + "/" + selectedFile.filename).lower()
return file_url
@staticmethod
def addStripeCustomer(customerData):
customer_data = {
"email": customerData["email"],
"name": customerData["name"],
}
stripe_customer = StripeIntegration.create_customer(customer_data)
logger.info(f"Stripe_Customer ===== : {stripe_customer}")
return stripe_customer
@staticmethod
def send_completepass_mail(signup_email, pending_uid, pending_id, firstname, lastname):
msg_body = f"""
Hello {firstname},
Password Reset Completed
For any Support
Reach Out
support@mermsemr.com
"""
html_body = f"""\
<html>
<head></head>
<body style="font-size:14px;line-height:1.5;">
<table width="550px" border="0" cellpadding="3" cellspacing="3" background-color="#F0F8FF" style="font-size:16px">
<tr>
<td style="text-align:center">
<img style="width:150px; height:auto;" src="mermsemr.com/images/logo-pink.png" />
</td>
</tr>
<tr>
<td>
Hello <b>{firstname}!</b>
</td>
</tr>
<tr>
<td>
Password Reset Completed<br>
</td>
</tr>
<tr>
<td>
For any support<br>
Reach Out<br>
support@mermsemr.com<br>
https://www.mermsemr.com/
</td>
</tr>
</table>
</body>
</html>
"""
sender_email = BaseService.SEND_EMAIL_FROM
sender_password = BaseService.SEND_EMAIL_PASS
receiver_email = signup_email
subject = "Reset Password Completed"
body = msg_body
msg = MIMEMultipart()
msg['Subject'] = subject
msg['From'] = sender_email
msg['To'] = receiver_email
msg.attach(MIMEText(html_body, 'html')) # or 'html' for HTML content
try:
# For Gmail, use 'smtp.gmail.com' and port 587 (TLS) or 465 (SSL)
# For other providers, consult their documentation for SMTP server and port
server = smtplib.SMTP('smtp.gmail.com', 587)
# server.starttls() # Enable TLS encryption
server.login(sender_email, sender_password)
server.sendmail(sender_email, receiver_email, msg.as_string())
print("Email sent successfully!")
except Exception as e:
print(f"Error sending email: {e}")
logger.error(f"Error sending email: {e}")
finally:
server.quit() # Close the connection
@staticmethod
def send_resetpass_mail(signup_email, pending_uid, pending_id, firstname, lastname):
pending_member = {
"email": signup_email,
"pending_uid": pending_uid,
"first_name": firstname,
"last_name": lastname,
"pending_id": pending_id,
}
logger.info(f"Send Email DATA ***** {pending_member}")
jwt_part = jwt.encode(
{"user": pending_member, 'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=3330)},
BaseService.JWT_SECRET_KEY, algorithm='HS256'
)
panel_url = BaseService.THIS_SITE_URL # "https://qa-panel.mermsemr.com"
link_url = str(panel_url) + '/accreset/' + jwt_part
msg_body = f"""
Hello {firstname},
You received this message for account reset password
Follow the link:{link_url}
For any Support
Reach Out
support@mermsemr.com
"""
html_body = f"""\
<html>
<head></head>
<body style="font-size:14px;line-height:1.5;">
<table width="550px" border="0" cellpadding="3" cellspacing="3" background-color="#F0F8FF" style="font-size:16px">
<tr>
<td style="text-align:center">
<img style="width:150px; height:auto;" src="mermsemr.com/images/logo-pink.png" />
</td>
</tr>
<tr>
<td>
Hello <b>{firstname}!</b>
</td>
</tr>
<tr>
<td>
You received this message for account reset password<br>
</td>
</tr>
<tr>
<td>
Follow the link: <a href="{link_url}">link</a> to reset your password.<br>
</td>
</tr>
<tr>
<td>
For any support<br>
Reach Out<br>
support@mermsemr.com<br>
https://www.mermsemr.com/
</td>
</tr>
</table>
</body>
</html>
"""
sender_email = BaseService.SEND_EMAIL_FROM
sender_password = BaseService.SEND_EMAIL_PASS
receiver_email = signup_email
subject = "Reset Password Email"
body = msg_body
msg = MIMEMultipart()
msg['Subject'] = subject
msg['From'] = sender_email
msg['To'] = receiver_email
msg.attach(MIMEText(html_body, 'html')) # or 'html' for HTML content
try:
# For Gmail, use 'smtp.gmail.com' and port 587 (TLS) or 465 (SSL)
# For other providers, consult their documentation for SMTP server and port
server = smtplib.SMTP('smtp.gmail.com', 587)
# server.starttls() # Enable TLS encryption
server.login(sender_email, sender_password)
server.sendmail(sender_email, receiver_email, msg.as_string())
print("Email sent successfully!")
except Exception as e:
print(f"Error sending email: {e}")
logger.error(f"Error sending email: {e}")
finally:
server.quit() # Close the connection
def send_verify_signup_mail(signup_email, pending_uid, pending_id, firstname, lastname):
pending_member = {
"email": signup_email,
"pending_uid": pending_uid,
"first_name": firstname,
"last_name": lastname,
"pending_id": pending_id,
}
jwt_part = jwt.encode(
{"user": pending_member, 'exp': datetime.datetime.utcnow() + datetime.timedelta(minutes=3330)},
BaseService.JWT_SECRET_KEY, algorithm='HS256'
)
panel_url = panel_url = BaseService.THIS_SITE_URL # "https://qa-panel.mermsemr.com"
link_url = str(panel_url) + '/csignup/' + jwt_part
msg_body = f"""
Hello {firstname},
You received this message for account verification
Follow the link:{link_url}
For any Support
Reach Out
support@mermsemr.com
"""
sender_email = BaseService.SEND_EMAIL_FROM
sender_password = BaseService.SEND_EMAIL_PASS
receiver_email = signup_email
subject = "Verify your MERMS(AI) Account Setup"
body = msg_body
html_body = f"""\
<html>
<head></head>
<body style="font-size:14px;line-height:1.5;">
<table width="550px" border="0" cellpadding="3" cellspacing="3" background-color="#F0F8FF" style="font-size:16px">
<tr>
<td style="text-align:center">
<img style="width:150px; height:auto;" src="mermsemr.com/images/logo-pink.png" />
</td>
</tr>
<tr>
<td>
Hello <b>{firstname}!</b>
</td>
</tr>
<tr>
<td>
You have received this message for your account verification<br>
</td>
</tr>
<tr>
<td>
Follow the link: <a href="{link_url}">link</a> to complete the verification process.<br>
</td>
</tr>
<tr>
<td>
For any support<br>
Reach Out<br>
support@mermsemr.com<br>
https://www.mermsemr.com/
</td>
</tr>
</table>
</body>
</html>
"""
msg = MIMEMultipart()
msg['Subject'] = subject
msg['From'] = sender_email
msg['To'] = receiver_email
msg.attach(MIMEText(html_body, 'html')) # or 'html' for HTML content
try:
# For Gmail, use 'smtp.gmail.com' and port 587 (TLS) or 465 (SSL)
# For other providers, consult their documentation for SMTP server and port
server = smtplib.SMTP('smtp.gmail.com', 587)
# server.starttls() # Enable TLS encryption
server.login(sender_email, sender_password)
server.sendmail(sender_email, receiver_email, msg.as_string())
print("Email sent successfully!")
except Exception as e:
print(f"Error sending email: {e}")
logger.error(f"Error sending email: {e}")
finally:
server.quit() # Close the connection
@classmethod
def validate_data(cls, data, schema):
"""
Validate input data based on the provided schema.
"""
logger.info(f"Processing {cls.TRANSACTION_TYPE} request")
return schema.load(data)
# @classmethod
# def get_or_create_customer(cls, validated_data):
#
# """
# Check if a customer exists; if not, create one.
# """
#
# customer = Customer.query.filter_by(id=validated_data.get("customerId")).first()
# if not customer:
# customer = Customer.create_customer(
# id=validated_data.get("customerId"),
# msisdn=validated_data.get("msisdn"),
# country_code=validated_data.get("countryCode"),
# account_id=validated_data.get("accountId"),
# )
# return customer
# @classmethod
# def validate_account_ownership(cls, account_id, customer_id):
# """
# Check if the provided account belongs to the customer.
# """
# is_valid = Account.is_valid_account(account_id, customer_id)
# return is_valid
# @classmethod
# def log_transaction(cls, validated_data):
# """
# Create a new transaction.
# """
# channel = "USSD" if validated_data.get("channel") is None else validated_data.get("channel")
#
# return Transaction.create_transaction(
# transaction_id = validated_data.get("transactionId"),
# customer_id = validated_data.get('customerId', None),
# account_id = validated_data.get("accountId", None),
# type = cls.TRANSACTION_TYPE,
# channel = channel,
# )
@classmethod
def async_send_settings_refresh_to_kafka(cls, settings_data, subscription_uid, topic):
KafkaIntegration.send_setting_refresh_request(settings_data=settings_data, subscription_uid=subscription_uid,
topic=topic)
KafkaIntegration.flush()
@classmethod
def async_send_to_kafka(cls, loan_data, request_id, topic):
KafkaIntegration.send_loan_request(loan_data=loan_data, request_id=request_id, topic=topic)
KafkaIntegration.flush()
@staticmethod
def get_site_imges_data(provision_uid, primary_server, provision_port, selected_flavor):
destination_server = "http://" + str(primary_server) + ":" + str(provision_port)
api_url = destination_server + "/api/props"
try:
payload = {'provision_uid': provision_uid, 'flavor': selected_flavor}
logger.info(f"api_url: {str(api_url)}")
logger.info(f"selected_flavor: {str(selected_flavor)}")
# Make the GET request
res_data = []
response = requests.get(api_url, params=payload)
if response.status_code == 200:
logger.info(f"Response Site Images: {res_data}")
# Convert the JSON response to a Python dictionary
res_data = response.json()
logger.info(f"Response Site Images: {res_data}")
response_data = {
"provision_uid": str(provision_uid),
"data": res_data,
# "product_id": product_id,
}
return response_data
except Exception as e:
logger.error(f"An error occurred while get_site_imges_data data: {str(e)}", exc_info=True)
return None
@staticmethod
def write_cache_data(cacheSection, cacheId, cacheData):
try:
cacheKey = "MERMS-" + cacheSection + '-' + cacheId # note theh use of -
logger.info(f"write_cache_data () key {cacheKey}", exc_info=True)
# Define connection parameters and connect
r = redis.Redis(host=BaseService.CACHE_SERVER, port=BaseService.CACHE_PORT,
password=BaseService.CACHE_PASSWORD,
decode_responses=True)
if r.exists(cacheKey):
r.unlink(cacheKey)
# Set a key 'foo' with value 'bar'
json_string = json.dumps(cacheData, indent=4)
r.set(cacheKey, json_string, ex=BaseService.CACHE_DEFAULT_EXPIRE)
# Verify by getting the value
value = r.get(cacheKey)
print(f"Value of {cacheKey}: {value}") # Output: Value of 'foo': bar
response_data = {
"session_details": value,
# "product_id": product_id,
}
return response_data
except Exception as e:
logger.error(f"An error occurred while write_cache_data data: {str(e)}", exc_info=True)
return None
#
# @classmethod
# def calculate_charges(cls, offer, amount):
# """
# Calculates and returns the charges for the given offer and amount.
#
# Args:
# offer (Offer): The offer object that contains the charges.
# amount (float): The requested loan amount.
#
# Returns:
# dict: A dictionary containing the calculated charges.
# """
# if not offer or not offer.charges:
# logger.error(f"No charges found for offer ID {offer.id}")
# return {"error": "No charges found for the offer"}
#
# loan_charges = offer.charges
# tenor = offer.schedule # offer.tenor // 30 # Convert to months
# interest = cls.get_charge_detail(rates = offer.interest_rate, charges = loan_charges, code = "INTEREST", amount = amount)
# management = cls.get_charge_detail(rates = offer.management_rate, charges = loan_charges, code = "MGTFEE", amount = amount)
# insurance = cls.get_charge_detail(rates = offer.insurance_rate, charges = loan_charges, code = "INSURANCE", amount = amount)
# vat = cls.get_charge_detail(rates = offer.vat_rate, charges = loan_charges, code = "VAT", amount = amount, management_fee = management["fee"])
#
# # Separate fees into upfront and postpaid
# upfront_fees = [
# fee["fee"]
# for fee in [interest, management, insurance, vat]
# if fee["due_days"] == 0
# ]
#
# postpaid_fees = [
# fee["fee"]
# for fee in [interest, management, insurance, vat]
# if fee["due_days"] != 0
# ]
# vat_test = vat["fee"]
# logger.info(f"VAT fee == *************** : {vat_test}")
#
# # Up-front payment: (only those fees due immediately i.e due_days == 0)
# # upfront_payment = sum(upfront_fees)
# if offer.schedule == 1:
# upfront_payment = vat["fee"] + management["fee"] + insurance["fee"] + interest["fee"]
# interest_amount = interest["fee"]
# repayment_amount = amount
# else:
# upfront_payment = vat["fee"] + insurance["fee"]+management["fee"]
# interest_amount = interest["fee"]*offer.schedule
# repayment_amount = amount + interest_amount
#
#
# # Repayment amount: (principal + only those fees not due immediately i.e due_days != 0)
# # repayment_amount = amount + (sum(postpaid_fees) * tenor)
#
# # Total amount: (upfront_payment + repayment_amount)
# total_amount = upfront_payment + repayment_amount
#
# # Calculate the installment amount
# installment_amount = repayment_amount / offer.schedule
#
# return {
# "interest": interest,
# "interest_amount": interest_amount,
# "management": management,
# "insurance": insurance,
# "vat": vat,
# "upfront_payment": round(upfront_payment, 2),
# "repayment_amount": round(repayment_amount, 2),
# "installment_amount": round(installment_amount, 2),
# "total_amount": round(total_amount, 2)
# }
#
#