Files
2026-05-02 14:15:12 -04:00

726 lines
27 KiB
Python

from urllib import request
from flask import jsonify
from app.config import settings
from app.utils.logger import logger
from app.api.services.base_service import BaseService
from sqlalchemy import func, desc
from app.models import MembersProducts, Products, Payments, Members, CustomTemplates, ProductsTemplates, MembersProfile, \
ProductsDetails, MembersWebfiles, Signup
class OfficeDashboardService(BaseService):
@staticmethod
def get_payments_data(filters):
try:
if filters is None:
filters = {}
# Extract filters
option_name = filters.get('option_name')
# member_id = filters.get('member_id')
# Extract pagination parameters
page = int(filters.get('page', 1))
limit = int(filters.get('limit', 20))
# Ensure page and limit are valid
if page < 1:
page = 1
if limit < 1 or limit > 100:
limit = 20
membersPayList, total_count = Payments.get_all_payments(option_name, None, page, limit)
# Convert loans to dictionary format
member_sub_data = []
for subs in membersPayList:
member_sub_data.append({
'id': subs.id,
'member_id': subs.member_id,
'option_name': subs.option_name,
'option_type': subs.option_type,
'payment_uid': subs.payment_uid,
'amount': subs.amount * 0.01,
'status': subs.status,
'sub_start': subs.sub_start,
'sub_stop': subs.sub_stop,
"added": subs.added,
})
# Calculate total pages
total_pages = (total_count + limit - 1) // limit
response_data = {
'payments': member_sub_data,
'count': len(member_sub_data),
'pagination': {
'total_count': total_count,
'total_pages': total_pages,
'current_page': page,
'limit': limit,
'has_next': page < total_pages,
'has_prev': page > 1
}
}
return response_data
except Exception as e:
logger.error(f"An error occurred while getting dashboard data: {str(e)}", exc_info=True)
return jsonify({"message": "Internal Server Error"}), 500
@staticmethod
def get_dashboard_data():
try:
subscription_data = []
subscription = MembersProducts.get_dash_recent_subscription(15)
if subscription:
subscription_data = [{
'id': t.id,
'uid': t.uid,
'product_id': t.product_id,
'internal_url': t.internal_url,
'external_url': t.external_url,
'dns_group': t.dns_group,
'status': t.status,
'added': t.added,
'primary_server': t.primary_server,
'provision_port': t.provision_port,
'updated': t.updated
} for t in subscription]
sub_count = MembersProducts.get_subscription_counts(0)
dashboard_data = {
"subscription": subscription_data,
"signups": {
"currency": "Naira",
"currency_text": "\u20a6",
"text": "last 30 days",
"value": int(sub_count)
},
"payments": {
"currency": "Dollars",
"currency_text": "\u0024",
"text": "this week",
"value": 0
},
"request_summary": {
"eligibility_check": {
"Started": 6
},
"provide_loan": {
"Scheduled": 2
},
"repayment": {
"Processing": 0
},
"select_offer": {
"Completed": 3
}
}
}
return dashboard_data
except Exception as e:
logger.error(f"An error occurred while getting dashboard data: {str(e)}", exc_info=True)
return jsonify({"message": "Internal Server Error"}), 500
@staticmethod
def get_office_products(filters):
products = Products.get_user_product_list(0)
product_data = []
for t in products:
product_data.append({
'id': t.id,
'uid': t.uid,
'product_id': t.product_id,
'description': t.description,
'name': t.name,
'status': t.status,
'added': t.added.isoformat() if t.added else None,
'updated': t.updated.isoformat() if t.updated else None,
'banner': t.banner,
})
products_result = {
"products": product_data,
}
return products_result
@staticmethod
def get_office_product_detail(filters):
product_id = filters.get('product_id')
product_data = []
products = Products.get_product_by_product_id(product_id)
if products:
product_data = {
'id': products.id,
'uid': products.uid,
'product_id': products.product_id,
'description': products.description,
'name': products.name,
'status': products.status,
'added': products.added.isoformat() if products.added else None,
'updated': products.updated.isoformat() if products.updated else None,
'banner': products.banner,
}
product_details = []
productDetails = ProductsDetails.get_product_details_with_product_id(product_id)
if productDetails:
product_details = {
'product_detail_id': productDetails.id,
'product_id': productDetails.product_id,
'details': productDetails.details,
'sale_text': productDetails.sale_text,
'added': productDetails.added.isoformat() if productDetails.added else None,
}
products_result = {
"product_configuration": product_data,
"product_details": product_details,
}
return products_result
@staticmethod
def get_office_product_update(filters):
product_id = filters.get('product_id')
product_data = []
products = Products.get_product_by_product_id(product_id)
if products:
product_data = {
'id': products.id,
'uid': products.uid,
'product_id': products.product_id,
'description': products.description,
'name': products.name,
'status': products.status,
'added': products.added.isoformat() if products.added else None,
'updated': products.updated.isoformat() if products.updated else None,
'banner': products.banner,
}
product_details = []
productDetails = ProductsDetails.get_product_details_with_product_id(product_id)
if productDetails:
product_details = {
'product_detail_id': productDetails.id,
'product_id': productDetails.product_id,
'details': productDetails.details,
'sale_text': productDetails.sale_text,
'added': productDetails.added.isoformat() if productDetails.added else None,
}
products_result = {
"update_result": [],
}
return products_result
@staticmethod
def get_office_product_templates(filters):
# Extract filters
product_id = filters.get('product_id')
provision_name = filters.get('provision_name')
# Extract pagination parameters
page = int(filters.get('page', 1))
limit = int(filters.get('limit', 20))
# Ensure page and limit are valid
if page < 1:
page = 1
if limit < 1 or limit > 100:
limit = 20
templates = ProductsTemplates.get_template_for_office(product_id, provision_name, page, limit)
templates_data = []
if templates:
for t in templates:
templates_data.append({
'id': t.id,
'uid': t.uid,
'product_id': t.product_id,
'provision_name': t.provision_name,
'status': t.status,
'added': t.added.isoformat() if t.added else None,
})
templates_result = {
"templates": templates_data,
"product_options": []
}
return templates_result
@staticmethod
def get_office_custom_templates(filters):
logger.info("ENTER :: Getting office custom templates")
templates = CustomTemplates.get_custom_template_for_office(filters)
templates_data = []
if templates:
for t in templates:
templates_data.append({
'id': t.id,
'uid': t.uid,
'custom_id': t.custom_id,
'provision_name': t.provision_name,
'added': t.added.isoformat() if t.added else None,
})
templates_result = {
"templates": templates_data,
}
return templates_result
@staticmethod
def get_office_account_view(filters):
logger.info('ENTER API::get office account view')
try:
# member_uid = filters.member_uid
member_uid = filters.get('member_uid')
account_result = Members.get_member_by_uid(member_uid)
account_data = {
"id": account_result.id,
"uid": str(account_result.uid),
"username": account_result.username,
"country": account_result.country,
"added": account_result.added,
"email": account_result.email,
"account_name": account_result.account_name,
"firstname": account_result.firstname,
"lastname": account_result.lastname,
"trial_end": account_result.trial_end,
}
member_id = account_result.id
logger.info(f"member_id :: member_uid: {member_id} :: {member_uid} ")
# "profile_comp leted": account_result.profile_completed,
# "next_billing": account_result.next_billing,
membersSubList = MembersProducts.get_member_productlist_by_member_id(account_result.id)
member_sub_data = []
if membersSubList:
for subs in membersSubList:
member_sub_data.append({
'id': subs.id,
'member_id': subs.member_id,
'subscription_uid': str(subs.uid),
'product_id': subs.product_id,
'internal_url': subs.internal_url,
'external_url': subs.external_url,
'dns_group': subs.dns_group,
'status': subs.status,
'primary_server': subs.primary_server,
'provision_port': subs.provision_port,
'product_template': subs.product_template,
'custom_template': subs.custom_template,
'updated': subs.updated,
"added": subs.added,
})
member_payments = Payments.get_member_payments_by_member_id(account_result.id)
member_payments_data = []
for t in member_payments:
member_payments_data.append({
'id': t.id,
'uid': t.uid,
'option_name': t.option_name,
'option_type': t.option_type,
'payment_uid': t.payment_uid,
'amount': round(t.amount * 0.01, 2),
'status': t.status,
'added': t.added
})
account_profile_data = []
current_profile = MembersProfile.get_member_profile_by_member_id(member_id)
if current_profile is None:
account_profile_data = []
else:
account_profile_data = {
'id': current_profile.id,
'profile_uid': current_profile.uid,
'member_id': current_profile.member_id,
'practice': current_profile.practice,
'specialization': current_profile.specialization,
'url_name': current_profile.url_name
}
account_result = {
"account": account_data,
"account_profile": account_profile_data,
"subscriptions": member_sub_data,
"payments": member_payments_data
}
logger.info('RETURN API::get office account view')
logger.info(account_result)
return account_result
except Exception as e:
logger.error(f"An error occurred while getting cusomer data: {str(e)}", exc_info=True)
return jsonify({"message": "Internal Server Error"}), 500
@staticmethod
def get_subscription_view_data(filters=None):
try:
if filters is None:
filters = {}
# Extract filters
subscription_uid = filters.get('subscription_uid')
member_sub = []
flavor = ''
template_name = ''
product_templates_data = []
templates_data = []
membersSubResult = MembersProducts.get_member_product_by_subscription_uid(subscription_uid)
if membersSubResult:
if membersSubResult.product_template:
templateData = ProductsTemplates.get_template_by_uid(str(membersSubResult.product_template))
if templateData:
flavor = templateData.flavor
template_name = templateData.name
member_sub = {
'id': membersSubResult.id,
'flavor': flavor,
'template_name': template_name,
'subscription_uid': str(membersSubResult.uid),
'member_id': membersSubResult.member_id,
'product_id': membersSubResult.product_id,
'internal_url': membersSubResult.internal_url,
'external_url': membersSubResult.external_url,
'dns_group': membersSubResult.dns_group,
'product_template': membersSubResult.product_template,
'primary_server': membersSubResult.primary_server,
'provision_port': membersSubResult.provision_port,
'custom_template': membersSubResult.custom_template,
'status': membersSubResult.status,
'updated': membersSubResult.updated,
"added": membersSubResult.added,
}
templates = CustomTemplates.get_custom_template_for_office(filters)
if templates:
for t in templates:
templates_data.append({
'id': t.id,
'uid': t.uid,
'custom_id': t.custom_id,
'provision_name': t.provision_name,
'added': t.added.isoformat() if t.added else None,
})
product_templates = ProductsTemplates.get_template_by_product_id(membersSubResult.product_id)
if product_templates:
for t in product_templates:
product_templates_data.append({
'id': t.id,
'template_uid': t.uid,
'product_id': t.product_id,
'provision_name': t.provision_name,
'added': t.added.isoformat() if t.added else None,
})
response_data = {
'subscription': member_sub,
'available_templates': product_templates_data,
'available_custom_templates': templates_data
}
return response_data
except Exception as e:
logger.error(f"An error occurred while getting dashboard data: {str(e)}", exc_info=True)
return jsonify({"message": "Internal Server Error"}), 500
@staticmethod
def get_office_sidebar(filters):
try:
if filters is None:
filters = {}
# Extract filters
option_name = filters.get('option_name')
# member_id = filters.get('member_id')
# Extract pagination parameters
page = int(filters.get('page', 1))
limit = int(filters.get('limit', 20))
# Ensure page and limit are valid
if page < 1:
page = 1
if limit < 1 or limit > 100:
limit = 20
membersPayList, total_count = Payments.get_all_payments(option_name, None, page, limit)
# Convert loans to dictionary format
member_sub_data = []
for subs in membersPayList:
member_sub_data.append({
'id': subs.id,
'member_id': subs.member_id,
'option_name': subs.option_name,
'option_type': subs.option_type,
'payment_uid': subs.payment_uid,
'amount': subs.amount * 0.01,
'status': subs.status,
'sub_start': subs.sub_start,
'sub_stop': subs.sub_stop,
"added": subs.added,
})
recentLogin = Members.get_recent_member_login(10)
if recentLogin:
recent_login_data = []
for subs in recentLogin:
recent_login_data.append({
'member_id': subs.id,
'firstname': subs.firstname,
'lastname': subs.lastname,
'username': subs.username,
'member_uid': subs.uid,
"added": subs.added,
})
# Calculate total pages
total_pages = (total_count + limit - 1) // limit
recent_payment_summary = {
"approved": 0,
"verified": 0,
"failed": 0,
"total": 0
}
recent_deployment_summary = {
"pending": 0,
"started": 0,
"stuck": 0,
"completed": 0
}
system_url = [
{
"name": "AirPlay System",
"url": settings.SYSTEM_SERVERS_AIRPLAY
},
{
"name": "Kafka",
"url": settings.SYSTEM_SERVERS_KAFKA
},
{
"name": "Ansible",
"url": settings.SYSTEM_SERVERS_ANSIBLE
},
{
"name": "Int HaProxy",
"url": settings.SYSTEM_SERVERS_ANSIBLE
},
{
"name": "Provision Server",
"url": settings.SYSTEM_SERVERS_PROVISION
},
{
"name": "Event Manager",
"url": settings.SYSTEM_SERVERS_EVENT_MANAGER
},
{
"name": "Socket Server",
"url": settings.SYSTEM_SERVERS_SOCKET_MANAGER
},
]
response_data = {
'system_url': system_url,
'recent_payment_summary': recent_payment_summary,
'recent_deployment_summary': recent_deployment_summary,
'recent_login': recent_login_data,
'recent_signup': recent_login_data,
'recent_deployment_error': [],
}
return response_data
except Exception as e:
logger.error(f"An error occurred while getting dashboard data: {str(e)}", exc_info=True)
return jsonify({"message": "Internal Server Error"}), 500
@staticmethod
def get_file_upload_data(filters):
try:
if filters is None:
filters = {}
# Extract filters
file_group = filters.get('file_group')
member_id = filters.get('member_id')
# Extract pagination parameters
page = int(filters.get('page', 1))
limit = int(filters.get('limit', 20))
# Ensure page and limit are valid
if page < 1:
page = 1
if limit < 1 or limit > 100:
limit = 20
fileListData, total_count = MembersWebfiles.get_all_webfiles(file_group, file_group, page, limit)
# Convert loans to dictionary format
member_sub_data = []
if fileListData:
for subs in fileListData:
member_sub_data.append({
'id': subs.id,
'file_uid': subs.uid,
'member_id': subs.member_id,
'member_uid': subs.member_uid,
'file_group': subs.file_group,
'filename': subs.filename,
'save_filename': subs.save_filename,
'file_type': subs.file_type,
'status': subs.status,
'file_size': subs.file_size,
'added': subs.added.isoformat() if subs.added else None,
'updated': subs.updated.isoformat() if subs.updated else None
})
# Calculate total pages
total_pages = (total_count + limit - 1) // limit
response_data = {
'file_list': member_sub_data,
'media_server': settings.MEDIA_SERVER,
'count': len(member_sub_data),
'pagination': {
'total_count': total_count,
'total_pages': total_pages,
'current_page': page,
'limit': limit,
'has_next': page < total_pages,
'has_prev': page > 1
}
}
return response_data
except Exception as e:
logger.error(f"An error occurred while getting dashboard data: {str(e)}", exc_info=True)
return jsonify({"message": "Internal Server Error"}), 500
@staticmethod
def get_recent_account_data(filters):
try:
if filters is None:
filters = {}
# Extract pagination parameters
page = int(filters.get('page', 1))
limit = int(filters.get('limit', 20))
# Ensure page and limit are valid
if page < 1:
page = 1
if limit < 1 or limit > 100:
limit = 20
membersList, total_count = Members.get_all_member(None, None, 1, 30)
# Convert loans to dictionary format
member_data = []
for member in membersList:
member_data.append({
'id': member.id,
'username': member.username,
'email': member.email,
'firstname': member.firstname,
'lastname': member.lastname,
'country': member.country,
'member_uid': member.uid,
'profile_completed': member.profile_completed,
"added": member.added,
})
response_data = {
'members': member_data,
'count': 30,
'pagination': {
'total_count': total_count,
'total_pages': 1,
'current_page': page,
'limit': limit,
'has_next': page < 1,
'has_prev': page > 1
}
}
return response_data
except Exception as e:
logger.error(f"An error occurred while getting dashboard data: {str(e)}", exc_info=True)
return jsonify({"message": "Internal Server Error"}), 500
@staticmethod
def get_recent_signup_data(filters):
try:
if filters is None:
filters = {}
# Extract pagination parameters
page = int(filters.get('page', 1))
limit = int(filters.get('limit', 20))
email = filters.get('email')
firstname = filters.get('firstname')
lastname = filters.get('lastname')
# Ensure page and limit are valid
if page < 1:
page = 1
if limit < 1 or limit > 100:
limit = 20
membersList, total_count = Signup.get_all_start_signup(email, firstname, lastname, page, limit)
# Convert loans to dictionary format
member_data = []
for member in membersList:
member_data.append({
'id': member.id,
'email': member.email,
'firstname': member.firstname,
'lastname': member.lastname,
'member_uid': member.uid,
'status': member.status,
"added": member.added,
})
response_data = {
'members': member_data,
'count': 30,
'pagination': {
'total_count': total_count,
'total_pages': 1,
'current_page': page,
'limit': limit,
'has_next': page < 1,
'has_prev': page > 1
}
}
return response_data
except Exception as e:
logger.error(f"An error occurred while getting dashboard data: {str(e)}", exc_info=True)
return jsonify({"message": "Internal Server Error"}), 500