# from app.models import Customer, Account, Transaction
import datetime
import html
import json
import logging
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText

import jwt
import redis
import requests
from flask import jsonify
from flask_mail import Mail, Message
from marshmallow import ValidationError

from app.api.enums import TransactionType
from app.api.integrations import KafkaIntegration, StripeIntegration
from app.config import Config
from app.models import MembersWebfiles

logger = logging.getLogger(__name__)


class BaseService:
    """Shared helpers used by the service layer.

    Groups media-URL building, Stripe customer creation, account e-mails
    (verification, password reset, reset confirmation), schema validation,
    Kafka publishing, a remote site-image lookup and a Redis cache write.
    All settings are read once from ``Config`` when the class is created.
    """

    # Logged by validate_data(); presumably overridden by subclasses -- verify.
    TRANSACTION_TYPE = None

    JWT_SECRET_KEY = Config.JWT_SECRET_KEY
    SEND_EMAIL_FROM = Config.SEND_EMAIL_FROM
    SEND_EMAIL_PASS = Config.SEND_EMAIL_PASS
    THIS_SITE_URL = Config.THIS_SITE_URL
    CACHE_SERVER = Config.CACHE_SERVER
    CACHE_PORT = Config.CACHE_PORT
    CACHE_PASSWORD = Config.CACHE_PASSWORD
    CACHE_DEFAULT_EXPIRE = Config.CACHE_DEFAULT_EXPIRE
    MEDIA_SERVER = Config.MEDIA_SERVER

    # Outgoing mail settings (previously repeated as literals in every sender).
    SMTP_HOST = "smtp.gmail.com"
    SMTP_PORT = 587  # submission port: STARTTLS must precede AUTH
    SMTP_TIMEOUT = 30  # seconds; keeps a dead relay from blocking the caller
    # Lifetime of the signed links embedded in verification / reset e-mails.
    LINK_TTL_MINUTES = 3330

    @staticmethod
    def get_profile_picture_url(profile_uid):
        """Return the lower-cased media URL for a profile picture.

        Args:
            profile_uid: File uid passed to
                ``MembersWebfiles.get_member_webfiles_by_file_uid``.

        Returns:
            ``MEDIA_SERVER/<file_group>/<uid>/<filename>`` lower-cased, or
            ``''`` when ``profile_uid`` is ``None``/empty or no file is found.
        """
        if profile_uid is None or profile_uid == "":
            return ""

        selected_file = MembersWebfiles.get_member_webfiles_by_file_uid(profile_uid)
        if not selected_file:
            return ""

        # The whole URL (host included) is lower-cased, as before.
        return (
            f"{BaseService.MEDIA_SERVER}/{selected_file.file_group}/"
            f"{selected_file.uid}/{selected_file.filename}"
        ).lower()

    @staticmethod
    def addStripeCustomer(customerData):
        """Create a Stripe customer from ``customerData`` and return it.

        Args:
            customerData: Mapping that must contain ``"email"`` and ``"name"``.

        Returns:
            Whatever ``StripeIntegration.create_customer`` returns.

        Raises:
            KeyError: If ``"email"`` or ``"name"`` is missing.
        """
        customer_data = {
            "email": customerData["email"],
            "name": customerData["name"],
        }
        stripe_customer = StripeIntegration.create_customer(customer_data)
        logger.info("Stripe_Customer ===== : %s", stripe_customer)
        return stripe_customer

    # ------------------------------------------------------------------
    # E-mail helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _build_signed_link(path, pending_member):
        """Return ``THIS_SITE_URL + path + <JWT>`` for ``pending_member``.

        The token is HS256-signed with ``JWT_SECRET_KEY`` and expires after
        ``LINK_TTL_MINUTES``.
        """
        expires_at = datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(
            minutes=BaseService.LINK_TTL_MINUTES
        )
        token = jwt.encode(
            {"user": pending_member, "exp": expires_at},
            BaseService.JWT_SECRET_KEY,
            algorithm="HS256",
        )
        # PyJWT < 2 returns bytes, PyJWT >= 2 returns str; accept both.
        if isinstance(token, bytes):
            token = token.decode("ascii")
        return str(BaseService.THIS_SITE_URL) + path + token

    @staticmethod
    def _render_text_body(firstname, message):
        """Return the plain-text version of an account e-mail."""
        return (
            f"Hello {firstname},\n"
            f"{message}\n"
            "For any Support\n"
            "Reach Out support@mermsemr.com\n"
        )

    @staticmethod
    def _render_html_body(firstname, message_html):
        """Return the HTML version of an account e-mail.

        ``firstname`` is escaped here; ``message_html`` must already be safe
        HTML because it is inserted verbatim.
        """
        safe_name = html.escape(str(firstname))
        return f"""\
<html>
  <body>
    <p>Hello {safe_name}!</p>
    <p>{message_html}</p>
    <p>
      For any support<br>
      Reach Out<br>
      <a href="mailto:support@mermsemr.com">support@mermsemr.com</a><br>
      <a href="https://www.mermsemr.com/">https://www.mermsemr.com/</a>
    </p>
  </body>
</html>
"""

    @staticmethod
    def _send_mail(receiver_email, subject, text_body, html_body):
        """Send a text + HTML e-mail from ``SEND_EMAIL_FROM``; never raises.

        Returns:
            ``True`` when the message was handed to the SMTP server, ``False``
            when anything failed (the failure is logged with its traceback).
        """
        sender_email = BaseService.SEND_EMAIL_FROM

        # "alternative" lets the mail client pick HTML and fall back to text.
        msg = MIMEMultipart("alternative")
        msg["Subject"] = subject
        msg["From"] = sender_email
        msg["To"] = receiver_email
        msg.attach(MIMEText(text_body, "plain"))
        msg.attach(MIMEText(html_body, "html"))

        try:
            # The context manager issues QUIT on exit and is only entered once
            # the connection exists, so a failed connect can no longer trigger
            # an UnboundLocalError from a ``finally: server.quit()``.
            with smtplib.SMTP(
                BaseService.SMTP_HOST,
                BaseService.SMTP_PORT,
                timeout=BaseService.SMTP_TIMEOUT,
            ) as server:
                server.starttls()  # credentials must not travel in clear text
                server.login(sender_email, BaseService.SEND_EMAIL_PASS)
                server.sendmail(sender_email, receiver_email, msg.as_string())
        except Exception as exc:  # best-effort by design: log, do not raise
            logger.exception("Error sending email: %s", exc)
            return False

        logger.info("Email sent successfully!")
        return True

    @staticmethod
    def send_completepass_mail(signup_email, pending_uid, pending_id, firstname, lastname):
        """E-mail a "password reset completed" confirmation to ``signup_email``.

        ``pending_uid``, ``pending_id`` and ``lastname`` are not used; they are
        kept so the signature matches the other senders. Delivery failures are
        logged, not raised.
        """
        message = "Password Reset Completed"
        BaseService._send_mail(
            signup_email,
            "Reset Password Completed",
            BaseService._render_text_body(firstname, message),
            BaseService._render_html_body(firstname, message),
        )

    @staticmethod
    def send_resetpass_mail(signup_email, pending_uid, pending_id, firstname, lastname):
        """E-mail a password-reset link to ``signup_email``.

        The link is ``THIS_SITE_URL/accreset/<JWT>``; the token payload carries
        the e-mail, names and pending ids. Delivery failures are logged, not
        raised.
        """
        pending_member = {
            "email": signup_email,
            "pending_uid": pending_uid,
            "first_name": firstname,
            "last_name": lastname,
            "pending_id": pending_id,
        }
        logger.info("Send Email DATA ***** %s", pending_member)

        link_url = BaseService._build_signed_link("/accreset/", pending_member)
        intro = "You received this message for account reset password"
        text_body = BaseService._render_text_body(
            firstname, f"{intro}\nFollow the link:{link_url}"
        )
        html_body = BaseService._render_html_body(
            firstname,
            f"{intro}<br>\n"
            f'Follow the link: <a href="{html.escape(link_url, quote=True)}">link</a>'
            " to reset your password.",
        )
        BaseService._send_mail(signup_email, "Reset Password Email", text_body, html_body)

    @staticmethod
    def send_verify_signup_mail(signup_email, pending_uid, pending_id, firstname, lastname):
        """E-mail an account-verification link to ``signup_email``.

        The link is ``THIS_SITE_URL/csignup/<JWT>``; the token payload carries
        the e-mail, names and pending ids. Delivery failures are logged, not
        raised.
        """
        pending_member = {
            "email": signup_email,
            "pending_uid": pending_uid,
            "first_name": firstname,
            "last_name": lastname,
            "pending_id": pending_id,
        }

        link_url = BaseService._build_signed_link("/csignup/", pending_member)
        text_body = BaseService._render_text_body(
            firstname,
            "You received this message for account verification\n"
            f"Follow the link:{link_url}",
        )
        html_body = BaseService._render_html_body(
            firstname,
            "You have received this message for your account verification<br>\n"
            f'Follow the link: <a href="{html.escape(link_url, quote=True)}">link</a>'
            " to complete the verification process.",
        )
        BaseService._send_mail(
            signup_email, "Verify your MERMS(AI) Account Setup", text_body, html_body
        )

    @classmethod
    def validate_data(cls, data, schema):
        """Validate ``data`` with ``schema`` and return the loaded result.

        Args:
            data: Raw input passed to ``schema.load``.
            schema: Object exposing ``load(data)``.

        Returns:
            The value returned by ``schema.load(data)``. Any exception raised
            by ``load`` propagates to the caller.
        """
        logger.info("Processing %s request", cls.TRANSACTION_TYPE)
        return schema.load(data)

    # ------------------------------------------------------------------
    # Disabled legacy helpers, retained commented-out from the original file.
    # ------------------------------------------------------------------
    # @classmethod
    # def get_or_create_customer(cls, validated_data):
    #
    #     """
    #     Check if a customer exists; if not, create one.
    #     """
    #
    #     customer = Customer.query.filter_by(id=validated_data.get("customerId")).first()
    #     if not customer:
    #         customer = Customer.create_customer(
    #             id=validated_data.get("customerId"),
    #             msisdn=validated_data.get("msisdn"),
    #             country_code=validated_data.get("countryCode"),
    #             account_id=validated_data.get("accountId"),
    #         )
    #     return customer

    # @classmethod
    # def validate_account_ownership(cls, account_id, customer_id):
    #     """
    #     Check if the provided account belongs to the customer.
    #     """
    #     is_valid = Account.is_valid_account(account_id, customer_id)
    #     return is_valid

    # @classmethod
    # def log_transaction(cls, validated_data):
    #     """
    #     Create a new transaction.
    #     """
    #     channel = "USSD" if validated_data.get("channel") is None else validated_data.get("channel")
    #
    #     return Transaction.create_transaction(
    #         transaction_id = validated_data.get("transactionId"),
    #         customer_id = validated_data.get('customerId', None),
    #         account_id = validated_data.get("accountId", None),
    #         type = cls.TRANSACTION_TYPE,
    #         channel = channel,
    #     )

    @classmethod
    def async_send_settings_refresh_to_kafka(cls, settings_data, subscription_uid, topic):
        """Publish a settings-refresh request to ``topic`` and flush the producer.

        Delegates to ``KafkaIntegration.send_setting_refresh_request`` and then
        calls ``KafkaIntegration.flush()``.
        """
        KafkaIntegration.send_setting_refresh_request(
            settings_data=settings_data,
            subscription_uid=subscription_uid,
            topic=topic,
        )
        KafkaIntegration.flush()

    @classmethod
    def async_send_to_kafka(cls, loan_data, request_id, topic):
        """Publish a loan request to ``topic`` and flush the producer.

        Delegates to ``KafkaIntegration.send_loan_request`` and then calls
        ``KafkaIntegration.flush()``.
        """
        KafkaIntegration.send_loan_request(
            loan_data=loan_data, request_id=request_id, topic=topic
        )
        KafkaIntegration.flush()

    @staticmethod
    def get_site_imges_data(provision_uid, primary_server, provision_port, selected_flavor,
                            timeout=30):
        """Fetch site-image properties from a provisioning server.

        Issues ``GET http://<primary_server>:<provision_port>/api/props`` with
        ``provision_uid`` and ``flavor`` as query parameters.

        Args:
            provision_uid: Identifier sent to the server and echoed in the result.
            primary_server: Host name or address of the server.
            provision_port: Port of the server.
            selected_flavor: Value sent as the ``flavor`` query parameter.
            timeout: Seconds to wait for the server (new, optional). Without a
                timeout ``requests`` can block indefinitely.

        Returns:
            ``{"provision_uid": str, "data": <parsed JSON>}``; ``data`` is an
            empty list when the server answers with a non-200 status. ``None``
            when the request or JSON decoding raises.
        """
        api_url = f"http://{primary_server}:{provision_port}/api/props"
        payload = {"provision_uid": provision_uid, "flavor": selected_flavor}
        try:
            logger.info("api_url: %s", api_url)
            logger.info("selected_flavor: %s", selected_flavor)

            res_data = []
            response = requests.get(api_url, params=payload, timeout=timeout)
            if response.status_code == 200:
                # Parse first, then log: the old code logged the empty
                # placeholder before the body had been decoded.
                res_data = response.json()
                logger.info("Response Site Images: %s", res_data)
            else:
                logger.warning(
                    "get_site_imges_data received HTTP %s from %s",
                    response.status_code,
                    api_url,
                )

            return {
                "provision_uid": str(provision_uid),
                "data": res_data,
            }
        except Exception as exc:  # best-effort lookup: callers expect None
            logger.error(
                "An error occurred while get_site_imges_data data: %s", exc, exc_info=True
            )
            return None

    @staticmethod
    def write_cache_data(cacheSection, cacheId, cacheData):
        """Store ``cacheData`` as JSON in Redis and return the stored value.

        The key is ``MERMS-<cacheSection>-<cacheId>`` and expires after
        ``CACHE_DEFAULT_EXPIRE``.

        Args:
            cacheSection: First key component.
            cacheId: Second key component (any value usable in an f-string).
            cacheData: JSON-serialisable payload.

        Returns:
            ``{"session_details": <JSON string read back from Redis>}``, or
            ``None`` when serialisation or any Redis call fails.
        """
        try:
            cache_key = f"MERMS-{cacheSection}-{cacheId}"  # note the use of '-'
            logger.info("write_cache_data () key %s", cache_key)

            # NOTE(review): a new client (and pool) is created per call;
            # consider sharing one client if this runs on a hot path.
            client = redis.Redis(
                host=BaseService.CACHE_SERVER,
                port=BaseService.CACHE_PORT,
                password=BaseService.CACHE_PASSWORD,
                decode_responses=True,
            )

            # SET overwrites any existing value and resets its TTL, so the
            # former exists()/unlink() round-trips were unnecessary and left a
            # window in which the key was missing.
            json_string = json.dumps(cacheData, indent=4)
            client.set(cache_key, json_string, ex=BaseService.CACHE_DEFAULT_EXPIRE)

            # Read back so the caller receives exactly what Redis stored.
            value = client.get(cache_key)
            return {
                "session_details": value,
            }
        except Exception as exc:  # best-effort cache: callers expect None
            logger.error(
                "An error occurred while write_cache_data data: %s", exc, exc_info=True
            )
            return None

    #
    # @classmethod
    # def calculate_charges(cls, offer, amount):
    #     """
    #     Calculates and returns the charges for the given offer and amount.
    #
    #     Args:
    #         offer (Offer): The offer object that contains the charges.
    #         amount (float): The requested loan amount.
    #
    #     Returns:
    #         dict: A dictionary containing the calculated charges.
    #     """
    #     if not offer or not offer.charges:
    #         logger.error(f"No charges found for offer ID {offer.id}")
    #         return {"error": "No charges found for the offer"}
    #
    #     loan_charges = offer.charges
    #     tenor = offer.schedule  # offer.tenor // 30  # Convert to months
    #     interest = cls.get_charge_detail(rates = offer.interest_rate, charges = loan_charges, code = "INTEREST", amount = amount)
    #     management = cls.get_charge_detail(rates = offer.management_rate, charges = loan_charges, code = "MGTFEE", amount = amount)
    #     insurance = cls.get_charge_detail(rates = offer.insurance_rate, charges = loan_charges, code = "INSURANCE", amount = amount)
    #     vat = cls.get_charge_detail(rates = offer.vat_rate, charges = loan_charges, code = "VAT", amount = amount, management_fee = management["fee"])
    #
    #     # Separate fees into upfront and postpaid
    #     upfront_fees = [
    #         fee["fee"]
    #         for fee in [interest, management, insurance, vat]
    #         if fee["due_days"] == 0
    #     ]
    #
    #     postpaid_fees = [
    #         fee["fee"]
    #         for fee in [interest, management, insurance, vat]
    #         if fee["due_days"] != 0
    #     ]
    #     vat_test = vat["fee"]
    #     logger.info(f"VAT fee == *************** : {vat_test}")
    #
    #     # Up-front payment: (only those fees due immediately i.e due_days == 0)
    #     # upfront_payment = sum(upfront_fees)
    #     if offer.schedule == 1:
    #         upfront_payment = vat["fee"] + management["fee"] + insurance["fee"] + interest["fee"]
    #         interest_amount = interest["fee"]
    #         repayment_amount = amount
    #     else:
    #         upfront_payment = vat["fee"] + insurance["fee"]+management["fee"]
    #         interest_amount = interest["fee"]*offer.schedule
    #         repayment_amount = amount + interest_amount
    #
    #
    #     # Repayment amount: (principal + only those fees not due immediately i.e due_days != 0)
    #     # repayment_amount = amount + (sum(postpaid_fees) * tenor)
    #
    #     # Total amount: (upfront_payment + repayment_amount)
    #     total_amount = upfront_payment + repayment_amount
    #
    #     # Calculate the installment amount
    #     installment_amount = repayment_amount / offer.schedule
    #
    #     return {
    #         "interest": interest,
    #         "interest_amount": interest_amount,
    #         "management": management,
    #         "insurance": insurance,
    #         "vat": vat,
    #         "upfront_payment": round(upfront_payment, 2),
    #         "repayment_amount": round(repayment_amount, 2),
    #         "installment_amount": round(installment_amount, 2),
    #         "total_amount": round(total_amount, 2)
    #     }
    #
    #