Files
FloatBackOfffice/CRONS/transactions_record_import.php
dev-chiefworks f76abffdcd first commit
2022-05-31 16:21:53 -04:00

319 lines
9.0 KiB
PHP

<?php
// `tstatus` value identifying transactions that must be deleted locally
// instead of being inserted/updated (see remove_transaction()/execute()).
define('TRANSACTION_REMOVED', 'removed');

$job_name = pathinfo(__FILE__, PATHINFO_FILENAME);
echo "[" . date("Y-m-d H:i:s") . "] " . $job_name . " job is starting.\n";

// NOTE(review): $savvyext is presumably created by backend.php — it is not
// defined anywhere in this file.
include '../backend.php';
require_once('./lock.php');
require_once('./common/Logger.php');

// Primary connection: used by this job for every DELETE/UPDATE/INSERT.
$db_host = $savvyext->cfgReadChar('database.host');
$db_name = $savvyext->cfgReadChar('database.name');
$db_user = $savvyext->cfgReadChar('database.user');
$db_pass = $savvyext->cfgReadChar('database.pass');
$db_port = $savvyext->cfgReadLong('database.port');
// "{$var}" is used instead of "${var}": the latter is deprecated since PHP 8.2.
$connstr = "host={$db_host} port={$db_port} dbname={$db_name} user={$db_user} password={$db_pass}";
$con = pg_connect($connstr) or die("Could not connect to server\n");

// Read-only replica: used by this job for the SELECT that looks up already
// imported ids.
$db_host = $savvyext->cfgReadChar('database_replica.host');
$db_name = $savvyext->cfgReadChar('database_replica.name');
$db_user = $savvyext->cfgReadChar('database_replica.user');
$db_pass = $savvyext->cfgReadChar('database_replica.pass');
$db_port = $savvyext->cfgReadLong('database_replica.port');
$readOnlyReplicaConnstr = "host={$db_host} port={$db_port} dbname={$db_name} user={$db_user} password={$db_pass}";
$readOnlyReplicaConn = pg_connect($readOnlyReplicaConnstr);
if ($readOnlyReplicaConn === false) {
    // The replica is optional: fall back to the primary connection for reads.
    $readOnlyReplicaConn = $con;
}

// No further failure check is needed at this point: a failed primary
// connection has already terminated the script through die(), and a failed
// replica connection has been replaced by the (valid) primary handle. The PID
// lock has not been taken yet, so there is nothing to release either.
/**
 * Import raw bank transactions page by page.
 *
 * For every page returned by fetch_bank_transactions() the function:
 *   1. deletes the transactions whose `tstatus` is TRANSACTION_REMOVED,
 *   2. updates the remaining transactions that already exist locally,
 *   3. inserts the remaining transactions that do not exist yet.
 * It then follows the `next` link of the response until there is none.
 *
 * @param array $inX Query parameters for the first page request.
 * @return void
 */
function execute($inX = [])
{
    // Pages are walked iteratively so the call stack does not grow with the
    // number of pages.
    while (true) {
        $outX = [];
        fetch_bank_transactions($inX, $outX);

        // A failed request or an undecodable body leaves nothing to process
        // and no `next` link to follow.
        if (!is_array($outX)) {
            return;
        }

        if (!empty($outX['count']) && !empty($outX['results']) && is_array($outX['results'])) {
            $response = $outX['results'];
            remove_transaction($response);

            // Everything that is not flagged as removed gets upserted.
            $import_transactions = array_filter($response, function ($ele) {
                return $ele['tstatus'] !== TRANSACTION_REMOVED;
            });

            // Skip the upsert when the page only contained removals: the id
            // lookup would otherwise be built from an empty id list.
            if ($import_transactions) {
                $imported_ids = [];
                update_transaction($import_transactions, $imported_ids);
                insert_transaction($import_transactions, $imported_ids);
            }
        }

        if (empty($outX['next'])) {
            return;
        }

        // The query string of the `next` URL becomes the parameter set of the
        // following request.
        $next_query = parse_url($outX['next'], PHP_URL_QUERY);
        if (!is_string($next_query) || $next_query === '') {
            // A `next` link without a query string would yield an empty
            // parameter set and request the same unfiltered page repeatedly.
            return;
        }
        parse_str($next_query, $inX);
    }
}
/**
 * Convert raw transaction records into parameter rows for the
 * members_transactions_import_raw statements.
 *
 * The key order of every produced row is significant: callers hand the row to
 * pg_query_params(), which binds values positionally ($1 = import_id, ...,
 * $14 = extra).
 *
 * @param array $data List of transaction records (associative arrays).
 * @return array Re-indexed list of parameter rows, one per input record.
 */
