# from app.models import Customer, Account, Transaction
import datetime
import html
import json
import logging
import smtplib
import ssl
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import jwt
import redis
import requests
from flask import jsonify
from flask_mail import Mail, Message
from marshmallow import ValidationError

from app.api.enums import TransactionType
from app.api.integrations import KafkaIntegration, StripeIntegration
from app.config import Config

logger = logging.getLogger(__name__)


class BaseService:
    """Shared helpers for the API services.

    Groups Stripe customer creation, account e-mails (signup verification,
    password reset, reset confirmation), Kafka publishing, a remote
    "site images" lookup and a Redis cache write. All settings are read from
    ``Config`` once, at class-definition time.
    """

    TRANSACTION_TYPE = None
    JWT_SECRET_KEY = Config.JWT_SECRET_KEY
    SEND_EMAIL_FROM = Config.SEND_EMAIL_FROM
    SEND_EMAIL_PASS = Config.SEND_EMAIL_PASS
    THIS_SITE_URL = Config.THIS_SITE_URL
    CACHE_SERVER = Config.CACHE_SERVER
    CACHE_PORT = Config.CACHE_PORT
    CACHE_PASSWORD = Config.CACHE_PASSWORD

    # Outgoing mail settings (previously hard-coded inside every sender).
    SMTP_HOST = "smtp.gmail.com"
    SMTP_PORT = 587  # submission port; requires STARTTLS before AUTH
    SMTP_TIMEOUT_SECONDS = 30
    # Lifetime of the signed links embedded in verification/reset e-mails.
    LINK_TOKEN_LIFETIME_MINUTES = 3330
    SUPPORT_EMAIL = "support@mermsemr.com"
    SUPPORT_SITE_URL = "https://www.mermsemr.com/"

    @staticmethod
    def addStripeCustomer(customerData):
        """Create a Stripe customer from the ``email`` and ``name`` of *customerData*.

        Args:
            customerData: mapping with at least the keys ``"email"`` and ``"name"``.

        Returns:
            Whatever ``StripeIntegration.create_customer`` returns.

        Raises:
            KeyError: if ``"email"`` or ``"name"`` is missing.
        """
        customer_data = {
            "email": customerData["email"],
            "name": customerData["name"],
        }
        stripe_customer = StripeIntegration.create_customer(customer_data)
        logger.info(f"Stripe_Customer ===== : {stripe_customer}")
        return stripe_customer

    # ------------------------------------------------------------------
    # E-mail helpers (private)
    # ------------------------------------------------------------------

    @staticmethod
    def _pending_member_payload(signup_email, pending_uid, pending_id, firstname, lastname):
        """Build the ``user`` claim embedded in signed e-mail links."""
        return {
            "email": signup_email,
            "pending_uid": pending_uid,
            "first_name": firstname,
            "last_name": lastname,
            "pending_id": pending_id,
        }

    @staticmethod
    def _build_signed_link(path_prefix, pending_member):
        """Return ``THIS_SITE_URL + path_prefix + <HS256 JWT carrying pending_member>``."""
        # Timezone-aware "now": datetime.utcnow() is naive and deprecated.
        expires_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
            minutes=BaseService.LINK_TOKEN_LIFETIME_MINUTES
        )
        token = jwt.encode(
            {"user": pending_member, "exp": expires_at},
            BaseService.JWT_SECRET_KEY,
            algorithm="HS256",
        )
        if isinstance(token, bytes):
            # PyJWT 1.x returns bytes; 2.x returns str.
            token = token.decode("ascii")
        return str(BaseService.THIS_SITE_URL) + path_prefix + token

    @staticmethod
    def _render_bodies(firstname, headline, link_url=None, link_purpose=""):
        """Render the plain-text and HTML variants of an account e-mail.

        Args:
            firstname: recipient's first name (HTML-escaped in the HTML part).
            headline: the sentence explaining why the mail was sent.
            link_url: optional action link; omitted from both parts when None.
            link_purpose: text following the link, e.g. "to reset your password."

        Returns:
            Tuple ``(text_body, html_body)``.
        """
        text_lines = [f"Hello {firstname},", headline]
        html_parts = [
            f"<p>Hello {html.escape(str(firstname))}!</p>",
            f"<p>{html.escape(headline)}</p>",
        ]
        if link_url is not None:
            text_lines.append(f"Follow the link:{link_url}")
            html_parts.append(
                f'<p>Follow the link: <a href="{html.escape(link_url, quote=True)}">link</a> '
                f"{html.escape(link_purpose)}</p>"
            )
        text_lines.extend(["For any Support", "Reach Out", BaseService.SUPPORT_EMAIL])
        html_parts.append(
            "<p>For any support<br>Reach Out<br>"
            f'<a href="mailto:{BaseService.SUPPORT_EMAIL}">{BaseService.SUPPORT_EMAIL}</a><br>'
            f'<a href="{BaseService.SUPPORT_SITE_URL}">{BaseService.SUPPORT_SITE_URL}</a></p>'
        )
        text_body = "\n".join(text_lines) + "\n"
        html_body = "<html>\n<body>\n" + "\n".join(html_parts) + "\n</body>\n</html>\n"
        return text_body, html_body

    @staticmethod
    def _send_mail(receiver_email, subject, text_body, html_body):
        """Send a multipart (plain + HTML) e-mail through the configured SMTP server.

        Delivery is best-effort: any failure is logged and swallowed so that a
        mail outage never breaks the calling request.
        """
        sender_email = BaseService.SEND_EMAIL_FROM
        try:
            # "alternative": clients show the last part they can render (HTML),
            # falling back to the plain-text part.
            msg = MIMEMultipart("alternative")
            msg["Subject"] = subject
            msg["From"] = sender_email
            msg["To"] = receiver_email
            msg.attach(MIMEText(text_body, "plain"))
            msg.attach(MIMEText(html_body, "html"))

            # The context manager issues QUIT on exit and, unlike a bare
            # ``finally: server.quit()``, cannot hit an unbound ``server``
            # when the connection itself fails.
            with smtplib.SMTP(
                BaseService.SMTP_HOST,
                BaseService.SMTP_PORT,
                timeout=BaseService.SMTP_TIMEOUT_SECONDS,
            ) as server:
                # Upgrade to TLS before sending credentials; port 587 does not
                # offer AUTH on a plaintext session.
                server.starttls(context=ssl.create_default_context())
                server.login(sender_email, BaseService.SEND_EMAIL_PASS)
                server.sendmail(sender_email, receiver_email, msg.as_string())
        except Exception as e:  # deliberate best-effort boundary
            logger.error(f"Error sending email: {e}", exc_info=True)
        else:
            logger.info("Email sent successfully!")

    # ------------------------------------------------------------------
    # E-mail senders (public)
    # ------------------------------------------------------------------

    @staticmethod
    def send_completepass_mail(signup_email, pending_uid, pending_id, firstname, lastname):
        """Notify *signup_email* that the password reset has completed.

        ``pending_uid``, ``pending_id`` and ``lastname`` are accepted for
        signature parity with the other senders but are not used.
        Failures are logged, never raised. Returns None.
        """
        text_body, html_body = BaseService._render_bodies(
            firstname, "Password Reset Completed"
        )
        BaseService._send_mail(signup_email, "Reset Password Completed", text_body, html_body)

    @staticmethod
    def send_resetpass_mail(signup_email, pending_uid, pending_id, firstname, lastname):
        """E-mail a signed ``/accreset/<jwt>`` password-reset link to *signup_email*.

        The JWT carries the pending-member details and expires after
        ``LINK_TOKEN_LIFETIME_MINUTES``. Failures while sending are logged,
        never raised. Returns None.
        """
        pending_member = BaseService._pending_member_payload(
            signup_email, pending_uid, pending_id, firstname, lastname
        )
        logger.info(f"Send Email DATA ***** {pending_member}")
        link_url = BaseService._build_signed_link("/accreset/", pending_member)
        text_body, html_body = BaseService._render_bodies(
            firstname,
            "You received this message for account reset password",
            link_url=link_url,
            link_purpose="to reset your password.",
        )
        BaseService._send_mail(signup_email, "Reset Password Email", text_body, html_body)

    @staticmethod
    def send_verify_signup_mail(signup_email, pending_uid, pending_id, firstname, lastname):
        """E-mail a signed ``/csignup/<jwt>`` account-verification link to *signup_email*.

        The JWT carries the pending-member details and expires after
        ``LINK_TOKEN_LIFETIME_MINUTES``. Failures while sending are logged,
        never raised. Returns None.
        """
        pending_member = BaseService._pending_member_payload(
            signup_email, pending_uid, pending_id, firstname, lastname
        )
        link_url = BaseService._build_signed_link("/csignup/", pending_member)
        text_body, html_body = BaseService._render_bodies(
            firstname,
            "You have received this message for your account verification",
            link_url=link_url,
            link_purpose="to complete the verification process.",
        )
        BaseService._send_mail(
            signup_email, "Verify your MERMS(AI) Account Setup", text_body, html_body
        )

    @classmethod
    def validate_data(cls, data, schema):
        """Validate input data based on the provided schema.

        Args:
            data: raw input to validate.
            schema: object exposing ``load(data)`` (presumably a marshmallow
                schema; verify against callers).

        Returns:
            The result of ``schema.load(data)``; any exception it raises propagates.
        """
        logger.info(f"Processing {cls.TRANSACTION_TYPE} request")
        return schema.load(data)

    # @classmethod
    # def get_or_create_customer(cls, validated_data):
    #
    #     """
    #     Check if a customer exists; if not, create one.
    #     """
    #
    #     customer = Customer.query.filter_by(id=validated_data.get("customerId")).first()
    #     if not customer:
    #         customer = Customer.create_customer(
    #             id=validated_data.get("customerId"),
    #             msisdn=validated_data.get("msisdn"),
    #             country_code=validated_data.get("countryCode"),
    #             account_id=validated_data.get("accountId"),
    #         )
    #     return customer

    # @classmethod
    # def validate_account_ownership(cls, account_id, customer_id):
    #     """
    #     Check if the provided account belongs to the customer.
    #     """
    #     is_valid = Account.is_valid_account(account_id, customer_id)
    #     return is_valid

    # @classmethod
    # def log_transaction(cls, validated_data):
    #     """
    #     Create a new transaction.
    #     """
    #     channel = "USSD" if validated_data.get("channel") is None else validated_data.get("channel")
    #
    #     return Transaction.create_transaction(
    #         transaction_id = validated_data.get("transactionId"),
    #         customer_id = validated_data.get('customerId', None),
    #         account_id = validated_data.get("accountId", None),
    #         type = cls.TRANSACTION_TYPE,
    #         channel = channel,
    #     )

    @classmethod
    def async_send_settings_refresh_to_kafka(cls, settings_data, subscription_uid, topic):
        """Publish a settings-refresh request to *topic* and flush the producer.

        Despite the name, the calls made here are ordinary synchronous calls;
        any asynchrony would have to come from the caller or from
        ``KafkaIntegration`` itself.
        """
        KafkaIntegration.send_setting_refresh_request(
            settings_data=settings_data,
            subscription_uid=subscription_uid,
            topic=topic,
        )
        KafkaIntegration.flush()

    @classmethod
    def async_send_to_kafka(cls, loan_data, request_id, topic):
        """Publish a loan request to *topic* and flush the producer.

        Despite the name, the calls made here are ordinary synchronous calls.
        """
        KafkaIntegration.send_loan_request(
            loan_data=loan_data,
            request_id=request_id,
            topic=topic,
        )
        KafkaIntegration.flush()

    @staticmethod
    def get_site_imges_data(provision_uid, primary_server, provision_port, selected_flavor, timeout=30):
        """Fetch ``/api/props`` from a provisioning server over plain HTTP.

        Args:
            provision_uid: sent as the ``provision_uid`` query parameter and
                echoed back in the result.
            primary_server: host name or address of the target server.
            provision_port: TCP port of the target server.
            selected_flavor: sent as the ``flavor`` query parameter.
            timeout: seconds to wait for the server before giving up
                (without it ``requests`` would wait indefinitely).

        Returns:
            ``{"provision_uid": ..., "data": <decoded JSON>}`` on HTTP 200,
            the same dict with ``"data": []`` on any other status, or ``None``
            if the request or JSON decoding raised.
        """
        api_url = f"http://{primary_server}:{provision_port}/api/props"
        payload = {"provision_uid": provision_uid, "flavor": selected_flavor}
        try:
            logger.info(f"api_url: {api_url}")
            logger.info(f"selected_flavor: {selected_flavor}")

            res_data = []
            response = requests.get(api_url, params=payload, timeout=timeout)
            if response.status_code == 200:
                res_data = response.json()
                logger.info(f"Response Site Images: {res_data}")
            else:
                logger.warning(
                    f"get_site_imges_data received HTTP {response.status_code} from {api_url}"
                )

            return {
                "provision_uid": provision_uid,
                "data": res_data,
            }
        except Exception as e:
            logger.error(f"An error occurred while get_site_imges_data data: {str(e)}", exc_info=True)
            return None

    @staticmethod
    def write_cache_data(cacheSection, cacheId, cacheData, ttl_seconds=120):
        """Store *cacheData* as JSON in Redis under ``"<cacheSection>-<cacheId>"``.

        Args:
            cacheSection: key prefix.
            cacheId: key suffix; any value with a string representation.
            cacheData: JSON-serialisable payload.
            ttl_seconds: expiry of the entry in seconds.

        Returns:
            ``{"session_details": <value read back from Redis>}``, or ``None``
            if serialisation or any Redis call failed (the error is logged).
        """
        try:
            # f-string instead of "+" so non-string ids do not raise TypeError.
            cacheKey = f"{cacheSection}-{cacheId}"
            logger.info(f"write_cache_data () key {cacheKey}")

            r = redis.Redis(
                host=BaseService.CACHE_SERVER,
                port=BaseService.CACHE_PORT,
                password=BaseService.CACHE_PASSWORD,
                decode_responses=True,
            )

            # SET overwrites any existing value and resets its TTL, so no
            # prior EXISTS/UNLINK round trips are needed.
            json_string = json.dumps(cacheData, indent=4)
            r.set(cacheKey, json_string, ex=ttl_seconds)

            # Read back so the caller gets exactly what Redis now holds.
            value = r.get(cacheKey)
            logger.debug(f"write_cache_data () stored key {cacheKey}")

            return {
                "session_details": value,
            }
        except Exception as e:
            logger.error(f"An error occurred while write_cache_data data: {str(e)}", exc_info=True)
            return None

    #
    # @classmethod
    # def calculate_charges(cls, offer, amount):
    #     """
    #     Calculates and returns the charges for the given offer and amount.
    #
    #     Args:
    #         offer (Offer): The offer object that contains the charges.
    #         amount (float): The requested loan amount.
    #
    #     Returns:
    #         dict: A dictionary containing the calculated charges.
    #     """
    #     if not offer or not offer.charges:
    #         logger.error(f"No charges found for offer ID {offer.id}")
    #         return {"error": "No charges found for the offer"}
    #
    #     loan_charges = offer.charges
    #     tenor = offer.schedule # offer.tenor // 30  # Convert to months
    #     interest = cls.get_charge_detail(rates = offer.interest_rate, charges = loan_charges, code = "INTEREST", amount = amount)
    #     management = cls.get_charge_detail(rates = offer.management_rate, charges = loan_charges, code = "MGTFEE", amount = amount)
    #     insurance = cls.get_charge_detail(rates = offer.insurance_rate, charges = loan_charges, code = "INSURANCE", amount = amount)
    #     vat = cls.get_charge_detail(rates = offer.vat_rate, charges = loan_charges, code = "VAT", amount = amount, management_fee = management["fee"])
    #
    #     # Separate fees into upfront and postpaid
    #     upfront_fees = [
    #         fee["fee"]
    #         for fee in [interest, management, insurance, vat]
    #         if fee["due_days"] == 0
    #     ]
    #
    #     postpaid_fees = [
    #         fee["fee"]
    #         for fee in [interest, management, insurance, vat]
    #         if fee["due_days"] != 0
    #     ]
    #     vat_test = vat["fee"]
    #     logger.info(f"VAT fee == *************** : {vat_test}")
    #
    #     # Up-front payment: (only those fees due immediately i.e due_days == 0)
    #     # upfront_payment = sum(upfront_fees)
    #     if offer.schedule == 1:
    #         upfront_payment = vat["fee"] + management["fee"] + insurance["fee"] + interest["fee"]
    #         interest_amount = interest["fee"]
    #         repayment_amount = amount
    #     else:
    #         upfront_payment = vat["fee"] + insurance["fee"]+management["fee"]
    #         interest_amount = interest["fee"]*offer.schedule
    #         repayment_amount = amount + interest_amount
    #
    #
    #     # Repayment amount: (principal + only those fees not due immediately i.e due_days != 0)
    #     # repayment_amount = amount + (sum(postpaid_fees) * tenor)
    #
    #     # Total amount: (upfront_payment + repayment_amount)
    #     total_amount = upfront_payment + repayment_amount
    #
    #     # Calculate the installment amount
    #     installment_amount = repayment_amount / offer.schedule
    #
    #     return {
    #         "interest": interest,
    #         "interest_amount": interest_amount,
    #         "management": management,
    #         "insurance": insurance,
    #         "vat": vat,
    #         "upfront_payment": round(upfront_payment, 2),
    #         "repayment_amount": round(repayment_amount, 2),
    #         "installment_amount": round(installment_amount, 2),
    #         "total_amount": round(total_amount, 2)
    #     }
    #
    #