"""Read-only data providers for the back-office dashboard API."""

from urllib import request

from flask import jsonify
from sqlalchemy import func, desc

from app.api.services.base_service import BaseService
from app.config import settings
from app.models import MembersProducts, Products, Payments, Members, CustomTemplates, ProductsTemplates, MembersProfile, \
    ProductsDetails, MembersWebfiles, Signup
from app.utils.logger import logger

# Pagination defaults shared by every paginated endpoint in this module.
_DEFAULT_PAGE = 1
_DEFAULT_LIMIT = 20
_MAX_LIMIT = 100

# Fixed window sizes used by the "recent ..." queries.
_RECENT_ACCOUNTS_LIMIT = 30
_RECENT_LOGINS_LIMIT = 10
_RECENT_SUBSCRIPTIONS_LIMIT = 15


def _parse_pagination(filters):
    """Return ``(page, limit)`` read from *filters*, clamped to valid values.

    ``page`` falls back to 1 when it is below 1; ``limit`` falls back to 20
    when it is outside ``1..100``.

    Raises:
        ValueError / TypeError: if a supplied value cannot be converted by
            ``int()``; callers decide how to report that.
    """
    page = int(filters.get('page', _DEFAULT_PAGE))
    limit = int(filters.get('limit', _DEFAULT_LIMIT))
    if page < 1:
        page = _DEFAULT_PAGE
    if limit < 1 or limit > _MAX_LIMIT:
        limit = _DEFAULT_LIMIT
    return page, limit


def _pagination_block(total_count, page, limit):
    """Build the ``pagination`` dict returned by the paginated endpoints."""
    total_pages = (total_count + limit - 1) // limit  # ceiling division
    return {
        'total_count': total_count,
        'total_pages': total_pages,
        'current_page': page,
        'limit': limit,
        'has_next': page < total_pages,
        'has_prev': page > 1
    }


def _isoformat(value):
    """Return ``value.isoformat()``, or ``None`` when *value* is falsy."""
    return value.isoformat() if value else None


def _scaled_amount(amount):
    """Return ``amount * 0.01`` rounded to two decimals."""
    return round(amount * 0.01, 2)


def _internal_error(action, exc):
    """Log *exc* and return the ``(response, 500)`` pair used on failure.

    Must be called from inside an ``except`` block so that ``exc_info=True``
    picks up the exception currently being handled.
    """
    logger.error(f"An error occurred while getting {action}: {str(exc)}", exc_info=True)
    return jsonify({"message": "Internal Server Error"}), 500


def _serialize_product(product):
    """Convert a product row into the dict exposed by the product endpoints."""
    return {
        'id': product.id,
        'uid': product.uid,
        'product_id': product.product_id,
        'description': product.description,
        'name': product.name,
        'status': product.status,
        'added': _isoformat(product.added),
        'updated': _isoformat(product.updated),
        'banner': product.banner,
    }


def _serialize_product_details(details):
    """Convert a product-details row into its API dict."""
    return {
        'product_detail_id': details.id,
        'product_id': details.product_id,
        'details': details.details,
        'sale_text': details.sale_text,
        'added': _isoformat(details.added),
    }


def _serialize_custom_template(template):
    """Convert a custom-template row into its API dict."""
    return {
        'id': template.id,
        'uid': template.uid,
        'custom_id': template.custom_id,
        'provision_name': template.provision_name,
        'added': _isoformat(template.added),
    }


def _serialize_payment_row(payment):
    """Convert a payment row into the dict used by the payments listing."""
    return {
        'id': payment.id,
        'member_id': payment.member_id,
        'option_name': payment.option_name,
        'option_type': payment.option_type,
        'payment_uid': payment.payment_uid,
        'amount': _scaled_amount(payment.amount),
        'status': payment.status,
        'sub_start': payment.sub_start,
        'sub_stop': payment.sub_stop,
        "added": payment.added,
    }