function generate_params($data) {
    // Target column => source field; `extra` is handled separately because it
    // is JSON-encoded rather than copied.
    $field_map = [
        'import_id' => 'id',
        'account' => 'account',
        'member_id' => 'member_id',
        'amount' => 'amount',
        'currency' => 'currency',
        'status' => 'status',
        'tstatus' => 'tstatus',
        'description' => 'description',
        'time' => 'time',
        'category' => 'category',
        'provider_category' => 'provider_category',
        'merchant_provider_id' => 'merchant_provider_id',
        'merchant_name' => 'merchant_name',
    ];

    $rows = [];
    foreach ($data as $transaction) {
        $row = [];
        foreach ($field_map as $column => $source) {
            $row[$column] = $transaction[$source];
        }
        $row['extra'] = json_encode($transaction['extra']);
        $rows[] = $row;
    }

    return $rows;
}
/**
 * Fetch one page of raw transactions from the account microservice.
 *
 * Issues a GET request to
 * `<microservices.account>/api/v1/transactions/get_raw_transactions/?<query>`
 * authenticated with the `system.bank_token` server token.
 *
 * @param array $in  Query parameters appended to the request URL.
 * @param mixed $out Receives the decoded JSON body as an associative array.
 *                   Always an array: it is empty when the request fails or the
 *                   body does not decode to an array.
 * @return void
 */
function fetch_bank_transactions($in, &$out) {
    global $savvyext;

    // Single quotes are stripped from the configured base URL before use.
    $target_url = str_replace("'", "", $savvyext->cfgReadChar('microservices.account'))
        . "/api/v1/transactions/get_raw_transactions"
        . "/?" . http_build_query($in);

    $ch = curl_init();
    curl_setopt($ch, CURLOPT_URL, $target_url);
    curl_setopt($ch, CURLOPT_HTTPHEADER, array(
        'Content-Type: application/json',
        'Connection: Keep-Alive',
        'Authorization: Server-Token ' . $savvyext->cfgReadChar('system.bank_token'),
        'Accept: application/json'
    ));
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
    // NOTE(review): peer verification is disabled, so the server certificate
    // is not validated. Kept as-is to preserve behavior — confirm whether the
    // service certificate can be verified and enable this.
    curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
    // Make HTTP status codes >= 400 count as transfer errors.
    curl_setopt($ch, CURLOPT_FAILONERROR, true);

    $result = curl_exec($ch);
    $error_msg = curl_errno($ch) ? curl_error($ch) : null;
    curl_close($ch);

    if ($error_msg !== null) {
        Logger::debug('transactions_record_import fetch_bank_transactions error:');
        Logger::debug($error_msg);
    }

    // curl_exec() returns false on failure; there is no body to decode then.
    if (!is_string($result)) {
        $out = [];
        return;
    }

    $decoded = json_decode($result, true);
    if (!is_array($decoded)) {
        Logger::debug('transactions_record_import fetch_bank_transactions error:');
        Logger::debug('response body is not a JSON object or array');
        $decoded = [];
    }
    $out = $decoded;
}
/**
 * Return the subset of the given import ids that already exist in
 * members_transactions_import_raw.
 *
 * The lookup runs on the read-only replica connection. Ids are bound as query
 * parameters instead of being concatenated into the SQL text.
 *
 * @param array $import_ids Import ids to look up.
 * @return array List of matching import_id values as returned by PostgreSQL
 *               (strings); empty when nothing matches, when the input is
 *               empty, or when the query fails.
 */
function get_import_id_list_in_table($import_ids) {
    global $readOnlyReplicaConn;

    $import_ids = array_values((array) $import_ids);
    if (!$import_ids) {
        // `IN ()` is not valid SQL; an empty input trivially matches nothing.
        return [];
    }

    // One positional placeholder ($1, $2, ...) per id.
    $placeholders = [];
    foreach (array_keys($import_ids) as $index) {
        $placeholders[] = '$' . ($index + 1);
    }

    $query =
        'SELECT import_id
        FROM members_transactions_import_raw
        WHERE import_id IN (' . implode(', ', $placeholders) . ')';
    $result = pg_query_params($readOnlyReplicaConn, $query, $import_ids);

    $imported_ids = [];
    if (!$result) {
        Logger::debug('transactions_record_import get_import_id_list_in_table error:');
        Logger::debug(pg_last_error($readOnlyReplicaConn));
        return $imported_ids;
    }

    while ($row = pg_fetch_row($result)) {
        $imported_ids[] = $row[0];
    }
    return $imported_ids;
}
/**
 * Delete the rows of members_transactions_import_raw whose import_id is in
 * the given list.
 *
 * Ids are bound as query parameters instead of being concatenated into the
 * SQL text.
 *
 * @param array $remove_ids Import ids to delete.
 * @return bool True when the DELETE succeeded (or there was nothing to
 *              delete), false when the query failed.
 */
function delete_tranasction($remove_ids) {
    global $con;

    $remove_ids = array_values((array) $remove_ids);
    if (!$remove_ids) {
        // `IN ()` is not valid SQL; nothing to delete counts as success.
        return true;
    }

    // One positional placeholder ($1, $2, ...) per id.
    $placeholders = [];
    foreach (array_keys($remove_ids) as $index) {
        $placeholders[] = '$' . ($index + 1);
    }

    $query = 'DELETE FROM members_transactions_import_raw
        WHERE import_id IN (' . implode(', ', $placeholders) . ')';

    if (!pg_query_params($con, $query, $remove_ids)) {
        // Ask the connection the query ran on: without an argument
        // pg_last_error() reads the most recently opened connection, which
        // is not necessarily $con.
        $error = pg_last_error($con);
        echo $error;
        Logger::debug('transactions_record_import delete_tranasction error:');
        Logger::debug($error);
        return false;
    }
    return true;
}
/**
 * Tell the account microservice that a transaction has been imported.
 *
 * Sends `PATCH <microservices.account>/api/v1/transactions/<id>/` with a JSON
 * body that sets `points_imported` to the current UTC time.
 *
 * @param int|string $id Identifier of the transaction on the microservice.
 * @return void
 */
function mark_transaction_as_imported($id) {
    global $savvyext;

    // Single quotes are stripped from the configured base URL before use.
    $target_url = str_replace("'", "", $savvyext->cfgReadChar('microservices.account'))
        . "/api/v1/transactions/{$id}/";

    // ISO-8601 UTC timestamp. gmdate() is required because the value carries
    // a literal "Z" suffix; the date() format character `Z` is the timezone
    // offset in seconds and must not be used as a fractional part.
    $payload = json_encode(['points_imported' => gmdate('Y-m-d\TH:i:s\Z')]);

    $ch = curl_init();
    curl_setopt($ch, CURLOPT_URL, $target_url);
    curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'PATCH');
    curl_setopt($ch, CURLOPT_POSTFIELDS, $payload);
    curl_setopt($ch, CURLOPT_HTTPHEADER, array(
        'Content-Type: application/json',
        'Connection: Keep-Alive',
        'Authorization: Server-Token ' . $savvyext->cfgReadChar('system.bank_token'),
        'Accept: application/json'
    ));
    curl_setopt($ch, CURLOPT_RETURNTRANSFER, true);
    // NOTE(review): peer verification is disabled, so the server certificate
    // is not validated. Kept as-is to preserve behavior — confirm whether the
    // service certificate can be verified and enable this.
    curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
    // Make HTTP status codes >= 400 count as transfer errors.
    curl_setopt($ch, CURLOPT_FAILONERROR, true);

    $result = curl_exec($ch);
    $error_msg = curl_errno($ch) ? curl_error($ch) : null;
    curl_close($ch);

    if ($error_msg !== null) {
        Logger::debug('transactions_record_import mark_transaction_as_imported error:');
        Logger::debug($error_msg);
    }

    // curl_exec() returns false on failure; only decode an actual body.
    $out = is_string($result) ? json_decode($result, true) : null;

    // Print the service's error details when it reports any; the list may be
    // absent even if `error` is set.
    if (is_array($out) && isset($out['error'])
        && isset($out['non_field_errors']) && is_array($out['non_field_errors'])) {
        echo implode("\n", $out['non_field_errors']);
    }
}
/**
 * Delete locally every transaction flagged as removed and, when the delete
 * succeeds, report each of them back as imported.
 *
 * @param array $data List of transaction records (associative arrays with at
 *                    least `tstatus`; `id` is read from the removed ones).
 * @return void
 */
function remove_transaction($data) {
    // Keep only the records whose status marks them as removed.
    $removed = array_filter($data, function ($transaction) {
        return $transaction['tstatus'] === TRANSACTION_REMOVED;
    });
    $remove_ids = array_column($removed, 'id');

    if (!$remove_ids) {
        return;
    }
    if (!delete_tranasction($remove_ids)) {
        // Nothing is acknowledged when the local delete failed.
        return;
    }

    foreach ($remove_ids as $removed_id) {
        mark_transaction_as_imported($removed_id);
    }
}
/**
 * Update the transactions that already exist in
 * members_transactions_import_raw and acknowledge each successful update to
 * the account microservice.
 *
 * @param array $data         List of transaction records to import.
 * @param mixed $imported_ids Out-parameter: receives the list of ids from
 *                            $data that already exist locally (always an
 *                            array, possibly empty).
 * @return void
 */