class OfficeDashboardService(BaseService):
    """Static query helpers that assemble back-office dashboard payloads.

    Methods return a plain ``dict`` on success. Methods that wrap their work
    in ``try/except`` return ``(flask.Response, 500)`` on any exception
    instead of raising.
    """

    @staticmethod
    def get_payments_data(filters):
        """Return one page of payments, optionally filtered by option name.

        Args:
            filters: mapping that may contain ``option_name``, ``page`` and
                ``limit``; ``None`` is treated as an empty mapping.

        Returns:
            dict with ``payments``, ``count`` and ``pagination`` keys, or
            ``(response, 500)`` on failure.
        """
        try:
            if filters is None:
                filters = {}

            option_name = filters.get('option_name')
            page, limit = _parse_pagination(filters)

            payments, total_count = Payments.get_all_payments(option_name, None, page, limit)
            payment_rows = [_serialize_payment_row(p) for p in payments or []]

            return {
                'payments': payment_rows,
                'count': len(payment_rows),
                'pagination': _pagination_block(total_count, page, limit),
            }
        except Exception as e:
            return _internal_error("payments data", e)

    @staticmethod
    def get_dashboard_data():
        """Return the dashboard summary: recent subscriptions plus counters.

        Only ``subscription`` and ``signups.value`` come from the database;
        the ``payments`` and ``request_summary`` sections are the literal
        values written below.

        Returns:
            dict, or ``(response, 500)`` on failure.
        """
        try:
            subscriptions = MembersProducts.get_dash_recent_subscription(_RECENT_SUBSCRIPTIONS_LIMIT)
            subscription_data = [
                {
                    'id': s.id,
                    'uid': s.uid,
                    'product_id': s.product_id,
                    'internal_url': s.internal_url,
                    'external_url': s.external_url,
                    'dns_group': s.dns_group,
                    'status': s.status,
                    'added': s.added,
                    'primary_server': s.primary_server,
                    'provision_port': s.provision_port,
                    'updated': s.updated
                }
                for s in subscriptions or []
            ]

            sub_count = MembersProducts.get_subscription_counts(0)

            return {
                "subscription": subscription_data,
                "signups": {
                    "currency": "Naira",
                    "currency_text": "\u20a6",
                    "text": "last 30 days",
                    "value": int(sub_count)
                },
                "payments": {
                    "currency": "Dollars",
                    "currency_text": "\u0024",
                    "text": "this week",
                    "value": 0
                },
                "request_summary": {
                    "eligibility_check": {"Started": 6},
                    "provide_loan": {"Scheduled": 2},
                    "repayment": {"Processing": 0},
                    "select_offer": {"Completed": 3}
                }
            }
        except Exception as e:
            return _internal_error("dashboard data", e)

    @staticmethod
    def get_office_products(filters):
        """Return every product from ``Products.get_user_product_list(0)``.

        Args:
            filters: accepted for interface parity; not used.

        Returns:
            dict with a ``products`` list.
        """
        products = Products.get_user_product_list(0)
        return {
            "products": [_serialize_product(p) for p in products or []],
        }

    @staticmethod
    def get_office_product_detail(filters):
        """Return the configuration and detail record of one product.

        Args:
            filters: mapping that may contain ``product_id``; ``None`` is
                treated as an empty mapping.

        Returns:
            dict with ``product_configuration`` and ``product_details``; each
            is a dict when the record exists and an empty list otherwise.
        """
        if filters is None:
            filters = {}
        product_id = filters.get('product_id')

        product = Products.get_product_by_product_id(product_id)
        details = ProductsDetails.get_product_details_with_product_id(product_id)

        return {
            "product_configuration": _serialize_product(product) if product else [],
            "product_details": _serialize_product_details(details) if details else [],
        }

    @staticmethod
    def get_office_product_update(filters):
        """Placeholder for product updates; always reports an empty result.

        No update is performed. The lookups that previously ran here were
        discarded without affecting the returned value, so they are omitted.

        Args:
            filters: accepted for interface parity; not used.

        Returns:
            ``{"update_result": []}``.
        """
        # TODO: implement the actual update once its contract is defined.
        return {
            "update_result": [],
        }

    @staticmethod
    def get_office_product_templates(filters):
        """Return one page of product templates for the office view.

        Args:
            filters: mapping that may contain ``product_id``,
                ``provision_name``, ``page`` and ``limit``; ``None`` is
                treated as an empty mapping.

        Returns:
            dict with ``templates`` and an (always empty) ``product_options``.

        Raises:
            ValueError / TypeError: if ``page`` or ``limit`` is not
                convertible by ``int()`` (this method has no error wrapper).
        """
        if filters is None:
            filters = {}

        product_id = filters.get('product_id')
        provision_name = filters.get('provision_name')
        page, limit = _parse_pagination(filters)

        templates = ProductsTemplates.get_template_for_office(product_id, provision_name, page, limit)
        templates_data = [
            {
                'id': t.id,
                'uid': t.uid,
                'product_id': t.product_id,
                'provision_name': t.provision_name,
                'status': t.status,
                'added': _isoformat(t.added),
            }
            for t in templates or []
        ]

        return {
            "templates": templates_data,
            "product_options": []
        }

    @staticmethod
    def get_office_custom_templates(filters):
        """Return the custom templates matching *filters*.

        Args:
            filters: passed through unchanged to
                ``CustomTemplates.get_custom_template_for_office``.

        Returns:
            dict with a ``templates`` list.
        """
        logger.info("ENTER :: Getting office custom templates")
        templates = CustomTemplates.get_custom_template_for_office(filters)
        return {
            "templates": [_serialize_custom_template(t) for t in templates or []],
        }

    @staticmethod
    def get_office_account_view(filters):
        """Return a member's account, profile, subscriptions and payments.

        Args:
            filters: mapping containing ``member_uid``.

        Returns:
            dict with ``account``, ``account_profile`` (dict, or empty list
            when the member has no profile), ``subscriptions`` and
            ``payments``; or ``(response, 500)`` on failure.
        """
        logger.info('ENTER API::get office account view')
        try:
            member_uid = filters.get('member_uid')
            # NOTE(review): when no member matches, the attribute access
            # below raises and this method answers 500; a dedicated
            # "not found" response may be preferable - confirm with callers.
            member = Members.get_member_by_uid(member_uid)
            account_data = {
                "id": member.id,
                "uid": str(member.uid),
                "username": member.username,
                "country": member.country,
                "added": member.added,
                "email": member.email,
                "account_name": member.account_name,
                "firstname": member.firstname,
                "lastname": member.lastname,
                "trial_end": member.trial_end,
            }
            member_id = member.id
            logger.info(f"member_id :: member_uid: {member_id} :: {member_uid} ")

            subscriptions = MembersProducts.get_member_productlist_by_member_id(member_id)
            member_sub_data = [
                {
                    'id': s.id,
                    'member_id': s.member_id,
                    'subscription_uid': str(s.uid),
                    'product_id': s.product_id,
                    'internal_url': s.internal_url,
                    'external_url': s.external_url,
                    'dns_group': s.dns_group,
                    'status': s.status,
                    'primary_server': s.primary_server,
                    'provision_port': s.provision_port,
                    'product_template': s.product_template,
                    'custom_template': s.custom_template,
                    'updated': s.updated,
                    "added": s.added,
                }
                for s in subscriptions or []
            ]

            member_payments = Payments.get_member_payments_by_member_id(member_id)
            member_payments_data = [
                {
                    'id': p.id,
                    'uid': p.uid,
                    'option_name': p.option_name,
                    'option_type': p.option_type,
                    'payment_uid': p.payment_uid,
                    'amount': _scaled_amount(p.amount),
                    'status': p.status,
                    'added': p.added
                }
                for p in member_payments or []
            ]

            current_profile = MembersProfile.get_member_profile_by_member_id(member_id)
            if current_profile is None:
                account_profile_data = []
            else:
                account_profile_data = {
                    'id': current_profile.id,
                    'profile_uid': current_profile.uid,
                    'member_id': current_profile.member_id,
                    'practice': current_profile.practice,
                    'specialization': current_profile.specialization,
                    'url_name': current_profile.url_name
                }

            account_result = {
                "account": account_data,
                "account_profile": account_profile_data,
                "subscriptions": member_sub_data,
                "payments": member_payments_data
            }
            logger.info('RETURN API::get office account view')
            # NOTE(review): this logs the full payload, including the
            # member's email and names - confirm that is acceptable.
            logger.info(account_result)
            return account_result
        except Exception as e:
            return _internal_error("customer data", e)

    @staticmethod
    def get_subscription_view_data(filters=None):
        """Return one subscription plus the templates that can be applied.

        Args:
            filters: mapping that may contain ``subscription_uid``; it is
                also forwarded to the custom-template lookup. ``None`` is
                treated as an empty mapping.

        Returns:
            dict with ``subscription`` (dict, or empty list when the
            subscription is not found), ``available_templates`` and
            ``available_custom_templates``; or ``(response, 500)`` on failure.
        """
        try:
            if filters is None:
                filters = {}

            subscription_uid = filters.get('subscription_uid')

            member_sub = []
            product_templates_data = []
            templates_data = []

            subscription = MembersProducts.get_member_product_by_subscription_uid(subscription_uid)
            if subscription:
                flavor = ''
                template_name = ''
                if subscription.product_template:
                    template = ProductsTemplates.get_template_by_uid(str(subscription.product_template))
                    if template:
                        flavor = template.flavor
                        template_name = template.name

                member_sub = {
                    'id': subscription.id,
                    'flavor': flavor,
                    'template_name': template_name,
                    'subscription_uid': str(subscription.uid),
                    'member_id': subscription.member_id,
                    'product_id': subscription.product_id,
                    'internal_url': subscription.internal_url,
                    'external_url': subscription.external_url,
                    'dns_group': subscription.dns_group,
                    'product_template': subscription.product_template,
                    'primary_server': subscription.primary_server,
                    'provision_port': subscription.provision_port,
                    'custom_template': subscription.custom_template,
                    'status': subscription.status,
                    'updated': subscription.updated,
                    "added": subscription.added,
                }

                custom_templates = CustomTemplates.get_custom_template_for_office(filters)
                templates_data = [_serialize_custom_template(t) for t in custom_templates or []]

                product_templates = ProductsTemplates.get_template_by_product_id(subscription.product_id)
                product_templates_data = [
                    {
                        'id': t.id,
                        'template_uid': t.uid,
                        'product_id': t.product_id,
                        'provision_name': t.provision_name,
                        'added': _isoformat(t.added),
                    }
                    for t in product_templates or []
                ]

            return {
                'subscription': member_sub,
                'available_templates': product_templates_data,
                'available_custom_templates': templates_data
            }
        except Exception as e:
            return _internal_error("subscription view data", e)

    @staticmethod
    def get_office_sidebar(filters):
        """Return the sidebar payload: system links, summaries, recent logins.

        The two summary dicts are all-zero placeholders. The payments query
        that used to run here never contributed to the response and has been
        dropped.

        Args:
            filters: accepted for interface parity; not used.

        Returns:
            dict, or ``(response, 500)`` on failure.
        """
        try:
            # Start from an empty list so the response can always be built,
            # even when there are no recent logins.
            recent_login_data = []
            recent_logins = Members.get_recent_member_login(_RECENT_LOGINS_LIMIT)
            for member in recent_logins or []:
                recent_login_data.append({
                    'member_id': member.id,
                    'firstname': member.firstname,
                    'lastname': member.lastname,
                    'username': member.username,
                    'member_uid': member.uid,
                    "added": member.added,
                })

            recent_payment_summary = {
                "approved": 0,
                "verified": 0,
                "failed": 0,
                "total": 0
            }
            recent_deployment_summary = {
                "pending": 0,
                "started": 0,
                "stuck": 0,
                "completed": 0
            }

            system_url = [
                {"name": "AirPlay System", "url": settings.SYSTEM_SERVERS_AIRPLAY},
                {"name": "Kafka", "url": settings.SYSTEM_SERVERS_KAFKA},
                {"name": "Ansible", "url": settings.SYSTEM_SERVERS_ANSIBLE},
                # NOTE(review): this entry reuses the Ansible URL; it looks
                # like a copy/paste slip - confirm the intended setting.
                {"name": "Int HaProxy", "url": settings.SYSTEM_SERVERS_ANSIBLE},
                {"name": "Provision Server", "url": settings.SYSTEM_SERVERS_PROVISION},
                {"name": "Event Manager", "url": settings.SYSTEM_SERVERS_EVENT_MANAGER},
                {"name": "Socket Server", "url": settings.SYSTEM_SERVERS_SOCKET_MANAGER},
            ]

            return {
                'system_url': system_url,
                'recent_payment_summary': recent_payment_summary,
                'recent_deployment_summary': recent_deployment_summary,
                'recent_login': recent_login_data,
                # NOTE(review): signups currently mirror the login list;
                # presumably a dedicated signup query belongs here - confirm.
                'recent_signup': recent_login_data,
                'recent_deployment_error': [],
            }
        except Exception as e:
            return _internal_error("office sidebar data", e)

    @staticmethod
    def get_file_upload_data(filters):
        """Return one page of uploaded web files for a file group.

        Args:
            filters: mapping that may contain ``file_group``, ``page`` and
                ``limit``; ``None`` is treated as an empty mapping.

        Returns:
            dict with ``file_list``, ``media_server``, ``count`` and
            ``pagination``; or ``(response, 500)`` on failure.
        """
        try:
            if filters is None:
                filters = {}

            file_group = filters.get('file_group')
            page, limit = _parse_pagination(filters)

            # NOTE(review): file_group is passed for both leading arguments
            # and the 'member_id' filter is never applied. Presumably the
            # second argument should be the member id - confirm against the
            # signature of MembersWebfiles.get_all_webfiles before changing.
            files, total_count = MembersWebfiles.get_all_webfiles(file_group, file_group, page, limit)

            file_rows = [
                {
                    'id': f.id,
                    'file_uid': f.uid,
                    'member_id': f.member_id,
                    'member_uid': f.member_uid,
                    'file_group': f.file_group,
                    'filename': f.filename,
                    'save_filename': f.save_filename,
                    'file_type': f.file_type,
                    'status': f.status,
                    'file_size': f.file_size,
                    'added': _isoformat(f.added),
                    'updated': _isoformat(f.updated)
                }
                for f in files or []
            ]

            return {
                'file_list': file_rows,
                'media_server': settings.MEDIA_SERVER,
                'count': len(file_rows),
                'pagination': _pagination_block(total_count, page, limit),
            }
        except Exception as e:
            return _internal_error("file upload data", e)

    @staticmethod
    def get_recent_account_data(filters):
        """Return the first 30 rows of ``Members.get_all_member``.

        The query always asks for page 1 with 30 rows; ``page`` and ``limit``
        from *filters* are validated and echoed back in ``pagination`` but do
        not influence the query.

        Args:
            filters: mapping that may contain ``page`` and ``limit``;
                ``None`` is treated as an empty mapping.

        Returns:
            dict with ``members``, ``count`` and ``pagination``; or
            ``(response, 500)`` on failure.
        """
        try:
            if filters is None:
                filters = {}

            page, limit = _parse_pagination(filters)

            # NOTE(review): fixed window rather than the requested page -
            # confirm whether real pagination is wanted here.
            members, total_count = Members.get_all_member(None, None, 1, _RECENT_ACCOUNTS_LIMIT)

            member_data = [
                {
                    'id': m.id,
                    'username': m.username,
                    'email': m.email,
                    'firstname': m.firstname,
                    'lastname': m.lastname,
                    'country': m.country,
                    'member_uid': m.uid,
                    'profile_completed': m.profile_completed,
                    "added": m.added,
                }
                for m in members or []
            ]

            return {
                'members': member_data,
                # Report what was actually returned, not the window size.
                'count': len(member_data),
                'pagination': {
                    'total_count': total_count,
                    'total_pages': 1,
                    'current_page': page,
                    'limit': limit,
                    'has_next': False,  # single fixed page; page is never < 1
                    'has_prev': page > 1
                }
            }
        except Exception as e:
            return _internal_error("recent account data", e)

    @staticmethod
    def get_recent_signup_data(filters):
        """Return one page of started signups, optionally filtered.

        Args:
            filters: mapping that may contain ``email``, ``firstname``,
                ``lastname``, ``page`` and ``limit``; ``None`` is treated as
                an empty mapping.

        Returns:
            dict with ``members``, ``count`` and ``pagination``; or
            ``(response, 500)`` on failure.
        """
        try:
            if filters is None:
                filters = {}

            page, limit = _parse_pagination(filters)
            email = filters.get('email')
            firstname = filters.get('firstname')
            lastname = filters.get('lastname')

            signups, total_count = Signup.get_all_start_signup(email, firstname, lastname, page, limit)

            member_data = [
                {
                    'id': s.id,
                    'email': s.email,
                    'firstname': s.firstname,
                    'lastname': s.lastname,
                    'member_uid': s.uid,
                    'status': s.status,
                    "added": s.added,
                }
                for s in signups or []
            ]

            return {
                'members': member_data,
                'count': len(member_data),
                # The query is paginated, so report real page totals.
                'pagination': _pagination_block(total_count, page, limit),
            }
        except Exception as e:
            return _internal_error("recent signup data", e)