function update_transaction($data, &$imported_ids) {
    global $con;

    // Always leave the out-parameter as an array, even on the early returns.
    $imported_ids = [];

    $candidate_ids = $data ? array_column($data, 'id') : [];
    if (!$candidate_ids) {
        // Nothing to look up; avoids querying with an empty id list.
        return;
    }

    $imported_ids = get_import_id_list_in_table($candidate_ids);
    if (!$imported_ids) {
        return;
    }

    // Loose comparison on purpose: ids read back from PostgreSQL are strings
    // while the incoming records may carry them as integers.
    $imported_transactions = array_filter($data, function ($ele) use ($imported_ids) {
        return in_array($ele['id'], $imported_ids);
    });

    $query = 'UPDATE members_transactions_import_raw
        SET
            account = $2,
            member_id = $3,
            amount = $4,
            currency = $5,
            status = $6,
            tstatus = $7,
            description = $8,
            time = $9,
            category = $10,
            provider_category = $11,
            merchant_provider_id = $12,
            merchant_name = $13,
            extra = $14
        WHERE import_id = $1';

    foreach (generate_params($imported_transactions) as $val) {
        // Parameters are bound by position, in the key order produced by
        // generate_params().
        $rs = pg_query_params($con, $query, array_values($val));
        if (!$rs) {
            // Ask the connection the query ran on: without an argument
            // pg_last_error() reads the most recently opened connection,
            // which is not necessarily $con.
            $error = pg_last_error($con);
            echo $error;
            Logger::debug('transactions_record_import update_transaction error:');
            Logger::debug($error);
        } else {
            mark_transaction_as_imported($val['import_id']);
        }
    }
}
/**
 * Insert the transactions that do not exist yet in
 * members_transactions_import_raw and acknowledge each successful insert to
 * the account microservice.
 *
 * Rows whose import_id already exists are left untouched by the statement
 * (ON CONFLICT DO NOTHING) and are still acknowledged.
 *
 * @param array $data         List of transaction records to import.
 * @param array $imported_ids Ids that already exist locally; records with
 *                            these ids are skipped.
 * @return void
 */
function insert_transaction($data, $imported_ids) {
    global $con;

    // Tolerate a missing/invalid id list: treat it as "nothing imported yet".
    if (!is_array($imported_ids)) {
        $imported_ids = [];
    }

    // Loose comparison on purpose: ids read back from PostgreSQL are strings
    // while the incoming records may carry them as integers.
    $import_transactions = array_filter($data, function ($ele) use ($imported_ids) {
        return !in_array($ele['id'], $imported_ids);
    });
    if (!$import_transactions) {
        return;
    }

    $query = 'INSERT INTO members_transactions_import_raw
        (
            import_id,
            account,
            member_id,
            amount,
            currency,
            status,
            tstatus,
            description,
            time,
            category,
            provider_category,
            merchant_provider_id,
            merchant_name,
            extra
        ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14)
        ON CONFLICT(import_id) DO NOTHING';

    foreach (generate_params($import_transactions) as $val) {
        // Parameters are bound by position, in the key order produced by
        // generate_params().
        $rs = pg_query_params($con, $query, array_values($val));
        if (!$rs) {
            // The parameter row exposes the id as `import_id`, not `id`.
            echo "Cannot execute query with import id:" . $val['import_id'];
            Logger::debug('transactions_record_import insert_transaction error:');
            // Ask the connection the query ran on: without an argument
            // pg_last_error() reads the most recently opened connection,
            // which is not necessarily $con.
            Logger::debug(pg_last_error($con));
        } else {
            mark_transaction_as_imported($val['import_id']);
        }
    }
}
// Run the import while holding the job's PID lock.
// NOTE(review): lock_pid_file()/unlock_pid_file() come from lock.php, which is
// not visible here — presumably they prevent overlapping runs; confirm.
$lock_file = lock_pid_file();
try {
    execute();
} finally {
    // Release the lock even when the import throws, so a failed run does not
    // leave the lock behind.
    unlock_pid_file($lock_file);
}
echo "[" . date("Y-m-d H:i:s") . "] " . $job_name . " job complete.\n";